Skip to content

Commit d920a12

Browse files
committed
Feature: Could not creating Superfluid flows from sdk
Solution: Install and import superfluid.py from PyPI. Add helper methods on EthAccount
1 parent 04622be commit d920a12

File tree

5 files changed

+345
-3
lines changed

5 files changed

+345
-3
lines changed

pyproject.toml

+4-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ dependencies = [
3131
"jwcrypto==1.5.6",
3232
"python-magic",
3333
"typing_extensions",
34-
"aioresponses>=0.7.6"
34+
"aioresponses>=0.7.6",
35+
"superfluid~=0.2.1",
36+
"eth_typing==4.3.1",
37+
3538
]
3639

3740
[project.optional-dependencies]

src/aleph/sdk/chains/ethereum.py

+90-2
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,76 @@
1+
from decimal import Decimal
12
from pathlib import Path
2-
from typing import Optional, Union
3+
from typing import Awaitable, Dict, Optional, Set, Union
34

5+
from aleph_message.models import Chain
46
from eth_account import Account
57
from eth_account.messages import encode_defunct
68
from eth_account.signers.local import LocalAccount
79
from eth_keys.exceptions import BadSignature as EthBadSignatureError
10+
from superfluid import Web3FlowInfo
811

12+
from ..conf import settings
13+
from ..connectors.superfluid import Superfluid
914
from ..exceptions import BadSignatureError
1015
from ..utils import bytes_from_hex
1116
from .common import BaseAccount, get_fallback_private_key, get_public_key
1217

18+
CHAINS_WITH_SUPERTOKEN: Set[Chain] = {Chain.AVAX}
19+
CHAIN_IDS: Dict[Chain, int] = {
20+
Chain.AVAX: settings.AVAX_CHAIN_ID,
21+
}
22+
23+
24+
def get_rpc_for_chain(chain: Chain):
25+
"""Returns the RPC to use for a given Ethereum based blockchain"""
26+
if not chain:
27+
return None
28+
29+
if chain == Chain.AVAX:
30+
return settings.AVAX_RPC
31+
else:
32+
raise ValueError(f"Unknown RPC for chain {chain}")
33+
34+
35+
def get_chain_id_for_chain(chain: Chain):
36+
"""Returns the chain ID of a given Ethereum based blockchain"""
37+
if not chain:
38+
return None
39+
40+
if chain in CHAIN_IDS:
41+
return CHAIN_IDS[chain]
42+
else:
43+
raise ValueError(f"Unknown RPC for chain {chain}")
44+
1345

1446
class ETHAccount(BaseAccount):
1547
CHAIN = "ETH"
1648
CURVE = "secp256k1"
1749
_account: LocalAccount
50+
chain: Optional[Chain]
51+
superfluid_connector: Optional[Superfluid]
1852

19-
def __init__(self, private_key: bytes):
53+
def __init__(
54+
self,
55+
private_key: bytes,
56+
chain: Optional[Chain] = None,
57+
rpc: Optional[str] = None,
58+
chain_id: Optional[int] = None,
59+
):
2060
self.private_key = private_key
2161
self._account = Account.from_key(self.private_key)
62+
self.chain = chain
63+
rpc = rpc or get_rpc_for_chain(chain)
64+
chain_id = chain_id or get_chain_id_for_chain(chain)
65+
self.superfluid_connector = (
66+
Superfluid(
67+
rpc=rpc,
68+
chain_id=chain_id,
69+
account=self._account,
70+
)
71+
if chain in CHAINS_WITH_SUPERTOKEN
72+
else None
73+
)
2274

2375
async def sign_raw(self, buffer: bytes) -> bytes:
2476
"""Sign a raw buffer."""
@@ -37,6 +89,42 @@ def from_mnemonic(mnemonic: str) -> "ETHAccount":
3789
Account.enable_unaudited_hdwallet_features()
3890
return ETHAccount(private_key=Account.from_mnemonic(mnemonic=mnemonic).key)
3991

