Spaces:
Runtime error
Runtime error
"""Util that calls GitHub.""" | |
from __future__ import annotations

import json
from itertools import islice
from typing import TYPE_CHECKING, Any, Dict, List, Optional

import requests
from langchain_core.pydantic_v1 import BaseModel, Extra, root_validator
from langchain_core.utils import get_from_dict_or_env

if TYPE_CHECKING:
    from github.Issue import Issue
    from github.PullRequest import PullRequest
def _import_tiktoken() -> Any:
    """Import and return the ``tiktoken`` module.

    The import happens at call time, so tiktoken is only needed by the code
    paths that actually call this helper.

    Returns:
        The imported ``tiktoken`` module.

    Raises:
        ImportError: If tiktoken cannot be imported. The original import
            failure is attached as the cause so the real reason is not lost.
    """
    try:
        import tiktoken
    except ImportError as exc:
        raise ImportError(
            "tiktoken is not installed. "
            "Please install it with `pip install tiktoken`"
        ) from exc
    return tiktoken
class GitHubAPIWrapper(BaseModel):
    """Wrapper for GitHub API."""

    # Client handle used for searches; typed Any and hidden from the docs.
    github: Any  #: :meta private:
    # Repository handle used for all repo operations; hidden from the docs.
    github_repo_instance: Any  #: :meta private:
    # Identifier of the target repository (passed to get_repo / search calls).
    github_repository: Optional[str] = None
    # GitHub App id used for app authentication.
    github_app_id: Optional[str] = None
    # GitHub App private key: either a path to a key file or the key text itself.
    github_app_private_key: Optional[str] = None
    # Branch the wrapper reads from and commits to.
    active_branch: Optional[str] = None
    # Branch pull requests are opened against; direct commits to it are refused.
    github_base_branch: Optional[str] = None

    class Config:
        """Configuration for this pydantic object."""

        # Reject any field that is not declared on the model.
        extra = Extra.forbid
@root_validator()
def validate_environment(cls, values: Dict) -> Dict:
    """Validate that api key and python package exists in environment.

    Resolves the repository name, app id and private key through
    ``get_from_dict_or_env`` (keys ``github_repository``, ``github_app_id``,
    ``github_app_private_key``; variables ``GITHUB_REPOSITORY``,
    ``GITHUB_APP_ID``, ``GITHUB_APP_PRIVATE_KEY``), authenticates as the
    GitHub App, and stores the resulting client and repository handles in
    ``values``.

    Parameters:
        values(Dict): The field values collected for the model
    Returns:
        Dict: ``values`` with ``github``, ``github_repo_instance``,
            ``github_repository``, ``github_app_id``,
            ``github_app_private_key``, ``active_branch`` and
            ``github_base_branch`` filled in
    Raises:
        ImportError: If PyGithub is not installed
        ValueError: If the app has no installation, or indexing the
            installations reports a ValueError
    """
    github_repository = get_from_dict_or_env(
        values, "github_repository", "GITHUB_REPOSITORY"
    )
    github_app_id = get_from_dict_or_env(values, "github_app_id", "GITHUB_APP_ID")
    github_app_private_key = get_from_dict_or_env(
        values, "github_app_private_key", "GITHUB_APP_PRIVATE_KEY"
    )

    try:
        from github import Auth, GithubIntegration
    except ImportError as exc:
        raise ImportError(
            "PyGithub is not installed. "
            "Please install it with `pip install PyGithub`"
        ) from exc

    try:
        # interpret the key as a file path
        # fallback to interpreting as the key itself
        with open(github_app_private_key, "r") as f:
            private_key = f.read()
    except Exception:
        # Deliberate best effort: anything that is not a readable path is
        # treated as the key material itself.
        private_key = github_app_private_key

    auth = Auth.AppAuth(
        github_app_id,
        private_key,
    )
    gi = GithubIntegration(auth=auth)
    installations = gi.get_installations()

    try:
        # The truthiness check alone is not enough for a lazily paginated
        # result, so an empty result is also detected through IndexError.
        installation = installations[0] if installations else None
    except IndexError:
        installation = None
    except ValueError as e:
        raise ValueError(
            "Please make sure to give correct github parameters "
            f"Error message: {e}"
        ) from e

    if installation is None:
        raise ValueError(
            f"Please make sure to install the created github app with id "
            f"{github_app_id} on the repo: {github_repository}. "
            "More instructions can be found at "
            "https://docs.github.com/en/apps/using-"
            "github-apps/installing-your-own-github-app"
        )

    # create a GitHub instance:
    g = installation.get_github_for_installation()
    repo = g.get_repo(github_repository)

    # Both branch settings fall back to the repository's default branch.
    github_base_branch = get_from_dict_or_env(
        values,
        "github_base_branch",
        "GITHUB_BASE_BRANCH",
        default=repo.default_branch,
    )
    active_branch = get_from_dict_or_env(
        values,
        "active_branch",
        "ACTIVE_BRANCH",
        default=repo.default_branch,
    )

    values["github"] = g
    values["github_repo_instance"] = repo
    values["github_repository"] = github_repository
    values["github_app_id"] = github_app_id
    values["github_app_private_key"] = github_app_private_key
    values["active_branch"] = active_branch
    values["github_base_branch"] = github_base_branch

    return values
def parse_issues(self, issues: List[Issue]) -> List[dict]:
    """
    Summarizes each issue as a small dictionary.
    Parameters:
        issues(List[Issue]): A list of Github Issue objects
    Returns:
        List[dict]: One dictionary per issue holding ``title`` and
            ``number``, plus ``opened_by`` when the author login is known
    """

    def summarize(issue: Issue) -> dict:
        summary = {"title": issue.title, "number": issue.number}
        author = issue.user.login if issue.user else None
        # The key is left out entirely when no login is available.
        if author is not None:
            summary["opened_by"] = author
        return summary

    return [summarize(issue) for issue in issues]
def parse_pull_requests(self, pull_requests: List[PullRequest]) -> List[dict]:
    """
    Summarizes each pull request as a small dictionary.
    Parameters:
        pull_requests(List[PullRequest]): A list of Github PullRequest objects
    Returns:
        List[dict]: One dictionary per pull request with its ``title`` and
            ``number``, and its ``commits`` and ``comments`` values as strings
    """
    return [
        {
            "title": pull.title,
            "number": pull.number,
            "commits": str(pull.commits),
            "comments": str(pull.comments),
        }
        for pull in pull_requests
    ]
def get_issues(self) -> str:
    """
    Reports the open issues of the repo, leaving out pull requests.
    Returns:
        str: A plaintext report with the issue count followed by the
            parsed issues, or a fixed message when there are none.
    """
    open_items = self.github_repo_instance.get_issues(state="open")
    # Pull requests come back from the issues listing too; drop them.
    real_issues = [item for item in open_items if not item.pull_request]
    if not real_issues:
        return "No open issues available"
    summaries = self.parse_issues(real_issues)
    return f"Found {len(summaries)} issues:\n{summaries}"
def list_open_pull_requests(self) -> str:
    """
    Reports the open pull requests of the repo.
    Returns:
        str: A plaintext report with the PR count followed by the parsed
            pull requests, or a fixed message when there are none.
    """
    open_prs = self.github_repo_instance.get_pulls(state="open")
    if open_prs.totalCount <= 0:
        return "No open pull requests available"
    summaries = self.parse_pull_requests(open_prs)
    return f"Found {len(summaries)} pull requests:\n{summaries}"
def list_files_in_main_branch(self) -> str:
    """
    Fetches all files in the main branch of the repo.

    Directories are walked recursively on ``self.github_base_branch``. The
    walk is done here rather than through ``get_files_from_directory``
    because that method returns a string (extending a list with it would
    add single characters) and reads from the active branch instead.
    Returns:
        str: A plaintext report containing the paths and names of the files,
            or the text of the exception if listing fails.
    """

    def walk(path: str) -> List[str]:
        found: List[str] = []
        entries = self.github_repo_instance.get_contents(
            path, ref=self.github_base_branch
        )
        for entry in entries:
            if entry.type == "dir":
                found.extend(walk(entry.path))
            else:
                found.append(entry.path)
        return found

    try:
        files = walk("")
        if files:
            files_str = "\n".join(files)
            return f"Found {len(files)} files in the main branch:\n{files_str}"
        return "No files found in the main branch"
    except Exception as e:
        return str(e)
def set_active_branch(self, branch_name: str) -> str:
    """Equivalent to `git checkout branch_name` for this Agent.
    Clones formatting from Github.

    Parameters:
        branch_name(str): Name of the branch to switch to
    Returns:
        str: A confirmation message, or an error message (as a string)
            listing the existing branches if the branch doesn't exist.
    """
    curr_branches = [
        branch.name for branch in self.github_repo_instance.get_branches()
    ]
    if branch_name in curr_branches:
        self.active_branch = branch_name
        return f"Switched to branch `{branch_name}`"
    # The two message parts are joined with a space after the comma.
    return (
        f"Error {branch_name} does not exist, "
        f"in repo with current branches: {str(curr_branches)}"
    )
def list_branches_in_repo(self) -> str:
    """
    Lists every branch of the repository.
    Returns:
        str: A plaintext report with the branch count and one branch name
            per line, a fixed message when there are no branches, or the
            text of the exception if the listing fails.
    """
    try:
        names = [item.name for item in self.github_repo_instance.get_branches()]
        if not names:
            return "No branches found in the repository"
        listing = "\n".join(names)
        return f"Found {len(names)} branches in the repository:\n{listing}"
    except Exception as e:
        return str(e)
def create_branch(self, proposed_branch_name: str) -> str:
    """
    Create a new branch, and set it as the active bot branch.
    Equivalent to `git switch -c proposed_branch_name`
    If the proposed branch already exists, we append _v1 then _v2...
    until a unique name is found.

    The new branch starts at the head commit of the repository's default
    branch. Up to 1000 names are tried: the proposed name itself, then
    ``_v1`` through ``_v999``.
    Parameters:
        proposed_branch_name(str): The preferred name for the new branch
    Returns:
        str: A plaintext success message, or a message saying that no free
            name was found.
    Raises:
        Exception: If GitHub rejects the request for any reason other than
            the reference already existing.
    """
    from github import GithubException

    base_branch = self.github_repo_instance.get_branch(
        self.github_repo_instance.default_branch
    )
    for attempt in range(1000):
        if attempt == 0:
            new_branch_name = proposed_branch_name
        else:
            new_branch_name = f"{proposed_branch_name}_v{attempt}"
        try:
            self.github_repo_instance.create_git_ref(
                ref=f"refs/heads/{new_branch_name}", sha=base_branch.commit.sha
            )
            self.active_branch = new_branch_name
            return (
                f"Branch '{new_branch_name}' "
                "created successfully, and set as current active branch."
            )
        except GithubException as e:
            # The error payload is not guaranteed to be a dict with a
            # "message" key, so read it defensively.
            if isinstance(e.data, dict):
                detail = str(e.data.get("message", ""))
            else:
                detail = str(e.data)
            if e.status == 422 and "Reference already exists" in detail:
                # Name taken: try the next suffix.
                continue
            # Handle any other exceptions
            print(f"Failed to create branch. Error: {e}")  # noqa: T201
            raise Exception(
                "Unable to create branch name from proposed_branch_name: "
                f"{proposed_branch_name}"
            ) from e
    return (
        "Unable to create branch. "
        "At least 1000 branches exist with named derived from "
        f"proposed_branch_name: `{proposed_branch_name}`"
    )
def list_files_in_bot_branch(self) -> str:
    """
    Fetches all files in the active branch of the repo,
    the branch the bot uses to make changes.

    Directories are walked recursively here rather than through
    ``get_files_from_directory``: that method returns a string, and
    extending a list with a string adds single characters, not paths.
    Returns:
        str: A plaintext list containing the filepaths in the branch,
            or an ``Error: ...`` message if listing fails.
    """

    def walk(path: str) -> List[str]:
        found: List[str] = []
        entries = self.github_repo_instance.get_contents(
            path, ref=self.active_branch
        )
        for entry in entries:
            if entry.type == "dir":
                found.extend(walk(entry.path))
            else:
                found.append(entry.path)
        return found

    try:
        files = walk("")
        if files:
            files_str = "\n".join(files)
            return (
                f"Found {len(files)} files in branch `{self.active_branch}`:\n"
                f"{files_str}"
            )
        return f"No files found in branch: `{self.active_branch}`"
    except Exception as e:
        return f"Error: {e}"
def get_files_from_directory(self, directory_path: str) -> str:
    """
    Recursively fetches files from a directory in the repo.

    The listing is read from ``self.active_branch``. Paths are collected in
    a real list during the walk and only turned into a string at the end;
    recursing through this method's own string result would add single
    characters to the list instead of paths.
    Parameters:
        directory_path (str): Path to the directory
    Returns:
        str: The list of file paths rendered with ``str()``, or an error
            message if GitHub rejects a listing request.
    """
    from github import GithubException

    def walk(path: str) -> List[str]:
        found: List[str] = []
        entries = self.github_repo_instance.get_contents(
            path, ref=self.active_branch
        )
        for entry in entries:
            if entry.type == "dir":
                found.extend(walk(entry.path))
            else:
                found.append(entry.path)
        return found

    try:
        files = walk(directory_path)
    except GithubException as e:
        return f"Error: status code {e.status}, {e.message}"
    return str(files)
def get_issue(self, issue_number: int) -> Dict[str, Any]:
    """
    Fetches a specific issue and its first 10 comments

    The comment listing is consumed lazily and cut off after 10 entries,
    so an issue with many comments yields exactly 10.
    Parameters:
        issue_number(int): The number for the github issue
    Returns:
        dict: A dictionary containing the issue's number, title,
            body, comments as a string, and the username of the user
            who opened the issue (the string "None" if unavailable)
    """
    max_comments = 10
    issue = self.github_repo_instance.get_issue(number=issue_number)
    comments: List[dict] = [
        {"body": comment.body, "user": comment.user.login}
        for comment in islice(issue.get_comments(), max_comments)
    ]

    opened_by = None
    if issue.user and issue.user.login:
        opened_by = issue.user.login

    return {
        "number": issue_number,
        "title": issue.title,
        "body": issue.body,
        "comments": str(comments),
        "opened_by": str(opened_by),
    }
