From ac1ca80bd30312dd7a00ee6109ab02b7a35e04df Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Wed, 22 Jan 2025 20:01:13 -0500 Subject: [PATCH] changes for all ledger entries --- cmd/export_ledger_entry_changes.go | 36 ++++---- internal/input/change_compactor.go | 60 ++++++++---- internal/input/changes.go | 14 +-- internal/input/changes_test.go | 23 ++--- internal/transform/account.go | 16 ++-- internal/transform/account_signer.go | 16 ++-- internal/transform/account_signer_test.go | 31 +++++-- internal/transform/account_test.go | 76 ++++++++++++++-- internal/transform/claimable_balance.go | 16 ++-- internal/transform/claimable_balance_test.go | 44 +++++++-- internal/transform/config_setting.go | 16 ++-- internal/transform/config_setting_test.go | 62 +++++++++++-- internal/transform/contract_code.go | 16 ++-- internal/transform/contract_code_test.go | 62 +++++++++++-- internal/transform/contract_data.go | 16 ++-- internal/transform/contract_data_test.go | 62 +++++++++++-- internal/transform/liquidity_pool.go | 16 ++-- internal/transform/liquidity_pool_test.go | 62 +++++++++++-- internal/transform/offer.go | 17 ++-- internal/transform/offer_normalized.go | 4 +- internal/transform/offer_normalized_test.go | 34 +++++++ internal/transform/offer_test.go | 78 ++++++++++++++-- internal/transform/schema.go | 52 ++++++++--- internal/transform/trustline.go | 16 ++-- internal/transform/trustline_test.go | 96 ++++++++++++++++++-- internal/transform/ttl.go | 16 ++-- internal/transform/ttl_test.go | 62 +++++++++++-- internal/utils/main.go | 45 +++++++++ 28 files changed, 813 insertions(+), 251 deletions(-) diff --git a/cmd/export_ledger_entry_changes.go b/cmd/export_ledger_entry_changes.go index 572638a2..5e268b80 100644 --- a/cmd/export_ledger_entry_changes.go +++ b/cmd/export_ledger_entry_changes.go @@ -119,13 +119,13 @@ be exported.`, if !exports["export-accounts"] { continue } - for i, change := range changes.Changes { + for _, change := range changes { if changed, err := change.AccountChangedExceptSigners(); err != nil { cmdLogger.LogError(fmt.Errorf("unable to identify changed accounts: %v", err)) continue } else if changed { - acc, err := transform.TransformAccount(change, changes.LedgerHeaders[i]) + acc, err := transform.TransformAccount(change) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming account entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -149,8 +149,8 @@ be exported.`, if !exports["export-balances"] { continue } - for i, change := range changes.Changes { - balance, err := transform.TransformClaimableBalance(change, changes.LedgerHeaders[i]) + for _, change := range changes { + balance, err := transform.TransformClaimableBalance(change) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming balance entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -162,8 +162,8 @@ be exported.`, if !exports["export-offers"] { continue } - for i, change := range changes.Changes { - offer, err := transform.TransformOffer(change, changes.LedgerHeaders[i]) + for _, change := range changes { + offer, err := transform.TransformOffer(change) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming offer entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -175,8 +175,8 @@ be exported.`, if !exports["export-trustlines"] { continue } - for i, change := range changes.Changes { - trust, err := transform.TransformTrustline(change, changes.LedgerHeaders[i]) + for _, change := range changes { + trust, err := transform.TransformTrustline(change) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming trustline entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -188,8 +188,8 @@ be exported.`, if !exports["export-pools"] { continue } - for i, change := range changes.Changes { - pool, err := transform.TransformPool(change, changes.LedgerHeaders[i]) + for _, change := range changes { + pool, err := transform.TransformPool(change) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming liquidity pool entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -201,9 +201,9 @@ be exported.`, if !exports["export-contract-data"] { continue } - for i, change := range changes.Changes { + for _, change := range changes { TransformContractData := transform.NewTransformContractDataStruct(transform.AssetFromContractData, transform.ContractBalanceFromContractData) - contractData, err, _ := TransformContractData.TransformContractData(change, env.NetworkPassphrase, changes.LedgerHeaders[i]) + contractData, err, _ := TransformContractData.TransformContractData(change, env.NetworkPassphrase) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming contract data entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -221,8 +221,8 @@ be exported.`, if !exports["export-contract-code"] { continue } - for i, change := range changes.Changes { - contractCode, err := transform.TransformContractCode(change, changes.LedgerHeaders[i]) + for _, change := range changes { + contractCode, err := transform.TransformContractCode(change) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming contract code entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -234,8 +234,8 @@ be exported.`, if !exports["export-config-settings"] { continue } - for i, change := range changes.Changes { - configSettings, err := transform.TransformConfigSetting(change, changes.LedgerHeaders[i]) + for _, change := range changes { + configSettings, err := transform.TransformConfigSetting(change) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming config settings entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) @@ -247,8 +247,8 @@ be exported.`, if !exports["export-ttl"] { continue } - for i, change := range changes.Changes { - ttl, err := transform.TransformTtl(change, changes.LedgerHeaders[i]) + for _, change := range changes { + ttl, err := transform.TransformTtl(change) if err != nil { entry, _, _, _ := utils.ExtractEntryFromChange(change) cmdLogger.LogError(fmt.Errorf("error transforming ttl entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err)) diff --git a/internal/input/change_compactor.go b/internal/input/change_compactor.go index 590077af..58be7a04 100644 --- a/internal/input/change_compactor.go +++ b/internal/input/change_compactor.go @@ -109,6 +109,8 @@ func (c *ChangeCompactor) addCreatedChange(change ingest.Change) error { return nil } + operationIndex, transaction, ledger, ledgerUpgrade := c.getChangeDetails(existingChange, change) + switch existingChange.LedgerEntryChangeType() { case xdr.LedgerEntryChangeTypeLedgerEntryCreated: return ingest.NewStateError(errors.Errorf( @@ -128,10 +130,10 @@ func (c *ChangeCompactor) addCreatedChange(change ingest.Change) error { Pre: existingChange.Pre, Post: change.Post, Reason: change.Reason, - OperationIndex: change.OperationIndex, - Transaction: change.Transaction, - Ledger: change.Ledger, - LedgerUpgrade: change.LedgerUpgrade, + OperationIndex: operationIndex, + Transaction: transaction, + Ledger: ledger, + LedgerUpgrade: ledgerUpgrade, } default: return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.LedgerEntryChangeType()) @@ -161,6 +163,8 @@ func (c *ChangeCompactor) addUpdatedChange(change ingest.Change) error { return nil } + operationIndex, transaction, ledger, ledgerUpgrade := c.getChangeDetails(existingChange, change) + switch existingChange.LedgerEntryChangeType() { case xdr.LedgerEntryChangeTypeLedgerEntryCreated: // If existing type is created it means that this entry does not @@ -170,10 +174,10 @@ func (c *ChangeCompactor) addUpdatedChange(change ingest.Change) error { Pre: existingChange.Pre, // = nil Post: change.Post, Reason: change.Reason, - OperationIndex: change.OperationIndex, - Transaction: change.Transaction, - Ledger: change.Ledger, - LedgerUpgrade: change.LedgerUpgrade, + OperationIndex: operationIndex, + Transaction: transaction, + Ledger: ledger, + LedgerUpgrade: ledgerUpgrade, } case xdr.LedgerEntryChangeTypeLedgerEntryUpdated: c.cache[ledgerKeyString] = ingest.Change{ @@ -181,10 +185,10 @@ func (c *ChangeCompactor) addUpdatedChange(change ingest.Change) error { Pre: existingChange.Pre, Post: change.Post, Reason: change.Reason, - OperationIndex: change.OperationIndex, - Transaction: change.Transaction, - Ledger: change.Ledger, - LedgerUpgrade: change.LedgerUpgrade, + OperationIndex: operationIndex, + Transaction: transaction, + Ledger: ledger, + LedgerUpgrade: ledgerUpgrade, } case xdr.LedgerEntryChangeTypeLedgerEntryRemoved: return ingest.NewStateError(errors.Errorf( @@ -219,6 +223,8 @@ func (c *ChangeCompactor) addRemovedChange(change ingest.Change) error { return nil } + operationIndex, transaction, ledger, ledgerUpgrade := c.getChangeDetails(existingChange, change) + switch existingChange.LedgerEntryChangeType() { case xdr.LedgerEntryChangeTypeLedgerEntryCreated: // If existing type is created it means that this will be no op. @@ -230,10 +236,10 @@ func (c *ChangeCompactor) addRemovedChange(change ingest.Change) error { Pre: existingChange.Pre, Post: nil, Reason: change.Reason, - OperationIndex: change.OperationIndex, - Transaction: change.Transaction, - Ledger: change.Ledger, - LedgerUpgrade: change.LedgerUpgrade, + OperationIndex: operationIndex, + Transaction: transaction, + Ledger: ledger, + LedgerUpgrade: ledgerUpgrade, } case xdr.LedgerEntryChangeTypeLedgerEntryRemoved: return ingest.NewStateError(errors.Errorf( @@ -263,3 +269,25 @@ func (c *ChangeCompactor) GetChanges() []ingest.Change { func (c *ChangeCompactor) Size() int { return len(c.cache) } + +func (c *ChangeCompactor) getChangeDetails(existingChange, change ingest.Change) (uint32, *ingest.LedgerTransaction, *xdr.LedgerCloseMeta, *xdr.LedgerUpgrade) { + operationIndex := change.OperationIndex + transaction := change.Transaction + ledger := change.Ledger + ledgerUpgrade := change.LedgerUpgrade + + if operationIndex == 0 { + operationIndex = existingChange.OperationIndex + } + if transaction == nil { + transaction = existingChange.Transaction + } + if ledger == nil { + ledger = existingChange.Ledger + } + if ledgerUpgrade == nil { + ledgerUpgrade = existingChange.LedgerUpgrade + } + + return operationIndex, transaction, ledger, ledgerUpgrade +} diff --git a/internal/input/changes.go b/internal/input/changes.go index 870b4ed2..6239cc56 100644 --- a/internal/input/changes.go +++ b/internal/input/changes.go @@ -17,14 +17,9 @@ var ( ExtractBatch = extractBatch ) -type LedgerChanges struct { - Changes []ingest.Change - LedgerHeaders []xdr.LedgerHeaderHistoryEntry -} - // ChangeBatch represents the changes in a batch of ledgers represented by the range [BatchStart, BatchEnd) type ChangeBatch struct { - Changes map[xdr.LedgerEntryType]LedgerChanges + Changes map[xdr.LedgerEntryType][]ingest.Change BatchStart uint32 BatchEnd uint32 } @@ -98,7 +93,7 @@ func extractBatch( xdr.LedgerEntryTypeConfigSetting, xdr.LedgerEntryTypeTtl} - ledgerChanges := map[xdr.LedgerEntryType]LedgerChanges{} + ledgerChanges := map[xdr.LedgerEntryType][]ingest.Change{} ctx := context.Background() for seq := batchStart; seq <= batchEnd; { changeCompactors := map[xdr.LedgerEntryType]*ChangeCompactor{} @@ -108,13 +103,11 @@ func extractBatch( // if this ledger is available, we process its changes and move on to the next ledger by incrementing seq. // Otherwise, nothing is incremented, and we try again on the next iteration of the loop - var header xdr.LedgerHeaderHistoryEntry if seq <= batchEnd { changeReader, err := ingest.NewLedgerChangeReader(ctx, *backend, env.NetworkPassphrase, seq) if err != nil { logger.Fatal(fmt.Sprintf("unable to create change reader for ledger %d: ", seq), err) } - header = changeReader.LedgerTransactionReader.GetHeader() for { change, err := changeReader.Read() @@ -144,8 +137,7 @@ func extractBatch( for dataType, compactor := range changeCompactors { for _, change := range compactor.GetChanges() { dataTypeChanges := ledgerChanges[dataType] - dataTypeChanges.Changes = append(dataTypeChanges.Changes, change) - dataTypeChanges.LedgerHeaders = append(dataTypeChanges.LedgerHeaders, header) + dataTypeChanges = append(dataTypeChanges, change) ledgerChanges[dataType] = dataTypeChanges } } diff --git a/internal/input/changes_test.go b/internal/input/changes_test.go index f5bce40d..6c197abc 100644 --- a/internal/input/changes_test.go +++ b/internal/input/changes_test.go @@ -114,14 +114,15 @@ func TestSendBatchToChannel(t *testing.T) { } func wrapLedgerEntry(entryType xdr.LedgerEntryType, entry xdr.LedgerEntry) ChangeBatch { - changes := map[xdr.LedgerEntryType]LedgerChanges{ + changes := map[xdr.LedgerEntryType][]ingest.Change{ entryType: { - Changes: []ingest.Change{ - {Type: entry.Data.Type, Post: &entry}, + { + Type: entry.Data.Type, + Post: &entry, }, - LedgerHeaders: []xdr.LedgerHeaderHistoryEntry{}, }, } + return ChangeBatch{ Changes: changes, } @@ -133,7 +134,7 @@ func mockExtractBatch( env utils.EnvironmentDetails, logger *utils.EtlLogger) ChangeBatch { log.Errorf("mock called") return ChangeBatch{ - Changes: map[xdr.LedgerEntryType]LedgerChanges{}, + Changes: map[xdr.LedgerEntryType][]ingest.Change{}, BatchStart: batchStart, BatchEnd: batchEnd, } @@ -161,7 +162,7 @@ func TestStreamChangesBatchNumbers(t *testing.T) { args: input{batchStart: 1, batchEnd: 65}, out: output{ batchRanges: []batchRange{ - batchRange{ + { batchStart: 1, batchEnd: 65, }, }, @@ -171,9 +172,9 @@ func TestStreamChangesBatchNumbers(t *testing.T) { args: input{batchStart: 1, batchEnd: 66}, out: output{ batchRanges: []batchRange{ - batchRange{ + { batchStart: 1, batchEnd: 64, - }, batchRange{ + }, { batchStart: 65, batchEnd: 66, }, }, @@ -183,10 +184,10 @@ func TestStreamChangesBatchNumbers(t *testing.T) { args: input{batchStart: 1, batchEnd: 128}, out: output{ batchRanges: []batchRange{ - batchRange{ + { batchStart: 1, batchEnd: 64, }, - batchRange{ + { batchStart: 65, batchEnd: 128, }, }, @@ -196,7 +197,7 @@ func TestStreamChangesBatchNumbers(t *testing.T) { args: input{batchStart: 1, batchEnd: 32}, out: output{ batchRanges: []batchRange{ - batchRange{ + { batchStart: 1, batchEnd: 32, }, }, diff --git a/internal/transform/account.go b/internal/transform/account.go index 84f4c706..4512a268 100644 --- a/internal/transform/account.go +++ b/internal/transform/account.go @@ -10,7 +10,7 @@ import ( ) // TransformAccount converts an account from the history archive ingestion system into a form suitable for BigQuery -func TransformAccount(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (AccountOutput, error) { +func TransformAccount(ledgerChange ingest.Change) (AccountOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return AccountOutput{}, err @@ -76,12 +76,7 @@ func TransformAccount(ledgerChange ingest.Change, header xdr.LedgerHeaderHistory outputLastModifiedLedger := uint32(ledgerEntry.LastModifiedLedgerSeq) - closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) - if err != nil { - return AccountOutput{}, err - } - - ledgerSequence := header.Header.LedgerSeq + changeDetails := utils.GetChangesDetails(ledgerChange) transformedAccount := AccountOutput{ AccountID: outputID, @@ -105,8 +100,11 @@ func TransformAccount(ledgerChange ingest.Change, header xdr.LedgerHeaderHistory NumSponsoring: uint32(accountEntry.NumSponsoring()), LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, - ClosedAt: closedAt, - LedgerSequence: uint32(ledgerSequence), + ClosedAt: changeDetails.ClosedAt, + LedgerSequence: changeDetails.LedgerSequence, + TransactionID: changeDetails.TransactionID, + OperationID: changeDetails.OperationID, + OperationType: changeDetails.OperationType, } return transformedAccount, nil } diff --git a/internal/transform/account_signer.go b/internal/transform/account_signer.go index 348cb6cd..ac21403b 100644 --- a/internal/transform/account_signer.go +++ b/internal/transform/account_signer.go @@ -6,8 +6,6 @@ import ( "github.com/guregu/null" "github.com/stellar/go/ingest" - "github.com/stellar/go/ingest/ledger" - "github.com/stellar/stellar-etl/internal/toid" "github.com/stellar/stellar-etl/internal/utils" ) @@ -25,10 +23,7 @@ func TransformSigners(ledgerChange ingest.Change) ([]AccountSignerOutput, error) return signers, fmt.Errorf("could not extract signer data from ledger entry of type: %+v", ledgerEntry.Data.Type) } - closedAt := ledger.ClosedAt(*ledgerChange.Ledger) - ledgerSequence := ledger.Sequence(*ledgerChange.Ledger) - outputTransactionID := toid.New(int32(ledgerSequence), int32(ledgerChange.Transaction.Index), 0).ToInt64() - outputOperationID := toid.New(int32(ledgerSequence), int32(ledgerChange.Transaction.Index), int32(ledgerChange.OperationIndex+1)).ToInt64() + changeDetails := utils.GetChangesDetails(ledgerChange) sponsors := accountEntry.SponsorPerSigner() for signer, weight := range accountEntry.SignerSummary() { @@ -45,10 +40,11 @@ func TransformSigners(ledgerChange ingest.Change) ([]AccountSignerOutput, error) LastModifiedLedger: outputLastModifiedLedger, LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, - ClosedAt: closedAt, - LedgerSequence: ledgerSequence, - TransactionID: outputTransactionID, - OperationID: outputOperationID, + ClosedAt: changeDetails.ClosedAt, + LedgerSequence: changeDetails.LedgerSequence, + TransactionID: changeDetails.TransactionID, + OperationID: changeDetails.OperationID, + OperationType: changeDetails.OperationType, }) } sort.Slice(signers, func(a, b int) bool { return signers[a].Weight < signers[b].Weight }) diff --git a/internal/transform/account_signer_test.go b/internal/transform/account_signer_test.go index 6ba392b7..19c2fa02 100644 --- a/internal/transform/account_signer_test.go +++ b/internal/transform/account_signer_test.go @@ -149,8 +149,22 @@ func makeSignersTestInput() ingest.Change { }, Transaction: &ingest.LedgerTransaction{ Index: 1, + Envelope: xdr.TransactionEnvelope{ + Type: 2, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + Operations: []xdr.Operation{ + { + Body: xdr.OperationBody{ + Type: 1, + }, + }, + }, + }, + }, + }, }, - OperationIndex: 1, + OperationIndex: 0, } } @@ -166,8 +180,9 @@ func makeSignersTestOutput() []AccountSignerOutput { Deleted: true, LedgerSequence: 10, ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), - TransactionID: 42949677056, - OperationID: 42949677058, + TransactionID: null.NewInt(42949677056, true), + OperationID: null.NewInt(42949677057, true), + OperationType: null.NewInt(1, true), }, { AccountID: testAccount1ID.Address(), Signer: "GACAKBQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB3BQ", @@ -178,8 +193,9 @@ func makeSignersTestOutput() []AccountSignerOutput { Deleted: true, LedgerSequence: 10, ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), - TransactionID: 42949677056, - OperationID: 42949677058, + TransactionID: null.NewInt(42949677056, true), + OperationID: null.NewInt(42949677057, true), + OperationType: null.NewInt(1, true), }, { AccountID: testAccount1ID.Address(), Signer: "GAFAWDAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABNDC", @@ -190,8 +206,9 @@ func makeSignersTestOutput() []AccountSignerOutput { Deleted: true, LedgerSequence: 10, ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), - TransactionID: 42949677056, - OperationID: 42949677058, + TransactionID: null.NewInt(42949677056, true), + OperationID: null.NewInt(42949677057, true), + OperationType: null.NewInt(1, true), }, } } diff --git a/internal/transform/account_test.go b/internal/transform/account_test.go index af050c03..fdb6f543 100644 --- a/internal/transform/account_test.go +++ b/internal/transform/account_test.go @@ -36,6 +36,22 @@ func TestTransformAccount(t *testing.T) { Type: xdr.LedgerEntryTypeOffer, }, }, + Ledger: &xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + }, + }, + }, + Transaction: &ingest.LedgerTransaction{ + Index: 1, + }, }, }, AccountOutput{}, fmt.Errorf("could not extract account data from ledger entry; actual type is LedgerEntryTypeOffer"), @@ -95,15 +111,7 @@ func TestTransformAccount(t *testing.T) { } for _, test := range tests { - header := xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - ScpValue: xdr.StellarValue{ - CloseTime: 1000, - }, - LedgerSeq: 10, - }, - } - actualOutput, actualError := TransformAccount(test.input.ledgerChange, header) + actualOutput, actualError := TransformAccount(test.input.ledgerChange) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -119,6 +127,22 @@ func wrapAccountEntry(accountEntry xdr.AccountEntry, lastModified int) ingest.Ch Account: &accountEntry, }, }, + Ledger: &xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + }, + }, + }, + Transaction: &ingest.LedgerTransaction{ + Index: 1, + }, } } @@ -166,6 +190,37 @@ func makeAccountTestInput() ingest.Change { Type: xdr.LedgerEntryTypeAccount, Pre: &ledgerEntry, Post: nil, + Ledger: &xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + }, + }, + }, + Transaction: &ingest.LedgerTransaction{ + Index: 1, + Envelope: xdr.TransactionEnvelope{ + Type: 2, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + Operations: []xdr.Operation{ + { + Body: xdr.OperationBody{ + Type: 1, + }, + }, + }, + }, + }, + }, + }, + OperationIndex: 0, } } @@ -192,5 +247,8 @@ func makeAccountTestOutput() AccountOutput { Deleted: true, LedgerSequence: 10, ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), + TransactionID: null.NewInt(42949677056, true), + OperationID: null.NewInt(42949677057, true), + OperationType: null.NewInt(1, true), } } diff --git a/internal/transform/claimable_balance.go b/internal/transform/claimable_balance.go index fb76b1d2..4013cc0b 100644 --- a/internal/transform/claimable_balance.go +++ b/internal/transform/claimable_balance.go @@ -21,7 +21,7 @@ func transformClaimants(claimants []xdr.Claimant) []Claimant { } // TransformClaimableBalance converts a claimable balance from the history archive ingestion system into a form suitable for BigQuery -func TransformClaimableBalance(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (ClaimableBalanceOutput, error) { +func TransformClaimableBalance(ledgerChange ingest.Change) (ClaimableBalanceOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ClaimableBalanceOutput{}, err @@ -45,12 +45,7 @@ func TransformClaimableBalance(ledgerChange ingest.Change, header xdr.LedgerHead outputLastModifiedLedger := uint32(ledgerEntry.LastModifiedLedgerSeq) - closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) - if err != nil { - return ClaimableBalanceOutput{}, err - } - - ledgerSequence := header.Header.LedgerSeq + changeDetails := utils.GetChangesDetails(ledgerChange) transformed := ClaimableBalanceOutput{ BalanceID: balanceID, @@ -65,8 +60,11 @@ func TransformClaimableBalance(ledgerChange ingest.Change, header xdr.LedgerHead LedgerEntryChange: uint32(changeType), Flags: outputFlags, Deleted: outputDeleted, - ClosedAt: closedAt, - LedgerSequence: uint32(ledgerSequence), + ClosedAt: changeDetails.ClosedAt, + LedgerSequence: changeDetails.LedgerSequence, + TransactionID: changeDetails.TransactionID, + OperationID: changeDetails.OperationID, + OperationType: changeDetails.OperationType, } return transformed, nil } diff --git a/internal/transform/claimable_balance_test.go b/internal/transform/claimable_balance_test.go index 90690fa1..00a11e4b 100644 --- a/internal/transform/claimable_balance_test.go +++ b/internal/transform/claimable_balance_test.go @@ -35,15 +35,7 @@ func TestTransformClaimableBalance(t *testing.T) { } for _, test := range tests { - header := xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - ScpValue: xdr.StellarValue{ - CloseTime: 1000, - }, - LedgerSeq: 10, - }, - } - actualOutput, actualError := TransformClaimableBalance(test.input.ingest, header) + actualOutput, actualError := TransformClaimableBalance(test.input.ingest) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -100,6 +92,37 @@ func makeClaimableBalanceTestInput() ingest.Change { Type: xdr.LedgerEntryTypeClaimableBalance, Pre: &ledgerEntry, Post: nil, + Ledger: &xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + }, + }, + }, + Transaction: &ingest.LedgerTransaction{ + Index: 1, + Envelope: xdr.TransactionEnvelope{ + Type: 2, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + Operations: []xdr.Operation{ + { + Body: xdr.OperationBody{ + Type: 1, + }, + }, + }, + }, + }, + }, + }, + OperationIndex: 0, } } @@ -126,5 +149,8 @@ func makeClaimableBalanceTestOutput() ClaimableBalanceOutput { Deleted: true, LedgerSequence: 10, ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), + TransactionID: null.NewInt(42949677056, true), + OperationID: null.NewInt(42949677057, true), + OperationType: null.NewInt(1, true), } } diff --git a/internal/transform/config_setting.go b/internal/transform/config_setting.go index f110b2ab..66394f4b 100644 --- a/internal/transform/config_setting.go +++ b/internal/transform/config_setting.go @@ -10,7 +10,7 @@ import ( ) // TransformConfigSetting converts an config setting ledger change entry into a form suitable for BigQuery -func TransformConfigSetting(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (ConfigSettingOutput, error) { +func TransformConfigSetting(ledgerChange ingest.Change) (ConfigSettingOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ConfigSettingOutput{}, err @@ -90,12 +90,7 @@ func TransformConfigSetting(ledgerChange ingest.Change, header xdr.LedgerHeaderH bucketListSizeWindow = append(bucketListSizeWindow, uint64(sizeWindow)) } - closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) - if err != nil { - return ConfigSettingOutput{}, err - } - - ledgerSequence := header.Header.LedgerSeq + changeDetails := utils.GetChangesDetails(ledgerChange) transformedConfigSetting := ConfigSettingOutput{ ConfigSettingId: int32(configSettingId), @@ -143,8 +138,11 @@ func TransformConfigSetting(ledgerChange ingest.Change, header xdr.LedgerHeaderH LastModifiedLedger: uint32(ledgerEntry.LastModifiedLedgerSeq), LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, - ClosedAt: closedAt, - LedgerSequence: uint32(ledgerSequence), + ClosedAt: changeDetails.ClosedAt, + LedgerSequence: changeDetails.LedgerSequence, + TransactionID: changeDetails.TransactionID, + OperationID: changeDetails.OperationID, + OperationType: changeDetails.OperationType, } return transformedConfigSetting, nil } diff --git a/internal/transform/config_setting_test.go b/internal/transform/config_setting_test.go index 59163e88..c6c28071 100644 --- a/internal/transform/config_setting_test.go +++ b/internal/transform/config_setting_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/guregu/null" "github.com/stretchr/testify/assert" "github.com/stellar/go/ingest" @@ -30,6 +31,23 @@ func TestTransformConfigSetting(t *testing.T) { Type: xdr.LedgerEntryTypeOffer, }, }, + Ledger: &xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + }, + }, + }, + Transaction: &ingest.LedgerTransaction{ + Index: 1, + }, + OperationIndex: 1, }, ConfigSettingOutput{}, fmt.Errorf("could not extract config setting from ledger entry; actual type is LedgerEntryTypeOffer"), }, @@ -44,15 +62,7 @@ func TestTransformConfigSetting(t *testing.T) { } for _, test := range tests { - header := xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - ScpValue: xdr.StellarValue{ - CloseTime: 1000, - }, - LedgerSeq: 10, - }, - } - actualOutput, actualError := TransformConfigSetting(test.input, header) + actualOutput, actualError := TransformConfigSetting(test.input) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -77,6 +87,37 @@ func makeConfigSettingTestInput() []ingest.Change { Type: xdr.LedgerEntryTypeConfigSetting, Pre: &xdr.LedgerEntry{}, Post: &contractDataLedgerEntry, + Ledger: &xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + }, + }, + }, + Transaction: &ingest.LedgerTransaction{ + Index: 1, + Envelope: xdr.TransactionEnvelope{ + Type: 2, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + Operations: []xdr.Operation{ + { + Body: xdr.OperationBody{ + Type: 1, + }, + }, + }, + }, + }, + }, + }, + OperationIndex: 0, }, } } @@ -135,6 +176,9 @@ func makeConfigSettingTestOutput() []ConfigSettingOutput { Deleted: false, LedgerSequence: 10, ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), + TransactionID: null.NewInt(42949677056, true), + OperationID: null.NewInt(42949677057, true), + OperationType: null.NewInt(1, true), }, } } diff --git a/internal/transform/contract_code.go b/internal/transform/contract_code.go index b026d11d..dda7890d 100644 --- a/internal/transform/contract_code.go +++ b/internal/transform/contract_code.go @@ -9,7 +9,7 @@ import ( ) // TransformContractCode converts a contract code ledger change entry into a form suitable for BigQuery -func TransformContractCode(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (ContractCodeOutput, error) { +func TransformContractCode(ledgerChange ingest.Change) (ContractCodeOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ContractCodeOutput{}, err @@ -31,12 +31,7 @@ func TransformContractCode(ledgerChange ingest.Change, header xdr.LedgerHeaderHi contractCodeHash := contractCode.Hash.HexString() - closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) - if err != nil { - return ContractCodeOutput{}, err - } - - ledgerSequence := header.Header.LedgerSeq + changeDetails := utils.GetChangesDetails(ledgerChange) var outputNInstructions uint32 var outputNFunctions uint32 @@ -69,8 +64,8 @@ func TransformContractCode(ledgerChange ingest.Change, header xdr.LedgerHeaderHi LastModifiedLedger: uint32(ledgerEntry.LastModifiedLedgerSeq), LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, - ClosedAt: closedAt, - LedgerSequence: uint32(ledgerSequence), + ClosedAt: changeDetails.ClosedAt, + LedgerSequence: changeDetails.LedgerSequence, LedgerKeyHash: ledgerKeyHash, NInstructions: outputNInstructions, NFunctions: outputNFunctions, @@ -82,6 +77,9 @@ func TransformContractCode(ledgerChange ingest.Change, header xdr.LedgerHeaderHi NImports: outputNImports, NExports: outputNExports, NDataSegmentBytes: outputNDataSegmentBytes, + TransactionID: changeDetails.TransactionID, + OperationID: changeDetails.OperationID, + OperationType: changeDetails.OperationType, } return transformedCode, nil } diff --git a/internal/transform/contract_code_test.go b/internal/transform/contract_code_test.go index c15467a6..f3d42acd 100644 --- a/internal/transform/contract_code_test.go +++ b/internal/transform/contract_code_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/guregu/null" "github.com/stretchr/testify/assert" "github.com/stellar/go/ingest" @@ -30,6 +31,23 @@ func TestTransformContractCode(t *testing.T) { Type: xdr.LedgerEntryTypeOffer, }, }, + Ledger: &xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + }, + }, + }, + Transaction: &ingest.LedgerTransaction{ + Index: 1, + }, + OperationIndex: 1, }, ContractCodeOutput{}, fmt.Errorf("could not extract contract code from ledger entry; actual type is LedgerEntryTypeOffer"), }, @@ -44,15 +62,7 @@ func TestTransformContractCode(t *testing.T) { } for _, test := range tests { - header := xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - ScpValue: xdr.StellarValue{ - CloseTime: 1000, - }, - LedgerSeq: 10, - }, - } - actualOutput, actualError := TransformContractCode(test.input, header) + actualOutput, actualError := TransformContractCode(test.input) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -93,6 +103,37 @@ func makeContractCodeTestInput() []ingest.Change { Type: xdr.LedgerEntryTypeContractCode, Pre: &xdr.LedgerEntry{}, Post: &contractCodeLedgerEntry, + Ledger: &xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + }, + }, + }, + Transaction: &ingest.LedgerTransaction{ + Index: 1, + Envelope: xdr.TransactionEnvelope{ + Type: 2, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + Operations: []xdr.Operation{ + { + Body: xdr.OperationBody{ + Type: 1, + }, + }, + }, + }, + }, + }, + }, + OperationIndex: 0, }, } } @@ -118,6 +159,9 @@ func makeContractCodeTestOutput() []ContractCodeOutput { NImports: 8, NExports: 9, NDataSegmentBytes: 10, + TransactionID: null.NewInt(42949677056, true), + OperationID: null.NewInt(42949677057, true), + OperationType: null.NewInt(1, true), }, } } diff --git a/internal/transform/contract_data.go b/internal/transform/contract_data.go index a1b8de1e..544abbe3 100644 --- a/internal/transform/contract_data.go +++ b/internal/transform/contract_data.go @@ -46,7 +46,7 @@ func NewTransformContractDataStruct(assetFrom AssetFromContractDataFunc, contrac } // TransformContractData converts a contract data ledger change entry into a form suitable for BigQuery -func (t *TransformContractDataStruct) TransformContractData(ledgerChange ingest.Change, passphrase string, header xdr.LedgerHeaderHistoryEntry) (ContractDataOutput, error, bool) { +func (t *TransformContractDataStruct) TransformContractData(ledgerChange ingest.Change, passphrase string) (ContractDataOutput, error, bool) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return ContractDataOutput{}, err, false @@ -97,12 +97,7 @@ func (t *TransformContractDataStruct) TransformContractData(ledgerChange ingest. contractDataDurability := contractData.Durability.String() - closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) - if err != nil { - return ContractDataOutput{}, err, false - } - - ledgerSequence := header.Header.LedgerSeq + changeDetails := utils.GetChangesDetails(ledgerChange) outputKey, outputKeyDecoded := serializeScVal(contractData.Key) outputVal, outputValDecoded := serializeScVal(contractData.Val) @@ -124,14 +119,17 @@ func (t *TransformContractDataStruct) TransformContractData(ledgerChange ingest. LastModifiedLedger: uint32(ledgerEntry.LastModifiedLedgerSeq), LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, - ClosedAt: closedAt, - LedgerSequence: uint32(ledgerSequence), + ClosedAt: changeDetails.ClosedAt, + LedgerSequence: changeDetails.LedgerSequence, LedgerKeyHash: ledgerKeyHash, Key: outputKey, KeyDecoded: outputKeyDecoded, Val: outputVal, ValDecoded: outputValDecoded, ContractDataXDR: outputContractDataXDR, + TransactionID: changeDetails.TransactionID, + OperationID: changeDetails.OperationID, + OperationType: changeDetails.OperationType, } return transformedData, nil, true } diff --git a/internal/transform/contract_data_test.go b/internal/transform/contract_data_test.go index 49bfe57f..c420396e 100644 --- a/internal/transform/contract_data_test.go +++ b/internal/transform/contract_data_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/guregu/null" "github.com/stretchr/testify/assert" "github.com/stellar/go/ingest" @@ -32,6 +33,23 @@ func TestTransformContractData(t *testing.T) { Type: xdr.LedgerEntryTypeOffer, }, }, + Ledger: &xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + }, + }, + }, + Transaction: &ingest.LedgerTransaction{ + Index: 1, + }, + OperationIndex: 1, }, "unit test", ContractDataOutput{}, fmt.Errorf("could not extract contract data from ledger entry; actual type is LedgerEntryTypeOffer"), @@ -48,16 +66,8 @@ func TestTransformContractData(t *testing.T) { } for _, test := range tests { - header := xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - ScpValue: xdr.StellarValue{ - CloseTime: 1000, - }, - LedgerSeq: 10, - }, - } TransformContractData := NewTransformContractDataStruct(MockAssetFromContractData, MockContractBalanceFromContractData) - actualOutput, actualError, _ := TransformContractData.TransformContractData(test.input, test.passphrase, header) + actualOutput, actualError, _ := TransformContractData.TransformContractData(test.input, test.passphrase) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -123,6 +133,37 @@ func makeContractDataTestInput() []ingest.Change { Type: xdr.LedgerEntryTypeContractData, Pre: &xdr.LedgerEntry{}, Post: &contractDataLedgerEntry, + Ledger: &xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + }, + }, + }, + Transaction: &ingest.LedgerTransaction{ + Index: 1, + Envelope: xdr.TransactionEnvelope{ + Type: 2, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + Operations: []xdr.Operation{ + { + Body: xdr.OperationBody{ + Type: 1, + }, + }, + }, + }, + }, + }, + }, + OperationIndex: 0, }, } } @@ -169,6 +210,9 @@ func makeContractDataTestOutput() []ContractDataOutput { Val: val, ValDecoded: valDecoded, ContractDataXDR: "AAAAAAAAAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABMAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAAAAAQAAAA4AAAABYQAAAAAAAA4AAAABYQAAAAAAAAEAAAAAAAAAAQ==", + TransactionID: null.NewInt(42949677056, true), + OperationID: null.NewInt(42949677057, true), + OperationType: null.NewInt(1, true), }, } } diff --git a/internal/transform/liquidity_pool.go b/internal/transform/liquidity_pool.go index eacc6be3..be836b9b 100644 --- a/internal/transform/liquidity_pool.go +++ b/internal/transform/liquidity_pool.go @@ -9,7 +9,7 @@ import ( ) // TransformPool converts an liquidity pool ledger change entry into a form suitable for BigQuery -func TransformPool(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (PoolOutput, error) { +func TransformPool(ledgerChange ingest.Change) (PoolOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return PoolOutput{}, err @@ -49,12 +49,7 @@ func TransformPool(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEnt } assetBID := FarmHashAsset(assetBCode, assetBIssuer, assetBType) - closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) - if err != nil { - return PoolOutput{}, err - } - - ledgerSequence := header.Header.LedgerSeq + changeDetails := utils.GetChangesDetails(ledgerChange) transformedPool := PoolOutput{ PoolID: PoolIDToString(lp.LiquidityPoolId), @@ -75,8 +70,11 @@ func TransformPool(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEnt LastModifiedLedger: uint32(ledgerEntry.LastModifiedLedgerSeq), LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, - ClosedAt: closedAt, - LedgerSequence: uint32(ledgerSequence), + ClosedAt: changeDetails.ClosedAt, + LedgerSequence: changeDetails.LedgerSequence, + TransactionID: changeDetails.TransactionID, + OperationID: changeDetails.OperationID, + OperationType: changeDetails.OperationType, } return transformedPool, nil } diff --git a/internal/transform/liquidity_pool_test.go b/internal/transform/liquidity_pool_test.go index af97587a..1cb22e66 100644 --- a/internal/transform/liquidity_pool_test.go +++ b/internal/transform/liquidity_pool_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/guregu/null" "github.com/stretchr/testify/assert" "github.com/stellar/go/ingest" @@ -34,6 +35,23 @@ func TestTransformPool(t *testing.T) { Type: xdr.LedgerEntryTypeOffer, }, }, + Ledger: &xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + }, + }, + }, + Transaction: &ingest.LedgerTransaction{ + Index: 1, + }, + OperationIndex: 1, }, }, PoolOutput{}, nil, @@ -47,15 +65,7 @@ func TestTransformPool(t *testing.T) { } for _, test := range tests { - header := xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - ScpValue: xdr.StellarValue{ - CloseTime: 1000, - }, - LedgerSeq: 10, - }, - } - actualOutput, actualError := TransformPool(test.input.ingest, header) + actualOutput, actualError := TransformPool(test.input.ingest) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -89,6 +99,37 @@ func makePoolTestInput() ingest.Change { Type: xdr.LedgerEntryTypeLiquidityPool, Pre: &ledgerEntry, Post: nil, + Ledger: &xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + }, + }, + }, + Transaction: &ingest.LedgerTransaction{ + Index: 1, + Envelope: xdr.TransactionEnvelope{ + Type: 2, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + Operations: []xdr.Operation{ + { + Body: xdr.OperationBody{ + Type: 1, + }, + }, + }, + }, + }, + }, + }, + OperationIndex: 0, } } @@ -114,5 +155,8 @@ func makePoolTestOutput() PoolOutput { Deleted: true, LedgerSequence: 10, ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), + TransactionID: null.NewInt(42949677056, true), + OperationID: null.NewInt(42949677057, true), + OperationType: null.NewInt(1, true), } } diff --git a/internal/transform/offer.go b/internal/transform/offer.go index b3008d30..223fbab8 100644 --- a/internal/transform/offer.go +++ b/internal/transform/offer.go @@ -6,11 +6,10 @@ import ( "github.com/stellar/stellar-etl/internal/utils" "github.com/stellar/go/ingest" - "github.com/stellar/go/xdr" ) // TransformOffer converts an account from the history archive ingestion system into a form suitable for BigQuery -func TransformOffer(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (OfferOutput, error) { +func TransformOffer(ledgerChange ingest.Change) (OfferOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return OfferOutput{}, err @@ -69,12 +68,7 @@ func TransformOffer(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEn outputLastModifiedLedger := uint32(ledgerEntry.LastModifiedLedgerSeq) - closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) - if err != nil { - return OfferOutput{}, err - } - - ledgerSequence := header.Header.LedgerSeq + changeDetails := utils.GetChangesDetails(ledgerChange) transformedOffer := OfferOutput{ SellerID: outputSellerID, @@ -96,8 +90,11 @@ func TransformOffer(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEn LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, Sponsor: ledgerEntrySponsorToNullString(ledgerEntry), - ClosedAt: closedAt, - LedgerSequence: uint32(ledgerSequence), + ClosedAt: changeDetails.ClosedAt, + LedgerSequence: changeDetails.LedgerSequence, + TransactionID: changeDetails.TransactionID, + OperationID: changeDetails.OperationID, + OperationType: changeDetails.OperationType, } return transformedOffer, nil } diff --git a/internal/transform/offer_normalized.go b/internal/transform/offer_normalized.go index 0276509e..7a9e0c86 100644 --- a/internal/transform/offer_normalized.go +++ b/internal/transform/offer_normalized.go @@ -9,14 +9,12 @@ import ( "github.com/stellar/stellar-etl/internal/utils" "github.com/stellar/go/ingest" - "github.com/stellar/go/xdr" ) // TransformOfferNormalized converts an offer into a normalized form, allowing it to be stored as part of the historical orderbook dataset func TransformOfferNormalized(ledgerChange ingest.Change, ledgerSeq uint32) (NormalizedOfferOutput, error) { - var header xdr.LedgerHeaderHistoryEntry - transformed, err := TransformOffer(ledgerChange, header) + transformed, err := TransformOffer(ledgerChange) if err != nil { return NormalizedOfferOutput{}, err } diff --git a/internal/transform/offer_normalized_test.go b/internal/transform/offer_normalized_test.go index 47df28f8..11c975fe 100644 --- a/internal/transform/offer_normalized_test.go +++ b/internal/transform/offer_normalized_test.go @@ -43,6 +43,23 @@ func TestTransformOfferNormalized(t *testing.T) { }, }, Post: nil, + Ledger: &xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + }, + }, + }, + Transaction: &ingest.LedgerTransaction{ + Index: 1, + }, + OperationIndex: 1, }, 100}, wantOutput: NormalizedOfferOutput{}, wantErr: fmt.Errorf("offer 0 is deleted"), @@ -83,6 +100,23 @@ func makeOfferNormalizedTestInput() (ledgerChange ingest.Change, err error) { }, }, }, + Ledger: &xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + }, + }, + }, + Transaction: &ingest.LedgerTransaction{ + Index: 1, + }, + OperationIndex: 1, } return } diff --git a/internal/transform/offer_test.go b/internal/transform/offer_test.go index 8279d620..c6e19c2b 100644 --- a/internal/transform/offer_test.go +++ b/internal/transform/offer_test.go @@ -35,6 +35,23 @@ func TestTransformOffer(t *testing.T) { Type: xdr.LedgerEntryTypeAccount, }, }, + Ledger: &xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + }, + }, + }, + Transaction: &ingest.LedgerTransaction{ + Index: 1, + }, + OperationIndex: 1, }, }, OfferOutput{}, fmt.Errorf("could not extract offer data from ledger entry; actual type is LedgerEntryTypeAccount"), @@ -97,15 +114,7 @@ func TestTransformOffer(t *testing.T) { } for _, test := range tests { - header := xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - ScpValue: xdr.StellarValue{ - CloseTime: 1000, - }, - LedgerSeq: 10, - }, - } - actualOutput, actualError := TransformOffer(test.input.ingest, header) + actualOutput, actualError := TransformOffer(test.input.ingest) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -122,6 +131,23 @@ func wrapOfferEntry(offerEntry xdr.OfferEntry, lastModified int) ingest.Change { Offer: &offerEntry, }, }, + Ledger: &xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + }, + }, + }, + Transaction: &ingest.LedgerTransaction{ + Index: 1, + }, + OperationIndex: 1, } } @@ -153,6 +179,37 @@ func makeOfferTestInput() (ledgerChange ingest.Change, err error) { }, }, Post: nil, + Ledger: &xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + }, + }, + }, + Transaction: &ingest.LedgerTransaction{ + Index: 1, + Envelope: xdr.TransactionEnvelope{ + Type: 2, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + Operations: []xdr.Operation{ + { + Body: xdr.OperationBody{ + Type: 1, + }, + }, + }, + }, + }, + }, + }, + OperationIndex: 0, } return } @@ -180,5 +237,8 @@ func makeOfferTestOutput() OfferOutput { Sponsor: null.StringFrom(testAccount3Address), LedgerSequence: 10, ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), + TransactionID: null.NewInt(42949677056, true), + OperationID: null.NewInt(42949677057, true), + OperationType: null.NewInt(1, true), } } diff --git a/internal/transform/schema.go b/internal/transform/schema.go index 72a84929..2f88f837 100644 --- a/internal/transform/schema.go +++ b/internal/transform/schema.go @@ -113,6 +113,9 @@ type AccountOutput struct { Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` LedgerSequence uint32 `json:"ledger_sequence"` + TransactionID null.Int `json:"transaction_id"` + OperationID null.Int `json:"operation_id"` + OperationType null.Int `json:"operation_type"` } // AccountSignerOutput is a representation of an account signer that aligns with the BigQuery table account_signers @@ -126,8 +129,9 @@ type AccountSignerOutput struct { Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` LedgerSequence uint32 `json:"ledger_sequence"` - TransactionID int64 `json:"transaction_id"` - OperationID int64 `json:"id"` + TransactionID null.Int `json:"transaction_id"` + OperationID null.Int `json:"operation_id"` + OperationType null.Int `json:"operation_type"` } // OperationOutput is a representation of an operation that aligns with the BigQuery table history_operations @@ -162,6 +166,9 @@ type ClaimableBalanceOutput struct { Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` LedgerSequence uint32 `json:"ledger_sequence"` + TransactionID null.Int `json:"transaction_id"` + OperationID null.Int `json:"operation_id"` + OperationType null.Int `json:"operation_type"` } // Claimants @@ -217,6 +224,9 @@ type PoolOutput struct { Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` LedgerSequence uint32 `json:"ledger_sequence"` + TransactionID null.Int `json:"transaction_id"` + OperationID null.Int `json:"operation_id"` + OperationType null.Int `json:"operation_type"` } // AssetOutput is a representation of an asset that aligns with the BigQuery table history_assets @@ -249,6 +259,9 @@ type TrustlineOutput struct { Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` LedgerSequence uint32 `json:"ledger_sequence"` + TransactionID null.Int `json:"transaction_id"` + OperationID null.Int `json:"operation_id"` + OperationType null.Int `json:"operation_type"` } // OfferOutput is a representation of an offer that aligns with the BigQuery table offers @@ -274,6 +287,9 @@ type OfferOutput struct { Sponsor null.String `json:"sponsor"` ClosedAt time.Time `json:"closed_at"` LedgerSequence uint32 `json:"ledger_sequence"` + TransactionID null.Int `json:"transaction_id"` + OperationID null.Int `json:"operation_id"` + OperationType null.Int `json:"operation_type"` } // TradeOutput is a representation of a trade that aligns with the BigQuery table history_trades @@ -526,6 +542,9 @@ type ContractDataOutput struct { Val map[string]string `json:"val"` ValDecoded map[string]string `json:"val_decoded"` ContractDataXDR string `json:"contract_data_xdr"` + TransactionID null.Int `json:"transaction_id"` + OperationID null.Int `json:"operation_id"` + OperationType null.Int `json:"operation_type"` } // ContractCodeOutput is a representation of contract code that aligns with the Bigquery table soroban_contract_code @@ -539,16 +558,19 @@ type ContractCodeOutput struct { LedgerSequence uint32 `json:"ledger_sequence"` LedgerKeyHash string `json:"ledger_key_hash"` //ContractCodeCode string `json:"contract_code"` - NInstructions uint32 `json:"n_instructions"` - NFunctions uint32 `json:"n_functions"` - NGlobals uint32 `json:"n_globals"` - NTableEntries uint32 `json:"n_table_entries"` - NTypes uint32 `json:"n_types"` - NDataSegments uint32 `json:"n_data_segments"` - NElemSegments uint32 `json:"n_elem_segments"` - NImports uint32 `json:"n_imports"` - NExports uint32 `json:"n_exports"` - NDataSegmentBytes uint32 `json:"n_data_segment_bytes"` + NInstructions uint32 `json:"n_instructions"` + NFunctions uint32 `json:"n_functions"` + NGlobals uint32 `json:"n_globals"` + NTableEntries uint32 `json:"n_table_entries"` + NTypes uint32 `json:"n_types"` + NDataSegments uint32 `json:"n_data_segments"` + NElemSegments uint32 `json:"n_elem_segments"` + NImports uint32 `json:"n_imports"` + NExports uint32 `json:"n_exports"` + NDataSegmentBytes uint32 `json:"n_data_segment_bytes"` + TransactionID null.Int `json:"transaction_id"` + OperationID null.Int `json:"operation_id"` + OperationType null.Int `json:"operation_type"` } // ConfigSettingOutput is a representation of soroban config settings that aligns with the Bigquery table config_settings @@ -601,6 +623,9 @@ type ConfigSettingOutput struct { Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` LedgerSequence uint32 `json:"ledger_sequence"` + TransactionID null.Int `json:"transaction_id"` + OperationID null.Int `json:"operation_id"` + OperationType null.Int `json:"operation_type"` } // TtlOutput is a representation of soroban ttl that aligns with the Bigquery table ttls @@ -612,6 +637,9 @@ type TtlOutput struct { Deleted bool `json:"deleted"` ClosedAt time.Time `json:"closed_at"` LedgerSequence uint32 `json:"ledger_sequence"` + TransactionID null.Int `json:"transaction_id"` + OperationID null.Int `json:"operation_id"` + OperationType null.Int `json:"operation_type"` } // ContractEventOutput is a representation of soroban contract events and diagnostic events diff --git a/internal/transform/trustline.go b/internal/transform/trustline.go index 40f06e6d..804828c7 100644 --- a/internal/transform/trustline.go +++ b/internal/transform/trustline.go @@ -14,7 +14,7 @@ import ( ) // TransformTrustline converts a trustline from the history archive ingestion system into a form suitable for BigQuery -func TransformTrustline(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (TrustlineOutput, error) { +func TransformTrustline(ledgerChange ingest.Change) (TrustlineOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return TrustlineOutput{}, err @@ -52,12 +52,7 @@ func TransformTrustline(ledgerChange ingest.Change, header xdr.LedgerHeaderHisto liabilities := trustEntry.Liabilities() - closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) - if err != nil { - return TrustlineOutput{}, err - } - - ledgerSequence := header.Header.LedgerSeq + changeDetails := utils.GetChangesDetails(ledgerChange) transformedTrustline := TrustlineOutput{ LedgerKey: outputLedgerKey, @@ -76,8 +71,11 @@ func TransformTrustline(ledgerChange ingest.Change, header xdr.LedgerHeaderHisto LedgerEntryChange: uint32(changeType), Sponsor: ledgerEntrySponsorToNullString(ledgerEntry), Deleted: outputDeleted, - ClosedAt: closedAt, - LedgerSequence: uint32(ledgerSequence), + ClosedAt: changeDetails.ClosedAt, + LedgerSequence: changeDetails.LedgerSequence, + TransactionID: changeDetails.TransactionID, + OperationID: changeDetails.OperationID, + OperationType: changeDetails.OperationType, } return transformedTrustline, nil diff --git a/internal/transform/trustline_test.go b/internal/transform/trustline_test.go index 7bbef506..c677b018 100644 --- a/internal/transform/trustline_test.go +++ b/internal/transform/trustline_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/guregu/null" "github.com/stretchr/testify/assert" "github.com/stellar/go/ingest" @@ -35,6 +36,23 @@ func TestTransformTrustline(t *testing.T) { Type: xdr.LedgerEntryTypeOffer, }, }, + Ledger: &xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + }, + }, + }, + Transaction: &ingest.LedgerTransaction{ + Index: 1, + }, + OperationIndex: 1, }, }, TrustlineOutput{}, fmt.Errorf("could not extract trustline data from ledger entry; actual type is LedgerEntryTypeOffer"), @@ -50,15 +68,7 @@ func TestTransformTrustline(t *testing.T) { } for _, test := range tests { - header := xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - ScpValue: xdr.StellarValue{ - CloseTime: 1000, - }, - LedgerSeq: 10, - }, - } - actualOutput, actualError := TransformTrustline(test.input.ingest, header) + actualOutput, actualError := TransformTrustline(test.input.ingest) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -114,11 +124,73 @@ func makeTrustlineTestInput() []ingest.Change { Type: xdr.LedgerEntryTypeTrustline, Pre: &xdr.LedgerEntry{}, Post: &assetLedgerEntry, + Ledger: &xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + }, + }, + }, + Transaction: &ingest.LedgerTransaction{ + Index: 1, + Envelope: xdr.TransactionEnvelope{ + Type: 2, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + Operations: []xdr.Operation{ + { + Body: xdr.OperationBody{ + Type: 1, + }, + }, + }, + }, + }, + }, + }, + OperationIndex: 0, }, { Type: xdr.LedgerEntryTypeTrustline, Pre: &xdr.LedgerEntry{}, Post: &lpLedgerEntry, + Ledger: &xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + }, + }, + }, + Transaction: &ingest.LedgerTransaction{ + Index: 1, + Envelope: xdr.TransactionEnvelope{ + Type: 2, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + Operations: []xdr.Operation{ + { + Body: xdr.OperationBody{ + Type: 1, + }, + }, + }, + }, + }, + }, + }, + OperationIndex: 0, }, } } @@ -142,6 +214,9 @@ func makeTrustlineTestOutput() []TrustlineOutput { Deleted: false, LedgerSequence: 10, ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), + TransactionID: null.NewInt(42949677056, true), + OperationID: null.NewInt(42949677057, true), + OperationType: null.NewInt(1, true), }, { LedgerKey: "AAAAAQAAAAAcR0GXGO76pFs4y38vJVAanjnLg4emNun7zAx0pHcDGAAAAAMBAwQFBwkAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==", @@ -159,6 +234,9 @@ func makeTrustlineTestOutput() []TrustlineOutput { Deleted: false, LedgerSequence: 10, ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), + TransactionID: null.NewInt(42949677056, true), + OperationID: null.NewInt(42949677057, true), + OperationType: null.NewInt(1, true), }, } } diff --git a/internal/transform/ttl.go b/internal/transform/ttl.go index c0e6fe9c..ac8258b5 100644 --- a/internal/transform/ttl.go +++ b/internal/transform/ttl.go @@ -9,7 +9,7 @@ import ( ) // TransformTtl converts an ttl ledger change entry into a form suitable for BigQuery -func TransformTtl(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (TtlOutput, error) { +func TransformTtl(ledgerChange ingest.Change) (TtlOutput, error) { ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange) if err != nil { return TtlOutput{}, err @@ -28,12 +28,7 @@ func TransformTtl(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntr keyHash := ttl.KeyHash.HexString() liveUntilLedgerSeq := ttl.LiveUntilLedgerSeq - closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime) - if err != nil { - return TtlOutput{}, err - } - - ledgerSequence := header.Header.LedgerSeq + changeDetails := utils.GetChangesDetails(ledgerChange) transformedPool := TtlOutput{ KeyHash: keyHash, @@ -41,8 +36,11 @@ func TransformTtl(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntr LastModifiedLedger: uint32(ledgerEntry.LastModifiedLedgerSeq), LedgerEntryChange: uint32(changeType), Deleted: outputDeleted, - ClosedAt: closedAt, - LedgerSequence: uint32(ledgerSequence), + ClosedAt: changeDetails.ClosedAt, + LedgerSequence: changeDetails.LedgerSequence, + TransactionID: changeDetails.TransactionID, + OperationID: changeDetails.OperationID, + OperationType: changeDetails.OperationType, } return transformedPool, nil diff --git a/internal/transform/ttl_test.go b/internal/transform/ttl_test.go index 8f14e089..4ea2d65e 100644 --- a/internal/transform/ttl_test.go +++ b/internal/transform/ttl_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/guregu/null" "github.com/stretchr/testify/assert" "github.com/stellar/go/ingest" @@ -30,6 +31,23 @@ func TestTransformTtl(t *testing.T) { Type: xdr.LedgerEntryTypeOffer, }, }, + Ledger: &xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + }, + }, + }, + Transaction: &ingest.LedgerTransaction{ + Index: 1, + }, + OperationIndex: 1, }, TtlOutput{}, fmt.Errorf("could not extract ttl from ledger entry; actual type is LedgerEntryTypeOffer"), }, @@ -44,15 +62,7 @@ func TestTransformTtl(t *testing.T) { } for _, test := range tests { - header := xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - ScpValue: xdr.StellarValue{ - CloseTime: 1000, - }, - LedgerSeq: 10, - }, - } - actualOutput, actualError := TransformTtl(test.input, header) + actualOutput, actualError := TransformTtl(test.input) assert.Equal(t, test.wantErr, actualError) assert.Equal(t, test.wantOutput, actualOutput) } @@ -88,6 +98,37 @@ func makeTtlTestInput() []ingest.Change { Type: xdr.LedgerEntryTypeTtl, Pre: &preTtlLedgerEntry, Post: &TtlLedgerEntry, + Ledger: &xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: 1000, + }, + LedgerSeq: 10, + }, + }, + }, + }, + Transaction: &ingest.LedgerTransaction{ + Index: 1, + Envelope: xdr.TransactionEnvelope{ + Type: 2, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + Operations: []xdr.Operation{ + { + Body: xdr.OperationBody{ + Type: 1, + }, + }, + }, + }, + }, + }, + }, + OperationIndex: 0, }, } } @@ -102,6 +143,9 @@ func makeTtlTestOutput() []TtlOutput { Deleted: false, LedgerSequence: 10, ClosedAt: time.Date(1970, time.January, 1, 0, 16, 40, 0, time.UTC), + TransactionID: null.NewInt(42949677056, true), + OperationID: null.NewInt(42949677057, true), + OperationType: null.NewInt(1, true), }, } } diff --git a/internal/utils/main.go b/internal/utils/main.go index 1141b764..81f72e08 100644 --- a/internal/utils/main.go +++ b/internal/utils/main.go @@ -8,11 +8,13 @@ import ( "math/big" "time" + "github.com/guregu/null" "github.com/spf13/pflag" "github.com/stellar/go/hash" "github.com/stellar/go/historyarchive" "github.com/stellar/go/ingest" + "github.com/stellar/go/ingest/ledger" "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/keypair" "github.com/stellar/go/network" @@ -20,6 +22,7 @@ import ( "github.com/stellar/go/support/storage" "github.com/stellar/go/txnbuild" "github.com/stellar/go/xdr" + "github.com/stellar/stellar-etl/internal/toid" ) // PanicOnError is a function that panics if the provided error is not nil @@ -1101,3 +1104,45 @@ type HistoryArchiveLedgerAndLCM struct { Ledger historyarchive.Ledger LCM xdr.LedgerCloseMeta } + +type ChangeDetails struct { + ClosedAt time.Time + LedgerSequence uint32 + OperationType null.Int + TransactionID null.Int + OperationID null.Int +} + +func GetChangesDetails(ledgerChange ingest.Change) ChangeDetails { + var outputOperationType null.Int + var outputTransactionID null.Int + var outputOperationID null.Int + var lcm *xdr.LedgerCloseMeta + + if ledgerChange.Ledger == nil { + lcm = &ledgerChange.Transaction.Ledger + } else { + lcm = ledgerChange.Ledger + } + closedAt := ledger.ClosedAt(*lcm) + ledgerSequence := ledger.Sequence(*lcm) + if ledgerChange.Transaction != nil { + outputTransactionID = null.NewInt(toid.New(int32(ledgerSequence), int32(ledgerChange.Transaction.Index), 0).ToInt64(), true) + // outputOperationID may point to the incorrect operation because the change compactor takes the latest OperationIndex if it exists + // It is possible that OperationIndex is 0 and invalid but will be used to populate outputOperationID anyways + // TODO: This should be fixed when the LedgerEntryChange transforms/processors use the actual changes instead of compacted changes + outputOperationID = null.NewInt(toid.New(int32(ledgerSequence), int32(ledgerChange.Transaction.Index), int32(ledgerChange.OperationIndex+1)).ToInt64(), true) + operation, ok := ledgerChange.Transaction.GetOperation(ledgerChange.OperationIndex) + if ok { + outputOperationType = null.NewInt(int64(operation.Body.Type), true) + } + } + + return ChangeDetails{ + ClosedAt: closedAt, + LedgerSequence: ledgerSequence, + OperationType: outputOperationType, + TransactionID: outputTransactionID, + OperationID: outputOperationID, + } +}