#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""Echo chatbot built on the ``dingtalk_stream`` client.

Registers a chatbot callback handler that replies to every incoming text
message with the same (whitespace-stripped) text.
"""
import argparse
import logging

import dingtalk_stream
from dingtalk_stream import AckMessage

from project_settings import project_path
from project_settings import environment

logger = logging.getLogger(__name__)


def get_args():
    """Parse command-line arguments.

    Both options default to the value stored under the same key in the
    project ``environment`` settings.

    Returns:
        argparse.Namespace: with ``client_id`` and ``client_secret``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--client_id",
        default=environment.get("client_id"),
        type=str,
    )
    parser.add_argument(
        "--client_secret",
        default=environment.get("client_secret"),
        type=str,
    )
    return parser.parse_args()


class EchoTextHandler(dingtalk_stream.ChatbotHandler):
    """Chatbot handler that echoes incoming text back to the sender."""

    def __init__(self):
        super().__init__()

    async def process(self, callback: dingtalk_stream.CallbackMessage):
        """Reply to a chatbot callback with the text it carried.

        Args:
            callback: The callback message; its ``data`` payload is parsed
                into a ``ChatbotMessage``.

        Returns:
            The pair ``(AckMessage.STATUS_OK, "OK")``.
        """
        incoming_message = dingtalk_stream.ChatbotMessage.from_dict(callback.data)

        # NOTE(review): non-text messages (pictures, rich text, ...) appear to
        # leave ``text`` unset in the SDK - confirm. Guard against that so the
        # handler acknowledges the message instead of raising AttributeError.
        text_part = getattr(incoming_message, "text", None)
        content = getattr(text_part, "content", None)
        if content is None:
            logger.debug("Ignoring chatbot message without text content")
            return AckMessage.STATUS_OK, "OK"

        self.reply_text(content.strip(), incoming_message)
        return AckMessage.STATUS_OK, "OK"


def main():
    """Build the stream client, register the echo handler and run it."""
    args = get_args()

    # Fail fast with a clear message instead of handing empty credentials
    # to the client.
    if not args.client_id or not args.client_secret:
        raise SystemExit(
            "client_id and client_secret are required "
            "(pass --client_id/--client_secret or configure them in the environment settings)"
        )

    credential = dingtalk_stream.Credential(
        client_id=args.client_id,
        client_secret=args.client_secret,
    )
    client = dingtalk_stream.DingTalkStreamClient(credential)
    client.register_callback_handler(
        dingtalk_stream.chatbot.ChatbotMessage.TOPIC,
        EchoTextHandler(),
    )
    client.start_forever()


if __name__ == '__main__':
    main()