Skip to content

Commit

Permalink
feat: support algopy.Array and algopy.ImmutableArray from algoran…
Browse files Browse the repository at this point in the history
…d-python 2.7
  • Loading branch information
daniel-makerx committed Feb 19, 2025
1 parent 847f6c7 commit fd8d19f
Show file tree
Hide file tree
Showing 10 changed files with 605 additions and 47 deletions.
54 changes: 22 additions & 32 deletions src/_algopy_testing/arc4.py
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ def is_dynamic(self) -> bool:
return True


class _DynamicArrayMeta(type(_ABIEncoded), typing.Generic[_TArrayItem, _TArrayLength]): # type: ignore # noqa: PGH003
class _DynamicArrayMeta(type(_ABIEncoded), typing.Generic[_TArrayItem]): # type: ignore[misc]
__concrete__: typing.ClassVar[dict[type, type]] = {}

def __getitem__(cls, key_t: type[_TArrayItem]) -> type:
Expand Down Expand Up @@ -1015,17 +1015,18 @@ def __repr__(self) -> str:


class _StructTypeInfo(_TypeInfo):
def __init__(self, struct_type: type[Struct]) -> None:
def __init__(self, struct_type: type[Struct], *, frozen: bool) -> None:
self.struct_type = struct_type
self.fields = dataclasses.fields(struct_type)
self.field_names = [field.name for field in self.fields]
self.frozen = frozen

@property
def typ(self) -> type:
return self.struct_type

@property
def child_types(self) -> Iterable[_TypeInfo]:
def child_types(self) -> list[_TypeInfo]:
return _tuple_type_from_struct(self.struct_type)._type_info.child_types

@property
Expand Down Expand Up @@ -1056,8 +1057,11 @@ class Struct(MutableBytes, _ABIEncoded, metaclass=_StructMeta): # type: ignore[
_type_info: typing.ClassVar[_StructTypeInfo] # type: ignore[misc]

def __init_subclass__(cls, *args: typing.Any, **kwargs: dict[str, typing.Any]) -> None:
dataclasses.dataclass(cls, *args, **kwargs)
cls._type_info = _StructTypeInfo(cls)
# make implementation not frozen, so we can conditionally control behaviour
dataclasses.dataclass(cls, *args, **{**kwargs, "frozen": False})
frozen = kwargs.get("frozen", False)
assert isinstance(frozen, bool)
cls._type_info = _StructTypeInfo(cls, frozen=frozen)

def __post_init__(self) -> None:
# calling base class here to init Mutable
Expand All @@ -1073,6 +1077,10 @@ def __setattr__(self, key: str, value: typing.Any) -> None:
super().__setattr__(key, value)
# don't update backing value until base class has been init'd
if hasattr(self, "_on_mutate") and key in self._type_info.field_names:
if self._type_info.frozen:
raise dataclasses.FrozenInstanceError(
f"{type(self)} is frozen and cannot be modified"
)
self._update_backing_value()

def _update_backing_value(self) -> None:
Expand Down Expand Up @@ -1154,34 +1162,16 @@ def emit(event: str | Struct, /, *args: object) -> None:
log(event_hash[:4] + event_data.value)


def native_value_to_arc4(value: object) -> _ABIEncoded: # noqa: PLR0911
import algopy

if isinstance(value, _ABIEncoded):
return value
if isinstance(value, bool):
return Bool(value)
if isinstance(value, algopy.UInt64):
return UInt64(value)
if isinstance(value, algopy.BigUInt):
return UInt512(value)
if isinstance(value, algopy.Bytes):
return DynamicBytes(value)
if isinstance(value, algopy.String):
return String(value)
if isinstance(value, tuple):
return Tuple(tuple(map(native_value_to_arc4, value)))
raise TypeError(f"Unsupported type: {type(value).__name__}")


def _cast_arg_as_arc4(arg: object) -> _ABIEncoded:
from _algopy_testing.serialize import native_to_arc4

if isinstance(arg, int) and not isinstance(arg, bool):
return UInt64(arg) if arg <= MAX_UINT64 else UInt512(arg)
if isinstance(arg, bytes):
return DynamicBytes(arg)
if isinstance(arg, str):
return String(arg)
return native_value_to_arc4(arg)
return native_to_arc4(arg)


