From 1a5030a292d003b43ed754b5f03efd7cb0ecea80 Mon Sep 17 00:00:00 2001 From: captainsuo Date: Mon, 2 Jun 2025 19:49:19 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BC=98=E5=8C=96=5F=5Fstr=5F=5F?= =?UTF-8?q?=E8=BE=93=E5=87=BA=E6=A0=BC=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dingtalk_stream/chatbot.py | 586 ++++++++++++++++++++++--------------- 1 file changed, 343 insertions(+), 243 deletions(-) diff --git a/dingtalk_stream/chatbot.py b/dingtalk_stream/chatbot.py index 80c717a..558d1c6 100644 --- a/dingtalk_stream/chatbot.py +++ b/dingtalk_stream/chatbot.py @@ -10,8 +10,13 @@ from .utils import DINGTALK_OPENAPI_ENDPOINT from concurrent.futures import ThreadPoolExecutor import uuid -from .card_instance import MarkdownCardInstance, AIMarkdownCardInstance, CarouselCardInstance, \ - MarkdownButtonCardInstance, RPAPluginCardInstance +from .card_instance import ( + MarkdownCardInstance, + AIMarkdownCardInstance, + CarouselCardInstance, + MarkdownButtonCardInstance, + RPAPluginCardInstance, +) import traceback @@ -24,11 +29,11 @@ def __init__(self): @classmethod def from_dict(cls, d): user = AtUser() - data = '' + data = "" for name, value in d.items(): - if name == 'dingtalkId': + if name == "dingtalkId": user.dingtalk_id = value - elif name == 'staffId': + elif name == "staffId": user.staff_id = value else: user.extensions[name] = value @@ -37,9 +42,9 @@ def from_dict(cls, d): def to_dict(self): result = self.extensions.copy() if self.dingtalk_id is not None: - result['dingtalkId'] = self.dingtalk_id + result["dingtalkId"] = self.dingtalk_id if self.staff_id is not None: - result['staffId'] = self.staff_id + result["staffId"] = self.staff_id return result @@ -50,15 +55,15 @@ def __init__(self): self.content = None self.extensions = {} - def __str__(self): - return 'TextContent(content=%s)' % self.content + def __repr__(self): + return f"TextContent(content={self.content!r})" @classmethod def from_dict(cls, d): content = TextContent() - data = '' + data = "" for name, value in d.items(): - if name == 'content': + if name == "content": content.content = value else: content.extensions[name] = value @@ -67,7 +72,7 @@ def from_dict(cls, d): def to_dict(self): result = self.extensions.copy() if self.content is not None: - result['content'] = self.content + result["content"] = self.content return result @@ -80,14 +85,14 @@ def __init__(self): def from_dict(cls, d): content = ImageContent() for name, value in d.items(): - if name == 'downloadCode': + if name == "downloadCode": content.download_code = value return content def to_dict(self): result = {} if self.download_code is not None: - result['downloadCode'] = self.download_code + result["downloadCode"] = self.download_code return result @@ -101,14 +106,14 @@ def from_dict(cls, d): content = RichTextContent() content.rich_text_list = [] for name, value in d.items(): - if name == 'richText': + if name == "richText": content.rich_text_list = value return content def to_dict(self): result = {} if self.rich_text_list is not None: - result['richText'] = self.rich_text_list + result["richText"] = self.rich_text_list return result @@ -150,14 +155,14 @@ def to_dict(self): result = { "readStatus": self.read_status, "senderUserId": self.sender_user_id, - "sendTime": self.send_time + "sendTime": self.send_time, } return result class ChatbotMessage(object): - TOPIC = '/v1.0/im/bot/messages/get' - DELEGATE_TOPIC = '/v1.0/im/bot/messages/delegate' + TOPIC = "/v1.0/im/bot/messages/get" + DELEGATE_TOPIC = "/v1.0/im/bot/messages/delegate" text: TextContent def __init__(self): @@ -190,55 +195,55 @@ def __init__(self): @classmethod def from_dict(cls, d): msg = ChatbotMessage() - data = '' + data = "" for name, value in d.items(): - if name == 'isInAtList': + if name == "isInAtList": msg.is_in_at_list = value - elif name == 'sessionWebhook': + elif name == "sessionWebhook": msg.session_webhook = value - elif name == 'senderNick': + elif name == "senderNick": msg.sender_nick = value - elif name == 'robotCode': + elif name == "robotCode": msg.robot_code = value - elif name == 'sessionWebhookExpiredTime': + elif name == "sessionWebhookExpiredTime": msg.session_webhook_expired_time = int(value) - elif name == 'msgId': + elif name == "msgId": msg.message_id = value - elif name == 'senderId': + elif name == "senderId": msg.sender_id = value - elif name == 'chatbotUserId': + elif name == "chatbotUserId": msg.chatbot_user_id = value - elif name == 'conversationId': + elif name == "conversationId": msg.conversation_id = value - elif name == 'isAdmin': + elif name == "isAdmin": msg.is_admin = value - elif name == 'createAt': + elif name == "createAt": msg.create_at = value - elif name == 'conversationType': + elif name == "conversationType": msg.conversation_type = value - elif name == 'atUsers': + elif name == "atUsers": msg.at_users = [AtUser.from_dict(i) for i in value] - elif name == 'chatbotCorpId': + elif name == "chatbotCorpId": msg.chatbot_corp_id = value - elif name == 'senderCorpId': + elif name == "senderCorpId": msg.sender_corp_id = value - elif name == 'conversationTitle': + elif name == "conversationTitle": msg.conversation_title = value - elif name == 'msgtype': + elif name == "msgtype": msg.message_type = value - if value == 'text': - msg.text = TextContent.from_dict(d['text']) - elif value == 'picture': - msg.image_content = ImageContent.from_dict(d['content']) - elif value == 'richText': - msg.rich_text_content = RichTextContent.from_dict(d['content']) - elif name == 'senderStaffId': + if value == "text": + msg.text = TextContent.from_dict(d["text"]) + elif value == "picture": + msg.image_content = ImageContent.from_dict(d["content"]) + elif value == "richText": + msg.rich_text_content = RichTextContent.from_dict(d["content"]) + elif name == "senderStaffId": msg.sender_staff_id = value - elif name == 'hostingContext': + elif name == "hostingContext": msg.hosting_context = HostingContext() msg.hosting_context.user_id = value["userId"] msg.hosting_context.nick = value["nick"] - elif name == 'conversationMsgContext': + elif name == "conversationMsgContext": msg.conversation_msg_context = [] for v in value: conversation_msg = ConversationMessage() @@ -254,90 +259,92 @@ def from_dict(cls, d): def to_dict(self): result = self.extensions.copy() if self.is_in_at_list is not None: - result['isInAtList'] = self.is_in_at_list + result["isInAtList"] = self.is_in_at_list if self.session_webhook is not None: - result['sessionWebhook'] = self.session_webhook + result["sessionWebhook"] = self.session_webhook if self.sender_nick is not None: - result['senderNick'] = self.sender_nick + result["senderNick"] = self.sender_nick if self.robot_code is not None: - result['robotCode'] = self.robot_code + result["robotCode"] = self.robot_code if self.session_webhook_expired_time is not None: - result['sessionWebhookExpiredTime'] = self.session_webhook_expired_time + result["sessionWebhookExpiredTime"] = self.session_webhook_expired_time if self.message_id is not None: - result['msgId'] = self.message_id + result["msgId"] = self.message_id if self.sender_id is not None: - result['senderId'] = self.sender_id + result["senderId"] = self.sender_id if self.chatbot_user_id is not None: - result['chatbotUserId'] = self.chatbot_user_id + result["chatbotUserId"] = self.chatbot_user_id if self.conversation_id is not None: - result['conversationId'] = self.conversation_id + result["conversationId"] = self.conversation_id if self.is_admin is not None: - result['isAdmin'] = self.is_admin + result["isAdmin"] = self.is_admin if self.create_at is not None: - result['createAt'] = self.create_at + result["createAt"] = self.create_at if self.text is not None: - result['text'] = self.text.to_dict() + result["text"] = self.text.to_dict() if self.image_content is not None: - result['content'] = self.image_content.to_dict() + result["content"] = self.image_content.to_dict() if self.rich_text_content is not None: - result['content'] = self.rich_text_content.to_dict() + result["content"] = self.rich_text_content.to_dict() if self.conversation_type is not None: - result['conversationType'] = self.conversation_type + result["conversationType"] = self.conversation_type if self.at_users is not None: - result['atUsers'] = [i.to_dict() for i in self.at_users] + result["atUsers"] = [i.to_dict() for i in self.at_users] if self.chatbot_corp_id is not None: - result['chatbotCorpId'] = self.chatbot_corp_id + result["chatbotCorpId"] = self.chatbot_corp_id if self.sender_corp_id is not None: - result['senderCorpId'] = self.sender_corp_id + result["senderCorpId"] = self.sender_corp_id if self.conversation_title is not None: - result['conversationTitle'] = self.conversation_title + result["conversationTitle"] = self.conversation_title if self.message_type is not None: - result['msgtype'] = self.message_type + result["msgtype"] = self.message_type if self.sender_staff_id is not None: - result['senderStaffId'] = self.sender_staff_id + result["senderStaffId"] = self.sender_staff_id if self.hosting_context is not None: - result['hostingContext'] = self.hosting_context.to_dict() + result["hostingContext"] = self.hosting_context.to_dict() if self.conversation_msg_context is not None: - result['conversationMsgContext'] = [v.to_dict() for v in self.conversation_msg_context] + result["conversationMsgContext"] = [ + v.to_dict() for v in self.conversation_msg_context + ] return result def get_text_list(self): - if self.message_type == 'text': + if self.message_type == "text": return [self.text.content] - elif self.message_type == 'richText': + elif self.message_type == "richText": text = [] for item in self.rich_text_content.rich_text_list: - if 'text' in item: + if "text" in item: text.append(item["text"]) return text def get_image_list(self): - if self.message_type == 'picture': + if self.message_type == "picture": return [self.image_content.download_code] - elif self.message_type == 'richText': + elif self.message_type == "richText": images = [] for item in self.rich_text_content.rich_text_list: - if 'downloadCode' in item: - images.append(item['downloadCode']) + if "downloadCode" in item: + images.append(item["downloadCode"]) return images - def __str__(self): - return 'ChatbotMessage(message_type=%s, text=%s, sender_nick=%s, conversation_title=%s)' % ( - self.message_type, - self.text, - self.sender_nick, - self.conversation_title, + def __repr__(self): + return ( + f"ChatbotMessage(message_type={self.message_type!r}, text={self.text!r}, " + f"sender_nick={self.sender_nick!r}, conversation_title={self.conversation_title!r})" ) -def reply_specified_single_chat(user_id: str, user_nickname: str = "") -> ChatbotMessage: +def reply_specified_single_chat( + user_id: str, user_nickname: str = "" +) -> ChatbotMessage: d = { "senderId": user_id, "senderStaffId": user_id, "sender": user_nickname, - "conversationType": '1', + "conversationType": "1", "messageId": str(uuid.uuid1()), } return ChatbotMessage.from_dict(d) @@ -346,7 +353,7 @@ def reply_specified_single_chat(user_id: str, user_nickname: str = "") -> Chatbo def reply_specified_group_chat(open_conversation_id: str) -> ChatbotMessage: d = { "conversationId": open_conversation_id, - "conversationType": '2', + "conversationType": "2", "messageId": str(uuid.uuid1()), } return ChatbotMessage.from_dict(d) @@ -357,8 +364,15 @@ class ChatbotHandler(CallbackHandler): def __init__(self): super(ChatbotHandler, self).__init__() - def reply_markdown_card(self, markdown: str, incoming_message: ChatbotMessage, title: str = "", logo: str = "", - at_sender: bool = False, at_all: bool = False) -> MarkdownCardInstance: + def reply_markdown_card( + self, + markdown: str, + incoming_message: ChatbotMessage, + title: str = "", + logo: str = "", + at_sender: bool = False, + at_all: bool = False, + ) -> MarkdownCardInstance: """ 回复一个markdown卡片 :param markdown: @@ -369,22 +383,27 @@ def reply_markdown_card(self, markdown: str, incoming_message: ChatbotMessage, t :param at_all: :return: """ - markdown_card_instance = MarkdownCardInstance(self.dingtalk_client, incoming_message) + markdown_card_instance = MarkdownCardInstance( + self.dingtalk_client, incoming_message + ) markdown_card_instance.set_title_and_logo(title, logo) markdown_card_instance.reply(markdown, at_sender=at_sender, at_all=at_all) return markdown_card_instance - def reply_rpa_plugin_card(self, incoming_message: ChatbotMessage, - plugin_id: str = "", - plugin_version: str = "", - plugin_name: str = "", - ability_name: str = "", - plugin_args: dict = {}, - goal: str = "", - corp_id: str = "", - recipients: list = None) -> RPAPluginCardInstance: + def reply_rpa_plugin_card( + self, + incoming_message: ChatbotMessage, + plugin_id: str = "", + plugin_version: str = "", + plugin_name: str = "", + ability_name: str = "", + plugin_args: dict = {}, + goal: str = "", + corp_id: str = "", + recipients: list = None, + ) -> RPAPluginCardInstance: """ 回复一个markdown卡片 :param ability_name: @@ -399,17 +418,32 @@ def reply_rpa_plugin_card(self, incoming_message: ChatbotMessage, :return: """ - rpa_plugin_card_instance = RPAPluginCardInstance(self.dingtalk_client, incoming_message) + rpa_plugin_card_instance = RPAPluginCardInstance( + self.dingtalk_client, incoming_message + ) rpa_plugin_card_instance.set_goal(goal) rpa_plugin_card_instance.set_corp_id(corp_id) - rpa_plugin_card_instance.reply(plugin_id, plugin_version, plugin_name, ability_name, plugin_args, - recipients=recipients) + rpa_plugin_card_instance.reply( + plugin_id, + plugin_version, + plugin_name, + ability_name, + plugin_args, + recipients=recipients, + ) return rpa_plugin_card_instance - def reply_markdown_button(self, incoming_message: ChatbotMessage, markdown: str, button_list: list, tips: str = "", - title: str = "", logo: str = "") -> MarkdownButtonCardInstance: + def reply_markdown_button( + self, + incoming_message: ChatbotMessage, + markdown: str, + button_list: list, + tips: str = "", + title: str = "", + logo: str = "", + ) -> MarkdownButtonCardInstance: """ 回复一个带button的卡片 :param tips: @@ -420,22 +454,26 @@ def reply_markdown_button(self, incoming_message: ChatbotMessage, markdown: str, :param logo: :return: """ - markdown_button_instance = MarkdownButtonCardInstance(self.dingtalk_client, incoming_message) + markdown_button_instance = MarkdownButtonCardInstance( + self.dingtalk_client, incoming_message + ) markdown_button_instance.set_title_and_logo(title, logo) markdown_button_instance.reply(markdown, button_list, tips=tips) return markdown_button_instance - def reply_ai_markdown_button(self, - incoming_message: ChatbotMessage, - markdown: str, - button_list: list, - tips: str = "", - title: str = "", - logo: str = "", - recipients: list = None, - support_forward: bool = True) -> AIMarkdownCardInstance: + def reply_ai_markdown_button( + self, + incoming_message: ChatbotMessage, + markdown: str, + button_list: list, + tips: str = "", + title: str = "", + logo: str = "", + recipients: list = None, + support_forward: bool = True, + ) -> AIMarkdownCardInstance: """ 回复一个带button的ai卡片 :param support_forward: @@ -448,18 +486,30 @@ def reply_ai_markdown_button(self, :param logo: :return: """ - markdown_button_instance = AIMarkdownCardInstance(self.dingtalk_client, incoming_message) + markdown_button_instance = AIMarkdownCardInstance( + self.dingtalk_client, incoming_message + ) markdown_button_instance.set_title_and_logo(title, logo) - markdown_button_instance.ai_start(recipients=recipients, support_forward=support_forward) + markdown_button_instance.ai_start( + recipients=recipients, support_forward=support_forward + ) markdown_button_instance.ai_streaming(markdown=markdown, append=True) - markdown_button_instance.ai_finish(markdown=markdown, button_list=button_list, tips=tips) + markdown_button_instance.ai_finish( + markdown=markdown, button_list=button_list, tips=tips + ) return markdown_button_instance - def reply_carousel_card(self, incoming_message: ChatbotMessage, markdown: str, image_slider, button_text, - title: str = "", - logo: str = "") -> CarouselCardInstance: + def reply_carousel_card( + self, + incoming_message: ChatbotMessage, + markdown: str, + image_slider, + button_text, + title: str = "", + logo: str = "", + ) -> CarouselCardInstance: """ 回复一个轮播图卡片 :param markdown: @@ -470,15 +520,22 @@ def reply_carousel_card(self, incoming_message: ChatbotMessage, markdown: str, i :param button_text: :return: """ - carousel_card_instance = CarouselCardInstance(self.dingtalk_client, incoming_message) + carousel_card_instance = CarouselCardInstance( + self.dingtalk_client, incoming_message + ) carousel_card_instance.set_title_and_logo(title, logo) carousel_card_instance.reply(markdown, image_slider, button_text) return carousel_card_instance - def ai_markdown_card_start(self, incoming_message: ChatbotMessage, title: str = "", - logo: str = "", recipients: list = None) -> AIMarkdownCardInstance: + def ai_markdown_card_start( + self, + incoming_message: ChatbotMessage, + title: str = "", + logo: str = "", + recipients: list = None, + ) -> AIMarkdownCardInstance: """ 发起一个AI卡片 :param recipients: @@ -487,13 +544,17 @@ def ai_markdown_card_start(self, incoming_message: ChatbotMessage, title: str = :param logo: :return: """ - ai_markdown_card_instance = AIMarkdownCardInstance(self.dingtalk_client, incoming_message) + ai_markdown_card_instance = AIMarkdownCardInstance( + self.dingtalk_client, incoming_message + ) ai_markdown_card_instance.set_title_and_logo(title, logo) ai_markdown_card_instance.ai_start(recipients=recipients) return ai_markdown_card_instance - def extract_text_from_incoming_message(self, incoming_message: ChatbotMessage) -> list: + def extract_text_from_incoming_message( + self, incoming_message: ChatbotMessage + ) -> list: """ 获取文本列表 :param incoming_message: @@ -501,7 +562,9 @@ def extract_text_from_incoming_message(self, incoming_message: ChatbotMessage) - """ return incoming_message.get_text_list() - def extract_image_from_incoming_message(self, incoming_message: ChatbotMessage) -> list: + def extract_image_from_incoming_message( + self, incoming_message: ChatbotMessage + ) -> list: """ 获取用户发送的图片,重新上传,获取新的mediaId列表 :param incoming_message: @@ -516,9 +579,12 @@ def extract_image_from_incoming_message(self, incoming_message: ChatbotMessage) download_url = self.get_image_download_url(download_code) image_content = requests.get(download_url) - mediaid = self.dingtalk_client.upload_to_dingtalk(image_content.content, filetype='image', - filename='image.png', - mimetype='image/png') + mediaid = self.dingtalk_client.upload_to_dingtalk( + image_content.content, + filetype="image", + filename="image.png", + mimetype="image/png", + ) mediaids.append(mediaid) @@ -532,35 +598,41 @@ def get_image_download_url(self, download_code: str) -> str: """ access_token = self.dingtalk_client.get_access_token() if not access_token: - self.logger.error('send_off_duty_prompt failed, cannot get dingtalk access token') + self.logger.error( + "send_off_duty_prompt failed, cannot get dingtalk access token" + ) return None request_headers = { - 'Content-Type': 'application/json', - 'Accept': '*/*', - 'x-acs-dingtalk-access-token': access_token, - 'User-Agent': ('DingTalkStream/1.0 SDK/0.1.0 Python/%s ' - '(+https://github.com/open-dingtalk/dingtalk-stream-sdk-python)' - ) % platform.python_version(), + "Content-Type": "application/json", + "Accept": "*/*", + "x-acs-dingtalk-access-token": access_token, + "User-Agent": ( + "DingTalkStream/1.0 SDK/0.1.0 Python/%s " + "(+https://github.com/open-dingtalk/dingtalk-stream-sdk-python)" + ) + % platform.python_version(), } values = { - 'robotCode': self.dingtalk_client.credential.client_id, - 'downloadCode': download_code, + "robotCode": self.dingtalk_client.credential.client_id, + "downloadCode": download_code, } - url = DINGTALK_OPENAPI_ENDPOINT + '/v1.0/robot/messageFiles/download' + url = DINGTALK_OPENAPI_ENDPOINT + "/v1.0/robot/messageFiles/download" try: - response_text = '' - response = requests.post(url, - headers=request_headers, - data=json.dumps(values)) + response_text = "" + response = requests.post( + url, headers=request_headers, data=json.dumps(values) + ) response_text = response.text response.raise_for_status() except Exception as e: - self.logger.error(f'get_image_download_url, error={e}, response.text={response_text}') + self.logger.error( + f"get_image_download_url, error={e}, response.text={response_text}" + ) return "" return response.json()["downloadUrl"] @@ -574,7 +646,9 @@ def set_off_duty_prompt(self, text: str, title: str = "", logo: str = ""): """ access_token = self.dingtalk_client.get_access_token() if not access_token: - self.logger.error('send_off_duty_prompt failed, cannot get dingtalk access token') + self.logger.error( + "send_off_duty_prompt failed, cannot get dingtalk access token" + ) return None if title is None or title == "": @@ -583,104 +657,118 @@ def set_off_duty_prompt(self, text: str, title: str = "", logo: str = ""): if logo is None or logo == "": logo = "@lALPDfJ6V_FPDmvNAfTNAfQ" - prompt_card_data = generate_multi_text_line_card_data(title=title, logo=logo, texts=[text]) + prompt_card_data = generate_multi_text_line_card_data( + title=title, logo=logo, texts=[text] + ) request_headers = { - 'Content-Type': 'application/json', - 'Accept': '*/*', - 'x-acs-dingtalk-access-token': access_token, - 'User-Agent': ('DingTalkStream/1.0 SDK/0.1.0 Python/%s ' - '(+https://github.com/open-dingtalk/dingtalk-stream-sdk-python)' - ) % platform.python_version(), + "Content-Type": "application/json", + "Accept": "*/*", + "x-acs-dingtalk-access-token": access_token, + "User-Agent": ( + "DingTalkStream/1.0 SDK/0.1.0 Python/%s " + "(+https://github.com/open-dingtalk/dingtalk-stream-sdk-python)" + ) + % platform.python_version(), } values = { - 'robotCode': self.dingtalk_client.credential.client_id, - 'cardData': json.dumps(prompt_card_data), - 'cardTemplateId': "StandardCard", + "robotCode": self.dingtalk_client.credential.client_id, + "cardData": json.dumps(prompt_card_data), + "cardTemplateId": "StandardCard", } - url = DINGTALK_OPENAPI_ENDPOINT + '/v1.0/innerApi/robot/stream/away/template/update' + url = ( + DINGTALK_OPENAPI_ENDPOINT + + "/v1.0/innerApi/robot/stream/away/template/update" + ) try: - response_text = '' - response = requests.post(url, - headers=request_headers, - data=json.dumps(values)) + response_text = "" + response = requests.post( + url, headers=request_headers, data=json.dumps(values) + ) response_text = response.text response.raise_for_status() except Exception as e: - self.logger.error(f'set_off_duty_prompt, error={e}, response.text={response_text}') + self.logger.error( + f"set_off_duty_prompt, error={e}, response.text={response_text}" + ) return response.status_code return response.json() - def reply_text(self, - text: str, - incoming_message: ChatbotMessage): + def reply_text(self, text: str, incoming_message: ChatbotMessage): request_headers = { - 'Content-Type': 'application/json', - 'Accept': '*/*', + "Content-Type": "application/json", + "Accept": "*/*", } values = { - 'msgtype': 'text', - 'text': { - 'content': text, + "msgtype": "text", + "text": { + "content": text, + }, + "at": { + "atUserIds": [incoming_message.sender_staff_id], }, - 'at': { - 'atUserIds': [incoming_message.sender_staff_id], - } } try: - response_text = '' - response = requests.post(incoming_message.session_webhook, - headers=request_headers, - data=json.dumps(values)) + response_text = "" + response = requests.post( + incoming_message.session_webhook, + headers=request_headers, + data=json.dumps(values), + ) response_text = response.text - + response.raise_for_status() except Exception as e: - self.logger.error(f'reply text failed, error={e}, response.text={response_text}') + self.logger.error( + f"reply text failed, error={e}, response.text={response_text}" + ) return None return response.json() - def reply_markdown(self, - title: str, - text: str, - incoming_message: ChatbotMessage): + def reply_markdown(self, title: str, text: str, incoming_message: ChatbotMessage): request_headers = { - 'Content-Type': 'application/json', - 'Accept': '*/*', + "Content-Type": "application/json", + "Accept": "*/*", } values = { - 'msgtype': 'markdown', - 'markdown': { - 'title': title, - 'text': text, + "msgtype": "markdown", + "markdown": { + "title": title, + "text": text, + }, + "at": { + "atUserIds": [incoming_message.sender_staff_id], }, - 'at': { - 'atUserIds': [incoming_message.sender_staff_id], - } } try: - response_text = '' - response = requests.post(incoming_message.session_webhook, - headers=request_headers, - data=json.dumps(values)) - response_text = response.text + response_text = "" + response = requests.post( + incoming_message.session_webhook, + headers=request_headers, + data=json.dumps(values), + ) + response_text = response.text response.raise_for_status() except Exception as e: - self.logger.error(f'reply markdown failed, error={e}, response.text={response_text}') + self.logger.error( + f"reply markdown failed, error={e}, response.text={response_text}" + ) return None return response.json() - def reply_card(self, - card_data: dict, - incoming_message: ChatbotMessage, - at_sender: bool = False, - at_all: bool = False, - **kwargs) -> str: + def reply_card( + self, + card_data: dict, + incoming_message: ChatbotMessage, + at_sender: bool = False, + at_all: bool = False, + **kwargs, + ) -> str: """ 机器人回复互动卡片。由于 sessionWebhook 不支持发送互动卡片,所以需要使用 OpenAPI,当前仅支持自建应用。 https://open.dingtalk.com/document/orgapp/robots-send-interactive-cards @@ -694,16 +782,19 @@ def reply_card(self, access_token = self.dingtalk_client.get_access_token() if not access_token: self.logger.error( - 'simple_reply_interactive_card_only_for_inner_app failed, cannot get dingtalk access token') + "simple_reply_interactive_card_only_for_inner_app failed, cannot get dingtalk access token" + ) return None request_headers = { - 'Content-Type': 'application/json', - 'Accept': '*/*', - 'x-acs-dingtalk-access-token': access_token, - 'User-Agent': ('DingTalkStream/1.0 SDK/0.1.0 Python/%s ' - '(+https://github.com/open-dingtalk/dingtalk-stream-sdk-python)' - ) % platform.python_version(), + "Content-Type": "application/json", + "Accept": "*/*", + "x-acs-dingtalk-access-token": access_token, + "User-Agent": ( + "DingTalkStream/1.0 SDK/0.1.0 Python/%s " + "(+https://github.com/open-dingtalk/dingtalk-stream-sdk-python)" + ) + % platform.python_version(), } card_biz_id = self._gen_card_id(incoming_message) @@ -720,12 +811,10 @@ def reply_card(self, "cardBizId": card_biz_id, } - if incoming_message.conversation_type == '2': + if incoming_message.conversation_type == "2": body["openConversationId"] = incoming_message.conversation_id - elif incoming_message.conversation_type == '1': - single_chat_receiver = { - "userId": incoming_message.sender_staff_id - } + elif incoming_message.conversation_type == "1": + single_chat_receiver = {"userId": incoming_message.sender_staff_id} body["singleChatReceiver"] = json.dumps(single_chat_receiver) if at_all: @@ -737,25 +826,27 @@ def reply_card(self, user_list_json = [ { "nickName": incoming_message.sender_nick, - "userId": incoming_message.sender_staff_id + "userId": incoming_message.sender_staff_id, } ] - body["sendOptions"]["atUserListJson"] = json.dumps(user_list_json, ensure_ascii=False) + body["sendOptions"]["atUserListJson"] = json.dumps( + user_list_json, ensure_ascii=False + ) body.update(**kwargs) - url = DINGTALK_OPENAPI_ENDPOINT + '/v1.0/im/v1.0/robot/interactiveCards/send' + url = DINGTALK_OPENAPI_ENDPOINT + "/v1.0/im/v1.0/robot/interactiveCards/send" try: - response_text = '' - response = requests.post(url, - headers=request_headers, - json=body) + response_text = "" + response = requests.post(url, headers=request_headers, json=body) response_text = response.text response.raise_for_status() return card_biz_id except Exception as e: - self.logger.error(f'reply card failed, error={e}, response.text={response_text}') + self.logger.error( + f"reply card failed, error={e}, response.text={response_text}" + ) return "" def update_card(self, card_biz_id: str, card_data: dict): @@ -768,42 +859,51 @@ def update_card(self, card_biz_id: str, card_data: dict): """ access_token = self.dingtalk_client.get_access_token() if not access_token: - self.logger.error('update_card failed, cannot get dingtalk access token') + self.logger.error("update_card failed, cannot get dingtalk access token") return None request_headers = { - 'Content-Type': 'application/json', - 'Accept': '*/*', - 'x-acs-dingtalk-access-token': access_token, - 'User-Agent': ('DingTalkStream/1.0 SDK/0.1.0 Python/%s ' - '(+https://github.com/open-dingtalk/dingtalk-stream-sdk-python)' - ) % platform.python_version(), + "Content-Type": "application/json", + "Accept": "*/*", + "x-acs-dingtalk-access-token": access_token, + "User-Agent": ( + "DingTalkStream/1.0 SDK/0.1.0 Python/%s " + "(+https://github.com/open-dingtalk/dingtalk-stream-sdk-python)" + ) + % platform.python_version(), } values = { - 'cardBizId': card_biz_id, - 'cardData': json.dumps(card_data), + "cardBizId": card_biz_id, + "cardData": json.dumps(card_data), } - url = DINGTALK_OPENAPI_ENDPOINT + '/v1.0/im/robots/interactiveCards' + url = DINGTALK_OPENAPI_ENDPOINT + "/v1.0/im/robots/interactiveCards" try: - response_text = '' - response = requests.put(url, - headers=request_headers, - data=json.dumps(values)) + response_text = "" + response = requests.put( + url, headers=request_headers, data=json.dumps(values) + ) response_text = response.text - + response.raise_for_status() except Exception as e: - self.logger.error(f'update card failed, error={e}, response.text={response_text}') + self.logger.error( + f"update card failed, error={e}, response.text={response_text}" + ) return response.status_code return response.json() @staticmethod def _gen_card_id(msg: ChatbotMessage): - factor = '%s_%s_%s_%s_%s' % ( - msg.sender_id, msg.sender_corp_id, msg.conversation_id, msg.message_id, str(uuid.uuid1())) + factor = "%s_%s_%s_%s_%s" % ( + msg.sender_id, + msg.sender_corp_id, + msg.conversation_id, + msg.message_id, + str(uuid.uuid1()), + ) m = hashlib.sha256() - m.update(factor.encode('utf-8')) + m.update(factor.encode("utf-8")) return m.hexdigest() @@ -819,12 +919,12 @@ def __init__(self, max_workers: int = 8): self.async_executor = ThreadPoolExecutor(max_workers=max_workers) def process(self, message): - ''' + """ 不要用 async 修饰 :param message: :return: - ''' - return AckMessage.STATUS_NOT_IMPLEMENT, 'not implement' + """ + return AckMessage.STATUS_NOT_IMPLEMENT, "not implement" async def raw_process(self, callback_message: CallbackMessage): def func():