kgot-python-executor / python_executor.py
chmielvu's picture
Upload folder using huggingface_hub
1712ab6 verified
Raw
History Blame Contribute Delete
2.03 kB
import re
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import requests
from flask import Flask, jsonify, request
from langchain_experimental.utilities import PythonREPL
app = Flask(__name__)
PYTHON_EXECUTOR_HOST = "0.0.0.0"
PYTHON_EXECUTOR_PORT = 7860
def is_standard_lib(package: str) -> bool:
return package in sys.stdlib_module_names
def install(package: str) -> None:
subprocess.check_call([sys.executable, "-m", "pip", "install", package])
@app.get("/")
def root():
return {"service": "kgot-python-executor", "port": PYTHON_EXECUTOR_PORT}
@app.get("/health")
def health():
return {"status": "ok"}
@app.route('/run', methods=['POST'])
def run_code() -> tuple[requests.Response, int]:
timeout_seconds = 240
required_modules = request.json.get('required_modules', [])
if required_modules:
for module in required_modules:
if not is_standard_lib(module):
install(module)
code = request.json.get('code')
if not code:
return jsonify({"error": "No code provided"}), 400
python_repl = PythonREPL()
def execute_code() -> tuple[requests.Response, int]:
def is_error_string(s: str) -> bool:
error_pattern = re.compile(r'^[a-zA-Z_]+Error\((.*)\)$')
return bool(error_pattern.match(s))
result = python_repl.run(code)
if is_error_string(result):
return {"error": result}, 400
return {"output": result}, 200
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(execute_code)
try:
result, status_code = future.result(timeout=timeout_seconds)
except TimeoutError:
return jsonify({"error": "Code execution timed out"}), 408
except Exception as e:
return jsonify({"error": str(e)}), 400
return jsonify(result), status_code
if __name__ == '__main__':
from waitress import serve
serve(app, host=PYTHON_EXECUTOR_HOST, port=PYTHON_EXECUTOR_PORT)