-
-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathprotocol.py
97 lines (79 loc) · 3.68 KB
/
protocol.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
from __future__ import annotations
from dataclasses import dataclass, field
from yarl import URL
from avilla.core.application import Avilla
from avilla.core.protocol import BaseProtocol, ProtocolConfig
from graia.ryanvk import merge, ref
from .connection.ws_client import ElizabethWsClientNetworking
from .service import ElizabethService
@dataclass
class ElizabethConfig(ProtocolConfig):
qq: int
host: str
port: int
access_token: str
base_url: URL = field(init=False)
def __post_init__(self):
self.base_url = URL.build(scheme="http", host=self.host, port=self.port)
def _import_performs(): # noqa: F401
from avilla.elizabeth.perform import context, resource_fetch # noqa: F401
from avilla.elizabeth.perform.action import contact # noqa: F401
from avilla.elizabeth.perform.action import (
activity,
announcement,
file,
friend,
group,
member,
message,
request,
)
from avilla.elizabeth.perform.event import activity # noqa: F401, F811
from avilla.elizabeth.perform.event import friend # noqa: F811
from avilla.elizabeth.perform.event import group # noqa: F811
from avilla.elizabeth.perform.event import member # noqa: F401, F811
from avilla.elizabeth.perform.event import message # noqa: F401, F811
from avilla.elizabeth.perform.event import relationship # noqa: F401
from avilla.elizabeth.perform.event import request # noqa: F401, F811
from avilla.elizabeth.perform.message import deserialize, serialize # noqa
from avilla.elizabeth.perform.query import announcement, bot, file, friend, group # noqa
_import_performs()
class ElizabethProtocol(BaseProtocol):
service: ElizabethService
artifacts = {
**merge(
ref("avilla.protocol/elizabeth::action", "activity"),
ref("avilla.protocol/elizabeth::action", "announcement"),
ref("avilla.protocol/elizabeth::action", "contact"),
ref("avilla.protocol/elizabeth::action", "file"),
ref("avilla.protocol/elizabeth::action", "friend"),
ref("avilla.protocol/elizabeth::action", "group"),
ref("avilla.protocol/elizabeth::action", "member"),
ref("avilla.protocol/elizabeth::action", "message"),
ref("avilla.protocol/elizabeth::action", "request"),
ref("avilla.protocol/elizabeth::event", "activity"),
ref("avilla.protocol/elizabeth::event", "friend"),
ref("avilla.protocol/elizabeth::event", "group"),
ref("avilla.protocol/elizabeth::event", "member"),
ref("avilla.protocol/elizabeth::event", "message"),
ref("avilla.protocol/elizabeth::event", "relationship"),
ref("avilla.protocol/elizabeth::event", "request"),
ref("avilla.protocol/elizabeth::message", "deserialize"),
ref("avilla.protocol/elizabeth::message", "serialize"),
ref("avilla.protocol/elizabeth::query", "announcement"),
ref("avilla.protocol/elizabeth::query", "bot"),
ref("avilla.protocol/elizabeth::query", "file"),
ref("avilla.protocol/elizabeth::query", "friend"),
ref("avilla.protocol/elizabeth::query", "group"),
ref("avilla.protocol/elizabeth::resource_fetch"),
ref("avilla.protocol/elizabeth::context"),
)
}
def __init__(self):
self.service = ElizabethService(self)
def ensure(self, avilla: Avilla):
self.avilla = avilla
avilla.launch_manager.add_component(self.service)
def configure(self, config: ElizabethConfig):
self.service.connections.append(ElizabethWsClientNetworking(self, config))
return self