Skip to content

Commit

Permalink
[IND-374]: Add perpetual market websocket events (#143)
Browse files Browse the repository at this point in the history
  • Loading branch information
Christopher-Li authored Aug 30, 2023
1 parent e9e4395 commit bf817d2
Show file tree
Hide file tree
Showing 11 changed files with 247 additions and 26 deletions.
19 changes: 18 additions & 1 deletion indexer/packages/postgres/src/loops/liquidity-tier-refresher.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { delay, logger, stats } from '@dydxprotocol-indexer/base';
import {
NodeEnv, delay, logger, stats,
} from '@dydxprotocol-indexer/base';

import config from '../config';
import * as LiquidityTiersTable from '../stores/liquidity-tiers-table';
Expand Down Expand Up @@ -57,3 +59,18 @@ export function getLiquidityTierFromId(id: number): LiquidityTiersFromDatabase {
export function getLiquidityTiersMap(): LiquidityTiersMap {
return idToLiquidityTier;
}

export function upsertLiquidityTier(liquidityTier: LiquidityTiersFromDatabase): void {
idToLiquidityTier[liquidityTier.id] = liquidityTier;
}

/**
* Clears the in-memory map of liquidity tier ids to liquidity tiers.
* Used for testing.
*/
export function clear(): void {
if (config.NODE_ENV !== NodeEnv.TEST) {
throw new Error('clear cannot be used in non-test env');
}
idToLiquidityTier = {};
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,29 @@ import {
TendermintEventTable,
testConstants,
protocolTranslations,
liquidityTierRefresher,
PerpetualMarketFromDatabase,
perpetualMarketRefresher,
PerpetualMarketTable,
MarketTable,
} from '@dydxprotocol-indexer/postgres';
import { KafkaMessage } from 'kafkajs';
import { createKafkaMessage } from '@dydxprotocol-indexer/kafka';
import { createKafkaMessage, producer } from '@dydxprotocol-indexer/kafka';
import { onMessage } from '../../src/lib/on-message';
import { DydxIndexerSubtypes } from '../../src/lib/types';
import {
binaryToBase64String,
createIndexerTendermintBlock,
createIndexerTendermintEvent,
expectPerpetualMarketKafkaMessage,
} from '../helpers/indexer-proto-helpers';
import { LiquidityTierHandler } from '../../src/handlers/liquidity-tier-handler';
import {
defaultHeight, defaultLiquidityTierUpsertEvent, defaultPreviousHeight, defaultTime, defaultTxHash,
} from '../helpers/constants';
import { updateBlockCache } from '../../src/caches/block-cache';
import { defaultLiquidityTier } from '@dydxprotocol-indexer/postgres/build/__tests__/helpers/constants';
import _ from 'lodash';

describe('liquidityTierHandler', () => {
beforeAll(async () => {
Expand All @@ -57,6 +64,8 @@ describe('liquidityTierHandler', () => {
afterEach(async () => {
await dbHelpers.clearData();
jest.clearAllMocks();
liquidityTierRefresher.clear();
perpetualMarketRefresher.clear();
});

afterAll(async () => {
Expand Down Expand Up @@ -107,7 +116,9 @@ describe('liquidityTierHandler', () => {
});
// Confirm there is no existing liquidity tier
await expectNoExistingLiquidityTiers();
await perpetualMarketRefresher.updatePerpetualMarkets();

const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');
await onMessage(kafkaMessage);

const newLiquidityTiers: LiquidityTiersFromDatabase[] = await LiquidityTiersTable.findAll(
Expand All @@ -118,6 +129,8 @@ describe('liquidityTierHandler', () => {
expect(newLiquidityTiers.length).toEqual(1);
expectLiquidityTier(newLiquidityTiers[0], liquidityTierEvent);
expectTimingStats();
validateLiquidityTierRefresher(defaultLiquidityTierUpsertEvent);
expectKafkaMessages(producerSendMock, liquidityTierEvent, 0);
});

it('updates existing liquidity tier', async () => {
Expand All @@ -133,6 +146,18 @@ describe('liquidityTierHandler', () => {
// Create existing liquidity tier
await LiquidityTiersTable.upsert(defaultLiquidityTier);

// create perpetual market with existing liquidity tier to test websockets
await Promise.all([
MarketTable.create(testConstants.defaultMarket),
MarketTable.create(testConstants.defaultMarket2),
]);
await Promise.all([
PerpetualMarketTable.create(testConstants.defaultPerpetualMarket),
PerpetualMarketTable.create(testConstants.defaultPerpetualMarket2),
]);
await perpetualMarketRefresher.updatePerpetualMarkets();

const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');
await onMessage(kafkaMessage);

const newLiquidityTiers: LiquidityTiersFromDatabase[] = await LiquidityTiersTable.findAll(
Expand All @@ -143,6 +168,8 @@ describe('liquidityTierHandler', () => {
expect(newLiquidityTiers.length).toEqual(1);
expectLiquidityTier(newLiquidityTiers[0], liquidityTierEvent);
expectTimingStats();
validateLiquidityTierRefresher(defaultLiquidityTierUpsertEvent);
expectKafkaMessages(producerSendMock, liquidityTierEvent, 2);
});
});

Expand Down Expand Up @@ -224,3 +251,42 @@ async function expectNoExistingLiquidityTiers() {

expect(liquidityTiers.length).toEqual(0);
}

function validateLiquidityTierRefresher(
liquidityTierEvent: LiquidityTierUpsertEventV1,
) {
const liquidityTier:
LiquidityTiersFromDatabase = liquidityTierRefresher.getLiquidityTierFromId(
liquidityTierEvent.id,
);

expect(liquidityTier).toEqual({
id: liquidityTierEvent.id,
name: liquidityTierEvent.name,
initialMarginPpm: liquidityTierEvent.initialMarginPpm.toString(),
maintenanceFractionPpm: liquidityTierEvent.maintenanceFractionPpm.toString(),
basePositionNotional: protocolTranslations.quantumsToHuman(
liquidityTierEvent.basePositionNotional.toString(),
QUOTE_CURRENCY_ATOMIC_RESOLUTION,
).toFixed(6),
});
}

function expectKafkaMessages(
producerSendMock: jest.SpyInstance,
liquidityTier: LiquidityTierUpsertEventV1,
numPerpetualMarkets: number,
) {
const perpetualMarkets: PerpetualMarketFromDatabase[] = _.filter(
perpetualMarketRefresher.getPerpetualMarketsList(),
(perpetualMarket: PerpetualMarketFromDatabase) => {
return perpetualMarket.liquidityTierId === liquidityTier.id;
},
);
expect(perpetualMarkets.length).toEqual(numPerpetualMarkets);

if (perpetualMarkets.length === 0) {
return;
}
expectPerpetualMarketKafkaMessage(producerSendMock, perpetualMarkets);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ import {
TendermintEventTable,
perpetualMarketRefresher,
LiquidityTiersTable,
liquidityTierRefresher,
} from '@dydxprotocol-indexer/postgres';
import { KafkaMessage } from 'kafkajs';
import { createKafkaMessage } from '@dydxprotocol-indexer/kafka';
import { createKafkaMessage, producer } from '@dydxprotocol-indexer/kafka';
import { onMessage } from '../../src/lib/on-message';
import { DydxIndexerSubtypes } from '../../src/lib/types';
import {
binaryToBase64String,
createIndexerTendermintBlock,
createIndexerTendermintEvent, expectPerpetualMarket,
createIndexerTendermintEvent, expectPerpetualMarket, expectPerpetualMarketKafkaMessage,
} from '../helpers/indexer-proto-helpers';
import { PerpetualMarketCreationHandler } from '../../src/handlers/perpetual-market-handler';
import {
Expand Down Expand Up @@ -62,6 +63,7 @@ describe('perpetualMarketHandler', () => {
afterEach(async () => {
await dbHelpers.clearData();
jest.clearAllMocks();
liquidityTierRefresher.clear();
});

afterAll(async () => {
Expand Down Expand Up @@ -133,6 +135,7 @@ describe('perpetualMarketHandler', () => {
MarketTable.create(testConstants.defaultMarket),
LiquidityTiersTable.create(testConstants.defaultLiquidityTier),
]);
await liquidityTierRefresher.updateLiquidityTiers();
await marketRefresher.updateMarkets();

const transactionIndex: number = 0;
Expand All @@ -148,6 +151,7 @@ describe('perpetualMarketHandler', () => {
// Confirm there is no existing perpetualMarket.
await expectNoExistingPerpetualMarkets();

const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');
await onMessage(kafkaMessage);

const newPerpetualMarkets: PerpetualMarketFromDatabase[] = await PerpetualMarketTable.findAll(
Expand All @@ -161,6 +165,7 @@ describe('perpetualMarketHandler', () => {
const perpetualMarket: PerpetualMarketFromDatabase | undefined = perpetualMarketRefresher.getPerpetualMarketFromId('0');
expect(perpetualMarket).toBeDefined();
expectPerpetualMarket(perpetualMarket!, perpetualMarketEvent);
expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]);
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
PerpetualMarketFromDatabase,
PerpetualMarketTable,
dbHelpers,
liquidityTierRefresher,
perpetualMarketRefresher,
protocolTranslations,
testMocks,
Expand All @@ -25,10 +26,11 @@ import {
binaryToBase64String,
createIndexerTendermintBlock,
createIndexerTendermintEvent,
expectPerpetualMarketKafkaMessage,
} from '../helpers/indexer-proto-helpers';
import { DydxIndexerSubtypes } from '../../src/lib/types';
import { UpdateClobPairHandler } from '../../src/handlers/update-clob-pair-handler';
import { createKafkaMessage } from '@dydxprotocol-indexer/kafka';
import { createKafkaMessage, producer } from '@dydxprotocol-indexer/kafka';
import { KafkaMessage } from 'kafkajs';
import { onMessage } from '../../src/lib/on-message';

Expand All @@ -44,12 +46,14 @@ describe('update-clob-pair-handler', () => {
await testMocks.seedData();
updateBlockCache(defaultPreviousHeight);
await perpetualMarketRefresher.updatePerpetualMarkets();
await liquidityTierRefresher.updateLiquidityTiers();
});

afterEach(async () => {
await dbHelpers.clearData();
jest.clearAllMocks();
perpetualMarketRefresher.clear();
liquidityTierRefresher.clear();
});

afterAll(async () => {
Expand Down Expand Up @@ -97,6 +101,7 @@ describe('update-clob-pair-handler', () => {
time: defaultTime,
txHash: defaultTxHash,
});
const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');
await onMessage(kafkaMessage);

const perpetualMarketId: string = perpetualMarketRefresher.getPerpetualMarketFromClobPairId(
Expand All @@ -115,6 +120,7 @@ describe('update-clob-pair-handler', () => {
stepBaseQuantums: defaultUpdateClobPairEvent.stepBaseQuantums.toNumber(),
}));
expectTimingStats();
expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]);
});
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
PerpetualMarketFromDatabase,
PerpetualMarketTable,
dbHelpers,
liquidityTierRefresher,
perpetualMarketRefresher,
testMocks,
} from '@dydxprotocol-indexer/postgres';
Expand All @@ -20,10 +21,15 @@ import {
Timestamp,
UpdatePerpetualEventV1,
} from '@dydxprotocol-indexer/v4-protos';
import { binaryToBase64String, createIndexerTendermintBlock, createIndexerTendermintEvent } from '../helpers/indexer-proto-helpers';
import {
binaryToBase64String,
createIndexerTendermintBlock,
createIndexerTendermintEvent,
expectPerpetualMarketKafkaMessage,
} from '../helpers/indexer-proto-helpers';
import { DydxIndexerSubtypes } from '../../src/lib/types';
import { UpdatePerpetualHandler } from '../../src/handlers/update-perpetual-handler';
import { createKafkaMessage } from '@dydxprotocol-indexer/kafka';
import { createKafkaMessage, producer } from '@dydxprotocol-indexer/kafka';
import { KafkaMessage } from 'kafkajs';
import { onMessage } from '../../src/lib/on-message';

Expand All @@ -39,12 +45,14 @@ describe('update-perpetual-handler', () => {
await testMocks.seedData();
updateBlockCache(defaultPreviousHeight);
await perpetualMarketRefresher.updatePerpetualMarkets();
await liquidityTierRefresher.updateLiquidityTiers();
});

afterEach(async () => {
await dbHelpers.clearData();
jest.clearAllMocks();
perpetualMarketRefresher.clear();
liquidityTierRefresher.clear();
});

afterAll(async () => {
Expand Down Expand Up @@ -92,6 +100,7 @@ describe('update-perpetual-handler', () => {
time: defaultTime,
txHash: defaultTxHash,
});
const producerSendMock: jest.SpyInstance = jest.spyOn(producer, 'send');
await onMessage(kafkaMessage);

const perpetualMarket:
Expand All @@ -106,6 +115,7 @@ describe('update-perpetual-handler', () => {
liquidityTierId: defaultUpdatePerpetualEvent.liquidityTier,
}));
expectTimingStats();
expectPerpetualMarketKafkaMessage(producerSendMock, [perpetualMarket!]);
});
});

Expand Down
11 changes: 11 additions & 0 deletions indexer/services/ender/__tests__/helpers/indexer-proto-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import _ from 'lodash';
import {
convertPerpetualPosition,
generateFillSubaccountMessage,
generatePerpetualMarketMessage,
generatePerpetualPositionsContents,
isLiquidation,
} from '../../src/helpers/kafka-helper';
Expand Down Expand Up @@ -188,6 +189,16 @@ export function expectSubaccountKafkaMessage({
expect(subaccountMessageJsons).toContainEqual(expectedSubaccountMessageJson);
}

export function expectPerpetualMarketKafkaMessage(
producerSendMock: jest.SpyInstance,
perpetualMarkets: PerpetualMarketFromDatabase[],
) {
expectMarketKafkaMessage({
producerSendMock,
contents: JSON.stringify(generatePerpetualMarketMessage(perpetualMarkets)),
});
}

export function expectMarketKafkaMessage({
producerSendMock,
contents,
Expand Down
Loading

0 comments on commit bf817d2

Please # to comment.