import os
import hashlib
import random
import string
import time
import json

import requests


class ConanClient:
    """Client that signs requests with an access/secret key pair and posts
    text to a server endpoint as JSON.

    Every request carries a timestamp, a random nonce, the access key and an
    MD5 signature derived from those three values plus the secret key.
    """

    def __init__(self, ak: str, sk: str, url: str, timeout=30):
        """
        Initialize the Client

        Args:
            ak: Access Key
            sk: Secret Key
            url: Server URL
            timeout: Request timeout in seconds (default: 30)
        """
        self.ak = ak
        self.sk = sk
        self.url = url
        self.timeout = timeout

    def __random_password(self, size: int = 40, chars=None) -> str:
        """
        Build a random string from an OS-backed random source

        Args:
            size: Number of characters to generate (default: 40)
            chars: Alphabet to draw from; ASCII letters and digits when None

        Returns:
            A string of `size` characters picked from `chars`
        """
        if chars is None:
            chars = string.ascii_uppercase + string.ascii_lowercase + string.digits
        # SystemRandom draws from os.urandom, so the nonce is not predictable
        # from the default Mersenne Twister state.
        pick = random.SystemRandom().choice
        return "".join(pick(chars) for _ in range(size))

    def __signature(self, random_str: str, time_stamp: int) -> str:
        """
        Compute the request signature

        Args:
            random_str: Random nonce sent with the request
            time_stamp: Unix timestamp (seconds) sent with the request

        Returns:
            Hex MD5 digest of "<ak>:<timestamp>:<random>:<sk>" (UTF-8 encoded)
        """
        # NOTE(review): MD5 is presumably dictated by the server's signing
        # scheme; changing the hash here would break verification - confirm
        # with the service owner before touching it.
        params_str = "%s:%d:%s:%s" % (self.ak, time_stamp, random_str, self.sk)
        return hashlib.md5(params_str.encode("utf-8")).hexdigest()

    def get_signature(self) -> dict:
        """
        Build the signed authentication parameters for one request

        A fresh timestamp and a 20-character random string are generated on
        every call.

        Returns:
            dict with the keys "timestamp", "random", "app_id" (the access
            key) and "sign" (the MD5 signature)
        """
        timestamp = int(time.time())
        random_str = self.__random_password(20)
        return {
            "timestamp": timestamp,
            "random": random_str,
            "app_id": self.ak,
            "sign": self.__signature(random_str, timestamp),
        }

    def embed(self, text, content_id=None):
        """
        Embed text using the server

        Sends the signed parameters plus the text as a JSON document via
        HTTP POST to the configured URL.

        Args:
            text: The input text to embed (sent as the "body" field)
            content_id: Optional value for the "content_id" field; when None
                a "test_<unix time>" identifier is generated

        Returns:
            The response body parsed from JSON (not the requests.Response
            object itself)

        Raises:
            requests.exceptions.RequestException: if the request fails or
                times out
            json.JSONDecodeError: if the response body is not valid JSON
        """
        params = self.get_signature()
        params["body"] = text
        if content_id is None:
            content_id = f"test_{int(time.time())}"
        params["content_id"] = content_id

        headers = {"Content-Type": "application/json"}
        rsp = requests.post(
            self.url,
            data=json.dumps(params),
            timeout=self.timeout,
            headers=headers,
        )
        # The HTTP status code is intentionally not checked here: whatever
        # JSON the server returns (including error payloads) is handed back
        # to the caller unchanged.
        return json.loads(rsp.text)