Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Created sink parallel execution (#624) #917

Merged
merged 5 commits into from
Jun 19, 2022

Conversation

MattSzm
Copy link
Contributor

@MattSzm MattSzm commented Jun 2, 2022

This is a proof of concept. Please validate the idea before I write all methods implementations/tests because for me the issue description can be interpreted in a couple of ways.

@paualarco
Copy link
Member

This is a proof of concept. Please validate the idea before I write all methods implementations/tests because for me the issue description can be interpreted in a couple of ways.

That was the idea @MattSzm, I like it :)

@paualarco
Copy link
Member

Will do a release after once merged, with all these recent mongo improvements

@MattSzm MattSzm marked this pull request as ready for review June 4, 2022 13:25
@MattSzm MattSzm changed the title Created poc of sink parallel execution (#624) Created sink parallel execution (#624) Jun 5, 2022
@MattSzm
Copy link
Contributor Author

MattSzm commented Jun 6, 2022

@paualarco pr ready for review :)

Copy link
Member

@paualarco paualarco left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice job 🥇

@MattSzm
Copy link
Contributor Author

MattSzm commented Jun 18, 2022

Hi @paualarco I found should update many documents per each received request whereas requests are grouped in lists. Requests from a single group are executed at once in parallel. test being flaky.

[info]   60 was not equal to 40 (MongoSinkSuite.scala:294)

I've added the commit which fixes this. Fortunately, it was a problem with the test - not with the feature.
What happened:

    val e1 = genEmployeesWith(name = Some(name1), n = 10).sample.get
    val e2 = genEmployeesWith(name = Some(name2), age = Some(31), n = 20).sample.get
    val e3 = genEmployeesWith(name = Some(name3), n = 30).sample.get

    val u1 = (Filters.eq("name", name1), Updates.set("name", name3))
    val u2 = (Filters.eq("name", name2), Updates.combine(Updates.set("name", name1), Updates.inc("age", 10)))

In the test case, we run these updates in parallel, so the results depend on the order.

  1. u1 first and u2 second -> we have 40 items with name3 (10 + 30)
  2. u2 first and u1 second -> we change items with name2 to name1 and next we change all items with name1 to name3 which gave us 60 items with name3.

I'm sorry for the mistake. But, we can be sure now, that these calls are made in parallel haha :D

I can see there are still quite a few failing tests, but I don't think these are related to my change.

@paualarco
Copy link
Member

@MattSzm yep there is some flaky test that need fixing... the failures are mainly related to random generators in combination with parallel tests. I will merge your pr after addressing my comment :)

@paualarco paualarco merged commit 4c53530 into monix:master Jun 19, 2022
# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants