
Copy messages from topic A to topic B #108

Open
birdayz opened this issue Apr 30, 2020 · 8 comments

Comments

@birdayz
Owner

birdayz commented Apr 30, 2020

No description provided.

@arujit

arujit commented May 9, 2020

Is anyone working on this, or can I pick up this issue? I am kinda new to Golang and have worked on some CLI implementations in Go.

@birdayz
Owner Author

birdayz commented May 9, 2020

AFAIK nobody is currently working on it. However, there's definitely a hunger for it - at least from my side. At work, it's a common case for me to copy messages from one topic - a dead letter queue topic - to another topic so they will be re-processed.

I can definitely support you working on this and getting it merged, but I won't be able to help on the code level as I'm currently working on #69.

I suggest that as a first step we discuss the goals and what the CLI command & flags could look like. I'd prefer to stay with obvious names, e.g. kaf cp - like in Unix. I also suppose using arguments like Unix cp would make sense - i.e. the first argument is the source and the second is the target. When it boils down to supplying different offsets for different partitions, it might become ugly :)
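
To make the discussion concrete, here is a minimal sketch of how the command could be wired up with cobra (which kaf already uses); the Use string, flag set, and argument handling are only a starting point for discussion, not final:

```go
package cmd

import (
	"fmt"

	"github.com/spf13/cobra"
)

// cpCmd sketches the proposed "kaf cp SOURCE TARGET" command, mirroring
// the Unix cp argument order: first the source, then the target.
var cpCmd = &cobra.Command{
	Use:   "cp SOURCE TARGET",
	Short: "Copy messages from one topic to another",
	Args:  cobra.ExactArgs(2),
	Run: func(cmd *cobra.Command, args []string) {
		source, target := args[0], args[1]
		// The actual consume-and-produce wiring would go here.
		fmt.Printf("copying %s -> %s\n", source, target)
	},
}
```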

@arujit

arujit commented May 9, 2020

Thanks, @birdayz, for the quick response. I clearly understand the need for this feature, and I have myself written small Python scripts around Kafka streams to do these things.

However, before implementing the code, I guess the right thing to do is to think more about the flags and arguments. For now, I can see these things would have to be supported for sure:

  • Yes, a Unix-like command such as kaf cp makes complete sense, denoting source and destination after it. [Doubt] Should the feature be an intra-cluster copy only, or both inter- and intra-cluster? In the latter case, how do we pass the cluster details on the command line?
  • I think we should support an offset-reset flag in the command too, and for now support only the earliest and latest offsets (the default would be earliest for sure). The consumer group would be random in every case. A way to visualize the lag would be great.
  • Apart from consuming from the earliest or latest, instead of giving a separate offset per partition, I guess copying from a timestamp would be cleaner (see the sketch after this list).
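
For reference, a minimal sketch of resolving a timestamp to per-partition start offsets, assuming sarama (which kaf builds on); the helper name and shape are illustrative, not existing kaf code:

```go
package main

import (
	"time"

	"github.com/Shopify/sarama"
)

// offsetsForTimestamp resolves a per-partition start offset from a timestamp
// using Kafka's ListOffsets API (timestamps in milliseconds).
func offsetsForTimestamp(client sarama.Client, topic string, ts time.Time) (map[int32]int64, error) {
	partitions, err := client.Partitions(topic)
	if err != nil {
		return nil, err
	}
	offsets := make(map[int32]int64, len(partitions))
	for _, p := range partitions {
		// GetOffset returns the earliest offset whose timestamp is >= ts.
		off, err := client.GetOffset(topic, p, ts.UnixNano()/int64(time.Millisecond))
		if err != nil {
			return nil, err
		}
		offsets[p] = off
	}
	return offsets, nil
}
```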

I can start working on a first cut of this command with a basic implementation. Please give me any suggestions on the features to be implemented; I will think a little more about the flags that need to be there for sure. I guess I can implement the code myself but may need some of your help refactoring it.

@arujit

arujit commented May 11, 2020

Hi @birdayz. Sorry for the disturbance, but shall I start on the basic implementation?

@birdayz
Owner Author

birdayz commented May 11, 2020

  1. Inter-cluster copy is a great idea. Can we maybe just infer it from the args? Example:

kaf cp topic-a topic-b (intra-cluster)
kaf cp my-cluster-a:topic-a my-cluster-b:topic-b (inter-cluster - kinda like scp syntax IIRC)

  2. What about this:

Examples:

Copy from offset 5k on all partitions to 6k:
kaf cp topic-a topic-b -o 5000:6000 -g my-group-name

Copy from offset 5k on all partitions to end:
kaf cp topic-a topic-b -o 5000: -g my-group-name
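
A sketch of how that -o value could be parsed (the flag name and the open-ended ":" form are just the proposal above, nothing final):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseOffsetRange parses the proposed -o START:END value. An empty END
// ("5000:") means "copy until the current end of each partition".
func parseOffsetRange(s string) (start, end int64, hasEnd bool, err error) {
	parts := strings.SplitN(s, ":", 2)
	if len(parts) != 2 {
		return 0, 0, false, fmt.Errorf("expected START:END, got %q", s)
	}
	if start, err = strconv.ParseInt(parts[0], 10, 64); err != nil {
		return 0, 0, false, fmt.Errorf("bad start offset: %w", err)
	}
	if parts[1] == "" {
		return start, 0, false, nil // open-ended range
	}
	if end, err = strconv.ParseInt(parts[1], 10, 64); err != nil {
		return 0, 0, false, fmt.Errorf("bad end offset: %w", err)
	}
	return start, end, true, nil
}
```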

Copying from a timestamp is also VERY nice. You can look at the kaf group commit command; it already supports timestamps, and I'm so far quite happy with that.

About the group: if the group already exists - i.e. the tool was started multiple times with the same flags to scale out the copy - it should detect that and not reset the offset again to the given offset.
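
A sketch of that detection, assuming sarama's ClusterAdmin API; the helper name is illustrative:

```go
package main

import "github.com/Shopify/sarama"

// groupHasOffsets reports whether the group has already committed offsets
// for the topic, so a second invocation can skip the initial offset reset.
func groupHasOffsets(admin sarama.ClusterAdmin, group, topic string, partitions []int32) (bool, error) {
	resp, err := admin.ListConsumerGroupOffsets(group, map[string][]int32{topic: partitions})
	if err != nil {
		return false, err
	}
	for _, block := range resp.Blocks[topic] {
		if block.Offset >= 0 { // -1 means "no committed offset" for that partition
			return true, nil
		}
	}
	return false, nil
}
```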

Happy to hear your ideas!

@arujit

arujit commented May 12, 2020

Thanks, @birdayz, for validating my ideas. The command
kaf cp my-cluster-a:topic-a my-cluster-b:topic-b -g group-name makes complete sense.
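
Splitting that scp-style argument could look something like this (a hypothetical helper for illustration only):

```go
package main

import "strings"

// parseTopicArg splits "my-cluster-a:topic-a" into cluster and topic;
// a bare topic name falls back to the currently active cluster.
func parseTopicArg(arg string) (cluster, topic string) {
	if i := strings.IndexByte(arg, ':'); i >= 0 {
		return arg[:i], arg[i+1:]
	}
	return "", arg // empty cluster = use the active cluster from kaf's config
}
```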

But I am a little sceptical about giving offset ranges for a copy, as I cannot see a use case where I would need a certain offset range across all partitions.

Secondly, how are we planning to incorporate the latest flag before consuming? If we are planning an inter-cluster copy (I am still not sure how feasible it is, but it's something most people would want, as MirrorMaker is not that straightforward to use), I can see people wanting to consume from the latest offset into a topic in another cluster.

Please give me more suggestions around this. I am starting to go through the modules and will begin the implementation soon. For the first cut, I was thinking of an intra-cluster copy with offset ranges only, which we can extend in later iterations with features like timestamp ranges and inter-cluster copy.
As I told you before, I am still a beginner in Go, and it would be great if I could get your assistance at the code level in some cases. Thanks again for giving me the opportunity to contribute. :)

@birdayz
Owner Author

birdayz commented May 12, 2020

Can you elaborate on the use cases you see? Different offsets per partition are probably very specific, I agree; let's ignore that for the moment.
A start offset makes sense; I suppose in most cases oldest, latest, or timestamp-based is used.
About the end: when should it stop? Should it ever stop? In the case of MirrorMaker-like behavior it would never stop. If I just want to retrieve all messages that are "now" in the topic, it should exit after reaching the end of each partition. What should the default behavior be?

@arujit

arujit commented May 13, 2020

Hi @birdayz. Yeah, I was also thinking about the behaviour of the end offset. I am jotting down all the possible scenarios I can see.

  • Case 1: A user-defined end offset, either timestamp-based or a predefined offset. If no end offset is given, we treat the latest offset at the start of the job as the end offset. So it's a bounded consumer, guaranteed to exit once all the data is consumed. We could show a progress bar in the console to indicate how much data has been copied. In the case of data loss (say the retention period expired), we could show how much / what percentage of the data was consumed or lost. It's easy to use, and it would perfectly suffice for copying from a dead letter queue or for small inter-/intra-cluster copies for debugging.

  • Case 2: A user-defined end offset (latest included). However, in the default case when no end offset is given, it copies data in a while(true) loop. You can think of this as a dumb Kafka Streams job whose only responsibility is to copy from source to sink; the use case I see is something like MirrorMaker. But I am unsure about adoption, since MirrorMaker is pretty popular, and whether people would use kaf for more crucial use cases like replicating data between clusters. Second, what about scaling, resource management, fault recovery, etc.? Third, Go has very little Kafka Streams support, so the implementation could be tough. So I can see a lot of blockers in this type of implementation.

I feel there is a hunger for both kinds of features, for two completely different use cases. I would suggest starting with the simpler implementation (Case 1) first - a sketch of its stopping condition is below. If we feel there is higher demand for the latter feature, we can implement it accordingly.
Would love to hear your opinion on this.
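
For Case 1, a minimal sketch of the bounded stopping condition, assuming sarama (which kaf builds on); copyBounded and consume are hypothetical names, and the real consume-and-produce wiring is omitted:

```go
package main

import "github.com/Shopify/sarama"

// copyBounded snapshots each partition's newest offset up front, then lets
// the copy loop stop once every partition has caught up to that snapshot.
func copyBounded(client sarama.Client, topic string, consume func(partition int32, upTo int64) error) error {
	partitions, err := client.Partitions(topic)
	if err != nil {
		return err
	}
	for _, p := range partitions {
		// OffsetNewest is the offset of the *next* message to be written,
		// so the partition is drained once we have consumed up to end-1.
		end, err := client.GetOffset(topic, p, sarama.OffsetNewest)
		if err != nil {
			return err
		}
		if err := consume(p, end); err != nil {
			return err
		}
	}
	return nil
}
```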
