Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

K8s headless svc plugin #150

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ Thumbs.db
/.vscode/
/.idea/
.vim/

.idea
*coverage.out
733 changes: 733 additions & 0 deletions go.work.sum

Large diffs are not rendered by default.

41 changes: 41 additions & 0 deletions v4/registry/kubernetes-headless-svc/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so

# Folders
_obj
_test
_build
.DS_Store
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out

*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*

_testmain.go

*.exe
*.test
*.prof

# ignore go build and test outputs
coverage.txt
coverage.out

# ignore locally built binaries
micro
!micro/
dist

*.bak
*.log
test/services
.air.toml
tmp
.idea
144 changes: 144 additions & 0 deletions v4/registry/kubernetes-headless-svc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
Kubernetes Registry Plugin for micro
---

## about kubernetes-headless-svc registry

The current project is a go-micro register plug-in.
When we deploy the go-micro grpc server with k8s, if we use the built-in Service kind of k8s
to deploy the pod of grpc server, it is not possible to perform grpc http2.0, so I use the
headless Service mode to deploy, and we can use the headless Service as a dns host, so we
can use go package `net` to do dns resolution, and then obtain the podIp record of headless
Service endpoints return, realize the function of grpc service discovery.
## how to use kubernetes-headless-svc registry
```go
package main

import (
"fmt"
"github.com/go-micro-v4-demo/frontend/handler"
helloworldPb "github.com/go-micro-v4-demo/helloworld/proto"
userPb "github.com/go-micro-v4-demo/user/proto"
mgrpc "github.com/go-micro/plugins/v4/client/grpc"
mhttp "github.com/go-micro/plugins/v4/server/http"
"github.com/gorilla/mux"
k8sHeadlessSvc "github.com/gsmini/k8s-headless-svc"
"go-micro.dev/v4/logger"
"net/http"
)

var (
service = "frontend"
version = "latest"
)

const K8sSvcName = "user-svc"

const UserSvcName = "user-svc" //the name of Service.meta.name in k8s
const HelloWordSvcName = "helloworld" //the name of Service.meta.name in k8s
func main() {
UserSvc := &k8sHeadlessSvc.Service{Namespace: "default", SvcName: UserSvcName, PodPort: 8080}
//HelloWordSvc := &k8sHeadlessSvc.Service{Namespace: "default", SvcName: HelloWordSvcName, PodPort: 9090}
reg := k8sHeadlessSvc.NewRegistry([]*k8sHeadlessSvc.Service{UserSvc})
// when frontend has many grpc server, u can use like this
//reg := k8sHeadlessSvc.NewRegistry([]*k8sHeadlessSvc.Service{UserSvc},[]*k8sHeadlessSvc.Service{HelloWordSvcName})
srv := micro.NewService()
srv.Init(
micro.Name(service),
micro.Version(version),
micro.Address("0.0.0.0:8080"),
micro.Registry(reg),//registry our k8sHeadlessSvc registry
)
//Omit unimportant code ...
}
```

## Core code of kubernetes-headless-svc
```go
package main
import (
"fmt"
"net"
)

func main() {
//we just use net.LookupIP to get ip address of www.twitter.com
ipRecords, err := net.LookupIP("www.twitter.com")
if err != nil {
panic(err)
}
for _, value := range ipRecords {
fmt.Println(value.String())
}
}
```

```shell
104.244.42.193
```
> it is the same of this command: 'nslookup www.twitter.com'
```shell
Server: 8.8.8.8
Address: 8.8.8.8#53

Non-authoritative answer:
www.twitter.com canonical name = twitter.com.
Name: twitter.com
Address: 104.244.42.193

```


When deploying the grpc server, configure the sessionAffinity(session affinity) for the Service
to ensure that the grpc server can return messages properly after receiving them
```yaml
apiVersion: v1
kind: Service
metadata:
name: user-svc
namespace: default
spec:
clusterIP: None
ports:
- port: 8080
selector:
app: user

sessionAffinity: ClientIP
sessionAffinityConfig:
clientIP:
timeoutSeconds: 3600
```
## examples
See the fronted and user projects under examples directory
### deploy microservice
```shell
kubectl apply -f examples/fronted/k8s.yaml
kubectl apply -f examples/user/k8s.yaml
```
> deploy our microservice both user and fronted service

### list svc in k8s
```shell
root@hecs-410147:# kubectl get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
frontend-svc ClusterIP 10.108.199.130 <none> 80/TCP 40h
```
### requests frontend-svc clusterIp
```shell

curl http://10.108.199.130/index
```
### check log of user microservice
```shell
root@hecs-410147:~# kubectl logs user-5cdd5697f-vr5db
2023-04-09 22:22:21 file=build/main.go:33 level=info Starting [service] user
2023-04-09 22:22:21 file=v4@v4.9.0/service.go:96 level=info Transport [http] Listening on [::]:8080
2023-04-09 22:22:21 file=v4@v4.9.0/service.go:96 level=info Broker [http] Connected to 127.0.0.1:33039
2023-04-09 22:22:21 file=server/rpc_server.go:832 level=info Registry [memory] Registering node: user-defaaa6b-7314-4757-bb47-9a1ea6043d0d
2023-04-11 20:46:35 file=handler/user.go:16 level=info Received User.Call request: name:"gsmini@sina.cn"
2023-04-11 21:23:35 file=handler/user.go:16 level=info Received User.Call request: name:"gsmini@sina.cn"
2023-04-11 21:25:00 file=handler/user.go:16 level=info Received User.Call request: name:"gsmini@sina.cn"
2023-04-11 21:35:39 file=handler/user.go:16 level=info Received User.Call request: name:"gsmini@sina.cn"
2023-04-11 21:35:49 file=handler/user.go:16 level=info Received User.Call request: name:"gsmini@sina.cn"
```
> so we can find the request log from fronted application
28 changes: 28 additions & 0 deletions v4/registry/kubernetes-headless-svc/dns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Package k8sheadlesssvc /*
package k8sheadlesssvc

import (
"fmt"
"net"
)

// dns for headless service in k8s: $(service_name).$(k8s_namespace).svc.cluster.local
// ipMaps data like this: { "user-svc":["127.0.0.1:8080","127.0.0.1:8081"] } .
func getDNSForPodIP(svc []*Service) (map[string][]string, error) {
ipMaps := make(map[string][]string, 10)

for _, value := range svc {
dnsForK8sSvc := fmt.Sprintf("%s.%s.svc.cluster.local", value.SvcName, value.Namespace)
ipRecords, err := net.LookupIP(dnsForK8sSvc)

if err != nil {
return nil, err
}

for _, ip := range ipRecords {
ipMaps[value.SvcName] = append(ipMaps[value.SvcName], fmt.Sprintf("%s:%d", ip.String(), value.PodPort))
}
}

return ipMaps, nil
}
12 changes: 12 additions & 0 deletions v4/registry/kubernetes-headless-svc/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
module github.com/go-micro/plugins/v4/registry/k8s-headless-svc

go 1.18

require go-micro.dev/v4 v4.10.2

require (
github.com/google/uuid v1.3.0 // indirect
github.com/miekg/dns v1.1.43 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
)
18 changes: 18 additions & 0 deletions v4/registry/kubernetes-headless-svc/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/miekg/dns v1.1.43 h1:JKfpVSCB84vrAmHzyrsxB5NAr5kLoMXZArPSw7Qlgyg=
github.com/miekg/dns v1.1.43/go.mod h1:+evo5L0630/F6ca/Z9+GAqzhjGyn8/c+TBaOyfEl0V4=
go-micro.dev/v4 v4.10.2 h1:GWQf1+FcAiMf1yca3P09RNjB31Xtk0C5HiKHSpq/2qA=
go-micro.dev/v4 v4.10.2/go.mod h1:RV2AolXjTAil9Xm82QCMo1gknuZwD61oMUH14wJpECk=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
112 changes: 112 additions & 0 deletions v4/registry/kubernetes-headless-svc/k8s-headless-svc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package k8sheadlesssvc

import (
"go-micro.dev/v4/registry"
)

// about services within the registry.
type k8sSvcWatcher struct {
}

func (k *k8sSvcWatcher) Next() (*registry.Result, error) {
return &registry.Result{}, nil
}
func (k *k8sSvcWatcher) Stop() {}

// Service saves the microservice u call.
type Service struct {
Namespace string // namespace of microservice u call in k8s
SvcName string // Service name of microservice u call in k8s
PodPort int32 // the port of container u deploy in k8s which is the value of containerPort
}
type k8sSvcRegister struct {
k8sService []*Service
opts *registry.Options
}

func (k *k8sSvcRegister) Init(opts ...registry.Option) error {
for _, o := range opts {
o(k.opts)
}

return nil
}
func (k *k8sSvcRegister) Options() registry.Options {
return registry.Options{}
}

// Register The resolution dns returns the pod id
// Since we intend to register self-discovery endpoints with k8s service,
// we do not need to write the registration discovery logic ourselves.
func (k *k8sSvcRegister) Register(*registry.Service, ...registry.RegisterOption) error {
return nil
}

// Deregister The resolution dns returns the pod id
// Since we intend to register self-discovery endpoints with k8s service,
// we do not need to write the registration discovery logic ourselves.
func (k *k8sSvcRegister) Deregister(*registry.Service, ...registry.DeregisterOption) error {
return nil
}

// GetService get service from endpoints of Service.
func (k *k8sSvcRegister) GetService(string, ...registry.GetOption) ([]*registry.Service, error) {
service := make([]*registry.Service, 64)
nodes := make([]*registry.Node, 64)

ipMaps, err := getDNSForPodIP(k.k8sService)
if err != nil {
return service, err
}

for svcName, ips := range ipMaps {
for _, ip := range ips {
nodes = append(nodes, &registry.Node{Address: ip})
}

service = append(service, &registry.Service{Name: svcName, Version: "latest", Nodes: nodes})
}

return service, nil
}

// ListServices get service from endpoints of Service.
func (k *k8sSvcRegister) ListServices(...registry.ListOption) ([]*registry.Service, error) {
service := make([]*registry.Service, 64)
nodes := make([]*registry.Node, 64)
ipMaps, err := getDNSForPodIP(k.k8sService)

if err != nil {
return []*registry.Service{}, err
}

for svcName, ips := range ipMaps {
for _, ip := range ips {
nodes = append(nodes, &registry.Node{Address: ip})
}

service = append(service, &registry.Service{Name: svcName, Version: "latest", Nodes: nodes})
}

return service, nil
}

// Watch Since we intend to register self-discovery endpoints with k8s service,
// we do not need to write the registration discovery logic ourselves.
func (k *k8sSvcRegister) Watch(_ ...registry.WatchOption) (registry.Watcher, error) {
return &k8sSvcWatcher{}, nil
}

func (k *k8sSvcRegister) String() string {
return "k8s-headless-svc"
}

// NewRegistry creates a kubernetes registry.
func NewRegistry(k8sService []*Service, opts ...registry.Option) registry.Registry {
k := k8sSvcRegister{
k8sService: k8sService,
opts: &registry.Options{},
}

return &k
}