Ergo Streaming is a Kafka-based streaming service for Ergo chain data.
This service polls the Ergo node and streams blockchain data via Kafka, handling both chain progression and rollbacks.
There are three topics:

- `blocks_topic`: Block-related events
- `tx_topic`: Transaction-related events
- `mempool_topic`: Mempool transaction events
The `blocks_topic` fires when blocks are added to or removed from the chain. The message is a JSON object with one of two types:
`BlockApply` (new block added):

```json
{
  "timestamp": <block_timestamp>,
  "height": <block_height>,
  "id": <block_id>
}
```
`BlockUnapply` (block removed during rollback):

```json
{
  "timestamp": <block_timestamp>,
  "height": <block_height>,
  "id": <block_id>
}
```
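As a quick orientation, here is a minimal consumer sketch. It uses kafka-python (an assumption; any Kafka client works), and the broker address and offset policy are placeholders, not values prescribed by this service:

```python
import json
from kafka import KafkaConsumer  # pip install kafka-python

# Assumes a broker on localhost:9092 and the default topic name.
consumer = KafkaConsumer(
    "blocks_topic",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
)

for msg in consumer:
    event = json.loads(msg.value)
    print(event)  # e.g. a BlockApply payload with timestamp/height/id
```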
The `tx_topic` fires for transaction events. Messages can be either:
`AppliedEvent` (new transaction):

```json
{
  "timestamp": <block_timestamp>,
  "height": <block_height>,
  "tx": <base64_encoded_transaction>
}
```
`UnappliedEvent` (transaction removed during rollback):

```json
{
  "tx": <base64_encoded_transaction>
}
```
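Since the `tx` field is base64-encoded, a consumer has to decode it before handing the bytes to an Ergo deserializer (which library you use for that is out of scope here). A small helper sketch, with illustrative names:

```python
import base64
import json

def decode_tx_event(raw: bytes) -> bytes:
    """Extract the raw serialized transaction bytes from a tx_topic message."""
    event = json.loads(raw)
    return base64.b64decode(event["tx"])
```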
The `mempool_topic` tracks transactions entering and leaving the mempool:
`TxAccepted` (transaction entered mempool):

```json
{
  "tx": <base64_encoded_transaction>
}
```
`TxWithdrawn` (transaction left mempool):

```json
{
  "tx": <base64_encoded_transaction>
}
```
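A consumer can mirror the mempool by applying these two event types to a local set. This sketch assumes (the README does not specify this) that the event variant is carried as an outer JSON key, serde-style; adjust the `event_type` extraction to the actual wire format:

```python
import json

mempool: set[str] = set()  # base64-encoded transactions currently in the pool

def on_mempool_message(raw: bytes) -> None:
    event_type, body = next(iter(json.loads(raw).items()))  # assumed {"TxAccepted": {...}} shape
    if event_type == "TxAccepted":
        mempool.add(body["tx"])
    elif event_type == "TxWithdrawn":
        mempool.discard(body["tx"])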
To build and run the service:

```
docker compose up --build -d
```
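Once the stack is up, a quick sanity check is Kafka's stock console consumer (assuming the broker is exposed on localhost:9092):

```
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic blocks_topic --from-beginning
```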
The service handles blockchain reorganizations (rollbacks) by:

- Emitting `BlockUnapply` events for each block being rolled back
- Emitting `UnappliedEvent` for each transaction in those blocks (in reverse order)
- Then emitting new `BlockApply` and `AppliedEvent` messages for the new chain (see the consumer-side sketch after this list)
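From a consumer's perspective, a rollback is a sequence of undo operations. The sketch below keeps a local stack of blocks fed from `blocks_topic`; as above, it assumes the event variant arrives as an outer JSON key, which is an assumption about the wire format, not something this README confirms:

```python
import json
from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer(
    "blocks_topic",
    bootstrap_servers="localhost:9092",  # assumption: local broker
    value_deserializer=json.loads,
)

chain = []  # stack of (height, block_id) mirroring the node's view

for msg in consumer:
    event_type, body = next(iter(msg.value.items()))  # assumed {"BlockApply": {...}} shape
    if event_type == "BlockApply":
        chain.append((body["height"], body["id"]))
    elif event_type == "BlockUnapply":
        # Rollback: the unapplied block should be our current tip.
        if chain and chain[-1][1] == body["id"]:
            chain.pop()
```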
All transaction events are guaranteed to be sequential and properly ordered relative to their blocks:

- When a new block is added, you'll first receive a `BlockApply` event
- This is followed by `AppliedEvent` messages for each transaction in that block
- During rollbacks, you'll first receive a `BlockUnapply` event
- This is followed by `UnappliedEvent` messages for each transaction in reverse order (see the batching sketch after this list)
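Because every `AppliedEvent` carries the height of its block, a consumer can rely on this ordering to batch transactions per block without consulting `blocks_topic`. Function and variable names here are illustrative:

```python
import base64
import json
from collections import defaultdict

# height -> serialized transactions, in the order they appeared on-chain
blocks: dict[int, list[bytes]] = defaultdict(list)

def on_applied_event(raw: bytes) -> None:
    event = json.loads(raw)
    blocks[event["height"]].append(base64.b64decode(event["tx"]))
```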
This sequencing is enforced by the service's architecture:

- Block events are processed through `block_event_source`
- Transaction events are derived from these blocks via `tx_event_source`
- The `process_upgrade` function ensures transactions are handled in the correct order (forward for applies, reverse for rollbacks), as illustrated after this list
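Purely as an illustration of that ordering rule (this is not the repository's actual `process_upgrade` code, and the names below are hypothetical), the behavior amounts to:

```python
# Illustrative only: emit transaction events forward for applied blocks,
# and in reverse for blocks being rolled back.
def order_txs(event_type: str, txs: list) -> list:
    if event_type == "BlockApply":
        return [("AppliedEvent", tx) for tx in txs]            # forward order
    return [("UnappliedEvent", tx) for tx in reversed(txs)]    # reverse order
```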
- `chain_sync_starting_height`: The block height where chain synchronization begins (e.g., 1400000)
- `chain_sync_batch_size`: Number of blocks to request in a single batch from the node (e.g., 50). Larger values speed up the sync but put more strain on the node.
- `chain_sync_chunk_size`: Number of full blocks to retrieve at once from the node (e.g., 5). Larger values speed up the sync but put more strain on the node.
- `chain_cache_db_path`: Location of the RocksDB database storing chain state
- `mempool_cache_db_path`: Location of the RocksDB database storing mempool state
- `http_client_timeout_duration_secs`: Maximum time to wait for node API responses (in seconds)
- `mempool_sync_interval`: How often to poll the mempool for changes (in seconds)
- `node_addr`: Ergo node API endpoint
- `kafka_address`: Kafka broker address (format: `host:port`)
- Topic names can be configured via `blocks_topic`, `tx_topic`, and `mempool_topic`
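Putting these together, a configuration might look like the sample below. The file format (YAML here) and every value not taken from the examples above are assumptions; check the config file shipped with the repository:

```yaml
node_addr: "http://127.0.0.1:9053"             # assumption: default Ergo node API port
kafka_address: "localhost:9092"                # assumption: local broker
blocks_topic: "blocks_topic"
tx_topic: "tx_topic"
mempool_topic: "mempool_topic"
chain_sync_starting_height: 1400000            # example value from above
chain_sync_batch_size: 50                      # example value from above
chain_sync_chunk_size: 5                       # example value from above
chain_cache_db_path: "./data/chain_cache"      # hypothetical path
mempool_cache_db_path: "./data/mempool_cache"  # hypothetical path
http_client_timeout_duration_secs: 30          # assumed value
mempool_sync_interval: 5                       # assumed value
```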
This project was forked from Spectrum. It has been modified to sync significantly faster by using batch endpoints, and the Kafka service now uses KRaft rather than ZooKeeper.