Skip to content

feat: added service delivery agent #5

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions cmd/cx-delivery/delivery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package main

import (
"encoding/json"

"github.com/spf13/viper"
. "github.com/valmi-io/cx-pipeline/internal/log"
util "github.com/valmi-io/cx-pipeline/internal/util"
)

type ChannelTopic struct {
LinkID string `json:"link_id"`
WriteKey string `json:"write_key"`
storefront string
channel string
}

func delivery(msg string) {
Log.Info().Msgf("processing msg %v", msg)

var event map[string]interface{}
if unmarshalErr := json.Unmarshal([]byte(msg), &event); unmarshalErr != nil {
Log.Error().Msgf("Error Unmarshalling event: %v", unmarshalErr)
return
}

jsonPayload := `{"channel_in": ["postgres"], "channel_not_in": [""]}`
data, _, err := util.PostUrl(
viper.GetString("APP_BACKEND_URL")+"/api/v1/superuser/channeltopics",
[]byte(jsonPayload),
util.SetConfigAuth,
nil)
if err != nil {
Log.Error().Msgf("Error fetching processor destination")
}

var channelTopics []ChannelTopic
if unmarshalErr := json.Unmarshal([]byte(data), &channelTopics); unmarshalErr != nil {
Log.Error().Msgf("Error Unmarshalling event: %v", unmarshalErr)
return
}

// TODO: send the msg to only particular store write_key(delivery)
for _, ct := range channelTopics {
headerItems := map[string]string{"Content-Type": "application/json", "X-Write-Key": ct.WriteKey}
eventBytes, _ := json.Marshal(event)
_, _, err = util.PostUrl("http://localhost:3049/api/s/s2s/event", eventBytes, nil, headerItems)
if err != nil {
Log.Error().Msgf("error sending request to Jitsu: %v", err)
}
}

}
50 changes: 50 additions & 0 deletions cmd/cx-delivery/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package main

import (
"os"
"os/signal"
"syscall"

"github.com/spf13/viper"
"github.com/valmi-io/cx-pipeline/internal/configstore"
"github.com/valmi-io/cx-pipeline/internal/env"
. "github.com/valmi-io/cx-pipeline/internal/log"
. "github.com/valmi-io/cx-pipeline/internal/msgbroker"
)

func main() {
Log.Info().Msgf("delivery agent started")
env.InitConfig()
Log.Info().Msg(viper.GetString("APP_BACKEND_URL"))
Log.Info().Msg(viper.GetString("KAFKA_BROKER"))

// initialize ConfigStore
jsonPayload := `{"channel_in": ["processor"], "channel_not_in": ["x", "y"]}`
cs, err := configstore.Init(jsonPayload)
if err != nil {
Log.Fatal().Msg(err.Error())
}

// initialize Broker
topicMan, err := InitBroker(delivery)
if err != nil {
Log.Fatal().Msg(err.Error())
}

// Connect ConfigStore and Broker
cs.AttachTopicMan(topicMan)

cleanupChan := make(chan bool)

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
cleanupChan <- true
}()