def _find_bool(
Expand Down Expand Up @@ -1245,13 +1235,13 @@ def _get_max_bytes_len(type_info: _TypeInfo) -> int:
size = 0
if isinstance(type_info, _DynamicArrayTypeInfo):
size += _ABI_LENGTH_SIZE
elif isinstance(type_info, _TupleTypeInfo | _StaticArrayTypeInfo):
elif isinstance(type_info, _TupleTypeInfo | _StructTypeInfo | _StaticArrayTypeInfo):
i = 0
child_types = (
type_info.child_types
if isinstance(type_info, _TupleTypeInfo)
else [type_info.item_type] * type_info.size
)
if isinstance(type_info, _TupleTypeInfo | _StructTypeInfo):
child_types = type_info.child_types
else:
typing.assert_type(type_info, _StaticArrayTypeInfo)
child_types = [type_info.item_type] * type_info.size
while i < len(child_types):
if isinstance(child_types[i], _BoolTypeInfo):
after = _find_bool_types(child_types, i, 1)
Expand Down
37 changes: 27 additions & 10 deletions src/_algopy_testing/decorators/arc4.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,18 @@ def get_ordered_args(
params = list(sig.parameters.values())[1:] # Skip 'self'
app_args_iter = iter(app_args)

ordered_args = [
(
kwargs.get(p.name, next(app_args_iter, p.default))
if p.default is not p.empty
else kwargs.get(p.name) or next(app_args_iter)
)
for p in params
]
ordered_args = []
for p in params:
try:
arg = kwargs[p.name]
except KeyError:
try:
arg = next(app_args_iter)
except StopIteration:
if p.default is p.empty:
raise TypeError(f"missing required argument {p.name}") from None
arg = p.default
ordered_args.append(arg)

if list(app_args_iter):
raise TypeError("Too many positional arguments")
Expand Down Expand Up @@ -168,6 +172,8 @@ def abimethod( # noqa: PLR0913

@functools.wraps(fn)
def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _R:
from _algopy_testing.serialize import native_to_arc4

contract, *app_args = args
assert isinstance(contract, _algopy_testing.ARC4Contract), "expected ARC4 contract"
assert fn is not None, "expected function"
Expand All @@ -186,7 +192,8 @@ def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _R:
check_routing_conditions(app_id, metadata)
result = fn(*args, **kwargs)
if result is not None:
abi_result = _algopy_testing.arc4.native_value_to_arc4(result)

abi_result = native_to_arc4(result)
log(ARC4_RETURN_PREFIX, abi_result)
return result

Expand Down Expand Up @@ -273,6 +280,8 @@ def _extract_arrays_from_args(
app: algopy.Application,
sender: algopy.Account,
) -> _TxnArrays:
from _algopy_testing.serialize import native_to_arc4

txns = list[_algopy_testing.gtxn.TransactionBase]()
apps = [app]
assets = list[_algopy_testing.Asset]()
Expand All @@ -292,7 +301,7 @@ def _extract_arrays_from_args(
app_args.append(_algopy_testing.arc4.UInt8(len(apps)))
apps.append(arg_app)
case _ as maybe_native:
app_args.append(_algopy_testing.arc4.native_value_to_arc4(maybe_native))
app_args.append(native_to_arc4(maybe_native))
if len(app_args) > 15:
packed = _algopy_testing.arc4.Tuple(tuple(app_args[14:]))
app_args[14:] = [packed]
Expand Down Expand Up @@ -320,6 +329,7 @@ def _type_to_arc4(annotation: types.GenericAlias | type | None) -> str: # noqa:
from _algopy_testing.arc4 import _ABIEncoded
from _algopy_testing.gtxn import Transaction, TransactionBase
from _algopy_testing.models import Account, Application, Asset
from _algopy_testing.primitives import ImmutableArray

if annotation is None:
return "void"
Expand All @@ -331,6 +341,13 @@ def _type_to_arc4(annotation: types.GenericAlias | type | None) -> str: # noqa:
if not isinstance(annotation, type):
raise TypeError(f"expected type: {annotation!r}")

if typing.NamedTuple in getattr(annotation, "__orig_bases__", []):
tuple_fields = list(inspect.get_annotations(annotation).values())
tuple_args = [_type_to_arc4(a) for a in tuple_fields]
return f"({','.join(tuple_args)})"

if issubclass(annotation, ImmutableArray):
return f"{_type_to_arc4(annotation._element_type)}[]"
# arc4 types
if issubclass(annotation, _ABIEncoded):
return annotation._type_info.arc4_name
Expand Down
4 changes: 2 additions & 2 deletions src/_algopy_testing/models/contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from _algopy_testing.decorators.arc4 import get_active_txn_fields, maybe_arc4_metadata
from _algopy_testing.mutable import set_attr_on_mutate
from _algopy_testing.primitives import Bytes, UInt64
from _algopy_testing.protocols import BytesBacked, UInt64Backed
from _algopy_testing.protocols import BytesBacked, Serializable, UInt64Backed
from _algopy_testing.state.utils import deserialize, serialize

if typing.TYPE_CHECKING:
Expand Down Expand Up @@ -165,7 +165,7 @@ def __setattr__(self, name: str, value: typing.Any) -> None:
state._key = name_bytes
case _algopy_testing.BoxMap() as box_map if box_map._key_prefix is None:
box_map._key_prefix = name_bytes
case Bytes() | UInt64() | BytesBacked() | UInt64Backed() | bool():
case Bytes() | UInt64() | BytesBacked() | Serializable() | UInt64Backed() | bool():
app_id = _get_self_or_active_app_id(self)
lazy_context.ledger.set_global_state(app_id, name_bytes, serialize(value))
cls = type(self)
Expand Down
3 changes: 2 additions & 1 deletion src/_algopy_testing/primitives/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from _algopy_testing.primitives.array import Array, ImmutableArray
from _algopy_testing.primitives.biguint import BigUInt
from _algopy_testing.primitives.bytes import Bytes
from _algopy_testing.primitives.string import String
from _algopy_testing.primitives.uint64 import UInt64

__all__ = ["BigUInt", "Bytes", "String", "UInt64"]
__all__ = ["Array", "BigUInt", "Bytes", "ImmutableArray", "String", "UInt64"]
144 changes: 144 additions & 0 deletions src/_algopy_testing/primitives/array.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import types
import typing
from collections.abc import Iterable, Iterator, Reversible

from _algopy_testing.primitives.uint64 import UInt64
from _algopy_testing.protocols import Serializable
from _algopy_testing.serialize import deserialize_from_bytes, serialize_to_bytes

_T = typing.TypeVar("_T")


class _ImmutableArrayMeta(type):
__concrete__: typing.ClassVar[dict[type, type]] = {}

# get or create a type that is parametrized with element_t
def __getitem__(cls, element_t: type) -> type:
cache = cls.__concrete__
if c := cache.get(element_t, None):
return c

cls_name = f"{cls.__name__}[{element_t.__name__}]"
cache[element_t] = c = types.new_class(
cls_name,
bases=(cls,),
exec_body=lambda ns: ns.update(
_element_type=element_t,
),
)

return c


class ImmutableArray(Serializable, typing.Generic[_T], metaclass=_ImmutableArrayMeta):
_element_type: typing.ClassVar[type]

# ensure type is fully parameterized by looking up type from metaclass
def __new__(cls, *items: _T) -> typing.Self:
from _algopy_testing.serialize import type_of

try:
assert cls._element_type
except AttributeError:
try:
item = items[0]
except IndexError:
raise TypeError("array must have an item type") from None
cls = cls[type_of(item)]
instance = super().__new__(cls)
return instance

def __init__(self, *items: _T):
for item in items:
if not isinstance(item, typing.get_origin(self._element_type) or self._element_type):
raise TypeError(f"expected items of type {self._element_type}")
self._items = tuple(items)

def __iter__(self) -> Iterator[_T]:
return iter(self._items)

def __reversed__(self) -> Iterator[_T]:
return reversed(self._items)

@property
def length(self) -> UInt64:
return UInt64(len(self._items))

def __getitem__(self, index: UInt64 | int) -> _T:
return self._items[index]

def replace(self, index: UInt64 | int, value: _T) -> "ImmutableArray[_T]":
copied = list(self._items)
copied[int(index)] = value
return self._from_iter(copied)

def append(self, item: _T, /) -> "ImmutableArray[_T]":
copied = list(self._items)
copied.append(item)
return self._from_iter(copied)

def __add__(self, other: Iterable[_T], /) -> "ImmutableArray[_T]":
return self._from_iter((*self._items, *other))

def pop(self) -> "ImmutableArray[_T]":
copied = list(self._items)
copied.pop()
return self._from_iter(copied)

def _from_iter(self, items: Iterable[_T]) -> "ImmutableArray[_T]":
"""Returns a new array populated with items, also ensures element type info is
preserved."""
el_type = self._element_type
typ = ImmutableArray[el_type] # type: ignore[valid-type]
return typ(*items)

def __bool__(self) -> bool:
return bool(self._items)

def serialize(self) -> bytes:
return serialize_to_bytes(self)

@classmethod
def from_bytes(cls, value: bytes, /) -> typing.Self:
return deserialize_from_bytes(cls, value)


class Array(Reversible[_T]):

def __init__(self, *items: _T):
self._items = list(items)

def __iter__(self) -> Iterator[_T]:
return iter(list(self._items))

def __reversed__(self) -> Iterator[_T]:
return reversed(self._items)

@property
def length(self) -> UInt64:
return UInt64(len(self._items))

def __getitem__(self, index: UInt64 | int) -> _T:
return self._items[int(index)]

def __setitem__(self, index: UInt64 | int, value: _T) -> _T:
self._items[int(index)] = value
return value

def append(self, item: _T, /) -> None:
self._items.append(item)

def extend(self, other: Iterable[_T], /) -> None:
self._items.extend(other)

def pop(self) -> _T:
return self._items.pop()

def copy(self) -> "Array[_T]":
return Array(*self._items)

def freeze(self) -> ImmutableArray[_T]:
return ImmutableArray(*self._items)

def __bool__(self) -> bool:
return bool(self._items)
12 changes: 12 additions & 0 deletions src/_algopy_testing/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,18 @@
import algopy


class Serializable:
"""For algopy testing only, allows serializing to/from bytes for types that aren't
BytesBacked."""

@classmethod
def from_bytes(cls, value: bytes, /) -> typing.Self:
raise NotImplementedError

def serialize(self) -> bytes:
raise NotImplementedError


class BytesBacked:
"""Represents a type that is a single bytes value."""

Expand Down
Loading

0 comments on commit fd8d19f

Please # to comment.