Skip to content

Commit

Permalink
Merge pull request #37 from flanksource/moshloop
Browse files Browse the repository at this point in the history
fix: postgres readiness checks
  • Loading branch information
moshloop authored Apr 27, 2021
2 parents 9022a11 + 438dbe2 commit 981c824
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 15 deletions.
1 change: 1 addition & 0 deletions apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func (c *Client) Apply(namespace string, objects ...runtime.Object) error {
if IsNil(obj) {
continue
}

client, _, unstructuredObj, err := c.GetDynamicClientFor(namespace, obj)
if IsAPIResourceMissing(err) {
if err := c.WaitForAPIResource(unstructuredObj.GetAPIVersion(), unstructuredObj.GetKind(), 3*time.Minute); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ func (c *Client) GetClientByKind(kind string) (dynamic.NamespaceableResourceInte
}

func (c *Client) GetDynamicClientFor(namespace string, obj runtime.Object) (dynamic.ResourceInterface, *schema.GroupVersionResource, *unstructured.Unstructured, error) {
if obj.GetObjectKind().GroupVersionKind().Kind == "" {
return nil, nil, nil, fmt.Errorf("cannot apply object, missing kind: %v", obj)
}
dynamicClient, err := c.GetDynamicClient()
if err != nil {
return nil, nil, nil, perrors.Wrap(err, "failed to get dynamic client")
Expand Down
20 changes: 12 additions & 8 deletions shortcuts.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,18 @@ func (c *Client) CreateOrUpdateNamespace(name string, labels, annotations map[st
cm, err := ns.Get(context.TODO(), name, metav1.GetOptions{})

if cm == nil || err != nil {
cm = &v1.Namespace{}
cm = &v1.Namespace{
TypeMeta: metav1.TypeMeta{
Kind: "Namespace",
APIVersion: "v1",
},
}
cm.Name = name
cm.Labels = labels
cm.Annotations = annotations

if !c.ApplyDryRun {
if _, err := ns.Create(context.TODO(), cm, metav1.CreateOptions{}); err != nil {
return err
}
return c.Apply("", cm)
}
} else {
// update incoming and current labels
Expand All @@ -69,10 +72,12 @@ func (c *Client) CreateOrUpdateNamespace(name string, labels, annotations map[st
(*cm).Name = name
(*cm).Labels = labels
(*cm).Annotations = annotations
(*cm).TypeMeta = metav1.TypeMeta{
Kind: "Namespace",
APIVersion: "v1",
}
if !c.ApplyDryRun {
if _, err := ns.Update(context.TODO(), cm, metav1.UpdateOptions{}); err != nil {
return err
}
return c.Apply("", cm)
}
return nil
}
Expand Down Expand Up @@ -360,7 +365,6 @@ func (c *Client) GetEnvValue(input EnvVar, namespace string) (string, string, er
return "", "", perrors.New("could not extract value from incomplete EnvVar")
}


func (c *Client) GetConditionsForNode(name string) (map[v1.NodeConditionType]v1.ConditionStatus, error) {
client, err := c.GetClientset()
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ func IsService(obj *unstructured.Unstructured) bool {
func IsServiceAccount(obj *unstructured.Unstructured) bool {
return obj.GetKind() == "ServiceAccount"
}

func IsIngress(obj *unstructured.Unstructured) bool {
return obj.GetKind() == "Ingress"
}
Expand Down Expand Up @@ -390,7 +391,11 @@ func IsRedisFailover(obj *unstructured.Unstructured) bool {
}

func IsPostgresql(obj *unstructured.Unstructured) bool {
return obj.GetKind() == "postgresql"
return strings.ToLower(obj.GetKind()) == "postgresql"
}

func IsPostgresqlDB(obj *unstructured.Unstructured) bool {
return strings.ToLower(obj.GetKind()) == "postgresqldb"
}

func IsMongoDB(obj *unstructured.Unstructured) bool {
Expand Down
63 changes: 57 additions & 6 deletions wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ func (c *Client) IsReady(item *unstructured.Unstructured) (bool, string) {
if item == nil {
return false, "⏳ waiting to be created"
}
c.Debugf("[%s] checking readiness", GetName(item))

switch {
case IsSecret(item):
Expand All @@ -211,6 +212,8 @@ func (c *Client) IsReady(item *unstructured.Unstructured) (bool, string) {
return c.IsKibanaReady(item)
case IsRedisFailover(item):
return c.IsRedisFailoverReady(item)
case IsPostgresqlDB(item):
return c.IsPostgresqlDBReady(item)
case IsPostgresql(item):
return c.IsPostgresqlReady(item)
case IsConstraintTemplate(item):
Expand Down Expand Up @@ -352,21 +355,37 @@ func (c *Client) IsRedisFailoverReady(item *unstructured.Unstructured) (bool, st
return stsReady && deplReady, msg
}

func (c *Client) IsPostgresqlDBReady(item *unstructured.Unstructured) (bool, string) {
// PostgresqlDB instances are backed by zalando postgres instances
return c.isPostgresqlReady(item.GetNamespace(), "postgres-"+item.GetName())
}

func (c *Client) IsPostgresqlReady(item *unstructured.Unstructured) (bool, string) {
name := item.GetName()
namespace := item.GetNamespace()
return c.isPostgresqlReady(item.GetNamespace(), item.GetName())
}

func (c *Client) isPostgresqlReady(namespace, name string) (bool, string) {
clientset, err := c.GetClientset()
if err != nil {
return false, fmt.Sprintf("failed to get clientset: %v", err)
}

// zalando postgres instances are backed by a stateful set
sts, err := clientset.AppsV1().StatefulSets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return false, fmt.Sprintf("failed to get sts %s: %v", name, err)
return false, fmt.Sprintf("⏳ waiting for statefulset")
}

return IsStatefulSetReady(sts)
if ready, msg := IsStatefulSetReady(sts); ready {
// once the sts is up, check that the postgres instance is up and serving queries
if err := c.WaitForPodCommand(namespace, name+"-0", "postgres", 30*time.Second, "su", "postgres", "-c", "psql -c 'SELECT 1;'"); err == nil {
return true, ""
} else {
return false, "⏳ waiting for postgres to be running: " + err.Error()
}
} else {
return ready, msg
}
}

func IsStatefulSetReady(sts *appsv1.StatefulSet) (bool, string) {
Expand Down Expand Up @@ -412,6 +431,38 @@ func (c *Client) WaitForJob(ns, name string, timeout time.Duration) error {
}
}

// WaitForPod waits for a pod to be in the specified phase, or returns an
// error if the timeout is exceeded
func (c *Client) WaitForPodByLabel(ns, label string, timeout time.Duration, phases ...v1.PodPhase) (*v1.Pod, error) {
if c.ApplyDryRun {
return &v1.Pod{}, nil
}
client, err := c.GetClientset()
if err != nil {
return nil, err
}
pods := client.CoreV1().Pods(ns)
id := Name{Kind: "Pod", Namespace: ns, Name: label}
start := time.Now()
msg := false
for {
items, _ := pods.List(context.TODO(), metav1.ListOptions{LabelSelector: label})
if items != nil && len(items.Items) > 0 {
return &items.Items[0], nil
}
if start.Add(timeout).Before(time.Now()) {
return nil, fmt.Errorf("timeout exceeded waiting for pod %s", id)
}

if !msg {
c.Infof("%s ⏳ waiting for pod", id)
msg = true
}

time.Sleep(2 * time.Second)
}
}

// WaitForPod waits for a pod to be in the specified phase, or returns an
// error if the timeout is exceeded
func (c *Client) WaitForPod(ns, name string, timeout time.Duration, phases ...v1.PodPhase) error {
Expand Down Expand Up @@ -599,7 +650,7 @@ outerLoop:
}
}

// WaitForPodCommand waits for a command executed in pod to succeed with an exit code of 9
// WaitForPodCommand waits for a command executed in pod to succeed with an exit code of 0
// error if the timeout is exceeded
func (c *Client) WaitForPodCommand(ns, name string, container string, timeout time.Duration, command ...string) error {
if c.ApplyDryRun {
Expand All @@ -612,7 +663,7 @@ func (c *Client) WaitForPodCommand(ns, name string, container string, timeout ti
return nil
}
if start.Add(timeout).Before(time.Now()) {
return fmt.Errorf("timeout exceeded waiting for %s stdout: %s, stderr: %s", name, stdout, stderr)
return fmt.Errorf("timeout exceeded waiting for %s: %v, stdout: %s, stderr: %s", name, command, stdout, stderr)
}
time.Sleep(5 * time.Second)
}
Expand Down

0 comments on commit 981c824

Please # to comment.