forked from HipsterBrown/physical-world-monitoring
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
68 lines (47 loc) · 1.59 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import asyncio
import json
from typing import Any
import functions_framework
from flask import Request
from flask.typing import ResponseReturnValue
from pydantic_settings import BaseSettings, SettingsConfigDict
from viam.robot.client import RobotClient
from viam.components.board import Board
class Settings(BaseSettings):
viam_api_key: str = ""
viam_api_key_id: str = ""
machine_address: str = ""
board_name: str = ""
pin_name: str = ""
model_config = SettingsConfigDict(env_file=".env", extra="ignore")
async def connect(settings: Settings):
opts = RobotClient.Options.with_api_key(
api_key=settings.viam_api_key,
api_key_id=settings.viam_api_key_id,
)
return await RobotClient.at_address(settings.machine_address, opts)
async def blink_led(settings: Settings, data: dict[str, Any]):
machine = await connect(settings)
pi = Board.from_robot(machine, settings.board_name)
pin = await pi.gpio_pin_by_name(settings.pin_name)
count = 0
while count < 10:
await pin.set(True)
await asyncio.sleep(0.5)
await pin.set(False)
await asyncio.sleep(0.5)
count += 1
await machine.close()
@functions_framework.http
def alert_movement(request: Request) -> ResponseReturnValue:
if request.method != "POST":
return "Ok"
payload = request.form
if payload is None:
return "Ok"
settings = Settings()
print("Got the following data:")
data: dict[str, Any] = json.loads(list(payload.keys())[0])
print(data)
asyncio.run(blink_led(settings, data))
return "Ok"