From 698d1427c6eec9850d2526f3444eef3251700d7b Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Fri, 21 Jul 2023 09:45:34 -0700 Subject: [PATCH 1/2] grpc-js: Implement pick_first sticky TF and address list shuffling --- .../grpc-js/src/load-balancer-pick-first.ts | 437 ++++++------- packages/grpc-js/src/subchannel-interface.ts | 9 + packages/grpc-js/src/subchannel.ts | 9 +- packages/grpc-js/test/common.ts | 70 +- packages/grpc-js/test/test-pick-first.ts | 603 ++++++++++++++++++ 5 files changed, 882 insertions(+), 246 deletions(-) create mode 100644 packages/grpc-js/test/test-pick-first.ts diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index e91bb3c5a..0805e5fb2 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -31,11 +31,7 @@ import { PickResultType, UnavailablePicker, } from './picker'; -import { - SubchannelAddress, - subchannelAddressEqual, - subchannelAddressToString, -} from './subchannel-address'; +import { SubchannelAddress } from './subchannel-address'; import * as logging from './logging'; import { LogVerbosity } from './constants'; import { @@ -58,21 +54,35 @@ const TYPE_NAME = 'pick_first'; const CONNECTION_DELAY_INTERVAL_MS = 250; export class PickFirstLoadBalancingConfig implements LoadBalancingConfig { + constructor(private readonly shuffleAddressList: boolean) {} + getLoadBalancerName(): string { return TYPE_NAME; } - constructor() {} - toJsonObject(): object { return { - [TYPE_NAME]: {}, + [TYPE_NAME]: { + shuffleAddressList: this.shuffleAddressList, + }, }; } + getShuffleAddressList() { + return this.shuffleAddressList; + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any static createFromJson(obj: any) { - return new PickFirstLoadBalancingConfig(); + if ( + 'shuffleAddressList' in obj && + !(typeof obj.shuffleAddressList === 'boolean') + ) { + throw new Error( + 'pick_first config field shuffleAddressList must be a boolean if provided' + ); + } + return new PickFirstLoadBalancingConfig(obj.shuffleAddressList === true); } } @@ -94,24 +104,33 @@ class PickFirstPicker implements Picker { } } -interface ConnectivityStateCounts { - [ConnectivityState.CONNECTING]: number; - [ConnectivityState.IDLE]: number; - [ConnectivityState.READY]: number; - [ConnectivityState.SHUTDOWN]: number; - [ConnectivityState.TRANSIENT_FAILURE]: number; +interface SubchannelChild { + subchannel: SubchannelInterface; + hasReportedTransientFailure: boolean; +} + +/** + * Return a new array with the elements of the input array in a random order + * @param list The input array + * @returns A shuffled array of the elements of list + */ +export function shuffled(list: T[]): T[] { + const result = list.slice(); + for (let i = result.length - 1; i > 1; i--) { + const j = Math.floor(Math.random() * (i + 1)); + const temp = result[i]; + result[i] = result[j]; + result[j] = temp; + } + return result; } export class PickFirstLoadBalancer implements LoadBalancer { - /** - * The list of backend addresses most recently passed to `updateAddressList`. - */ - private latestAddressList: SubchannelAddress[] = []; /** * The list of subchannels this load balancer is currently attempting to * connect to. */ - private subchannels: SubchannelInterface[] = []; + private children: SubchannelChild[] = []; /** * The current connectivity state of the load balancer. */ @@ -121,8 +140,6 @@ export class PickFirstLoadBalancer implements LoadBalancer { * recently started connection attempt. */ private currentSubchannelIndex = 0; - - private subchannelStateCounts: ConnectivityStateCounts; /** * The currently picked subchannel used for making calls. Populated if * and only if the load balancer's current state is READY. In that case, @@ -133,11 +150,13 @@ export class PickFirstLoadBalancer implements LoadBalancer { * Listener callback attached to each subchannel in the `subchannels` list * while establishing a connection. */ - private subchannelStateListener: ConnectivityStateListener; - /** - * Listener callback attached to the current picked subchannel. - */ - private pickedSubchannelStateListener: ConnectivityStateListener; + private subchannelStateListener: ConnectivityStateListener = ( + subchannel, + previousState, + newState + ) => { + this.onSubchannelStateUpdate(subchannel, previousState, newState); + }; /** * Timer reference for the timer tracking when to start */ @@ -145,6 +164,14 @@ export class PickFirstLoadBalancer implements LoadBalancer { private triedAllSubchannels = false; + /** + * The LB policy enters sticky TRANSIENT_FAILURE mode when all + * subchannels have failed to connect at least once, and it stays in that + * mode until a connection attempt is successful. While in sticky TF mode, + * the LB policy continuously attempts to connect to all of its subchannels. + */ + private stickyTransientFailureMode = false; + /** * Load balancer that attempts to connect to each backend in the address list * in order, and picks the first one that connects, using it for every @@ -153,136 +180,88 @@ export class PickFirstLoadBalancer implements LoadBalancer { * this load balancer's owner. */ constructor(private readonly channelControlHelper: ChannelControlHelper) { - this.subchannelStateCounts = { - [ConnectivityState.CONNECTING]: 0, - [ConnectivityState.IDLE]: 0, - [ConnectivityState.READY]: 0, - [ConnectivityState.SHUTDOWN]: 0, - [ConnectivityState.TRANSIENT_FAILURE]: 0, - }; - this.subchannelStateListener = ( - subchannel: SubchannelInterface, - previousState: ConnectivityState, - newState: ConnectivityState - ) => { - this.subchannelStateCounts[previousState] -= 1; - this.subchannelStateCounts[newState] += 1; - /* If the subchannel we most recently attempted to start connecting - * to goes into TRANSIENT_FAILURE, immediately try to start - * connecting to the next one instead of waiting for the connection - * delay timer. */ - if ( - subchannel.getRealSubchannel() === - this.subchannels[this.currentSubchannelIndex].getRealSubchannel() && - newState === ConnectivityState.TRANSIENT_FAILURE - ) { - this.startNextSubchannelConnecting(); - } - if (newState === ConnectivityState.READY) { - this.pickSubchannel(subchannel); - return; + this.connectionDelayTimeout = setTimeout(() => {}, 0); + clearTimeout(this.connectionDelayTimeout); + } + + private allChildrenHaveReportedTF(): boolean { + return this.children.every(child => child.hasReportedTransientFailure); + } + + private calculateAndReportNewState() { + if (this.currentPick) { + this.updateState( + ConnectivityState.READY, + new PickFirstPicker(this.currentPick) + ); + } else if (this.children.length === 0) { + this.updateState(ConnectivityState.IDLE, new QueuePicker(this)); + } else { + if (this.stickyTransientFailureMode) { + this.updateState( + ConnectivityState.TRANSIENT_FAILURE, + new UnavailablePicker() + ); } else { - if ( - this.triedAllSubchannels && - this.subchannelStateCounts[ConnectivityState.IDLE] === - this.subchannels.length - ) { - /* If all of the subchannels are IDLE we should go back to a - * basic IDLE state where there is no subchannel list to avoid - * holding unused resources. We do not reset triedAllSubchannels - * because that is a reminder to request reresolution the next time - * this LB policy needs to connect. */ - this.resetSubchannelList(false); - this.updateState(ConnectivityState.IDLE, new QueuePicker(this)); - return; - } - if (this.currentPick === null) { - if (this.triedAllSubchannels) { - let newLBState: ConnectivityState; - if (this.subchannelStateCounts[ConnectivityState.CONNECTING] > 0) { - newLBState = ConnectivityState.CONNECTING; - } else if ( - this.subchannelStateCounts[ConnectivityState.TRANSIENT_FAILURE] > - 0 - ) { - newLBState = ConnectivityState.TRANSIENT_FAILURE; - } else { - newLBState = ConnectivityState.IDLE; - } - if (newLBState !== this.currentState) { - if (newLBState === ConnectivityState.TRANSIENT_FAILURE) { - this.updateState(newLBState, new UnavailablePicker()); - } else { - this.updateState(newLBState, new QueuePicker(this)); - } - } - } else { - this.updateState( - ConnectivityState.CONNECTING, - new QueuePicker(this) - ); - } - } + this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this)); } - }; - this.pickedSubchannelStateListener = ( - subchannel: SubchannelInterface, - previousState: ConnectivityState, - newState: ConnectivityState - ) => { + } + } + + private maybeEnterStickyTransientFailureMode() { + if (this.stickyTransientFailureMode) { + return; + } + if (!this.allChildrenHaveReportedTF()) { + return; + } + this.stickyTransientFailureMode = true; + this.channelControlHelper.requestReresolution(); + for (const { subchannel } of this.children) { + subchannel.startConnecting(); + } + this.calculateAndReportNewState(); + } + + private onSubchannelStateUpdate( + subchannel: SubchannelInterface, + previousState: ConnectivityState, + newState: ConnectivityState + ) { + if (this.currentPick?.realSubchannelEquals(subchannel)) { if (newState !== ConnectivityState.READY) { this.currentPick = null; - subchannel.unref(); - subchannel.removeConnectivityStateListener( - this.pickedSubchannelStateListener - ); - this.channelControlHelper.removeChannelzChild( - subchannel.getChannelzRef() - ); - if (this.subchannels.length > 0) { - if (this.triedAllSubchannels) { - let newLBState: ConnectivityState; - if (this.subchannelStateCounts[ConnectivityState.CONNECTING] > 0) { - newLBState = ConnectivityState.CONNECTING; - } else if ( - this.subchannelStateCounts[ConnectivityState.TRANSIENT_FAILURE] > - 0 - ) { - newLBState = ConnectivityState.TRANSIENT_FAILURE; - } else { - newLBState = ConnectivityState.IDLE; - } - if (newLBState === ConnectivityState.TRANSIENT_FAILURE) { - this.updateState(newLBState, new UnavailablePicker()); - } else { - this.updateState(newLBState, new QueuePicker(this)); - } - } else { - this.updateState( - ConnectivityState.CONNECTING, - new QueuePicker(this) - ); + this.calculateAndReportNewState(); + this.channelControlHelper.requestReresolution(); + } + return; + } + for (const [index, child] of this.children.entries()) { + if (subchannel.realSubchannelEquals(child.subchannel)) { + if (newState === ConnectivityState.READY) { + this.pickSubchannel(child.subchannel); + } + if (newState === ConnectivityState.TRANSIENT_FAILURE) { + child.hasReportedTransientFailure = true; + this.maybeEnterStickyTransientFailureMode(); + if (index === this.currentSubchannelIndex) { + this.startNextSubchannelConnecting(index + 1); } - } else { - /* We don't need to backoff here because this only happens if a - * subchannel successfully connects then disconnects, so it will not - * create a loop of attempting to connect to an unreachable backend - */ - this.updateState(ConnectivityState.IDLE, new QueuePicker(this)); } + child.subchannel.startConnecting(); + return; } - }; - this.connectionDelayTimeout = setTimeout(() => {}, 0); - clearTimeout(this.connectionDelayTimeout); + } } - private startNextSubchannelConnecting() { - if (this.triedAllSubchannels) { + private startNextSubchannelConnecting(startIndex: number) { + clearTimeout(this.connectionDelayTimeout); + if (this.triedAllSubchannels || this.stickyTransientFailureMode) { return; } - for (const [index, subchannel] of this.subchannels.entries()) { - if (index > this.currentSubchannelIndex) { - const subchannelState = subchannel.getConnectivityState(); + for (const [index, child] of this.children.entries()) { + if (index >= startIndex) { + const subchannelState = child.subchannel.getConnectivityState(); if ( subchannelState === ConnectivityState.IDLE || subchannelState === ConnectivityState.CONNECTING @@ -293,6 +272,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { } } this.triedAllSubchannels = true; + this.maybeEnterStickyTransientFailureMode(); } /** @@ -303,37 +283,43 @@ export class PickFirstLoadBalancer implements LoadBalancer { clearTimeout(this.connectionDelayTimeout); this.currentSubchannelIndex = subchannelIndex; if ( - this.subchannels[subchannelIndex].getConnectivityState() === + this.children[subchannelIndex].subchannel.getConnectivityState() === ConnectivityState.IDLE ) { trace( 'Start connecting to subchannel with address ' + - this.subchannels[subchannelIndex].getAddress() + this.children[subchannelIndex].subchannel.getAddress() ); process.nextTick(() => { - this.subchannels[subchannelIndex].startConnecting(); + this.children[subchannelIndex].subchannel.startConnecting(); }); } this.connectionDelayTimeout = setTimeout(() => { - this.startNextSubchannelConnecting(); - }, CONNECTION_DELAY_INTERVAL_MS); + this.startNextSubchannelConnecting(subchannelIndex + 1); + }, CONNECTION_DELAY_INTERVAL_MS).unref?.(); } private pickSubchannel(subchannel: SubchannelInterface) { + if (subchannel === this.currentPick) { + return; + } trace('Pick subchannel with address ' + subchannel.getAddress()); + this.stickyTransientFailureMode = false; if (this.currentPick !== null) { this.currentPick.unref(); + this.channelControlHelper.removeChannelzChild( + this.currentPick.getChannelzRef() + ); this.currentPick.removeConnectivityStateListener( - this.pickedSubchannelStateListener + this.subchannelStateListener ); } this.currentPick = subchannel; - subchannel.addConnectivityStateListener(this.pickedSubchannelStateListener); subchannel.ref(); this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef()); this.resetSubchannelList(); clearTimeout(this.connectionDelayTimeout); - this.updateState(ConnectivityState.READY, new PickFirstPicker(subchannel)); + this.calculateAndReportNewState(); } private updateState(newState: ConnectivityState, picker: Picker) { @@ -346,115 +332,80 @@ export class PickFirstLoadBalancer implements LoadBalancer { this.channelControlHelper.updateState(newState, picker); } - private resetSubchannelList(resetTriedAllSubchannels = true) { - for (const subchannel of this.subchannels) { - subchannel.removeConnectivityStateListener(this.subchannelStateListener); - subchannel.unref(); + private resetSubchannelList() { + for (const child of this.children) { + if (child.subchannel !== this.currentPick) { + /* The connectivity state listener is the same whether the subchannel + * is in the list of children or it is the currentPick, so if it is in + * both, removing it here would cause problems. In particular, that + * always happens immediately after the subchannel is picked. */ + child.subchannel.removeConnectivityStateListener( + this.subchannelStateListener + ); + } + /* Refs are counted independently for the children list and the + * currentPick, so we call unref whether or not the child is the + * currentPick. Channelz child references are also refcounted, so + * removeChannelzChild can be handled the same way. */ + child.subchannel.unref(); this.channelControlHelper.removeChannelzChild( - subchannel.getChannelzRef() + child.subchannel.getChannelzRef() ); } this.currentSubchannelIndex = 0; - this.subchannelStateCounts = { - [ConnectivityState.CONNECTING]: 0, - [ConnectivityState.IDLE]: 0, - [ConnectivityState.READY]: 0, - [ConnectivityState.SHUTDOWN]: 0, - [ConnectivityState.TRANSIENT_FAILURE]: 0, - }; - this.subchannels = []; - if (resetTriedAllSubchannels) { - this.triedAllSubchannels = false; - } + this.children = []; + this.triedAllSubchannels = false; } - /** - * Start connecting to the address list most recently passed to - * `updateAddressList`. - */ - private connectToAddressList(): void { - this.resetSubchannelList(); - trace( - 'Connect to address list ' + - this.latestAddressList.map(address => - subchannelAddressToString(address) - ) - ); - this.subchannels = this.latestAddressList.map(address => - this.channelControlHelper.createSubchannel(address, {}) - ); - for (const subchannel of this.subchannels) { + updateAddressList( + addressList: SubchannelAddress[], + lbConfig: LoadBalancingConfig + ): void { + if (!(lbConfig instanceof PickFirstLoadBalancingConfig)) { + return; + } + /* Previously, an update would be discarded if it was identical to the + * previous update, to minimize churn. Now the DNS resolver is + * rate-limited, so that is less of a concern. */ + if (lbConfig.getShuffleAddressList()) { + addressList = shuffled(addressList); + } + const newChildrenList = addressList.map(address => ({ + subchannel: this.channelControlHelper.createSubchannel(address, {}), + hasReportedTransientFailure: false, + })); + /* Ref each subchannel before resetting the list, to ensure that + * subchannels shared between the list don't drop to 0 refs during the + * transition. */ + for (const { subchannel } of newChildrenList) { subchannel.ref(); this.channelControlHelper.addChannelzChild(subchannel.getChannelzRef()); } - for (const subchannel of this.subchannels) { + this.resetSubchannelList(); + this.children = newChildrenList; + for (const { subchannel } of this.children) { subchannel.addConnectivityStateListener(this.subchannelStateListener); - this.subchannelStateCounts[subchannel.getConnectivityState()] += 1; if (subchannel.getConnectivityState() === ConnectivityState.READY) { this.pickSubchannel(subchannel); - this.resetSubchannelList(); return; } } - for (const [index, subchannel] of this.subchannels.entries()) { - const subchannelState = subchannel.getConnectivityState(); + for (const child of this.children) { if ( - subchannelState === ConnectivityState.IDLE || - subchannelState === ConnectivityState.CONNECTING + child.subchannel.getConnectivityState() === + ConnectivityState.TRANSIENT_FAILURE ) { - this.startConnecting(index); - if (this.currentPick === null) { - this.updateState(ConnectivityState.CONNECTING, new QueuePicker(this)); - } - return; + child.hasReportedTransientFailure = true; } } - // If the code reaches this point, every subchannel must be in TRANSIENT_FAILURE - if (this.currentPick === null) { - this.updateState( - ConnectivityState.TRANSIENT_FAILURE, - new UnavailablePicker() - ); - } - } - - updateAddressList( - addressList: SubchannelAddress[], - lbConfig: LoadBalancingConfig - ): void { - // lbConfig has no useful information for pick first load balancing - /* To avoid unnecessary churn, we only do something with this address list - * if we're not currently trying to establish a connection, or if the new - * address list is different from the existing one */ - if ( - this.subchannels.length === 0 || - this.latestAddressList.length !== addressList.length || - !this.latestAddressList.every( - (value, index) => - addressList[index] && - subchannelAddressEqual(addressList[index], value) - ) - ) { - this.latestAddressList = addressList; - this.connectToAddressList(); - } + this.startNextSubchannelConnecting(0); + this.calculateAndReportNewState(); } exitIdle() { - if ( - this.currentState === ConnectivityState.IDLE || - this.triedAllSubchannels - ) { - this.channelControlHelper.requestReresolution(); - } - for (const subchannel of this.subchannels) { - subchannel.startConnecting(); - } - if (this.currentState === ConnectivityState.IDLE) { - if (this.latestAddressList.length > 0) { - this.connectToAddressList(); - } - } + /* The pick_first LB policy is only in the IDLE state if it has no + * addresses to try to connect to and it has no picked subchannel. + * In that case, there is no meaningful action that can be taken here. */ } resetBackoff() { @@ -470,9 +421,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { * does not impact this function. */ const currentPick = this.currentPick; currentPick.unref(); - currentPick.removeConnectivityStateListener( - this.pickedSubchannelStateListener - ); + currentPick.removeConnectivityStateListener(this.subchannelStateListener); this.channelControlHelper.removeChannelzChild( currentPick.getChannelzRef() ); diff --git a/packages/grpc-js/src/subchannel-interface.ts b/packages/grpc-js/src/subchannel-interface.ts index 557d62870..9b947ad32 100644 --- a/packages/grpc-js/src/subchannel-interface.ts +++ b/packages/grpc-js/src/subchannel-interface.ts @@ -49,6 +49,12 @@ export interface SubchannelInterface { * If this is a wrapper, return the wrapped subchannel, otherwise return this */ getRealSubchannel(): Subchannel; + /** + * Returns true if this and other both proxy the same underlying subchannel. + * Can be used instead of directly accessing getRealSubchannel to allow mocks + * to avoid implementing getRealSubchannel + */ + realSubchannelEquals(other: SubchannelInterface): boolean; } export abstract class BaseSubchannelWrapper implements SubchannelInterface { @@ -84,4 +90,7 @@ export abstract class BaseSubchannelWrapper implements SubchannelInterface { getRealSubchannel(): Subchannel { return this.child.getRealSubchannel(); } + realSubchannelEquals(other: SubchannelInterface): boolean { + return this.getRealSubchannel() === other.getRealSubchannel(); + } } diff --git a/packages/grpc-js/src/subchannel.ts b/packages/grpc-js/src/subchannel.ts index 480314e4a..6fad9500a 100644 --- a/packages/grpc-js/src/subchannel.ts +++ b/packages/grpc-js/src/subchannel.ts @@ -36,7 +36,10 @@ import { ChannelzCallTracker, unregisterChannelzRef, } from './channelz'; -import { ConnectivityStateListener } from './subchannel-interface'; +import { + ConnectivityStateListener, + SubchannelInterface, +} from './subchannel-interface'; import { SubchannelCallInterceptingListener } from './subchannel-call'; import { SubchannelCall } from './subchannel-call'; import { CallEventTracker, SubchannelConnector, Transport } from './transport'; @@ -462,6 +465,10 @@ export class Subchannel { return this; } + realSubchannelEquals(other: SubchannelInterface): boolean { + return other.getRealSubchannel() === this; + } + throttleKeepalive(newKeepaliveTime: number) { if (newKeepaliveTime > this.keepaliveTime) { this.keepaliveTime = newKeepaliveTime; diff --git a/packages/grpc-js/test/common.ts b/packages/grpc-js/test/common.ts index 26192d3eb..d15a9d5ed 100644 --- a/packages/grpc-js/test/common.ts +++ b/packages/grpc-js/test/common.ts @@ -27,6 +27,10 @@ import { loadPackageDefinition, } from '../src/make-client'; import { readFileSync } from 'fs'; +import { SubchannelInterface } from '../src/subchannel-interface'; +import { SubchannelRef } from '../src/channelz'; +import { Subchannel } from '../src/subchannel'; +import { ConnectivityState } from '../src/connectivity-state'; const protoLoaderOptions = { keepCase: true, @@ -119,7 +123,7 @@ export class TestClient { this.client.waitForReady(deadline, callback); } - sendRequest(callback: (error: grpc.ServiceError) => void) { + sendRequest(callback: (error?: grpc.ServiceError) => void) { this.client.echo({}, callback); } @@ -132,4 +136,68 @@ export class TestClient { } } +/** + * A mock subchannel that transitions between states on command, to test LB + * policy behavior + */ +export class MockSubchannel implements SubchannelInterface { + private state: grpc.connectivityState; + private listeners: Set = + new Set(); + constructor( + private readonly address: string, + initialState: grpc.connectivityState = grpc.connectivityState.IDLE + ) { + this.state = initialState; + } + getConnectivityState(): grpc.connectivityState { + return this.state; + } + addConnectivityStateListener( + listener: grpc.experimental.ConnectivityStateListener + ): void { + this.listeners.add(listener); + } + removeConnectivityStateListener( + listener: grpc.experimental.ConnectivityStateListener + ): void { + this.listeners.delete(listener); + } + transitionToState(nextState: grpc.connectivityState) { + grpc.experimental.trace( + grpc.logVerbosity.DEBUG, + 'subchannel', + this.address + + ' ' + + ConnectivityState[this.state] + + ' -> ' + + ConnectivityState[nextState] + ); + for (const listener of this.listeners) { + listener(this, this.state, nextState, 0); + } + this.state = nextState; + } + startConnecting(): void {} + getAddress(): string { + return this.address; + } + throttleKeepalive(newKeepaliveTime: number): void {} + ref(): void {} + unref(): void {} + getChannelzRef(): SubchannelRef { + return { + kind: 'subchannel', + id: -1, + name: this.address, + }; + } + getRealSubchannel(): Subchannel { + throw new Error('Method not implemented.'); + } + realSubchannelEquals(other: grpc.experimental.SubchannelInterface): boolean { + return this === other; + } +} + export { assert2 }; diff --git a/packages/grpc-js/test/test-pick-first.ts b/packages/grpc-js/test/test-pick-first.ts new file mode 100644 index 000000000..e9e9e5601 --- /dev/null +++ b/packages/grpc-js/test/test-pick-first.ts @@ -0,0 +1,603 @@ +/* + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import * as assert from 'assert'; + +import { ConnectivityState } from '../src/connectivity-state'; +import { + ChannelControlHelper, + createChildChannelControlHelper, +} from '../src/load-balancer'; +import { + PickFirstLoadBalancer, + PickFirstLoadBalancingConfig, + shuffled, +} from '../src/load-balancer-pick-first'; +import { Metadata } from '../src/metadata'; +import { Picker } from '../src/picker'; +import { + SubchannelAddress, + subchannelAddressToString, +} from '../src/subchannel-address'; +import { MockSubchannel, TestClient, TestServer } from './common'; + +function updateStateCallBackForExpectedStateSequence( + expectedStateSequence: ConnectivityState[], + done: Mocha.Done +) { + const actualStateSequence: ConnectivityState[] = []; + let lastPicker: Picker | null = null; + return (connectivityState: ConnectivityState, picker: Picker) => { + // Ignore duplicate state transitions + if ( + connectivityState === actualStateSequence[actualStateSequence.length - 1] + ) { + // Ignore READY duplicate state transitions if the picked subchannel is the same + if ( + connectivityState !== ConnectivityState.READY || + lastPicker?.pick({ extraPickInfo: {}, metadata: new Metadata() }) + ?.subchannel === + picker.pick({ extraPickInfo: {}, metadata: new Metadata() }) + .subchannel + ) { + return; + } + } + if ( + expectedStateSequence[actualStateSequence.length] !== connectivityState + ) { + done( + new Error( + `Unexpected state ${ + ConnectivityState[connectivityState] + } after [${actualStateSequence.map( + value => ConnectivityState[value] + )}]` + ) + ); + } + actualStateSequence.push(connectivityState); + lastPicker = picker; + if (actualStateSequence.length === expectedStateSequence.length) { + done(); + } + }; +} + +describe('Shuffler', () => { + it('Should maintain the multiset of elements from the original array', () => { + const originalArray = [1, 2, 2, 3, 3, 3, 4, 4, 5]; + for (let i = 0; i < 100; i++) { + assert.deepStrictEqual( + shuffled(originalArray).sort((a, b) => a - b), + originalArray + ); + } + }); +}); + +describe('pick_first load balancing policy', () => { + const config = new PickFirstLoadBalancingConfig(false); + let subchannels: MockSubchannel[] = []; + const baseChannelControlHelper: ChannelControlHelper = { + createSubchannel: (subchannelAddress, subchannelArgs) => { + const subchannel = new MockSubchannel( + subchannelAddressToString(subchannelAddress) + ); + subchannels.push(subchannel); + return subchannel; + }, + addChannelzChild: () => {}, + removeChannelzChild: () => {}, + requestReresolution: () => {}, + updateState: () => {}, + }; + beforeEach(() => { + subchannels = []; + }); + it('Should report READY when a subchannel connects', done => { + const channelControlHelper = createChildChannelControlHelper( + baseChannelControlHelper, + { + updateState: updateStateCallBackForExpectedStateSequence( + [ConnectivityState.CONNECTING, ConnectivityState.READY], + done + ), + } + ); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); + pickFirst.updateAddressList([{ host: 'localhost', port: 1 }], config); + process.nextTick(() => { + subchannels[0].transitionToState(ConnectivityState.READY); + }); + }); + it('Should report READY when updated with a subchannel that is already READY', done => { + const channelControlHelper = createChildChannelControlHelper( + baseChannelControlHelper, + { + createSubchannel: (subchannelAddress, subchannelArgs) => { + const subchannel = new MockSubchannel( + subchannelAddressToString(subchannelAddress), + ConnectivityState.READY + ); + subchannels.push(subchannel); + return subchannel; + }, + updateState: updateStateCallBackForExpectedStateSequence( + [ConnectivityState.READY], + done + ), + } + ); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); + pickFirst.updateAddressList([{ host: 'localhost', port: 1 }], config); + }); + it('Should stay CONNECTING if only some subchannels fail to connect', done => { + const channelControlHelper = createChildChannelControlHelper( + baseChannelControlHelper, + { + updateState: updateStateCallBackForExpectedStateSequence( + [ConnectivityState.CONNECTING], + done + ), + } + ); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); + pickFirst.updateAddressList( + [ + { host: 'localhost', port: 1 }, + { host: 'localhost', port: 2 }, + ], + config + ); + process.nextTick(() => { + subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE); + }); + }); + it('Should enter TRANSIENT_FAILURE when subchannels fail to connect', done => { + const channelControlHelper = createChildChannelControlHelper( + baseChannelControlHelper, + { + updateState: updateStateCallBackForExpectedStateSequence( + [ConnectivityState.CONNECTING, ConnectivityState.TRANSIENT_FAILURE], + done + ), + } + ); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); + pickFirst.updateAddressList( + [ + { host: 'localhost', port: 1 }, + { host: 'localhost', port: 2 }, + ], + config + ); + process.nextTick(() => { + subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE); + }); + process.nextTick(() => { + subchannels[1].transitionToState(ConnectivityState.TRANSIENT_FAILURE); + }); + }); + it('Should stay in TRANSIENT_FAILURE if subchannels go back to CONNECTING', done => { + const channelControlHelper = createChildChannelControlHelper( + baseChannelControlHelper, + { + updateState: updateStateCallBackForExpectedStateSequence( + [ConnectivityState.CONNECTING, ConnectivityState.TRANSIENT_FAILURE], + done + ), + } + ); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); + pickFirst.updateAddressList( + [ + { host: 'localhost', port: 1 }, + { host: 'localhost', port: 2 }, + ], + config + ); + process.nextTick(() => { + subchannels[0].transitionToState(ConnectivityState.TRANSIENT_FAILURE); + process.nextTick(() => { + subchannels[1].transitionToState(ConnectivityState.TRANSIENT_FAILURE); + process.nextTick(() => { + subchannels[0].transitionToState(ConnectivityState.CONNECTING); + process.nextTick(() => { + subchannels[1].transitionToState(ConnectivityState.CONNECTING); + }); + }); + }); + }); + }); + it('Should immediately enter TRANSIENT_FAILURE if subchannels start in TRANSIENT_FAILURE', done => { + const channelControlHelper = createChildChannelControlHelper( + baseChannelControlHelper, + { + createSubchannel: (subchannelAddress, subchannelArgs) => { + const subchannel = new MockSubchannel( + subchannelAddressToString(subchannelAddress), + ConnectivityState.TRANSIENT_FAILURE + ); + subchannels.push(subchannel); + return subchannel; + }, + updateState: updateStateCallBackForExpectedStateSequence( + [ConnectivityState.TRANSIENT_FAILURE], + done + ), + } + ); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); + pickFirst.updateAddressList( + [ + { host: 'localhost', port: 1 }, + { host: 'localhost', port: 2 }, + ], + config + ); + }); + it('Should enter READY if a subchannel connects after entering TRANSIENT_FAILURE mode', done => { + const channelControlHelper = createChildChannelControlHelper( + baseChannelControlHelper, + { + createSubchannel: (subchannelAddress, subchannelArgs) => { + const subchannel = new MockSubchannel( + subchannelAddressToString(subchannelAddress), + ConnectivityState.TRANSIENT_FAILURE + ); + subchannels.push(subchannel); + return subchannel; + }, + updateState: updateStateCallBackForExpectedStateSequence( + [ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.READY], + done + ), + } + ); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); + pickFirst.updateAddressList( + [ + { host: 'localhost', port: 1 }, + { host: 'localhost', port: 2 }, + ], + config + ); + process.nextTick(() => { + subchannels[0].transitionToState(ConnectivityState.READY); + }); + }); + it('Should stay in TRANSIENT_FAILURE after an address update with non-READY subchannels', done => { + let currentStartState = ConnectivityState.TRANSIENT_FAILURE; + const channelControlHelper = createChildChannelControlHelper( + baseChannelControlHelper, + { + createSubchannel: (subchannelAddress, subchannelArgs) => { + const subchannel = new MockSubchannel( + subchannelAddressToString(subchannelAddress), + currentStartState + ); + subchannels.push(subchannel); + return subchannel; + }, + updateState: updateStateCallBackForExpectedStateSequence( + [ConnectivityState.TRANSIENT_FAILURE], + done + ), + } + ); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); + pickFirst.updateAddressList( + [ + { host: 'localhost', port: 1 }, + { host: 'localhost', port: 2 }, + ], + config + ); + process.nextTick(() => { + currentStartState = ConnectivityState.CONNECTING; + pickFirst.updateAddressList( + [ + { host: 'localhost', port: 3 }, + { host: 'localhost', port: 4 }, + ], + config + ); + }); + }); + it('Should transition from TRANSIENT_FAILURE to READY after an address update with a READY subchannel', done => { + let currentStartState = ConnectivityState.TRANSIENT_FAILURE; + const channelControlHelper = createChildChannelControlHelper( + baseChannelControlHelper, + { + createSubchannel: (subchannelAddress, subchannelArgs) => { + const subchannel = new MockSubchannel( + subchannelAddressToString(subchannelAddress), + currentStartState + ); + subchannels.push(subchannel); + return subchannel; + }, + updateState: updateStateCallBackForExpectedStateSequence( + [ConnectivityState.TRANSIENT_FAILURE, ConnectivityState.READY], + done + ), + } + ); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); + pickFirst.updateAddressList( + [ + { host: 'localhost', port: 1 }, + { host: 'localhost', port: 2 }, + ], + config + ); + process.nextTick(() => { + currentStartState = ConnectivityState.READY; + pickFirst.updateAddressList([{ host: 'localhost', port: 3 }], config); + }); + }); + it('Should transition from READY to IDLE if the connected subchannel disconnects', done => { + const currentStartState = ConnectivityState.READY; + const channelControlHelper = createChildChannelControlHelper( + baseChannelControlHelper, + { + createSubchannel: (subchannelAddress, subchannelArgs) => { + const subchannel = new MockSubchannel( + subchannelAddressToString(subchannelAddress), + currentStartState + ); + subchannels.push(subchannel); + return subchannel; + }, + updateState: updateStateCallBackForExpectedStateSequence( + [ConnectivityState.READY, ConnectivityState.IDLE], + done + ), + } + ); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); + pickFirst.updateAddressList([{ host: 'localhost', port: 1 }], config); + process.nextTick(() => { + subchannels[0].transitionToState(ConnectivityState.IDLE); + }); + }); + it('Should transition from READY to CONNECTING if the connected subchannel disconnects after an update', done => { + let currentStartState = ConnectivityState.READY; + const channelControlHelper = createChildChannelControlHelper( + baseChannelControlHelper, + { + createSubchannel: (subchannelAddress, subchannelArgs) => { + const subchannel = new MockSubchannel( + subchannelAddressToString(subchannelAddress), + currentStartState + ); + subchannels.push(subchannel); + return subchannel; + }, + updateState: updateStateCallBackForExpectedStateSequence( + [ConnectivityState.READY, ConnectivityState.CONNECTING], + done + ), + } + ); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); + pickFirst.updateAddressList([{ host: 'localhost', port: 1 }], config); + process.nextTick(() => { + currentStartState = ConnectivityState.IDLE; + pickFirst.updateAddressList([{ host: 'localhost', port: 2 }], config); + process.nextTick(() => { + subchannels[0].transitionToState(ConnectivityState.IDLE); + }); + }); + }); + it('Should transition from READY to TRANSIENT_FAILURE if the connected subchannel disconnects and the update fails', done => { + let currentStartState = ConnectivityState.READY; + const channelControlHelper = createChildChannelControlHelper( + baseChannelControlHelper, + { + createSubchannel: (subchannelAddress, subchannelArgs) => { + const subchannel = new MockSubchannel( + subchannelAddressToString(subchannelAddress), + currentStartState + ); + subchannels.push(subchannel); + return subchannel; + }, + updateState: updateStateCallBackForExpectedStateSequence( + [ConnectivityState.READY, ConnectivityState.TRANSIENT_FAILURE], + done + ), + } + ); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); + pickFirst.updateAddressList([{ host: 'localhost', port: 1 }], config); + process.nextTick(() => { + currentStartState = ConnectivityState.TRANSIENT_FAILURE; + pickFirst.updateAddressList([{ host: 'localhost', port: 2 }], config); + process.nextTick(() => { + subchannels[0].transitionToState(ConnectivityState.IDLE); + }); + }); + }); + it('Should transition from READY to READY if a subchannel is connected and an update has a connected subchannel', done => { + const currentStartState = ConnectivityState.READY; + const channelControlHelper = createChildChannelControlHelper( + baseChannelControlHelper, + { + createSubchannel: (subchannelAddress, subchannelArgs) => { + const subchannel = new MockSubchannel( + subchannelAddressToString(subchannelAddress), + currentStartState + ); + subchannels.push(subchannel); + return subchannel; + }, + updateState: updateStateCallBackForExpectedStateSequence( + [ConnectivityState.READY, ConnectivityState.READY], + done + ), + } + ); + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); + pickFirst.updateAddressList([{ host: 'localhost', port: 1 }], config); + process.nextTick(() => { + pickFirst.updateAddressList([{ host: 'localhost', port: 2 }], config); + process.nextTick(() => { + subchannels[0].transitionToState(ConnectivityState.IDLE); + }); + }); + }); + describe('Address list randomization', () => { + const shuffleConfig = new PickFirstLoadBalancingConfig(true); + it('Should pick different subchannels after multiple updates', done => { + const pickedSubchannels: Set = new Set(); + const channelControlHelper = createChildChannelControlHelper( + baseChannelControlHelper, + { + createSubchannel: (subchannelAddress, subchannelArgs) => { + const subchannel = new MockSubchannel( + subchannelAddressToString(subchannelAddress), + ConnectivityState.READY + ); + subchannels.push(subchannel); + return subchannel; + }, + updateState: (connectivityState, picker) => { + if (connectivityState === ConnectivityState.READY) { + const pickedSubchannel = picker.pick({ + extraPickInfo: {}, + metadata: new Metadata(), + }).subchannel; + if (pickedSubchannel) { + pickedSubchannels.add(pickedSubchannel.getAddress()); + } + } + }, + } + ); + const addresses: SubchannelAddress[] = []; + for (let i = 0; i < 10; i++) { + addresses.push({ host: 'localhost', port: i + 1 }); + } + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); + /* Pick from 10 subchannels 5 times, with address randomization enabled, + * and verify that at least two different subchannels are picked. The + * probability choosing the same address every time is 1/10,000, which + * I am considering an acceptable flake rate */ + pickFirst.updateAddressList(addresses, shuffleConfig); + process.nextTick(() => { + pickFirst.updateAddressList(addresses, shuffleConfig); + process.nextTick(() => { + pickFirst.updateAddressList(addresses, shuffleConfig); + process.nextTick(() => { + pickFirst.updateAddressList(addresses, shuffleConfig); + process.nextTick(() => { + pickFirst.updateAddressList(addresses, shuffleConfig); + process.nextTick(() => { + assert(pickedSubchannels.size > 1); + done(); + }); + }); + }); + }); + }); + }); + it('Should pick the same subchannel if address randomization is disabled', done => { + /* This is the same test as the previous one, except using the config + * that does not enable address randomization. In this case, false + * positive probability is 1/10,000. */ + const pickedSubchannels: Set = new Set(); + const channelControlHelper = createChildChannelControlHelper( + baseChannelControlHelper, + { + createSubchannel: (subchannelAddress, subchannelArgs) => { + const subchannel = new MockSubchannel( + subchannelAddressToString(subchannelAddress), + ConnectivityState.READY + ); + subchannels.push(subchannel); + return subchannel; + }, + updateState: (connectivityState, picker) => { + if (connectivityState === ConnectivityState.READY) { + const pickedSubchannel = picker.pick({ + extraPickInfo: {}, + metadata: new Metadata(), + }).subchannel; + if (pickedSubchannel) { + pickedSubchannels.add(pickedSubchannel.getAddress()); + } + } + }, + } + ); + const addresses: SubchannelAddress[] = []; + for (let i = 0; i < 10; i++) { + addresses.push({ host: 'localhost', port: i + 1 }); + } + const pickFirst = new PickFirstLoadBalancer(channelControlHelper); + pickFirst.updateAddressList(addresses, config); + process.nextTick(() => { + pickFirst.updateAddressList(addresses, config); + process.nextTick(() => { + pickFirst.updateAddressList(addresses, config); + process.nextTick(() => { + pickFirst.updateAddressList(addresses, config); + process.nextTick(() => { + pickFirst.updateAddressList(addresses, config); + process.nextTick(() => { + assert(pickedSubchannels.size === 1); + done(); + }); + }); + }); + }); + }); + }); + describe('End-to-end functionality', () => { + const serviceConfig = { + methodConfig: [], + loadBalancingConfig: [ + { + pick_first: { + shuffleAddressList: true, + }, + }, + ], + }; + let server: TestServer; + let client: TestClient; + before(async () => { + server = new TestServer(false); + await server.start(); + client = new TestClient(server.port!, false, { + 'grpc.service_config': JSON.stringify(serviceConfig), + }); + }); + after(() => { + client.close(); + server.shutdown(); + }); + it('Should still work with shuffleAddressList set', done => { + client.sendRequest(error => { + done(error); + }); + }); + }); + }); +}); From 66cd8519bd05097faa20a2deb72309c9431b0f4b Mon Sep 17 00:00:00 2001 From: Michael Lumish Date: Mon, 24 Jul 2023 16:00:13 -0700 Subject: [PATCH 2/2] grpc-js: pick_first: Properly dispose of current pick when it disconnects --- .../grpc-js/src/load-balancer-pick-first.ts | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/packages/grpc-js/src/load-balancer-pick-first.ts b/packages/grpc-js/src/load-balancer-pick-first.ts index 0805e5fb2..08971980b 100644 --- a/packages/grpc-js/src/load-balancer-pick-first.ts +++ b/packages/grpc-js/src/load-balancer-pick-first.ts @@ -223,6 +223,21 @@ export class PickFirstLoadBalancer implements LoadBalancer { this.calculateAndReportNewState(); } + private removeCurrentPick() { + if (this.currentPick !== null) { + /* Unref can cause a state change, which can cause a change in the value + * of this.currentPick, so we hold a local reference to make sure that + * does not impact this function. */ + const currentPick = this.currentPick; + this.currentPick = null; + currentPick.unref(); + currentPick.removeConnectivityStateListener(this.subchannelStateListener); + this.channelControlHelper.removeChannelzChild( + currentPick.getChannelzRef() + ); + } + } + private onSubchannelStateUpdate( subchannel: SubchannelInterface, previousState: ConnectivityState, @@ -230,7 +245,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { ) { if (this.currentPick?.realSubchannelEquals(subchannel)) { if (newState !== ConnectivityState.READY) { - this.currentPick = null; + this.removeCurrentPick(); this.calculateAndReportNewState(); this.channelControlHelper.requestReresolution(); } @@ -415,17 +430,7 @@ export class PickFirstLoadBalancer implements LoadBalancer { destroy() { this.resetSubchannelList(); - if (this.currentPick !== null) { - /* Unref can cause a state change, which can cause a change in the value - * of this.currentPick, so we hold a local reference to make sure that - * does not impact this function. */ - const currentPick = this.currentPick; - currentPick.unref(); - currentPick.removeConnectivityStateListener(this.subchannelStateListener); - this.channelControlHelper.removeChannelzChild( - currentPick.getChannelzRef() - ); - } + this.removeCurrentPick(); } getTypeName(): string {