File size: 1,890 Bytes
1e9cb1d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
import os
import hashlib
import random
import string
import time
import requests
import json
class ConanClient:
    """Client that sends signed JSON requests to a server endpoint.

    Each request carries a timestamp, a random string and an MD5 digest of
    ``"<ak>:<timestamp>:<random>:<sk>"`` so the server can verify that the
    caller holds the secret key without the key itself being transmitted.
    """

    def __init__(self, ak, sk, url, timeout=30):
        """
        Initialize the Client
        Args:
            ak: Access Key (sent to the server as ``app_id``)
            sk: Secret Key (only used locally to compute the signature)
            url: Server URL that requests are POSTed to
            timeout: Request timeout in seconds (default: 30)
        """
        self.ak = ak
        self.sk = sk
        self.url = url
        self.timeout = timeout

    def __random_password(self, size=40, chars=None):
        """
        Build a random string
        Args:
            size: Number of characters to generate (default: 40)
            chars: Alphabet to draw from; ASCII letters and digits when None
        Returns:
            A string of ``size`` characters picked with random.SystemRandom
        """
        if chars is None:
            chars = string.ascii_uppercase + string.ascii_lowercase + string.digits
        pick = random.SystemRandom().choice
        return "".join(pick(chars) for _ in range(size))

    def __signature(self, random_str, time_stamp) -> str:
        """
        Compute the request signature
        Args:
            random_str: Random string included in the request
            time_stamp: Integer timestamp included in the request
        Returns:
            Hex MD5 digest of "<ak>:<time_stamp>:<random_str>:<sk>"
        """
        # The field order and the ':' separator are part of the wire format;
        # the digest only matches if both sides build the same string.
        params_str = "%s:%d:%s:%s" % (self.ak, time_stamp, random_str, self.sk)
        return hashlib.md5(params_str.encode("utf-8")).hexdigest()

    def get_signature(self) -> dict:
        """
        Build the authentication parameters for one request
        Returns:
            dict with the keys "timestamp" (current Unix time in whole
            seconds), "random" (fresh 20-character random string),
            "app_id" (the access key) and "sign" (signature over the
            other three values and the secret key)
        """
        timestamp = int(time.time())
        random_str = self.__random_password(20)
        return {
            "timestamp": timestamp,
            "random": random_str,
            "app_id": self.ak,
            "sign": self.__signature(random_str, timestamp),
        }

    def _build_payload(self, text, content_id=None) -> dict:
        """Return the signed request body for ``text`` (no network access)."""
        payload = self.get_signature()
        payload["body"] = text
        if content_id is None:
            # Historical default: a "test_"-prefixed id derived from the clock.
            content_id = f"test_{int(time.time())}"
        payload["content_id"] = content_id
        return payload

    def embed(self, text, content_id=None):
        """
        Embed text using the server
        Args:
            text: The input text to embed
            content_id: Identifier sent along with the text; when None a
                "test_<unix timestamp>" id is generated
        Returns:
            The response body decoded from JSON (not the requests.Response
            object itself)
        Raises:
            json.JSONDecodeError: If the response body is not valid JSON.
            Errors raised by requests.post (for example on timeout) are
            not caught here.
        Note:
            The HTTP status code is not checked; an error response whose
            body is JSON is decoded and returned like any other response.
        """
        payload = self._build_payload(text, content_id)
        headers = {"Content-Type": "application/json"}
        rsp = requests.post(
            self.url,
            data=json.dumps(payload),
            timeout=self.timeout,
            headers=headers,
        )
        return json.loads(rsp.text)