def list_pull_request_files(self, pr_number: int) -> List[Dict[str, Any]]:
    """Fetches the full text of files in a PR, within a 3k token budget.

    Each file's content is downloaded over HTTP and its token count is
    measured with the ``cl100k_base`` encoding. A file is included only if
    it still fits in the remaining budget; files that do not fit are
    skipped and later, smaller files may still be added. Download failures
    are printed and the file is skipped.
    # TODO: Enhancement to summarize files with ctags if they're getting long.
    Args:
        pr_number(int): The number of the pull request on Github
    Returns:
        List[dict]: One dictionary per included file with ``filename``,
            ``contents``, ``additions`` and ``deletions``
    """
    tiktoken = _import_tiktoken()
    MAX_TOKENS_FOR_FILES = 3_000
    # Without a timeout a stalled connection would block this call forever.
    REQUEST_TIMEOUT_SECONDS = 30
    pr_files: List[Dict[str, Any]] = []
    pr = self.github_repo_instance.get_pull(number=int(pr_number))
    total_tokens = 0
    page = 0
    while True:  # or while (total_tokens + tiktoken()) < MAX_TOKENS_FOR_FILES:
        files_page = pr.get_files().get_page(page)
        if len(files_page) == 0:
            break
        for file in files_page:
            try:
                # First request: metadata that carries the download URL.
                file_metadata_response = requests.get(
                    file.contents_url, timeout=REQUEST_TIMEOUT_SECONDS
                )
                if file_metadata_response.status_code == 200:
                    download_url = json.loads(file_metadata_response.text)[
                        "download_url"
                    ]
                else:
                    print(f"Failed to download file: {file.contents_url}, skipping")  # noqa: T201
                    continue

                # Second request: the file content itself.
                file_content_response = requests.get(
                    download_url, timeout=REQUEST_TIMEOUT_SECONDS
                )
                if file_content_response.status_code == 200:
                    # Save the content as a UTF-8 string
                    file_content = file_content_response.text
                else:
                    print(  # noqa: T201
                        "Failed downloading file content "
                        f"(Error {file_content_response.status_code}). Skipping"
                    )
                    continue

                file_tokens = len(
                    tiktoken.get_encoding("cl100k_base").encode(
                        file_content + file.filename + "file_name file_contents"
                    )
                )
                if (total_tokens + file_tokens) < MAX_TOKENS_FOR_FILES:
                    pr_files.append(
                        {
                            "filename": file.filename,
                            "contents": file_content,
                            "additions": file.additions,
                            "deletions": file.deletions,
                        }
                    )
                    total_tokens += file_tokens
            except Exception as e:
                # Best effort per file: report the problem and move on.
                print(f"Error when reading files from a PR on github. {e}")  # noqa: T201
        page += 1
    return pr_files
def get_pull_request(self, pr_number: int) -> Dict[str, Any]:
    """
    Fetches a specific pull request with up to 10 comments and
    up to 10 commit messages, limited by a 2,000 token budget.

    Tokens are counted with the ``cl100k_base`` encoding. Title, number and
    body are added only if they fit in the remaining budget. Comments and
    commits are added one at a time until either 10 are collected or the
    next one would exceed the budget; each item is charged to the budget
    exactly once.
    Parameters:
        pr_number(int): The number for the Github pull
    Returns:
        dict: A dictionary containing the pull's title, number and body
            (each only when it fits the budget), and its comments and
            commits as strings
    """
    max_tokens = 2_000
    max_items = 10
    pull = self.github_repo_instance.get_pull(number=pr_number)
    encoding = _import_tiktoken().get_encoding("cl100k_base")
    total_tokens = 0

    def get_tokens(text: str) -> int:
        return len(encoding.encode(text))

    def add_to_dict(data_dict: Dict[str, Any], key: str, value: str) -> None:
        nonlocal total_tokens  # Declare total_tokens as nonlocal
        tokens = get_tokens(value)
        if total_tokens + tokens <= max_tokens:
            data_dict[key] = value
            total_tokens += tokens  # Now this will modify the outer variable

    def collect(items: Any, render: Any) -> List[str]:
        """Render items while they fit the budget, up to max_items."""
        nonlocal total_tokens
        kept: List[str] = []
        for item in islice(items, max_items):
            text = render(item)
            tokens = get_tokens(text)
            if total_tokens + tokens > max_tokens:
                break
            kept.append(text)
            total_tokens += tokens
        return kept

    response_dict: Dict[str, str] = {}
    add_to_dict(response_dict, "title", pull.title)
    add_to_dict(response_dict, "number", str(pr_number))
    # The body may be None; the token counter needs a string.
    add_to_dict(response_dict, "body", pull.body or "")

    comments = collect(
        pull.get_issue_comments(),
        lambda comment: str({"body": comment.body, "user": comment.user.login}),
    )
    # Already charged item by item above, so store without counting again.
    response_dict["comments"] = str(comments)

    commits = collect(
        pull.get_commits(),
        lambda commit: str({"message": commit.commit.message}),
    )
    response_dict["commits"] = str(commits)
    return response_dict
