From 0a830cbd4facfa4021953cbc91339138fd11677b Mon Sep 17 00:00:00 2001 From: Boluwatife Popoola Date: Tue, 16 Jul 2024 23:26:35 +0100 Subject: [PATCH 01/20] populate data from nadabot v2 --- .../management/commands/populatedata.py | 131 ++++++++++++++++++ indexer_app/utils.py | 2 +- nadabot/utils.py | 2 +- 3 files changed, 133 insertions(+), 2 deletions(-) diff --git a/indexer_app/management/commands/populatedata.py b/indexer_app/management/commands/populatedata.py index f542081..10b03c6 100644 --- a/indexer_app/management/commands/populatedata.py +++ b/indexer_app/management/commands/populatedata.py @@ -13,11 +13,13 @@ Pot, PotApplication, PotApplicationReview, + PotFactory, PotPayout, PotPayoutChallenge, PotPayoutChallengeAdminResponse, ) from tokens.models import Token +from nadabot.models import Provider, NadabotRegistry, Stamp, Group class Command(BaseCommand): @@ -213,8 +215,137 @@ def handle(self, *args, **options): page += 1 else: break + # Nadabot + NADABOT_ID = "v2new.staging.nadabot.near" + registry, _ = Account.objects.get_or_create(id=NADABOT_ID) + url = f"{settings.FASTNEAR_RPC_URL}/account/{NADABOT_ID}/view/get_contract_source_metadata" + response = requests.get(url) + if response.status_code != 200: + print( + f"Request for nadabot source metadata failed ({response.status_code}) with message: {response.text}" + ) + return + source_metadata = response.json() + print("........", source_metadata) + url = f"{settings.FASTNEAR_RPC_URL}/account/{NADABOT_ID}/view/get_config" + response = requests.get(url) + if response.status_code != 200: + print( + f"Request for nadabot config failed ({response.status_code}) with message: {response.text}" + ) + return + config = response.json() + print("... requested config, now creating reg..") + owner, _ = Account.objects.get_or_create(id=config["owner"]) + reg_defaults = { + "owner": owner, + "created_at": datetime.fromtimestamp(1711744807), + "updated_at": datetime.fromtimestamp(1711744807), + "source_metadata": source_metadata + } + nadabot_registry, _ = NadabotRegistry.objects.update_or_create( + id=registry, defaults=reg_defaults + ) + if config.get("admins"): + for admin_id in config["admins"]: + admin, _ = Account.objects.get_or_create(id=admin_id) + nadabot_registry.admins.add(admin) + url = f"{settings.FASTNEAR_RPC_URL}/account/{NADABOT_ID}/view/get_providers" + response = requests.get(url) + if response.status_code != 200: + print( + f"Request for provider data failed ({response.status_code}) with message: {response.text}" + ) + return + providers = response.json() + print("provider data; ", providers) + for provider in providers: + print("sleeping for 1 second") + time.sleep(1) + submitter, _ = Account.objects.get_or_create(id=provider["submitted_by"]) + contract, _ = Account.objects.get_or_create(id=provider["contract_id"]) + # provider_id = provider["id"] + provider_default = { + "contract": contract, + "method_name": provider["method_name"], + "name": provider["provider_name"], + "description": provider.get("description"), + "status": provider["status"], + "admin_notes": provider.get("admin_notes"), + "default_weight": provider["default_weight"], + "gas": provider.get("gas"), + "tags": provider.get("tags"), + "icon_url": provider.get("icon_url"), + "external_url": provider.get("external_url"), + "submitted_by_id": provider["submitted_by"], + "submitted_at": datetime.fromtimestamp(provider.get("submitted_at_ms") / 1000), + "stamp_validity_ms": provider.get("stamp_validity_ms"), + "account_id_arg_name": 
provider["account_id_arg_name"], + "custom_args": provider.get("custom_args"), + "registry_id": NADABOT_ID + } + provider, _ = Provider.objects.update_or_create( + on_chain_id=provider["id"], + defaults=provider_default + ) + + # stamps + url = f"{settings.FASTNEAR_RPC_URL}/account/{NADABOT_ID}/view/get_stamps" + response = requests.get(url) + if response.status_code != 200: + print( + f"Request for stamps data failed ({response.status_code}) with message: {response.text}" + ) + return + stamps = response.json() + for stamp in stamps: + user, _ = Account.objects.get_or_create(id=stamp["user_id"]) + provider, _ = Provider.objects.get_or_create(on_chain_id=stamp["provider"]["id"]) + stamp_default = { + "verified_at": datetime.fromtimestamp(stamp["validated_at_ms"] / 1000) + } + stamp_obj, _ = Stamp.objects.update_or_create( + user=user, + provider=provider, + defaults=stamp_default + ) + + # Groups + url = f"{settings.FASTNEAR_RPC_URL}/account/{NADABOT_ID}/view/get_groups" + response = requests.get(url) + if response.status_code != 200: + print( + f"Request for groups data failed ({response.status_code}) with message: {response.text}" + ) + return + groups = response.json() + for group in groups: + print("the roup.. ", group) + rule = group['rule'] + rule_key = rule + rule_val = None + if type(rule) == dict: + rule_key = next(iter(rule)) + rule_val = rule.get(rule_key) + + group_default = { + "name": group["name"], + "created_at": datetime.now(), + "updated_at": datetime.now(), + "rule_type": rule_key, + "rule_val": rule_val + } + group_obj, _ = Group.objects.update_or_create( + id=group["id"], + defaults=group_default + ) + if group.get("providers"): + for group_provider_id in group["providers"]: + group_provider, _ = Provider.objects.get_or_create(on_chain_id=group_provider_id) + group_obj.providers.add(group_provider) # pot factory POTFACTORY_ID = "v1.potfactory.potlock.near" + # pot_factory, _ = PotFactory.objects # pots near_acct, _ = Account.objects.get_or_create(id="near") url = f"{settings.FASTNEAR_RPC_URL}/account/{POTFACTORY_ID}/view/get_pots" diff --git a/indexer_app/utils.py b/indexer_app/utils.py index 46aeed7..149cd99 100644 --- a/indexer_app/utils.py +++ b/indexer_app/utils.py @@ -1088,7 +1088,7 @@ async def handle_new_provider( external_url=data.get("external_url"), submitted_by_id=data["submitted_by"], submitted_at = datetime.fromtimestamp(data.get("submitted_at_ms") / 1000), - stamp_validity_ms = datetime.fromtimestamp(data.get("stamp_validity_ms") / 1000) if data.get("stamp_validity_ms") else None, + stamp_validity_ms = data.get("stamp_validity_ms"), account_id_arg_name = data["account_id_arg_name"], custom_args = data.get("custom_args"), registry_id=receiverId diff --git a/nadabot/utils.py b/nadabot/utils.py index 0aaa7a8..aa9ff61 100644 --- a/nadabot/utils.py +++ b/nadabot/utils.py @@ -5,7 +5,7 @@ BASE_PATTERN = ( r"nadabot\.testnet" if settings.ENVIRONMENT == "testnet" -else r"v\d+(?:new)?\.[a-zA-Z]+\.nadabot\.near" +else r"v\d+(?:new)?(?:\.[a-zA-Z]+)?\.nadabot\.near" ) From 0f482665480eff7c80eaae4e139c19dde8661bcb Mon Sep 17 00:00:00 2001 From: Boluwatife Popoola Date: Thu, 18 Jul 2024 15:59:47 +0100 Subject: [PATCH 02/20] add to populate data script and fix some --- .../management/commands/populatedata.py | 53 ++++++++++++++++++- indexer_app/utils.py | 32 +++++++---- 2 files changed, 73 insertions(+), 12 deletions(-) diff --git a/indexer_app/management/commands/populatedata.py b/indexer_app/management/commands/populatedata.py index 10b03c6..6e260b1 100644 --- 
a/indexer_app/management/commands/populatedata.py +++ b/indexer_app/management/commands/populatedata.py @@ -343,9 +343,60 @@ def handle(self, *args, **options): for group_provider_id in group["providers"]: group_provider, _ = Provider.objects.get_or_create(on_chain_id=group_provider_id) group_obj.providers.add(group_provider) + # pot factory POTFACTORY_ID = "v1.potfactory.potlock.near" - # pot_factory, _ = PotFactory.objects + pot_factory, _ = Account.objects.get_or_create(id=POTFACTORY_ID) + # get pot factory metadata from fastnear + url = f"{settings.FASTNEAR_RPC_URL}/account/{POTFACTORY_ID}/view/get_contract_source_metadata" + response = requests.get(url) + if response.status_code != 200: + print( + f"Request for pot factory metadata data failed ({response.status_code}) with message: {response.text}" + ) + return + metadata = response.json() + print("pot factory metadata; ", metadata) + + # get pot factory config + url = f"{settings.FASTNEAR_RPC_URL}/account/{POTFACTORY_ID}/view/get_config" + response = requests.get(url) + if response.status_code != 200: + print( + f"Request for pot factory config data failed ({response.status_code}) with message: {response.text}" + ) + return + config = response.json() + # get pot factory owner + owner, _ = Account.objects.get_or_create(id=config["owner"]) + protocol_fee_recipient_account, _ = Account.objects.get_or_create( + id=config["protocol_fee_recipient_account"], + ) + defaults = { + "owner": owner, + "deployed_at": datetime.fromtimestamp(1707662008), + "source_metadata": metadata, + "protocol_fee_basis_points": config["protocol_fee_basis_points"], + "protocol_fee_recipient": protocol_fee_recipient_account, + "require_whitelist": config["require_whitelist"], + } + # Create Factory object + factory, factory_created = PotFactory.objects.update_or_create( + id=pot_factory, defaults=defaults + ) + + if config.get("admins"): + for admin_id in config["admins"]: + admin, _ = Account.objects.get_or_create( + id=admin_id, + ) + factory.admins.add(admin) + + # Add whitelisted deployers to the PotFactory + if config.get("whitelisted_deployers"): + for deployer_id in config["whitelisted_deployers"]: + deployer, _ = Account.objects.get_or_create(id=deployer_id) + factory.whitelisted_deployers.add(deployer) # pots near_acct, _ = Account.objects.get_or_create(id="near") url = f"{settings.FASTNEAR_RPC_URL}/account/{POTFACTORY_ID}/view/get_pots" diff --git a/indexer_app/utils.py b/indexer_app/utils.py index 149cd99..daac58c 100644 --- a/indexer_app/utils.py +++ b/indexer_app/utils.py @@ -61,12 +61,15 @@ async def handle_new_nadabot_registry( try: registry, _ = await Account.objects.aget_or_create(id=receiverId) owner, _ = await Account.objects.aget_or_create(id=data["owner"]) + reg_defaults = { + "owner": owner, + "created_at": created_at, + "updated_at": created_at, + "source_metadata": data.get('source_metadata') + } nadabot_registry, created = await NadabotRegistry.objects.aupdate_or_create( id=registry, - owner=owner, - created_at=created_at, - updated_at=created_at, - source_metadata=data.get('source_metadata') + defaults=reg_defaults ) if data.get("admins"): @@ -1111,10 +1114,13 @@ async def handle_add_stamp( provider, _ = await Provider.objects.aget_or_create(on_chain_id=data["provider_id"]) try: + stamp_default = { + "verified_at": datetime.fromtimestamp(data["validated_at_ms"] / 1000) + } stamp = await Stamp.objects.aupdate_or_create( user=user, provider=provider, - verified_at = datetime.fromtimestamp(data["validated_at_ms"] / 1000) + 
defaults=stamp_default ) except Exception as e: logger.error(f"Failed to create stamp: {e}") @@ -1138,13 +1144,17 @@ async def handle_new_group( rule_key = next(iter(rule)) rule_val = rule.get(rule_key) - group = await Group.objects.acreate( + group_default = { + "name": group_data["name"], + "created_at": created_at, + "updated_at": created_at, + "rule_type": rule_key, + "rule_val": rule_val + } + + group, _ = Group.objects.update_or_create( id=group_data["id"], - name=group_data["name"], - created_at=created_at, - updated_at=created_at, - rule_type = rule_key, - rule_val = rule_val + defaults=group_default ) logger.info(f"addding provider.... : {group_data['providers']}") From ef4946e957b3e394dd247dd0546d037ad91d8638 Mon Sep 17 00:00:00 2001 From: Lachlan Glen <54282009+lachlanglen@users.noreply.github.com> Date: Mon, 15 Jul 2024 14:40:36 -0400 Subject: [PATCH 03/20] add chains app & Chain model (#41) --- accounts/migrations/0004_account_chain.py | 28 +++++++++ .../migrations/0005_alter_account_chain.py | 26 ++++++++ accounts/models.py | 10 ++++ base/settings.py | 2 + chains/__init__.py | 0 chains/admin.py | 18 ++++++ chains/apps.py | 6 ++ chains/migrations/0001_initial.py | 60 +++++++++++++++++++ chains/migrations/0002_add_near_chain.py | 25 ++++++++ chains/migrations/__init__.py | 0 chains/models.py | 48 +++++++++++++++ chains/tests.py | 3 + chains/views.py | 3 + pyproject.toml | 1 + 14 files changed, 230 insertions(+) create mode 100644 accounts/migrations/0004_account_chain.py create mode 100644 accounts/migrations/0005_alter_account_chain.py create mode 100644 chains/__init__.py create mode 100644 chains/admin.py create mode 100644 chains/apps.py create mode 100644 chains/migrations/0001_initial.py create mode 100644 chains/migrations/0002_add_near_chain.py create mode 100644 chains/migrations/__init__.py create mode 100644 chains/models.py create mode 100644 chains/tests.py create mode 100644 chains/views.py diff --git a/accounts/migrations/0004_account_chain.py b/accounts/migrations/0004_account_chain.py new file mode 100644 index 0000000..5fcbb89 --- /dev/null +++ b/accounts/migrations/0004_account_chain.py @@ -0,0 +1,28 @@ +# Generated by Django 5.0.4 on 2024-07-15 18:23 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("accounts", "0003_alter_account_options"), + ("chains", "0001_initial"), + ] + + operations = [ + migrations.AddField( + model_name="account", + name="chain", + field=models.ForeignKey( + blank=True, + help_text="Blockchain this account is located on.", + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="accounts", + related_query_name="account", + to="chains.chain", + ), + ), + ] diff --git a/accounts/migrations/0005_alter_account_chain.py b/accounts/migrations/0005_alter_account_chain.py new file mode 100644 index 0000000..2dbe9dd --- /dev/null +++ b/accounts/migrations/0005_alter_account_chain.py @@ -0,0 +1,26 @@ +# Generated by Django 5.0.4 on 2024-07-15 18:26 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("accounts", "0004_account_chain"), + ("chains", "0002_add_near_chain"), + ] + + operations = [ + migrations.AlterField( + model_name="account", + name="chain", + field=models.ForeignKey( + help_text="Blockchain this account is located on.", + on_delete=django.db.models.deletion.CASCADE, + related_name="accounts", + 
related_query_name="account", + to="chains.chain", + ), + ), + ] diff --git a/accounts/models.py b/accounts/models.py index 25e647b..35afcf4 100644 --- a/accounts/models.py +++ b/accounts/models.py @@ -6,6 +6,7 @@ from django.utils.translation import gettext_lazy as _ from base.logging import logger +from chains.models import Chain class Account(models.Model): @@ -17,6 +18,15 @@ class Account(models.Model): validators=[], help_text=_("On-chain account address."), ) + chain = models.ForeignKey( + Chain, + null=False, + blank=False, + on_delete=models.CASCADE, + related_name="accounts", + related_query_name="account", + help_text=_("Blockchain this account is located on."), + ) total_donations_in_usd = models.DecimalField( _("total donations received in USD"), max_digits=20, diff --git a/base/settings.py b/base/settings.py index 65667bc..3919a7e 100644 --- a/base/settings.py +++ b/base/settings.py @@ -92,6 +92,7 @@ "django.contrib.staticfiles", "rest_framework", "drf_spectacular", + "django_extensions", "corsheaders", # "cachalot", "celery", @@ -104,6 +105,7 @@ "pots", "tokens", "nadabot", + "chains", ] DEFAULT_PAGE_SIZE = 30 diff --git a/chains/__init__.py b/chains/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/chains/admin.py b/chains/admin.py new file mode 100644 index 0000000..7a9ede9 --- /dev/null +++ b/chains/admin.py @@ -0,0 +1,18 @@ +from django.contrib import admin + +from .models import Chain + + +@admin.register(Chain) +class ChainAdmin(admin.ModelAdmin): + list_display = ( + "name", + "name_slug", + "rpc_url", + "explorer_url", + "evm_compat", + "evm_chain_id", + ) + search_fields = ("name", "name_slug", "rpc_url", "explorer_url", "evm_chain_id") + list_filter = ("evm_compat",) + ordering = ("name",) diff --git a/chains/apps.py b/chains/apps.py new file mode 100644 index 0000000..e01f14f --- /dev/null +++ b/chains/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class ChainsConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "chains" diff --git a/chains/migrations/0001_initial.py b/chains/migrations/0001_initial.py new file mode 100644 index 0000000..812d7c5 --- /dev/null +++ b/chains/migrations/0001_initial.py @@ -0,0 +1,60 @@ +# Generated by Django 5.0.4 on 2024-07-15 18:23 + +import django_extensions.db.fields +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [] + + operations = [ + migrations.CreateModel( + name="Chain", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("name", models.CharField(db_index=True, max_length=32, unique=True)), + ( + "name_slug", + django_extensions.db.fields.AutoSlugField( + blank=True, + editable=False, + max_length=32, + populate_from=("name",), + unique=True, + ), + ), + ("rpc_url", models.URLField()), + ("explorer_url", models.URLField()), + ("evm_compat", models.BooleanField()), + ( + "evm_chain_id", + models.IntegerField(blank=True, db_index=True, null=True), + ), + ], + options={ + "ordering": ("name",), + }, + ), + migrations.AddConstraint( + model_name="chain", + constraint=models.CheckConstraint( + check=models.Q( + models.Q(("evm_chain_id__isnull", False), ("evm_compat", True)), + models.Q(("evm_chain_id__isnull", True), ("evm_compat", False)), + _connector="OR", + ), + name="evm_chain_id_check", + ), + ), + ] diff --git a/chains/migrations/0002_add_near_chain.py b/chains/migrations/0002_add_near_chain.py new 
file mode 100644 index 0000000..73fec6f --- /dev/null +++ b/chains/migrations/0002_add_near_chain.py @@ -0,0 +1,25 @@ +# Generated by Django 5.0.4 on 2024-07-15 18:24 + +from django.db import migrations + + +def create_near_chain(apps, schema_editor): + Chain = apps.get_model("chains", "Chain") + # Create the "near" chain + near_chain, created = Chain.objects.get_or_create( + name="NEAR", defaults={"evm_compat": False} + ) + + # Set "near" chain as the default for all existing accounts + Account = apps.get_model("accounts", "Account") + Account.objects.update(chain=near_chain) + print("Updated all accounts to use NEAR chain") + + +class Migration(migrations.Migration): + + dependencies = [("chains", "0001_initial"), ("accounts", "0004_account_chain")] + + operations = [ + migrations.RunPython(create_near_chain), + ] diff --git a/chains/migrations/__init__.py b/chains/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/chains/models.py b/chains/models.py new file mode 100644 index 0000000..0545a4c --- /dev/null +++ b/chains/models.py @@ -0,0 +1,48 @@ +from django.db import models +from django_extensions.db.fields import AutoSlugField + + +class Chain(models.Model): + ############################ + # BLOCKCHAIN MODEL FIELDS + ############################ + + # GENERAL + name = models.CharField( + max_length=32, unique=True, blank=False, null=False, db_index=True + ) + name_slug = AutoSlugField( + populate_from=("name",), + max_length=32, + unique=True, + blank=False, + null=False, + db_index=True, + ) + + # URLS + rpc_url = models.URLField() + explorer_url = models.URLField() + + # EVM + evm_compat = models.BooleanField(null=False, blank=False) + evm_chain_id = models.IntegerField(null=True, blank=True, db_index=True) + + ############################ + # META + ############################ + class Meta: + # default record ordering + ordering = ("name",) + + constraints = [ + # if an evm then the evm_chain_id id must be set otherwise should be null + models.CheckConstraint( + name="evm_chain_id_check", + check=models.Q(evm_compat=True, evm_chain_id__isnull=False) + | models.Q(evm_compat=False, evm_chain_id__isnull=True), + ), + ] + + def __str__(self): + return self.name diff --git a/chains/tests.py b/chains/tests.py new file mode 100644 index 0000000..7ce503c --- /dev/null +++ b/chains/tests.py @@ -0,0 +1,3 @@ +from django.test import TestCase + +# Create your tests here. diff --git a/chains/views.py b/chains/views.py new file mode 100644 index 0000000..91ea44a --- /dev/null +++ b/chains/views.py @@ -0,0 +1,3 @@ +from django.shortcuts import render + +# Create your views here. 
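A minimal usage sketch of the new Chain model (not part of the patch itself), assuming this commit plus the next one in the series (which defaults new accounts to the NEAR chain) are applied; the chain name, URLs, and EVM chain id below are illustrative assumptions rather than values taken from the patch:

    from accounts.models import Account
    from chains.models import Chain

    # The evm_chain_id_check constraint requires an EVM-compatible chain to set
    # evm_chain_id and a non-EVM chain to leave it null, so the pairing is
    # enforced at the database level rather than in application code.
    base_chain, _ = Chain.objects.get_or_create(
        name="Base",
        defaults={
            "rpc_url": "https://mainnet.base.org",   # assumed RPC endpoint
            "explorer_url": "https://basescan.org",  # assumed explorer URL
            "evm_compat": True,
            "evm_chain_id": 8453,                    # assumed EVM chain id
        },
    )

    # Migration 0002_add_near_chain seeds a "NEAR" chain (evm_compat=False,
    # evm_chain_id left null) and points all existing accounts at it; new
    # accounts pick it up by default once the follow-up commit lands.
    account, _ = Account.objects.get_or_create(id="example.near")
    assert account.chain.name == "NEAR"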
diff --git a/pyproject.toml b/pyproject.toml index fee60dd..d31b0bb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ sentry-sdk = { extras = ["django"], version = "^1.45.0" } watchtower = "^3.1.0" django-cors-headers = "^4.3.1" drf-spectacular = "^0.27.2" +django-extensions = "^3.2.3" [tool.poetry.group.dev.dependencies] black = "^24.3.0" From bff777b550097f557f6411e37e592256a62eb8c1 Mon Sep 17 00:00:00 2001 From: Lachlan Glen <54282009+lachlanglen@users.noreply.github.com> Date: Tue, 16 Jul 2024 14:34:49 -0400 Subject: [PATCH 04/20] default to NEAR chain when creating new account --- accounts/models.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/accounts/models.py b/accounts/models.py index 35afcf4..65a869d 100644 --- a/accounts/models.py +++ b/accounts/models.py @@ -137,6 +137,9 @@ def fetch_near_social_profile_data(self, should_save=True): def save(self, *args, **kwargs): if self._state.adding: # If the account is being created (not updated) + if not self.chain_id: + # default to NEAR chain when none is provided + self.chain = Chain.objects.get(name="NEAR") self.fetch_near_social_profile_data( False # don't save yet as we want to avoid infinite loop ) From 6b1cd028a21fa52b811f032da40eed01996ce1db Mon Sep 17 00:00:00 2001 From: Lachlan Glen <54282009+lachlanglen@users.noreply.github.com> Date: Wed, 17 Jul 2024 09:48:17 -0400 Subject: [PATCH 05/20] add debug logging to handler.py for memory usage & timing --- indexer_app/handler.py | 90 ++++++++++++++++++++++++++++++++---------- indexer_app/logging.py | 12 +++++- pyproject.toml | 1 + 3 files changed, 82 insertions(+), 21 deletions(-) diff --git a/indexer_app/handler.py b/indexer_app/handler.py index e65d0b1..ea93cb2 100644 --- a/indexer_app/handler.py +++ b/indexer_app/handler.py @@ -1,5 +1,6 @@ import base64 import json +import time from datetime import datetime from django.conf import settings @@ -7,21 +8,22 @@ from near_lake_framework import near_primitives from base.utils import convert_ns_to_utc -from pots.utils import match_pot_factory_pattern, match_pot_subaccount_pattern from nadabot.utils import match_nadabot_registry_pattern -from .logging import logger -from .utils import ( # handle_batch_donations, +from pots.utils import match_pot_factory_pattern, match_pot_subaccount_pattern + +from .logging import log_memory_usage, logger +from .utils import handle_add_nadabot_admin # handle_batch_donations, +from .utils import ( handle_add_stamp, handle_default_list_status_change, handle_list_admin_removal, handle_list_registration_update, handle_list_upvote, - handle_add_nadabot_admin, - handle_new_nadabot_registry, handle_new_donation, handle_new_group, handle_new_list, handle_new_list_registration, + handle_new_nadabot_registry, handle_new_pot, handle_new_pot_factory, handle_new_provider, @@ -29,17 +31,20 @@ handle_payout_challenge_response, handle_pot_application, handle_pot_application_status_change, + handle_pot_config_update, + handle_registry_blacklist_action, + handle_registry_unblacklist_action, handle_set_payouts, handle_social_profile_update, handle_transfer_payout, handle_update_default_human_threshold, - handle_registry_blacklist_action, - handle_registry_unblacklist_action, - handle_pot_config_update, ) async def handle_streamer_message(streamer_message: near_primitives.StreamerMessage): + start_time = time.time() + log_memory_usage("Start of handle_streamer_message") + block_timestamp = streamer_message.block.header.timestamp block_height = streamer_message.block.header.height now_datetime = 
datetime.fromtimestamp(block_timestamp / 1000000000) @@ -50,12 +55,19 @@ async def handle_streamer_message(streamer_message: near_primitives.StreamerMess logger.info( f"Block Height: {block_height}, Block Timestamp: {block_timestamp} ({formatted_date})" ) + logger.info( + f"Time after processing block info: {time.time() - start_time:.4f} seconds" + ) + log_memory_usage("After processing block info") # if block_height == 111867204: # with open("indexer_outcome2.json", "w") as file: # file.write(f"{streamer_message}") - for shard in streamer_message.shards: - for receipt_execution_outcome in shard.receipt_execution_outcomes: + for shard_index, shard in enumerate(streamer_message.shards): + shard_start_time = time.time() + for outcome_index, receipt_execution_outcome in enumerate( + shard.receipt_execution_outcomes + ): # we only want to proceed if the tx succeeded if ( "SuccessReceiptId" @@ -67,7 +79,9 @@ async def handle_streamer_message(streamer_message: near_primitives.StreamerMess receiver_id = receipt_execution_outcome.receipt.receiver_id if ( receiver_id != settings.NEAR_SOCIAL_CONTRACT_ADDRESS - and not receiver_id.endswith((settings.POTLOCK_TLA, settings.NADABOT_TLA)) + and not receiver_id.endswith( + (settings.POTLOCK_TLA, settings.NADABOT_TLA) + ) ): continue # 1. HANDLE LOGS @@ -75,6 +89,7 @@ async def handle_streamer_message(streamer_message: near_primitives.StreamerMess receipt = receipt_execution_outcome.receipt signer_id = receipt.receipt["Action"]["signer_id"] + log_processing_start = time.time() for log_index, log in enumerate( receipt_execution_outcome.execution_outcome.outcome.logs, start=1 ): @@ -85,20 +100,32 @@ async def handle_streamer_message(streamer_message: near_primitives.StreamerMess event_name = parsed_log.get("event") print("parsa parsa...", parsed_log) if event_name == "update_pot_config": - await handle_pot_config_update(parsed_log.get("data")[0], receiver_id) + await handle_pot_config_update( + parsed_log.get("data")[0], receiver_id + ) if event_name == "add_or_update_provider": - await handle_new_provider(parsed_log.get("data")[0], receiver_id, signer_id) + await handle_new_provider( + parsed_log.get("data")[0], receiver_id, signer_id + ) elif event_name == "add_stamp": - await handle_add_stamp(parsed_log.get("data")[0], receiver_id, signer_id) + await handle_add_stamp( + parsed_log.get("data")[0], receiver_id, signer_id + ) elif event_name == "update_default_human_threshold": - await handle_update_default_human_threshold(parsed_log.get("data")[0], receiver_id) + await handle_update_default_human_threshold( + parsed_log.get("data")[0], receiver_id + ) if event_name == "add_or_update_group": - await handle_new_group(parsed_log.get("data")[0], now_datetime) + await handle_new_group(parsed_log.get("data")[0], now_datetime) if event_name == "blacklist_account": - await handle_registry_blacklist_action(parsed_log.get("data")[0], receiver_id, now_datetime) + await handle_registry_blacklist_action( + parsed_log.get("data")[0], receiver_id, now_datetime + ) if event_name == "unblacklist_account": - await handle_registry_unblacklist_action(parsed_log.get("data")[0], receiver_id, now_datetime) + await handle_registry_unblacklist_action( + parsed_log.get("data")[0], receiver_id, now_datetime + ) except json.JSONDecodeError: logger.warning( f"Receipt ID: `{receipt_execution_outcome.receipt.receipt_id}`\nError during parsing logs from JSON string to dict" @@ -109,6 +136,11 @@ async def handle_streamer_message(streamer_message: near_primitives.StreamerMess # TODO: handle 
set_source_metadata logs for various contracts + logger.info( + f"Time to process logs for receipt {receipt_execution_outcome.receipt.receipt_id}: {time.time() - log_processing_start:.4f} seconds" + ) + log_memory_usage("After processing logs") + # 2. HANDLE METHOD CALLS # Skip if the tx failed # if ( @@ -120,6 +152,7 @@ async def handle_streamer_message(streamer_message: near_primitives.StreamerMess # # consider logging failures to logging service; for now, just skip # print("here we are...") # continue + method_call_processing_start = time.time() LISTS_CONTRACT = "lists." + settings.POTLOCK_TLA DONATE_CONTRACT = "donate." + settings.POTLOCK_TLA @@ -170,8 +203,12 @@ async def handle_streamer_message(streamer_message: near_primitives.StreamerMess await handle_new_pot_factory( args_dict, receiver_id, now_datetime ) - elif match_nadabot_registry_pattern(receiver_id): # matches registries in the pattern, version(v1).env(staging).nadabot.near - await handle_new_nadabot_registry(args_dict, receiver_id, now_datetime) + elif match_nadabot_registry_pattern( + receiver_id + ): # matches registries in the pattern, version(v1).env(staging).nadabot.near + await handle_new_nadabot_registry( + args_dict, receiver_id, now_datetime + ) elif match_pot_subaccount_pattern(receiver_id): logger.info( f"new pot deployment: {args_dict}, {action}" @@ -388,3 +425,16 @@ async def handle_streamer_message(streamer_message: near_primitives.StreamerMess logger.error(f"Error in indexer handler:\n{e}") # with open("indexer_error.txt", "a") as file: # file.write(f"{e}\n") + logger.info( + f"Time to process method calls for receipt {receipt_execution_outcome.receipt.receipt_id}: {time.time() - method_call_processing_start:.4f} seconds" + ) + log_memory_usage("After processing method calls") + logger.info( + f"Time to process shard {shard_index}: {time.time() - shard_start_time:.4f} seconds" + ) + log_memory_usage(f"After processing shard {shard_index}") + + logger.info( + f"Total time to process streamer message: {time.time() - start_time:.4f} seconds" + ) + log_memory_usage("End of handle_streamer_message") diff --git a/indexer_app/logging.py b/indexer_app/logging.py index 973684a..3627087 100644 --- a/indexer_app/logging.py +++ b/indexer_app/logging.py @@ -1,3 +1,13 @@ import logging -logger = logging.getLogger("indexer") \ No newline at end of file +import psutil + +logger = logging.getLogger("indexer") + + +def log_memory_usage(stage): + process = psutil.Process() + memory_info = process.memory_info() + logger.info( + f"{stage} - RSS: {memory_info.rss / 1024 / 1024:.2f} MB, VMS: {memory_info.vms / 1024 / 1024:.2f} MB" + ) diff --git a/pyproject.toml b/pyproject.toml index d31b0bb..e7bcb60 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ watchtower = "^3.1.0" django-cors-headers = "^4.3.1" drf-spectacular = "^0.27.2" django-extensions = "^3.2.3" +psutil = "^6.0.0" [tool.poetry.group.dev.dependencies] black = "^24.3.0" From bc1d9d10ae18e0f3b5479d618b952cc3da78a8de Mon Sep 17 00:00:00 2001 From: Lachlan Glen <54282009+lachlanglen@users.noreply.github.com> Date: Wed, 17 Jul 2024 10:02:10 -0400 Subject: [PATCH 06/20] add additional debug logs for streamer processing time --- indexer_app/handler.py | 9 ++++++--- indexer_app/tasks.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/indexer_app/handler.py b/indexer_app/handler.py index ea93cb2..4e95e59 100644 --- a/indexer_app/handler.py +++ b/indexer_app/handler.py @@ -1,3 +1,4 @@ +import asyncio import base64 
import json import time @@ -48,9 +49,11 @@ async def handle_streamer_message(streamer_message: near_primitives.StreamerMess block_timestamp = streamer_message.block.header.timestamp block_height = streamer_message.block.header.height now_datetime = datetime.fromtimestamp(block_timestamp / 1000000000) - await cache.aset( - "block_height", block_height - ) # TODO: add custom timeout if it should be valid for longer than default (5 minutes) + # Fire and forget the cache update + asyncio.create_task(cache.aset("block_height", block_height)) + # await cache.aset( + # "block_height", block_height + # ) # TODO: add custom timeout if it should be valid for longer than default (5 minutes) formatted_date = convert_ns_to_utc(block_timestamp) logger.info( f"Block Height: {block_height}, Block Timestamp: {block_timestamp} ({formatted_date})" diff --git a/indexer_app/tasks.py b/indexer_app/tasks.py index 0cd5b27..fc29b8f 100644 --- a/indexer_app/tasks.py +++ b/indexer_app/tasks.py @@ -1,5 +1,6 @@ import asyncio import logging +import time from pathlib import Path import requests @@ -47,16 +48,44 @@ async def indexer(from_block: int, to_block: int): while True: try: + # Log time before fetching a new block + fetch_start_time = time.time() # streamer_message is the current block streamer_message = await streamer_messages_queue.get() + fetch_end_time = time.time() + logger.info( + f"Time to fetch new block: {fetch_end_time - fetch_start_time:.4f} seconds" + ) block_count += 1 + + # Log time before caching block height + cache_start_time = time.time() await cache_block_height( "current_block_height", streamer_message.block.header.height, block_count, streamer_message.block.header.timestamp, ) # current block height + cache_end_time = time.time() + + logger.info( + f"Time to cache block height: {cache_end_time - cache_start_time:.4f} seconds" + ) + + # Log time before handling the streamer message + handle_start_time = time.time() await handle_streamer_message(streamer_message) + handle_end_time = time.time() + logger.info( + f"Time to handle streamer message: {handle_end_time - handle_start_time:.4f} seconds" + ) + + # Log total time for one iteration + iteration_end_time = time.time() + logger.info( + f"Total time for one iteration: {iteration_end_time - fetch_start_time:.4f} seconds" + ) + except Exception as e: logger.error(f"Error in streamer_messages_queue: {e}") From e288b20de661e32a771f783753c46abc76981486 Mon Sep 17 00:00:00 2001 From: Boluwatife Popoola Date: Thu, 18 Jul 2024 16:42:52 +0100 Subject: [PATCH 07/20] =?UTF-8?q?resolve=20conflicts,=20=F0=9F=A4=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- indexer_app/tasks.py | 6 +- indexer_app/utils.py | 199 ++++++++++++++++++++----------------------- 2 files changed, 95 insertions(+), 110 deletions(-) diff --git a/indexer_app/tasks.py b/indexer_app/tasks.py index fc29b8f..51644b1 100644 --- a/indexer_app/tasks.py +++ b/indexer_app/tasks.py @@ -22,6 +22,8 @@ from .logging import logger from .utils import cache_block_height, get_block_height +CURRENT_BLOCK_HEIGHT_KEY = "current_block_height" + async def indexer(from_block: int, to_block: int): """ @@ -61,7 +63,7 @@ async def indexer(from_block: int, to_block: int): # Log time before caching block height cache_start_time = time.time() await cache_block_height( - "current_block_height", + CURRENT_BLOCK_HEIGHT_KEY, streamer_message.block.header.height, block_count, streamer_message.block.header.timestamp, @@ -98,7 +100,7 @@ def listen_to_near_events(): 
try: # Update below with desired network & block height - start_block = get_block_height("current_block_height") + start_block = get_block_height(CURRENT_BLOCK_HEIGHT_KEY) # start_block = 119_568_113 logger.info(f"what's the start block, pray tell? {start_block-1}") loop.run_until_complete(indexer(start_block - 1, None)) diff --git a/indexer_app/utils.py b/indexer_app/utils.py index daac58c..9537070 100644 --- a/indexer_app/utils.py +++ b/indexer_app/utils.py @@ -15,7 +15,7 @@ from donations.models import Donation from indexer_app.models import BlockHeight from lists.models import List, ListRegistration, ListUpvote -from nadabot.models import Group, NadabotRegistry, Provider, Stamp, BlackList +from nadabot.models import BlackList, Group, NadabotRegistry, Provider, Stamp from pots.models import ( Pot, PotApplication, @@ -52,9 +52,7 @@ async def handle_social_profile_update(args_dict, receiver_id, signer_id): async def handle_new_nadabot_registry( - data: dict, - receiverId: str, - created_at: datetime + data: dict, receiverId: str, created_at: datetime ): logger.info(f"nadabot registry init... {data}") @@ -81,9 +79,7 @@ async def handle_new_nadabot_registry( async def handle_registry_blacklist_action( - data: dict, - receiverId: str, - created_at: datetime + data: dict, receiverId: str, created_at: datetime ): logger.info(f"Registry blacklist action....... {data}") @@ -97,32 +93,29 @@ async def handle_registry_blacklist_action( "registry": registry, "account": account, "reason": data.get("reason"), - "date_blacklisted": created_at + "date_blacklisted": created_at, } ) await BlackList.objects.abulk_create( - objs = [BlackList(**data) for data in bulk_obj], ignore_conflicts=True + objs=[BlackList(**data) for data in bulk_obj], ignore_conflicts=True ) except Exception as e: logger.error(f"Error in adding acct to blacklist: {e}") async def handle_registry_unblacklist_action( - data: dict, - receiverId: str, - created_at: datetime + data: dict, receiverId: str, created_at: datetime ): logger.info(f"Registry remove blacklisted accts....... {data}") try: registry, _ = await Account.objects.aget_or_create(id=receiverId) - entries = BlackList.objects.filter(account__in=data["accounts"]) + entries = BlackList.objects.filter(account__in=data["accounts"]) await entries.adelete() except Exception as e: logger.error(f"Error in removing acct from blacklist: {e}") - async def handle_new_pot( data: dict, receiver_id: str, @@ -219,72 +212,71 @@ async def handle_new_pot( logger.error(f"Failed to handle new pot, Error: {e}") - async def handle_pot_config_update( log_data: dict, receiver_id: str, ): - try: - data = log_data - logger.info(f"asserting involved accts.... 
{receiver_id}") - if data.get("chef"): - chef, _ = await Account.objects.aget_or_create(id=data["chef"]) - owner, _ = await Account.objects.aget_or_create(id=data["owner"]) - logger.info(f"building updated config for pot {receiver_id}") - pot_config = { - "deployer": data["deployed_by"], - "source_metadata": data["source_metadata"], - "owner_id": data["owner"], - "chef_id": data.get("chef"), - "name": data["pot_name"], - "description": data["pot_description"], - "max_approved_applicants": data["max_projects"], - "base_currency": data["base_currency"], - "application_start": datetime.fromtimestamp( - data["application_start_ms"] / 1000 - ), - "application_end": datetime.fromtimestamp( - data["application_end_ms"] / 1000 - ), - "matching_round_start": datetime.fromtimestamp( - data["public_round_start_ms"] / 1000 - ), - "matching_round_end": datetime.fromtimestamp( - data["public_round_end_ms"] / 1000 - ), - "registry_provider": data["registry_provider"], - "min_matching_pool_donation_amount": data[ - "min_matching_pool_donation_amount" - ], - "sybil_wrapper_provider": data["sybil_wrapper_provider"], - "custom_sybil_checks": data.get("custom_sybil_checks"), - "custom_min_threshold_score": data.get("custom_min_threshold_score"), - "referral_fee_matching_pool_basis_points": data[ - "referral_fee_matching_pool_basis_points" - ], - "referral_fee_public_round_basis_points": data[ - "referral_fee_public_round_basis_points" - ], - "chef_fee_basis_points": data["chef_fee_basis_points"], - "matching_pool_balance": data["matching_pool_balance"], - "total_public_donations": data["total_public_donations"], - "public_donations_count": data["public_donations_count"], - "cooldown_period_ms": data["cooldown_end_ms"], - "all_paid_out": data["all_paid_out"], - "protocol_config_provider": data["protocol_config_provider"], - } + try: + data = log_data + logger.info(f"asserting involved accts.... 
{receiver_id}") + if data.get("chef"): + chef, _ = await Account.objects.aget_or_create(id=data["chef"]) + owner, _ = await Account.objects.aget_or_create(id=data["owner"]) + logger.info(f"building updated config for pot {receiver_id}") + pot_config = { + "deployer": data["deployed_by"], + "source_metadata": data["source_metadata"], + "owner_id": data["owner"], + "chef_id": data.get("chef"), + "name": data["pot_name"], + "description": data["pot_description"], + "max_approved_applicants": data["max_projects"], + "base_currency": data["base_currency"], + "application_start": datetime.fromtimestamp( + data["application_start_ms"] / 1000 + ), + "application_end": datetime.fromtimestamp( + data["application_end_ms"] / 1000 + ), + "matching_round_start": datetime.fromtimestamp( + data["public_round_start_ms"] / 1000 + ), + "matching_round_end": datetime.fromtimestamp( + data["public_round_end_ms"] / 1000 + ), + "registry_provider": data["registry_provider"], + "min_matching_pool_donation_amount": data[ + "min_matching_pool_donation_amount" + ], + "sybil_wrapper_provider": data["sybil_wrapper_provider"], + "custom_sybil_checks": data.get("custom_sybil_checks"), + "custom_min_threshold_score": data.get("custom_min_threshold_score"), + "referral_fee_matching_pool_basis_points": data[ + "referral_fee_matching_pool_basis_points" + ], + "referral_fee_public_round_basis_points": data[ + "referral_fee_public_round_basis_points" + ], + "chef_fee_basis_points": data["chef_fee_basis_points"], + "matching_pool_balance": data["matching_pool_balance"], + "total_public_donations": data["total_public_donations"], + "public_donations_count": data["public_donations_count"], + "cooldown_period_ms": data["cooldown_end_ms"], + "all_paid_out": data["all_paid_out"], + "protocol_config_provider": data["protocol_config_provider"], + } - pot, created = await Pot.objects.aupdate_or_create( - id=receiver_id, defaults=pot_config - ) + pot, created = await Pot.objects.aupdate_or_create( + id=receiver_id, defaults=pot_config + ) - if data.get("admins"): - for admin_id in data["admins"]: - admin, _ = await Account.objects.aget_or_create(id=admin_id) - pot.admins.aadd(admin) - # await Pot.objects.filter(id=receiver_id).aupdate(**pot_config) - except Exception as e: - logger.error(f"Failed to update Pot config, Error: {e}") + if data.get("admins"): + for admin_id in data["admins"]: + admin, _ = await Account.objects.aget_or_create(id=admin_id) + pot.admins.aadd(admin) + # await Pot.objects.filter(id=receiver_id).aupdate(**pot_config) + except Exception as e: + logger.error(f"Failed to update Pot config, Error: {e}") async def handle_new_pot_factory(data: dict, receiver_id: str, created_at: datetime): @@ -801,6 +793,8 @@ async def handle_add_nadabot_admin(data, receiverId): await obj.admins.aadd(user) except Exception as e: logger.error(f"Failed to add nadabot admin, Error: {e}") + + # # TODO: Need to abstract some actions. # async def handle_batch_donations( # receiver_id: str, @@ -1035,10 +1029,8 @@ async def handle_new_donation( # } # await Account.objects.filter(id=recipient.id).aupdate(**acctUpdate) -async def handle_update_default_human_threshold( - data: dict, - receiverId: str -): + +async def handle_update_default_human_threshold(data: dict, receiverId: str): logger.info(f"update threshold data... 
{data}") try: @@ -1051,11 +1043,7 @@ async def handle_update_default_human_threshold( logger.error(f"Failed to update default threshold, Error: {e}") -async def handle_new_provider( - data: dict, - receiverId: str, - signerId: str -): +async def handle_new_provider(data: dict, receiverId: str, signerId: str): logger.info(f"new provider data: {data}, {receiverId}") data = data["provider"] @@ -1073,8 +1061,8 @@ async def handle_new_provider( # had the same id emitted for them, the id `13`, so we have to catch it and manoeuvre aroubd it. # TODO: REMOVE when next version contract is deployed, as this issue would be fixed. if provider_id == 13: - provider_id = await cache.aget("last_id", 1) - await cache.aset("last_id", provider_id+1) + provider_id = await cache.aget("last_id", 1) + await cache.aset("last_id", provider_id + 1) provider = await Provider.objects.aupdate_or_create( on_chain_id=provider_id, @@ -1100,11 +1088,7 @@ async def handle_new_provider( logger.error(f"Failed to add new stamp provider: {e}") -async def handle_add_stamp( - data: dict, - receiverId: str, - signerId: str -): +async def handle_add_stamp(data: dict, receiverId: str, signerId: str): logger.info(f"new stamp data: {data}, {receiverId}") data = data["stamp"] @@ -1126,18 +1110,14 @@ async def handle_add_stamp( logger.error(f"Failed to create stamp: {e}") - -async def handle_new_group( - data: dict, - created_at: datetime -): +async def handle_new_group(data: dict, created_at: datetime): logger.info(f"new group data: {data}") - group_data = data.get('group', {}) + group_data = data.get("group", {}) try: # group enums can have values, they are represented as a dict in the events from the indexer, and enum choices without values are presented as normal strings: # withValue: {'group': {'id': 5, 'name': 'Here we go again', 'providers': [8, 1, 4, 6], 'rule': {'IncreasingReturns': 10}}} # noValue: {"id":6,"name":"Lachlan test group","providers":[1,2],"rule":"Highest"} - rule = group_data['rule'] + rule = group_data["rule"] rule_key = rule rule_val = None if type(rule) == dict: @@ -1160,27 +1140,30 @@ async def handle_new_group( logger.info(f"addding provider.... : {group_data['providers']}") if group_data.get("providers"): for provider_id in group_data["providers"]: - provider, _ = await Provider.objects.aget_or_create(on_chain_id=provider_id) + provider, _ = await Provider.objects.aget_or_create( + on_chain_id=provider_id + ) await group.providers.aadd(provider) except Exception as e: logger.error(f"Failed to create group, because: {e}") + async def cache_block_height( key: str, height: int, block_count: int, block_timestamp: int -) -> int: +): await cache.aset(key, height) # the cache os the default go to for the restart block, the db is a backup if the redis server crashes. # if (block_count % int(settings.BLOCK_SAVE_HEIGHT or 400)) == 0: # logger.info(f"saving daylight, {height}") - await BlockHeight.objects.aupdate_or_create( - id=1, - defaults={ - "block_height": height, - "block_timestamp": datetime.fromtimestamp(block_timestamp / 1000000000), - "updated_at": timezone.now(), - }, - ) # better than ovverriding model's save method to get a singleton? we need only one entry - return height + # await BlockHeight.objects.aupdate_or_create( + # id=1, + # defaults={ + # "block_height": height, + # "block_timestamp": datetime.fromtimestamp(block_timestamp / 1000000000), + # "updated_at": timezone.now(), + # }, + # ) # better than ovverriding model's save method to get a singleton? 
we need only one entry + # return height def get_block_height(key: str) -> int: From 6ae3515bc06cb97a54d3a87f50c2f7e654d92554 Mon Sep 17 00:00:00 2001 From: Lachlan Glen <54282009+lachlanglen@users.noreply.github.com> Date: Wed, 17 Jul 2024 10:18:31 -0400 Subject: [PATCH 08/20] set cache key in asyncio task --- indexer_app/handler.py | 4 ++-- indexer_app/tasks.py | 19 +++++++++++++------ 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/indexer_app/handler.py b/indexer_app/handler.py index 4e95e59..1fc264d 100644 --- a/indexer_app/handler.py +++ b/indexer_app/handler.py @@ -49,8 +49,8 @@ async def handle_streamer_message(streamer_message: near_primitives.StreamerMess block_timestamp = streamer_message.block.header.timestamp block_height = streamer_message.block.header.height now_datetime = datetime.fromtimestamp(block_timestamp / 1000000000) - # Fire and forget the cache update - asyncio.create_task(cache.aset("block_height", block_height)) + # # Fire and forget the cache update + # asyncio.create_task(cache.aset("block_height", block_height)) # await cache.aset( # "block_height", block_height # ) # TODO: add custom timeout if it should be valid for longer than default (5 minutes) diff --git a/indexer_app/tasks.py b/indexer_app/tasks.py index 51644b1..e82c10b 100644 --- a/indexer_app/tasks.py +++ b/indexer_app/tasks.py @@ -8,6 +8,7 @@ from celery import shared_task from celery.signals import task_revoked, worker_shutdown from django.conf import settings +from django.core.cache import cache from django.db import transaction from django.db.models import Count, DecimalField, Q, Sum, Value from django.db.models.functions import Cast, NullIf @@ -62,12 +63,18 @@ async def indexer(from_block: int, to_block: int): # Log time before caching block height cache_start_time = time.time() - await cache_block_height( - CURRENT_BLOCK_HEIGHT_KEY, - streamer_message.block.header.height, - block_count, - streamer_message.block.header.timestamp, - ) # current block height + # Fire and forget the cache update + asyncio.create_task( + cache.aset( + CURRENT_BLOCK_HEIGHT_KEY, streamer_message.block.header.height + ) + ) + # await cache_block_height( + # CURRENT_BLOCK_HEIGHT_KEY, + # streamer_message.block.header.height, + # block_count, + # streamer_message.block.header.timestamp, + # ) # current block height cache_end_time = time.time() logger.info( From 49439ccd09fcb4082ef891ce91c150eed7025473 Mon Sep 17 00:00:00 2001 From: Lachlan Glen <54282009+lachlanglen@users.noreply.github.com> Date: Wed, 17 Jul 2024 10:24:39 -0400 Subject: [PATCH 09/20] remove excess debug logs --- indexer_app/handler.py | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/indexer_app/handler.py b/indexer_app/handler.py index 1fc264d..e4e6184 100644 --- a/indexer_app/handler.py +++ b/indexer_app/handler.py @@ -58,10 +58,10 @@ async def handle_streamer_message(streamer_message: near_primitives.StreamerMess logger.info( f"Block Height: {block_height}, Block Timestamp: {block_timestamp} ({formatted_date})" ) - logger.info( - f"Time after processing block info: {time.time() - start_time:.4f} seconds" - ) - log_memory_usage("After processing block info") + # logger.info( + # f"Time after processing block info: {time.time() - start_time:.4f} seconds" + # ) + # log_memory_usage("After processing block info") # if block_height == 111867204: # with open("indexer_outcome2.json", "w") as file: # file.write(f"{streamer_message}") @@ -139,10 +139,10 @@ async def 
handle_streamer_message(streamer_message: near_primitives.StreamerMess # TODO: handle set_source_metadata logs for various contracts - logger.info( - f"Time to process logs for receipt {receipt_execution_outcome.receipt.receipt_id}: {time.time() - log_processing_start:.4f} seconds" - ) - log_memory_usage("After processing logs") + # logger.info( + # f"Time to process logs for receipt {receipt_execution_outcome.receipt.receipt_id}: {time.time() - log_processing_start:.4f} seconds" + # ) + # log_memory_usage("After processing logs") # 2. HANDLE METHOD CALLS # Skip if the tx failed @@ -428,16 +428,16 @@ async def handle_streamer_message(streamer_message: near_primitives.StreamerMess logger.error(f"Error in indexer handler:\n{e}") # with open("indexer_error.txt", "a") as file: # file.write(f"{e}\n") - logger.info( - f"Time to process method calls for receipt {receipt_execution_outcome.receipt.receipt_id}: {time.time() - method_call_processing_start:.4f} seconds" - ) - log_memory_usage("After processing method calls") - logger.info( - f"Time to process shard {shard_index}: {time.time() - shard_start_time:.4f} seconds" - ) - log_memory_usage(f"After processing shard {shard_index}") + # logger.info( + # f"Time to process method calls for receipt {receipt_execution_outcome.receipt.receipt_id}: {time.time() - method_call_processing_start:.4f} seconds" + # ) + # log_memory_usage("After processing method calls") + # logger.info( + # f"Time to process shard {shard_index}: {time.time() - shard_start_time:.4f} seconds" + # ) + # log_memory_usage(f"After processing shard {shard_index}") - logger.info( - f"Total time to process streamer message: {time.time() - start_time:.4f} seconds" - ) - log_memory_usage("End of handle_streamer_message") + # logger.info( + # f"Total time to process streamer message: {time.time() - start_time:.4f} seconds" + # ) + # log_memory_usage("End of handle_streamer_message") From bc4216a70b818053aca30d6272c63defd4f83269 Mon Sep 17 00:00:00 2001 From: Lachlan Glen <54282009+lachlanglen@users.noreply.github.com> Date: Wed, 17 Jul 2024 10:51:32 -0400 Subject: [PATCH 10/20] add BlockHeight record update back --- indexer_app/tasks.py | 18 +++++++++--------- indexer_app/utils.py | 16 ++++++++-------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/indexer_app/tasks.py b/indexer_app/tasks.py index e82c10b..dd2175e 100644 --- a/indexer_app/tasks.py +++ b/indexer_app/tasks.py @@ -65,16 +65,16 @@ async def indexer(from_block: int, to_block: int): cache_start_time = time.time() # Fire and forget the cache update asyncio.create_task( - cache.aset( - CURRENT_BLOCK_HEIGHT_KEY, streamer_message.block.header.height - ) + cache_block_height( + CURRENT_BLOCK_HEIGHT_KEY, + streamer_message.block.header.height, + block_count, + streamer_message.block.header.timestamp, + ) # current block height + # cache.aset( + # CURRENT_BLOCK_HEIGHT_KEY, streamer_message.block.header.height + # ) ) - # await cache_block_height( - # CURRENT_BLOCK_HEIGHT_KEY, - # streamer_message.block.header.height, - # block_count, - # streamer_message.block.header.timestamp, - # ) # current block height cache_end_time = time.time() logger.info( diff --git a/indexer_app/utils.py b/indexer_app/utils.py index 9537070..a5425ed 100644 --- a/indexer_app/utils.py +++ b/indexer_app/utils.py @@ -1155,14 +1155,14 @@ async def cache_block_height( # the cache os the default go to for the restart block, the db is a backup if the redis server crashes. 
# if (block_count % int(settings.BLOCK_SAVE_HEIGHT or 400)) == 0: # logger.info(f"saving daylight, {height}") - # await BlockHeight.objects.aupdate_or_create( - # id=1, - # defaults={ - # "block_height": height, - # "block_timestamp": datetime.fromtimestamp(block_timestamp / 1000000000), - # "updated_at": timezone.now(), - # }, - # ) # better than ovverriding model's save method to get a singleton? we need only one entry + await BlockHeight.objects.aupdate_or_create( + id=1, + defaults={ + "block_height": height, + "block_timestamp": datetime.fromtimestamp(block_timestamp / 1000000000), + "updated_at": timezone.now(), + }, + ) # better than ovverriding model's save method to get a singleton? we need only one entry # return height From 847a64fca5b6d8263babedf5da8ab344b38896aa Mon Sep 17 00:00:00 2001 From: Lachlan Glen <54282009+lachlanglen@users.noreply.github.com> Date: Wed, 17 Jul 2024 11:00:07 -0400 Subject: [PATCH 11/20] add debug log --- indexer_app/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/indexer_app/utils.py b/indexer_app/utils.py index a5425ed..addf349 100644 --- a/indexer_app/utils.py +++ b/indexer_app/utils.py @@ -1151,6 +1151,7 @@ async def handle_new_group(data: dict, created_at: datetime): async def cache_block_height( key: str, height: int, block_count: int, block_timestamp: int ): + logger.info(f"caching block height: {height}") await cache.aset(key, height) # the cache os the default go to for the restart block, the db is a backup if the redis server crashes. # if (block_count % int(settings.BLOCK_SAVE_HEIGHT or 400)) == 0: From 6bf767f0195c1f33f54eb0ed6863a9588e15fa91 Mon Sep 17 00:00:00 2001 From: Prometheus Date: Wed, 17 Jul 2024 20:33:29 +0100 Subject: [PATCH 12/20] create util function for common price fetching (#43) * create util function for common price fetching * resolve reviews * add chains app & Chain model (#41) * default to NEAR chain when creating new account * add debug logging to handler.py for memory usage & timing * add additional debug logs for streamer processing time * remove BlockHeight db write in cache_block_height * set cache key in asyncio task * remove excess debug logs * add BlockHeight record update back * add debug log * Revert "Merge branch 'dev' into common-price-fetch" This reverts commit 54d526ece477b0a49957c5f085de7090f2d05542, reversing changes made to 8b6c363c6e2c5fa9d5f7fe6f1c32ceb67a794a5d. 
* small fixes --------- Co-authored-by: Lachlan Glen <54282009+lachlanglen@users.noreply.github.com> --- donations/models.py | 114 +++++-------------- pots/models.py | 68 ++--------- tokens/migrations/0003_token_coingecko_id.py | 24 ++++ tokens/models.py | 76 +++++++++++++ 4 files changed, 141 insertions(+), 141 deletions(-) create mode 100644 tokens/migrations/0003_token_coingecko_id.py diff --git a/donations/models.py b/donations/models.py index bb756f9..d1da775 100644 --- a/donations/models.py +++ b/donations/models.py @@ -203,90 +203,36 @@ def fetch_usd_prices(self): existing_referrer_fee_usd = self.referrer_fee_usd existing_chef_fee_usd = self.chef_fee_usd # first, see if there is a TokenHistoricalPrice within 1 day (or HISTORICAL_PRICE_QUERY_HOURS) of self.donated_at - token = self.token - time_window = timedelta(hours=settings.HISTORICAL_PRICE_QUERY_HOURS or 24) - token_prices = TokenHistoricalPrice.objects.filter( - token=token, - timestamp__gte=self.donated_at - time_window, - timestamp__lte=self.donated_at + time_window, - ) - existing_token_price = token_prices.first() - total_amount = token.format_price(self.total_amount) - net_amount = token.format_price(self.net_amount) - protocol_amount = token.format_price(self.protocol_fee) - referrer_amount = ( - None if not self.referrer_fee else token.format_price(self.referrer_fee) - ) - chef_amount = None if not self.chef_fee else token.format_price(self.chef_fee) - # chef_amount = token.format_price(self.chef_fee or "0") - if existing_token_price: - try: - price_usd = existing_token_price.price_usd - self.total_amount_usd = total_amount * price_usd - self.net_amount_usd = net_amount * price_usd - self.protocol_fee_usd = protocol_amount * price_usd - self.referrer_fee_usd = ( - None if not referrer_amount else referrer_amount * price_usd - ) - self.chef_fee_usd = None if not chef_amount else chef_amount * price_usd - self.save() - logger.info( - "USD prices calculated and saved using existing TokenHistoricalPrice" - ) - except Exception as e: + + try: + token = self.token + price_usd = token.fetch_usd_prices_common(self.donated_at) + if not price_usd: logger.error( - f"Failed to calculate and save USD prices using existing TokenHistoricalPrice: {e}" - ) - # TODO: update totals for relevant accounts - else: - # no existing price within acceptable time period; fetch from coingecko - price_data = {} - try: - logger.info( - "No existing price within acceptable time period; fetching historical price..." 
+ f"No USD price found for token {token.symbol} at {self.paid_at}" ) - endpoint = f"{settings.COINGECKO_URL}/coins/{self.token.id.id}/history?date={format_date(self.donated_at)}&localization=false" - if settings.COINGECKO_API_KEY: - endpoint += f"&x_cg_pro_api_key={settings.COINGECKO_API_KEY}" - logger.info(f"coingecko endpoint: {endpoint}") - response = requests.get(endpoint) - logger.info(f"coingecko response: {response}") - if response.status_code == 429: - logger.warning("Coingecko rate limit exceeded") - price_data = response.json() - except Exception as e: - logger.warning(f"Failed to fetch coingecko price data: {e}") - logger.info(f"coingecko price data: {price_data}") - price_usd = ( - price_data.get("market_data", {}).get("current_price", {}).get("usd") + return + total_amount = token.format_price(self.total_amount) + net_amount = token.format_price(self.net_amount) + protocol_amount = token.format_price(self.protocol_fee) + referrer_amount = ( + None if not self.referrer_fee else token.format_price(self.referrer_fee) ) - logger.info(f"unit price: {price_usd}") - if price_usd: - try: - # convert price_usd to decimal - price_usd = Decimal(price_usd) - self.total_amount_usd = total_amount * price_usd - self.net_amount_usd = net_amount * price_usd - self.protocol_fee_usd = protocol_amount * price_usd - self.referrer_fee_usd = ( - None if not referrer_amount else referrer_amount * price_usd - ) - self.chef_fee_usd = ( - None if not chef_amount else chef_amount * price_usd - ) - self.save() - except Exception as e: - logger.error( - f"Failed to calculate and save USD prices using fetched price: {e}" - ) - # TODO: update totals for relevant accounts - try: - TokenHistoricalPrice.objects.create( - token=token, - price_usd=price_usd, - timestamp=self.donated_at, - ) - except Exception as e: - logger.warning( - f"Error creating TokenHistoricalPrice: {e} token: {token} price_usd: {price_usd}" - ) + chef_amount = None if not self.chef_fee else token.format_price(self.chef_fee) + self.total_amount_usd = total_amount * price_usd + self.net_amount_usd = net_amount * price_usd + self.protocol_fee_usd = protocol_amount * price_usd + self.referrer_fee_usd = ( + None if not referrer_amount else referrer_amount * price_usd + ) + self.chef_fee_usd = None if not chef_amount else chef_amount * price_usd + self.save() + logger.info( + f"Saved USD prices for donation: {self.on_chain_id}" + ) + except Exception as e: + logger.error( + f"Failed to calculate and save USD prices: {e}" + ) + # chef_amount = token.format_price(self.chef_fee or "0") + # TODO: update totals for relevant accounts diff --git a/pots/models.py b/pots/models.py index 62051cf..ba326ae 100644 --- a/pots/models.py +++ b/pots/models.py @@ -450,65 +450,19 @@ class PotPayout(models.Model): ### Fetches USD prices for the Donation record and saves USD totals def fetch_usd_prices(self): # first, see if there is a TokenHistoricalPrice within 1 day (or HISTORICAL_PRICE_QUERY_HOURS) of self.paid_at - token = self.token - time_window = timedelta(hours=settings.HISTORICAL_PRICE_QUERY_HOURS or 24) - token_prices = TokenHistoricalPrice.objects.filter( - token=token, - timestamp__gte=self.paid_at - time_window, - timestamp__lte=self.paid_at + time_window, - ) - existing_token_price = token_prices.first() - amount = token.format_price(self.amount) - if existing_token_price: - try: - price_usd = existing_token_price.price_usd - self.amount_paid_usd = amount * price_usd - self.save() - except Exception as e: - logger.error( - f"Failed to calculate and 
save USD prices for payout using existing TokenHistoricalPrice: {e}" - ) - else: - # no existing price within acceptable time period; fetch from coingecko - try: + try: + token = self.token + price_usd = token.fetch_usd_prices_common(self.paid_at) + if not price_usd: logger.info( - "No existing price within acceptable time period; fetching historical price..." + f"No USD price found for token {self.token.symbol} at {self.paid_at}" ) - endpoint = f"{settings.COINGECKO_URL}/coins/{self.token.id.id}/history?date={format_date(self.paid_at)}&localization=false" - if settings.COINGECKO_API_KEY: - endpoint += f"&x_cg_pro_api_key={settings.COINGECKO_API_KEY}" - response = requests.get(endpoint) - logger.info(f"coingecko response: {response}") - if response.status_code == 429: - logger.warning("Coingecko rate limit exceeded") - price_data = response.json() - except Exception as e: - logger.warning(f"Failed to fetch coingecko price data: {e}") - logger.info(f"coingecko price data: {price_data}") - price_usd = ( - price_data.get("market_data", {}).get("current_price", {}).get("usd") - ) - logger.info(f"unit price: {price_usd}") - if price_usd: - try: - # convert price_usd to decimal - price_usd = Decimal(price_usd) - self.amount_paid_usd = amount * price_usd - self.save() - except Exception as e: - logger.error( - f"Failed to calculate and save USD prices using fetched price: {e}" - ) - try: - TokenHistoricalPrice.objects.create( - token=token, - price_usd=price_usd, - timestamp=self.paid_at, - ) - except Exception as e: - logger.warning( - f"Error creating TokenHistoricalPrice: {e} token: {token} price_usd: {price_usd}" - ) + return + self.amount_paid_usd = token.format_price(self.amount) * price_usd + self.save() + logger.info(f"Saved USD prices for pot payout for pot id: {self.pot.id}") + except Exception as e: + logger.error(f"Failed to calculate and save USD prices: {e}") class PotPayoutChallenge(models.Model): diff --git a/tokens/migrations/0003_token_coingecko_id.py b/tokens/migrations/0003_token_coingecko_id.py new file mode 100644 index 0000000..91793d5 --- /dev/null +++ b/tokens/migrations/0003_token_coingecko_id.py @@ -0,0 +1,24 @@ +# Generated by Django 5.0.6 on 2024-07-17 17:57 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("tokens", "0002_token_icon_token_name_token_symbol"), + ] + + operations = [ + migrations.AddField( + model_name="token", + name="coingecko_id", + field=models.CharField( + default="near", + help_text="Token id on coingecko.", + max_length=255, + verbose_name="coingecko_id", + ), + preserve_default=False, + ), + ] diff --git a/tokens/models.py b/tokens/models.py index 90a7a8d..e2cd906 100644 --- a/tokens/models.py +++ b/tokens/models.py @@ -1,11 +1,16 @@ +from datetime import timedelta from decimal import Decimal from os import name +import requests +from django.conf import settings from django.db import models from django.utils import timezone from django.utils.translation import gettext_lazy as _ from accounts.models import Account +from base.logging import logger +from base.utils import format_date class Token(models.Model): @@ -37,6 +42,11 @@ class Token(models.Model): null=False, help_text=_("Token decimals."), ) + coingecko_id = models.CharField( + _("coingecko_id"), + max_length=255, + help_text=_("Token id on coingecko."), + ) def get_most_recent_price(self): return self.historical_prices.order_by("-timestamp").first() @@ -46,6 +56,72 @@ def format_price(self, amount_str: str): formatted_amount = 
Decimal(amount_str) / (Decimal("10") ** self.decimals) return formatted_amount + def fetch_usd_prices_common(self, timestamp): + + time_window = timedelta(hours=settings.HISTORICAL_PRICE_QUERY_HOURS or 24) + token_prices = TokenHistoricalPrice.objects.filter( + token=self, + timestamp__gte=timestamp - time_window, + timestamp__lte=timestamp + time_window, + ) + existing_token_price = token_prices.first() + if existing_token_price: + return existing_token_price.price_usd + + price_data = {} + if self.coingecko_id: + try: + logger.info( + "No existing price within acceptable time period; fetching historical pricefrom gecko..." + ) + endpoint = f"{settings.COINGECKO_URL}/coins/{self.coingecko_id}/history?date={format_date(timestamp)}&localization=false" + if settings.COINGECKO_API_KEY: + endpoint += f"&x_cg_pro_api_key={settings.COINGECKO_API_KEY}" + logger.info(f"coingecko endpoint: {endpoint}") + response = requests.get(endpoint) + logger.info(f"coingecko response: {response}") + if response.status_code == 429: + logger.warning("Coingecko rate limit exceeded") + price_data = response.json() + except Exception as e: + logger.warning(f"Failed to fetch coingecko price data: {e}") + return None + price_usd = ( + price_data.get("market_data", {}).get("current_price", {}).get("usd") + ) + if price_usd: + TokenHistoricalPrice.objects.create( + token=self, + timestamp=timestamp, + price_usd=price_usd, + ) + return Decimal(price_usd) + return None + + def save(self, *args, **kwargs): + try: + if self._state.adding: + endpoint = f"{settings.COINGECKO_URL}/coins/list?include_platform=true" + if settings.COINGECKO_API_KEY: + endpoint += f"&x_cg_pro_api_key={settings.COINGECKO_API_KEY}" + response = requests.get(endpoint) + logger.info(f"coingecko response: {response}") + if response.status_code == 429: + logger.warning("Coingecko rate limit exceeded") + price_data = response.json() + coin_data = list( + filter( + lambda x: x["symbol"] == kwargs["symbol"] + and x["platforms"].get("near-protocol"), + price_data, + ) + ) + if coin_data: + self.coingecko_id = coin_data[0]["id"] + except Exception as e: + logger.error(f"Failed to fetch token id from coingecko: {e}") + super().save(*args, **kwargs) + class TokenHistoricalPrice(models.Model): token = models.ForeignKey( From 67ccf6ff5ada2dd865bfa1ab0e45a05e68eb2504 Mon Sep 17 00:00:00 2001 From: Prometheus Date: Thu, 18 Jul 2024 14:24:32 +0100 Subject: [PATCH 13/20] handle zero net amount and migration script (#45) * handle zero net amount and migration script * remove unused import * small net_amount updates --------- Co-authored-by: Lachlan Glen <54282009+lachlanglen@users.noreply.github.com> --- .../0012_update_donations_net_amount.py | 27 +++++++++++++++++++ donations/models.py | 16 +++++------ .../management/commands/populatedata.py | 13 ++++++++- indexer_app/utils.py | 5 ++-- 4 files changed, 49 insertions(+), 12 deletions(-) create mode 100644 donations/migrations/0012_update_donations_net_amount.py diff --git a/donations/migrations/0012_update_donations_net_amount.py b/donations/migrations/0012_update_donations_net_amount.py new file mode 100644 index 0000000..974d6ca --- /dev/null +++ b/donations/migrations/0012_update_donations_net_amount.py @@ -0,0 +1,27 @@ +# Add custom migration to donations app that uses run_python to update net_amount for all Donations where net_amount="0". 
+ +from django.db import migrations + + +def update_donations_net_amount(apps, schema_editor): + Donation = apps.get_model("donations", "Donation") + for donation in Donation.objects.filter(net_amount="0"): + total_amount = int(donation.total_amount) + protocol_fee = int(donation.protocol_fee) + referrer_fee = int(donation.referrer_fee or 0) + chef_fee = int(donation.chef_fee or 0) + + net_amount = total_amount - protocol_fee - referrer_fee - chef_fee + donation.net_amount = net_amount + donation.save() + + +class Migration(migrations.Migration): + + dependencies = [ + ("donations", "0011_remove_donation_ft_alter_donation_token"), + ] + + operations = [ + migrations.RunPython(update_donations_net_amount), + ] diff --git a/donations/models.py b/donations/models.py index d1da775..21e8e1f 100644 --- a/donations/models.py +++ b/donations/models.py @@ -208,8 +208,8 @@ def fetch_usd_prices(self): token = self.token price_usd = token.fetch_usd_prices_common(self.donated_at) if not price_usd: - logger.error( - f"No USD price found for token {token.symbol} at {self.paid_at}" + logger.info( + f"No USD price found for token {token.name} ({token.id.id}) at {self.donated_at}" ) return total_amount = token.format_price(self.total_amount) @@ -218,7 +218,9 @@ def fetch_usd_prices(self): referrer_amount = ( None if not self.referrer_fee else token.format_price(self.referrer_fee) ) - chef_amount = None if not self.chef_fee else token.format_price(self.chef_fee) + chef_amount = ( + None if not self.chef_fee else token.format_price(self.chef_fee) + ) self.total_amount_usd = total_amount * price_usd self.net_amount_usd = net_amount * price_usd self.protocol_fee_usd = protocol_amount * price_usd @@ -227,12 +229,8 @@ def fetch_usd_prices(self): ) self.chef_fee_usd = None if not chef_amount else chef_amount * price_usd self.save() - logger.info( - f"Saved USD prices for donation: {self.on_chain_id}" - ) + logger.info(f"Saved USD prices for donation: {self.on_chain_id}") except Exception as e: - logger.error( - f"Failed to calculate and save USD prices: {e}" - ) + logger.error(f"Failed to calculate and save USD prices: {e}") # chef_amount = token.format_price(self.chef_fee or "0") # TODO: update totals for relevant accounts diff --git a/indexer_app/management/commands/populatedata.py b/indexer_app/management/commands/populatedata.py index 6e260b1..ce33ac4 100644 --- a/indexer_app/management/commands/populatedata.py +++ b/indexer_app/management/commands/populatedata.py @@ -596,6 +596,17 @@ def handle(self, *args, **options): if donation.get("chef_id"): chef, _ = Account.objects.get_or_create(id=donation["chef_id"]) + # net_amount is 0 for some donations, so calculate it + net_amount = donation["net_amount"] + if net_amount == "0": + total_amount = int(donation["total_amount"]) + protocol_fee = int(donation["protocol_fee"]) + referrer_fee = int(donation["referrer_fee"] or 0) + chef_fee = int(donation["chef_fee"] or 0) + net_amount = ( + total_amount - protocol_fee - referrer_fee - chef_fee + ) + # pot donations always NEAR ft_acct, _ = Account.objects.get_or_create(id="near") ft_token, _ = Token.objects.get_or_create(id=ft_acct) @@ -604,7 +615,7 @@ def handle(self, *args, **options): "total_amount": donation["total_amount"], "total_amount_usd": None, # USD amounts will be added later "net_amount_usd": None, - "net_amount": donation["net_amount"], + "net_amount": net_amount, "token": ft_token, "message": donation["message"], "donated_at": datetime.fromtimestamp( diff --git a/indexer_app/utils.py 
b/indexer_app/utils.py index addf349..7291e80 100644 --- a/indexer_app/utils.py +++ b/indexer_app/utils.py @@ -829,15 +829,16 @@ async def handle_new_donation( logger.info(f"handle_new_donation args data: {data}, {receiver_id}") logger.info(f"donation data: {donation_data}") - if "net_amount" in donation_data: + if "net_amount" in donation_data and donation_data["net_amount"] != "0": net_amount = int(donation_data["net_amount"]) else: # direct donations don't have net_amount property, so have to calculate it here total_amount = int(donation_data["total_amount"]) protocol_fee = int(donation_data["protocol_fee"]) referrer_fee = int(donation_data["referrer_fee"] or 0) + chef_fee = int(donation_data.get("chef_fee") or 0) - net_amount = total_amount - protocol_fee - referrer_fee + net_amount = total_amount - protocol_fee - referrer_fee - chef_fee donated_at = datetime.fromtimestamp( (donation_data.get("donated_at") or donation_data.get("donated_at_ms")) / 1000 From ec1f73fff57c2d007652ddcd1022a3448df14996 Mon Sep 17 00:00:00 2001 From: Lachlan Glen <54282009+lachlanglen@users.noreply.github.com> Date: Thu, 18 Jul 2024 09:30:09 -0400 Subject: [PATCH 14/20] add coingecko_id to token admin --- tokens/admin.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tokens/admin.py b/tokens/admin.py index 76aedc9..b949d2b 100644 --- a/tokens/admin.py +++ b/tokens/admin.py @@ -5,7 +5,15 @@ @admin.register(Token) class TokenAdmin(admin.ModelAdmin): - list_display = ("id", "name", "symbol", "icon", "decimals", "get_most_recent_price") + list_display = ( + "id", + "name", + "symbol", + "coingecko_id", + "icon", + "decimals", + "get_most_recent_price", + ) search_fields = ("id",) def get_most_recent_price(self, obj): From 5a3504eb3e3310d752031f82b668fce6fff8d77b Mon Sep 17 00:00:00 2001 From: Lachlan Glen <54282009+lachlanglen@users.noreply.github.com> Date: Thu, 18 Jul 2024 09:39:02 -0400 Subject: [PATCH 15/20] make coingecko_id nullable --- .../0004_alter_token_coingecko_id.py | 23 +++++++++++++++++++ tokens/models.py | 1 + 2 files changed, 24 insertions(+) create mode 100644 tokens/migrations/0004_alter_token_coingecko_id.py diff --git a/tokens/migrations/0004_alter_token_coingecko_id.py b/tokens/migrations/0004_alter_token_coingecko_id.py new file mode 100644 index 0000000..d64b775 --- /dev/null +++ b/tokens/migrations/0004_alter_token_coingecko_id.py @@ -0,0 +1,23 @@ +# Generated by Django 5.0.4 on 2024-07-18 13:38 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("tokens", "0003_token_coingecko_id"), + ] + + operations = [ + migrations.AlterField( + model_name="token", + name="coingecko_id", + field=models.CharField( + help_text="Token id on coingecko.", + max_length=255, + null=True, + verbose_name="coingecko_id", + ), + ), + ] diff --git a/tokens/models.py b/tokens/models.py index e2cd906..9f560e9 100644 --- a/tokens/models.py +++ b/tokens/models.py @@ -45,6 +45,7 @@ class Token(models.Model): coingecko_id = models.CharField( _("coingecko_id"), max_length=255, + null=True, help_text=_("Token id on coingecko."), ) From 2bc255db16d5f2e19b2cfa8a7117743190f6a4d2 Mon Sep 17 00:00:00 2001 From: Lachlan Glen <54282009+lachlanglen@users.noreply.github.com> Date: Thu, 18 Jul 2024 09:46:43 -0400 Subject: [PATCH 16/20] add blank=True to nullable Token fields for admin --- ..._coingecko_id_alter_token_icon_and_more.py | 56 +++++++++++++++++++ tokens/models.py | 4 ++ 2 files changed, 60 insertions(+) create mode 100644 
tokens/migrations/0005_alter_token_coingecko_id_alter_token_icon_and_more.py diff --git a/tokens/migrations/0005_alter_token_coingecko_id_alter_token_icon_and_more.py b/tokens/migrations/0005_alter_token_coingecko_id_alter_token_icon_and_more.py new file mode 100644 index 0000000..25d205a --- /dev/null +++ b/tokens/migrations/0005_alter_token_coingecko_id_alter_token_icon_and_more.py @@ -0,0 +1,56 @@ +# Generated by Django 5.0.4 on 2024-07-18 13:46 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("tokens", "0004_alter_token_coingecko_id"), + ] + + operations = [ + migrations.AlterField( + model_name="token", + name="coingecko_id", + field=models.CharField( + blank=True, + help_text="Token id on coingecko.", + max_length=255, + null=True, + verbose_name="coingecko_id", + ), + ), + migrations.AlterField( + model_name="token", + name="icon", + field=models.TextField( + blank=True, + help_text="Token icon (base64 data URL).", + null=True, + verbose_name="icon", + ), + ), + migrations.AlterField( + model_name="token", + name="name", + field=models.CharField( + blank=True, + help_text="Token name.", + max_length=255, + null=True, + verbose_name="name", + ), + ), + migrations.AlterField( + model_name="token", + name="symbol", + field=models.CharField( + blank=True, + help_text="Token symbol.", + max_length=255, + null=True, + verbose_name="symbol", + ), + ), + ] diff --git a/tokens/models.py b/tokens/models.py index 9f560e9..5ff50dc 100644 --- a/tokens/models.py +++ b/tokens/models.py @@ -24,17 +24,20 @@ class Token(models.Model): _("name"), max_length=255, null=True, + blank=True, help_text=_("Token name."), ) symbol = models.CharField( _("symbol"), max_length=255, null=True, + blank=True, help_text=_("Token symbol."), ) icon = models.TextField( _("icon"), null=True, + blank=True, help_text=_("Token icon (base64 data URL)."), ) decimals = models.PositiveIntegerField( @@ -46,6 +49,7 @@ class Token(models.Model): _("coingecko_id"), max_length=255, null=True, + blank=True, help_text=_("Token id on coingecko."), ) From 04b6d3dbbc43d90ae3e4899a165bbb5b2c90eeb7 Mon Sep 17 00:00:00 2001 From: Boluwatife Popoola Date: Thu, 18 Jul 2024 22:47:45 +0100 Subject: [PATCH 17/20] fix new conflicts --- indexer_app/tasks.py | 3 +-- indexer_app/utils.py | 10 ++++++++++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/indexer_app/tasks.py b/indexer_app/tasks.py index dd2175e..473aa6d 100644 --- a/indexer_app/tasks.py +++ b/indexer_app/tasks.py @@ -62,8 +62,7 @@ async def indexer(from_block: int, to_block: int): block_count += 1 # Log time before caching block height - cache_start_time = time.time() - # Fire and forget the cache update + cache_start_time = time.time() # Fire and forget the cache update asyncio.create_task( cache_block_height( CURRENT_BLOCK_HEIGHT_KEY, diff --git a/indexer_app/utils.py b/indexer_app/utils.py index 7291e80..9f98de4 100644 --- a/indexer_app/utils.py +++ b/indexer_app/utils.py @@ -1165,6 +1165,16 @@ async def cache_block_height( "updated_at": timezone.now(), }, ) # better than ovverriding model's save method to get a singleton? we need only one entry +======= + # await BlockHeight.objects.aupdate_or_create( + # id=1, + # defaults={ + # "block_height": height, + # "block_timestamp": datetime.fromtimestamp(block_timestamp / 1000000000), + # "updated_at": timezone.now(), + # }, + # ) # better than ovverriding model's save method to get a singleton? 
we need only one entry +>>>>>>> e288b20 (resolve conflicts, 🤦) # return height From 9bea4e8df2311426a5db53b43329dee032521721 Mon Sep 17 00:00:00 2001 From: Boluwatife Popoola Date: Thu, 18 Jul 2024 22:18:31 +0100 Subject: [PATCH 18/20] fix populate data --- indexer_app/management/commands/populatedata.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexer_app/management/commands/populatedata.py b/indexer_app/management/commands/populatedata.py index ce33ac4..9fb9a9b 100644 --- a/indexer_app/management/commands/populatedata.py +++ b/indexer_app/management/commands/populatedata.py @@ -659,7 +659,7 @@ def handle(self, *args, **options): for payout in config["payouts"]: paid_at = ( None - if "paid_at" not in payout + if payout.get("paid_at") is None else datetime.fromtimestamp(payout["paid_at"] / 1000) ) recipient, _ = Account.objects.get_or_create( From a20b6110261ea8671e77b3040f384ca89aff159b Mon Sep 17 00:00:00 2001 From: Lachlan Glen <54282009+lachlanglen@users.noreply.github.com> Date: Wed, 17 Jul 2024 10:11:41 -0400 Subject: [PATCH 19/20] remove BlockHeight db write in cache_block_height --- indexer_app/utils.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/indexer_app/utils.py b/indexer_app/utils.py index 9f98de4..5a165a2 100644 --- a/indexer_app/utils.py +++ b/indexer_app/utils.py @@ -1165,17 +1165,7 @@ async def cache_block_height( "updated_at": timezone.now(), }, ) # better than ovverriding model's save method to get a singleton? we need only one entry -======= - # await BlockHeight.objects.aupdate_or_create( - # id=1, - # defaults={ - # "block_height": height, - # "block_timestamp": datetime.fromtimestamp(block_timestamp / 1000000000), - # "updated_at": timezone.now(), - # }, - # ) # better than ovverriding model's save method to get a singleton? we need only one entry ->>>>>>> e288b20 (resolve conflicts, 🤦) - # return height + return height def get_block_height(key: str) -> int: From 17c301148e807542a5ee7ef43bc75bbe4c1567a1 Mon Sep 17 00:00:00 2001 From: Boluwatife Popoola Date: Fri, 19 Jul 2024 00:11:31 +0100 Subject: [PATCH 20/20] fix conflict change --- indexer_app/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexer_app/utils.py b/indexer_app/utils.py index 5a165a2..7291e80 100644 --- a/indexer_app/utils.py +++ b/indexer_app/utils.py @@ -1165,7 +1165,7 @@ async def cache_block_height( "updated_at": timezone.now(), }, ) # better than ovverriding model's save method to get a singleton? we need only one entry - return height + # return height def get_block_height(key: str) -> int:
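
Editorial note (not part of the patch series): PATCH 13 applies the same net-amount fallback in three places — the data migration, the populatedata command, and handle_new_donation. The standalone sketch below restates that arithmetic for reference; the function name compute_net_amount and the plain-dict input are illustrative assumptions, while the field names and the fee subtraction mirror the diffs above.

    def compute_net_amount(donation_data: dict) -> int:
        """Fallback used when an indexed donation has no usable net_amount (missing or "0").

        Mirrors the logic added in PATCH 13:
        net = total - protocol_fee - referrer_fee - chef_fee,
        with referrer/chef fees treated as 0 when absent.
        """
        if donation_data.get("net_amount") and donation_data["net_amount"] != "0":
            return int(donation_data["net_amount"])
        total_amount = int(donation_data["total_amount"])
        protocol_fee = int(donation_data["protocol_fee"])
        referrer_fee = int(donation_data.get("referrer_fee") or 0)
        chef_fee = int(donation_data.get("chef_fee") or 0)
        return total_amount - protocol_fee - referrer_fee - chef_fee


    # Usage sketch: a donation indexed with net_amount "0" resolves to 97 token units.
    print(compute_net_amount({
        "total_amount": "100",
        "protocol_fee": "2",
        "referrer_fee": None,
        "chef_fee": "1",
        "net_amount": "0",
    }))  # -> 97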