-
Notifications
You must be signed in to change notification settings - Fork 66
/
Copy pathautomator.py
executable file
·258 lines (237 loc) · 9.55 KB
/
automator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
from cv import UIMatcher
from util import *
import uiautomator2 as u2
import random
import logging
class Automator:
def __init__(self, device: str, upgrade_list: list, harvest_filter:list, auto_task = False, auto_policy = True, auto_goods = False, speedup = True, auto_red_bag=["small", "middle", "photo"]):
"""
device: 如果是 USB 连接,则为 adb devices 的返回结果;如果是模拟器,则为模拟器的控制 URL 。
"""
self.d = u2.connect(device)
self.upgrade_list = upgrade_list
self.harvest_filter = harvest_filter
self.dWidth, self.dHeight = self.d.window_size()
logging.info(f'device screen size {self.dWidth}, {self.dHeight}')
self.appRunning = False
self.auto_task = auto_task
self.auto_policy = auto_policy
self.auto_goods = auto_goods
self.loot_speedup = speedup
self.auto_red_bag = auto_red_bag
def start(self):
"""
启动脚本,请确保已进入游戏页面。
"""
while True:
# 判断jgm进程是否在前台, 最多等待20秒,否则唤醒到前台
if self.d.app_wait("com.tencent.jgm", front=True,timeout=20):
if not self.appRunning:
# 从后台换到前台,留一点反应时间
logging.info("App is front. JGM agent start in 5 seconds")
time.sleep(5)
self.appRunning = True
else:
self.d.app_start("com.tencent.jgm")
self.appRunning = False
continue
# 判断是否可升级政策
self.check_policy()
# 判断是否可完成任务
self.check_task()
# 判断是否需要扫货
self.check_goods()
# 简单粗暴的方式,处理 “XX之光” 的荣誉显示。
# 不管它出不出现,每次都点一下 确定 所在的位置
self.d.click(550/1080, 1650/1920)
self.upgrade(self.upgrade_list)
# 滑动屏幕,收割金币。
self.swipe()
self.open_red_bags()
def upgrade(self, upgrade_list):
if not len(upgrade_list):
return
self._open_upgrade_interface()
building,count = random.choice(upgrade_list)
self._upgrade_one_with_count(building,count)
self._close_upgrade_interface()
def swipe(self):
"""
滑动屏幕,收割金币。
"""
try:
# logging.info("[%s] Swiped."%time.asctime())
for i in range(3):
# 横向滑动,共 3 次。
sx, sy = BUILDING_POSITIONS[i * 3 + 1]
ex, ey = BUILDING_POSITIONS[i * 3 + 3]
self.d.swipe(sx-0.1, sy+0.05, ex, ey)
except(Exception):
# 用户在操作手机,暂停10秒
time.sleep(10)
def harvest(self,building_filter,goods:list):
'''
新的傻瓜搬货物方法,先按住截图判断绿光探测货物目的地,再搬
'''
short_wait()
for good in goods:
pos_id = self.guess_good(good)
logging.info(pos_id)
if pos_id != 0 and pos_id in building_filter:
# 搬5次
self._move_good_by_id(good, BUILDING_POSITIONS[pos_id], times=4)
short_wait()
def guess_good(self, good_id):
'''
按住货物,探测绿光出现的位置
这一段应该用numpy来实现,奈何我对numpy不熟。。。
'''
diff_screens = self.get_screenshot_while_touching(GOODS_POSITIONS[good_id])
return UIMatcher.findGreenLight(diff_screens)
def get_screenshot_while_touching(self, location, pressed_time=0.2):
'''
Get screenshot with screen touched.
'''
screen_before = self.d.screenshot(format="opencv")
h,w = len(screen_before),len(screen_before[0])
x,y = (location[0] * w,location[1] *h)
# 按下
self.d.touch.down(x,y)
# logging.info('[%s]Tapped'%time.asctime())
time.sleep(pressed_time)
# 截图
screen = self.d.screenshot(format="opencv")
# logging.info('[%s]Screenning'%time.asctime())
# 松开
self.d.touch.up(x,y)
# 返回按下前后两幅图
return screen_before, screen
def check_policy(self):
if not self.auto_policy:
return
# 看看政策中心那里有没有冒绿色箭头气泡
if len(UIMatcher.findGreenArrow(self.d.screenshot(format="opencv"))):
# 打开政策中心
self.d.click(0.206, 0.097)
mid_wait()
# 确认升级
self.d.click(0.077, 0.122)
# 拉到顶
self._slide_to_top()
# 开始找绿色箭头,找不到就往下滑,最多划5次
for i in range(5):
screen = self.d.screenshot(format="opencv")
arrows = UIMatcher.findGreenArrow(screen)
if len(arrows):
x,y = arrows[0]
self.d.click(x,y) # 点击这个政策
short_wait()
self.d.click(0.511, 0.614) # 确认升级
logging.info("[%s] Policy upgraded. ++++++"%time.asctime())
break
# 如果还没出现绿色箭头,往下划
else:
self.d.swipe(0.5, 0.8, 0.5, 0.3, duration = 0.3)
self.d.click(0.5, 0.5) # 点击一下屏幕正中间,使其停止滑动
self._back_to_main()
def check_task(self):
if not self.auto_task:
return
# 看看任务中心有没有冒黄色气泡
screen = self.d.screenshot(format="opencv")
if UIMatcher.findTaskBubble(screen):
self.d.click(0.16, 0.84) # 打开城市任务
short_wait()
self.d.click(0.51, 0.819) # 点击 完成任务
logging.info("[%s] Task finished. ++++++"%time.asctime())
self._back_to_main()
def check_goods(self):
if not self.auto_goods:
return
# 判断货物那个叉叉是否出现
good_id = self._has_good()
if len(good_id) > 0:
logging.info("[%s] Train come."%time.asctime())
self.harvest(self.harvest_filter, good_id)
else:
# logging.info("[%s] No goods! Pls wait 2s."%time.asctime())
self.swipe()
time.sleep(2)
return
# 再看看是不是有货没收,如果有就重启app
good_id = self._has_good()
if len(good_id) > 0 and self.loot_speedup:
self.d.app_stop("com.tencent.jgm")
logging.info("[%s] Reset app."%time.asctime())
time.sleep(2)
# 重新启动app
self.d.app_start("com.tencent.jgm")
time.sleep(15)
def open_red_bags(self):
if not self.auto_red_bag:
return
self.d.click(0.5, 0.95)
mid_wait()
screen = self.d.screenshot(format="opencv")
points = UIMatcher.findRedBagOpen(screen)
for point in points:
for key, value in REDBAG_PHOTO_POSTION.items():
if self._point_neer(point, value) and key in self.auto_red_bag:
logging.info(f"{key} open")
# continue
self.d.click(point[0], point[1])
for _ in range(10):
mid_wait()
self.d.click(0.5, 0.55)
short_wait()
self.d.click(0.5, 0.55)
mid_wait()
self.d.click(0.1, 0.95)
mid_wait()
def _point_neer(self, point1, point2):
if (point2[0]*self.dWidth) - 80 < point1[0] < (point2[0]*self.dWidth) + 80:
if (point2[1]*self.dHeight) - 80 < point1[1] < (point2[1]*self.dHeight) + 80:
return True
return False
def _open_upgrade_interface(self):
screen = self.d.screenshot(format="opencv")
# 判断升级按钮的颜色,蓝比红多就处于正常界面,反之在升级界面
R, G, B = UIMatcher.getPixel(screen,0.974,0.615)
if B > R:
self.d.click(0.9, 0.57)
def _close_upgrade_interface(self):
screen = self.d.screenshot(format="opencv")
# 判断升级按钮的颜色,蓝比红多就处于正常界面,反之在升级界面
R, G, B = UIMatcher.getPixel(screen,0.974,0.615)
if B < R:
self.d.click(0.9, 0.57)
def _upgrade_one_with_count(self,id,count):
sx, sy=BUILDING_POSITIONS[id]
self.d.click(sx, sy)
time.sleep(0.3)
for i in range(count):
self.d.click(0.798, 0.884)
# time.sleep(0.1)
def _move_good_by_id(self, good: int, source, times=1):
try:
sx, sy = GOODS_POSITIONS[good]
ex, ey = source
for i in range(times):
self.d.drag(sx, sy, ex, ey, duration = 0.1)
short_wait()
except(Exception):
pass
def _has_good(self):
'''
返回有货的位置列表
'''
screen = self.d.screenshot(format="opencv")
return UIMatcher.detectCross(screen)
def _slide_to_top(self):
for i in range(3):
self.d.swipe(0.488, 0.302,0.482, 0.822)
short_wait()
def _back_to_main(self):
for i in range(3):
self.d.click(0.057, 0.919)
short_wait()