import hashlib
import json
import os
import random
import secrets
import string
import time

import requests


class ConanClient:
    """HTTP client that signs each request with an access/secret key pair.

    Every request carries a timestamp, a random nonce and an MD5 digest
    computed over ``ak:timestamp:nonce:sk``; the payload is POSTed to ``url``
    as a JSON document.
    """

    def __init__(self, ak, sk, url, timeout=30):
        """
        Initialize the Client

        Args:
            ak: Access Key
            sk: Secret Key
            url: Server URL
            timeout: Request timeout in seconds (default: 30)
        """
        self.ak = ak
        self.sk = sk
        self.url = url
        self.timeout = timeout

    def __random_password(self, size=40, chars=None):
        """
        Return a random string of ``size`` characters drawn from ``chars``.

        Args:
            size: Number of characters to generate (default: 40)
            chars: Alphabet to draw from; defaults to ASCII letters and digits

        Returns:
            The generated string (empty when ``size`` is 0 or negative)
        """
        if chars is None:
            chars = string.ascii_letters + string.digits
        # secrets.choice draws from the OS entropy source (SystemRandom)
        # without building a fresh SystemRandom instance on every call.
        return "".join(secrets.choice(chars) for _ in range(size))

    def __signature(self, random_str, time_stamp):
        """
        Return the hex MD5 digest of ``ak:time_stamp:random_str:sk``.

        Args:
            random_str: Per-request random string included in the digest
            time_stamp: Integer timestamp included in the digest
        """
        params_str = "%s:%d:%s:%s" % (self.ak, time_stamp, random_str, self.sk)
        # NOTE(review): MD5 is presumably dictated by the server's signing
        # scheme -- confirm with the server side before changing the digest.
        return hashlib.md5(params_str.encode("utf-8")).hexdigest()

    def get_signature(self):
        """
        Build the authentication fields for a single request.

        Returns:
            dict with the keys ``timestamp`` (current epoch seconds as int),
            ``random`` (20-character random string), ``app_id`` (the access
            key) and ``sign`` (MD5 hex digest over those values and the
            secret key)
        """
        timestamp = int(time.time())
        random_str = self.__random_password(20)
        return {
            "timestamp": timestamp,
            "random": random_str,
            "app_id": self.ak,
            "sign": self.__signature(random_str, timestamp),
        }

    def _build_payload(self, text, content_id=None):
        """Return the signed request payload for ``text`` (no network I/O)."""
        params = self.get_signature()
        params["body"] = text
        if content_id is None:
            # Historical default: an id derived from the current epoch second.
            content_id = f"test_{int(time.time())}"
        params["content_id"] = content_id
        return params

    def embed(self, text, content_id=None):
        """
        Embed text using the server

        Args:
            text: The input text to embed
            content_id: Optional identifier sent as ``content_id``; when
                omitted, ``test_<epoch seconds>`` is used

        Returns:
            The server's response body parsed from JSON (not the
            ``requests.Response`` object itself)

        Raises:
            json.JSONDecodeError: If the response body is not valid JSON.
            Exceptions raised by ``requests.post`` (e.g. on connection
            failure or timeout) propagate unchanged.
        """
        payload = self._build_payload(text, content_id)
        headers = {"Content-Type": "application/json"}

        rsp = requests.post(
            self.url,
            data=json.dumps(payload),
            timeout=self.timeout,
            headers=headers,
        )
        # NOTE: the HTTP status code is intentionally not checked here, so an
        # error response that carries a JSON body is returned to the caller.
        return json.loads(rsp.text)