<-cleanupChan
Log.Info().Msg("Received an interrupt signal, shutting down...")
cs.Close() // close ConfigStore to stop refreshing IFTTTs
topicMan.Close() // close broker topic management
}
3 changes: 2 additions & 1 deletion cmd/cx-processor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ func main() {
Log.Info().Msg(viper.GetString("KAFKA_BROKER"))

// initialize ConfigStore
cs, err := configstore.Init()
jsonPayload := `{"channel_in": ["chatbox"], "channel_not_in": ["x", "y"]}`
cs, err := configstore.Init(jsonPayload)
if err != nil {
Log.Fatal().Msg(err.Error())
}
Expand Down
45 changes: 45 additions & 0 deletions cmd/cx-processor/processor.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,54 @@
package main

import (
"encoding/json"

"github.com/spf13/viper"
. "github.com/valmi-io/cx-pipeline/internal/log"
util "github.com/valmi-io/cx-pipeline/internal/util"
)

type ChannelTopic struct {
LinkID string `json:"link_id"`
WriteKey string `json:"write_key"`
storefront string
channel string
}

func processor(msg string) {
Log.Info().Msgf("processing msg %v", msg)

var event map[string]interface{}
if unmarshalErr := json.Unmarshal([]byte(msg), &event); unmarshalErr != nil {
Log.Error().Msgf("Error Unmarshalling event: %v", unmarshalErr)
return
}
event["processed"] = true

jsonPayload := `{"channel_in": ["processor"], "channel_not_in": [""]}`
data, _, err := util.PostUrl(
viper.GetString("APP_BACKEND_URL")+"/api/v1/superuser/channeltopics",
[]byte(jsonPayload),
util.SetConfigAuth,
nil)
if err != nil {
Log.Error().Msgf("Error fetching processor destination")
}

var channelTopics []ChannelTopic
if unmarshalErr := json.Unmarshal([]byte(data), &channelTopics); unmarshalErr != nil {
Log.Error().Msgf("Error Unmarshalling event: %v", unmarshalErr)
return
}

// TODO: send the msg to only particular store write_key(processor)
for _, ct := range channelTopics {
headerItems := map[string]string{"Content-Type": "application/json", "X-Write-Key": ct.WriteKey}
eventBytes, _ := json.Marshal(event)
_, _, err = util.PostUrl("http://localhost:3049/api/s/s2s/event", eventBytes, nil, headerItems)
if err != nil {
Log.Error().Msgf("error sending request to Jitsu: %v", err)
}
}

}
11 changes: 5 additions & 6 deletions internal/configstore/channel_topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ type ChannelTopics struct {
topicMan *TopicMan
}

func fetchChannelTopics(currentCT *ChannelTopics) []ChannelTopic {
jsonPayload := `{"channel_in": ["chatbox"], "channel_not_in": ["x", "y"]}`
func fetchChannelTopics(currentCT *ChannelTopics, jsonPayload string) []ChannelTopic {
data, respCode, err := util.PostUrl(
viper.GetString("APP_BACKEND_URL")+"/api/v1/superuser/channeltopics",
[]byte(jsonPayload),
util.SetConfigAuth)
util.SetConfigAuth,
nil)

Log.Info().Msg(data)
Log.Info().Msgf("%v", respCode)
Expand Down Expand Up @@ -86,11 +86,10 @@ func matchChannelState(newCT []ChannelTopic, currentCT []ChannelTopic, topicMan

// var i int = 0

func initChannelTopics(wg *sync.WaitGroup) (*ChannelTopics, error) {
func initChannelTopics(wg *sync.WaitGroup, jsonPayload string) (*ChannelTopics, error) {
d, _ := time.ParseDuration(viper.GetString("CONFIG_REFRESH_INTERVAL"))
ticker := time.NewTicker(d)
channelTopics := &ChannelTopics{done: make(chan bool)}

wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -102,7 +101,7 @@ func initChannelTopics(wg *sync.WaitGroup) (*ChannelTopics, error) {
return
case t := <-ticker.C:
Log.Debug().Msgf("ChannelTopics Refresh Tick at %v", t)
newChannels := fetchChannelTopics(channelTopics)
newChannels := fetchChannelTopics(channelTopics, jsonPayload)
channelTopics.mu.Lock()
channelTopics.Channels = newChannels
channelTopics.mu.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions internal/configstore/configstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ type ConfigStore struct {
wg *sync.WaitGroup
}

func Init() (*ConfigStore, error) {
func Init(jsonPayload string) (*ConfigStore, error) {
var wg sync.WaitGroup
channelTopics, error := initChannelTopics(&wg)
channelTopics, error := initChannelTopics(&wg, jsonPayload)
if error != nil {
return nil, error
}
Expand Down
1 change: 0 additions & 1 deletion internal/configstore/storefront_ifttt.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func initStoreFrontIfttts(wg *sync.WaitGroup) (*StorefrontIfttts, error) {
d, _ := time.ParseDuration(viper.GetString("CONFIG_REFRESH_INTERVAL"))
ticker := time.NewTicker(d)
storefrontIfttts := &StorefrontIfttts{done: make(chan bool)}

wg.Add(1)
go func() {
defer wg.Done()
Expand Down
6 changes: 5 additions & 1 deletion internal/util/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func GetUrl(url string, setAuth func(*http.Request)) (string, int, error) {

}

func PostUrl(url string, body []byte, setAuth func(*http.Request)) (string, int, error) {
func PostUrl(url string, body []byte, setAuth func(*http.Request), headerItems map[string]string) (string, int, error) {
client := http.Client{Timeout: 5 * time.Second}

req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(body))
Expand All @@ -64,6 +64,10 @@ func PostUrl(url string, body []byte, setAuth func(*http.Request)) (string, int,
setAuth(req)
}

for headerKey, headerValue := range headerItems {
req.Header.Set(headerKey, headerValue)
}

resp, err := client.Do(req)
if err != nil {
panic(err)
Expand Down