跨语言通信 (cross_comm)
本模块提供了一个基于WebSocket的跨语言、多设备通信解决方案。通过CrossCommService类,用户可以轻松构建支持多种消息类型的实时通信应用,包括文本、JSON、字典、字节数组、图片、文件和文件夹传输。
核心特性
多角色支持:支持服务器和客户端角色
实时通信:基于WebSocket协议的双向实时通信
多种消息类型:支持文本、JSON、字典、字节数组、图片、文件、文件夹
客户端管理:唯一ID生成、在线状态管理、心跳检测
文件传输:集成阿里云OSS进行大文件传输
消息监听:装饰器方式注册消息处理器,支持类型和来源过滤
状态持久化:YAML文件记录客户端状态
文件下载控制:支持指定下载目录,避免不必要的流量消耗
CommMsgType 枚举类详细说明
- class CommMsgType(Enum)
消息类型枚举,定义了所有支持的消息类型。
枚举值:
TEXT: “text” - 文本消息
JSON: “json” - JSON字符串消息
DICT: “dict” - Python字典消息
BYTES: “bytes” - 字节数组消息
IMAGE: “image” - 图片文件消息
FILE: “file” - 普通文件消息
FOLDER: “folder” - 文件夹消息
HEARTBEAT: “heartbeat” - 心跳消息(内部使用)
LOGIN: “login” - 登录消息(内部使用)
LOGOUT: “logout” - 登出消息(内部使用)
LIST_CLIENTS: “list_clients” - 客户端列表请求(内部使用)
LIST_CLIENTS_RESPONSE: “list_clients_response” - 客户端列表响应(内部使用)
LOGIN_RESPONSE: “login_response” - 登录响应(内部使用)
CrossCommService 类详细说明
- class CrossCommService(role: str, ip: str = '0.0.0.0', port: int = 9898, client_id: str | None = None, heartbeat_interval: int = 30, heartbeat_timeout: int = 60)
跨语言通信服务的主要类,支持服务器和客户端两种角色。
初始化参数:
role: 服务角色,’server’ 或 ‘client’
ip: IP地址,服务器监听地址或客户端连接地址
port: 端口号,默认9898
client_id: 客户端唯一ID,如不指定则自动生成(基于MAC地址和UUID)
heartbeat_interval: 心跳间隔(秒),默认30秒
heartbeat_timeout: 心跳超时(秒),默认60秒
主要方法:
- start_server() None
启动服务器,开始监听客户端连接。仅在role为’server’时可用。
该方法会阻塞执行,直到服务器停止。
- login() bool
客户端登录到服务器。仅在role为’client’时可用。
返回:
布尔值,表示登录是否成功
- logout() None
客户端登出并断开连接。仅在role为’client’时可用。
- send_message(content: Any, msg_type: CommMsgType, to_client_id: str = 'all') bool
发送消息到指定客户端或广播到所有客户端。
参数:
content: 消息内容,支持多种类型
msg_type: 消息类型,必须指定CommMsgType枚举值
to_client_id: 接收方客户端ID,’all’表示广播
返回:
布尔值,表示发送是否成功
注意:
文件类型消息(FILE、IMAGE、FOLDER)会自动上传到OSS
字节类型消息会自动进行base64编码
JSON类型消息会验证JSON格式的有效性
- download_file_manually(oss_key: str, save_directory: str) bool
手动下载文件或文件夹。
参数:
oss_key: OSS中的文件键值
save_directory: 保存目录(OSS管理器会在此目录下重建路径结构)
返回:
布尔值,表示下载是否成功
注意:
OSS管理器会在指定目录下重建完整的oss_key路径结构
例如:oss_key为”cross_comm/client1/file.txt”,save_directory为”./downloads”,则文件会保存为”./downloads/cross_comm/client1/file.txt”
- message_listener(msg_type: CommMsgType | None = None, from_client_id: str | None = None, download_directory: str | None = None)
装饰器方法,用于注册消息监听器。支持按消息类型和发送方过滤,可指定文件下载目录。
参数:
msg_type: 要监听的消息类型,None表示监听所有类型
from_client_id: 要监听的发送方ID,None表示监听所有发送方
download_directory: 文件下载目录,仅对文件类型消息有效(FILE、IMAGE、FOLDER)
特性:
自动过滤自己发送的消息
指定下载目录的处理器会自动下载文件
未指定下载目录的处理器不会自动下载,节省流量
可以为不同类型的文件设置不同的下载目录
示例:
@service.message_listener(msg_type=CommMsgType.TEXT) async def handle_text(message: Message): print(f"收到文本消息: {message.content}") @service.message_listener(msg_type=CommMsgType.FILE, download_directory="./downloads") async def handle_file(message: Message): # message.content 包含下载后的本地文件路径 print(f"文件已下载到: {message.content}")
- get_online_clients() List[str]
获取当前在线的客户端ID列表(仅服务器角色可用)。
返回:
在线客户端ID列表
- list_clients(only_show_online: bool = True, timeout: float = 5.0) dict | None
客户端请求服务器上的客户端列表(仅客户端角色可用)。
参数:
only_show_online: 是否只显示在线客户端,默认True
timeout: 等待响应的超时时间(秒),默认5.0
返回:
包含客户端列表的字典,格式为:
{ 'clients': [ { 'client_id': str, 'status': 'online'/'offline', 'last_heartbeat': float, 'login_time': float }, ... ], 'total_count': int, 'only_show_online': bool }
Message 类详细说明
支持的消息类型
用户可直接使用的消息类型:
CommMsgType.TEXT: 纯文本消息
CommMsgType.JSON: JSON字符串消息
CommMsgType.DICT: Python字典消息
CommMsgType.BYTES: 字节数组消息
CommMsgType.IMAGE: 图片文件消息
CommMsgType.FILE: 普通文件消息
CommMsgType.FOLDER: 文件夹消息
系统内部使用的消息类型(用户无需关注):
CommMsgType.HEARTBEAT: 心跳消息
CommMsgType.LOGIN: 登录消息
CommMsgType.LOGOUT: 登出消息
CommMsgType.LIST_CLIENTS: 客户端列表请求
CommMsgType.LIST_CLIENTS_RESPONSE: 客户端列表响应
CommMsgType.LOGIN_RESPONSE: 登录响应
环境配置
在使用前需要配置阿里云OSS环境变量(用于文件传输):
# .env 文件
OSS_ENDPOINT=xxx
OSS_BUCKET_NAME=xxx
OSS_ACCESS_KEY_ID=xxx
OSS_ACCESS_KEY_SECRET=xxx
使用示例
以下示例展示了如何使用跨语言通信模块:
服务器端示例:
import asyncio
from pywayne.cross_comm import CrossCommService, Message, CommMsgType
async def run_server():
# 创建服务器
server = CrossCommService(
role='server',
ip='0.0.0.0',
port=9898,
heartbeat_interval=30,
heartbeat_timeout=60
)
# 注册消息监听器
@server.message_listener(msg_type=CommMsgType.TEXT)
async def handle_text(message: Message):
print(f"收到来自 {message.from_client_id} 的文本: {message.content}")
@server.message_listener(msg_type=CommMsgType.IMAGE, download_directory="./server_images")
async def handle_image(message: Message):
# message.content 包含下载后的本地文件路径
print(f"收到来自 {message.from_client_id} 的图片: {message.content}")
# 启动服务器
await server.start_server()
# 运行服务器
asyncio.run(run_server())
客户端示例:
import asyncio
from pywayne.cross_comm import CrossCommService, Message, CommMsgType
async def run_client():
# 创建客户端
client = CrossCommService(
role='client',
ip='localhost',
port=9898,
client_id='my_client'
)
# 注册消息监听器
@client.message_listener(msg_type=CommMsgType.TEXT)
async def handle_text(message: Message):
print(f"收到文本消息: {message.content}")
@client.message_listener(msg_type=CommMsgType.FILE, download_directory="./downloads/files")
async def handle_file(message: Message):
# message.content 包含下载后的本地文件路径
print(f"文件已下载到: {message.content}")
# 可以直接使用本地文件
try:
with open(message.content, 'r', encoding='utf-8') as f:
content = f.read()[:100] # 只读取前100个字符
print(f"文件内容预览: {content}...")
except Exception as e:
print(f"读取文件失败: {e}")
# 不设置下载目录的处理器 - 不会自动下载文件
@client.message_listener(msg_type=CommMsgType.FILE, from_client_id="special_client")
async def handle_special_file(message: Message):
print(f"收到特殊客户端的文件消息: {message.content}")
print(f"OSS Key: {message.oss_key}")
# 可以选择手动下载到指定目录
success = client.download_file_manually(message.oss_key, "./manual_downloads/")
if success:
print("文件下载成功")
# 登录到服务器
if await client.login():
print("登录成功!")
# 发送各种类型的消息
# 1. 发送文本消息
await client.send_message("Hello Server!", CommMsgType.TEXT)
# 2. 发送JSON消息
await client.send_message('{"type": "notification", "data": "test"}', CommMsgType.JSON)
# 3. 发送字典消息
await client.send_message({
"type": "data",
"value": 123,
"status": "ok"
}, CommMsgType.DICT)
# 4. 发送字节数据
await client.send_message(b"Binary data", CommMsgType.BYTES)
# 5. 发送图片文件(会自动上传到OSS)
# await client.send_message("/path/to/image.jpg", CommMsgType.IMAGE)
# 6. 发送文件(会自动上传到OSS)
# await client.send_message("/path/to/document.pdf", CommMsgType.FILE)
# 7. 发送文件夹(会自动上传到OSS)
# await client.send_message("/path/to/folder", CommMsgType.FOLDER)
# 8. 发送给指定客户端
# await client.send_message("Private message", CommMsgType.TEXT, to_client_id='target_client_id')
# 获取客户端列表
all_clients = await client.list_clients(only_show_online=False)
if all_clients:
print(f"总客户端数量: {all_clients['total_count']}")
for client_info in all_clients['clients']:
print(f" - {client_info['client_id']}: {client_info['status']}")
# 只获取在线客户端
online_clients = await client.list_clients(only_show_online=True)
if online_clients:
print(f"在线客户端数量: {online_clients['total_count']}")
# 保持连接
await asyncio.sleep(60)
# 登出
await client.logout()
# 运行客户端
asyncio.run(run_client())
多客户端通信示例:
# 客户端A
clientA = CrossCommService(role='client', client_id='clientA')
await clientA.login()
# 客户端B
clientB = CrossCommService(role='client', client_id='clientB')
await clientB.login()
# A向B发送消息
await clientA.send_message("Hello B!", CommMsgType.TEXT, to_client_id='clientB')
# B向A发送文件
await clientB.send_message("/path/to/file.pdf", CommMsgType.FILE, to_client_id='clientA')
文件下载控制示例:
# 为不同类型的文件设置不同的下载目录
@client.message_listener(msg_type=CommMsgType.IMAGE, download_directory="./downloads/images")
async def handle_image(message: Message):
print(f"图片已下载到: {message.content}")
@client.message_listener(msg_type=CommMsgType.FILE, download_directory="./downloads/documents")
async def handle_document(message: Message):
print(f"文档已下载到: {message.content}")
# 不设置下载目录 - 节省流量,只获取消息信息
@client.message_listener(msg_type=CommMsgType.FILE, from_client_id="low_priority_client")
async def handle_low_priority_file(message: Message):
print(f"收到低优先级文件消息,不自动下载: {message.oss_key}")
# 根据需要手动下载
if should_download(message):
success = client.download_file_manually(message.oss_key, "./manual_downloads/")
if success:
print("手动下载成功")
命令行使用
模块支持直接从命令行运行:
# 运行服务器示例
python -m pywayne.cross_comm server
# 运行客户端示例
python -m pywayne.cross_comm client
心跳机制
系统内置心跳机制确保连接稳定性:
客户端定期向服务器发送心跳包
服务器检测客户端心跳超时自动标记为离线
支持自定义心跳间隔和超时时间
自动重连机制(客户端)
状态管理
客户端状态通过YAML文件持久化存储:
client_001:
status: online
last_seen: 1640995200.123
client_002:
status: offline
last_seen: 1640995100.456
文件传输和下载控制
文件上传:
文件类型消息(FILE、IMAGE、FOLDER)会自动上传到阿里云OSS
生成唯一的OSS键值避免冲突
支持大文件和文件夹传输
文件下载控制:
通过message_listener装饰器的download_directory参数控制下载行为
指定下载目录:自动下载文件到指定目录,OSS管理器会在目录下重建完整路径结构
不指定下载目录:不自动下载,节省流量和存储空间
支持手动下载:使用download_file_manually方法
可为不同消息类型设置不同下载目录
支持按发送方过滤下载策略
文件路径结构说明:
OSS中文件的完整路径(oss_key)会在本地目录中重建
例如:oss_key为”cross_comm/sender_id/1234567890_abc123.pdf”
指定下载目录为”./downloads/documents”
最终文件路径为”./downloads/documents/cross_comm/sender_id/1234567890_abc123.pdf”
这样保持了OSS中的目录结构,避免文件名冲突
错误处理
模块提供完善的错误处理机制:
连接异常自动重试
消息发送失败通知
文件上传/下载错误处理
详细的日志记录
消息过滤避免处理自己发送的消息
注意事项
文件传输:大文件通过OSS传输,需要正确配置OSS环境变量
消息类型:必须使用CommMsgType枚举,不再支持字符串类型
文件下载:合理设置下载目录,避免不必要的流量消耗
网络安全:生产环境建议使用TLS/SSL加密
性能考虑:大量并发连接时考虑调整心跳参数
资源清理:程序退出时确保调用logout()方法
唯一ID:客户端ID基于MAC地址和UUID生成,确保全局唯一
API变更说明
v2.0 主要变更:
Breaking Change: 引入CommMsgType枚举,替代字符串类型
Breaking Change: send_message方法参数顺序调整,msg_type变为必需参数
新增: download_file_manually方法支持手动文件下载
增强: message_listener装饰器增加download_directory参数
增强: 消息过滤机制,自动过滤自己发送的消息
优化: 文件下载控制,避免不必要的流量消耗
扩展建议
未来可考虑的扩展功能:
消息加密和数字签名
消息持久化和离线消息
群组和频道功能
消息路由和负载均衡
更多消息类型支持(音频、视频等)
Web界面监控和管理
消息统计和分析功能
分布式部署支持