Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

NIFI-14238 - Add CaptureBoxEvents processor #9695

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

pvillard31
Copy link
Contributor

@pvillard31 pvillard31 commented Feb 5, 2025

Summary

NIFI-14238 - Add CaptureBoxEvents processor

This pull request adds a new processor that will consume all Box events and output the content of the events as an array of JSON records. In addition to this processor, the pull request also adds another controller service to connect to the Box API using the developer token option. This option is used only for testing as this token expires after 60 minutes but it allows testing the Box components in NiFi without the need for a Business/Enterprise account.

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using mvn clean install -P contrib-check
    • JDK 21

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

Copy link
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for putting together this new Processor @pvillard31. The Box API seems fairly straightforward. I noted several recommendations on naming. On the general approach, I'm concerned about creating a new array of JSON objects in memory, instead of streaming the currently queued events.

Comment on lines 186 to 189
final JsonArray jsonEvents = Json.array();
for (BoxEvent event : eventList) {
jsonEvents.add(toRecord(event));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach ends up creating a new object for every BoxEvent, which has some memory impacts. Instead of this approach, it would be much more efficient to use a streaming JSON writer to add each event to the output stream.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very good point!


@Override
public boolean onException(Throwable e) {
getLogger().warn("An error has been received from the stream.", e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommend tracking the position so that it could be logged.

Suggested change
getLogger().warn("An error has been received from the stream.", e);
getLogger().warn("Event stream processing failed at position {}", position, e);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not have the position when an exception is thrown. We have it in onNextPosition and it's already logged there. Or am I missing something?

eventStream = new EventStream(boxAPIConnection);

final int eventsCapacity = context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger();
if (events == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the purpose of the null check to avoid clearing any queued events?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I figured that in case we stop the processor, we may not want to clear the queued events. Should I instead clear the queue and just drop what we already consumed? In any case we would have ignored the events while the processor was stopped...

# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants