From d993f9be0064a62c6e74118feead682209f5be71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mi=C5=82osz=20Sm=C3=B3=C5=82ka?= Date: Fri, 23 Aug 2024 18:51:57 +0200 Subject: [PATCH] Fix stuff --- .../basic/1-your-first-app/docker-compose.yml | 18 +------ _examples/basic/1-your-first-app/main.go | 4 +- dev/validate-examples/main.go | 48 +++++++++++-------- 3 files changed, 32 insertions(+), 38 deletions(-) diff --git a/_examples/basic/1-your-first-app/docker-compose.yml b/_examples/basic/1-your-first-app/docker-compose.yml index ec4fa607b..5366dcf17 100644 --- a/_examples/basic/1-your-first-app/docker-compose.yml +++ b/_examples/basic/1-your-first-app/docker-compose.yml @@ -2,31 +2,17 @@ services: server: image: golang:1.23 restart: unless-stopped - depends_on: - - kafka volumes: - .:/app - $GOPATH/pkg/mod:/go/pkg/mod working_dir: /app command: go run main.go - zookeeper: - image: confluentinc/cp-zookeeper:7.3.1 - logging: - driver: none - restart: unless-stopped - environment: - ZOOKEEPER_CLIENT_PORT: 2181 - kafka: - image: confluentinc/cp-kafka:7.3.1 - logging: - driver: none + image: bitnami/kafka:3.5.0 restart: unless-stopped - depends_on: - - zookeeper environment: - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + ALLOW_PLAINTEXT_LISTENER: yes KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" diff --git a/_examples/basic/1-your-first-app/main.go b/_examples/basic/1-your-first-app/main.go index 3eb303322..4b17288f4 100644 --- a/_examples/basic/1-your-first-app/main.go +++ b/_examples/basic/1-your-first-app/main.go @@ -3,7 +3,7 @@ package main import ( "context" "encoding/json" - "log" + "fmt" "time" "github.com/ThreeDotsLabs/watermill" @@ -67,7 +67,7 @@ func main() { return nil, err } - log.Printf("received event %+v", consumedPayload) + fmt.Printf("received event %+v\n", consumedPayload) newPayload, err := json.Marshal(processedEvent{ ProcessedID: consumedPayload.ID, diff --git a/dev/validate-examples/main.go b/dev/validate-examples/main.go index 8e9454271..37c77ff83 100644 --- a/dev/validate-examples/main.go +++ b/dev/validate-examples/main.go @@ -4,7 +4,6 @@ import ( "bufio" "fmt" "io" - "io/ioutil" "os" "os/exec" "path/filepath" @@ -24,7 +23,7 @@ type Config struct { } func (c *Config) LoadFrom(path string) error { - file, err := ioutil.ReadFile(path) + file, err := os.ReadFile(path) if err != nil { return err } @@ -109,35 +108,44 @@ func validate(path string) error { } }() - success := make(chan error) + success := make(chan bool) + lines := make(chan string) - go func() { - io.MultiReader() - - output := bufio.NewReader(io.MultiReader(stdout, stderr)) - for { - line, _, err := output.ReadLine() - if err != nil { - if err == io.EOF { - break - } - } + go readLines(stdout, lines) + go readLines(stderr, lines) - fmt.Printf("[%s] > %s\n", color.CyanString(dirName), string(line)) + go func() { + for line := range lines { + fmt.Printf("[%s] > %s\n", color.CyanString(dirName), line) - ok, _ := regexp.Match(config.ExpectedOutput, line) + ok, _ := regexp.MatchString(config.ExpectedOutput, line) if ok { - success <- nil + success <- true return } } - success <- fmt.Errorf("could not find expected output: %s", config.ExpectedOutput) }() select { - case err := <-success: - return err + case <-success: + return nil case <-time.After(time.Duration(config.Timeout) * time.Second): return fmt.Errorf("validation command timed out") } } + +func readLines(reader io.Reader, output chan<- string) { + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + if scanner.Err() != nil { + if scanner.Err() == io.EOF { + return + } + + continue + } + + line := scanner.Text() + output <- line + } +}