import hashlib
import json
import random
import string
import time

import requests

class ConanClient:
    def __init__(self, ak, sk, url, timeout=30):
        """
        Initialize the Client

        Args:
            ak: Access Key
            sk: Secret Key
            url: Server URL
            timeout: Request timeout in seconds (default: 30)
        """
        self.ak = ak
        self.sk = sk
        self.url = url
        self.timeout = timeout

    def __random_password(self, size=40, chars=None):
        """Return a random string of `size` characters (alphanumeric by default)."""
        if chars is None:
            chars = string.ascii_uppercase + string.ascii_lowercase + string.digits
        # SystemRandom draws from the operating system's entropy source.
        random_chars = random.SystemRandom().choice
        return "".join(random_chars(chars) for _ in range(size))

    def __signature(self, random_str, time_stamp):
        """Return the hex MD5 digest of "ak:timestamp:random:sk"."""
        params_str = "%s:%d:%s:%s" % (self.ak, time_stamp, random_str, self.sk)
        encoded_params_str = params_str.encode("utf-8")
        return hashlib.md5(encoded_params_str).hexdigest()

    def get_signature(self):
        """Build the authentication parameters sent with each request."""
        timestamp = int(time.time())
        random_str = self.__random_password(20)
        sig = self.__signature(random_str, timestamp)
        params = {
            "timestamp": timestamp,
            "random": random_str,
            "app_id": self.ak,
            "sign": sig,
        }
        return params

    def embed(self, text):
        """
        Embed text using the server

        Args:
            text: The input text to embed

        Returns:
            The server's JSON response, decoded into Python objects
        """
        params = self.get_signature()
        params["body"] = text
        params["content_id"] = f"test_{int(time.time())}"

        headers = {"Content-Type": "application/json"}

        rsp = requests.post(self.url, data=json.dumps(params), timeout=self.timeout, headers=headers)
        # Decode the JSON body; raises ValueError if the body is not valid JSON.
        result = json.loads(rsp.text)
        return result
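

# Illustrative sketch only, not part of the original client: one way a
# receiving service might recompute and check the "sign" field produced by
# ConanClient.get_signature(). The name verify_signature, the max_skew
# freshness window (300 s) and the `now` override are assumptions made for
# illustration; the real server's checks are not described in this file.
# The imports are repeated so the sketch can also be run on its own.
import hashlib
import hmac
import time


def verify_signature(params, sk, max_skew=300, now=None):
    """Return True if `params` carries a fresh, matching signature for `sk`."""
    now = int(time.time()) if now is None else now
    timestamp = int(params["timestamp"])
    # Reject requests whose timestamp is too far from the current time.
    if abs(now - timestamp) > max_skew:
        return False
    # Same layout as the client side: "ak:timestamp:random:sk", MD5, hex.
    params_str = "%s:%d:%s:%s" % (params["app_id"], timestamp, params["random"], sk)
    expected = hashlib.md5(params_str.encode("utf-8")).hexdigest()
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(expected, str(params["sign"]))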