diff --git a/edge/services/hellok8sMMS/k8s_mms_consumer/Dockerfile b/edge/services/hellok8sMMS/k8s_mms_consumer/Dockerfile new file mode 100644 index 00000000..31086edf --- /dev/null +++ b/edge/services/hellok8sMMS/k8s_mms_consumer/Dockerfile @@ -0,0 +1,7 @@ +FROM alpine:latest + +RUN apk --no-cache --update add jq bash + +COPY *.sh / +WORKDIR / +CMD /service.sh \ No newline at end of file diff --git a/edge/services/hellok8sMMS/k8s_mms_consumer/Makefile b/edge/services/hellok8sMMS/k8s_mms_consumer/Makefile new file mode 100644 index 00000000..6c3f37a9 --- /dev/null +++ b/edge/services/hellok8sMMS/k8s_mms_consumer/Makefile @@ -0,0 +1,22 @@ +# Make targets for building the IBM example hello mms consumer service for edge cluster + +# This imports the variables from horizon/hzn.json. You can ignore these lines, but do not remove them. +-include horizon/.hzn.json.tmp.mk + +# Default ARCH to the architecture of this machines (as horizon/golang describes it) +export ARCH ?= $(shell hzn architecture) + +# Build the docker image for the current architecture +build: + docker build --platform linux/$(ARCH) -t $(DOCKER_IMAGE_BASE)_$(ARCH):$(SERVICE_VERSION) -f ./Dockerfile . + +# Push the docker image +push: + docker push $(DOCKER_IMAGE_BASE)_$(ARCH):$(SERVICE_VERSION) + +clean: + -docker rmi $(DOCKER_IMAGE_BASE)_$(ARCH):$(SERVICE_VERSION) 2> /dev/null || : + +# This imports the variables from horizon/hzn.cfg. You can ignore these lines, but do not remove them. +horizon/.hzn.json.tmp.mk: horizon/hzn.json + @ hzn util configconv -f $< > $@ \ No newline at end of file diff --git a/edge/services/hellok8sMMS/k8s_mms_consumer/horizon/hzn.json b/edge/services/hellok8sMMS/k8s_mms_consumer/horizon/hzn.json new file mode 100644 index 00000000..eb59cbb7 --- /dev/null +++ b/edge/services/hellok8sMMS/k8s_mms_consumer/horizon/hzn.json @@ -0,0 +1,8 @@ +{ + "HZN_ORG_ID": "IBM", + "MetadataVars": { + "DOCKER_IMAGE_BASE": "openhorizon/ibm.hello-k8s-mms-consumer", + "SERVICE_NAME": "ibm.hello-k8s-mms-consumer", + "SERVICE_VERSION": "1.0.0" + } +} diff --git a/edge/services/hellok8sMMS/k8s_mms_consumer/service.sh b/edge/services/hellok8sMMS/k8s_mms_consumer/service.sh new file mode 100755 index 00000000..3d04a809 --- /dev/null +++ b/edge/services/hellok8sMMS/k8s_mms_consumer/service.sh @@ -0,0 +1,24 @@ +#!/bin/bash + +OBJECT_TYPES_STRING=$MMS_OBJECT_TYPES # operator need to put MMS_OBJECT_TYPES in the deployment as env from the userinput configmap, eg: "model model1 model2" +OBJECT_ID=config.json # this is the file we are watching in this example mms consumer service + # this file is in shared volume. Operator should bind as /ess-store/- + +echo "Object types to check: $OBJECT_TYPES_STRING" +IFS=' ' read -r -a objecttypes <<< "$OBJECT_TYPES_STRING" + +while true; do + for objecttype in "${objecttypes[@]}" + do + MMS_FILE_NAME="/ess-store/$objecttype-$OBJECT_ID" + echo "MMS_FILE_NAME: $MMS_FILE_NAME" + if [[ -f $MMS_FILE_NAME ]]; then + eval $(jq -r 'to_entries[] | .key + "=\"" + .value + "\""' $MMS_FILE_NAME) + echo "$MMS_FILE_NAME: Hello from ${HW_WHO} from objectType: $objecttype, objectId: $OBJECT_ID!" + else + echo "file $MMS_FILE_NAME not found" + fi + + done + sleep 20 +done \ No newline at end of file diff --git a/edge/services/hellok8sMMS/k8s_mms_helper/Dockerfile b/edge/services/hellok8sMMS/k8s_mms_helper/Dockerfile new file mode 100644 index 00000000..4b8aacc2 --- /dev/null +++ b/edge/services/hellok8sMMS/k8s_mms_helper/Dockerfile @@ -0,0 +1,11 @@ +FROM alpine:latest + +RUN apk --no-cache --update add curl + +COPY start.sh / +COPY service /usr/local/bin/ + +RUN mkdir -p /ess-store + +WORKDIR / +CMD /start.sh \ No newline at end of file diff --git a/edge/services/hellok8sMMS/k8s_mms_helper/Makefile b/edge/services/hellok8sMMS/k8s_mms_helper/Makefile new file mode 100644 index 00000000..d53385b6 --- /dev/null +++ b/edge/services/hellok8sMMS/k8s_mms_helper/Makefile @@ -0,0 +1,25 @@ +# Makefile for hello mms helper service for k8s + +# This imports the variables from horizon/hzn.json. You can ignore these lines, but do not remove them. +-include horizon/.hzn.json.tmp.mk + +# Default ARCH to the architecture of this machines (as horizon/golang describes it) +export ARCH ?= $(shell hzn architecture) + +# Build the docker image for the current architecture +build: + docker build --platform linux/$(ARCH) -t $(DOCKER_IMAGE_BASE)_$(ARCH):$(SERVICE_VERSION) -f ./Dockerfile . + +service: service.go + GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build service.go + +# Push the docker image +push: + docker push $(DOCKER_IMAGE_BASE)_$(ARCH):$(SERVICE_VERSION) + +clean: + -docker rmi $(DOCKER_IMAGE_BASE)_$(ARCH):$(SERVICE_VERSION) 2> /dev/null || : + +# This imports the variables from horizon/hzn.cfg. You can ignore these lines, but do not remove them. +horizon/.hzn.json.tmp.mk: horizon/hzn.json + @ hzn util configconv -f $< > $@ \ No newline at end of file diff --git a/edge/services/hellok8sMMS/k8s_mms_helper/config.json b/edge/services/hellok8sMMS/k8s_mms_helper/config.json new file mode 100644 index 00000000..1199fcb0 --- /dev/null +++ b/edge/services/hellok8sMMS/k8s_mms_helper/config.json @@ -0,0 +1,3 @@ +{ + "HW_WHO": "k8sMMS" +} diff --git a/edge/services/hellok8sMMS/k8s_mms_helper/go.mod b/edge/services/hellok8sMMS/k8s_mms_helper/go.mod new file mode 100644 index 00000000..1958aca8 --- /dev/null +++ b/edge/services/hellok8sMMS/k8s_mms_helper/go.mod @@ -0,0 +1,13 @@ +module github.com/open-horizon/examples/edge/services/hellok8sMMS/k8s_mms_helper + +go 1.20 + +require ( + github.com/golang/glog v1.2.0 + github.com/open-horizon/edge-sync-service v1.10.1 +) + +require ( + github.com/open-horizon/edge-utilities v0.0.0-20190711093331-0908b45a7152 // indirect + golang.org/x/sync v0.6.0 // indirect +) diff --git a/edge/services/hellok8sMMS/k8s_mms_helper/go.sum b/edge/services/hellok8sMMS/k8s_mms_helper/go.sum new file mode 100644 index 00000000..79dfe62c --- /dev/null +++ b/edge/services/hellok8sMMS/k8s_mms_helper/go.sum @@ -0,0 +1,9 @@ +github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68= +github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/open-horizon/edge-sync-service v1.10.1 h1:+b+YTPqyxyhieixaFoV03Bs0Fmy5HGZtOIkhMG8OkMo= +github.com/open-horizon/edge-sync-service v1.10.1/go.mod h1:yCK3f59UHnoLU0Tz2/RhuLGygJFlZoqlP8kpmQ3Gqd4= +github.com/open-horizon/edge-utilities v0.0.0-20190711093331-0908b45a7152 h1:YEvNOMo3ANOQ3AwsU0cCcBA4nKHDLUlyUCRWk5rBf68= +github.com/open-horizon/edge-utilities v0.0.0-20190711093331-0908b45a7152/go.mod h1:YCsJWhuG0VERquI0geFKoneCSOVAyMdSmylGz5OlZdE= +golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= +golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= diff --git a/edge/services/hellok8sMMS/k8s_mms_helper/horizon/.hzn.json.tmp.mk b/edge/services/hellok8sMMS/k8s_mms_helper/horizon/.hzn.json.tmp.mk new file mode 100644 index 00000000..a32c5bac --- /dev/null +++ b/edge/services/hellok8sMMS/k8s_mms_helper/horizon/.hzn.json.tmp.mk @@ -0,0 +1,4 @@ +export HZN_ORG_ID=anaxsquad +export DOCKER_IMAGE_BASE=quay.io/zhangle/hello-k8s-mms-helper +export SERVICE_NAME=hello-k8s-mms-helper +export SERVICE_VERSION=1.0.0 diff --git a/edge/services/hellok8sMMS/k8s_mms_helper/horizon/hzn.json b/edge/services/hellok8sMMS/k8s_mms_helper/horizon/hzn.json new file mode 100644 index 00000000..eae93893 --- /dev/null +++ b/edge/services/hellok8sMMS/k8s_mms_helper/horizon/hzn.json @@ -0,0 +1,8 @@ +{ + "HZN_ORG_ID": "IBM", + "MetadataVars": { + "DOCKER_IMAGE_BASE": "openhorizon/ibm.hello-k8s-mms-helper", + "SERVICE_NAME": "ibm.hello-k8s-mms-helper", + "SERVICE_VERSION": "1.0.0" + } +} diff --git a/edge/services/hellok8sMMS/k8s_mms_helper/service b/edge/services/hellok8sMMS/k8s_mms_helper/service new file mode 100755 index 00000000..1922d96f Binary files /dev/null and b/edge/services/hellok8sMMS/k8s_mms_helper/service differ diff --git a/edge/services/hellok8sMMS/k8s_mms_helper/service.go b/edge/services/hellok8sMMS/k8s_mms_helper/service.go new file mode 100644 index 00000000..a54b49c4 --- /dev/null +++ b/edge/services/hellok8sMMS/k8s_mms_helper/service.go @@ -0,0 +1,314 @@ +package main + +import ( + "bytes" + "crypto/tls" + "crypto/x509" + "encoding/base64" + "encoding/json" + "errors" + "flag" + "fmt" + "io" + "io/ioutil" + "net/http" + "os" + "strings" + "sync" + "time" + + "github.com/golang/glog" + "github.com/open-horizon/edge-sync-service/common" +) + +const ( + HZN_ESS_AUTH_VAR = "HZN_ESS_AUTH" + HZN_ESS_CERT_VAR = "HZN_ESS_CERT" + HZN_ESS_API_ADDRESS_VAR = "HZN_ESS_API_ADDRESS" + HZN_ESS_API_PORT_VAR = "HZN_ESS_API_PORT" + HZN_ESS_API_PROTOCOL_VAR = "HZN_ESS_API_PROTOCOL" + MMS_OBJECT_TYPES_VAR = "MMS_OBJECT_TYPES" // MMS_OBJECT_TYPES should be passed as an array in the userinput. it will be put into hzn-env- configmap + // operator should set values in hzn-env- configmap as env var in MMS helper and consumer deployment +) + +const ( + MMS_HELPER_STORAGE = "/ess-store" + ESS_AUTH_FILE = "/ess-auth/auth.json" + ESS_CERT_FILE = "/ess-cert/cert.pem" +) + +var essApiAddress string +var essApiPort string + +type AuthenticationCredential struct { + Id string `json:"id"` + Token string `json:"token"` + Version string `json:"version"` +} + +func ReadCredFromAuthFile(authFilePath string) (*AuthenticationCredential, error) { + if authFile, err := os.Open(authFilePath); err != nil { + return nil, fmt.Errorf("unable to open auth file %v, error: %v", authFilePath, err) + } else if bytes, err := ioutil.ReadAll(authFile); err != nil { + return nil, fmt.Errorf("unable to read auth file %v, error: %v", authFilePath, err) + } else { + authObj := new(AuthenticationCredential) + if err := json.Unmarshal(bytes, authObj); err != nil { + return nil, fmt.Errorf("unable to demarshal auth file %v, error: %v", authFilePath, err) + } else { + return authObj, nil + } + } +} + +func getFromEnv(envName string, defaultVal string) string { + val := os.Getenv(envName) + if val == "" { + val = defaultVal + } + return val +} + +func getHttpClient(essCertFile string) (*http.Client, error) { + caCert, err := ioutil.ReadFile(essCertFile) + if err != nil { + return nil, err + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + t := &http.Transport{ + TLSClientConfig: &tls.Config{ + RootCAs: caCertPool, + }, + } + + client := http.Client{Transport: t, Timeout: 30 * time.Second} + return &client, nil +} + +func constructHttpGetRequest(url string, auth string) (*http.Request, error) { + if req, err := http.NewRequest("GET", url, nil); err != nil { + return nil, err + } else { + req.Header.Add("Accept", "application/json") + req.Header.Add("Content-Type", "application/json") + req.Header.Add("Authorization", fmt.Sprintf("Basic %v", base64.StdEncoding.EncodeToString([]byte(auth)))) + return req, nil + } +} + +func invokeESSGet(httpClient *http.Client, url string, authValue string, respStruct interface{}) error { + req, err := constructHttpGetRequest(url, authValue) + if err != nil { + return err + } + + httpResp, _ := httpClient.Do(req) + if httpResp != nil && httpResp.Body != nil { + defer httpResp.Body.Close() + } + if httpResp != nil { + glog.V(2).Infof("Get response returns %v for %v", httpResp.Status, url) + + if outBytes, readErr := ioutil.ReadAll(httpResp.Body); readErr != nil { + glog.Errorf("Error reading response body for %v, status code: %v , error was: %v", url, httpResp.Status, readErr) + return readErr + } else if httpResp.StatusCode != 200 && httpResp.StatusCode != 404 { + msg := fmt.Sprintf("Get response returns %v, for %v", httpResp.Status, url) + glog.Errorf(msg) + return errors.New(msg) + } else if httpResp.StatusCode == 404 { + msg := fmt.Sprintf("Get response returns %v, for %v", httpResp.Status, url) + glog.V(3).Infof(msg) + } else { + glog.V(3).Infof("Get response returns %v, body: %v for %v", httpResp.Status, string(outBytes), url) + switch s := respStruct.(type) { + case *[]byte: + *s = outBytes + case *string: + *s = string(outBytes) + default: + if err = json.Unmarshal(outBytes, respStruct); err != nil { + msg := fmt.Sprintf("Error unmarshal the response body from %v, error was: %v", url, err) + glog.Errorf(msg) + return errors.New(msg) + } + } + } + } else { + glog.Errorf("received nil response from Get objects call") + } + return nil +} + +func constructHttpPutRequest(url string, auth string) (*http.Request, error) { + if req, err := http.NewRequest("PUT", url, nil); err != nil { + return nil, err + } else { + req.Header.Add("Authorization", fmt.Sprintf("Basic %v", base64.StdEncoding.EncodeToString([]byte(auth)))) + return req, nil + } +} + +func invokeESSPut(httpClient *http.Client, url string, authValue string) error { + req, err := constructHttpPutRequest(url, authValue) + if err != nil { + return err + } + + httpResp, _ := httpClient.Do(req) + if httpResp != nil && httpResp.Body != nil { + defer httpResp.Body.Close() + } + + if httpResp != nil && httpResp.StatusCode != 204 { + return fmt.Errorf("receive error status code: %v from %v", httpResp.StatusCode, url) + } else if httpResp == nil { + glog.Errorf("received nil response from PUT objects call") + } + return nil +} + +func writeDateStreamToFile(dataReader io.Reader, fileName string) error { + file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE, 0600) + if err != nil { + return err + } + defer file.Close() + + if _, err := file.Seek(0, io.SeekStart); err != nil { + return err + } + + if _, err := io.Copy(file, dataReader); err != nil && err != io.EOF { + return err + } + + return nil +} + +func convertToObjectList(objListString string) []string { + objList := strings.Split(objListString, " ") + return objList +} + +func checkingMMSObject(authValue string, objectType string) { + glog.V(2).Infof("Start checking MMS update for objectType: %v", objectType) + + // curl --cacert /path/to/cert.pem https://agent-service..svc.cluster.local:8443/api/v1/objects/ -u /: + getUpdatedObjectUrl := fmt.Sprintf("https://%v:%v/api/v1/objects/%v", essApiAddress, essApiPort, objectType) + + var getUpdatedObjectDataUrl string + var markObjectReceivedUrl string + var markObjectDeletedUrl string + for { + httpClient, err := getHttpClient(ESS_CERT_FILE) + if err != nil { + glog.Error("Error opening cert file %s, Error: %s", ESS_CERT_FILE, err) + return + } + + var metas []common.MetaData + if err := invokeESSGet(httpClient, getUpdatedObjectUrl, authValue, &metas); err != nil { + glog.Errorf("Failed to call ESS API: %v, error was: %v", getUpdatedObjectUrl, err) + } else { + glog.V(3).Infof("Receive %v objects updates", len(metas)) + for _, meta := range metas { + filePath := fmt.Sprintf("%v/%v-%v", MMS_HELPER_STORAGE, meta.ObjectType, meta.ObjectID) + if meta.Deleted { + // this update is about to delete the object + glog.V(3).Infof("MMS file %v/%v was deleted", meta.ObjectType, meta.ObjectID) + + // delete the file /ess-store/- + if err = os.Remove(filePath); err != nil { + glog.Errorf("failed to remove MMS file %v/%v, error was: %v", meta.ObjectType, meta.ObjectID, err) + } + + // ack deleted + // curl -sSLw "%{http_code}" -X PUT ${AUTH} ${CERT} $SOCKET $BASEURL/$OBJECT_TYPE/$OBJECT_ID/deleted + // expect 204 + markObjectDeletedUrl = fmt.Sprintf("%v/%v/deleted", getUpdatedObjectUrl, meta.ObjectID) + if err = invokeESSPut(httpClient, markObjectDeletedUrl, authValue); err != nil { + glog.Errorf("failed to mark object %v/%v as deleted, error was: %v", meta.ObjectType, meta.ObjectID, err) + } + glog.V(3).Infof("mark object %v/%v is marked as 'deleted'", meta.ObjectType, meta.ObjectID) + } else { + // this update is about to get an object + // get object and save to a file in the shared volume: /ess-store/- + //curl -sSLw "%{http_code}" -o $OBJECT_ID ${AUTH} ${CERT} $SOCKET $BASEURL/$OBJECT_TYPE/$OBJECT_ID/data + glog.V(3).Infof("Received new/updated %v/%v from MMS", meta.ObjectType, meta.ObjectID) + getUpdatedObjectDataUrl = fmt.Sprintf("%v/%v/data", getUpdatedObjectUrl, meta.ObjectID) + var dataBytes []byte + + if err = invokeESSGet(httpClient, getUpdatedObjectDataUrl, authValue, &dataBytes); err != nil { + glog.Errorf("Failed to get object data from %v, error was: %v", getUpdatedObjectDataUrl, err) + continue + } + glog.V(3).Infof("get data for %v/%v", meta.ObjectType, meta.ObjectID) + + glog.V(3).Infof("saving data at %v", filePath) + r := bytes.NewReader(dataBytes) + if err := writeDateStreamToFile(r, filePath); err != nil { + glog.Errorf("failed to save data at %v", filePath) + continue + } + + // ack received + // curl -sSLw "%{http_code}" -X PUT ${AUTH} ${CERT} $SOCKET $BASEURL/$OBJECT_TYPE/$OBJECT_ID/received + // expect 200 or 204 + glog.V(3).Infof("mark object %v/%v is received", meta.ObjectType, meta.ObjectID) + markObjectReceivedUrl = fmt.Sprintf("%v/%v/received", getUpdatedObjectUrl, meta.ObjectID) + if err = invokeESSPut(httpClient, markObjectReceivedUrl, authValue); err != nil { + glog.Errorf("failed to mark object %v/%v as received, error was: %v", meta.ObjectType, meta.ObjectID, err) + } + glog.V(3).Infof("mark object %v/%v is marked as 'received'", meta.ObjectType, meta.ObjectID) + } + } + } + time.Sleep(5 * time.Second) + } + +} + +func main() { + glog.V(3).Info("Starting checking updates for MMS objects") + flag.Parse() + /* + env example: + HZN_ESS_API_ADDRESS: agent-service..svc.cluster.local + HZN_ESS_API_PORT: "8443" + HZN_ESS_API_PROTOCOL: secure + + operator should bind these following 2 secrets into the /ess-auth/auth.json and /ess-cert/cert.pem + HZN_ESS_AUTH: ess-auth-46e44a7d46530ecad0719e0ca24797054863717172407d9eb6fd755674c737fb + HZN_ESS_CERT: ess-cert-46e44a7d46530ecad0719e0ca24797054863717172407d9eb6fd755674c737fb + */ + + // get ess env + essApiAddress = getFromEnv(HZN_ESS_API_ADDRESS_VAR, "") + essApiPort = getFromEnv(HZN_ESS_API_PORT_VAR, "8443") + objectTypesAsString := getFromEnv(MMS_OBJECT_TYPES_VAR, "") // need to add MMS_HELPER_OBJECT_TYPES as array in the userinput. echo $MMS_OBJECT_TYPE will return: model model1 model2 model3 + objectTypes := convertToObjectList(objectTypesAsString) + + // Get auth from auth.json + var authValue string + if auth, err := ReadCredFromAuthFile(ESS_AUTH_FILE); err != nil { + glog.Error("error reading /ess-auth/auth.json file") + return + } else { + // auth.json content: {"id":"/abc","token":".........","version":"1.0.0"} + authValue = fmt.Sprintf("%v:%v", auth.Id, auth.Token) + } + + glog.V(3).Infof("Object types are: %v", objectTypesAsString) + glog.V(3).Infof("authValue: %v", authValue) + + var wg sync.WaitGroup + for _, objType := range objectTypes { + wg.Add(1) + go checkingMMSObject(authValue, objType) + } + wg.Wait() + glog.V(3).Info("Done") +} diff --git a/edge/services/hellok8sMMS/k8s_mms_helper/start.sh b/edge/services/hellok8sMMS/k8s_mms_helper/start.sh new file mode 100755 index 00000000..4492434e --- /dev/null +++ b/edge/services/hellok8sMMS/k8s_mms_helper/start.sh @@ -0,0 +1,20 @@ +#!/bin/sh + +# Check env vars that we know should be set to verify that everything is working +function verify { + if [ "$2" == "" ] + then + echo -e "Error: $1 should be set but is not." + exit 2 + fi +} + +verify "HZN_ESS_AUTH" $HZN_ESS_AUTH +verify "HZN_ESS_CERT" $HZN_ESS_CERT +verify "HZN_ESS_API_ADDRESS" $HZN_ESS_API_ADDRESS +verify "HZN_ESS_API_PORT" $HZN_ESS_API_PORT +verify "HZN_ESS_API_PROTOCOL" $HZN_ESS_API_PROTOCOL +verify "MMS_OBJECT_TYPES" $MMS_OBJECT_TYPES +echo -e "All ESS env vars verified." + +/usr/local/bin/service -v 3 -logtostderr \ No newline at end of file