Skip to content

Commit

Permalink
Also clone and freeze the object returned by workflowInfo()
Browse files Browse the repository at this point in the history
  • Loading branch information
mjameswh committed Sep 26, 2023
1 parent 4f3888c commit 30a4439
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 39 deletions.
19 changes: 19 additions & 0 deletions packages/activity/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,27 @@ export const sleep = Context.current().sleep;
export const heartbeat = Context.current().heartbeat;

/**
* A Promise that fails with a {@link CancelledFailure} when cancellation of this activity is requested. The promise
* is guaranteed to never successfully resolve. Await this promise in an Activity to get notified of cancellation.
*
* Note that to get notified of cancellation, an activity must _also_ do {@link Context.heartbeat}.
*
* This is a shortcut for `Context.current().cancelled` (see {@link Context.cancelled}).
*/
export const cancelled = Context.current().cancelled;

/**
* An {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} that can be used to react to
* Activity cancellation.
*
* This can be passed in to libraries such as
* {@link https://www.npmjs.com/package/node-fetch#request-cancellation-with-abortsignal | fetch} to abort an
* in-progress request and
* {@link https://nodejs.org/api/child_process.html#child_process_child_process_spawn_command_args_options child_process}
* to abort a child process, as well as other built-in node modules and modules found on npm.
*
* Note that to get notified of cancellation, an activity must _also_ do {@link Context.heartbeat}.
*
* This is a shortcut for `Context.current().cancellationSignal` (see {@link Context.cancellationSignal}).
*/
export const cancellationSignal = Context.current().cancellationSignal;
23 changes: 0 additions & 23 deletions packages/common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,26 +64,3 @@ export function errorMessage(error: unknown): string | undefined {
export function errorCode(error: unknown): string | undefined {
return helpers.errorCode(error);
}

// Thanks MDN: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Object/freeze
export function deepFreeze<T>(object: T): T {
// Retrieve the property names defined on object
const propNames = Object.getOwnPropertyNames(object);

// Freeze properties before freezing self
for (const name of propNames) {
const value = (object as any)[name];

if (value && typeof value === 'object') {
try {
deepFreeze(value);
} catch (err) {
// This is okay, there are some typed arrays that cannot be frozen (encodingKeys)
}
} else if (typeof value === 'function') {
Object.freeze(value);
}
}

return Object.freeze(object);
}
51 changes: 51 additions & 0 deletions packages/common/src/type-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,54 @@ export function SymbolBasedInstanceOfError<E extends Error>(markerName: string):
});
};
}

export type DeepReadonly<T> = T extends (infer R)[]
? DeepReadonly<R>[]
: T extends object
? { readonly [P in keyof T]: DeepReadonly<T[P]> }
: T;

// Thanks MDN: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Object/freeze
export function deepFreeze<T>(object: T): DeepReadonly<T> {
// Retrieve the property names defined on object
const propNames = Object.getOwnPropertyNames(object);

// Freeze properties before freezing self
for (const name of propNames) {
const value = (object as any)[name];

if (value && typeof value === 'object') {
try {
deepFreeze(value);
} catch (err) {
// This is okay, there are some typed arrays that cannot be frozen (encodingKeys)
}
} else if (typeof value === 'function') {
Object.freeze(value);
}
}

return Object.freeze(object) as DeepReadonly<T>;
}

export function deepClone<T>(object: T): T {
if ((globalThis as any).structuredClone !== undefined) {
return globalThis.structuredClone(object);
} else {
return deepClonePolyfill(object);
}
}

function deepClonePolyfill<T>(object: T): T {
if (object == null || typeof object !== 'object') {
return object;
}

if (Array.isArray(object)) {
return object.map((item) => deepClone(item)) as T;
}

const clone: any = Object.assign({}, object);
Object.keys(clone).forEach((key) => (clone[key] = deepClone(clone[key])));
return clone as T;
}
3 changes: 3 additions & 0 deletions packages/workflow/src/worker-interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { WorkflowCreateOptionsInternal, WorkflowInfo } from './interfaces';
import { Activator, getActivator } from './internals';
import { SinkCall } from './sinks';
import { setActivatorUntyped } from './global-attributes';
import { clearWorkflowInfoCache } from './workflow';

// Export the type for use on the "worker" side
export { PromiseStackStore } from './internals';
Expand Down Expand Up @@ -194,6 +195,8 @@ export function activate(activation: coresdk.workflow_activation.WorkflowActivat
// historySize === 0 means WFT was generated by pre-1.20.0 server, and the history size is unknown
activator.info.historySize = activation.historySizeBytes?.toNumber() || 0;
activator.info.continueAsNewSuggested = activation.continueAsNewSuggested ?? false;

clearWorkflowInfoCache();
}

