Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Add mode to write to CSV files in statediff file writer #249

Merged
merged 8 commits into from
Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 1 addition & 24 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,32 +53,9 @@ jobs:
- name: Checkout code
uses: actions/checkout@v2

- uses: actions/checkout@v3
with:
ref: ${{ env.stack-orchestrator-ref }}
path: "./stack-orchestrator/"
repository: vulcanize/stack-orchestrator
fetch-depth: 0

- uses: actions/checkout@v3
with:
ref: ${{ env.ipld-eth-db-ref }}
repository: vulcanize/ipld-eth-db
path: "./ipld-eth-db/"
fetch-depth: 0

- name: Create config file
run: |
echo vulcanize_ipld_eth_db=$GITHUB_WORKSPACE/ipld-eth-db/ > $GITHUB_WORKSPACE/config.sh
echo db_write=true >> $GITHUB_WORKSPACE/config.sh
cat $GITHUB_WORKSPACE/config.sh

- name: Run docker compose
run: |
docker-compose \
-f "$GITHUB_WORKSPACE/stack-orchestrator/docker/local/docker-compose-db-sharding.yml" \
--env-file $GITHUB_WORKSPACE/config.sh \
up -d --build
docker-compose up -d

- name: Give the migration a few seconds
run: sleep 30;
Expand Down
8 changes: 8 additions & 0 deletions cmd/geth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,15 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
}
switch dbType {
case shared.FILE:
fileModeStr := ctx.GlobalString(utils.StateDiffFileMode.Name)
fileMode, err := file.ResolveFileMode(fileModeStr)
if err != nil {
utils.Fatalf("%v", err)
}

indexerConfig = file.Config{
Mode: fileMode,
OutputDir: ctx.GlobalString(utils.StateDiffFileCsvDir.Name),
FilePath: ctx.GlobalString(utils.StateDiffFilePath.Name),
WatchedAddressesFilePath: ctx.GlobalString(utils.StateDiffWatchedAddressesFilePath.Name),
}
Expand Down
2 changes: 2 additions & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ var (
utils.StateDiffDBClientNameFlag,
utils.StateDiffWritingFlag,
utils.StateDiffWorkersFlag,
utils.StateDiffFileMode,
utils.StateDiffFileCsvDir,
utils.StateDiffFilePath,
utils.StateDiffKnownGapsFilePath,
utils.StateDiffWaitForSync,
Expand Down
2 changes: 2 additions & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.StateDiffDBClientNameFlag,
utils.StateDiffWritingFlag,
utils.StateDiffWorkersFlag,
utils.StateDiffFileMode,
utils.StateDiffFileCsvDir,
utils.StateDiffFilePath,
utils.StateDiffKnownGapsFilePath,
utils.StateDiffWaitForSync,
Expand Down
11 changes: 10 additions & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,9 +902,18 @@ var (
Name: "statediff.db.nodeid",
Usage: "Node ID to use when writing state diffs to database",
}
StateDiffFileMode = cli.StringFlag{
Name: "statediff.file.mode",
Usage: "Statediff file writing mode (current options: csv, sql)",
Value: "csv",
}
StateDiffFileCsvDir = cli.StringFlag{
Name: "statediff.file.csvdir",
Usage: "Full path of output directory to write statediff data out to when operating in csv file mode",
}
StateDiffFilePath = cli.StringFlag{
Name: "statediff.file.path",
Usage: "Full path (including filename) to write statediff data out to when operating in file mode",
Usage: "Full path (including filename) to write statediff data out to when operating in sql file mode",
}
StateDiffKnownGapsFilePath = cli.StringFlag{
Name: "statediff.knowngapsfile.path",
Expand Down
23 changes: 18 additions & 5 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,27 @@
version: "3.2"

services:
ipld-eth-db:
migrations:
restart: on-failure
depends_on:
- access-node
image: vulcanize/ipld-eth-db:v4.1.1-alpha
- ipld-eth-db
image: vulcanize/ipld-eth-db:v4.1.4-alpha
environment:
DATABASE_USER: "vdbm"
DATABASE_NAME: "vulcanize_testing_v4"
DATABASE_NAME: "vulcanize_testing"
DATABASE_PASSWORD: "password"
DATABASE_HOSTNAME: "access-node"
DATABASE_HOSTNAME: "ipld-eth-db"
DATABASE_PORT: 5432

ipld-eth-db:
image: timescale/timescaledb:latest-pg14
restart: always
command: ["postgres", "-c", "log_statement=all"]
environment:
POSTGRES_USER: "vdbm"
POSTGRES_DB: "vulcanize_testing"
POSTGRES_PASSWORD: "password"
ports:
- "127.0.0.1:8077:5432"
volumes:
- ./statediff/indexer/database/file:/file_indexer
57 changes: 47 additions & 10 deletions statediff/indexer/database/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,38 @@
package file

import (
"fmt"
"strings"

"github.com/ethereum/go-ethereum/statediff/indexer/node"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
)

// Config holds params for writing sql statements out to a file
// FileMode to explicitly type the mode of file writer we are using
type FileMode string

const (
CSV FileMode = "CSV"
SQL FileMode = "SQL"
Unknown FileMode = "Unknown"
)

// ResolveFileMode resolves a FileMode from a provided string
func ResolveFileMode(str string) (FileMode, error) {
switch strings.ToLower(str) {
case "csv":
return CSV, nil
case "sql":
return SQL, nil
default:
return Unknown, fmt.Errorf("unrecognized file type string: %s", str)
}
}

// Config holds params for writing out CSV or SQL files
type Config struct {
Mode FileMode
OutputDir string
FilePath string
WatchedAddressesFilePath string
NodeInfo node.Info
Expand All @@ -33,15 +59,26 @@ func (c Config) Type() shared.DBType {
return shared.FILE
}

// TestConfig config for unit tests
var TestConfig = Config{
var nodeInfo = node.Info{
GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3",
NetworkID: "1",
ChainID: 1,
ID: "mockNodeID",
ClientName: "go-ethereum",
}

// CSVTestConfig config for unit tests
var CSVTestConfig = Config{
Mode: CSV,
OutputDir: "./statediffing_test",
WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.csv",
NodeInfo: nodeInfo,
}

// SQLTestConfig config for unit tests
var SQLTestConfig = Config{
Mode: SQL,
FilePath: "./statediffing_test_file.sql",
WatchedAddressesFilePath: "./statediffing_watched_addresses_test_file.sql",
NodeInfo: node.Info{
GenesisBlock: "0xd4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3",
NetworkID: "1",
ChainID: 1,
ID: "mockNodeID",
ClientName: "go-ethereum",
},
NodeInfo: nodeInfo,
}
135 changes: 135 additions & 0 deletions statediff/indexer/database/file/csv_indexer_legacy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// VulcanizeDB
// Copyright © 2022 Vulcanize

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.

// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package file_test

import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"testing"

"github.com/jmoiron/sqlx"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"

"github.com/ethereum/go-ethereum/statediff/indexer/database/file"
"github.com/ethereum/go-ethereum/statediff/indexer/database/file/types"
"github.com/ethereum/go-ethereum/statediff/indexer/database/sql/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/interfaces"
"github.com/ethereum/go-ethereum/statediff/indexer/ipld"
)

const dbDirectory = "/file_indexer"
const pgCopyStatement = `COPY %s FROM '%s' CSV`

func setupCSVLegacy(t *testing.T) {
mockLegacyBlock = legacyData.MockBlock
legacyHeaderCID, _ = ipld.RawdataToCid(ipld.MEthHeader, legacyData.MockHeaderRlp, multihash.KECCAK_256)
file.CSVTestConfig.OutputDir = "./statediffing_legacy_test"

if _, err := os.Stat(file.CSVTestConfig.OutputDir); !errors.Is(err, os.ErrNotExist) {
err := os.RemoveAll(file.CSVTestConfig.OutputDir)
require.NoError(t, err)
}

ind, err := file.NewStateDiffIndexer(context.Background(), legacyData.Config, file.CSVTestConfig)
require.NoError(t, err)
var tx interfaces.Batch
tx, err = ind.PushBlock(
mockLegacyBlock,
legacyData.MockReceipts,
legacyData.MockBlock.Difficulty())
require.NoError(t, err)

defer func() {
if err := tx.Submit(err); err != nil {
t.Fatal(err)
}
if err := ind.Close(); err != nil {
t.Fatal(err)
}
}()

for _, node := range legacyData.StateDiffs {
err = ind.PushStateNode(tx, node, legacyData.MockBlock.Hash().String())
require.NoError(t, err)
}

require.Equal(t, legacyData.BlockNumber.String(), tx.(*file.BatchTx).BlockNumber)

connStr := postgres.DefaultConfig.DbConnectionString()
sqlxdb, err = sqlx.Connect("postgres", connStr)
if err != nil {
t.Fatalf("failed to connect to db with connection string: %s err: %v", connStr, err)
}
}

func dumpCSVFileData(t *testing.T) {
outputDir := filepath.Join(dbDirectory, file.CSVTestConfig.OutputDir)

for _, tbl := range file.Tables {
var stmt string
varcharColumns := tbl.VarcharColumns()
if len(varcharColumns) > 0 {
stmt = fmt.Sprintf(
pgCopyStatement+" FORCE NOT NULL %s",
tbl.Name,
file.TableFilePath(outputDir, tbl.Name),
strings.Join(varcharColumns, ", "),
)
} else {
stmt = fmt.Sprintf(pgCopyStatement, tbl.Name, file.TableFilePath(outputDir, tbl.Name))
}

_, err = sqlxdb.Exec(stmt)
require.NoError(t, err)
}
}

func dumpWatchedAddressesCSVFileData(t *testing.T) {
outputFilePath := filepath.Join(dbDirectory, file.CSVTestConfig.WatchedAddressesFilePath)
stmt := fmt.Sprintf(pgCopyStatement, types.TableWatchedAddresses.Name, outputFilePath)

_, err = sqlxdb.Exec(stmt)
require.NoError(t, err)
}

func tearDownCSV(t *testing.T) {
file.TearDownDB(t, sqlxdb)

err := os.RemoveAll(file.CSVTestConfig.OutputDir)
require.NoError(t, err)

if err := os.Remove(file.CSVTestConfig.WatchedAddressesFilePath); !errors.Is(err, os.ErrNotExist) {
require.NoError(t, err)
}

err = sqlxdb.Close()
require.NoError(t, err)
}

func TestCSVFileIndexerLegacy(t *testing.T) {
t.Run("Publish and index header IPLDs", func(t *testing.T) {
setupCSVLegacy(t)
dumpCSVFileData(t)
defer tearDownCSV(t)
testLegacyPublishAndIndexHeaderIPLDs(t)
})
}
Loading