Skip to content

Commit

Permalink
Merge branch 'main' into fix-api-flags-again
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso committed Jan 16, 2025
2 parents 822b05d + 397af3a commit 407c112
Show file tree
Hide file tree
Showing 59 changed files with 360 additions and 76 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ jobs:
org.opencontainers.image.vendor=ConduitIO
- name: Build and push Docker image
uses: docker/build-push-action@v6.11.0
uses: docker/build-push-action@v6.12.0
with:
context: .
push: true
Expand Down
68 changes: 68 additions & 0 deletions cmd/conduit/api/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright © 2025 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"context"
"fmt"

apiv1 "github.com/conduitio/conduit/proto/api/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
)

type Client struct {
conn *grpc.ClientConn
apiv1.PipelineServiceClient
healthgrpc.HealthClient
}

func NewClient(ctx context.Context, address string) (*Client, error) {
conn, err := grpc.NewClient(
address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return nil, fmt.Errorf("failed to create gRPC client: %w", err)
}

client := &Client{
conn: conn,
PipelineServiceClient: apiv1.NewPipelineServiceClient(conn),
HealthClient: healthgrpc.NewHealthClient(conn),
}

if err := client.CheckHealth(ctx, address); err != nil {
client.Close()
return nil, err
}

return client, nil
}

func (c *Client) CheckHealth(ctx context.Context, address string) error {
healthResp, err := c.HealthClient.Check(ctx, &healthgrpc.HealthCheckRequest{})
if err != nil || healthResp.Status != healthgrpc.HealthCheckResponse_SERVING {
return fmt.Errorf("we couldn't connect to Conduit at the configured address %q\n"+
"Please execute `conduit run` to start it.\nTo check the current configured `api.grpc.address`, run `conduit config`\n\n"+
"Error details: %v", address, err)
}
return nil
}

func (c *Client) Close() error {
return c.conn.Close()
}
106 changes: 106 additions & 0 deletions cmd/conduit/cecdysis/decorators.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright © 2025 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cecdysis

import (
"context"
"fmt"
"os"
"path/filepath"

"github.com/conduitio/conduit/cmd/conduit/api"
"github.com/conduitio/conduit/pkg/conduit"
"github.com/conduitio/ecdysis"
"github.com/spf13/cobra"
)

// ------------------- CommandWithClient

// CommandWithExecuteWithClient can be implemented by a command that requires a client to interact
// with the Conduit API during the execution.
type CommandWithExecuteWithClient interface {
ecdysis.Command

// ExecuteWithClient is the actual work function. Most commands will implement this.
ExecuteWithClient(context.Context, *api.Client) error
}

// CommandWithExecuteWithClientDecorator is a decorator that adds a Conduit API client to the command execution.
type CommandWithExecuteWithClientDecorator struct{}

func (CommandWithExecuteWithClientDecorator) Decorate(_ *ecdysis.Ecdysis, cmd *cobra.Command, c ecdysis.Command) error {
v, ok := c.(CommandWithExecuteWithClient)
if !ok {
return nil
}

old := cmd.RunE
cmd.RunE = func(cmd *cobra.Command, args []string) error {
if old != nil {
err := old(cmd, args)
if err != nil {
return err
}
}

grpcAddress, err := getGRPCAddress(cmd)
if err != nil {
return fmt.Errorf("error reading gRPC address: %w", err)
}

client, err := api.NewClient(cmd.Context(), grpcAddress)
if err != nil {
// Not an error we need to escalate to the main CLI execution. We'll print it out and not execute further.
_, _ = fmt.Fprintf(os.Stderr, "%v\n", err)
return nil
}
defer client.Close()

ctx := ecdysis.ContextWithCobraCommand(cmd.Context(), cmd)
return v.ExecuteWithClient(ctx, client)
}

return nil
}

func getGRPCAddress(cmd *cobra.Command) (string, error) {
var (
path string
err error
)

path, err = cmd.Flags().GetString("config.path")
if err != nil || path == "" {
path = conduit.DefaultConfig().ConduitCfg.Path
}

var usrCfg conduit.Config
defaultConfigValues := conduit.DefaultConfigWithBasePath(filepath.Dir(path))

cfg := ecdysis.Config{
EnvPrefix: "CONDUIT",
Parsed: &usrCfg,
Path: path,
DefaultValues: defaultConfigValues,
}

// If it can't be parsed, we return the default value
err = ecdysis.ParseConfig(cfg, cmd)
if err != nil || usrCfg.API.GRPC.Address == "" {
return defaultConfigValues.API.GRPC.Address, nil
}

return usrCfg.API.GRPC.Address, nil
}
3 changes: 2 additions & 1 deletion cmd/conduit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ import (
"fmt"
"os"

"github.com/conduitio/conduit/cmd/conduit/cecdysis"
"github.com/conduitio/conduit/cmd/conduit/root"
"github.com/conduitio/ecdysis"
)

func main() {
e := ecdysis.New()
e := ecdysis.New(ecdysis.WithDecorators(cecdysis.CommandWithExecuteWithClientDecorator{}))

cmd := e.MustBuildCobraCommand(&root.RootCommand{})
cmd.CompletionOptions.DisableDefaultCmd = true
Expand Down
89 changes: 89 additions & 0 deletions cmd/conduit/root/pipelines/list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright © 2025 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pipelines

import (
"context"
"fmt"

"github.com/alexeyco/simpletable"
"github.com/conduitio/conduit/cmd/conduit/api"
"github.com/conduitio/conduit/cmd/conduit/cecdysis"
apiv1 "github.com/conduitio/conduit/proto/api/v1"
"github.com/conduitio/ecdysis"
)

var (
_ cecdysis.CommandWithExecuteWithClient = (*ListCommand)(nil)
_ ecdysis.CommandWithAliases = (*ListCommand)(nil)
_ ecdysis.CommandWithDocs = (*ListCommand)(nil)
)

type ListCommand struct{}

func (c *ListCommand) Docs() ecdysis.Docs {
return ecdysis.Docs{
Short: "List existing Conduit pipelines",
Long: `This command requires Conduit to be already running since it will list all pipelines registered
by Conduit. This will depend on the configured pipelines directory, which by default is /pipelines; however, it could
be configured via --pipelines.path at the time of running Conduit.`,
Example: "conduit pipelines ls",
}
}

func (c *ListCommand) Aliases() []string { return []string{"ls"} }

func (c *ListCommand) Usage() string { return "list" }

func (c *ListCommand) ExecuteWithClient(ctx context.Context, client *api.Client) error {
resp, err := client.PipelineServiceClient.ListPipelines(ctx, &apiv1.ListPipelinesRequest{})
if err != nil {
return fmt.Errorf("failed to list pipelines: %w", err)
}

displayPipelines(resp.Pipelines)

return nil
}

func displayPipelines(pipelines []*apiv1.Pipeline) {
if len(pipelines) == 0 {
return
}

table := simpletable.New()

table.Header = &simpletable.Header{
Cells: []*simpletable.Cell{
{Align: simpletable.AlignCenter, Text: "ID"},
{Align: simpletable.AlignCenter, Text: "STATE"},
{Align: simpletable.AlignCenter, Text: "CREATED"},
{Align: simpletable.AlignCenter, Text: "LAST_UPDATED"},
},
}

for _, p := range pipelines {
r := []*simpletable.Cell{
{Align: simpletable.AlignRight, Text: p.Id},
{Align: simpletable.AlignLeft, Text: p.State.Status.String()},
{Align: simpletable.AlignLeft, Text: p.CreatedAt.AsTime().String()},
{Align: simpletable.AlignLeft, Text: p.UpdatedAt.AsTime().String()},
}

table.Body.Cells = append(table.Body.Cells, r)
}
table.SetStyle(simpletable.StyleCompact)
fmt.Println(table.String())
}
4 changes: 4 additions & 0 deletions cmd/conduit/root/pipelines/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ import (
var (
_ ecdysis.CommandWithDocs = (*PipelinesCommand)(nil)
_ ecdysis.CommandWithSubCommands = (*PipelinesCommand)(nil)
_ ecdysis.CommandWithAliases = (*PipelinesCommand)(nil)
)

type PipelinesCommand struct{}

func (c *PipelinesCommand) Aliases() []string { return []string{"pipeline"} }

func (c *PipelinesCommand) SubCommands() []ecdysis.Command {
return []ecdysis.Command{
&InitCommand{},
&ListCommand{},
}
}

Expand Down
6 changes: 5 additions & 1 deletion cmd/conduit/root/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ var (

type RootFlags struct {
Version bool `long:"version" short:"v" usage:"show the current Conduit version"`

// Global Flags
GRPCAddress string `long:"api.grpc.address" usage:"address where Conduit is running" persistent:"true"`
ConfigPath string `long:"config.path" usage:"path to the configuration file" persistent:"true"`
}

type RootCommand struct {
Expand Down Expand Up @@ -77,6 +81,6 @@ func (c *RootCommand) SubCommands() []ecdysis.Command {
&initialize.InitCommand{Cfg: &runCmd.Cfg},
&version.VersionCommand{},
&pipelines.PipelinesCommand{},
&run.RunCommand{},
runCmd,
}
}
2 changes: 2 additions & 0 deletions cmd/conduit/root/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func TestRootCommandFlags(t *testing.T) {
persistent bool
}{
{longName: "version", shortName: "v", usage: "show the current Conduit version"},
{longName: "api.grpc.address", usage: "address where Conduit is running", persistent: true},
{longName: "config.path", usage: "path to the configuration file", persistent: true},
}

e := ecdysis.New()
Expand Down
7 changes: 7 additions & 0 deletions cmd/conduit/root/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package run

import (
"context"
"fmt"
"os"
"path/filepath"

Expand All @@ -42,6 +43,12 @@ type RunCommand struct {

func (c *RunCommand) Execute(_ context.Context) error {
e := &conduit.Entrypoint{}

if !c.Cfg.API.Enabled {
fmt.Print("Warning: API is currently disabled. Most Conduit CLI commands won't work without the API enabled." +
"To enable it, run conduit with `--api.enabled=true` or set `CONDUIT_API_ENABLED=true` in your environment.")
}

e.Serve(c.Cfg)
return nil
}
Expand Down
Loading

0 comments on commit 407c112

Please # to comment.