This repository was archived by the owner on May 18, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun.py
114 lines (91 loc) · 3.44 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# -*- coding: utf-8 -*-
import asyncio
import json
import logging
import websockets
from aiocqhttp import CQHttp
from aiocqhttp.message import Message
from typing import Dict, Any, Optional
from config import BotConfig
from converter import cq_2_xhup, xhup_2_cq
logging.basicConfig(level=BotConfig.LOG_LEVEL)

# The bare string below describes this module: it is the entry point of
# xhup-qq-bot and can be started for testing with `python run.py`.
"""
xhup-qq-bot 的入口
测试启动:python run.py
"""

logger = logging.getLogger("xhup-qq-bot")

# CoolQ HTTP API client; HTTP POST event delivery is switched off.
bot = CQHttp(
    access_token=BotConfig.ACCESS_TOKEN,
    message_class=Message,
    enable_http_post=False,
)

# Websocket connection to the xhup-club-api backend.
# Stays None until get_xhup_club_ws() has connected.
xhup_club_ws = None
async def get_xhup_club_ws():
    """Connect to the xhup_club_api backend, retrying until it succeeds.

    Opens a websocket to ``BotConfig.XHUP_CLUB_API`` with a
    ``Authorization: Token ...`` header, stores the connection in the
    module-level ``xhup_club_ws`` and returns it.

    A failed attempt is logged and retried after three seconds. Besides
    socket-level failures (``OSError``), a failed opening handshake
    (``websockets.InvalidHandshake``, for instance a non-101 response from
    the server) and a connect timeout are retried as well. Without that,
    such errors would escape this loop and terminate whichever task was
    reconnecting.

    Returns:
        The connected websocket client.
    """
    global xhup_club_ws

    retryable_errors = (
        OSError,
        websockets.InvalidHandshake,
        asyncio.TimeoutError,
    )
    auth_headers = {"Authorization": f"Token {BotConfig.XHUP_CLUB_TOKEN}"}

    while True:  # keep retrying for as long as connecting fails
        try:
            xhup_club_ws = await websockets.connect(
                BotConfig.XHUP_CLUB_API,
                extra_headers=auth_headers,
            )
        except retryable_errors as e:
            logger.warning(e)
            logger.warning("后端连接失败,三秒后重连")
            await asyncio.sleep(3)
        else:
            logger.info("后端连接成功")
            return xhup_club_ws
async def handle_xhup_club_event():
    """Forward messages received from xhup to CoolQ, forever.

    Each iteration reads one message from the backend websocket, decodes it
    as JSON, converts it with ``xhup_2_cq`` and sends it through
    ``bot.send_msg``. Falsy payloads are skipped. When the connection is
    missing or closed it is re-established via ``get_xhup_club_ws``; any
    other error is logged as a warning and the loop carries on.
    """
    global xhup_club_ws

    while True:  # this coroutine is meant to run as a never-ending task
        if not xhup_club_ws:
            logger.info("后端尚未连接,现在尝试连接。")
            xhup_club_ws = await get_xhup_club_ws()
            continue

        try:
            raw = await xhup_club_ws.recv()
            reply = json.loads(raw)
            if not reply:
                continue

            cq_reply = xhup_2_cq(reply)
            logger.info(f"机器人回复消息:{cq_reply}")
            await bot.send_msg(**cq_reply)
        except websockets.ConnectionClosed as closed:
            logger.warning(closed)
            logger.warning("后端连接断开,三秒后自动重连")
            await asyncio.sleep(3)
            xhup_club_ws = await get_xhup_club_ws()
        except Exception as err:
            # Best effort: one bad message must not stop the forwarding loop.
            logger.warning(err)
@bot.on_message()
async def handle_msg(context):
    """Forward a message received from CoolQ to the xhup backend.

    The message is converted with ``cq_2_xhup``, serialized as JSON and sent
    over the backend websocket. If the backend is not connected yet, it is
    connected first and the message is then sent, rather than being dropped.
    If the send fails because the connection is closed, the handler waits
    three seconds, reconnects and retries the send once. A second failure is
    logged and the message is given up on.

    Args:
        context: The event payload passed in by ``bot.on_message``.
    """
    global xhup_club_ws

    logger.info(f"收到消息:{context}")

    if not xhup_club_ws:
        logger.info("后端尚未连接,现在尝试连接。")
        xhup_club_ws = await get_xhup_club_ws()

    payload = json.dumps(cq_2_xhup(context))

    # Remember which connection the first attempt used, so a replacement
    # made by another task during the back-off sleep can be detected.
    ws = xhup_club_ws
    try:
        await ws.send(payload)
        return
    except websockets.ConnectionClosed as e:
        logger.warning(e)
        logger.warning("后端连接断开,三秒后自动重连")
        await asyncio.sleep(3)

    # Best effort against duplicate connections: handle_xhup_club_event also
    # reconnects on close, so only reconnect here if the global still points
    # at the connection that just failed.
    if xhup_club_ws is ws:
        xhup_club_ws = await get_xhup_club_ws()

    try:
        await xhup_club_ws.send(payload)
    except websockets.ConnectionClosed as e:
        logger.warning(e)
        logger.warning("重连后发送仍然失败,消息已丢弃")
# @bot.on_notice('group_increase')
# async def handle_group_increase(context):
# await bot.send(context, message='欢迎新人~',
# at_sender=True, auto_escape=True)
if __name__ == '__main__':
    # Obtain the event loop from asyncio.
    loop = asyncio.get_event_loop()

    # First block until the xhup-club-api backend is connected.
    backend_connected = get_xhup_club_ws()
    loop.run_until_complete(backend_connected)

    # Schedule the background task that forwards xhup messages to CoolQ.
    forwarder = handle_xhup_club_event()
    loop.create_task(forwarder)

    # Start aiocqhttp.
    bot.run(host=BotConfig.HOST, port=BotConfig.PORT)