92+
def create_flow(self, receiver: str, flow: Decimal) -> Awaitable[str]:
93+
if not self.superfluid_connector:
94+
raise ValueError("Superfluid connector is required to create a flow")
95+
return self.superfluid_connector.create_flow(
96+
sender=self.get_address(), receiver=receiver, flow=flow
97+
)
98+
99+
def get_flow(self, receiver: str) -> Awaitable[Web3FlowInfo]:
100+
if not self.superfluid_connector:
101+
raise ValueError("Superfluid connector is required to get a flow")
102+
return self.superfluid_connector.get_flow(
103+
sender=self.get_address(), receiver=receiver
104+
)
105+
106+
def update_flow(self, receiver: str, flow: Decimal) -> Awaitable[str]:
107+
if not self.superfluid_connector:
108+
raise ValueError("Superfluid connector is required to update a flow")
109+
return self.superfluid_connector.update_flow(
110+
sender=self.get_address(), receiver=receiver, flow=flow
111+
)
112+
113+
def delete_flow(self, receiver: str) -> Awaitable[str]:
114+
if not self.superfluid_connector:
115+
raise ValueError("Superfluid connector is required to delete a flow")
116+
return self.superfluid_connector.delete_flow(
117+
sender=self.get_address(), receiver=receiver
118+
)
119+
120+
def setup_superfluid_connector(self, rpc: str, chain_id: int):
121+
"""set up the Superfluid connector."""
122+
self.superfluid_connector = Superfluid(
123+
rpc=rpc,
124+
chain_id=chain_id,
125+
account=self._account,
126+
)
127+
40128

41129
def get_fallback_account(path: Optional[Path] = None) -> ETHAccount:
42130
return ETHAccount(private_key=get_fallback_private_key(path=path))

src/aleph/sdk/conf.py

+4
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ class Settings(BaseSettings):
3838

3939
CODE_USES_SQUASHFS: bool = which("mksquashfs") is not None # True if command exists
4040

