Skip to content

Commit

Permalink
feat: add rejection log sink response handler (#95)
Browse files Browse the repository at this point in the history
  • Loading branch information
ademekici authored Jan 6, 2025
1 parent 379756d commit 900b8a0
Show file tree
Hide file tree
Showing 12 changed files with 564 additions and 28 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ Check out on [go-dcp](https://github.com/Trendyol/go-dcp#configuration)
| `kafka.metadataTopics` | []string | no | | Topic names for the metadata cached by segmentio, define topics here that the connector may produce. In large Kafka clusters, this will reduce memory usage. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.MetadataTopics). |
| `kafka.clientID` | string | no | | Unique identifier that the transport communicates to the brokers when it sends requests. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.ClientID). |
| `kafka.allowAutoTopicCreation` | bool | no | false | Create topic if missing. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Writer.AllowAutoTopicCreation). |
| `kafka.rejectionLog.topic` | string | no | | Rejection topic name. |
| `kafka.rejectionLog.includeValue` | boolean | no | false | Includes rejection log source info. `false` is default. |

### Kafka Metadata Configuration(Use it if you want to store the checkpoint data in Kafka)

Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ type Kafka struct {
Compression int8 `yaml:"compression"`
SecureConnection bool `yaml:"secureConnection"`
AllowAutoTopicCreation bool `yaml:"allowAutoTopicCreation"`
RejectionLog RejectionLog `yaml:"rejectionLog"`
}

type RejectionLog struct {
Topic string `yaml:"topic"`
IncludeValue bool `yaml:"includeValue"`
}

func (k *Kafka) GetBalancer() kafka.Balancer {
Expand Down
22 changes: 22 additions & 0 deletions example/simple-rejection-log-sink-response-handler/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
FROM golang:1.20-alpine as builder

WORKDIR /project

COPY go.mod go.sum ./
COPY main.go ./
COPY config.yml ./config.yml

RUN go mod download
RUN CGO_ENABLED=0 go build -a -o example main.go

FROM alpine:3.17.0

WORKDIR /app

RUN apk --no-cache add ca-certificates

USER nobody
COPY --from=builder --chown=nobody:nobody /project/example .
COPY --from=builder --chown=nobody:nobody /project/config.yml ./config.yml

ENTRYPOINT ["./example"]
35 changes: 35 additions & 0 deletions example/simple-rejection-log-sink-response-handler/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
hosts:
- localhost:8091
username: user
password: password
bucketName: dcp-test
scopeName: _default
collectionNames:
- _default
metadata:
type: couchbase
config:
bucket: checkpoint-bucket-name
scope: _default
collection: _default
dcp:
group:
name: groupName
membership:
type: static
kafka:
collectionTopicMapping:
_default: topicname
brokers:
- localhost:9092
# SSL configurations
#
# secureConnection: true
# Config support env variable "$HOME/example/..."
# rootCAPath: "example/stretch-kafka/rootCA.pem"
# interCAPath: "example/stretch-kafka/interCA.pem"
# scramUsername: "username"
# scramPassword: "password"
rejectionLog:
topic: "rejection-topic"
includeValue: true
75 changes: 75 additions & 0 deletions example/simple-rejection-log-sink-response-handler/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
module example

go 1.20

replace github.com/Trendyol/go-dcp-kafka => ./../..

require github.com/Trendyol/go-dcp-kafka v0.0.0

require (
github.com/Trendyol/go-dcp v1.2.0-rc.4 // indirect
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/ansrivas/fiberprometheus/v2 v2.7.0 // indirect
github.com/asaskevich/EventBus v0.0.0-20200907212545-49d423059eef // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/couchbase/gocbcore/v10 v10.5.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gofiber/fiber/v2 v2.52.5 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/mhmtszr/concurrent-swiss-map v1.0.8 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/prometheus/client_golang v1.20.5 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.58.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/segmentio/kafka-go v0.4.47 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.57.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/oauth2 v0.22.0 // indirect
golang.org/x/sync v0.9.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/term v0.25.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.29.4 // indirect
k8s.io/apimachinery v0.29.4 // indirect
k8s.io/client-go v0.29.4 // indirect
k8s.io/klog/v2 v2.110.1 // indirect
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
Loading

0 comments on commit 900b8a0

Please # to comment.