-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathupgrader.go
950 lines (837 loc) · 26.7 KB
/
upgrader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
// Copyright © 2021 - 2025 Weald Technology Trading.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgresql
import (
"context"
"encoding/json"
"fmt"
"github.com/pkg/errors"
)
// schemaMetadata is the JSON document kept under the "schema" metadata key.
// It records the version of the schema the database is at.
type schemaMetadata struct {
// Version is the schema version number; it is serialised as "version".
Version uint64 `json:"version"`
}
// currentVersion is the schema version this code expects.  Upgrade migrates
// older databases up to it and Init stamps freshly-created ones with it.
var currentVersion uint64 = 11
// upgrade holds the migration steps registered for a single schema version.
type upgrade struct {
// funcs are invoked by Upgrade in slice order, each with the context of the
// upgrade transaction and the service being upgraded.
funcs []func(context.Context, *Service) error
}
// upgrades maps a schema version to the steps that bring a database at the
// preceding version up to it.  Upgrade skips any version that has no entry.
var upgrades = map[uint64]*upgrade{
	2:  {funcs: []func(context.Context, *Service) error{addBlockMEV, addTransactionAccessLists}},
	3:  {funcs: []func(context.Context, *Service) error{addForeignKeys}},
	4:  {funcs: []func(context.Context, *Service) error{fixGasPrice}},
	5:  {funcs: []func(context.Context, *Service) error{addBalancesTable}},
	6:  {funcs: []func(context.Context, *Service) error{renameBlockRewards}},
	7:  {funcs: []func(context.Context, *Service) error{addBlockIndices}},
	8:  {funcs: []func(context.Context, *Service) error{addTransactionStorageForeignKey}},
	9:  {funcs: []func(context.Context, *Service) error{addCancunBlockFields, addCancunTransactionFields}},
	10: {funcs: []func(context.Context, *Service) error{totalDifficultyOptional}},
	11: {funcs: []func(context.Context, *Service) error{addRequestsHash, addTransactionAuthorizationLists}},
}
// Upgrade brings the database schema up to currentVersion.
//
// If the t_metadata table is absent the database is treated as empty and is
// created from scratch by Init.  Otherwise the steps registered in upgrades
// for every version after the stored one are run in order inside a single
// transaction, the stored version is set to currentVersion, and the
// transaction is committed.  On any failure the cancel function returned by
// BeginTx is called and the wrapped error is returned.
//
// A database whose stored version is already ahead of currentVersion is left
// untouched: there is nothing this code can apply to it, and rewriting its
// version to the lower number would make a later run of the newer code repeat
// migrations that have already been applied.
func (s *Service) Upgrade(ctx context.Context) error {
	// See if we have anything at all.
	tableExists, err := s.tableExists(ctx, "t_metadata")
	if err != nil {
		return errors.Wrap(err, "failed to check presence of tables")
	}
	if !tableExists {
		return s.Init(ctx)
	}

	version, err := s.version(ctx)
	if err != nil {
		return errors.Wrap(err, "failed to obtain version")
	}

	log.Trace().Uint64("current_version", version).Uint64("required_version", currentVersion).Msg("Checking if database upgrade is required")
	if version == currentVersion {
		// Nothing to do.
		return nil
	}
	if version > currentVersion {
		// Never move the recorded version backwards.
		log.Warn().Uint64("current_version", version).Uint64("required_version", currentVersion).Msg("Database schema is newer than this release; not altering it")
		return nil
	}

	ctx, cancel, err := s.BeginTx(ctx)
	if err != nil {
		return errors.Wrap(err, "failed to begin upgrade transaction")
	}

	for target := version + 1; target <= currentVersion; target++ {
		log.Info().Uint64("target_version", target).Msg("Upgrading database")
		step, exists := upgrades[target]
		if !exists {
			// No work registered for this version.
			continue
		}
		for idx, upgradeFunc := range step.funcs {
			log.Info().Int("current", idx+1).Int("total", len(step.funcs)).Msg("Running upgrade function")
			if err := upgradeFunc(ctx, s); err != nil {
				cancel()
				return errors.Wrap(err, "failed to upgrade")
			}
		}
	}

	if err := s.setVersion(ctx, currentVersion); err != nil {
		cancel()
		return errors.Wrap(err, "failed to set latest schema version")
	}

	if err := s.CommitTx(ctx); err != nil {
		cancel()
		return errors.Wrap(err, "failed to commit upgrade transaction")
	}

	log.Info().Msg("Upgrade complete")

	return nil
}
// // columnExists returns true if the given column exists in the given table.
// func (s *Service) columnExists(ctx context.Context, tableName string, columnName string) (bool, error) {
// tx := s.tx(ctx)
// if tx == nil {
// ctx, cancel, err := s.BeginTx(ctx)
// if err != nil {
// return false, errors.Wrap(err, "failed to begin transaction")
// }
// tx = s.tx(ctx)
// defer cancel()
// }
//
// query := fmt.Sprintf(`SELECT true
// FROM pg_attribute
// WHERE attrelid = '%s'::regclass
// AND attname = '%s'
// AND NOT attisdropped`, tableName, columnName)
//
// rows, err := tx.Query(ctx, query)
// if err != nil {
// return false, err
// }
// defer rows.Close()
//
// found := false
// if rows.Next() {
// err = rows.Scan(
// &found,
// )
// if err != nil {
// return false, errors.Wrap(err, "failed to scan row")
// }
// }
// return found, nil
// }
// tableExists reports whether a table with the given name is listed in
// information_schema.tables for the current schema.
//
// If ctx already carries a transaction it is used; otherwise a transaction is
// begun for the lookup and its cancel function is called on return.  An error
// is returned if the transaction cannot be started or the query cannot be
// run, scanned or read to completion.
func (s *Service) tableExists(ctx context.Context, tableName string) (bool, error) {
	tx := s.tx(ctx)
	if tx == nil {
		txCtx, cancel, err := s.BeginTx(ctx)
		if err != nil {
			return false, errors.Wrap(err, "failed to begin transaction")
		}
		defer cancel()
		tx = s.tx(txCtx)
	}

	rows, err := tx.Query(ctx, `SELECT true
FROM information_schema.tables
WHERE table_schema = (SELECT current_schema())
AND table_name = $1`, tableName)
	if err != nil {
		return false, errors.Wrap(err, "failed to query for table")
	}
	defer rows.Close()

	if !rows.Next() {
		// Next returns false both when there are no rows and when reading
		// failed; only Err tells the two apart.  Without this check a failed
		// read would be reported as "table does not exist".
		if err := rows.Err(); err != nil {
			return false, errors.Wrap(err, "failed to read rows")
		}
		return false, nil
	}

	found := false
	if err := rows.Scan(&found); err != nil {
		return false, errors.Wrap(err, "failed to scan row")
	}

	return found, nil
}
// version returns the schema version held in the "schema" metadata entry.
// An empty entry is reported as version 0.
func (s *Service) version(ctx context.Context) (uint64, error) {
	raw, err := s.Metadata(ctx, "schema")
	switch {
	case err != nil:
		return 0, errors.Wrap(err, "failed to obtain schema metadata")
	case len(raw) == 0:
		// Nothing recorded: treat the schema as version 0.
		return 0, nil
	}

	stored := schemaMetadata{}
	if err := json.Unmarshal(raw, &stored); err != nil {
		return 0, errors.Wrap(err, "failed to unmarshal metadata JSON")
	}

	return stored.Version, nil
}
// setVersion records the given schema version under the "schema" metadata
// key.  It returns ErrNoTransaction if ctx does not carry a transaction.
func (s *Service) setVersion(ctx context.Context, version uint64) error {
	if s.tx(ctx) == nil {
		return ErrNoTransaction
	}

	encoded, err := json.Marshal(&schemaMetadata{Version: version})
	if err != nil {
		return errors.Wrap(err, "failed to marshal metadata")
	}

	return s.SetMetadata(ctx, "schema", encoded)
}
// Init initialises the database.
//
// It begins a transaction, creates every table and index of the current
// schema with a single multi-statement Exec, records currentVersion as the
// schema version and commits.  If any step fails the cancel function returned
// by BeginTx is called and the wrapped error is returned; ErrNoTransaction is
// returned if no transaction can be obtained from the new context.
func (s *Service) Init(ctx context.Context) error {
ctx, cancel, err := s.BeginTx(ctx)
if err != nil {
return errors.Wrap(err, "failed to begin initial tables transaction")
}
tx := s.tx(ctx)
if tx == nil {
cancel()
return ErrNoTransaction
}
// The whole schema is sent as one string; the "--" lines inside it are SQL
// comments and are part of the text sent to the database.
if _, err := tx.Exec(ctx, `
-- t_metadata stores data about execd processing functions.
CREATE TABLE t_metadata (
f_key TEXT NOT NULL PRIMARY KEY
,f_value JSONB NOT NULL
);
CREATE UNIQUE INDEX i_metadata_1 ON t_metadata(f_key);
-- t_blocks contains execution layer blocks.
CREATE TABLE t_blocks (
f_height INTEGER NOT NULL
,f_hash BYTEA NOT NULL
,f_base_fee BIGINT
,f_blob_gas_used BIGINT
,f_difficulty BIGINT NOT NULL
,f_excess_blob_gas INTEGER
,f_extra_data BYTEA NOT NULL
,f_gas_limit INTEGER NOT NULL
,f_gas_used INTEGER NOT NULL
,f_fee_recipient BYTEA NOT NULL
,f_parent_beacon_block_root BYTEA
,f_parent_hash BYTEA NOT NULL -- cannot enforce contraint here as parent of block 0 is 0
,f_size INTEGER NOT NULL
,f_state_root BYTEA NOT NULL
,f_withdrawals_root BYTEA
,f_timestamp TIMESTAMPTZ NOT NULL
,f_total_difficulty NUMERIC
,f_issuance NUMERIC
,f_requests_hash BYTEA
);
CREATE UNIQUE INDEX i_blocks_1 ON t_blocks(f_height,f_hash);
CREATE UNIQUE INDEX i_blocks_2 ON t_blocks(f_hash);
CREATE INDEX i_blocks_3 ON t_blocks(f_fee_recipient);
CREATE UNIQUE INDEX i_blocks_4 ON t_blocks(f_timestamp);
-- t_transasctions contains execution layer transactions.
CREATE TABLE t_transactions (
f_block_height INTEGER NOT NULL
,f_block_hash BYTEA NOT NULL REFERENCES t_blocks(f_hash) ON DELETE CASCADE
,f_contract_address BYTEA
,f_index INTEGER NOT NULL
,f_type BIGINT NOT NULL
,f_from BYTEA NOT NULL
,f_gas_limit INTEGER NOT NULL
,f_gas_price BIGINT NOT NULL
,f_gas_used INTEGER NOT NULL
,f_hash BYTEA NOT NULL
,f_input BYTEA
,f_max_fee_per_gas BIGINT
,f_max_priority_fee_per_gas BIGINT
,f_nonce BIGINT NOT NULL
,f_r BYTEA NOT NULL
,f_s BYTEA NOT NULL
,f_status INTEGER NOT NULL
,f_to BYTEA
,f_v BYTEA NOT NULL
,f_value NUMERIC NOT NULL
,f_y_parity BOOLEAN
,f_max_fee_per_blob_gas BIGINT
,f_blob_versioned_hashes BYTEA[]
,f_blob_gas_price BIGINT
,f_blob_gas_used BIGINT
);
CREATE UNIQUE INDEX i_transactions_1 ON t_transactions(f_block_hash,f_index);
CREATE INDEX i_transactions_2 ON t_transactions(f_from,f_block_height);
CREATE INDEX i_transactions_3 ON t_transactions(f_to,f_block_height);
CREATE INDEX i_transactions_4 ON t_transactions(f_block_height);
CREATE UNIQUE INDEX i_transactions_5 ON t_transactions(f_hash);
CREATE TABLE t_transaction_access_lists (
f_transaction_hash BYTEA NOT NULL REFERENCES t_transactions(f_hash) ON DELETE CASCADE
,f_block_height INTEGER NOT NULL
,f_address BYTEA NOT NULL
,f_storage_keys BYTEA[] NOT NULL
);
CREATE UNIQUE INDEX i_transaction_access_lists_1 ON t_transaction_access_lists(f_transaction_hash,f_block_height,f_address);
CREATE INDEX i_transaction_access_lists_2 ON t_transaction_access_lists(f_address);
CREATE INDEX i_transaction_access_lists_3 ON t_transaction_access_lists(f_block_height);
CREATE TABLE t_transaction_authorization_lists (
f_transaction_hash BYTEA NOT NULL REFERENCES t_transactions(f_hash) ON DELETE CASCADE
,f_block_height INTEGER NOT NULL
,f_index INTEGER NOT NULL
,f_chain_id BYTEA NOT NULL
,f_address BYTEA NOT NULL
,f_nonce BIGINT NOT NULL
,f_r BYTEA NOT NULL
,f_s BYTEA NOT NULL
,f_y_parity BOOLEAN NOT NULL
);
CREATE UNIQUE INDEX i_transaction_authorization_lists_1 ON t_transaction_authorization_lists(f_transaction_hash,f_block_height,f_index);
CREATE INDEX i_transaction_authorization_lists_2 ON t_transaction_authorization_lists(f_address);
CREATE INDEX i_transaction_authorization_lists_3 ON t_transaction_authorization_lists(f_block_height);
-- t_transaction_balance_changes contains balance changes as a result of a transaction.
CREATE TABLE t_transaction_balance_changes (
f_transaction_hash BYTEA NOT NULL REFERENCES t_transactions(f_hash) ON DELETE CASCADE
,f_block_height INTEGER NOT NULL
,f_address BYTEA NOT NULL
,f_old NUMERIC NOT NULL
,f_new NUMERIC NOT NULL
);
CREATE UNIQUE INDEX i_transaction_balance_changes_1 ON t_transaction_balance_changes(f_transaction_hash,f_block_height,f_address);
CREATE INDEX i_transaction_balance_changes_2 ON t_transaction_balance_changes(f_address);
CREATE INDEX i_transaction_balance_changes_3 ON t_transaction_balance_changes(f_block_height);
-- t_transaction_storage_changes contains storage changes as a result of a transaction.
CREATE TABLE t_transaction_storage_changes (
f_transaction_hash BYTEA NOT NULL REFERENCES t_transactions(f_hash) ON DELETE CASCADE
,f_block_height INTEGER NOT NULL
,f_address BYTEA NOT NULL
,f_storage_address BYTEA NOT NULL
,f_value BYTEA NOT NULL
);
CREATE UNIQUE INDEX i_transaction_storage_changes_1 ON t_transaction_storage_changes(f_transaction_hash,f_address,f_storage_address);
CREATE INDEX i_transaction_storage_changes_2 ON t_transaction_storage_changes(f_address);
CREATE INDEX i_transaction_storage_changes_3 ON t_transaction_storage_changes(f_block_height);
-- t_events contains execution layer events.
CREATE TABLE t_events (
f_transaction_hash BYTEA NOT NULL REFERENCES t_transactions(f_hash) ON DELETE CASCADE
,f_block_height INTEGER NOT NULL
,f_index INTEGER NOT NULL
,f_address BYTEA NOT NULL
,f_topics BYTEA[] NOT NULL
,f_data BYTEA
);
CREATE UNIQUE INDEX i_events_1 ON t_events(f_transaction_hash,f_block_height,f_index);
CREATE INDEX i_events_2 ON t_events(f_address);
CREATE INDEX i_events_3 ON t_events(f_block_height);
-- t_block_rewards contains block rewards.
CREATE TABLE t_block_rewards (
f_block_hash BYTEA NOT NULL REFERENCES t_blocks(f_hash) ON DELETE CASCADE
,f_block_height INTEGER NOT NULL
,f_fees NUMERIC NOT NULL
,f_payments NUMERIC NOT NULL
);
CREATE UNIQUE INDEX i_block_rewards_1 ON t_block_rewards(f_block_hash);
CREATE INDEX i_block_rewards_2 ON t_block_rewards(f_block_height);
-- t_balances contains balances on addresses.
CREATE TABLE t_balances (
f_address BYTEA NOT NULL
,f_currency TEXT NOT NULL
,f_from TIMESTAMPTZ NOT NULL
,f_amount NUMERIC NOT NULL
);
CREATE UNIQUE INDEX i_balances_1 ON t_balances(f_address,f_currency,f_from);
`); err != nil {
cancel()
return errors.Wrap(err, "failed to create initial tables")
}
// The tables above are already in their latest form, so record the current
// version directly; Upgrade returns early when the stored version matches it.
if err := s.setVersion(ctx, currentVersion); err != nil {
cancel()
return errors.Wrap(err, "failed to set latest schema version")
}
if err := s.CommitTx(ctx); err != nil {
cancel()
return errors.Wrap(err, "failed to commit initial tables transaction")
}
return nil
}
// addBlockMEV creates the t_block_mevs table together with its two indices.
// It does nothing if the table is already present, and returns
// ErrNoTransaction if ctx does not carry a transaction.
func addBlockMEV(ctx context.Context, s *Service) error {
	dbTx := s.tx(ctx)
	if dbTx == nil {
		return ErrNoTransaction
	}

	present, err := s.tableExists(ctx, "t_block_mevs")
	if err != nil {
		return errors.Wrap(err, "failed to check presence of t_block_mevs")
	}
	if present {
		return nil
	}

	// Statements run in order; the first failure stops the sequence.
	steps := []struct{ query, failure string }{
		{`
CREATE TABLE t_block_mevs (
f_block_hash BYTEA NOT NULL REFERENCES t_blocks(f_hash) ON DELETE CASCADE
,f_block_height INTEGER NOT NULL
,f_fees NUMERIC NOT NULL
,f_payments NUMERIC NOT NULL
)`, "failed to create t_block_mevs"},
		{`
CREATE UNIQUE INDEX i_block_mevs_1 ON t_block_mevs(f_block_hash)
`, "failed to create i_block_mevs_1"},
		{`
CREATE INDEX i_block_mevs_2 ON t_block_mevs(f_block_height);
`, "failed to create i_block_mevs_2"},
	}
	for _, step := range steps {
		if _, err := dbTx.Exec(ctx, step.query); err != nil {
			return errors.Wrap(err, step.failure)
		}
	}

	return nil
}
// addTransactionAccessLists creates the t_transaction_access_lists table and
// its three indices.  It does nothing if the table is already present, and
// returns ErrNoTransaction if ctx does not carry a transaction.
func addTransactionAccessLists(ctx context.Context, s *Service) error {
	dbTx := s.tx(ctx)
	if dbTx == nil {
		return ErrNoTransaction
	}

	present, err := s.tableExists(ctx, "t_transaction_access_lists")
	if err != nil {
		return errors.Wrap(err, "failed to check presence of t_transaction_access_lists")
	}
	if present {
		return nil
	}

	// Statements run in order; the first failure stops the sequence.
	steps := []struct{ query, failure string }{
		{`
CREATE TABLE t_transaction_access_lists (
f_transaction_hash BYTEA NOT NULL
,f_block_height INTEGER NOT NULL
,f_address BYTEA NOT NULL
,f_storage_keys BYTEA[] NOT NULL
)`, "failed to create t_transaction_access_lists"},
		{`
CREATE UNIQUE INDEX i_transaction_access_lists_1 ON t_transaction_access_lists(f_transaction_hash,f_block_height,f_address);
`, "failed to create i_transaction_access_lists_1"},
		{`
CREATE INDEX i_transaction_access_lists_2 ON t_transaction_access_lists(f_address);
`, "failed to create i_transaction_access_lists_2"},
		{`
CREATE INDEX i_transaction_access_lists_3 ON t_transaction_access_lists(f_block_height);
`, "failed to create i_transaction_access_lists_3"},
	}
	for _, step := range steps {
		if _, err := dbTx.Exec(ctx, step.query); err != nil {
			return errors.Wrap(err, step.failure)
		}
	}

	return nil
}
// addForeignKeys adds an ON DELETE CASCADE foreign key from f_transaction_hash
// to t_transactions(f_hash) on t_events, t_transaction_access_lists,
// t_transaction_balance_changes and t_transaction_storage_changes.
//
// Before doing so it checks pg_indexes for an index named i_transactions_5
// whose definition contains UNIQUE; if none is found the index is dropped (if
// it exists) and recreated as a unique index on t_transactions(f_hash).
// It returns ErrNoTransaction if ctx does not carry a transaction.
func addForeignKeys(ctx context.Context, s *Service) error {
	dbTx := s.tx(ctx)
	if dbTx == nil {
		return ErrNoTransaction
	}

	// See if we need to fix the transactions index.
	var uniqueIndices uint64
	if err := dbTx.QueryRow(ctx, `
SELECT COUNT(*)
FROM pg_indexes
WHERE indexname = 'i_transactions_5'
AND indexdef LIKE '%UNIQUE%'
`).Scan(&uniqueIndices); err != nil {
		return errors.Wrap(err, "failed to obtain transactions index information")
	}
	log.Trace().Uint64("good_indices", uniqueIndices).Msg("Found number of good indices")

	if uniqueIndices == 0 {
		// Replace whatever is there with a unique index.
		if _, err := dbTx.Exec(ctx, `
DROP INDEX IF EXISTS i_transactions_5`); err != nil {
			return errors.Wrap(err, "failed to drop i_transactions_5")
		}
		if _, err := dbTx.Exec(ctx, `
CREATE UNIQUE INDEX i_transactions_5 ON t_transactions(f_hash)`); err != nil {
			return errors.Wrap(err, "failed to create i_transactions_5")
		}
	}

	// One constraint per dependent table, applied in order.
	constraints := []struct{ query, failure string }{
		{`
ALTER TABLE t_events
ADD CONSTRAINT t_events_f_transaction_hash_fkey
FOREIGN KEY (f_transaction_hash)
REFERENCES t_transactions(f_hash)
ON DELETE CASCADE`, "failed to set foreign key for t_events"},
		{`
ALTER TABLE t_transaction_access_lists
ADD CONSTRAINT t_transaction_access_lists_f_transaction_hash_fkey
FOREIGN KEY (f_transaction_hash)
REFERENCES t_transactions(f_hash)
ON DELETE CASCADE`, "failed to set foreign key for t_transaction_access_lists"},
		{`
ALTER TABLE t_transaction_balance_changes
ADD CONSTRAINT t_transaction_balance_changes_f_transaction_hash_fkey
FOREIGN KEY (f_transaction_hash)
REFERENCES t_transactions(f_hash)
ON DELETE CASCADE`, "failed to set foreign key for t_transaction_balance_changes"},
		{`
ALTER TABLE t_transaction_storage_changes
ADD CONSTRAINT t_transaction_storage_changes_f_transaction_hash_fkey
FOREIGN KEY (f_transaction_hash)
REFERENCES t_transactions(f_hash)
ON DELETE CASCADE`, "failed to set foreign key for t_transaction_storage_changes"},
	}
	for _, constraint := range constraints {
		if _, err := dbTx.Exec(ctx, constraint.query); err != nil {
			return errors.Wrap(err, constraint.failure)
		}
	}

	return nil
}
// fixGasPrice fills in f_gas_price for type 2 transactions that have none and
// then makes the column NOT NULL.
//
// It walks every height from the lowest block holding such a transaction to
// the highest block in t_blocks.  For each height it reads the block's base
// fee and sets the gas price of the affected transactions to
// LEAST(max priority fee, max fee - base fee) + base fee.  When no transaction
// needs fixing the starting height is the sentinel 999999999, which is
// presumably above any stored block height, so the loop does not run.
//
// It returns ErrNoTransaction if ctx does not carry a transaction, and a
// wrapped error naming the failing step (and block, where relevant) otherwise.
func fixGasPrice(ctx context.Context, s *Service) error {
	tx := s.tx(ctx)
	if tx == nil {
		return ErrNoTransaction
	}

	// Fetch the minimum height.
	minHeight := uint64(0)
	err := tx.QueryRow(ctx, `
SELECT COALESCE(MIN(f_block_height),999999999)
FROM t_transactions
WHERE f_type = 2
AND f_gas_price IS NULL`,
	).Scan(
		&minHeight,
	)
	if err != nil {
		return errors.Wrap(err, "failed to obtain minimum block height")
	}

	// Fetch the maximum height.
	maxHeight := uint64(0)
	err = tx.QueryRow(ctx, `
SELECT COALESCE(MAX(f_height),0)
FROM t_blocks`,
	).Scan(
		&maxHeight,
	)
	if err != nil {
		return errors.Wrap(err, "failed to obtain maximum block height")
	}

	for height := minHeight; height <= maxHeight; height++ {
		log.Trace().Uint64("block_height", height).Msg("Fixing gas price for transactions in block")

		// Obtain the base fee for the block.
		baseFee := uint64(0)
		if err := tx.QueryRow(ctx, `
SELECT f_base_fee
FROM t_blocks
WHERE f_height = $1`,
			height,
		).Scan(
			&baseFee,
		); err != nil {
			return errors.Wrap(err, fmt.Sprintf("failed to obtain base fee for block %d", height))
		}

		// Update the gas price for the type 2 transactions in the block.
		if _, err := tx.Exec(ctx, `
UPDATE t_transactions
SET f_gas_price = LEAST(f_max_priority_fee_per_gas, f_max_fee_per_gas - $1) + $1
WHERE f_block_height = $2
AND f_type = 2
AND f_gas_price IS NULL`,
			baseFee,
			height,
		); err != nil {
			// Distinct message so a failed update is not mistaken for a
			// failed base fee lookup.
			return errors.Wrap(err, fmt.Sprintf("failed to update gas price for block %d", height))
		}
	}

	// Add the 'not null' constraint to the gas price column.
	_, err = tx.Exec(ctx, `
ALTER TABLE t_transactions
ALTER COLUMN f_gas_price
SET NOT NULL`)
	if err != nil {
		return errors.Wrap(err, "failed to set f_gas_price to not null")
	}

	return nil
}
// addBalancesTable creates the t_balances table and its unique index.
// It does nothing if the table is already present, and returns
// ErrNoTransaction if ctx does not carry a transaction.
func addBalancesTable(ctx context.Context, s *Service) error {
	dbTx := s.tx(ctx)
	if dbTx == nil {
		return ErrNoTransaction
	}

	present, err := s.tableExists(ctx, "t_balances")
	if err != nil {
		return errors.Wrap(err, "failed to check presence of t_balances")
	}
	if present {
		return nil
	}

	// Table first, then its index.
	steps := []struct{ query, failure string }{
		{`
CREATE TABLE t_balances (
f_address BYTEA NOT NULL
,f_currency TEXT NOT NULL
,f_from TIMESTAMPTZ NOT NULL
,f_amount NUMERIC NOT NULL
)`, "failed to create t_balances"},
		{`
CREATE UNIQUE INDEX i_balances_1 ON t_balances(f_address,f_currency,f_from);
`, "failed to create i_balances_1"},
	}
	for _, step := range steps {
		if _, err := dbTx.Exec(ctx, step.query); err != nil {
			return errors.Wrap(err, step.failure)
		}
	}

	return nil
}
// renameBlockRewards renames the t_block_mevs table and its two indices to
// their t_block_rewards equivalents, and renames the 'mev' key in t_metadata
// to 'rewards'.  It does nothing if t_block_mevs is not present, and returns
// ErrNoTransaction if ctx does not carry a transaction.
func renameBlockRewards(ctx context.Context, s *Service) error {
	dbTx := s.tx(ctx)
	if dbTx == nil {
		return ErrNoTransaction
	}

	present, err := s.tableExists(ctx, "t_block_mevs")
	if err != nil {
		return errors.Wrap(err, "failed to check presence of t_block_mevs")
	}
	if !present {
		return nil
	}

	// Statements run in order; the first failure stops the sequence.
	steps := []struct{ query, failure string }{
		{`
ALTER TABLE t_block_mevs
RENAME TO t_block_rewards
`, "failed to rename t_block_mevs"},
		{`
ALTER INDEX i_block_mevs_1
RENAME TO i_block_rewards_1
`, "failed to rename i_block_mevs_1"},
		{`
ALTER INDEX i_block_mevs_2
RENAME TO i_block_rewards_2
`, "failed to rename i_block_mevs_2"},
		{`
UPDATE t_metadata
SET f_key = 'rewards'
WHERE f_key = 'mev'
`, "failed to rename metadata key"},
	}
	for _, step := range steps {
		if _, err := dbTx.Exec(ctx, step.query); err != nil {
			return errors.Wrap(err, step.failure)
		}
	}

	return nil
}
// addBlockIndices creates, if they do not already exist, the i_blocks_3 index
// on t_blocks(f_fee_recipient) and the unique i_blocks_4 index on
// t_blocks(f_timestamp).  It returns ErrNoTransaction if ctx does not carry a
// transaction.
func addBlockIndices(ctx context.Context, s *Service) error {
	dbTx := s.tx(ctx)
	if dbTx == nil {
		return ErrNoTransaction
	}

	indices := []struct{ query, failure string }{
		{`
CREATE INDEX IF NOT EXISTS i_blocks_3 ON t_blocks(f_fee_recipient);
`, "failed to create i_blocks_3"},
		{`
CREATE UNIQUE INDEX IF NOT EXISTS i_blocks_4 ON t_blocks(f_timestamp);
`, "failed to create i_blocks_4"},
	}
	for _, index := range indices {
		if _, err := dbTx.Exec(ctx, index.query); err != nil {
			return errors.Wrap(err, index.failure)
		}
	}

	return nil
}
// addTransactionStorageForeignKey adds an ON DELETE CASCADE foreign key from
// t_transaction_storage_changes(f_transaction_hash) to t_transactions(f_hash).
// It returns ErrNoTransaction if ctx does not carry a transaction.
func addTransactionStorageForeignKey(ctx context.Context, s *Service) error {
	dbTx := s.tx(ctx)
	if dbTx == nil {
		return ErrNoTransaction
	}

	_, err := dbTx.Exec(ctx, `
ALTER TABLE t_transaction_storage_changes ADD FOREIGN KEY(f_transaction_hash) REFERENCES t_transactions(f_hash) ON DELETE CASCADE;
`)
	if err != nil {
		return errors.Wrap(err, "failed to add foreign key to t_transaction_storage_changes")
	}

	return nil
}
// addCancunBlockFields adds the f_blob_gas_used, f_excess_blob_gas,
// f_withdrawals_root and f_parent_beacon_block_root columns to t_blocks, each
// with its own ALTER TABLE statement.  It returns ErrNoTransaction if ctx does
// not carry a transaction.
func addCancunBlockFields(ctx context.Context, s *Service) error {
	dbTx := s.tx(ctx)
	if dbTx == nil {
		return ErrNoTransaction
	}

	// Columns are added in order; the first failure stops the sequence.
	columns := []struct{ query, failure string }{
		{`
ALTER TABLE t_blocks
ADD COLUMN f_blob_gas_used BIGINT
`, "failed to add f_blob_gas_used to t_blocks"},
		{`
ALTER TABLE t_blocks
ADD COLUMN f_excess_blob_gas INTEGER
`, "failed to add f_excess_blob_gas to t_blocks"},
		{`
ALTER TABLE t_blocks
ADD COLUMN f_withdrawals_root BYTEA
`, "failed to add f_withdrawals_root to t_blocks"},
		{`
ALTER TABLE t_blocks
ADD COLUMN f_parent_beacon_block_root BYTEA
`, "failed to add f_parent_beacon_block_root to t_blocks"},
	}
	for _, column := range columns {
		if _, err := dbTx.Exec(ctx, column.query); err != nil {
			return errors.Wrap(err, column.failure)
		}
	}

	return nil
}
// addCancunTransactionFields adds the f_max_fee_per_blob_gas, f_y_parity,
// f_blob_versioned_hashes, f_blob_gas_price and f_blob_gas_used columns to
// t_transactions, each with its own ALTER TABLE statement.  It returns
// ErrNoTransaction if ctx does not carry a transaction.
func addCancunTransactionFields(ctx context.Context, s *Service) error {
	dbTx := s.tx(ctx)
	if dbTx == nil {
		return ErrNoTransaction
	}

	// Columns are added in order; the first failure stops the sequence.
	columns := []struct{ query, failure string }{
		{`
ALTER TABLE t_transactions
ADD COLUMN f_max_fee_per_blob_gas BIGINT
`, "failed to add f_max_fee_per_blob_gas to t_transactions"},
		{`
ALTER TABLE t_transactions
ADD COLUMN f_y_parity BOOLEAN
`, "failed to add f_y_parity to t_transactions"},
		{`
ALTER TABLE t_transactions
ADD COLUMN f_blob_versioned_hashes BYTEA[]
`, "failed to add f_blob_versioned_hashes to t_transactions"},
		{`
ALTER TABLE t_transactions
ADD COLUMN f_blob_gas_price BIGINT
`, "failed to add f_blob_gas_price to t_transactions"},
		{`
ALTER TABLE t_transactions
ADD COLUMN f_blob_gas_used BIGINT
`, "failed to add f_blob_gas_used to t_transactions"},
	}
	for _, column := range columns {
		if _, err := dbTx.Exec(ctx, column.query); err != nil {
			return errors.Wrap(err, column.failure)
		}
	}

	return nil
}
// totalDifficultyOptional drops the NOT NULL constraint from the
// f_total_difficulty column of t_blocks.  It returns ErrNoTransaction if ctx
// does not carry a transaction.
func totalDifficultyOptional(ctx context.Context, s *Service) error {
	dbTx := s.tx(ctx)
	if dbTx == nil {
		return ErrNoTransaction
	}

	_, err := dbTx.Exec(ctx, `
ALTER TABLE t_blocks
ALTER COLUMN f_total_difficulty
DROP NOT NULL
`)
	if err != nil {
		return errors.Wrap(err, "failed to make f_total_difficulty nullable in t_blocks")
	}

	return nil
}
// addRequestsHash adds the nullable BYTEA column f_requests_hash to t_blocks.
// It returns ErrNoTransaction if ctx does not carry a transaction.
func addRequestsHash(ctx context.Context, s *Service) error {
	dbTx := s.tx(ctx)
	if dbTx == nil {
		return ErrNoTransaction
	}

	_, err := dbTx.Exec(ctx, `
ALTER TABLE t_blocks
ADD COLUMN f_requests_hash BYTEA
`)
	if err != nil {
		return errors.Wrap(err, "failed to add f_requests_hash to t_blocks")
	}

	return nil
}
// addTransactionAuthorizationLists creates the
// t_transaction_authorization_lists table and its three indices.
//
// The statements run in order and the first failure is returned wrapped with
// the name of the object that could not be created.  It returns
// ErrNoTransaction if ctx does not carry a transaction.
func addTransactionAuthorizationLists(ctx context.Context, s *Service) error {
	tx := s.tx(ctx)
	if tx == nil {
		return ErrNoTransaction
	}

	// The failure messages carry the full index names (with the leading "i")
	// so they match the objects named in the SQL.
	steps := []struct{ query, failure string }{
		{`
CREATE TABLE t_transaction_authorization_lists (
f_transaction_hash BYTEA NOT NULL REFERENCES t_transactions(f_hash) ON DELETE CASCADE
,f_block_height INTEGER NOT NULL
,f_index INTEGER NOT NULL
,f_chain_id BYTEA NOT NULL
,f_address BYTEA NOT NULL
,f_nonce BIGINT NOT NULL
,f_r BYTEA NOT NULL
,f_s BYTEA NOT NULL
,f_y_parity BOOLEAN NOT NULL
)
`, "failed to create t_transaction_authorization_lists"},
		{`
CREATE UNIQUE INDEX i_transaction_authorization_lists_1 ON t_transaction_authorization_lists(f_transaction_hash,f_block_height,f_index);
`, "failed to create i_transaction_authorization_lists_1"},
		{`
CREATE INDEX i_transaction_authorization_lists_2 ON t_transaction_authorization_lists(f_address);
`, "failed to create i_transaction_authorization_lists_2"},
		{`
CREATE INDEX i_transaction_authorization_lists_3 ON t_transaction_authorization_lists(f_block_height);
`, "failed to create i_transaction_authorization_lists_3"},
	}
	for _, step := range steps {
		if _, err := tx.Exec(ctx, step.query); err != nil {
			return errors.Wrap(err, step.failure)
		}
	}

	return nil
}