// Cast from the interface to the class which has the `variant` attribute.
Expand Down
47 changes: 31 additions & 16 deletions packages/workflow/src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
} from '@temporalio/common';
import { versioningIntentToProto } from '@temporalio/common/lib/versioning-intent-enum';
import { Duration, msOptionalToTs, msToNumber, msToTs, tsToMs } from '@temporalio/common/lib/time';
import { deepFreeze, deepClone, DeepReadonly } from '@temporalio/common/src/type-helpers';
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
import { CancellationScope, registerSleepImplementation } from './cancellation-scope';
import {
Expand Down Expand Up @@ -198,7 +199,7 @@ async function scheduleLocalActivityNextHandler({
const activator = getActivator();
// Eagerly fail the local activity (which will in turn fail the workflow task.
// Do not fail on replay where the local activities may not be registered on the replay worker.
if (!workflowInfo().unsafe.isReplaying && !activator.registeredActivityNames.has(activityType)) {
if (!activator.info.unsafe.isReplaying && !activator.registeredActivityNames.has(activityType)) {
throw new ReferenceError(`Local activity of type '${activityType}' not registered on worker`);
}
validateLocalActivityOptions(options);
Expand Down Expand Up @@ -362,7 +363,7 @@ function startChildWorkflowExecutionNextHandler({
workflowExecutionTimeout: msOptionalToTs(options.workflowExecutionTimeout),
workflowRunTimeout: msOptionalToTs(options.workflowRunTimeout),
workflowTaskTimeout: msOptionalToTs(options.workflowTaskTimeout),
namespace: workflowInfo().namespace, // Not configurable
namespace: activator.info?.namespace, // Not configurable
headers,
cancellationType: options.cancellationType,
workflowIdReusePolicy: options.workflowIdReusePolicy,
Expand Down Expand Up @@ -827,12 +828,16 @@ export async function executeChild<T extends Workflow>(
return completedPromise as Promise<any>;
}

let immutableWorkflowInfo: DeepReadonly<WorkflowInfo> | undefined = undefined;

/**
* Get information about the current Workflow.
*
* ⚠️ We recommend calling `workflowInfo()` whenever accessing {@link WorkflowInfo} fields. Some WorkflowInfo fields
* change during the lifetime of an Execution—like {@link WorkflowInfo.historyLength} and
* {@link WorkflowInfo.searchAttributes}—and some may be changeable in the future—like {@link WorkflowInfo.taskQueue}.
* WARNING: This function returns a frozen copy of WorkflowInfo, at the point where this method has been called.
* Changes happening at later point in workflow execution will not be reflected in the returned object.
*
* For this reason, we recommend calling `workflowInfo()` on every access to {@link WorkflowInfo}'s fields,
* rather than caching the `WorkflowInfo` object (or part of it) in a local variable. For example:
*
* ```ts
* // GOOD
Expand All @@ -843,6 +848,8 @@ export async function executeChild<T extends Workflow>(
* }
* ```
*
* vs
*
* ```ts
* // BAD
* function myWorkflow() {
Expand All @@ -851,10 +858,20 @@ export async function executeChild<T extends Workflow>(
* ...
* doSomethingElse(attributes)
* }
* ```
*/
export function workflowInfo(): WorkflowInfo {
const activator = assertInWorkflowContext('Workflow.workflowInfo(...) may only be used from a Workflow Execution.');
return activator.info;
if (immutableWorkflowInfo === undefined) {
immutableWorkflowInfo = deepFreeze(deepClone(activator.info));
}
// The return value should really be DeepReadonly<WorkflowInfo>, but that would likely cause lot type errors in
// customer code. For that reason, we silently forget the Readonly modifier here; the object is frozen anyway.
return immutableWorkflowInfo as WorkflowInfo;
}

export function clearWorkflowInfoCache(): void {
immutableWorkflowInfo = undefined;
}

/**
Expand Down Expand Up @@ -907,19 +924,16 @@ export function proxySinks<T extends Sinks>(): T {
const activator = assertInWorkflowContext(
'Proxied sinks functions may only be used from a Workflow Execution.'
);
const info = workflowInfo();
activator.sinkCalls.push({
ifaceName: ifaceName as string,
fnName: fnName as string,
// Only available from node 17.
args: (globalThis as any).structuredClone ? (globalThis as any).structuredClone(args) : args,
// Clone the workflowInfo object so that any further mutations to it does not get reflected in sink
workflowInfo: {
...info,
// Make sure to clone any sub-property that may get mutated during the lifespan of an activation
searchAttributes: { ...info.searchAttributes },
memo: info.memo ? { ...info.memo } : undefined,
},
// Sink function don't get called immediately. Make a clone of sink's args, so that further mutations
// to these objects don't corrupt the args that the sink function will receive.
args: deepClone(args),
// workflowInfo() internally makes an immutable deep clone. This ensure that any further mutations
// to the workflow state in the context of the present activation will not corrupt the workflowInfo
// state that gets passed when the sink function actually gets called.
workflowInfo: workflowInfo(),
});
};
},
Expand Down Expand Up @@ -1283,6 +1297,7 @@ export function upsertSearchAttributes(searchAttributes: SearchAttributes): void
});

activator.info.searchAttributes = mergedSearchAttributes;
clearWorkflowInfoCache();
}

export const stackTraceQuery = defineQuery<string>('__stack_trace');
Expand Down

0 comments on commit 30a4439

Please # to comment.