def create_pull_request(self, pr_query: str) -> str:
    """
    Makes a pull request from the bot's branch to the base branch
    Parameters:
        pr_query(str): a string which contains the PR title
        and the PR body. The title is the first line
        in the string, and the body are the rest of the string.
        For example, "Updated README\nmade changes to add info".
        A blank line between title and body is also accepted.
    Returns:
        str: A success or failure message
    """
    if self.github_base_branch == self.active_branch:
        return (
            "Cannot make a pull request because\n"
            "commits are already in the main or master branch."
        )
    try:
        title, _, remainder = pr_query.partition("\n")
        # Accept both "title\nbody" and "title\n\nbody": drop at most one
        # extra separator newline so no body character is lost.
        body = remainder[1:] if remainder.startswith("\n") else remainder
        pr = self.github_repo_instance.create_pull(
            title=title,
            body=body,
            head=self.active_branch,
            base=self.github_base_branch,
        )
        return f"Successfully created PR number {pr.number}"
    except Exception as e:
        return "Unable to make pull request due to error:\n" + str(e)
def comment_on_issue(self, comment_query: str) -> str:
    """
    Adds a comment to a github issue
    Parameters:
        comment_query(str): a string which contains the issue number,
        two newlines, and the comment.
        for example: "1\n\nWorking on it now"
        adds the comment "working on it now" to issue 1
    Returns:
        str: A success or failure message
    Raises:
        ValueError: If the text before the first blank line is not an integer
    """
    # Split on the separator itself instead of recomputing an offset from
    # the parsed number, which breaks for inputs such as "01" or " 1".
    number_text, _, comment = comment_query.partition("\n\n")
    issue_number = int(number_text)
    try:
        issue = self.github_repo_instance.get_issue(number=issue_number)
        issue.create_comment(comment)
        return "Commented on issue " + str(issue_number)
    except Exception as e:
        return "Unable to make comment due to error:\n" + str(e)
def create_file(self, file_query: str) -> str:
    """
    Creates a new file on the Github repo
    Parameters:
        file_query(str): a string which contains the file path
        and the file contents. The file path is the first line
        in the string, and the contents are the rest of the string.
        For example, "hello_world.md\n# Hello World!".
        A blank line between path and contents is also accepted.
    Returns:
        str: A success or failure message
    """
    if self.active_branch == self.github_base_branch:
        return (
            "You're attempting to commit directly to the "
            f"{self.github_base_branch} branch, which is protected. "
            "Please create a new branch and try again."
        )

    file_path, _, remainder = file_query.partition("\n")
    # Accept both "path\ncontents" and "path\n\ncontents": drop at most one
    # extra separator newline so no content character is lost.
    file_contents = remainder[1:] if remainder.startswith("\n") else remainder
    try:
        try:
            file = self.github_repo_instance.get_contents(
                file_path, ref=self.active_branch
            )
            if file:
                return (
                    f"File already exists at `{file_path}` "
                    f"on branch `{self.active_branch}`. You must use "
                    "`update_file` to modify it."
                )
        except Exception:
            # expected behavior, file shouldn't exist yet
            pass

        self.github_repo_instance.create_file(
            path=file_path,
            message="Create " + file_path,
            content=file_contents,
            branch=self.active_branch,
        )
        return "Created file " + file_path
    except Exception as e:
        return "Unable to make file due to error:\n" + str(e)
def read_file(self, file_path: str) -> str:
    """
    Read a file from this agent's branch, defined by self.active_branch,
    which supports PR branches.
    Parameters:
        file_path(str): the file path
    Returns:
        str: The file decoded as a UTF-8 string, or an error message
            if it could not be fetched or decoded
    """
    try:
        file = self.github_repo_instance.get_contents(
            file_path, ref=self.active_branch
        )
        return file.decoded_content.decode("utf-8")
    except Exception as e:
        # The two message parts are joined with a space before the branch.
        return (
            f"File not found `{file_path}` on branch "
            f"`{self.active_branch}`. Error: {str(e)}"
        )
def update_file(self, file_query: str) -> str:
    """
    Updates a file with new content.

    The current file is fetched once from the active branch; its decoded
    text is used for the replacement and its sha for the commit. Every
    occurrence of the old content is replaced. A failure to fetch the file
    is reported as an error rather than as "old content was not found".
    Parameters:
        file_query(str): Contains the file path and the file contents.
            The old file contents is wrapped in OLD <<<< and >>>> OLD
            The new file contents is wrapped in NEW <<<< and >>>> NEW
            For example:
            /test/hello.txt
            OLD <<<<
            Hello Earth!
            >>>> OLD
            NEW <<<<
            Hello Mars!
            >>>> NEW
    Returns:
        A success or failure message
    """
    if self.active_branch == self.github_base_branch:
        return (
            "You're attempting to commit directly "
            f"to the {self.github_base_branch} branch, which is protected. "
            "Please create a new branch and try again."
        )
    try:
        file_path: str = file_query.split("\n")[0]
        old_file_contents = (
            file_query.split("OLD <<<<")[1].split(">>>> OLD")[0].strip()
        )
        new_file_contents = (
            file_query.split("NEW <<<<")[1].split(">>>> NEW")[0].strip()
        )

        # One fetch provides both the text to edit and the sha to commit on.
        current_file = self.github_repo_instance.get_contents(
            file_path, ref=self.active_branch
        )
        file_content = current_file.decoded_content.decode("utf-8")
        updated_file_content = file_content.replace(
            old_file_contents, new_file_contents
        )

        if file_content == updated_file_content:
            return (
                "File content was not updated because old content was not found. "
                "It may be helpful to use the read_file action to get "
                "the current file contents."
            )

        self.github_repo_instance.update_file(
            path=file_path,
            message="Update " + str(file_path),
            content=updated_file_content,
            branch=self.active_branch,
            sha=current_file.sha,
        )
        return "Updated file " + str(file_path)
    except Exception as e:
        return "Unable to update file due to error:\n" + str(e)
def delete_file(self, file_path: str) -> str:
    """
    Deletes a file from the repo

    The file's current sha is looked up on the active branch and the
    deletion is committed to that same branch.
    Parameters:
        file_path(str): Where the file is
    Returns:
        str: Success or failure message
    """
    if self.active_branch == self.github_base_branch:
        return (
            "You're attempting to commit directly "
            f"to the {self.github_base_branch} branch, which is protected. "
            "Please create a new branch and try again."
        )
    try:
        existing_file = self.github_repo_instance.get_contents(
            file_path, ref=self.active_branch
        )
        self.github_repo_instance.delete_file(
            path=file_path,
            message="Delete " + file_path,
            branch=self.active_branch,
            sha=existing_file.sha,
        )
        return "Deleted file " + file_path
    except Exception as e:
        return "Unable to delete file due to error:\n" + str(e)
def search_issues_and_prs(self, query: str) -> str:
    """
    Runs an issue/pull request search scoped to this repository.
    Parameters:
        query(str): The search query
    Returns:
        str: A header line followed by one line per result
            (title, number and state), for at most 5 results
    """
    matches = self.github.search_issues(query, repo=self.github_repository)
    shown = min(5, matches.totalCount)
    lines = [f"Top {shown} results:"]
    lines.extend(
        f"Title: {match.title}, Number: {match.number}, State: {match.state}"
        for match in matches[:shown]
    )
    return "\n".join(lines)
def search_code(self, query: str) -> str:
    """
    Runs a code search scoped to this repository.
    # Todo: limit total tokens returned...
    Parameters:
        query(str): The search query
    Returns:
        str: A header line followed by the path and full contents of,
            at most, the top 5 matching files as read from the active
            branch, or a fixed message when nothing matches
    """
    hits = self.github.search_code(query=query, repo=self.github_repository)
    total = hits.totalCount
    if total == 0:
        return "0 results found."

    limit = min(5, total)
    sections = [f"Showing top {limit} of {total} results:"]
    for position, hit in enumerate(hits):
        if position >= limit:
            break
        # Get the file content using the PyGithub get_contents method
        blob = self.github_repo_instance.get_contents(
            hit.path, ref=self.active_branch
        )
        text = blob.decoded_content.decode()
        sections.append(
            f"Filepath: `{hit.path}`\nFile contents: {text}\n<END OF FILE>"
        )
    return "\n".join(sections)
def create_review_request(self, reviewer_username: str) -> str:
    """
    Requests a review on the first open pull request whose head branch
    is the current active_branch.
    Parameters:
        reviewer_username(str): The username of the person who is being requested
    Returns:
        str: A message confirming the review request, or explaining
            why it could not be created
    """
    open_prs = self.github_repo_instance.get_pulls(state="open", sort="created")

    # find PR against active_branch
    target = None
    for candidate in open_prs:
        if candidate.head.ref == self.active_branch:
            target = candidate
            break

    if target is None:
        return (
            "No open pull request found for the "
            f"current branch `{self.active_branch}`"
        )

    try:
        target.create_review_request(reviewers=[reviewer_username])
    except Exception as e:
        return f"Failed to create a review request with error {e}"
    return (
        f"Review request created for user {reviewer_username} "
        f"on PR #{target.number}"
    )
def run(self, mode: str, query: str) -> str:
    """
    Dispatches a named operation to the matching method.

    Modes that look up an issue or pull request convert ``query`` to an
    integer and return the result JSON-encoded; modes that take no input
    ignore ``query``; the rest pass ``query`` through unchanged.
    Parameters:
        mode(str): The name of the operation to run
        query(str): The input for that operation
    Returns:
        str: Whatever the selected operation returns
    Raises:
        ValueError: If ``mode`` is not a known operation
    """
    # Handlers are lambdas so only the selected operation is evaluated.
    handlers = {
        "get_issue": lambda: json.dumps(self.get_issue(int(query))),
        "get_pull_request": lambda: json.dumps(self.get_pull_request(int(query))),
        "list_pull_request_files": lambda: json.dumps(
            self.list_pull_request_files(int(query))
        ),
        "get_issues": lambda: self.get_issues(),
        "comment_on_issue": lambda: self.comment_on_issue(query),
        "create_file": lambda: self.create_file(query),
        "create_pull_request": lambda: self.create_pull_request(query),
        "read_file": lambda: self.read_file(query),
        "update_file": lambda: self.update_file(query),
        "delete_file": lambda: self.delete_file(query),
        "list_open_pull_requests": lambda: self.list_open_pull_requests(),
        "list_files_in_main_branch": lambda: self.list_files_in_main_branch(),
        "list_files_in_bot_branch": lambda: self.list_files_in_bot_branch(),
        "list_branches_in_repo": lambda: self.list_branches_in_repo(),
        "set_active_branch": lambda: self.set_active_branch(query),
        "create_branch": lambda: self.create_branch(query),
        "get_files_from_directory": lambda: self.get_files_from_directory(query),
        "search_issues_and_prs": lambda: self.search_issues_and_prs(query),
        "search_code": lambda: self.search_code(query),
        "create_review_request": lambda: self.create_review_request(query),
    }
    handler = handlers.get(mode)
    if handler is None:
        raise ValueError("Invalid mode" + mode)
    return handler()