41+
AVAX_RPC: str = "https://api.avax.network/ext/bc/C/rpc"
42+
AVAX_CHAIN_ID: int = 43114
43+
AVAX_ALEPH_SUPER_TOKEN = "0xc0Fbc4967259786C743361a5885ef49380473dCF" # mainnet
44+
4145
# Dns resolver
4246
DNS_IPFS_DOMAIN = "ipfs.public.aleph.sh"
4347
DNS_PROGRAM_DOMAIN = "program.public.aleph.sh"
+114
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
from decimal import Decimal
5+
from typing import TYPE_CHECKING, Optional
6+
7+
from eth_utils import to_normalized_address, to_wei
8+
from superfluid import CFA_V1, Operation, Web3FlowInfo
9+
from web3 import Web3
10+
from web3.types import TxParams
11+
12+
from aleph.sdk.conf import settings
13+
14+
if TYPE_CHECKING:
15+
from aleph.sdk.chains.ethereum import LocalAccount
16+
17+
18+
async def sign_and_send_transaction(
19+
account: LocalAccount, tx_params: TxParams, rpc: str
20+
) -> str:
21+
"""
22+
Signs and broadcasts a transaction using ETHAccount
23+
@param tx_params - Transaction parameters
24+
@param rpc - RPC URL
25+
@returns - str - The transaction hash
26+
"""
27+
web3 = Web3(Web3.HTTPProvider(rpc))
28+
29+
def sign_and_send():
30+
signed_txn = account.sign_transaction(tx_params)
31+
transaction_hash = web3.eth.send_raw_transaction(signed_txn.rawTransaction)
32+
return transaction_hash.hex()
33+
34+
# Sending a transaction is done over HTTP(S) and implemented using a blocking
35+
# API in `web3.eth`. This runs it in a non-blocking asyncio executor.
36+
loop = asyncio.get_running_loop()
37+
transaction_hash = await loop.run_in_executor(None, sign_and_send)
38+
return transaction_hash
39+
40+
41+
async def execute_operation_with_account(
42+
account: LocalAccount, operation: Operation
43+
) -> str:
44+
"""
45+
Executes the operation using ETHAccount
46+
@param operation - Operation instance from the library
47+
@returns - str - The transaction hash
48+
@returns - str - The transaction hash
49+
"""
50+
populated_transaction = operation._get_populated_transaction_request(
51+
operation.rpc, account.key
52+
)
53+
transaction_hash = await sign_and_send_transaction(
54+
account, populated_transaction, operation.rpc
55+
)
56+
return transaction_hash
57+
58+
59+
class Superfluid:
60+
account: Optional[LocalAccount]
61+
62+
def __init__(
63+
self,
64+
rpc=settings.AVAX_RPC,
65+
chain_id=settings.AVAX_CHAIN_ID,
66+
account: Optional[LocalAccount] = None,
67+
):
68+
self.cfaV1Instance = CFA_V1(rpc, chain_id)
69+
self.account = account
70+
71+
async def create_flow(self, sender: str, receiver: str, flow: Decimal) -> str:
72+
if not self.account:
73+
raise ValueError("An account is required to create a flow")
74+
return await execute_operation_with_account(
75+
account=self.account,
76+
operation=self.cfaV1Instance.create_flow(
77+
sender=to_normalized_address(sender),
78+
receiver=to_normalized_address(receiver),
79+
super_token=settings.AVAX_ALEPH_SUPER_TOKEN,
80+
flow_rate=to_wei(Decimal(flow), "ether"),
81+
),
82+
)
83+
84+
async def get_flow(self, sender: str, receiver: str) -> Web3FlowInfo:
85+
return self.cfaV1Instance.get_flow(
86+
sender=to_normalized_address(sender),
87+
receiver=to_normalized_address(receiver),
88+
super_token=settings.AVAX_ALEPH_SUPER_TOKEN,
89+
)
90+
91+
async def delete_flow(self, sender: str, receiver: str) -> str:
92+
if not self.account:
93+
raise ValueError("An account is required to delete a flow")
94+
return await execute_operation_with_account(
95+
account=self.account,
96+
operation=self.cfaV1Instance.delete_flow(
97+
sender=to_normalized_address(sender),
98+
receiver=to_normalized_address(receiver),
99+
super_token=settings.AVAX_ALEPH_SUPER_TOKEN,
100+
),
101+
)
102+
103+
async def update_flow(self, sender: str, receiver: str, flow: Decimal) -> str:
104+
if not self.account:
105+
raise ValueError("An account is required to update a flow")
106+
return await execute_operation_with_account(
107+
account=self.account,
108+
operation=self.cfaV1Instance.update_flow(
109+
sender=to_normalized_address(sender),
110+
receiver=to_normalized_address(receiver),
111+
super_token=settings.AVAX_ALEPH_SUPER_TOKEN,
112+
flow_rate=to_wei(Decimal(flow), "ether"),
113+
),
114+
)

tests/unit/test_superfluid.py

+133
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
import random
2+
from decimal import Decimal
3+
from unittest import mock
4+
from unittest.mock import AsyncMock, MagicMock, patch
5+
6+
import pytest
7+
from aleph_message.models import Chain
8+
from eth_utils import to_checksum_address
9+
from superfluid import Operation, Web3FlowInfo
10+
11+
from aleph.sdk.chains.ethereum import ETHAccount
12+
from aleph.sdk.conf import settings
13+
14+
15+
def generate_fake_eth_address():
16+
return to_checksum_address(
17+
"0x" + "".join([random.choice("0123456789abcdef") for _ in range(40)])
18+
)
19+
20+
21+
@pytest.fixture
22+
def mock_superfluid():
23+
with patch("aleph.sdk.connectors.superfluid.CFA_V1") as MockCFA_V1:
24+
yield MockCFA_V1.return_value
25+
26+
27+
@pytest.fixture
28+
def eth_account(mock_superfluid):
29+
private_key = b"\x01" * 32
30+
return ETHAccount(
31+
private_key,
32+
chain=Chain.AVAX,
33+
rpc=settings.AVAX_RPC,
34+
chain_id=settings.AVAX_CHAIN_ID,
35+
)
36+
37+
38+
@pytest.mark.asyncio
39+
async def test_initialization(eth_account):
40+
assert eth_account.superfluid_connector is not None
41+
42+
43+
@pytest.mark.asyncio
44+
async def test_create_flow(eth_account, mock_superfluid):
45+
mock_operation = AsyncMock(spec=Operation)
46+
mock_superfluid.create_flow.return_value = mock_operation
47+
48+
sender = eth_account.get_address()
49+
receiver = generate_fake_eth_address()
50+
flow = Decimal("10.0")
51+
52+
with patch(
53+
"aleph.sdk.connectors.superfluid.execute_operation_with_account",
54+
return_value="0xTransactionHash",
55+
) as mock_execute:
56+
tx_hash = await eth_account.create_flow(receiver, flow)
57+
assert tx_hash == "0xTransactionHash"
58+
mock_execute.assert_called_once_with(
59+
account=eth_account._account, operation=mock_operation
60+
)
61+
mock_superfluid.create_flow.assert_called_once_with(
62+
sender=sender.lower(),
63+
receiver=receiver.lower(),
64+
super_token=settings.AVAX_ALEPH_SUPER_TOKEN,
65+
flow_rate=mock.ANY,
66+
)
67+
68+
69+
@pytest.mark.asyncio
70+
async def test_delete_flow(eth_account, mock_superfluid):
71+
mock_operation = AsyncMock(spec=Operation)
72+
mock_superfluid.delete_flow.return_value = mock_operation
73+
74+
sender = eth_account.get_address()
75+
receiver = generate_fake_eth_address()
76+
77+
with patch(
78+
"aleph.sdk.connectors.superfluid.execute_operation_with_account",
79+
return_value="0xTransactionHash",
80+
) as mock_execute:
81+
tx_hash = await eth_account.delete_flow(receiver)
82+
assert tx_hash == "0xTransactionHash"
83+
mock_execute.assert_called_once_with(
84+
account=eth_account._account, operation=mock_operation
85+
)
86+
mock_superfluid.delete_flow.assert_called_once_with(
87+
sender=sender.lower(),
88+
receiver=receiver.lower(),
89+
super_token=settings.AVAX_ALEPH_SUPER_TOKEN,
90+
)
91+
92+
93+
@pytest.mark.asyncio
94+
async def test_update_flow(eth_account, mock_superfluid):
95+
mock_operation = AsyncMock(spec=Operation)
96+
mock_superfluid.update_flow.return_value = mock_operation
97+
98+
sender = eth_account.get_address()
99+
receiver = generate_fake_eth_address()
100+
flow = Decimal(15.0)
101+
102+
with patch(
103+
"aleph.sdk.connectors.superfluid.execute_operation_with_account",
104+
return_value="0xTransactionHash",
105+
) as mock_execute:
106+
tx_hash = await eth_account.update_flow(receiver, flow)
107+
assert tx_hash == "0xTransactionHash"
108+
mock_execute.assert_called_once_with(
109+
account=eth_account._account, operation=mock_operation
110+
)
111+
mock_superfluid.update_flow.assert_called_once_with(
112+
sender=sender.lower(),
113+
receiver=receiver.lower(),
114+
super_token=settings.AVAX_ALEPH_SUPER_TOKEN,
115+
flow_rate=mock.ANY,
116+
)
117+
118+
119+
@pytest.mark.asyncio
120+
async def test_get_flow(eth_account, mock_superfluid):
121+
mock_flow_info = MagicMock(spec=Web3FlowInfo)
122+
mock_superfluid.get_flow.return_value = mock_flow_info
123+
124+
sender = eth_account.get_address()
125+
receiver = generate_fake_eth_address()
126+
127+
flow_info = await eth_account.get_flow(receiver)
128+
assert flow_info == mock_flow_info
129+
mock_superfluid.get_flow.assert_called_once_with(
130+
sender=sender.lower(),
131+
receiver=receiver.lower(),
132+
super_token=settings.AVAX_ALEPH_SUPER_TOKEN,
133+
)

0 commit comments

Comments
 (0)