Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feature: callback impl #80

Closed
wants to merge 0 commits into from
Closed

feature: callback impl #80

wants to merge 0 commits into from

Conversation

enesyalinkaya
Copy link
Contributor

@enesyalinkaya enesyalinkaya commented Jan 3, 2024

For Kafka events in our outbox bucket, we want to use go-dcp-kafka to send them. When event is published, we want to remove the event document from the outbox bucket. #77

For this reason, I made a development to add a callback function after the event is published

@erayarslan
Copy link
Member

thank you so much for your contribution @enesyalinkaya
i think callback name need to be change to SinkResponseHandler to be more specific.
on connector side it should be set with SetSinkResponseHandler. (like SetEventHandler)
also can expose add message function to OnError callback, so user can retry some of rejected ones.

@erayarslan erayarslan self-requested a review January 3, 2024 11:20
@@ -136,3 +150,28 @@ func isFatalError(err error) bool {
}
return true
}

func (b *Batch) handleWriteError(writeErrors kafka.WriteErrors) {
Copy link
Member

@oguzyildirim oguzyildirim Jan 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we write unit tests for the below functions?

}
}

func convertKafkaMessage(src kafka.Message) message.KafkaMessage {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can move this function to kafka.Message I guess and rename toKafkaMessage

Copy link
Contributor Author

@enesyalinkaya enesyalinkaya Jan 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we cannot move it into kafka.Message there, it is a type belonging to the segmentio library.

callback.go Outdated

import "github.com/Trendyol/go-dcp-kafka/kafka/message"

type callback struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this struct? maybe we can move the Callback(SinkResponseHandler) interface here and remove below logics?

# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants