飞书消息监听器 (lark_bot_listener)
pywayne.lark_bot_listener 提供基于飞书 WebSocket 事件流的实时监听能力,适合做自动回复、消息分流、文件落盘、卡片交互回调、群聊事件处理、reaction 联动等场景。
这一层的定位不是“替代 LarkBot”,而是:
LarkBot负责主动调用飞书 OpenAPI 发消息、改消息、回消息、加表情、置顶、群管理。LarkBotListener负责把飞书推送进来的消息与事件整理成好用的 decorator 和上下文对象。两者组合后,可以实现“收到什么消息,就按什么规则自动处理”的完整闭环。
能力概览
当前 LarkBotListener 已覆盖以下两类能力:
消息监听
文本
text图片
image文件
file音频
audio媒体
media贴纸
stickerrich_text 富文本
postcard 卡片消息
interactive任意消息类型统一入口
listen(message_type=None)
事件监听
消息撤回
recalled消息已读
message_readreaction 新增 / 删除
机器人被拉入群
机器人被移出群
机器人进入某个用户私聊
群成员加入 / 被移除 / 主动退出
群信息变更
群解散
卡片 action HTTP 回调
除此之外,监听器还内置了这些增强点:
按 handler 维度的消息去重
过期消息清理
群聊名 / 用户名自动补齐
sync/async handler 统一兼容
图片 / 文件 / 音频 / 媒体自动下载到临时目录
处理函数返回文件路径后自动重新上传回发
MessageContext携带message_id/thread_id/root_id/parent_id/mentions/raw_event
重点能力
如果你想快速抓住这个模块最核心的扩展能力,最值得关注的是这几组:
消息类型补齐
audio_handlermedia_handlersticker_handlerlisten(message_type="interactive")可直接监听“收到一张 card 卡片消息”
事件类型补齐
message_read_handlerreaction_handlerbot_added_handler/bot_removed_handlerbot_p2p_chat_entered_handlermember_changed_handlerchat_updated_handler/chat_disbanded_handler
机器人被 @ 的专门入口
mention_handler只处理“文本里真正 @ 到机器人”的消息
基于
MessageContext的流式卡片回复reply_streaming_cardupdate_streaming_cardrecolor_streaming_cardstream_reply_cardastream_reply_card
流式卡片状态色切换
流式进行中可用
template="blue"完成时可用
final_template="green"失败时可手动
recolor_streaming_card(..., template="red")
卡片交互 HTTP 回调
card_action_handler+get_card_action_handler可直接接入 Web 框架适合做按钮点击、表单提交、审批确认、二次操作等
最常见的闭环组合一般是:
收到消息 -> 临时加 reaction -> 引用回复 -> 取消 reaction
收到文本 -> 走 LLM 流式输出 -> 在同一张卡片内持续更新 -> 完成时变绿色
收到文件 / 图片 / 语音 -> 自动下载 -> 处理 -> 返回新文件路径自动回发
机器人被拉进群 -> 自动发欢迎说明
群成员变化 / 已读 / reaction 事件 -> 同步外部业务系统
MessageContext
- class MessageContext
通用消息上下文对象。凡是通过
listen(...)进入的消息,都会先被整理为MessageContext再传给你的 handler。关键字段如下:
chat_id: 会话 IDuser_id: 发送人 open_idmessage_type: 当前消息类型content: 解析后的消息内容is_group: 是否群聊chat_type: 飞书原始 chat typemessage_id: 消息 IDthread_id: 所在线程 IDroot_id: 根消息 IDparent_id: 父消息 IDmentions: 被 @ 列表raw_event: 飞书 SDK 原始事件对象
典型用法:
from pywayne.lark_bot_listener import LarkBotListener, MessageContext
from pywayne.tools import wayne_print
listener = LarkBotListener(app_id="cli_xxx", app_secret="sec_xxx")
@listener.listen()
async def dump_all(ctx: MessageContext):
wayne_print(
f"{ctx.message_type} {ctx.message_id} {ctx.chat_id} {ctx.thread_id}",
color="cyan"
)
示例:用 MessageContext 同时处理线程、群私聊、@ 信息
@listener.listen()
async def inspect_context(ctx: MessageContext):
if ctx.thread_id:
wayne_print(
f"线程消息 root={ctx.root_id} parent={ctx.parent_id}",
color="yellow",
bold=True
)
if ctx.mentions:
wayne_print(f"本条消息 mentions={ctx.mentions}", color="green")
if ctx.is_group:
listener.bot.reply_message(
ctx.message_id,
"text",
{"text": "这是群聊消息"}
)
else:
listener.bot.reply_message(
ctx.message_id,
"text",
{"text": "这是私聊消息"}
)
LarkBotListener 类
- class LarkBotListener(app_id: str, app_secret: str, message_expiry_time: int = 60)
创建监听器实例。
参数
app_id: 飞书应用 app idapp_secret: 飞书应用 app secretmessage_expiry_time: 去重缓存保留时间,单位秒,默认60
实例属性
bot: 内置LarkBot实例。监听到消息后,如果你要引用回复、加 reaction、撤回、下载资源、更新卡片,或者直接编辑已经发出的文本 / 富文本 / 卡片消息,都直接复用listener.bot即可。
核心方法
通用消息入口
- listen(message_type: str | None = None, group_only: bool = False, user_only: bool = False)
注册通用消息处理函数。
参数
message_type: 指定消息类型;为None表示所有消息都进来group_only: 只处理群聊user_only: 只处理私聊
适用场景
你需要完整的
MessageContext你需要拿
message_id做引用回复或 reaction你要统一处理多种消息类型
你不需要自动下载附件
示例:统一路由不同消息类型
@listener.listen()
async def router(ctx: MessageContext):
if ctx.message_type == "text":
listener.bot.reply_message(
ctx.message_id,
"text",
{"text": "收到文本"}
)
elif ctx.message_type == "image":
listener.bot.reply_message(
ctx.message_id,
"text",
{"text": "收到图片"}
)
示例:只监听群聊文本,并且在线程中继续回复
@listener.listen(message_type="text", group_only=True)
async def reply_in_thread_when_possible(ctx: MessageContext):
listener.bot.reply_message(
ctx.message_id,
"text",
{"text": f"群消息收到:{ctx.content}"},
reply_in_thread=bool(ctx.thread_id)
)
示例:单个 handler 做完整分流
@listener.listen()
async def smart_router(ctx: MessageContext):
if ctx.message_type == "text" and ctx.content.startswith("/pin "):
reply = listener.bot.reply_message(
ctx.message_id,
"text",
{"text": ctx.content[5:]}
)
listener.bot.pin_message(reply["message_id"])
return
if ctx.message_type == "post":
listener.bot.add_reaction(ctx.message_id, "THUMBSUP")
return
if ctx.message_type == "interactive":
listener.bot.reply_message(
ctx.message_id,
"text",
{"text": "收到一条卡片消息"}
)
发送 Markdown 回复
- send_message(chat_id: str, content: str)
一个轻量发送入口。内部使用 rich_text + markdown 内容发送到指定 chat。
它适合快速调试,不适合复杂业务。更推荐在正式业务中直接使用
listener.bot里的完整接口,比如:reply_messagesend_markdown_message_to_chatsend_card_to_chat
流式卡片回复
LarkBotListener 现在也提供了围绕 MessageContext 的流式卡片便捷方法:
reply_streaming_card(target, ...)update_streaming_card(card_message_id, md_text, ...)recolor_streaming_card(card_message_id, md_text, ...)stream_reply_card(target, text_stream, ...)astream_reply_card(target, text_stream, ...)
其中:
target可以直接传MessageContext,也可以传原始message_idupdate_streaming_card更新的是“回复出来的卡片消息 ID”,不是原始用户消息 IDfinal_template可用于在流式结束后自动切换卡片头部颜色,比如从blue切到green常用模板色包括
blue、green、orange、red、grey等;完整常用列表可用CardContentV2.list_header_templates()查看流式更新传入的是“当前完整文本”,不是本次新增的 delta
如果你需要 1 个字 1 个字刷新,请把
update_interval调小,但要注意单消息频率限制
示例:在监听器里直接接 LLM 流式输出
@listener.listen(message_type="text")
async def on_text(ctx: MessageContext):
async def fake_stream():
yield "你好,"
yield "这是"
yield "一段流式卡片回复。"
await listener.astream_reply_card(
ctx,
fake_stream(),
title="AI 回复中",
template="blue",
final_template="green",
final_status_text="已完成"
)
示例:一边流式输出,一边在完成后自动变色
@listener.listen(message_type="text")
async def on_text_stream(ctx: MessageContext):
async def llm_stream():
yield "第一段分析\n"
yield "第二段分析\n"
yield "第三段结论\n"
await listener.astream_reply_card(
ctx,
llm_stream(),
title="问题分析中",
template="wathet",
status_text="生成中...",
final_template="green",
final_status_text="分析完成",
update_interval=0.4
)
示例:先创建卡片,再手动多次更新
@listener.listen(message_type="text")
async def on_text_manual_stream(ctx: MessageContext):
reply = listener.reply_streaming_card(
ctx,
title="任务执行中",
template="blue",
initial_md="开始处理请求..."
)
card_message_id = reply["message_id"]
listener.update_streaming_card(
card_message_id,
"步骤 1:读取输入完成",
title="任务执行中"
)
listener.update_streaming_card(
card_message_id,
"步骤 1:读取输入完成\n步骤 2:执行分析完成",
title="任务执行中"
)
listener.recolor_streaming_card(
card_message_id,
"步骤 1:读取输入完成\n步骤 2:执行分析完成\n步骤 3:输出结果完成",
title="任务已完成",
template="green",
status_text="完成"
)
示例:任务失败时把已有卡片改成红色
@listener.listen(message_type="text")
async def on_text(ctx: MessageContext):
reply = listener.reply_streaming_card(
ctx,
title="任务执行中",
initial_md="开始处理..."
)
card_message_id = reply["message_id"]
try:
listener.update_streaming_card(
card_message_id,
"步骤 1 完成\\n步骤 2 完成",
title="任务执行中"
)
raise RuntimeError("第三步失败")
except Exception as exc:
listener.recolor_streaming_card(
card_message_id,
f"步骤 1 完成\\n步骤 2 完成\\n\\n错误信息:{exc}",
title="任务执行失败",
template="red",
status_text="执行失败"
)
启动服务
- run()
启动飞书 WebSocket 监听。
一个监听器实例一般在进程中只调用一次。
高级消息 decorator
text_handler
- text_handler(group_only: bool = False, user_only: bool = False)
文本消息快捷入口。传入的不是
MessageContext,而是解好的业务参数。处理函数可按需声明以下参数:
textchat_idis_groupgroup_nameuser_name
示例:群聊指令机器人
@listener.text_handler(group_only=True)
async def handle_group_cmd(text: str, chat_id: str, user_name: str):
if text == "/ping":
listener.bot.send_text_to_chat(chat_id, f"{user_name} pong")
示例:文本指令 + 引用回复 + 群私聊差异化处理
@listener.text_handler()
async def handle_text(text: str,
chat_id: str,
is_group: bool,
group_name: str,
user_name: str,
message_id: str):
if text == "/whoami":
target = f"群聊 {group_name}" if is_group else "私聊"
listener.bot.reply_message(
message_id,
"text",
{"text": f"{user_name},你当前在{target}"}
)
elif text.startswith("/say "):
listener.bot.send_text_to_chat(chat_id, text[5:])
image_handler
- image_handler(group_only: bool = False, user_only: bool = False)
自动下载图片到临时文件。处理完成后,如果你的函数返回新的图片路径,会自动重新上传并回发到当前会话。
可声明参数:
image_pathchat_idis_groupgroup_nameuser_name
示例:收到图片后打水印并自动回传
import cv2
import tempfile
from pathlib import Path
@listener.image_handler()
async def add_watermark(image_path: Path) -> Path:
image = cv2.imread(str(image_path))
cv2.putText(image, "processed", (30, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
result = Path(f.name)
cv2.imwrite(str(result), image)
return result
示例:图片消息自动下载后,引用回复说明,再把处理结果回发
@listener.image_handler()
async def detect_and_reply(image_path: Path, message_id: str, user_name: str) -> Path:
listener.bot.reply_message(
message_id,
"text",
{"text": f"{user_name},图片已收到,开始处理"}
)
image = cv2.imread(str(image_path))
cv2.rectangle(image, (20, 20), (220, 220), (0, 255, 0), 3)
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
out = Path(f.name)
cv2.imwrite(str(out), image)
return out
file_handler
- file_handler(group_only: bool = False, user_only: bool = False)
自动下载文件到临时路径。若函数返回新文件路径,会自动上传并回发。
可声明参数:
file_pathchat_idis_groupgroup_nameuser_name
示例:收到文件后原样回发
from pathlib import Path
@listener.file_handler()
async def bounce_file(file_path: Path) -> Path:
return file_path
示例:文件自动落盘后解析大小并引用回复
@listener.file_handler()
async def summarize_file(file_path: Path, message_id: str, user_name: str):
size = file_path.stat().st_size
listener.bot.reply_message(
message_id,
"text",
{"text": f"{user_name},文件已收到,大小 {size} 字节"}
)
audio_handler
- audio_handler(group_only: bool = False, user_only: bool = False)
自动下载音频消息,默认保存为
.opus临时文件;若返回新的音频路径,会按opus上传回发。可声明参数:
audio_pathchat_idis_groupgroup_nameuser_namemessage_idthread_id
示例:收到语音后保存一份副本并回一个文字确认
@listener.audio_handler()
async def save_audio(audio_path, message_id):
listener.bot.reply_message(message_id, "text", {"text": "收到音频"})
示例:语音消息 -> 临时 reaction -> 引用回复 -> 转成别名文件回传
import shutil
import tempfile
from pathlib import Path
@listener.audio_handler()
async def handle_audio(audio_path: Path, message_id: str) -> Path:
reaction = listener.bot.add_reaction(message_id, "WITTY")
try:
listener.bot.reply_message(message_id, "text", {"text": "收到音频,已开始处理"})
with tempfile.NamedTemporaryFile(suffix=".opus", delete=False) as f:
out = Path(f.name)
shutil.copy(audio_path, out)
return out
finally:
listener.bot.delete_reaction(message_id, reaction["reaction_id"])
media_handler
- media_handler(group_only: bool = False, user_only: bool = False)
自动下载视频 / 媒体文件,默认临时扩展名为
.mp4;若返回新路径,会按mp4上传回发。可声明参数与
audio_handler类似,只是资源参数名为media_path。
示例:收到视频后只回元信息,不回文件
@listener.media_handler()
async def handle_media(media_path, message_id):
size = media_path.stat().st_size
listener.bot.reply_message(
message_id,
"text",
{"text": f"收到媒体文件,大小 {size} 字节"}
)
sticker_handler
- sticker_handler(group_only: bool = False, user_only: bool = False)
贴纸消息快捷入口。适合做“看到某个贴纸就回复固定文案”的轻量交互。
可声明参数:
sticker_contentraw_contentchat_idis_groupgroup_nameuser_namemessage_idthread_id
示例:不同贴纸触发不同文案
@listener.sticker_handler(group_only=True)
async def respond_sticker(sticker_content, message_id: str):
listener.bot.reply_message(
message_id,
"text",
{"text": f"收到贴纸:{sticker_content}"}
)
mention_handler
- mention_handler(group_only: bool = False, user_only: bool = False)
只处理“机器人被 @ 的文本消息”。
可声明参数:
textmentionschat_idis_groupgroup_nameuser_namemessage_idthread_idroot_idparent_idraw_event
示例:只有被 @ 时才回复
@listener.mention_handler(group_only=True)
async def when_mentioned(text: str, message_id: str, user_name: str):
listener.bot.reply_message(
message_id,
"text",
{"text": f"{user_name},我在。"}
)
示例:群里只对被 @ 的消息做流式卡片回复
@listener.mention_handler(group_only=True)
async def mention_stream(text: str, message_id: str):
def answer_stream():
yield "已收到 @ 请求\n"
yield f"原始文本:{text}\n"
yield "处理中完成"
listener.stream_reply_card(
message_id,
answer_stream(),
title="被 @ 后的自动处理",
template="blue",
final_template="green",
final_status_text="已回复"
)
事件 decorator
recall_handler
- recall_handler()
处理消息撤回事件。
可声明参数:
message_idchat_idrecall_timerecall_typeraw_event
示例:撤回事件写审计日志
from pywayne.tools import wayne_print
@listener.recall_handler()
async def on_recalled(message_id: str, chat_id: str, recall_time: str):
wayne_print(
f"消息被撤回 chat={chat_id} message={message_id} time={recall_time}",
color="red",
bold=True
)
message_read_handler
- message_read_handler()
处理消息已读事件。
可声明参数:
readermessage_id_listraw_event
示例:把已读事件同步到外部状态表
READ_STATE = {}
@listener.message_read_handler()
async def on_read(message_id_list, reader):
reader_open_id = reader["reader_id"]["open_id"]
for message_id in message_id_list:
READ_STATE[message_id] = reader_open_id
reaction_handler
- reaction_handler()
处理 reaction 新增 / 删除事件。
可声明参数:
action:created或deletedmessage_idemoji_typeoperator_typeuser_idapp_idaction_timeraw_event
示例:区分 reaction 创建和删除
@listener.reaction_handler()
async def on_reaction(action: str, message_id: str, emoji_type: str, operator_type: str):
if action == "created":
listener.bot.send_text_to_chat(
"oc_xxx",
f"{message_id} 新增 reaction: {emoji_type} by {operator_type}"
)
else:
listener.bot.send_text_to_chat(
"oc_xxx",
f"{message_id} 移除了 reaction: {emoji_type}"
)
bot_added_handler / bot_removed_handler
- bot_added_handler()
- bot_removed_handler()
处理机器人被拉入群 / 移出群事件。
常用参数:
chat_idoperator_idexternalnameraw_event
示例:入群时自动发欢迎卡片,移除时打日志
from pywayne.tools import wayne_print
@listener.bot_added_handler()
async def on_bot_added(chat_id: str, name: str):
listener.bot.send_markdown_message_to_chat(
chat_id,
md_text=f"# {name}\n\n机器人已加入当前群,可直接开始使用。",
title="机器人入群"
)
@listener.bot_removed_handler()
async def on_bot_removed(chat_id: str, operator_id: str):
wayne_print(f"机器人被移出群 {chat_id}, operator={operator_id}", color="yellow", bold=True)
bot_p2p_chat_entered_handler
- bot_p2p_chat_entered_handler()
处理机器人首次进入某个用户私聊的事件。
适合做“首次欢迎语”场景。
示例:首次进入私聊就发 onboarding
@listener.bot_p2p_chat_entered_handler()
async def on_enter_p2p(chat_id: str, operator_id: str):
listener.bot.send_markdown_message_to_chat(
chat_id,
md_text=(
"# 欢迎使用机器人\n\n"
"- 直接发文本可触发问答\n"
"- 发图片 / 文件可触发对应处理链路\n"
"- 支持流式卡片输出"
),
title=f"欢迎 {operator_id}"
)
member_changed_handler
- member_changed_handler()
将“成员加入 / 被删除 / 主动退出”三类群成员变更事件汇总为一个 decorator。
常用参数:
action:added/deleted/withdrawnchat_idoperator_idusersnameraw_event
示例:成员加入 / 退出时同步群人数缓存
GROUP_MEMBER_COUNT = {}
@listener.member_changed_handler()
async def on_member_change(action: str, chat_id: str, users):
delta = len(users or [])
GROUP_MEMBER_COUNT.setdefault(chat_id, 0)
if action == "added":
GROUP_MEMBER_COUNT[chat_id] += delta
elif action in {"deleted", "withdrawn"}:
GROUP_MEMBER_COUNT[chat_id] = max(0, GROUP_MEMBER_COUNT[chat_id] - delta)
chat_updated_handler / chat_disbanded_handler
- chat_updated_handler()
- chat_disbanded_handler()
用于监听群资料变更与群解散。
示例:群名变化后同步本地配置,群解散后清理状态
CHAT_META = {}
@listener.chat_updated_handler()
async def on_chat_updated(chat_id: str, before_change, after_change):
CHAT_META[chat_id] = after_change
@listener.chat_disbanded_handler()
async def on_chat_disbanded(chat_id: str):
CHAT_META.pop(chat_id, None)
卡片回调
- card_action_handler(verification_token: str, encrypt_key: str = '')
注册 card 卡片 action 回调处理器。
- get_card_action_handler() CardActionHandler
取回 HTTP handler,挂到 Flask / FastAPI / Django 等 Web 框架路由中。
示例:按钮点击后更新原卡片
from pywayne.lark_bot_listener import LarkBotListener
listener = LarkBotListener(app_id="cli_xxx", app_secret="sec_xxx")
@listener.card_action_handler(verification_token="token_xxx", encrypt_key="")
def on_card_action(card_event):
open_message_id = card_event.event.context.open_message_id
card = {
"type": "template",
"data": {
"template_id": "AAqC5c999",
"template_variable": {"status": "已处理"}
}
}
listener.bot.edit_card_message(open_message_id, card)
return {"toast": {"type": "success", "content": "已处理"}}
# FastAPI 示例
# app.post("/feishu/card")(listener.get_card_action_handler())
示例:按钮点击后先返回 toast,再把原卡片改成绿色完成态
@listener.card_action_handler(verification_token="token_xxx")
def on_finish(card_event):
open_message_id = card_event.event.context.open_message_id
listener.bot.update_streaming_card(
open_message_id,
"任务已确认完成",
title="任务状态",
template="green",
done=True,
status_text="已完成"
)
return {"toast": {"type": "success", "content": "已确认完成"}}
组合场景示例
场景 1:只监听部分群和某个私聊用户
ALLOWED_GROUP_NAMES = {"项目群", "值班群"}
ALLOWED_PRIVATE_USER_NAMES = {"Wayne"}
def allow(group_name: str, user_name: str, is_group: bool) -> bool:
if is_group:
return group_name in ALLOWED_GROUP_NAMES
return user_name in ALLOWED_PRIVATE_USER_NAMES
@listener.listen(message_type="text")
async def handle_text(ctx: MessageContext):
group_name, user_name = listener.bot.get_chat_and_user_name(ctx.chat_id, ctx.user_id)
if not allow(group_name, user_name, ctx.is_group):
return
listener.bot.reply_message(ctx.message_id, "text", {"text": "命中过滤条件"})
场景 2:先加 reaction,1 秒后引用回复,再取消 reaction
import asyncio
async def ack_with_temp_reaction(ctx: MessageContext, text: str):
reaction = listener.bot.add_reaction(ctx.message_id, "WITTY")
reaction_id = reaction["reaction_id"]
try:
await asyncio.sleep(1)
listener.bot.reply_message(ctx.message_id, "text", {"text": text})
finally:
listener.bot.delete_reaction(ctx.message_id, reaction_id)
@listener.listen(message_type="text")
async def on_text(ctx: MessageContext):
await ack_with_temp_reaction(ctx, "收到文本")
@listener.listen(message_type="image")
async def on_image(ctx: MessageContext):
await ack_with_temp_reaction(ctx, "收到图片")
@listener.listen(message_type="file")
async def on_file(ctx: MessageContext):
await ack_with_temp_reaction(ctx, "收到文件")
@listener.listen(message_type="audio")
async def on_audio(ctx: MessageContext):
await ack_with_temp_reaction(ctx, "收到音频")
@listener.listen(message_type="post")
async def on_post(ctx: MessageContext):
await ack_with_temp_reaction(ctx, "收到post")
@listener.listen(message_type="interactive")
async def on_card_message(ctx: MessageContext):
await ack_with_temp_reaction(ctx, "收到card")
场景 2.1:收到文本后直接流式卡片回复
@listener.listen(message_type="text")
async def on_text_stream(ctx: MessageContext):
async def llm_stream():
yield "第一段输出\n"
yield "第二段输出\n"
yield "第三段输出\n"
await listener.astream_reply_card(
ctx,
llm_stream(),
title="分析中",
status_text="生成中...",
final_status_text="已完成"
)
场景 2.2:逐字流式卡片回复,完成后自动变绿色
import asyncio
@listener.listen(message_type="text")
async def on_text_char_stream(ctx: MessageContext):
async def char_stream():
sentence = "这是一个逐字流式卡片输出示例。"
for ch in sentence:
yield ch
await asyncio.sleep(0.3)
await listener.astream_reply_card(
ctx,
char_stream(),
title="逐字输出中",
template="blue",
status_text="生成中...",
final_template="green",
final_status_text="已完成",
update_interval=0.25
)
场景 3:线程内回复,而不是回复到主会话
@listener.listen(message_type="text")
async def on_thread(ctx: MessageContext):
if ctx.thread_id:
listener.bot.reply_message(
ctx.message_id,
"text",
{"text": "在线程里回复"},
reply_in_thread=True
)
场景 4:收到图片,识别后回图片;收到文本,只回文本
import cv2
import tempfile
from pathlib import Path
@listener.image_handler()
async def detect_image(image_path: Path) -> Path:
image = cv2.imread(str(image_path))
cv2.rectangle(image, (20, 20), (200, 200), (0, 255, 0), 3)
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
out = Path(f.name)
cv2.imwrite(str(out), image)
return out
@listener.text_handler()
async def echo_text(text: str, chat_id: str):
listener.bot.send_text_to_chat(chat_id, f"收到文本: {text}")
场景 5:群里被 @ 才响应,否则忽略
@listener.mention_handler(group_only=True)
async def mention_only(text: str, user_name: str, message_id: str):
listener.bot.reply_message(
message_id,
"text",
{"text": f"{user_name},你刚刚 @ 了我。"}
)
场景 6:收到文件后解析,再把结果摘要引用回去
@listener.file_handler()
async def summarize_file(file_path, message_id):
size = file_path.stat().st_size
listener.bot.reply_message(
message_id,
"text",
{"text": f"文件已收到,大小 {size} 字节"}
)
场景 7:成员变更时自动欢迎 / 记录离群
@listener.member_changed_handler()
async def on_member_change(action: str, chat_id: str, users):
if action == "added":
listener.bot.send_text_to_chat(chat_id, "欢迎新成员")
elif action == "withdrawn":
listener.bot.send_text_to_chat(chat_id, "有人退出了群聊")
场景 7.1:机器人首次进入私聊时主动发 onboarding
@listener.bot_p2p_chat_entered_handler()
async def on_private_chat_ready(chat_id: str):
listener.bot.send_text_to_chat(chat_id, "你好,我已经进入这个私聊,可以直接发消息给我。")
场景 8:机器人被拉进群后自动发入群说明
@listener.bot_added_handler()
async def on_bot_added(chat_id: str, name: str):
listener.bot.send_markdown_message_to_chat(
chat_id,
md_text=(
"# 机器人已上线\n\n"
"- 支持文本/图片/文件自动回复\n"
"- 支持 reaction 与引用回复\n"
"- 支持卡片按钮回调"
),
title=f"{name} 使用说明"
)
场景 9:消息已读后做状态更新
from pywayne.tools import wayne_print
@listener.message_read_handler()
async def on_read(message_id_list, reader):
wayne_print(f"这些消息已读: {message_id_list}", color="cyan")
场景 10:reaction 事件和消息业务联动
from pywayne.tools import wayne_print
@listener.reaction_handler()
async def on_reaction(action: str, message_id: str, emoji_type: str):
if action == "created" and emoji_type == "THUMBSUP":
wayne_print(f"某条消息被点赞了: {message_id}", color="green", bold=True)
场景 10.1:收到文本先加 reaction,后续依据 reaction 事件做监控
from pywayne.tools import wayne_print
@listener.listen(message_type="text")
async def on_text(ctx: MessageContext):
listener.bot.add_reaction(ctx.message_id, "OK")
@listener.reaction_handler()
async def monitor_reaction(action: str, message_id: str, emoji_type: str):
wayne_print(f"{message_id} reaction {action}: {emoji_type}", color="cyan")
场景 11:群配置变化时同步外部系统
from pywayne.tools import wayne_print
@listener.chat_updated_handler()
async def on_chat_updated(chat_id: str, before_change, after_change):
wayne_print(
f"群信息变化: {chat_id} {before_change} -> {after_change}",
color="yellow",
bold=True
)
场景 12:通用路由器 + 精细分派
@listener.listen()
async def dispatch(ctx: MessageContext):
if ctx.message_type == "text" and ctx.content.startswith("/pin "):
msg = listener.bot.reply_message(ctx.message_id, "text", {"text": ctx.content[5:]})
listener.bot.pin_message(msg["message_id"])
elif ctx.message_type == "text" and ctx.content.startswith("/card"):
listener.bot.reply_message(
ctx.message_id,
"interactive",
{
"header": {"title": {"content": "快捷卡片", "tag": "plain_text"}},
"elements": [{"tag": "markdown", "content": "通过 listen 分派生成"}]
}
)
场景 13:消息审核链路,先加表情,再查正文,再转发给值班人
DUTY_USER_OPEN_ID = "ou_xxx"
@listener.listen(message_type="text", group_only=True)
async def moderation(ctx: MessageContext):
reaction = listener.bot.add_reaction(ctx.message_id, "OK")
try:
detail = listener.bot.get_message(ctx.message_id)
text = detail["body"]["content"]
if "紧急" in text:
listener.bot.forward_message(
ctx.message_id,
DUTY_USER_OPEN_ID,
receive_id_type="open_id"
)
finally:
listener.bot.delete_reaction(ctx.message_id, reaction["reaction_id"])
场景 14:从消息上下文里继续下载资源
@listener.listen(message_type="image")
async def save_original(ctx: MessageContext):
content = json.loads(ctx.content)
listener.bot.download_message_resource(
ctx.message_id,
"image",
"/tmp/original.png",
content["image_key"]
)
场景 15:交互卡片按钮 + 原卡片原位更新
@listener.card_action_handler(verification_token="token_xxx")
def on_action(card_event):
open_message_id = card_event.event.context.open_message_id
listener.bot.edit_card_message(
open_message_id,
{
"type": "template",
"data": {
"template_id": "AAqC5c999",
"template_variable": {"status": "完成"}
}
}
)
return {"toast": {"type": "success", "content": "已更新"}}
场景 16:同一个监听器里同时处理文本、图片、文件、音频、rich_text、card
import asyncio
async def ack_with_temp_reaction(ctx: MessageContext, text: str):
reaction = listener.bot.add_reaction(ctx.message_id, "WITTY")
try:
await asyncio.sleep(1)
listener.bot.reply_message(ctx.message_id, "text", {"text": text})
finally:
listener.bot.delete_reaction(ctx.message_id, reaction["reaction_id"])
@listener.listen(message_type="text")
async def on_text(ctx: MessageContext):
await ack_with_temp_reaction(ctx, "收到文本")
@listener.listen(message_type="image")
async def on_image(ctx: MessageContext):
await ack_with_temp_reaction(ctx, "收到图片")
@listener.listen(message_type="file")
async def on_file(ctx: MessageContext):
await ack_with_temp_reaction(ctx, "收到文件")
@listener.listen(message_type="audio")
async def on_audio(ctx: MessageContext):
await ack_with_temp_reaction(ctx, "收到音频")
@listener.listen(message_type="post")
async def on_post(ctx: MessageContext):
await ack_with_temp_reaction(ctx, "收到post")
@listener.listen(message_type="interactive")
async def on_card_message(ctx: MessageContext):
await ack_with_temp_reaction(ctx, "收到card")
场景 17:按群名和用户名双重过滤,再进入统一处理链路
ALLOWED_GROUP_NAMES = {"测试3", "项目群"}
ALLOWED_PRIVATE_USER_NAMES = {"Wayne", "Alice"}
def allow(group_name: str, user_name: str, is_group: bool) -> bool:
if is_group:
return group_name in ALLOWED_GROUP_NAMES
return user_name in ALLOWED_PRIVATE_USER_NAMES
@listener.listen()
async def guarded_router(ctx: MessageContext):
group_name, user_name = listener.bot.get_chat_and_user_name(ctx.chat_id, ctx.user_id)
if not allow(group_name, user_name, ctx.is_group):
return
listener.bot.reply_message(
ctx.message_id,
"text",
{"text": f"允许访问:{user_name}"}
)
推荐组合方式
如果你只想做“收到消息就按类型回复”,优先用:
listen(message_type="text" | "image" | "file" | "audio" | "post" | "interactive")这里的
post/interactive是飞书原始消息类型值,分别表示 rich_text / card因为你通常还要拿
message_id做reply_message和add_reaction
如果你更在意附件自动下载和自动回传,优先用:
image_handlerfile_handleraudio_handlermedia_handler
如果你要的是事件驱动自动化,优先用:
member_changed_handlerreaction_handlermessage_read_handlerbot_added_handlerchat_updated_handler
注意事项
listen(message_type=...)注册多个 handler 时,某条消息会依次经过多个处理器;不匹配的 handler 会直接跳过。自动下载类 decorator 会创建临时文件,处理结束后自动清理;如果你把返回路径指向了别的文件,也会尝试清理那个返回文件。
如果你要“引用回复”某条消息,请始终保留
message_id,不要只拿chat_id。interactive消息监听,处理的是“收到一条 card 卡片消息”;卡片按钮点击不是消息,要走card_action_handler。私聊 / 群聊名称过滤便于配置,但不如按 ID 稳定;同名群、同名用户场景下建议自行增加二次校验。