Skip to content

Commit

Permalink
changes for all ledger entries
Browse files Browse the repository at this point in the history
  • Loading branch information
chowbao committed Jan 23, 2025
1 parent ab38ae1 commit ac1ca80
Show file tree
Hide file tree
Showing 28 changed files with 813 additions and 251 deletions.
36 changes: 18 additions & 18 deletions cmd/export_ledger_entry_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,13 @@ be exported.`,
if !exports["export-accounts"] {
continue
}
for i, change := range changes.Changes {
for _, change := range changes {
if changed, err := change.AccountChangedExceptSigners(); err != nil {
cmdLogger.LogError(fmt.Errorf("unable to identify changed accounts: %v", err))
continue
} else if changed {

acc, err := transform.TransformAccount(change, changes.LedgerHeaders[i])
acc, err := transform.TransformAccount(change)
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming account entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -149,8 +149,8 @@ be exported.`,
if !exports["export-balances"] {
continue
}
for i, change := range changes.Changes {
balance, err := transform.TransformClaimableBalance(change, changes.LedgerHeaders[i])
for _, change := range changes {
balance, err := transform.TransformClaimableBalance(change)
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming balance entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -162,8 +162,8 @@ be exported.`,
if !exports["export-offers"] {
continue
}
for i, change := range changes.Changes {
offer, err := transform.TransformOffer(change, changes.LedgerHeaders[i])
for _, change := range changes {
offer, err := transform.TransformOffer(change)
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming offer entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -175,8 +175,8 @@ be exported.`,
if !exports["export-trustlines"] {
continue
}
for i, change := range changes.Changes {
trust, err := transform.TransformTrustline(change, changes.LedgerHeaders[i])
for _, change := range changes {
trust, err := transform.TransformTrustline(change)
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming trustline entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -188,8 +188,8 @@ be exported.`,
if !exports["export-pools"] {
continue
}
for i, change := range changes.Changes {
pool, err := transform.TransformPool(change, changes.LedgerHeaders[i])
for _, change := range changes {
pool, err := transform.TransformPool(change)
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming liquidity pool entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -201,9 +201,9 @@ be exported.`,
if !exports["export-contract-data"] {
continue
}
for i, change := range changes.Changes {
for _, change := range changes {
TransformContractData := transform.NewTransformContractDataStruct(transform.AssetFromContractData, transform.ContractBalanceFromContractData)
contractData, err, _ := TransformContractData.TransformContractData(change, env.NetworkPassphrase, changes.LedgerHeaders[i])
contractData, err, _ := TransformContractData.TransformContractData(change, env.NetworkPassphrase)
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming contract data entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -221,8 +221,8 @@ be exported.`,
if !exports["export-contract-code"] {
continue
}
for i, change := range changes.Changes {
contractCode, err := transform.TransformContractCode(change, changes.LedgerHeaders[i])
for _, change := range changes {
contractCode, err := transform.TransformContractCode(change)
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming contract code entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -234,8 +234,8 @@ be exported.`,
if !exports["export-config-settings"] {
continue
}
for i, change := range changes.Changes {
configSettings, err := transform.TransformConfigSetting(change, changes.LedgerHeaders[i])
for _, change := range changes {
configSettings, err := transform.TransformConfigSetting(change)
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming config settings entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -247,8 +247,8 @@ be exported.`,
if !exports["export-ttl"] {
continue
}
for i, change := range changes.Changes {
ttl, err := transform.TransformTtl(change, changes.LedgerHeaders[i])
for _, change := range changes {
ttl, err := transform.TransformTtl(change)
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming ttl entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand Down
60 changes: 44 additions & 16 deletions internal/input/change_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ func (c *ChangeCompactor) addCreatedChange(change ingest.Change) error {
return nil
}

operationIndex, transaction, ledger, ledgerUpgrade := c.getChangeDetails(existingChange, change)

switch existingChange.LedgerEntryChangeType() {
case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
return ingest.NewStateError(errors.Errorf(
Expand All @@ -128,10 +130,10 @@ func (c *ChangeCompactor) addCreatedChange(change ingest.Change) error {
Pre: existingChange.Pre,
Post: change.Post,
Reason: change.Reason,
OperationIndex: change.OperationIndex,
Transaction: change.Transaction,
Ledger: change.Ledger,
LedgerUpgrade: change.LedgerUpgrade,
OperationIndex: operationIndex,
Transaction: transaction,
Ledger: ledger,
LedgerUpgrade: ledgerUpgrade,
}
default:
return errors.Errorf("Unknown LedgerEntryChangeType: %d", existingChange.LedgerEntryChangeType())
Expand Down Expand Up @@ -161,6 +163,8 @@ func (c *ChangeCompactor) addUpdatedChange(change ingest.Change) error {
return nil
}

operationIndex, transaction, ledger, ledgerUpgrade := c.getChangeDetails(existingChange, change)

switch existingChange.LedgerEntryChangeType() {
case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
// If existing type is created it means that this entry does not
Expand All @@ -170,21 +174,21 @@ func (c *ChangeCompactor) addUpdatedChange(change ingest.Change) error {
Pre: existingChange.Pre, // = nil
Post: change.Post,
Reason: change.Reason,
OperationIndex: change.OperationIndex,
Transaction: change.Transaction,
Ledger: change.Ledger,
LedgerUpgrade: change.LedgerUpgrade,
OperationIndex: operationIndex,
Transaction: transaction,
Ledger: ledger,
LedgerUpgrade: ledgerUpgrade,
}
case xdr.LedgerEntryChangeTypeLedgerEntryUpdated:
c.cache[ledgerKeyString] = ingest.Change{
Type: key.Type,
Pre: existingChange.Pre,
Post: change.Post,
Reason: change.Reason,
OperationIndex: change.OperationIndex,
Transaction: change.Transaction,
Ledger: change.Ledger,
LedgerUpgrade: change.LedgerUpgrade,
OperationIndex: operationIndex,
Transaction: transaction,
Ledger: ledger,
LedgerUpgrade: ledgerUpgrade,
}
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
return ingest.NewStateError(errors.Errorf(
Expand Down Expand Up @@ -219,6 +223,8 @@ func (c *ChangeCompactor) addRemovedChange(change ingest.Change) error {
return nil
}

operationIndex, transaction, ledger, ledgerUpgrade := c.getChangeDetails(existingChange, change)

switch existingChange.LedgerEntryChangeType() {
case xdr.LedgerEntryChangeTypeLedgerEntryCreated:
// If existing type is created it means that this will be no op.
Expand All @@ -230,10 +236,10 @@ func (c *ChangeCompactor) addRemovedChange(change ingest.Change) error {
Pre: existingChange.Pre,
Post: nil,
Reason: change.Reason,
OperationIndex: change.OperationIndex,
Transaction: change.Transaction,
Ledger: change.Ledger,
LedgerUpgrade: change.LedgerUpgrade,
OperationIndex: operationIndex,
Transaction: transaction,
Ledger: ledger,
LedgerUpgrade: ledgerUpgrade,
}
case xdr.LedgerEntryChangeTypeLedgerEntryRemoved:
return ingest.NewStateError(errors.Errorf(
Expand Down Expand Up @@ -263,3 +269,25 @@ func (c *ChangeCompactor) GetChanges() []ingest.Change {
func (c *ChangeCompactor) Size() int {
return len(c.cache)
}

func (c *ChangeCompactor) getChangeDetails(existingChange, change ingest.Change) (uint32, *ingest.LedgerTransaction, *xdr.LedgerCloseMeta, *xdr.LedgerUpgrade) {
operationIndex := change.OperationIndex
transaction := change.Transaction
ledger := change.Ledger
ledgerUpgrade := change.LedgerUpgrade

if operationIndex == 0 {
operationIndex = existingChange.OperationIndex
}
if transaction == nil {
transaction = existingChange.Transaction
}
if ledger == nil {
ledger = existingChange.Ledger
}
if ledgerUpgrade == nil {
ledgerUpgrade = existingChange.LedgerUpgrade
}

return operationIndex, transaction, ledger, ledgerUpgrade
}
14 changes: 3 additions & 11 deletions internal/input/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,9 @@ var (
ExtractBatch = extractBatch
)

type LedgerChanges struct {
Changes []ingest.Change
LedgerHeaders []xdr.LedgerHeaderHistoryEntry
}

// ChangeBatch represents the changes in a batch of ledgers represented by the range [BatchStart, BatchEnd)
type ChangeBatch struct {
Changes map[xdr.LedgerEntryType]LedgerChanges
Changes map[xdr.LedgerEntryType][]ingest.Change
BatchStart uint32
BatchEnd uint32
}
Expand Down Expand Up @@ -98,7 +93,7 @@ func extractBatch(
xdr.LedgerEntryTypeConfigSetting,
xdr.LedgerEntryTypeTtl}

ledgerChanges := map[xdr.LedgerEntryType]LedgerChanges{}
ledgerChanges := map[xdr.LedgerEntryType][]ingest.Change{}
ctx := context.Background()
for seq := batchStart; seq <= batchEnd; {
changeCompactors := map[xdr.LedgerEntryType]*ChangeCompactor{}
Expand All @@ -108,13 +103,11 @@ func extractBatch(

// if this ledger is available, we process its changes and move on to the next ledger by incrementing seq.
// Otherwise, nothing is incremented, and we try again on the next iteration of the loop
var header xdr.LedgerHeaderHistoryEntry
if seq <= batchEnd {
changeReader, err := ingest.NewLedgerChangeReader(ctx, *backend, env.NetworkPassphrase, seq)
if err != nil {
logger.Fatal(fmt.Sprintf("unable to create change reader for ledger %d: ", seq), err)
}
header = changeReader.LedgerTransactionReader.GetHeader()

for {
change, err := changeReader.Read()
Expand Down Expand Up @@ -144,8 +137,7 @@ func extractBatch(
for dataType, compactor := range changeCompactors {
for _, change := range compactor.GetChanges() {
dataTypeChanges := ledgerChanges[dataType]
dataTypeChanges.Changes = append(dataTypeChanges.Changes, change)
dataTypeChanges.LedgerHeaders = append(dataTypeChanges.LedgerHeaders, header)
dataTypeChanges = append(dataTypeChanges, change)
ledgerChanges[dataType] = dataTypeChanges
}
}
Expand Down
23 changes: 12 additions & 11 deletions internal/input/changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,15 @@ func TestSendBatchToChannel(t *testing.T) {
}

func wrapLedgerEntry(entryType xdr.LedgerEntryType, entry xdr.LedgerEntry) ChangeBatch {
changes := map[xdr.LedgerEntryType]LedgerChanges{
changes := map[xdr.LedgerEntryType][]ingest.Change{
entryType: {
Changes: []ingest.Change{
{Type: entry.Data.Type, Post: &entry},
{
Type: entry.Data.Type,
Post: &entry,
},
LedgerHeaders: []xdr.LedgerHeaderHistoryEntry{},
},
}

return ChangeBatch{
Changes: changes,
}
Expand All @@ -133,7 +134,7 @@ func mockExtractBatch(
env utils.EnvironmentDetails, logger *utils.EtlLogger) ChangeBatch {
log.Errorf("mock called")
return ChangeBatch{
Changes: map[xdr.LedgerEntryType]LedgerChanges{},
Changes: map[xdr.LedgerEntryType][]ingest.Change{},
BatchStart: batchStart,
BatchEnd: batchEnd,
}
Expand Down Expand Up @@ -161,7 +162,7 @@ func TestStreamChangesBatchNumbers(t *testing.T) {
args: input{batchStart: 1, batchEnd: 65},
out: output{
batchRanges: []batchRange{
batchRange{
{
batchStart: 1, batchEnd: 65,
},
},
Expand All @@ -171,9 +172,9 @@ func TestStreamChangesBatchNumbers(t *testing.T) {
args: input{batchStart: 1, batchEnd: 66},
out: output{
batchRanges: []batchRange{
batchRange{
{
batchStart: 1, batchEnd: 64,
}, batchRange{
}, {
batchStart: 65, batchEnd: 66,
},
},
Expand All @@ -183,10 +184,10 @@ func TestStreamChangesBatchNumbers(t *testing.T) {
args: input{batchStart: 1, batchEnd: 128},
out: output{
batchRanges: []batchRange{
batchRange{
{
batchStart: 1, batchEnd: 64,
},
batchRange{
{
batchStart: 65, batchEnd: 128,
},
},
Expand All @@ -196,7 +197,7 @@ func TestStreamChangesBatchNumbers(t *testing.T) {
args: input{batchStart: 1, batchEnd: 32},
out: output{
batchRanges: []batchRange{
batchRange{
{
batchStart: 1, batchEnd: 32,
},
},
Expand Down
16 changes: 7 additions & 9 deletions internal/transform/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

// TransformAccount converts an account from the history archive ingestion system into a form suitable for BigQuery
func TransformAccount(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (AccountOutput, error) {
func TransformAccount(ledgerChange ingest.Change) (AccountOutput, error) {
ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange)
if err != nil {
return AccountOutput{}, err
Expand Down Expand Up @@ -76,12 +76,7 @@ func TransformAccount(ledgerChange ingest.Change, header xdr.LedgerHeaderHistory

outputLastModifiedLedger := uint32(ledgerEntry.LastModifiedLedgerSeq)

closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime)
if err != nil {
return AccountOutput{}, err
}

ledgerSequence := header.Header.LedgerSeq
changeDetails := utils.GetChangesDetails(ledgerChange)

transformedAccount := AccountOutput{
AccountID: outputID,
Expand All @@ -105,8 +100,11 @@ func TransformAccount(ledgerChange ingest.Change, header xdr.LedgerHeaderHistory
NumSponsoring: uint32(accountEntry.NumSponsoring()),
LedgerEntryChange: uint32(changeType),
Deleted: outputDeleted,
ClosedAt: closedAt,
LedgerSequence: uint32(ledgerSequence),
ClosedAt: changeDetails.ClosedAt,
LedgerSequence: changeDetails.LedgerSequence,
TransactionID: changeDetails.TransactionID,
OperationID: changeDetails.OperationID,
OperationType: changeDetails.OperationType,
}
return transformedAccount, nil
}
Loading

0 comments on commit ac1ca80

Please # to comment.