Spaces:
Paused
Paused
#!/usr/bin/python3 | |
# -*- coding: utf-8 -*- | |
import argparse | |
import logging | |
import dingtalk_stream | |
from dingtalk_stream import AckMessage | |
from project_settings import project_path | |
from project_settings import environment | |
def get_args(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument( | |
"--client_id", | |
default=environment.get("client_id"), | |
type=str, | |
) | |
parser.add_argument( | |
"--client_secret", | |
default=environment.get("client_secret"), | |
type=str, | |
) | |
args = parser.parse_args() | |
return args | |
class EchoTextHandler(dingtalk_stream.ChatbotHandler): | |
def __init__(self): | |
super(EchoTextHandler, self).__init__() | |
async def process(self, callback: dingtalk_stream.CallbackMessage): | |
incoming_message = dingtalk_stream.ChatbotMessage.from_dict(callback.data) | |
text = incoming_message.text.content.strip() | |
self.reply_text(text, incoming_message) | |
return AckMessage.STATUS_OK, "OK" | |
def main(): | |
args = get_args() | |
credential = dingtalk_stream.Credential( | |
client_id=args.client_id, | |
client_secret=args.client_secret, | |
) | |
client = dingtalk_stream.DingTalkStreamClient(credential) | |
client.register_callback_handler( | |
dingtalk_stream.chatbot.ChatbotMessage.TOPIC, | |
EchoTextHandler() | |
) | |
client.start_forever() | |
return | |
if __name__ == '__main__': | |
main() | |