diff --git a/indexer/packages/postgres/src/loops/liquidity-tier-refresher.ts b/indexer/packages/postgres/src/loops/liquidity-tier-refresher.ts index 4082827221..0722004db9 100644 --- a/indexer/packages/postgres/src/loops/liquidity-tier-refresher.ts +++ b/indexer/packages/postgres/src/loops/liquidity-tier-refresher.ts @@ -1,4 +1,6 @@ -import { delay, logger, stats } from '@dydxprotocol-indexer/base'; +import { + NodeEnv, delay, logger, stats, +} from '@dydxprotocol-indexer/base'; import config from '../config'; import * as LiquidityTiersTable from '../stores/liquidity-tiers-table'; @@ -57,3 +59,18 @@ export function getLiquidityTierFromId(id: number): LiquidityTiersFromDatabase { export function getLiquidityTiersMap(): LiquidityTiersMap { return idToLiquidityTier; } + +export function upsertLiquidityTier(liquidityTier: LiquidityTiersFromDatabase): void { + idToLiquidityTier[liquidityTier.id] = liquidityTier; +} + +/** + * Clears the in-memory map of liquidity tier ids to liquidity tiers. + * Used for testing. + */ +export function clear(): void { + if (config.NODE_ENV !== NodeEnv.TEST) { + throw new Error('clear cannot be used in non-test env'); + } + idToLiquidityTier = {}; +} diff --git a/indexer/services/ender/__tests__/handlers/liquidity-tier-handler.test.ts b/indexer/services/ender/__tests__/handlers/liquidity-tier-handler.test.ts index 405e1325b4..2014acfe63 100644 --- a/indexer/services/ender/__tests__/handlers/liquidity-tier-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/liquidity-tier-handler.test.ts @@ -16,15 +16,21 @@ import { TendermintEventTable, testConstants, protocolTranslations, + liquidityTierRefresher, + PerpetualMarketFromDatabase, + perpetualMarketRefresher, + PerpetualMarketTable, + MarketTable, } from '@dydxprotocol-indexer/postgres'; import { KafkaMessage } from 'kafkajs'; -import { createKafkaMessage } from '@dydxprotocol-indexer/kafka'; +import { createKafkaMessage, producer } from '@dydxprotocol-indexer/kafka'; import { onMessage } from '../../src/lib/on-message'; import { DydxIndexerSubtypes } from '../../src/lib/types'; import { binaryToBase64String, createIndexerTendermintBlock, createIndexerTendermintEvent, + expectPerpetualMarketKafkaMessage, } from '../helpers/indexer-proto-helpers'; import { LiquidityTierHandler } from '../../src/handlers/liquidity-tier-handler'; import { @@ -32,6 +38,7 @@ import { } from '../helpers/constants'; import { updateBlockCache } from '../../src/caches/block-cache'; import { defaultLiquidityTier } from '@dydxprotocol-indexer/postgres/build/__tests__/helpers/constants'; +import _ from 'lodash'; describe('liquidityTierHandler', () => { beforeAll(async () => { @@ -57,6 +64,8 @@ describe('liquidityTierHandler', () => { afterEach(async () => { await dbHelpers.clearData(); jest.clearAllMocks(); + liquidityTierRefresher.clear(); + perpetualMarketRefresher.clear(); }); afterAll(async () => { @@ -107,7 +116,9 @@ describe('liquidityTierHandler', () => { }); // Confirm there is no existing liquidity tier await expectNoExistingLiquidityTiers(); + await perpetualMarketRefresher.updatePerpetualMarkets(); + const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); await onMessage(kafkaMessage); const newLiquidityTiers: LiquidityTiersFromDatabase[] = await LiquidityTiersTable.findAll( @@ -118,6 +129,8 @@ describe('liquidityTierHandler', () => { expect(newLiquidityTiers.length).toEqual(1); expectLiquidityTier(newLiquidityTiers[0], liquidityTierEvent); expectTimingStats(); + validateLiquidityTierRefresher(defaultLiquidityTierUpsertEvent); + expectKafkaMessages(producerSendMock, liquidityTierEvent, 0); }); it('updates existing liquidity tier', async () => { @@ -133,6 +146,18 @@ describe('liquidityTierHandler', () => { // Create existing liquidity tier await LiquidityTiersTable.upsert(defaultLiquidityTier); + // create perpetual market with existing liquidity tier to test websockets + await Promise.all([ + MarketTable.create(testConstants.defaultMarket), + MarketTable.create(testConstants.defaultMarket2), + ]); + await Promise.all([ + PerpetualMarketTable.create(testConstants.defaultPerpetualMarket), + PerpetualMarketTable.create(testConstants.defaultPerpetualMarket2), + ]); + await perpetualMarketRefresher.updatePerpetualMarkets(); + + const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); await onMessage(kafkaMessage); const newLiquidityTiers: LiquidityTiersFromDatabase[] = await LiquidityTiersTable.findAll( @@ -143,6 +168,8 @@ describe('liquidityTierHandler', () => { expect(newLiquidityTiers.length).toEqual(1); expectLiquidityTier(newLiquidityTiers[0], liquidityTierEvent); expectTimingStats(); + validateLiquidityTierRefresher(defaultLiquidityTierUpsertEvent); + expectKafkaMessages(producerSendMock, liquidityTierEvent, 2); }); }); @@ -224,3 +251,42 @@ async function expectNoExistingLiquidityTiers() { expect(liquidityTiers.length).toEqual(0); } + +function validateLiquidityTierRefresher( + liquidityTierEvent: LiquidityTierUpsertEventV1, +) { + const liquidityTier: + LiquidityTiersFromDatabase = liquidityTierRefresher.getLiquidityTierFromId( + liquidityTierEvent.id, + ); + + expect(liquidityTier).toEqual({ + id: liquidityTierEvent.id, + name: liquidityTierEvent.name, + initialMarginPpm: liquidityTierEvent.initialMarginPpm.toString(), + maintenanceFractionPpm: liquidityTierEvent.maintenanceFractionPpm.toString(), + basePositionNotional: protocolTranslations.quantumsToHuman( + liquidityTierEvent.basePositionNotional.toString(), + QUOTE_CURRENCY_ATOMIC_RESOLUTION, + ).toFixed(6), + }); +} + +function expectKafkaMessages( + producerSendMock: jest.SpyInstance, + liquidityTier: LiquidityTierUpsertEventV1, + numPerpetualMarkets: number, +) { + const perpetualMarkets: PerpetualMarketFromDatabase[] = _.filter( + perpetualMarketRefresher.getPerpetualMarketsList(), + (perpetualMarket: PerpetualMarketFromDatabase) => { + return perpetualMarket.liquidityTierId === liquidityTier.id; + }, + ); + expect(perpetualMarkets.length).toEqual(numPerpetualMarkets); + + if (perpetualMarkets.length === 0) { + return; + } + expectPerpetualMarketKafkaMessage(producerSendMock, perpetualMarkets); +} diff --git a/indexer/services/ender/__tests__/handlers/perpetual-market-handler.test.ts b/indexer/services/ender/__tests__/handlers/perpetual-market-handler.test.ts index 2cbff70615..656dddc788 100644 --- a/indexer/services/ender/__tests__/handlers/perpetual-market-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/perpetual-market-handler.test.ts @@ -18,15 +18,16 @@ import { TendermintEventTable, perpetualMarketRefresher, LiquidityTiersTable, + liquidityTierRefresher, } from '@dydxprotocol-indexer/postgres'; import { KafkaMessage } from 'kafkajs'; -import { createKafkaMessage } from '@dydxprotocol-indexer/kafka'; +import { createKafkaMessage, producer } from '@dydxprotocol-indexer/kafka'; import { onMessage } from '../../src/lib/on-message'; import { DydxIndexerSubtypes } from '../../src/lib/types'; import { binaryToBase64String, createIndexerTendermintBlock, - createIndexerTendermintEvent, expectPerpetualMarket, + createIndexerTendermintEvent, expectPerpetualMarket, expectPerpetualMarketKafkaMessage, } from '../helpers/indexer-proto-helpers'; import { PerpetualMarketCreationHandler } from '../../src/handlers/perpetual-market-handler'; import { @@ -62,6 +63,7 @@ describe('perpetualMarketHandler', () => { afterEach(async () => { await dbHelpers.clearData(); jest.clearAllMocks(); + liquidityTierRefresher.clear(); }); afterAll(async () => { @@ -133,6 +135,7 @@ describe('perpetualMarketHandler', () => { MarketTable.create(testConstants.defaultMarket), LiquidityTiersTable.create(testConstants.defaultLiquidityTier), ]); + await liquidityTierRefresher.updateLiquidityTiers(); await marketRefresher.updateMarkets(); const transactionIndex: number = 0; @@ -148,6 +151,7 @@ describe('perpetualMarketHandler', () => { // Confirm there is no existing perpetualMarket. await expectNoExistingPerpetualMarkets(); + const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); await onMessage(kafkaMessage); const newPerpetualMarkets: PerpetualMarketFromDatabase[] = await PerpetualMarketTable.findAll( @@ -161,6 +165,7 @@ describe('perpetualMarketHandler', () => { const perpetualMarket: PerpetualMarketFromDatabase | undefined = perpetualMarketRefresher.getPerpetualMarketFromId('0'); expect(perpetualMarket).toBeDefined(); expectPerpetualMarket(perpetualMarket!, perpetualMarketEvent); + expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]); }); }); diff --git a/indexer/services/ender/__tests__/handlers/update-clob-pair-handler.test.ts b/indexer/services/ender/__tests__/handlers/update-clob-pair-handler.test.ts index 09bebba6b5..55a8de7e27 100644 --- a/indexer/services/ender/__tests__/handlers/update-clob-pair-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/update-clob-pair-handler.test.ts @@ -3,6 +3,7 @@ import { PerpetualMarketFromDatabase, PerpetualMarketTable, dbHelpers, + liquidityTierRefresher, perpetualMarketRefresher, protocolTranslations, testMocks, @@ -25,10 +26,11 @@ import { binaryToBase64String, createIndexerTendermintBlock, createIndexerTendermintEvent, + expectPerpetualMarketKafkaMessage, } from '../helpers/indexer-proto-helpers'; import { DydxIndexerSubtypes } from '../../src/lib/types'; import { UpdateClobPairHandler } from '../../src/handlers/update-clob-pair-handler'; -import { createKafkaMessage } from '@dydxprotocol-indexer/kafka'; +import { createKafkaMessage, producer } from '@dydxprotocol-indexer/kafka'; import { KafkaMessage } from 'kafkajs'; import { onMessage } from '../../src/lib/on-message'; @@ -44,12 +46,14 @@ describe('update-clob-pair-handler', () => { await testMocks.seedData(); updateBlockCache(defaultPreviousHeight); await perpetualMarketRefresher.updatePerpetualMarkets(); + await liquidityTierRefresher.updateLiquidityTiers(); }); afterEach(async () => { await dbHelpers.clearData(); jest.clearAllMocks(); perpetualMarketRefresher.clear(); + liquidityTierRefresher.clear(); }); afterAll(async () => { @@ -97,6 +101,7 @@ describe('update-clob-pair-handler', () => { time: defaultTime, txHash: defaultTxHash, }); + const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); await onMessage(kafkaMessage); const perpetualMarketId: string = perpetualMarketRefresher.getPerpetualMarketFromClobPairId( @@ -115,6 +120,7 @@ describe('update-clob-pair-handler', () => { stepBaseQuantums: defaultUpdateClobPairEvent.stepBaseQuantums.toNumber(), })); expectTimingStats(); + expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]); }); }); diff --git a/indexer/services/ender/__tests__/handlers/update-perpetual-handler.test.ts b/indexer/services/ender/__tests__/handlers/update-perpetual-handler.test.ts index d2b27a1d0c..7932f725d3 100644 --- a/indexer/services/ender/__tests__/handlers/update-perpetual-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/update-perpetual-handler.test.ts @@ -3,6 +3,7 @@ import { PerpetualMarketFromDatabase, PerpetualMarketTable, dbHelpers, + liquidityTierRefresher, perpetualMarketRefresher, testMocks, } from '@dydxprotocol-indexer/postgres'; @@ -20,10 +21,15 @@ import { Timestamp, UpdatePerpetualEventV1, } from '@dydxprotocol-indexer/v4-protos'; -import { binaryToBase64String, createIndexerTendermintBlock, createIndexerTendermintEvent } from '../helpers/indexer-proto-helpers'; +import { + binaryToBase64String, + createIndexerTendermintBlock, + createIndexerTendermintEvent, + expectPerpetualMarketKafkaMessage, +} from '../helpers/indexer-proto-helpers'; import { DydxIndexerSubtypes } from '../../src/lib/types'; import { UpdatePerpetualHandler } from '../../src/handlers/update-perpetual-handler'; -import { createKafkaMessage } from '@dydxprotocol-indexer/kafka'; +import { createKafkaMessage, producer } from '@dydxprotocol-indexer/kafka'; import { KafkaMessage } from 'kafkajs'; import { onMessage } from '../../src/lib/on-message'; @@ -39,12 +45,14 @@ describe('update-perpetual-handler', () => { await testMocks.seedData(); updateBlockCache(defaultPreviousHeight); await perpetualMarketRefresher.updatePerpetualMarkets(); + await liquidityTierRefresher.updateLiquidityTiers(); }); afterEach(async () => { await dbHelpers.clearData(); jest.clearAllMocks(); perpetualMarketRefresher.clear(); + liquidityTierRefresher.clear(); }); afterAll(async () => { @@ -92,6 +100,7 @@ describe('update-perpetual-handler', () => { time: defaultTime, txHash: defaultTxHash, }); + const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send'); await onMessage(kafkaMessage); const perpetualMarket: @@ -106,6 +115,7 @@ describe('update-perpetual-handler', () => { liquidityTierId: defaultUpdatePerpetualEvent.liquidityTier, })); expectTimingStats(); + expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]); }); }); diff --git a/indexer/services/ender/__tests__/helpers/indexer-proto-helpers.ts b/indexer/services/ender/__tests__/helpers/indexer-proto-helpers.ts index a2aa4291c1..1ee815d72a 100644 --- a/indexer/services/ender/__tests__/helpers/indexer-proto-helpers.ts +++ b/indexer/services/ender/__tests__/helpers/indexer-proto-helpers.ts @@ -53,6 +53,7 @@ import _ from 'lodash'; import { convertPerpetualPosition, generateFillSubaccountMessage, + generatePerpetualMarketMessage, generatePerpetualPositionsContents, isLiquidation, } from '../../src/helpers/kafka-helper'; @@ -188,6 +189,16 @@ export function expectSubaccountKafkaMessage({ expect(subaccountMessageJsons).toContainEqual(expectedSubaccountMessageJson); } +export function expectPerpetualMarketKafkaMessage( + producerSendMock: jest.SpyInstance, + perpetualMarkets: PerpetualMarketFromDatabase[], +) { + expectMarketKafkaMessage({ + producerSendMock, + contents: JSON.stringify(generatePerpetualMarketMessage(perpetualMarkets)), + }); +} + export function expectMarketKafkaMessage({ producerSendMock, contents, diff --git a/indexer/services/ender/src/handlers/liquidity-tier-handler.ts b/indexer/services/ender/src/handlers/liquidity-tier-handler.ts index 2e6e0ec0fc..515a837a60 100644 --- a/indexer/services/ender/src/handlers/liquidity-tier-handler.ts +++ b/indexer/services/ender/src/handlers/liquidity-tier-handler.ts @@ -1,7 +1,17 @@ -import { LiquidityTiersCreateObject, LiquidityTiersTable, protocolTranslations } from '@dydxprotocol-indexer/postgres'; +import { + LiquidityTiersCreateObject, + LiquidityTiersFromDatabase, + LiquidityTiersTable, + PerpetualMarketFromDatabase, + liquidityTierRefresher, + perpetualMarketRefresher, + protocolTranslations, +} from '@dydxprotocol-indexer/postgres'; import { LiquidityTierUpsertEventV1 } from '@dydxprotocol-indexer/v4-protos'; +import _ from 'lodash'; import { QUOTE_CURRENCY_ATOMIC_RESOLUTION } from '../constants'; +import { generatePerpetualMarketMessage } from '../helpers/kafka-helper'; import { ConsolidatedKafkaEvent } from '../lib/types'; import { Handler } from './handler'; @@ -14,18 +24,21 @@ export class LiquidityTierHandler extends Handler { // eslint-disable-next-line @typescript-eslint/require-await public async internalHandle(): Promise { - await this.runFuncWithTimingStatAndErrorLogging( + const liquidityTier: + LiquidityTiersFromDatabase = await this.runFuncWithTimingStatAndErrorLogging( this.upsertLiquidityTier(), this.generateTimingStatsOptions('upsert_liquidity_tier'), ); - return []; + return this.generateWebsocketEventsForLiquidityTier(liquidityTier); } - private async upsertLiquidityTier(): Promise { - await LiquidityTiersTable.upsert( + private async upsertLiquidityTier(): Promise { + const liquidityTier: LiquidityTiersFromDatabase = await LiquidityTiersTable.upsert( this.getLiquidityTiersCreateObject(this.event), { txId: this.txId }, ); + liquidityTierRefresher.upsertLiquidityTier(liquidityTier); + return liquidityTier; } private getLiquidityTiersCreateObject(liquidityTier: LiquidityTierUpsertEventV1): @@ -41,4 +54,24 @@ export class LiquidityTierHandler extends Handler { ).toFixed(6), }; } + + private generateWebsocketEventsForLiquidityTier(liquidityTier: LiquidityTiersFromDatabase): + ConsolidatedKafkaEvent[] { + const perpetualMarkets: PerpetualMarketFromDatabase[] = _.filter( + perpetualMarketRefresher.getPerpetualMarketsList(), + (perpetualMarket: PerpetualMarketFromDatabase) => { + return perpetualMarket.liquidityTierId === liquidityTier.id; + }, + ); + + if (perpetualMarkets.length === 0) { + return []; + } + + return [ + this.generateConsolidatedMarketKafkaEvent( + JSON.stringify(generatePerpetualMarketMessage(perpetualMarkets)), + ), + ]; + } } diff --git a/indexer/services/ender/src/handlers/perpetual-market-handler.ts b/indexer/services/ender/src/handlers/perpetual-market-handler.ts index 5c07371740..2a4a6b3fbd 100644 --- a/indexer/services/ender/src/handlers/perpetual-market-handler.ts +++ b/indexer/services/ender/src/handlers/perpetual-market-handler.ts @@ -7,6 +7,7 @@ import { } from '@dydxprotocol-indexer/postgres'; import { PerpetualMarketCreateEventV1 } from '@dydxprotocol-indexer/v4-protos'; +import { generatePerpetualMarketMessage } from '../helpers/kafka-helper'; import { ConsolidatedKafkaEvent } from '../lib/types'; import { Handler } from './handler'; @@ -19,20 +20,25 @@ export class PerpetualMarketCreationHandler extends Handler { - await this.runFuncWithTimingStatAndErrorLogging( + const perpetualMarket: + PerpetualMarketFromDatabase = await this.runFuncWithTimingStatAndErrorLogging( this.createPerpetualMarket(), this.generateTimingStatsOptions('create_perpetual_market'), ); - // TODO(IND-374): Send update to markets websocket channel. - return []; + return [ + this.generateConsolidatedMarketKafkaEvent( + JSON.stringify(generatePerpetualMarketMessage([perpetualMarket])), + ), + ]; } - private async createPerpetualMarket(): Promise { + private async createPerpetualMarket(): Promise { const perpetualMarket: PerpetualMarketFromDatabase = await PerpetualMarketTable.create( this.getPerpetualMarketCreateObject(this.event), { txId: this.txId }, ); perpetualMarketRefresher.upsertPerpetualMarket(perpetualMarket); + return perpetualMarket; } /** diff --git a/indexer/services/ender/src/handlers/update-clob-pair-handler.ts b/indexer/services/ender/src/handlers/update-clob-pair-handler.ts index 40448e0407..070528b613 100644 --- a/indexer/services/ender/src/handlers/update-clob-pair-handler.ts +++ b/indexer/services/ender/src/handlers/update-clob-pair-handler.ts @@ -1,8 +1,11 @@ +import assert from 'assert'; + import { PerpetualMarketFromDatabase, PerpetualMarketTable, perpetualMarketRefresher, protocolTranslations, } from '@dydxprotocol-indexer/postgres'; import { UpdateClobPairEventV1 } from '@dydxprotocol-indexer/v4-protos'; +import { generatePerpetualMarketMessage } from '../helpers/kafka-helper'; import { ConsolidatedKafkaEvent } from '../lib/types'; import { Handler } from './handler'; @@ -15,15 +18,19 @@ export class UpdateClobPairHandler extends Handler { // eslint-disable-next-line @typescript-eslint/require-await public async internalHandle(): Promise { - await this.runFuncWithTimingStatAndErrorLogging( + const perpetualMarket: + PerpetualMarketFromDatabase = await this.runFuncWithTimingStatAndErrorLogging( this.updateClobPair(), this.generateTimingStatsOptions('update_clob_pair'), ); - // TODO(IND-374): Send update to markets websocket channel. - return []; + return [ + this.generateConsolidatedMarketKafkaEvent( + JSON.stringify(generatePerpetualMarketMessage([perpetualMarket])), + ), + ]; } - private async updateClobPair(): Promise { + private async updateClobPair(): Promise { // perpetualMarketRefresher.getPerpetualMarketFromClobPairId() cannot be undefined because it // is validated by UpdateClobPairValidator. const perpetualMarketId: string = perpetualMarketRefresher.getPerpetualMarketFromClobPairId( @@ -40,11 +47,15 @@ export class UpdateClobPairHandler extends Handler { }, { txId: this.txId }); if (perpetualMarket === undefined) { - return this.logAndThrowParseMessageError( + this.logAndThrowParseMessageError( 'Could not find perpetual market with corresponding updatePerpetualEvent.id', { event: this.event }, ); + // This assert should never be hit because a ParseMessageError should be thrown above. + assert(false); } + await perpetualMarketRefresher.upsertPerpetualMarket(perpetualMarket); + return perpetualMarket; } } diff --git a/indexer/services/ender/src/handlers/update-perpetual-handler.ts b/indexer/services/ender/src/handlers/update-perpetual-handler.ts index daabb01645..2c921f7e02 100644 --- a/indexer/services/ender/src/handlers/update-perpetual-handler.ts +++ b/indexer/services/ender/src/handlers/update-perpetual-handler.ts @@ -1,8 +1,11 @@ +import assert from 'assert'; + import { PerpetualMarketFromDatabase, PerpetualMarketTable, perpetualMarketRefresher, } from '@dydxprotocol-indexer/postgres'; import { UpdatePerpetualEventV1 } from '@dydxprotocol-indexer/v4-protos'; +import { generatePerpetualMarketMessage } from '../helpers/kafka-helper'; import { ConsolidatedKafkaEvent } from '../lib/types'; import { Handler } from './handler'; @@ -15,15 +18,19 @@ export class UpdatePerpetualHandler extends Handler { // eslint-disable-next-line @typescript-eslint/require-await public async internalHandle(): Promise { - await this.runFuncWithTimingStatAndErrorLogging( + const perpetualMarket: + PerpetualMarketFromDatabase = await this.runFuncWithTimingStatAndErrorLogging( this.updatePerpetual(), this.generateTimingStatsOptions('update_perpetual'), ); - // TODO(IND-374): Send update to markets websocket channel. - return []; + return [ + this.generateConsolidatedMarketKafkaEvent( + JSON.stringify(generatePerpetualMarketMessage([perpetualMarket])), + ), + ]; } - private async updatePerpetual(): Promise { + private async updatePerpetual(): Promise { const perpetualMarket: PerpetualMarketFromDatabase | undefined = await PerpetualMarketTable.update({ id: this.event.id.toString(), @@ -34,11 +41,15 @@ export class UpdatePerpetualHandler extends Handler { }, { txId: this.txId }); if (perpetualMarket === undefined) { - return this.logAndThrowParseMessageError( + this.logAndThrowParseMessageError( 'Could not find perpetual market with corresponding updatePerpetualEvent.id', { event: this.event }, ); + // This assert should never be hit because a ParseMessageError should be thrown above. + assert(false); } + await perpetualMarketRefresher.upsertPerpetualMarket(perpetualMarket); + return perpetualMarket; } } diff --git a/indexer/services/ender/src/helpers/kafka-helper.ts b/indexer/services/ender/src/helpers/kafka-helper.ts index 8027f66975..1275d574df 100644 --- a/indexer/services/ender/src/helpers/kafka-helper.ts +++ b/indexer/services/ender/src/helpers/kafka-helper.ts @@ -23,6 +23,12 @@ import { PerpetualPositionFromDatabase, AssetPositionSubaccountMessageContents, SubaccountTable, + LiquidityTiersFromDatabase, + liquidityTierRefresher, + LiquidityTiersMap, + PerpetualMarketColumns, + TradingPerpetualMarketMessage, + TradingMarketMessageContents, } from '@dydxprotocol-indexer/postgres'; import { SubaccountId } from '@dydxprotocol-indexer/v4-protos'; import Big from 'big.js'; @@ -295,3 +301,42 @@ export function generateOrderSubaccountMessage( ticker, }; } + +export function generatePerpetualMarketMessage( + perpetualMarkets: PerpetualMarketFromDatabase[], +): MarketMessageContents { + const liquidityTierMap: LiquidityTiersMap = liquidityTierRefresher.getLiquidityTiersMap(); + + const tradingMarketMessageContents: TradingMarketMessageContents = _.chain(perpetualMarkets) + .keyBy(PerpetualMarketColumns.ticker) + .mapValues((perpetualMarket: PerpetualMarketFromDatabase): TradingPerpetualMarketMessage => { + const liquidityTier: + LiquidityTiersFromDatabase = liquidityTierMap[perpetualMarket.liquidityTierId]; + + return { + id: perpetualMarket.id, + clobPairId: perpetualMarket.clobPairId.toString(), + ticker: perpetualMarket.ticker, + marketId: perpetualMarket.marketId, + status: perpetualMarket.status, + quantumConversionExponent: perpetualMarket.quantumConversionExponent, + atomicResolution: perpetualMarket.atomicResolution, + subticksPerTick: perpetualMarket.subticksPerTick, + minOrderBaseQuantums: perpetualMarket.minOrderBaseQuantums, + stepBaseQuantums: perpetualMarket.stepBaseQuantums, + initialMarginFraction: helpers.ppmToString(Number(liquidityTier.initialMarginPpm)), + maintenanceMarginFraction: helpers.ppmToString( + helpers.getMaintenanceMarginPpm( + Number(liquidityTier.initialMarginPpm), + Number(liquidityTier.maintenanceFractionPpm), + ), + ), + basePositionNotional: liquidityTier.basePositionNotional, + }; + }) + .value(); + + return { + trading: tradingMarketMessageContents, + }; +}