Improve Parallel Reading (CSV, JSON) / Help Wanted #8723

Open
marvinlanhenke opened this issue Jan 2, 2024 · 6 comments
Labels
enhancement New feature or request

Comments


marvinlanhenke commented Jan 2, 2024

Is your feature request related to a problem or challenge?

As originally stated in #6922 (I'm not sure why that issue was closed) and discussed in #6801, the current FileOpener implementations for both CSV and JSON use multiple GET requests to adjust the byte range before parsing / reading the file itself.

This is suboptimal and can be improved by minimizing the latency incurred by multiple remote network requests.

Describe the solution you'd like

I would like to reduce the number of GET requests from 3 to 1.

This can be done by "overfetching" the original partition byte range and then adjusting the range by finding the newline delimiter, similar to the solution already implemented.

The approach is outlined here: #6801 (comment) by @alamb

There are some edge cases that need consideration, like "heterogeneous object sizes" within a CSV row or JSON object, which lead to partition ranges overlapping on the same line and can cause the same line to be read twice. Error handling / retry when no newline can be found (the "overfetching" range was too small) has to be handled as well.
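For illustration, here is a minimal sketch of the single-request idea, assuming the object_store crate's get_range API. The fetch_partition name, the overfetch parameter, and the exact trimming convention are placeholders of mine, not code from the POC; clamping the fetched range to the file size and the retry path are left out:

```rust
use std::ops::Range;

use bytes::Bytes;
use object_store::{path::Path, ObjectStore};

/// Hypothetical helper: fetch the partition's byte range plus `overfetch`
/// extra bytes in a single GET, then trim both ends to newline boundaries.
/// Clamping the range to the file size and retrying with a larger
/// `overfetch` when no newline is found are omitted for brevity.
async fn fetch_partition(
    store: &dyn ObjectStore,
    location: &Path,
    partition: Range<usize>,
    overfetch: usize,
) -> object_store::Result<Bytes> {
    // One request instead of three: the partition plus the overfetch window.
    let bytes = store
        .get_range(location, partition.start..partition.end + overfetch)
        .await?;

    // Skip the partial line at the front; the previous partition owns it.
    // The first partition (start == 0) keeps its first line.
    let head = if partition.start == 0 {
        0
    } else {
        match bytes.iter().position(|&b| b == b'\n') {
            Some(pos) => pos + 1,
            // The whole partition is one partial line; nothing to read here.
            None => return Ok(Bytes::new()),
        }
    };

    // Read past the nominal end to the next newline so the last line is
    // complete; `None` here means the overfetch window was too small.
    let nominal_end = (partition.end - partition.start).min(bytes.len());
    let tail = bytes[nominal_end..]
        .iter()
        .position(|&b| b == b'\n')
        .map(|pos| nominal_end + pos + 1)
        .unwrap_or(bytes.len());

    Ok(bytes.slice(head..tail))
}
```

The overlap edge case is handled by the head/tail convention: every partition except the first skips through the first newline at or after its start, and every partition reads through the first newline at or after its nominal end, so adjacent partitions hand off at the same newline and each line is read exactly once.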

POC:

I already went ahead and implemented a POC which works and can handle some edge cases like overlapping partition ranges; appropriate error handling / retry is still missing.

However, I definitely need help to improve upon this: https://github.com/marvinlanhenke/arrow-datafusion/blob/poc_optimize_get_req/datafusion/core/src/datasource/physical_plan/json.rs#L232-L381

The solution is inefficient due to line-by-line operations and buffer cloning / copying.
I tried different ways to handle the GetResultPayload::Stream by using BytesMut::new() and buffer.extend_from_slice, but I was not able to handle all the edge cases correctly.
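For what it's worth, here is one hedged sketch of how the GetResultPayload::Stream case might accumulate chunks without a per-line copy. read_lines is a hypothetical helper, not code from the POC, and converting the lines into record batches is out of scope:

```rust
use bytes::{Bytes, BytesMut};
use futures::StreamExt;
use object_store::{GetResult, GetResultPayload};

/// Hypothetical sketch: drain a streaming GET response into one growable
/// buffer and split complete lines off with `split_to`, avoiding a
/// per-line copy of the underlying bytes.
async fn read_lines(result: GetResult) -> object_store::Result<Vec<Bytes>> {
    let mut lines = Vec::new();
    match result.payload {
        GetResultPayload::Stream(mut stream) => {
            let mut buffer = BytesMut::new();
            while let Some(chunk) = stream.next().await {
                buffer.extend_from_slice(&chunk?);
                // Split off every complete line; any trailing partial line
                // stays in `buffer` until the next chunk completes it.
                while let Some(pos) = buffer.iter().position(|&b| b == b'\n') {
                    lines.push(buffer.split_to(pos + 1).freeze());
                }
            }
            // Bytes after the last newline form the final (unterminated) line.
            if !buffer.is_empty() {
                lines.push(buffer.freeze());
            }
        }
        // Local files can be read directly; out of scope for this sketch.
        GetResultPayload::File(..) => unimplemented!(),
    }
    Ok(lines)
}
```

BytesMut::split_to hands back the completed line while both halves keep sharing the same allocation, so no per-line clone is needed; the trailing partial line simply stays in the buffer until the next chunk (or EOF) completes it.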

I'd greatly appreciate it if someone could give me some pointers, or take it from here and improve upon the POC.

Describe alternatives you've considered

Leave as is.

Additional context

None.

marvinlanhenke added the enhancement label Jan 2, 2024
marvinlanhenke (Contributor, Author) commented:

cc @alamb: as discussed in #6801

...happy if you have any pointers on how to improve the POC.

marvinlanhenke changed the title from "Improve Parallel Reading (CSV, JSON)" to "Improve Parallel Reading (CSV, JSON) / Help Wanted" Jan 2, 2024

alamb commented Jan 3, 2024

Thanks for the heads up @marvinlanhenke -- I'll check this out over the next day or two. Much appreciated

cc @devinjdangelo and @tustvold if you are interested


alamb commented Jan 5, 2024

I didn't have a chance to review https://github.com/marvinlanhenke/arrow-datafusion/blob/poc_optimize_get_req/datafusion/core/src/datasource/physical_plan/json.rs#L232-L381 in fine detail, but in general I think that code looks very hard to navigate / test as it is so deeply nested in futures / closures.

My suggestion is to try to extract the logic into a structure that is easier to test and reason about.

For example, maybe you could create a struct like

```rust
struct StreamingJsonReaderBuilder {
    ...
}

impl StreamingJsonReaderBuilder {
    fn new(..) {}
    fn with_range(mut self, range: ..) ...
    fn build() -> SendableRecordBatchStream
}
```

And then you could start writing tests like

```rust
let object_store = ...;
let input = StreamingJsonReaderBuilder::new(object_store.read())
    .with_range(Range::new(100, 200))
    .build();

let batches = collect(input).await?;
assert_batches_eq!(..);
```

That might not be the right structure, but I am trying to give you the flavor of what encapsulating the complexity might look like

marvinlanhenke (Contributor, Author) commented:

> That might not be the right structure, but I am trying to give you the flavor of what encapsulating the complexity might look like

@alamb
...thanks, this might already help by guiding me onto the right track. I was trying not to put too much effort into the POC; however, this might have been the wrong decision, since I kind of hit a roadblock due to not being able to effectively reason about the code.

I will look into your suggestion and try to do some refactoring in order to understand what is happening, and what should be happening, at this point.

thanks again.

alamb reopened this Jan 8, 2024

alamb commented Jan 8, 2024

I think this was accidentally closed, so I am reopening it. I am happy to close it again if I missed something.


alamb commented Jan 29, 2025

If anyone wants a fun exercise, getting the CSV reader to read in parallel from local files would greatly speed up the h2o benchmarks.
