Optimize ParquetPublisher and make ParquetSubscriber safer #346
@@ -48,8 +48,7 @@ object Parquet {
* @tparam T A hinder kinded type that represents element type of the parquet file to be read.
* @return All the elements of type [[T]] the specified parquet file as [[Observable]]
*/
def reader[T](reader: ParquetReader[T])(implicit scheduler: Scheduler): Observable[T] = {
ParquetPublisher(reader).create
def fromReaderUnsafe[T](reader: ParquetReader[T])(implicit scheduler: Scheduler): Observable[T] = {
I'm wondering about the API, and I feel like it should be similar to `Observable.fromIterator`: an `Unsafe` version that takes the reader as a strict value, and a "normal" version that takes `Task[ParquetReader[T]]` or `=> ParquetReader[T]` and also takes care of closing the reader.
WDYT?
@Avasil that's a very good point, thanks for bringing it up.
What about other connectors, such as AWS S3 or DynamoDB, that use the client `AwsAsyncClient` directly? Would that be an unsafe API too, with a safer one expecting `Task[AwsAsyncClient]` instead?
I think it's a different situation because `ParquetReader` is like a file handle/iterator and `AwsAsyncClient` is more like a connection.
You probably want to reuse it anyway so that's not that big of a deal, but if you reuse an `Observable` created with `Observable.fromReaderUnsafe` then you might be surprised because it will use the same iterator instead of creating a new one.
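To make the reuse pitfall concrete, here is a minimal sketch in plain Scala with no Monix or Parquet dependency. Everything in it is hypothetical: `FakeReader` stands in for `ParquetReader` (whose `read()` returns `null` at the end of the file), and `Source` plays the role of a re-runnable `Observable`. It only illustrates strict versus by-name arguments, not the connector's actual implementation.

```scala
// Hypothetical stand-in for ParquetReader: read() returns null when exhausted.
final class FakeReader(items: List[String]) {
  private var remaining = items
  var closed: Boolean = false

  def read(): String = remaining match {
    case head :: tail =>
      remaining = tail
      head
    case Nil => null
  }

  def close(): Unit = { closed = true }
}

object ReaderReuse {
  // A re-runnable source, playing the role of Observable in this sketch.
  type Source = () => List[String]

  private def drain(reader: FakeReader): List[String] =
    Iterator.continually(reader.read()).takeWhile(_ != null).toList

  // Strict argument: every run drains the same, already-created reader.
  def fromReaderUnsafe(reader: FakeReader): Source =
    () => drain(reader)

  // By-name argument: every run creates a fresh reader and closes it afterwards.
  def fromReader(mkReader: => FakeReader): Source = () => {
    val reader = mkReader
    try drain(reader) finally reader.close()
  }

  val unsafe: Source = fromReaderUnsafe(new FakeReader(List("a", "b")))
  val unsafeFirst: List[String] = unsafe()
  val unsafeSecond: List[String] = unsafe() // same reader, already exhausted

  val safe: Source = fromReader(new FakeReader(List("a", "b")))
  val safeFirst: List[String] = safe()
  val safeSecond: List[String] = safe() // fresh reader on every run
}
```

Running the unsafe source twice yields the elements only the first time, while the by-name variant yields them on every run.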
I think S3 could be nicer if we create our own interface, e.g.
trait S3 {
def createBucket
def deleteBucket
...
}
The constructor would take (or create) the actual `AwsAsyncClient` and return `Resource[Task, S3]` or something like that.
That's a big API change but we could remove `implicit s3AsyncClient: S3AsyncClient` from all methods.
Probably the same case with DynamoDb but I'm not sure, I'm not familiar with it and I just glanced over it.
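A rough sketch of that shape, again in dependency-free Scala. All names and signatures here are assumptions for illustration: `FakeS3Client` stands in for the real client, the method signatures on `S3` are guesses, and the loan-pattern `S3.use` is only a stand-in for returning `Resource[Task, S3]`. The point is that the client is captured once by the wrapper, so no method needs an implicit client parameter.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the AWS SDK client.
final class FakeS3Client extends AutoCloseable {
  val buckets: mutable.Set[String] = mutable.Set.empty
  var closed: Boolean = false
  override def close(): Unit = { closed = true }
}

// The interface proposed above; these signatures are assumptions.
trait S3 {
  def createBucket(name: String): Unit
  def deleteBucket(name: String): Unit
  def listBuckets: Set[String]
}

object S3 {
  // Loan pattern standing in for Resource[Task, S3]:
  // acquire the client, lend an S3 to `f`, always close the client afterwards.
  def use[A](mkClient: => FakeS3Client)(f: S3 => A): A = {
    val client = mkClient
    val s3 = new S3 {
      def createBucket(name: String): Unit = { client.buckets += name; () }
      def deleteBucket(name: String): Unit = { client.buckets -= name; () }
      def listBuckets: Set[String] = client.buckets.toSet
    }
    try f(s3) finally client.close()
  }
}

object S3Demo {
  val client = new FakeS3Client

  // No implicit client is threaded through the individual calls.
  val result: Set[String] = S3.use(client) { s3 =>
    s3.createBucket("logs")
    s3.createBucket("tmp")
    s3.deleteBucket("tmp")
    s3.listBuckets
  }
}
```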
@Avasil I guess something similar to [fs2](https://github.com/laserdisc-io/fs2-aws#getting-started-with-the-s3-module). It seems a bit verbose to me to create a client, maybe because I am already familiar with this API... but I am open to new ideas/improvements!
It would also be cool to allow configuring the AWS client from `.config` files, and let us take care of its creation.
Yeah, it would be nicer if we could create a client for the user if they don't want to do it manually, something like `monix-kafka`, although it's a bit outdated.
> it seems a bit verbose to me to create a client

True, but I feel like if we make some helpers and the difference is 1-2 lines of code, then it could be a small price to pay for hiding the Java-iness of `S3AsyncClient`.
The changes are not binary compatible but I can make it so if you are OK with them :D
7b2c746 to 90923a6
The changes look good, will do a better review this afternoon to see what the execution model does, since it is new to me :P
No worries, I'm happy to explain whatever you're not familiar with, just ask :D
I have reimplemented `ParquetPublisher` to be similar to `Observable.repeatEval`. I didn't do any benchmarks but I'm pretty sure it will be faster because it doesn't have to go through the `Task` run-loop at all.
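For readers new to that style, the core idea can be sketched as a plain synchronous loop that re-evaluates a by-name expression and pushes each element downstream until a `null` sentinel appears. This is a simplified illustration, not the actual `ParquetPublisher` code: `repeatUntilNull` and `read` are hypothetical names, and the sketch leaves out back-pressure, scheduling, and error handling.

```scala
object RepeatEvalSketch {
  // Re-evaluates `next` and pushes each element to `onNext` until `next` yields null.
  def repeatUntilNull[T <: AnyRef](next: => T)(onNext: T => Unit): Unit = {
    var elem = next
    while (elem != null) {
      onNext(elem)
      elem = next
    }
  }

  // Hypothetical reader: returns null when there is nothing left to read.
  private val source = Iterator("r1", "r2", "r3")
  def read(): String = if (source.hasNext) source.next() else null

  private val received = List.newBuilder[String]
  repeatUntilNull(read())(received += _)

  val result: List[String] = received.result()
}
```

Each element is read and delivered directly in the loop, with no intermediate task allocated per element.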