Spaces:
Running
Running
from mcp.server.fastmcp import FastMCP | |
import time | |
from litellm import completion | |
import os | |
import glob | |
import http.client | |
import json | |
import openpyxl | |
import shutil | |
from google import genai | |
import pexpect | |
client = genai.Client(api_key="AIzaSyDtP05TyoIy9j0uPL7_wLEhgQEE75AZQSc") | |
source_dir = "/app/uploads/temp" | |
destination_dir = "/app/code_interpreter" | |
files_list=[] | |
downloaded_files=[] | |
from openai import OpenAI | |
clienty = OpenAI(api_key="xyz", base_url="https://akiko19191-backend.hf.space/") | |
mcp = FastMCP("code_sandbox") | |
data={} | |
result="" | |
import requests | |
import os | |
from bs4 import BeautifulSoup # For parsing HTML | |
Parent=pexpect.spawn('bash') | |
def transfer_files(): | |
try: | |
for item in os.listdir(source_dir): | |
item_path = os.path.join(source_dir, item) | |
if os.path.isdir(item_path): # Check if it's a directory | |
for filename in os.listdir(item_path): | |
source_file_path = os.path.join(item_path, filename) | |
destination_file_path = os.path.join(destination_dir, filename) | |
if not os.path.exists(destination_file_path): | |
shutil.move(source_file_path, destination_file_path) | |
except: | |
pass | |
def transfer_files2(): | |
try: | |
for item in os.listdir("/app/uploads"): | |
if "temp" not in item: | |
item_path = os.path.join(source_dir, item) | |
if os.path.isdir(item_path): # Check if it's a directory | |
for filename in os.listdir(item_path): | |
source_file_path = os.path.join(item_path, filename) | |
destination_file_path = os.path.join(destination_dir, filename.split("__")[1]) | |
if not os.path.exists(destination_file_path): | |
shutil.move(source_file_path, destination_file_path) | |
except: | |
pass | |
def upload_file(file_path, upload_url): | |
"""Uploads a file to the specified server endpoint.""" | |
try: | |
# Check if the file exists | |
if not os.path.exists(file_path): | |
raise FileNotFoundError(f"File not found: {file_path}") | |
# Prepare the file for upload | |
with open(file_path, "rb") as file: | |
files = {"file": (os.path.basename(file_path), file)} # Important: Provide filename | |
# Send the POST request | |
response = requests.post(upload_url, files=files) | |
# Check the response status code | |
response.raise_for_status() # Raise an exception for bad status codes (4xx or 5xx) | |
# Parse and print the response | |
if response.status_code == 200: | |
print(f"File uploaded successfully. Filename returned by server: {response.text}") | |
return response.text # Return the filename returned by the server | |
else: | |
print(f"Upload failed. Status code: {response.status_code}, Response: {response.text}") | |
return None | |
except FileNotFoundError as e: | |
print(e) | |
return None # or re-raise the exception if you want the program to halt | |
except requests.exceptions.RequestException as e: | |
print(f"Upload failed. Network error: {e}") | |
return None | |
TOKEN = "5182224145:AAEjkSlPqV-Q3rH8A9X8HfCDYYEQ44v_qy0" | |
chat_id = "5075390513" | |
from requests_futures.sessions import FuturesSession | |
session = FuturesSession() | |
def run(cmd, timeout_sec,forever_cmd): | |
global Parent | |
if forever_cmd == 'true': | |
Parent.close() | |
Parent = pexpect.spawn("bash") | |
command="cd /app/code_interpreter/ && "+cmd | |
Parent.sendline(command) | |
Parent.readline().decode() | |
return str(Parent.readline().decode()) | |
t=time.time() | |
child = pexpect.spawn("bash") | |
output="" | |
command="cd /app/code_interpreter/ && "+cmd | |
child.sendline('PROMPT_COMMAND="echo END"') | |
child.readline().decode() | |
child.readline().decode() | |
child.sendline(command) | |
while (not child.eof() ) and (time.time()-t<timeout_sec): | |
x=child.readline().decode() | |
output=output+x | |
print(x) | |
if "END" in x : | |
output=output.replace("END","") | |
child.close() | |
break | |
if "true" in forever_cmd: | |
break | |
return output | |
def analyse_audio(audiopath,query) -> dict: | |
"""Ask another AI model about audios.The AI model can listen to the audio and give answers.Eg-query:Generate detailed minutes of meeting from the audio clip,audiopath='/app/code_interpreter/<audioname>'.Note:The audios are automatically present in the /app/code_interpreter directory.""" | |
transfer_files2() | |
myfile = client.files.upload(file=audiopath) | |
response = client.models.generate_content( | |
model='gemini-2.0-flash', | |
contents=[query, myfile] | |
) | |
return {"Output":str(response.text)} | |
def analyse_video(videopath,query) -> dict: | |
"""Ask another AI model about videos.The AI model can see the videos and give answers.Eg-query:Create a very detailed transcript and summary of the video,videopath='/app/code_interpreter/<videoname>'Note:The videos are automatically present in the /app/code_interpreter directory.""" | |
transfer_files2() | |
video_file = client.files.upload(file=videopath) | |
while video_file.state.name == "PROCESSING": | |
print('.', end='') | |
time.sleep(1) | |
video_file = client.files.get(name=video_file.name) | |
if video_file.state.name == "FAILED": | |
raise ValueError(video_file.state.name) | |
response = client.models.generate_content( | |
model='gemini-2.0-flash', | |
contents=[query, video_file] | |
) | |
return {"Output":str(response.text)} | |
def analyse_images(imagepath,query) -> dict: | |
"""Ask another AI model about images.The AI model can see the images and give answers.Eg-query:Who is the person in this image?,imagepath='/app/code_interpreter/<imagename>'.Note:The images are automatically present in the /app/code_interpreter directory.""" | |
transfer_files2() | |
video_file = client.files.upload(file=imagepath) | |
response = client.models.generate_content( | |
model='gemini-2.0-flash', | |
contents=[query, video_file] | |
) | |
return {"Output":str(response.text)} | |
# @mcp.tool() | |
# def generate_images(imagepath,query) -> dict: | |
# """Ask another AI model to generate images based on the query and the image path.Set image path as an empty string , if you dont want to edit images , but rather generate images.Eg-query:Generate a cartoon version of this image,imagepath='/app/code_interpreter/<imagename>'.Note:The images are automatically present in the /app/code_interpreter directory.""" | |
# transfer_files2() | |
# video_file = client.files.upload(file=imagepath) | |
# response = client.models.generate_content( | |
# model='gemini-2.0-flash', | |
# contents=[query, video_file] | |
# ) | |
# return {"Output":str(response.text)} | |
def create_code_files(filename: str, code) -> dict: | |
"""Create code files by passing the the filename as well the entire code to write.The file is created by default in the /app/code_interpreter directory.Note:All user uploaded files that you might need to work upon are stored in the /app/code_interpreter directory.""" | |
global destination_dir | |
transfer_files() | |
transfer_files2() | |
if not os.path.exists(os.path.join(destination_dir, filename)): | |
if isinstance(code, dict): | |
with open(os.path.join(destination_dir, filename), 'w', encoding='utf-8') as f: | |
json.dump(code, f, ensure_ascii=False, indent=4) | |
else: | |
f = open(os.path.join(destination_dir, filename), "w") | |
f.write(str(code)) | |
f.close() | |
return {"info":"The referenced code files were created successfully."} | |
else: | |
if isinstance(code, dict): | |
with open(os.path.join(destination_dir, filename), 'w', encoding='utf-8') as f: | |
json.dump(code, f, ensure_ascii=False, indent=4) | |
else: | |
f = open(os.path.join(destination_dir, filename), "w") | |
f.write(str(code)) | |
f.close() | |
return {"info":"The referenced code files were created successfully."} | |
# return {"info":"The referenced code files already exist. Please rename the file or delete the existing one."} | |
def run_code(language:str,packages:str,filename: str, code: str,start_cmd:str,forever_cmd:str) -> dict: | |
""" | |
Execute code in a controlled environment with package installation and file handling. | |
Args: | |
language:Programming language of the code (eg:"python", "nodejs", "bash","html",etc). | |
packages: Space-separated list of packages to install.(python packages are installed if language set to python and npm packages are installed if language set to nodejs). | |
Preinstalled python packages: gradio, XlsxWriter, openpyxl , mpxj , jpype1. | |
Preinstalled npm packages: express, ejs, chart.js. | |
filename:Name of the file to create (stored in /app/code_interpreter/). | |
code:Full code to write to the file. | |
start_cmd:Command to execute the file (e.g., "python /app/code_interpreter/app.py" | |
or "bash /app/code_interpreter/app.py"). | |
Leave blank ('') if only file creation is needed / start_cmd not required. | |
forever_cmd:If 'true', the command will run indefinitely.Set to 'true', when runnig a website/server.Run all servers/website on port 1337. If 'false', the command will time out after 300 second and the result will be returned. | |
Notes: | |
- All user-uploaded files are in /app/code_interpreter/. | |
- After execution, embed a download link (or display images/gifs/videos directly in markdown format) in your response. | |
- bash/apk packages cannot be installed. | |
- When editing and subsequently re-executing the server with the forever_cmd='true' setting, the previous server instance will be automatically terminated, and the updated server will commence operation. This functionality negates the requirement for manual process termination commands such as pkill node. | |
- The opened ports can be externally accessed at https://suitable-liked-ibex.ngrok-free.app/ (ONLY if the website is running successfully) | |
- Do not use `plt.show()` in this headless environment. Save visualizations directly (e.g., `plt.savefig("happiness_img.png")` or export GIFs/videos). | |
""" | |
global destination_dir | |
package_names = packages.strip() | |
if "python" in language: | |
command="pip install --break-system-packages " | |
elif "node" in language: | |
command="npm install " | |
else: | |
command="ls" | |
if packages != "" and packages != " ": | |
package_logs=run( | |
f"{command} {package_names}", timeout_sec=300,forever_cmd= 'false' | |
) | |
if "ERROR" in package_logs: | |
return {"package_installation_log":package_logs,"info":"Package installation failed. Please check the package names. Tip:Try using another package/method to accomplish the task."} | |
transfer_files2() | |
transfer_files() | |
f = open(os.path.join(destination_dir, filename), "w") | |
f.write(code) | |
f.close() | |
global files_list | |
if start_cmd != "" and start_cmd != " ": | |
stdot=run(start_cmd, 120,forever_cmd) | |
else: | |
stdot="File created successfully." | |
onlyfiles = glob.glob("/app/code_interpreter/*") | |
onlyfiles=list(set(onlyfiles)-set(files_list)) | |
uploaded_filenames=[] | |
for files in onlyfiles: | |
try: | |
uploaded_filename = upload_file(files, "https://opengpt-4ik5.onrender.com/upload") | |
uploaded_filenames.append(f"https://opengpt-4ik5.onrender.com/static/{uploaded_filename}") | |
except: | |
pass | |
files_list=onlyfiles | |
return {"output":stdot,"Files_download_link":uploaded_filenames} | |
def run_code_files(start_cmd:str,forever_cmd:str) -> dict: | |
"""Executes a shell command to run code files from /app/code_interpreter. | |
Runs the given `start_cmd`. The execution behavior depends on `forever_cmd`. | |
Any server/website started should use port 1337. | |
Args: | |
start_cmd (str): The shell command to execute the code. | |
(e.g., ``python /app/code_interpreter/app.py`` or ``node /app/code_interpreter/server.js``). | |
Files must be in ``/app/code_interpreter``. | |
forever_cmd (str): Execution mode. | |
- ``'true'``: Runs indefinitely (for servers/websites). | |
- ``'false'``: Runs up to 300s, captures output. | |
Returns: | |
dict: A dictionary containing: | |
- ``'output'`` (str): Captured stdout (mainly when forever_cmd='false'). | |
- ``'Files_download_link'`` (Any): Links/identifiers for downloadable files. | |
Notes: | |
- After execution, embed a download link (or display images/gifs/videos directly in markdown format) in your response. | |
- When editing and subsequently re-executing the server with the forever_cmd='true' setting, the previous server instance will be automatically terminated, and the updated server will commence operation. This functionality negates the requirement for manual process termination commands such as pkill node. | |
- The opened ports can be externally accessed at https://suitable-liked-ibex.ngrok-free.app/ (ONLY if the website is running successfully) | |
""" | |
global files_list | |
stdot=run(start_cmd, 300,forever_cmd) | |
onlyfiles = glob.glob("/app/code_interpreter/*") | |
onlyfiles=list(set(onlyfiles)-set(files_list)) | |
uploaded_filenames=[] | |
for files in onlyfiles: | |
try: | |
uploaded_filename = upload_file(files, "https://opengpt-4ik5.onrender.com/upload") | |
uploaded_filenames.append(f"https://opengpt-4ik5.onrender.com/static/{uploaded_filename}") | |
except: | |
pass | |
files_list=onlyfiles | |
return {"output":stdot,"Files_download_link":uploaded_filenames} | |
def run_shell_command(cmd:str,forever_cmd:str) -> dict: | |
"""Executes a shell command in a sandboxed Alpine Linux environment. | |
Runs the provided `cmd` string within a bash shell. Commands are executed | |
relative to the `/app/code_interpreter/` working directory by default. | |
The execution behavior (indefinite run vs. timeout) is controlled by | |
the `forever_cmd` parameter. | |
Important Environment Notes: | |
- The execution environment is **Alpine Linux**. Commands should be | |
compatible . | |
- `sudo` commands are restricted for security reasons.Hence commands which require elevated privelages like `apk add` CANNOT be executed.Instead try to use `pip install` or `npm install` commands. | |
- Standard bash features like `&&`, `||`, pipes (`|`), etc., are supported. | |
Args: | |
cmd (str): The shell command to execute. | |
Example: ``mkdir test_dir && ls -l`` | |
forever_cmd (str): Determines the execution mode. | |
- ``'true'``: Runs the command indefinitely. Suitable | |
for starting servers or long-running processes. | |
Output capture might be limited. | |
- ``'false'``: Runs the command until completion or | |
a 300-second timeout, whichever comes first. | |
Captures standard output. | |
Returns: | |
dict: A dictionary containing the execution results: | |
- ``'output'`` (str): The captured standard output (stdout) and potentially | |
standard error (stderr) from the command. | |
""" | |
transfer_files() | |
transfer_files2() | |
output=run(cmd, 300,forever_cmd) | |
return {"output":output} | |
def install_python_packages(python_packages:str) -> dict: | |
"""python_packages to install seperated by space.eg-(python packages:numpy matplotlib).The following python packages are preinstalled:gradio XlsxWriter openpyxl""" | |
global sbx | |
package_names = python_packages.strip() | |
command="pip install" | |
if not package_names: | |
return | |
stdot=run( | |
f"{command} --break-system-packages {package_names}", timeout_sec=300, forever_cmd= 'false' | |
) | |
return {"stdout":stdot,"info":"Ran package installation command"} | |
def get_youtube_transcript(videoid:str) -> dict: | |
"""Get the transcript of a youtube video by passing the video id.Eg videoid=ZacjOVVgoLY""" | |
conn = http.client.HTTPSConnection("youtube-transcript3.p.rapidapi.com") | |
headers = { | |
'x-rapidapi-key': "2a155d4498mshd52b7d6b7a2ff86p10cdd0jsn6252e0f2f529", | |
'x-rapidapi-host': "youtube-transcript3.p.rapidapi.com" | |
} | |
conn.request("GET",f"/api/transcript?videoId={videoid}", headers=headers) | |
res = conn.getresponse() | |
data = res.read() | |
return json.loads(data) | |
def read_excel_file(filename) -> dict: | |
"""Reads the contents of an excel file.Returns a dict with key :value pair = cell location:cell content.Always run this command first , when working with excels.The excel file is automatically present in the /app/code_interpreter directory. """ | |
global destination_dir | |
transfer_files2() | |
transfer_files() | |
workbook = openpyxl.load_workbook(os.path.join(destination_dir, filename)) | |
# Create an empty dictionary to store the data | |
excel_data_dict = {} | |
# Iterate over all sheets | |
for sheet_name in workbook.sheetnames: | |
sheet = workbook[sheet_name] | |
# Iterate over all rows and columns | |
for row in sheet.iter_rows(): | |
for cell in row: | |
# Get cell coordinate (e.g., 'A1') and value | |
cell_coordinate = cell.coordinate | |
cell_value = cell.value | |
if cell_value is not None: | |
excel_data_dict[cell_coordinate] = str(cell_value) | |
return excel_data_dict | |
def scrape_websites(url_list:list,query:str) -> list: | |
"""Scrapes specific website content.query is the question you want to ask about the content of the website.e.g-query:Give .pptx links in the website,Summarise the content in very great detail,etc.Maximum 4 urls can be passed at a time.""" | |
conn = http.client.HTTPSConnection("scrapeninja.p.rapidapi.com") | |
headers = { | |
'x-rapidapi-key': "2a155d4498mshd52b7d6b7a2ff86p10cdd0jsn6252e0f2f529", | |
'x-rapidapi-host': "scrapeninja.p.rapidapi.com", | |
'Content-Type': "application/json" | |
} | |
Output="" | |
links="" | |
content="" | |
for urls in url_list: | |
payload = {"url" :urls} | |
payload=json.dumps(payload) | |
conn.request("POST", "/scrape", payload, headers) | |
res = conn.getresponse() | |
data = res.read() | |
content=content+str(data.decode("utf-8")) | |
#Only thing llama 4 is good for. | |
response = clienty.chat.completions.create( | |
model="meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8", | |
messages=[ | |
{"role": "user", "content": f"{query} [CONTENT]:{content}"} | |
],stream=True | |
) | |
for chunk in response: | |
Output = Output +str(chunk.choices[0].delta.content) | |
#-------------- | |
response2 = clienty.chat.completions.create( | |
model="meta-llama/Llama-4-Maverick-17B-128E-Instruct-FP8", | |
messages=[ | |
{"role": "user", "content": f"Give all relevant and different types of links in this content.The links may be relevant image links , file links , video links , website links , etc .You must give Minimum 30 links and maximum 50 links.[CONTENT]:{content}"} | |
],stream=True | |
) | |
for chunk in response2: | |
links = links +str(chunk.choices[0].delta.content) | |
return {"website_content":Output,"relevant_links":links} | |
if __name__ == "__main__": | |
# Initialize and run the server | |
Ngrok=pexpect.spawn('bash') | |
Ngrok.sendline("ngrok http --url=suitable-liked-ibex.ngrok-free.app 1337 --config /home/node/.config/ngrok/ngrok.yml") | |
Ngrok.readline().decode() | |
mcp.run(transport='stdio') |