Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feat(fleet): add control loop and state management #3116

Merged
merged 7 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/fleet/lib/models/deployments.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ describe('Deployments', () => {
expect(deployment.createdAt).toBeInstanceOf(Date);
expect(deployment.supersededAt).toBe(null);
});

it('should supersede any active deployments', async () => {
const commitId1 = generateCommitHash();
const commitId2 = generateCommitHash();
Expand Down
56 changes: 56 additions & 0 deletions packages/fleet/lib/models/helpers.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import type { CommitHash } from '../types';
import crypto from 'crypto';
import type { NodeState, Node, RoutingId } from '../types.js';
import type { knex } from 'knex';
import { nanoid } from '@nangohq/utils';
import * as nodes from './nodes.js';

export function generateCommitHash(): CommitHash {
const charset = '0123456789abcdef';
Expand All @@ -15,3 +19,55 @@ export function generateCommitHash(): CommitHash {
}
return value as CommitHash;
}

export async function createNodeWithAttributes(
db: knex.Knex,
{
state,
deploymentId,
routingId = nanoid(),
lastStateTransitionAt
}: { state: NodeState; deploymentId: number; routingId?: RoutingId; lastStateTransitionAt?: Date }
): Promise<Node> {
return db.transaction(async (trx) => {
let node = await createNode(trx, { routingId, deploymentId });
if (state == 'ERROR') {
node = (await nodes.fail(trx, { nodeId: node.id, reason: 'my error' })).unwrap();
}
// transition to the desired state
while (node.state !== state) {
const nextState = nodes.validNodeStateTransitions.find((v) => v.from === node.state && v.to !== 'ERROR')?.to;
if (nextState === 'RUNNING') {
node = (await nodes.register(trx, { nodeId: node.id, url: 'http://my-url' })).unwrap();
} else if (nextState && nextState !== 'ERROR') {
node = (await nodes.transitionTo(trx, { nodeId: node.id, newState: nextState })).unwrap();
} else {
throw new Error(`Cannot transition node to state '${state}'`);
}
}
if (lastStateTransitionAt) {
await trx
.from(nodes.NODES_TABLE)
.update({ created_at: lastStateTransitionAt, last_state_transition_at: lastStateTransitionAt })
.where('id', node.id);
node = {
...node,
createdAt: lastStateTransitionAt,
lastStateTransitionAt
};
}
return node;
});
}

async function createNode(db: knex.Knex, { routingId, deploymentId }: { routingId: RoutingId; deploymentId: number }): Promise<Node> {
const node = await nodes.create(db, {
routingId,
deploymentId,
image: 'nangohq/my-image:latest',
cpuMilli: 500,
memoryMb: 1024,
storageMb: 512
});
return node.unwrap();
}
118 changes: 58 additions & 60 deletions packages/fleet/lib/models/nodes.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ import { expect, describe, it, beforeEach, afterEach } from 'vitest';
import * as nodes from './nodes.js';
import * as deployments from './deployments.js';
import { nodeStates } from '../types.js';
import type { NodeState, Node, RoutingId, Deployment } from '../types.js';
import type { NodeState, Deployment } from '../types.js';
import { getTestDbClient } from '../db/helpers.test.js';
import type { knex } from 'knex';
import { nanoid } from '@nangohq/utils';
import { generateCommitHash } from './helpers.test.js';
import { generateCommitHash, createNodeWithAttributes } from './helpers.test.js';

describe('Nodes', () => {
const dbClient = getTestDbClient('nodes');
const db = dbClient.db;

let previousDeployment: Deployment;
let activeDeployment: Deployment;
beforeEach(async () => {
Expand All @@ -28,7 +27,6 @@ describe('Nodes', () => {
await nodes.create(db, {
routingId: 'my-routing-id',
deploymentId: activeDeployment.id,
url: 'http://localhost:3000',
image: 'nangohq/my-image:latest',
cpuMilli: 500,
memoryMb: 1024,
Expand All @@ -39,7 +37,7 @@ describe('Nodes', () => {
id: expect.any(Number),
routingId: 'my-routing-id',
deploymentId: activeDeployment.id,
url: 'http://localhost:3000',
url: null,
state: 'PENDING',
image: 'nangohq/my-image:latest',
cpuMilli: 500,
Expand All @@ -50,68 +48,80 @@ describe('Nodes', () => {
lastStateTransitionAt: expect.any(Date)
});
});

it('should transition between valid states and error when transitioning between invalid states', async () => {
const doTransition = async ({ nodeId, newState }: { nodeId: number; newState: NodeState }) => {
if (newState === 'RUNNING') {
return await nodes.register(db, { nodeId, url: 'http://my-url' });
} else if (newState === 'ERROR') {
return await nodes.fail(db, { nodeId, reason: 'my error' });
} else {
return await nodes.transitionTo(db, { nodeId, newState });
}
};
for (const from of nodeStates) {
for (const to of nodeStates) {
const t = await createNodeWithState(db, { state: from, deploymentId: activeDeployment.id });
const t = await createNodeWithAttributes(db, { state: from, deploymentId: activeDeployment.id });
if (nodes.validNodeStateTransitions.find((v) => v.from === from && v.to === to)) {
// sleep to ensure lastStateTransitionAt is different from the previous state
await new Promise((resolve) => void setTimeout(resolve, 2));
const updated = await nodes.transitionTo(db, { nodeId: t.id, newState: to });
const updated = await doTransition({ nodeId: t.id, newState: to });
expect(updated.unwrap().state).toBe(to);
expect(updated.unwrap().lastStateTransitionAt.getTime()).toBeGreaterThan(t.lastStateTransitionAt.getTime());
} else {
const updated = await nodes.transitionTo(db, { nodeId: t.id, newState: to });
const updated = await doTransition({ nodeId: t.id, newState: to });
expect(updated.isErr(), `transition from ${from} to ${to} failed`).toBe(true);
}
}
}
});

it('should be searchable', async () => {
const route1PendingNode = await createNodeWithState(db, { state: 'PENDING', routingId: '1', deploymentId: activeDeployment.id });
const route1RunningNode = await createNodeWithState(db, {
const route1PendingNode = await createNodeWithAttributes(db, { state: 'PENDING', routingId: '1', deploymentId: activeDeployment.id });
const route1RunningNode = await createNodeWithAttributes(db, {
state: 'RUNNING',
routingId: route1PendingNode.routingId,
deploymentId: previousDeployment.id
});
const startingNode = await createNodeWithState(db, { state: 'STARTING', deploymentId: activeDeployment.id });
const runningNode = await createNodeWithState(db, { state: 'RUNNING', deploymentId: activeDeployment.id });
const outdatedNode = await createNodeWithState(db, { state: 'OUTDATED', deploymentId: activeDeployment.id });
const finishingNode = await createNodeWithState(db, { state: 'FINISHING', deploymentId: activeDeployment.id });
const idleNode = await createNodeWithState(db, { state: 'IDLE', deploymentId: activeDeployment.id });
const terminatedNode = await createNodeWithState(db, { state: 'TERMINATED', deploymentId: activeDeployment.id });
const errorNode = await createNodeWithState(db, { state: 'ERROR', deploymentId: activeDeployment.id });
const startingNode = await createNodeWithAttributes(db, { state: 'STARTING', deploymentId: activeDeployment.id });
const runningNode = await createNodeWithAttributes(db, { state: 'RUNNING', deploymentId: activeDeployment.id });
const outdatedNode = await createNodeWithAttributes(db, { state: 'OUTDATED', deploymentId: activeDeployment.id });
const finishingNode = await createNodeWithAttributes(db, { state: 'FINISHING', deploymentId: activeDeployment.id });
const idleNode = await createNodeWithAttributes(db, { state: 'IDLE', deploymentId: activeDeployment.id });
const terminatedNode = await createNodeWithAttributes(db, { state: 'TERMINATED', deploymentId: activeDeployment.id });
const errorNode = await createNodeWithAttributes(db, { state: 'ERROR', deploymentId: activeDeployment.id });

const searchAllStates = await nodes.search(db, {
states: ['PENDING', 'STARTING', 'RUNNING', 'OUTDATED', 'FINISHING', 'IDLE', 'TERMINATED', 'ERROR']
});
expect(searchAllStates.unwrap().nodes).toEqual(
new Map([
[route1PendingNode.routingId, [route1PendingNode, route1RunningNode]],
[startingNode.routingId, [startingNode]],
[runningNode.routingId, [runningNode]],
[outdatedNode.routingId, [outdatedNode]],
[finishingNode.routingId, [finishingNode]],
[idleNode.routingId, [idleNode]],
[terminatedNode.routingId, [terminatedNode]],
[errorNode.routingId, [errorNode]]
[route1PendingNode.routingId, { PENDING: [route1PendingNode], RUNNING: [route1RunningNode] }],
[startingNode.routingId, { STARTING: [startingNode] }],
[runningNode.routingId, { RUNNING: [runningNode] }],
[outdatedNode.routingId, { OUTDATED: [outdatedNode] }],
[finishingNode.routingId, { FINISHING: [finishingNode] }],
[idleNode.routingId, { IDLE: [idleNode] }],
[terminatedNode.routingId, { TERMINATED: [terminatedNode] }],
[errorNode.routingId, { ERROR: [errorNode] }]
])
);

const searchRunning = await nodes.search(db, { states: ['RUNNING'] });
expect(searchRunning.unwrap().nodes).toEqual(
new Map([
[route1RunningNode.routingId, [route1RunningNode]],
[runningNode.routingId, [runningNode]]
[route1RunningNode.routingId, { RUNNING: [route1RunningNode] }],
[runningNode.routingId, { RUNNING: [runningNode] }]
])
);

const searchWithWrongRoute = await nodes.search(db, { states: ['PENDING'], routingId: terminatedNode.routingId });
expect(searchWithWrongRoute.unwrap().nodes).toEqual(new Map());
});

it('should be searchable (with pagination support)', async () => {
for (let i = 0; i < 12; i++) {
await createNodeWithState(db, { state: 'PENDING', routingId: i.toString(), deploymentId: activeDeployment.id });
await createNodeWithAttributes(db, { state: 'PENDING', routingId: i.toString(), deploymentId: activeDeployment.id });
}
const searchFirstPage = (await nodes.search(db, { states: ['PENDING'], limit: 5 })).unwrap();
expect(searchFirstPage.nodes.size).toBe(5);
Expand All @@ -125,37 +135,25 @@ describe('Nodes', () => {
expect(searchThirdPage.nodes.size).toBe(2);
expect(searchThirdPage.nextCursor).toBe(undefined);
});
});

async function createNodeWithState(
db: knex.Knex,
{ state, deploymentId, routingId = nanoid() }: { state: NodeState; deploymentId: number; routingId?: RoutingId }
): Promise<Node> {
let node = await createNode(db, { routingId, deploymentId });
if (state == 'ERROR') {
return (await nodes.fail(db, { nodeId: node.id, error: 'my error' })).unwrap();
}
// transition to the desired state
while (node.state !== state) {
const nextState = nodes.validNodeStateTransitions.find((v) => v.from === node.state)?.to;
if (nextState) {
node = (await nodes.transitionTo(db, { nodeId: node.id, newState: nextState })).unwrap();
} else {
throw new Error(`Cannot transition node to state '${state}'`);
}
}
return node;
}
it('should be able to fail a node', async () => {
const node = await createNodeWithAttributes(db, { state: 'PENDING', deploymentId: activeDeployment.id });
const failedNode = (await nodes.fail(db, { nodeId: node.id, reason: 'my error' })).unwrap();
expect(failedNode.state).toBe('ERROR');
expect(failedNode.error).toBe('my error');
});

async function createNode(db: knex.Knex, { routingId, deploymentId }: { routingId: RoutingId; deploymentId: number }): Promise<Node> {
const node = await nodes.create(db, {
routingId,
deploymentId,
url: 'http://localhost:1234',
image: 'nangohq/my-image:latest',
cpuMilli: 500,
memoryMb: 1024,
storageMb: 512
it('should be able to register a node', async () => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you have against line break 🤣

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you mean empty line between the tests? :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes even in general, between logical code blocks but I guess it's keyboard-driven development :p

const node = await createNodeWithAttributes(db, { state: 'STARTING', deploymentId: activeDeployment.id });
expect(node.url).toBe(null);
const registeredNode = (await nodes.register(db, { nodeId: node.id, url: 'http://my-url' })).unwrap();
expect(registeredNode.state).toBe('RUNNING');
expect(registeredNode.url).toBe('http://my-url');
});
return node.unwrap();
}

it('should be able to idle a node', async () => {
const node = await createNodeWithAttributes(db, { state: 'FINISHING', deploymentId: activeDeployment.id });
const idledNode = (await nodes.idle(db, { nodeId: node.id })).unwrap();
expect(idledNode.state).toBe('IDLE');
});
});
Loading
Loading