From 63d3dbb45afdc2851af2811066000ff6af27953a Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Mon, 10 Apr 2023 20:32:18 -0300 Subject: [PATCH 01/37] feat: Add Hey :D --- cmd/main.go | 7 +++++++ go.mod | 3 +++ 2 files changed, 10 insertions(+) create mode 100644 cmd/main.go create mode 100644 go.mod diff --git a/cmd/main.go b/cmd/main.go new file mode 100644 index 0000000..a896b32 --- /dev/null +++ b/cmd/main.go @@ -0,0 +1,7 @@ +package main + +import "fmt" + +func main() { + fmt.Println("Hey, we're doing something :D !!!") +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..ab6104d --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/fiuskyws/pegasus + +go 1.20 From e01c362bf8165f05dda57b51c2f47215e5bdc47d Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Mon, 10 Apr 2023 20:32:32 -0300 Subject: [PATCH 02/37] feat: Add Makefile run command --- Makefile | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 Makefile diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..1e9cd77 --- /dev/null +++ b/Makefile @@ -0,0 +1,5 @@ +main=cmd/main.go + +run: + # the '@' suppress the command echo + @go run $(main) From be5e4fcba32c50998386f2871626dbfaf99c17b3 Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Mon, 10 Apr 2023 20:34:05 -0300 Subject: [PATCH 03/37] feat: Add simple TODO --- README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/README.md b/README.md index 616e2fc..b2556d7 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,12 @@ # pegasus Simple Message Broker service written in Go. + +## TODO: +- [ ] Alpha release: + - [ ] Pub/Sub Logic + - [ ] Client/Server + - [ ] Makefile: + - [ ] `run` + - [ ] `test` + - [ ] `build` + - [ ] Test Coverage > 50% From 8e4d9cd6a63bc05a2f4961e018264b08640660e7 Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Mon, 10 Apr 2023 21:07:05 -0300 Subject: [PATCH 04/37] feat: Add ToPtr helper --- src/helper/ptr.go | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 src/helper/ptr.go diff --git a/src/helper/ptr.go b/src/helper/ptr.go new file mode 100644 index 0000000..55d9953 --- /dev/null +++ b/src/helper/ptr.go @@ -0,0 +1,6 @@ +package helper + +// ToPtr converts a given T to *T +func ToPtr[T any](v T) *T { + return &v +} From 2c42b14a31abd05d94b45d885502f8bdb9b1da52 Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Mon, 10 Apr 2023 21:09:12 -0300 Subject: [PATCH 05/37] feat: Add list data structure --- src/queue/list.go | 138 +++++++++++++++++++ src/queue/list_test.go | 303 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 441 insertions(+) create mode 100644 src/queue/list.go create mode 100644 src/queue/list_test.go diff --git a/src/queue/list.go b/src/queue/list.go new file mode 100644 index 0000000..4f879c9 --- /dev/null +++ b/src/queue/list.go @@ -0,0 +1,138 @@ +package queue + +type ( + // list is a linked-list data structure implementation. + list[T any] struct { + Value *T + Next *list[T] + } +) + +// newList returns a new list with given type T. +func newList[T any]() *list[T] { + return &list[T]{} +} + +// Append will Append a Value. +func (l *list[T]) Append(v T) { + if l.Value == nil { + l.Value = &v + return + } + + l.Next = &list[T]{ + Value: &v, + } +} + +// Empty returns whether the list is empty or not. +func (l *list[T]) Empty() bool { + return (l.Value == nil) && (l.Next == nil) +} + +// Head returns list's head. +func (l *list[T]) Head() *T { + return l.Value +} + +func (l *list[T]) Last() *T { + current := l + for { + if current.Next == nil { + return current.Value + } + current = current.Next + } +} + +// Len return the amount of item in the list. +func (l *list[T]) Len() int { + i := 0 + current := l + for { + if current.Next == nil { + return i + } + i++ + current = current.Next + } +} + +// ListAt - returns the list instance at position `p`. +func (l *list[T]) ListAt(p int) *list[T] { + current := *l + if l.Next == nil && l.Value == nil { + return nil + } + for i := 0; i <= p; i++ { + if i == p { + return current.Next + } + if current.Next == nil { + break + } + current = *current.Next + } + return nil +} + +// Pop - removes a value from the Linked list. +func (l *list[T]) Pop(p int) *T { + current := *l + currentPtr := l + if l.Next == nil && l.Value == nil { + return nil + } + for i := 0; i <= p; i++ { + if i == p { + val := *current.Value + if currentPtr.Next != nil { + currentPtr.Value = currentPtr.Next.Value + currentPtr.Next = currentPtr.Next.Next + } else { + currentPtr.Value = nil + } + return &val + } + if current.Next == nil { + break + } + current = *current.Next + currentPtr = currentPtr.Next + } + return nil +} + +// Prepend will add a Value to end of the list. +func (l *list[T]) Prepend(v T) { + currentlist := *l + newL := newList[T]() + + newL.Value = &v + newL.Next = ¤tlist + + *l = *newL +} + +// Tail - returns the list without the first item. +func (l *list[T]) Tail() *list[T] { + return l.Next +} + +// ValueAt - Returns a Value at position p. +func (l *list[T]) ValueAt(p int) *T { + current := *l + if l.Value == nil { + return nil + } + for i := 0; i <= p; i++ { + if i == p { + return current.Value + } + if current.Next == nil { + break + } + current = *current.Next + } + return nil +} diff --git a/src/queue/list_test.go b/src/queue/list_test.go new file mode 100644 index 0000000..b62edd7 --- /dev/null +++ b/src/queue/list_test.go @@ -0,0 +1,303 @@ +package queue + +import ( + "testing" + + "github.com/fiuskyws/pegasus/src/helper" + "github.com/stretchr/testify/require" +) + +func TestNew(t *testing.T) { + require := require.New(t) + + expected := list[int]{} + actual := newList[int]() + + require.Equal(expected, *actual) +} + +func TestAppend(t *testing.T) { + list := newList[int]() + + t.Run("Empty List", func(t *testing.T) { + require := require.New(t) + expected := 1 + list.Append(expected) + + require.Equal(expected, *list.Value) + }) + + t.Run("Non Empty List", func(t *testing.T) { + require := require.New(t) + expected := 2 + list.Append(expected) + + require.Equal(expected, *list.Next.Value) + }) +} + +func TestEmpty(t *testing.T) { + list := newList[int]() + + t.Run("Empty List", func(t *testing.T) { + require := require.New(t) + + require.True(list.Empty()) + }) + + t.Run("List Not Empty", func(t *testing.T) { + require := require.New(t) + + list.Append(1) + + require.False(list.Empty()) + }) +} + +func TestHead(t *testing.T) { + list := newList[int]() + + t.Run("Empty List", func(t *testing.T) { + require := require.New(t) + + require.Nil(list.Head()) + }) + + t.Run("List Not Empty", func(t *testing.T) { + require := require.New(t) + + expected := 1 + list.Append(expected) + + actual := list.Head() + + require.Equal(expected, *actual) + }) +} + +func TestListAt(t *testing.T) { + list := newList[int]() + t.Run("Empty List", func(t *testing.T) { + require := require.New(t) + + actualVal := list.ListAt(100) + + require.Nil(actualVal) + }) + + t.Run("Shorter Than", func(t *testing.T) { + require := require.New(t) + + pos := 100 + + list.Append(1) + + actualVal := list.ListAt(pos) + + require.Nil(actualVal) + }) + + t.Run("Found Value", func(t *testing.T) { + require := require.New(t) + + pos := 0 + + list.Append(2) + + actualVal := list.ListAt(pos) + + require.NotNil(actualVal) + }) + + t.Run("Found Nil Value", func(t *testing.T) { + require := require.New(t) + + pos := 1 + + actualVal := list.ListAt(pos) + + require.Nil(actualVal) + }) +} + +func TestPop(t *testing.T) { + list := newList[int]() + t.Run("Empty List", func(t *testing.T) { + require := require.New(t) + + actualVal := list.Pop(100) + + require.Nil(actualVal) + }) + + t.Run("Shorter Than", func(t *testing.T) { + require := require.New(t) + + pos := 100 + + list.Append(1) + + actual := list.Pop(pos) + + require.Nil(actual) + }) + + t.Run("Found Value", func(t *testing.T) { + require := require.New(t) + + pos := 0 + + list.Append(2) + + actualVal := list.Pop(pos) + + require.NotNil(actualVal) + }) + + t.Run("Found Nil Value", func(t *testing.T) { + require := require.New(t) + list.Append(3) + + pos := 1 + expectedVal := 3 + + actualVal := list.Pop(pos) + + require.Equal(expectedVal, *actualVal) + }) +} + +func TestPrepend(t *testing.T) { + list := newList[int]() + + t.Run("Empty List", func(t *testing.T) { + require := require.New(t) + + expected := 1 + + list.Prepend(expected) + + require.Equal(expected, *list.Value) + }) + + t.Run("List w/ 1 Item", func(t *testing.T) { + require := require.New(t) + + expected := 2 + + list.Prepend(expected) + + require.Equal(expected, *list.Value) + require.Equal(1, *list.Next.Value) + }) +} + +func TestLast(t *testing.T) { + l := newList[int]() + + t.Run("Empty List", func(t *testing.T) { + require := require.New(t) + + actual := l.Last() + + require.Nil(actual) + }) + + t.Run("List w/ 1 item", func(t *testing.T) { + require := require.New(t) + + expected := list[int]{ + Value: helper.ToPtr(1), + } + + l.Append(1) + + actual := l.Last() + + require.Equal(*expected.Value, *actual) + }) + + for i := 2; i <= 10; i++ { + l.Append(i) + } + + t.Run("List w/ 10 item", func(t *testing.T) { + require := require.New(t) + + expected := list[int]{ + Value: helper.ToPtr(10), + } + + actual := l.Last() + + require.NotNil(actual) + require.Equal(*expected.Value, *actual) + }) +} + +func TestTail(t *testing.T) { + list := newList[int]() + t.Run("Empty List", func(t *testing.T) { + require := require.New(t) + + actual := list.Tail() + + require.Nil(actual) + }) + + t.Run("List w/ 1 item", func(t *testing.T) { + require := require.New(t) + + list.Append(1) + + actual := list.Tail() + + require.Nil(actual) + }) + + t.Run("Non Nil Tail", func(t *testing.T) { + require := require.New(t) + + list.Append(2) + + actual := list.Tail() + + require.NotNil(actual) + }) +} + +func TestValueAt(t *testing.T) { + list := newList[int]() + t.Run("Empty List", func(t *testing.T) { + require := require.New(t) + + actual := list.ValueAt(100) + + require.Nil(actual) + }) + + t.Run("Shorter Than", func(t *testing.T) { + require := require.New(t) + + pos := 100 + + list.Append(1) + + actual := list.ValueAt(pos) + + require.Nil(actual) + }) + + t.Run("Found Value", func(t *testing.T) { + require := require.New(t) + + pos := 1 + + list.Append(2) + + expected := 2 + actual := list.ValueAt(pos) + + require.Equal(expected, *actual) + }) +} From 169e61e1a01f8432a4a57aba7e08a45be8326e6f Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Mon, 10 Apr 2023 21:09:23 -0300 Subject: [PATCH 06/37] fix: go mod tidy --- go.mod | 8 ++++++++ go.sum | 17 +++++++++++++++++ 2 files changed, 25 insertions(+) create mode 100644 go.sum diff --git a/go.mod b/go.mod index ab6104d..511127b 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,11 @@ module github.com/fiuskyws/pegasus go 1.20 + +require github.com/stretchr/testify v1.8.2 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..6a56e69 --- /dev/null +++ b/go.sum @@ -0,0 +1,17 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From 7e303697baddf15e36921795551e79a572351e29 Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Tue, 11 Apr 2023 18:48:35 -0300 Subject: [PATCH 07/37] feat: Add TODO --- src/queue/list_bench_test.go | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 src/queue/list_bench_test.go diff --git a/src/queue/list_bench_test.go b/src/queue/list_bench_test.go new file mode 100644 index 0000000..0d6c970 --- /dev/null +++ b/src/queue/list_bench_test.go @@ -0,0 +1,5 @@ +package queue + +// TODO: +// - add benchmarks +// - look for improving performance From 51a4e13901c6f7d81f7f976deb0e19275707099a Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Tue, 11 Apr 2023 19:04:30 -0300 Subject: [PATCH 08/37] feat: Add simple Queue struct --- src/queue/queue.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 src/queue/queue.go diff --git a/src/queue/queue.go b/src/queue/queue.go new file mode 100644 index 0000000..27549cc --- /dev/null +++ b/src/queue/queue.go @@ -0,0 +1,35 @@ +package queue + +type ( + // Queue is a linked-list data structure, that will be implemented + // instead of a common `chan` to allow us to peek. + // FIFO + Queue[T any] struct { + _l *list[T] + } +) + +// New returns a new instance of Queue. +func New[T any]() *Queue[T] { + return &Queue[T]{ + _l: newList[T](), + } +} + +// Push - Add a item to the end of the queue. +func (q *Queue[T]) Push(v T) { + q._l.Append(v) +} + +// Get - Retrieves the Queue's first item, but +// doesn't delete it, if you call `Get()` again, +// it will return the same value as the first call. +func (q *Queue[T]) Get() *T { + return q._l.Value +} + +// Pop - Retrieves Queue's first item and +// removes it. +func (q *Queue[T]) Pop() *T { + return q._l.Pop(0) +} From 810b44ac9bd41bd2b2b69a4b89a2af8c2d8fe2d0 Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Tue, 11 Apr 2023 19:07:27 -0300 Subject: [PATCH 09/37] feat: Add Queue tests --- src/queue/queue_test.go | 88 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) create mode 100644 src/queue/queue_test.go diff --git a/src/queue/queue_test.go b/src/queue/queue_test.go new file mode 100644 index 0000000..024aaba --- /dev/null +++ b/src/queue/queue_test.go @@ -0,0 +1,88 @@ +package queue + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNewQueue(t *testing.T) { + require := require.New(t) + + actual := New[int]() + + require.NotNil(actual) + require.NotNil(actual._l) + require.Nil(actual._l.Value) + require.Nil(actual._l.Next) +} + +func TestQueuePush(t *testing.T) { + q := New[int]() + + t.Run("Check First Item", func(t *testing.T) { + require := require.New(t) + + actual := q._l.Value + + require.Nil(actual) + }) + + t.Run("Push", func(t *testing.T) { + require := require.New(t) + + expected := 10 + q.Push(expected) + + actual := *q._l.Value + + require.Equal(expected, actual) + }) +} + +func TestQueueGet(t *testing.T) { + q := New[int]() + + t.Run("Get Empty Queue", func(t *testing.T) { + require := require.New(t) + + actual := q.Get() + + require.Nil(actual) + }) + + t.Run("Get w/ 1 value", func(t *testing.T) { + require := require.New(t) + + expectedValue := 10 + q.Push(expectedValue) + + actualValue := q.Get() + + require.NotNil(actualValue) + require.Equal(expectedValue, *actualValue) + }) +} + +func TestQueuePop(t *testing.T) { + q := New[int]() + + t.Run("Pop Empty Queue", func(t *testing.T) { + require := require.New(t) + + actual := q.Pop() + + require.Nil(actual) + }) + + t.Run("Pop w/ 1 value", func(t *testing.T) { + require := require.New(t) + + expectedValue := 10 + q.Push(expectedValue) + + actual := q.Pop() + + require.NotNil(actual) + }) +} From 2bb4c9d09905297d54dcee08da9f8e131e142db5 Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Wed, 12 Apr 2023 19:36:35 -0300 Subject: [PATCH 10/37] feat: Add topic --- go.mod | 5 ++++- go.sum | 2 ++ src/topic/topic.go | 51 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 1 deletion(-) create mode 100644 src/topic/topic.go diff --git a/go.mod b/go.mod index 511127b..36ef5cb 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,10 @@ module github.com/fiuskyws/pegasus go 1.20 -require github.com/stretchr/testify v1.8.2 +require ( + github.com/google/uuid v1.3.0 + github.com/stretchr/testify v1.8.2 +) require ( github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index 6a56e69..4fe7991 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/src/topic/topic.go b/src/topic/topic.go new file mode 100644 index 0000000..b6bd98f --- /dev/null +++ b/src/topic/topic.go @@ -0,0 +1,51 @@ +package topic + +import "github.com/google/uuid" + +type ( + // Topic is a "named queue" that services can subscribe to. + Topic[T any] struct { + // q is the Topic's internal queue. + q chan T + Name string + } +) + +// NewTopic creates a new Topic, if given name is empty, it generates +// an UUID as a name. +func NewTopic[T any](name string) (*Topic[T], error) { + if name == "" { + id, err := uuid.NewRandom() + if err != nil { + return nil, err + } + name = id.String() + } + + return &Topic[T]{ + q: make(chan T, 10), + Name: name, + }, nil +} + +// GetReceiver will return a read-only Queue. +func (t *Topic[T]) GetReceiver() <-chan T { + return t.q +} + +// GetSender will return a write-only Queue. +func (t *Topic[T]) GetSender() chan<- T { + return t.q +} + +// Pop will retrieve and delete a message from the output channel. +func (t *Topic[T]) Pop() (T, error) { + msg := <-t.q + return msg, nil +} + +// Send will put a message in Topic input channel. +func (t *Topic[T]) Send(m T) error { + t.q <- m + return nil +} From 2c6a7ea9613db0036b73fff19e69257aa826eab9 Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Wed, 12 Apr 2023 19:38:52 -0300 Subject: [PATCH 11/37] feat: Add topic test --- src/topic/topic_test.go | 66 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 src/topic/topic_test.go diff --git a/src/topic/topic_test.go b/src/topic/topic_test.go new file mode 100644 index 0000000..9149099 --- /dev/null +++ b/src/topic/topic_test.go @@ -0,0 +1,66 @@ +package topic + +import ( + "regexp" + "testing" + + "github.com/stretchr/testify/require" +) + +var ( + uuidRegex = regexp.MustCompile("^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$") +) + +func TestTopic(t *testing.T) { + t.Run("NewTopic", func(t *testing.T) { + t.Run("With name", func(t *testing.T) { + require := require.New(t) + name := "topic" + + actual, err := NewTopic[string](name) + + require.Nil(err) + require.NotNil(actual) + require.Equal(name, actual.Name) + }) + t.Run("Empty name", func(t *testing.T) { + require := require.New(t) + + actual, err := NewTopic[string]("") + + require.Nil(err) + require.NotNil(actual) + require.Regexp(uuidRegex, actual.Name) + }) + }) + + t.Run("Send", func(t *testing.T) { + require := require.New(t) + topic, err := NewTopic[string]("") + + require.Nil(err) + + actual := topic.Send("foo") + + require.Nil(actual) + }) + + t.Run("Pop", func(t *testing.T) { + require := require.New(t) + topic, err := NewTopic[string]("") + msg := "foo" + + require.Nil(err) + + { + err := topic.Send(msg) + require.Nil(err) + } + + { + actual, err := topic.Pop() + require.Nil(err) + require.Equal(msg, actual) + } + }) +} From 2933fd7d481c2506cb20ec21055c6cb1bfa420af Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Wed, 12 Apr 2023 19:51:52 -0300 Subject: [PATCH 12/37] fix: Add TODOs --- src/queue/list.go | 3 +++ src/queue/queue.go | 3 +++ 2 files changed, 6 insertions(+) diff --git a/src/queue/list.go b/src/queue/list.go index 4f879c9..186ab49 100644 --- a/src/queue/list.go +++ b/src/queue/list.go @@ -1,5 +1,8 @@ package queue +// TODO: +// - Improve current implementation, to be memory/concurrency/parallel safe. + type ( // list is a linked-list data structure implementation. list[T any] struct { diff --git a/src/queue/queue.go b/src/queue/queue.go index 27549cc..10333bc 100644 --- a/src/queue/queue.go +++ b/src/queue/queue.go @@ -1,5 +1,8 @@ package queue +// TODO: +// - Improve current implementation, to be memory/concurrency/parallel safe. + type ( // Queue is a linked-list data structure, that will be implemented // instead of a common `chan` to allow us to peek. From 6b9e77cc9efb667183ca3d661447cbc8e9974304 Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Wed, 12 Apr 2023 20:21:35 -0300 Subject: [PATCH 13/37] feat: Add simple Message struct --- src/message/message.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 src/message/message.go diff --git a/src/message/message.go b/src/message/message.go new file mode 100644 index 0000000..6be7148 --- /dev/null +++ b/src/message/message.go @@ -0,0 +1,20 @@ +package message + +import "time" + +type ( + // Message represents the data that's in the queue. + Message struct { + // Timestamp registers the time the message was Published + Timestamp *time.Time `json:"timestamp"` + Body []byte `json:"body"` + } +) + +func (m Message) BodyBytes() []byte { + return m.Body +} + +func (m Message) BodyString() string { + return string(m.Body) +} From 27d52f612fb570bd8e6650f4ae06de53a5ddf260 Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Wed, 12 Apr 2023 20:22:51 -0300 Subject: [PATCH 14/37] feat: Add Topic TODO --- src/topic/topic.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/topic/topic.go b/src/topic/topic.go index b6bd98f..9e80bc3 100644 --- a/src/topic/topic.go +++ b/src/topic/topic.go @@ -2,6 +2,9 @@ package topic import "github.com/google/uuid" +// TODO: +// - Implement Queue data structure in Topic +// - Add `Peek() T` and `PeekAt(i uint) (*T)` methods type ( // Topic is a "named queue" that services can subscribe to. Topic[T any] struct { From 71b9908d3cfaaff8717a6545025eee107703f9ef Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Wed, 12 Apr 2023 20:30:28 -0300 Subject: [PATCH 15/37] feat: Add Attr field into Message struct --- src/message/message.go | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/message/message.go b/src/message/message.go index 6be7148..2dee8b2 100644 --- a/src/message/message.go +++ b/src/message/message.go @@ -7,14 +7,8 @@ type ( Message struct { // Timestamp registers the time the message was Published Timestamp *time.Time `json:"timestamp"` - Body []byte `json:"body"` + // Attr is a map of message Attributes. + Attr map[string]any `json:"attr"` + Body []byte `json:"body"` } ) - -func (m Message) BodyBytes() []byte { - return m.Body -} - -func (m Message) BodyString() string { - return string(m.Body) -} From 6dd0bea0b4c1c345aaa4456ffa665a75a994eeb8 Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Wed, 12 Apr 2023 20:34:25 -0300 Subject: [PATCH 16/37] feat: Add Validate method in Message --- src/message/message.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/message/message.go b/src/message/message.go index 2dee8b2..e7f08c2 100644 --- a/src/message/message.go +++ b/src/message/message.go @@ -1,10 +1,14 @@ package message -import "time" +import ( + "fmt" + "time" +) type ( // Message represents the data that's in the queue. Message struct { + TopicName string `json:"topic_name"` // Timestamp registers the time the message was Published Timestamp *time.Time `json:"timestamp"` // Attr is a map of message Attributes. @@ -12,3 +16,14 @@ type ( Body []byte `json:"body"` } ) + +const ( + errEmptyField = "field '%s' is empty" +) + +func (m *Message) Validate() error { + if m.TopicName == "" { + return fmt.Errorf(errEmptyField, "topic_name") + } + return nil +} From 8db15762b4c1fc474e5a9b231ba1b95bbe0cd357 Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Wed, 12 Apr 2023 20:34:41 -0300 Subject: [PATCH 17/37] feat: Add Manager to handle topics --- src/manager/manager.go | 75 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) create mode 100644 src/manager/manager.go diff --git a/src/manager/manager.go b/src/manager/manager.go new file mode 100644 index 0000000..91bbcfd --- /dev/null +++ b/src/manager/manager.go @@ -0,0 +1,75 @@ +package manager + +import ( + "fmt" + + "github.com/fiuskyws/pegasus/src/message" + "github.com/fiuskyws/pegasus/src/topic" +) + +type ( + // Manager handles all actions related to Pegasus. + Manager struct { + topics map[string]*topic.Topic[*message.Message] + } +) + +// NewManager returns a new Manager +func NewManager() *Manager { + return &Manager{ + topics: map[string]*topic.Topic[*message.Message]{}, + } +} + +// GetTopicNames returns a list of names of all created Topics. +func (m *Manager) GetTopicNames() []string { + topics := []string{} + + for k := range m.topics { + topics = append(topics, k) + } + + return topics +} + +const ( + errTopicAlreadyExists = "topic named '%s' already exists" + errTopicNotFound = "topic named '%s' not found" +) + +// NewTopic creates a new Topic and adds it to the Manager. +func (m *Manager) NewTopic(name string) error { + if _, ok := m.topics[name]; ok { + return fmt.Errorf(errTopicAlreadyExists, name) + } + createdTopic, err := topic.NewTopic[*message.Message](name) + if err != nil { + return err + } + + m.topics[createdTopic.Name] = createdTopic + return nil +} + +// Send inserts a message into Topic's internal queue. +func (m *Manager) Send(msg *message.Message) error { + if err := msg.Validate(); err != nil { + return err + } + topic, ok := m.topics[msg.TopicName] + if !ok { + return fmt.Errorf(errTopicNotFound, msg.TopicName) + } + + return topic.Send(msg) +} + +// Pop retrieves a message from Topic's internal queue. +func (m *Manager) Pop(name string) (*message.Message, error) { + topic, ok := m.topics[name] + if !ok { + return nil, fmt.Errorf(errTopicNotFound, name) + } + + return topic.Pop() +} From a0335c261113f6f96a403af86ac9f681950e6907 Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Wed, 12 Apr 2023 20:58:16 -0300 Subject: [PATCH 18/37] feat: Add GetTopic method --- src/manager/manager.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/manager/manager.go b/src/manager/manager.go index 91bbcfd..abfa9c3 100644 --- a/src/manager/manager.go +++ b/src/manager/manager.go @@ -32,6 +32,16 @@ func (m *Manager) GetTopicNames() []string { return topics } +// GetTopicNames returns a list of names of all created Topics. +func (m *Manager) GetTopic(topicName string) (*topic.Topic[*message.Message], error) { + t, ok := m.topics[topicName] + if !ok { + return nil, fmt.Errorf(errTopicNotFound, topicName) + } + + return t, nil +} + const ( errTopicAlreadyExists = "topic named '%s' already exists" errTopicNotFound = "topic named '%s' not found" From f5b4311437ab5c77681659506e4edde74bbb6d5d Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Fri, 14 Apr 2023 15:52:27 -0300 Subject: [PATCH 19/37] feat: Add manager tests --- src/manager/manager_test.go | 221 ++++++++++++++++++++++++++++++++++++ 1 file changed, 221 insertions(+) create mode 100644 src/manager/manager_test.go diff --git a/src/manager/manager_test.go b/src/manager/manager_test.go new file mode 100644 index 0000000..523ce56 --- /dev/null +++ b/src/manager/manager_test.go @@ -0,0 +1,221 @@ +package manager + +import ( + "testing" + + "github.com/fiuskyws/pegasus/src/message" + "github.com/stretchr/testify/require" +) + +func TestNewManager(t *testing.T) { + require := require.New(t) + + actual := NewManager() + + require.NotNil(actual) + +} + +func TestNewTopic(t *testing.T) { + t.Run("Success", func(t *testing.T) { + require := require.New(t) + m := NewManager() + actualErr := m.NewTopic("") + + require.Nil(actualErr) + }) + + t.Run("Topic Exists", func(t *testing.T) { + require := require.New(t) + m := NewManager() + + { + actual := m.NewTopic("topic") + require.Nil(actual) + } + { + actual := m.NewTopic("topic") + require.NotNil(actual) + } + }) +} + +func TestGetTopicNames(t *testing.T) { + t.Run("Success - Empty", func(t *testing.T) { + require := require.New(t) + m := NewManager() + + actual := m.GetTopicNames() + + require.Empty(actual) + }) + + t.Run("Success - 1 Topic", func(t *testing.T) { + require := require.New(t) + m := NewManager() + + { + actual := m.NewTopic("topic") + require.Nil(actual) + } + + { + expected := []string{"topic"} + actual := m.GetTopicNames() + + require.Equal(expected, actual) + } + }) + + t.Run("Success - 10 Topics", func(t *testing.T) { + require := require.New(t) + m := NewManager() + + amountOfTopics := 10 + + { + for i := 0; i < amountOfTopics; i++ { + actual := m.NewTopic("") + require.Nil(actual) + } + } + + { + topics := m.GetTopicNames() + + actual := len(topics) + require.Equal(amountOfTopics, actual) + } + }) +} + +func TestSend(t *testing.T) { + t.Run("Invalid Message", func(t *testing.T) { + t.Run("Empty TopicName", func(t *testing.T) { + require := require.New(t) + m := NewManager() + + actual := m.Send(&message.Message{ + Body: []byte("body"), + }) + + require.NotNil(actual) + }) + + t.Run("Empty Body", func(t *testing.T) { + require := require.New(t) + m := NewManager() + + actual := m.Send(&message.Message{ + TopicName: "topic", + }) + + require.NotNil(actual) + }) + }) + + t.Run("Topic Doesn't Exists", func(t *testing.T) { + require := require.New(t) + m := NewManager() + + actual := m.Send(&message.Message{ + TopicName: "some topic", + Body: []byte("body"), + }) + + require.NotNil(actual) + }) + + t.Run("Topic Doesn't Exists", func(t *testing.T) { + require := require.New(t) + m := NewManager() + + { + err := m.NewTopic("topic") + require.Nil(err) + } + + { + actual := m.Send(&message.Message{ + TopicName: "wrong_topic", + Body: []byte("body"), + }) + + require.NotNil(actual) + } + }) + + t.Run("Valid", func(t *testing.T) { + require := require.New(t) + m := NewManager() + + { + err := m.NewTopic("topic") + require.Nil(err) + } + + { + actual := m.Send(&message.Message{ + TopicName: "topic", + Body: []byte("body"), + }) + + require.Nil(actual) + } + }) +} + +func TestPop(t *testing.T) { + t.Run("Topic Doesn't Exists", func(t *testing.T) { + require := require.New(t) + m := NewManager() + + actual, err := m.Pop("some_topic") + + require.NotNil(err) + require.Nil(actual) + }) + + t.Run("Topic Doesn't Exists", func(t *testing.T) { + require := require.New(t) + m := NewManager() + + { + err := m.NewTopic("topic") + require.Nil(err) + } + + { + actual, err := m.Pop("some_topic") + + require.NotNil(err) + require.Nil(actual) + } + }) + + t.Run("Valid", func(t *testing.T) { + require := require.New(t) + m := NewManager() + + { + err := m.NewTopic("topic") + require.Nil(err) + } + + { + err := m.Send(&message.Message{ + TopicName: "topic", + Body: []byte("body"), + }) + + require.Nil(err) + } + + { + actual, err := m.Pop("topic") + + require.Nil(err) + require.NotNil(actual) + } + }) +} From a30653fdeb85de899a16ad2b8651b778fc2b124a Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Fri, 14 Apr 2023 22:29:43 -0300 Subject: [PATCH 20/37] feat: Add proto file --- .proto/pegasus.proto | 67 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 .proto/pegasus.proto diff --git a/.proto/pegasus.proto b/.proto/pegasus.proto new file mode 100644 index 0000000..9799116 --- /dev/null +++ b/.proto/pegasus.proto @@ -0,0 +1,67 @@ +syntax = "proto3"; + +package proto; +option go_package = "/proto"; + +service Pegasus { + rpc CreateTopic(CreateTopicRequest) returns (CreateTopicResponse); + rpc GetTopics(GetTopicsRequest) returns (GetTopicsResponse); + + rpc Send(SendRequest) returns (SendResponse); + rpc Pop(PopRequest) returns (PopResponse); + + rpc Consumer(ConsumerRequest) returns (stream ConsumerResponse); + rpc Producer(stream ProducerRequest) returns (stream ProducerResponse); +} + +message GetTopicsRequest {} +message GetTopicsResponse { + repeated string topics = 1; +} + +message CreateTopicRequest { + string name = 1; +} + +message CreateTopicResponse { + string error = 1; +} + +message ConsumerRequest { + string topic_name = 1; +} + +message ConsumerResponse { + string topic_name = 1; + string body = 2; +} + +message ProducerRequest { + string topic_name = 1; + string body = 2; +} + +message ProducerResponse { + string message = 1; + string error = 2; +} + +message SendRequest { + string topic_name = 1; + string body = 2; +} + +message SendResponse { + string message = 1; + string error = 2; +} + +message PopRequest { + string topic_name = 1; +} + +message PopResponse { + string topic_name = 1; + string body = 2; + string error = 3; +} From a0f3a1addddb1ac0da81bb37197413b8d32b9c9e Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Fri, 14 Apr 2023 22:31:29 -0300 Subject: [PATCH 21/37] feat: Add ignore proto generated files --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 66fd13c..7cdc2cb 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,6 @@ # Dependency directories (remove the comment below to include it) # vendor/ + +# Generated proto file +src/proto/* From de054c847f63fbbd57f10855ad892c012c892147 Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Fri, 14 Apr 2023 22:31:43 -0300 Subject: [PATCH 22/37] feat: Add Makefile cmd to generated the proto files --- Makefile | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Makefile b/Makefile index 1e9cd77..f1c7484 100644 --- a/Makefile +++ b/Makefile @@ -3,3 +3,8 @@ main=cmd/main.go run: # the '@' suppress the command echo @go run $(main) + +proto: + protoc --go_out=./src/ \ + --go-grpc_out=./src/ \ + .proto/pegasus.proto From 5c6b65e2cabcfea804a54efbb3e913e1060231e4 Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Fri, 14 Apr 2023 22:39:09 -0300 Subject: [PATCH 23/37] feat: Add GRPCRepo --- src/repository/grpc.go | 140 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 140 insertions(+) create mode 100644 src/repository/grpc.go diff --git a/src/repository/grpc.go b/src/repository/grpc.go new file mode 100644 index 0000000..7eb16c8 --- /dev/null +++ b/src/repository/grpc.go @@ -0,0 +1,140 @@ +package repository + +import ( + "context" + "time" + + "github.com/fiuskyws/pegasus/src/manager" + "github.com/fiuskyws/pegasus/src/proto" + "go.uber.org/zap" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type GRPCRepo struct { + proto.UnimplementedPegasusServer + mgr *manager.Manager +} + +func NewGRPCRepo(mgr *manager.Manager) *GRPCRepo { + return &GRPCRepo{ + mgr: mgr, + } +} + +// CreateTopic - +func (g *GRPCRepo) CreateTopic(ctx context.Context, req *proto.CreateTopicRequest) (*proto.CreateTopicResponse, error) { + ctx, cancel := context.WithTimeout(ctx, time.Second*2) + defer cancel() + errChan := make(chan error, 1) + + go func() { + errChan <- g.mgr.NewTopic(req.Name) + }() + + select { + case <-ctx.Done(): + return nil, status.Error(codes.DeadlineExceeded, "timeout") + case err := <-errChan: + if err != nil { + return nil, status.Error(codes.Canceled, err.Error()) + } + return &proto.CreateTopicResponse{ + Error: "", + }, nil + } +} + +// GetTopics returns a list of all topics. +func (g *GRPCRepo) GetTopics(ctx context.Context, _ *proto.GetTopicsRequest) (*proto.GetTopicsResponse, error) { + ctx, cancel := context.WithTimeout(ctx, time.Second*2) + defer cancel() + + topics := make(chan []string, 1) + + go func() { + topics <- g.mgr.GetTopicNames() + }() + + select { + case <-ctx.Done(): + return nil, status.Error(codes.DeadlineExceeded, "timeout") + case tpcs := <-topics: + return &proto.GetTopicsResponse{ + Topics: tpcs, + }, nil + } +} + +// Send - Endpoint for inserting messages into a Topic. +func (g *GRPCRepo) Send(ctx context.Context, req *proto.SendRequest) (*proto.SendResponse, error) { + ctx, cancel := context.WithTimeout(ctx, time.Second*2) + defer cancel() + msgChan := make(chan *message.Message, 1) + errChan := make(chan error, 1) + go func() { + msg, err := message.FromRequest(req) + if err != nil { + errChan <- err + return + } + if err = g.mgr.Send(msg); err != nil { + zap.L().Error(err.Error()) + errChan <- err + return + } + msgChan <- msg + }() + select { + case <-ctx.Done(): + return nil, status.Error(codes.DeadlineExceeded, "timeout") + case err := <-errChan: + zap.L().Error(err.Error()) + return nil, status.Error(codes.Canceled, err.Error()) + case <-msgChan: + return &proto.SendResponse{ + Message: "message sent!", + }, nil + } +} + +// Pop - Retrieves and delete the first item in the Topic's queue. +func (g *GRPCRepo) Pop(ctx context.Context, req *proto.PopRequest) (*proto.PopResponse, error) { + ctx, cancel := context.WithTimeout(ctx, time.Second*2) + defer cancel() + + msgChan := make(chan *message.Message, 1) + errChan := make(chan error, 1) + + go func() { + msg, err := g.mgr.Pop(req.TopicName) + if err != nil { + errChan <- err + return + } + msgChan <- msg + }() + + select { + case <-ctx.Done(): + return nil, status.Error(codes.DeadlineExceeded, "timeout") + case err := <-errChan: + zap.L().Error(err.Error()) + return nil, status.Error(codes.Canceled, err.Error()) + case msg := <-msgChan: + return &proto.PopResponse{ + TopicName: msg.TopicName, + Body: msg.Body, + }, nil + } +} + +// Consumer - +func (g *GRPCRepo) Consumer(in *proto.ConsumerRequest, srv proto.Pegasus_ConsumerServer) error { + return nil +} + +// Producer - +func (g *GRPCRepo) Producer(srv proto.Pegasus_ProducerServer) error { + return nil +} From 35eb67f668d915f6621a9e556f3899e441ebe38a Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Fri, 14 Apr 2023 22:39:16 -0300 Subject: [PATCH 24/37] fix: go mod tidy --- go.mod | 10 ++++++++++ go.sum | 30 ++++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/go.mod b/go.mod index 36ef5cb..cfe81c6 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,20 @@ go 1.20 require ( github.com/google/uuid v1.3.0 github.com/stretchr/testify v1.8.2 + go.uber.org/zap v1.24.0 + google.golang.org/grpc v1.54.0 + google.golang.org/protobuf v1.30.0 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect + github.com/golang/protobuf v1.5.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + go.uber.org/atomic v1.7.0 // indirect + go.uber.org/multierr v1.6.0 // indirect + golang.org/x/net v0.8.0 // indirect + golang.org/x/sys v0.6.0 // indirect + golang.org/x/text v0.8.0 // indirect + google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 4fe7991..1971731 100644 --- a/go.sum +++ b/go.sum @@ -1,17 +1,47 @@ +github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= +go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= +go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= +go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= +golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= +golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f h1:BWUVssLB0HVOSY78gIdvk1dTVYtT1y8SBWtPYuTJ/6w= +google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM= +google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag= +google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= +google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From 42d6289665948b4934303fa267a72f8da77b1152 Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Fri, 14 Apr 2023 22:40:31 -0300 Subject: [PATCH 25/37] feat: Add GRPC server --- src/server/grpc.go | 66 ++++++++++++++++++++++++++++++++++++++++++++ src/server/server.go | 6 ++++ 2 files changed, 72 insertions(+) create mode 100644 src/server/grpc.go create mode 100644 src/server/server.go diff --git a/src/server/grpc.go b/src/server/grpc.go new file mode 100644 index 0000000..7dcbf26 --- /dev/null +++ b/src/server/grpc.go @@ -0,0 +1,66 @@ +package server + +import ( + "fmt" + "net" + + "github.com/fiuskyws/pegasus/src/manager" + "github.com/fiuskyws/pegasus/src/proto" + "github.com/fiuskyws/pegasus/src/repository" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +type ( + // GRPC stores all info/data necessary to establish a + // gRPC server. + GRPC struct { + listener net.Listener + mgr *manager.Manager + repo *repository.GRPCRepo + srv *grpc.Server + opts []grpc.ServerOption + } +) + +// NewGRPC returns a new GRPC connector. +func NewGRPC(mgr *manager.Manager) *GRPC { + var opts []grpc.ServerOption + + srv := grpc.NewServer(opts...) + + repo := repository.NewGRPCRepo(mgr) + + return &GRPC{ + mgr: mgr, + opts: opts, + repo: repo, + srv: srv, + } +} + +// Start - starts the gRPC server. +func (g *GRPC) Start(port uint) error { + var err error + g.listener, err = net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) + if err != nil { + zap.L().Error(err.Error()) + return err + } + proto.RegisterPegasusServer(g.srv, proto.PegasusServer(g.repo)) + if err := g.srv.Serve(g.listener); err != nil { + zap.L().Error(err.Error()) + return err + } + return nil +} + +// Close - closes the gRPC server. +func (g *GRPC) Close() error { + g.srv.GracefulStop() + if err := g.listener.Close(); err != nil { + zap.L().Error(err.Error()) + return err + } + return nil +} diff --git a/src/server/server.go b/src/server/server.go new file mode 100644 index 0000000..9ae3e67 --- /dev/null +++ b/src/server/server.go @@ -0,0 +1,6 @@ +package server + +type ( + // API wraps up server methods. + API interface{} +) From 90b05b9d2863b998d08673ed70330ee3b1af2cf3 Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Fri, 14 Apr 2023 22:41:28 -0300 Subject: [PATCH 26/37] feat: Add server cmd file --- cmd/server/main.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 cmd/server/main.go diff --git a/cmd/server/main.go b/cmd/server/main.go new file mode 100644 index 0000000..4139fa5 --- /dev/null +++ b/cmd/server/main.go @@ -0,0 +1,44 @@ +package main + +import ( + "os" + "os/signal" + "syscall" + + "github.com/fiuskyws/pegasus/src/manager" + "github.com/fiuskyws/pegasus/src/server" +) + +var ( + port = uint(8090) + topics = []string{ + "topic-1", "topic-2", + } +) + +func main() { + + exit := make(chan os.Signal, 1) + + signal.Notify(exit, syscall.SIGTERM, syscall.SIGINT) + + m := manager.NewManager() + + for _, topic := range topics { + if err := m.NewTopic(topic); err != nil { + panic(err) + } + } + + g := server.NewGRPC(m) + + if err := g.Start(port); err != nil { + panic(err) + } + + <-exit + + if err := g.Close(); err != nil { + panic(err) + } +} From 267074ab6efa897c5925295fd49f8686554da8b1 Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Sat, 15 Apr 2023 20:44:15 -0300 Subject: [PATCH 27/37] fix: Type conversion --- src/repository/grpc.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/repository/grpc.go b/src/repository/grpc.go index 7eb16c8..5ab4530 100644 --- a/src/repository/grpc.go +++ b/src/repository/grpc.go @@ -5,6 +5,7 @@ import ( "time" "github.com/fiuskyws/pegasus/src/manager" + "github.com/fiuskyws/pegasus/src/message" "github.com/fiuskyws/pegasus/src/proto" "go.uber.org/zap" "google.golang.org/grpc/codes" @@ -124,7 +125,7 @@ func (g *GRPCRepo) Pop(ctx context.Context, req *proto.PopRequest) (*proto.PopRe case msg := <-msgChan: return &proto.PopResponse{ TopicName: msg.TopicName, - Body: msg.Body, + Body: string(msg.Body), }, nil } } From d77d63ed5caee4591d9814b90bd9a8d240333acc Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Sat, 15 Apr 2023 20:44:45 -0300 Subject: [PATCH 28/37] feat: Add FromRequest function --- src/message/message.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/message/message.go b/src/message/message.go index e7f08c2..0c174fc 100644 --- a/src/message/message.go +++ b/src/message/message.go @@ -3,6 +3,8 @@ package message import ( "fmt" "time" + + "github.com/fiuskyws/pegasus/src/proto" ) type ( @@ -27,3 +29,16 @@ func (m *Message) Validate() error { } return nil } + +func FromRequest(req *proto.SendRequest) (*Message, error) { + msg := Message{ + Body: []byte(req.Body), + TopicName: req.TopicName, + } + + if err := msg.Validate(); err != nil { + return nil, err + } + + return &msg, nil +} From 15d349ee8f971e0a0c05f80d4c8c700b0d392e2a Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Mon, 17 Apr 2023 19:33:16 -0300 Subject: [PATCH 29/37] feat: Add global logs --- cmd/server/main.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index 4139fa5..9331fa6 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -7,6 +7,7 @@ import ( "github.com/fiuskyws/pegasus/src/manager" "github.com/fiuskyws/pegasus/src/server" + "go.uber.org/zap" ) var ( @@ -17,6 +18,13 @@ var ( ) func main() { + l, err := zap.NewProduction() + if err != nil { + panic(err) + } + + undo := zap.ReplaceGlobals(l) + defer undo() exit := make(chan os.Signal, 1) @@ -26,19 +34,21 @@ func main() { for _, topic := range topics { if err := m.NewTopic(topic); err != nil { - panic(err) + zap.L().Panic(err.Error()) } } g := server.NewGRPC(m) - if err := g.Start(port); err != nil { - panic(err) - } + go func() { + if err := g.Start(port); err != nil { + zap.L().Panic(err.Error()) + } + }() <-exit if err := g.Close(); err != nil { - panic(err) + zap.L().Panic(err.Error()) } } From 25d99e6b8afb9f9d61684c0a9ecd39c895a66ce9 Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Mon, 17 Apr 2023 19:34:05 -0300 Subject: [PATCH 30/37] fix: Add TODO --- cmd/server/main.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/server/main.go b/cmd/server/main.go index 9331fa6..b0dec64 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -23,6 +23,8 @@ func main() { panic(err) } + // TODO: + // - Remove this implementation, MUST NOT use `ReplaceGlobals` undo := zap.ReplaceGlobals(l) defer undo() From 91e839590a8808318d4a11afa43e1dd141ee17d3 Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Mon, 17 Apr 2023 22:26:03 -0300 Subject: [PATCH 31/37] feat: Implement requestAPI interface in FromRequest function --- src/message/message.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/message/message.go b/src/message/message.go index 0c174fc..64cdb24 100644 --- a/src/message/message.go +++ b/src/message/message.go @@ -3,8 +3,6 @@ package message import ( "fmt" "time" - - "github.com/fiuskyws/pegasus/src/proto" ) type ( @@ -15,7 +13,12 @@ type ( Timestamp *time.Time `json:"timestamp"` // Attr is a map of message Attributes. Attr map[string]any `json:"attr"` - Body []byte `json:"body"` + Body string `json:"body"` + } + + requestAPI interface { + GetBody() string + GetTopicName() string } ) @@ -30,10 +33,10 @@ func (m *Message) Validate() error { return nil } -func FromRequest(req *proto.SendRequest) (*Message, error) { +func FromRequest[T requestAPI](req T) (*Message, error) { msg := Message{ - Body: []byte(req.Body), - TopicName: req.TopicName, + Body: req.GetBody(), + TopicName: req.GetTopicName(), } if err := msg.Validate(); err != nil { From 7839f670d65edd217f44de596e7b67d8bafb0e94 Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Mon, 17 Apr 2023 22:26:10 -0300 Subject: [PATCH 32/37] feat: Implement Consumer and Producer --- src/repository/grpc.go | 94 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 92 insertions(+), 2 deletions(-) diff --git a/src/repository/grpc.go b/src/repository/grpc.go index 5ab4530..0210cc7 100644 --- a/src/repository/grpc.go +++ b/src/repository/grpc.go @@ -131,11 +131,101 @@ func (g *GRPCRepo) Pop(ctx context.Context, req *proto.PopRequest) (*proto.PopRe } // Consumer - -func (g *GRPCRepo) Consumer(in *proto.ConsumerRequest, srv proto.Pegasus_ConsumerServer) error { - return nil +func (g *GRPCRepo) Consumer(req *proto.ConsumerRequest, srv proto.Pegasus_ConsumerServer) error { + msgChan := make(chan *message.Message, 1) + errChan := make(chan error, 1) + + go func() { + exit := false + for !exit { + msg, err := g.mgr.Pop(req.TopicName) + if err != nil { + errChan <- err + exit = true + return + } + msgChan <- msg + } + }() + + for { + select { + case <-srv.Context().Done(): + if err := srv.Context().Err(); err != nil { + zap.L().Error(err.Error()) + return err + } + return nil + case err := <-errChan: + zap.L().Error(err.Error()) + return err + case msg := <-msgChan: + resp := proto.ConsumerResponse{ + TopicName: msg.TopicName, + Body: string(msg.Body), + } + if err := srv.Send(&resp); err != nil { + zap.L().Error(err.Error()) + return err + } + } + } } // Producer - func (g *GRPCRepo) Producer(srv proto.Pegasus_ProducerServer) error { + // TODO: Encapsulate this on it's own Job. + exit := false + done := srv.Context().Done() + req, err := srv.Recv() + if err != nil { + zap.L().Error(err.Error()) + return err + } + + topic, err := g.mgr.GetTopic(req.TopicName) + if err != nil { + zap.L().Error(err.Error()) + return err + } + + msg, err := message.FromRequest(req) + if err != nil { + zap.L().Error(err.Error()) + return err + } + + if err := topic.Send(msg); err != nil { + zap.L().Error(err.Error()) + return err + } + + for !exit { + select { + case <-done: + if err := srv.Context().Err(); err != nil { + zap.L().Error(err.Error()) + return err + } + return nil + default: + req, err := srv.Recv() + if err != nil { + zap.L().Error(err.Error()) + return err + } + + msg, err := message.FromRequest(req) + if err != nil { + zap.L().Error(err.Error()) + return err + } + + if err := topic.Send(msg); err != nil { + zap.L().Error(err.Error()) + return err + } + } + } return nil } From 3620277d809450d9412204731b2709b7470b2ea4 Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Mon, 17 Apr 2023 22:31:47 -0300 Subject: [PATCH 33/37] fix: Tests and TODO update --- Makefile | 10 +++++++++- README.md | 11 +++++------ src/manager/manager_test.go | 10 +++++----- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/Makefile b/Makefile index f1c7484..9d22d4b 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,15 @@ main=cmd/main.go +test: +# the '@' suppress the command echo + @go test ./... + +test-v: +# the '@' suppress the command echo + @go test -v ./... + run: - # the '@' suppress the command echo +# the '@' suppress the command echo @go run $(main) proto: diff --git a/README.md b/README.md index b2556d7..a087048 100644 --- a/README.md +++ b/README.md @@ -3,10 +3,9 @@ Simple Message Broker service written in Go. ## TODO: - [ ] Alpha release: - - [ ] Pub/Sub Logic - - [ ] Client/Server - - [ ] Makefile: - - [ ] `run` - - [ ] `test` - - [ ] `build` + - [x] Pub/Sub Logic + - [x] Client/Server + - [x] Makefile: + - [x] `run` + - [x] `test` - [ ] Test Coverage > 50% diff --git a/src/manager/manager_test.go b/src/manager/manager_test.go index 523ce56..e551e8b 100644 --- a/src/manager/manager_test.go +++ b/src/manager/manager_test.go @@ -96,7 +96,7 @@ func TestSend(t *testing.T) { m := NewManager() actual := m.Send(&message.Message{ - Body: []byte("body"), + Body: "body", }) require.NotNil(actual) @@ -120,7 +120,7 @@ func TestSend(t *testing.T) { actual := m.Send(&message.Message{ TopicName: "some topic", - Body: []byte("body"), + Body: "body", }) require.NotNil(actual) @@ -138,7 +138,7 @@ func TestSend(t *testing.T) { { actual := m.Send(&message.Message{ TopicName: "wrong_topic", - Body: []byte("body"), + Body: "body", }) require.NotNil(actual) @@ -157,7 +157,7 @@ func TestSend(t *testing.T) { { actual := m.Send(&message.Message{ TopicName: "topic", - Body: []byte("body"), + Body: "body", }) require.Nil(actual) @@ -205,7 +205,7 @@ func TestPop(t *testing.T) { { err := m.Send(&message.Message{ TopicName: "topic", - Body: []byte("body"), + Body: "body", }) require.Nil(err) From 1d86f6d3408c784af7c14582ccc5ef7f72fac0ab Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Mon, 17 Apr 2023 22:42:59 -0300 Subject: [PATCH 34/37] feat: Add gRPC client connection --- src/client/client.go | 9 +++++++++ src/client/grpc.go | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) create mode 100644 src/client/client.go create mode 100644 src/client/grpc.go diff --git a/src/client/client.go b/src/client/client.go new file mode 100644 index 0000000..cd08baa --- /dev/null +++ b/src/client/client.go @@ -0,0 +1,9 @@ +package client + +type ( + Client interface { + Connect() + Close() error + Ping() error + } +) diff --git a/src/client/grpc.go b/src/client/grpc.go new file mode 100644 index 0000000..0859a36 --- /dev/null +++ b/src/client/grpc.go @@ -0,0 +1,39 @@ +package client + +import ( + "context" + + "github.com/fiuskyws/pegasus/src/proto" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +type ( + gRPC struct { + conn *grpc.ClientConn + client proto.PegasusClient + } +) + +func NewGRPC(target string) Client { + conn, err := grpc.Dial(target) + if err != nil { + zap.L().Panic(err.Error()) + } + client := proto.NewPegasusClient(conn) + return &gRPC{ + conn: conn, + client: client, + } +} + +func (g *gRPC) Ping() error { + _, err := g.client.GetTopics(context.Background(), &proto.GetTopicsRequest{}) + return err +} + +func (g *gRPC) Connect() {} + +func (g *gRPC) Close() error { + return g.conn.Close() +} From fc507c39babe2b31470305cf15857a55f7ade2b5 Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Mon, 17 Apr 2023 22:45:11 -0300 Subject: [PATCH 35/37] feat: Add GetClient method in client --- README.md | 14 +++++++------- src/client/client.go | 3 +++ src/client/grpc.go | 6 ++++++ 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index a087048..96babd3 100644 --- a/README.md +++ b/README.md @@ -2,10 +2,10 @@ Simple Message Broker service written in Go. ## TODO: -- [ ] Alpha release: - - [x] Pub/Sub Logic - - [x] Client/Server - - [x] Makefile: - - [x] `run` - - [x] `test` - - [ ] Test Coverage > 50% + +### Alpha Release +- [x] Pub/Sub Logic +- [x] Server +- [x] Makefile: + - [x] `run` + - [x] `test` diff --git a/src/client/client.go b/src/client/client.go index cd08baa..bcf489b 100644 --- a/src/client/client.go +++ b/src/client/client.go @@ -1,9 +1,12 @@ package client +import "github.com/fiuskyws/pegasus/src/proto" + type ( Client interface { Connect() Close() error Ping() error + GetClient() proto.PegasusClient } ) diff --git a/src/client/grpc.go b/src/client/grpc.go index 0859a36..4662891 100644 --- a/src/client/grpc.go +++ b/src/client/grpc.go @@ -27,6 +27,12 @@ func NewGRPC(target string) Client { } } +// TODO: +// - Implement a REAL Client, not just this $hit +func (g *gRPC) GetClient() proto.PegasusClient { + return g.client +} + func (g *gRPC) Ping() error { _, err := g.client.GetTopics(context.Background(), &proto.GetTopicsRequest{}) return err From bd5a2ec723bc9569e0a316366a3af4f09234d51d Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Mon, 17 Apr 2023 22:50:56 -0300 Subject: [PATCH 36/37] feat: Update README.md --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 96babd3..d5a412b 100644 --- a/README.md +++ b/README.md @@ -9,3 +9,7 @@ Simple Message Broker service written in Go. - [x] Makefile: - [x] `run` - [x] `test` + +## Disclaimer + +This repository still in research and `PoC's` phase, meaning, it is NOT ready to be used in production environments. (if it'll be) From 8495874c851e4fb4f26f900c16df3328c34cd1da Mon Sep 17 00:00:00 2001 From: Rafael Breno Date: Mon, 17 Apr 2023 22:53:56 -0300 Subject: [PATCH 37/37] fix: reamde --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d5a412b..a505f31 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ Simple Message Broker service written in Go. ## TODO: -### Alpha Release +### POC Release 1 - [x] Pub/Sub Logic - [x] Server - [x] Makefile: