Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[#32003][prism] Support empty transform input sets, such as for flattens. #32029

Merged
merged 1 commit into from
Jul 30, 2024

Conversation

lostluck
Copy link
Contributor

@lostluck lostluck commented Jul 30, 2024

In very rare cases a transform, such as a flatten can be in a pipeline without any real inputs.

Without inputs, nothing kicked off watermark processing, which then blocked use of the empty flatten output as a side input, which blocked that transform from executing.

This manifested as the upstream stage (the flatten), never getting watermark refreshed, and therefore the downstream stage never being able to execute, since the side input data from the upstream stage was never ready.

The solution in this PR is when a stage is added without inputs, it must be part of the initial watermark refresh set. Further taken to keep this part of the "impulse" set when a TestStream is involved in the Pipeline, since that manipulates initial refreshes.

Additionally adds a line to the stuck pipeline dump to clarify the inputs and output consumers of each stage, to make the progression structure clearer.

Fixes #32003.

In particular allows the Java Validates Runner tests in "org.apache.beam.sdk.transforms.FlattenTest" testEmptyFlattenAsSideInput, testFlattenPCollectionsEmptyThenParDo, and testFlattenPCollectionsEmpty now pass.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@lostluck lostluck added this to the 2.59.0 Release milestone Jul 30, 2024
@damondouglas damondouglas self-requested a review July 30, 2024 23:12
Copy link
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @jrmccluskey for label go.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

// In very rare cases, we can have a stage without any inputs, such as a flatten.
// In that case, there's nothing that will start the watermark refresh cycle,
// so we must do it here.
if len(inputIDs) == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-PR blocking comment to validate my understanding. Do I understand correctly?

In the PTransform with no PCollection case, there are no inputIDs because the result of internal.transformPreparer.PrepareTransform returns an internal.prepareResult where prepResult.SubbedComps.GetPcollections() is empty.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Effectively yes, but basically it's due to flatten being special: all Other Stages must have a single parallel input pCollection. Flattens may have any number, which apparently includes 0.

In this case, by construction, the flatten has no inputs to begin with.

Impulses and TestStream are also special: Impulses are required to kick off SDK side processing. TestStream is just weird.

@damondouglas damondouglas merged commit 88a0102 into apache:master Jul 30, 2024
6 checks passed
reeba212 pushed a commit to reeba212/beam that referenced this pull request Dec 4, 2024
… flattens. (apache#32029)

Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com>
# for free to join this conversation on GitHub. Already have an account? # to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[prism] Support Empty Flattens as Side Input - no way to make progress with pending elements
2 participants