Skip to content

Commit

Permalink
Add support for basic authentication (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
Marc Rooding authored Mar 14, 2019
1 parent 56a4d94 commit b540b0a
Show file tree
Hide file tree
Showing 14 changed files with 183 additions and 32 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,17 @@ If all went well you should see the word counter continue with where it was.

A list of some example commands to run can be found [here](./docs/example-commands.md).

## Authentication

Apache Flink doesn't support any Web UI authentication out of the box. One of the custom approaches is using NGINX in front of Flink to protect the user interface. With NGINX, there are again a lot of different ways to add that authentication layer. To support the most basic one, we've added support for using Basic Authentication.

You can inject the `FLINK_BASIC_AUTH_USERNAME` and `FLINK_BASIC_AUTH_PASSWORD` environment variables to configure basic authentication.

## Supported environment variables

* FLINK_BASE_URL: Base Url to Flink's API (**required**, e.g. http://jobmanageraddress:8081/)
* FLINK_BASIC_AUTH_USERNAME: Basic authentication username used for authenticating to Flink
* FLINK_BASIC_AUTH_PASSWORD: Basic authentication password used for authenticating to Flink
* FLINK_API_TIMEOUT_SECONDS: Number of seconds until requests to the Flink API time out (e.g. 10)

# Development
Expand Down
4 changes: 1 addition & 3 deletions cmd/cli/flink/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ package flink

import (
"fmt"

retryablehttp "github.com/hashicorp/go-retryablehttp"
)

// Cancel terminates a running job specified by job ID
func (c FlinkRestClient) Cancel(jobID string) error {
req, err := retryablehttp.NewRequest("PATCH", c.constructURL(fmt.Sprintf("jobs/%v", jobID)), nil)
req, err := c.newRequest("PATCH", c.constructURL(fmt.Sprintf("jobs/%v", jobID)), nil)
if err != nil {
return err
}
Expand Down
10 changes: 8 additions & 2 deletions cmd/cli/flink/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ func TestCancelReturnsAnErrorWhenTheResponseStatusIsNot202(t *testing.T) {
server := createTestServerWithBodyCheck(t, "/jobs/id", "", http.StatusOK, "OK")
defer server.Close()

api := FlinkRestClient{server.URL, retryablehttp.NewClient()}
api := FlinkRestClient{
BaseURL: server.URL,
Client: retryablehttp.NewClient(),
}
err := api.Cancel("id")

assert.EqualError(t, err, "Unexpected response status 200")
Expand All @@ -22,7 +25,10 @@ func TestCancelShouldNotReturnAnErrorWhenTheResponseStatusIs202(t *testing.T) {
server := createTestServerWithBodyCheck(t, "/jobs/id", "", http.StatusAccepted, "")
defer server.Close()

api := FlinkRestClient{server.URL, retryablehttp.NewClient()}
api := FlinkRestClient{
BaseURL: server.URL,
Client: retryablehttp.NewClient(),
}
err := api.Cancel("id")

assert.Nil(t, err)
Expand Down
23 changes: 21 additions & 2 deletions cmd/cli/flink/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,29 @@ import (
// A FlinkRestClient is a client to interface with
// the Apache Flink REST API
type FlinkRestClient struct {
BaseURL string
Client *retryablehttp.Client
BaseURL string
BasicAuthUsername string
BasicAuthPassword string
Client *retryablehttp.Client
}

func (c FlinkRestClient) constructURL(path string) string {
return fmt.Sprintf("%v/%v", c.BaseURL, path)
}

func basicAuthenticationCredentialsDefined(username, password string) bool {
return len(username) != 0 && len(password) != 0
}

func (c FlinkRestClient) newRequest(method, url string, rawBody interface{}) (*retryablehttp.Request, error) {
req, err := retryablehttp.NewRequest(method, url, rawBody)
if err != nil {
return nil, err
}

if basicAuthenticationCredentialsDefined(c.BasicAuthUsername, c.BasicAuthPassword) == true {
req.SetBasicAuth(c.BasicAuthUsername, c.BasicAuthPassword)
}

return req, err
}
52 changes: 51 additions & 1 deletion cmd/cli/flink/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,63 @@ import (
)

func TestConstructUrlShouldProperlyFormTheCompleteURL(t *testing.T) {
api := FlinkRestClient{"http://localhost:80", &retryablehttp.Client{}}
api := FlinkRestClient{
BaseURL: "http://localhost:80",
Client: &retryablehttp.Client{},
}

url := api.constructURL("jobs")

assert.Equal(t, "http://localhost:80/jobs", url)
}

func TestNewRequestShouldAddTheBasicAuthenticationHeadersWhenTheCredentialsAreSet(t *testing.T) {
api := FlinkRestClient{"http://localhost:80", "username", "password", &retryablehttp.Client{}}

req, err := api.newRequest("GET", "jobs", nil)

assert.Nil(t, err)
assert.Equal(t, "Basic dXNlcm5hbWU6cGFzc3dvcmQ=", req.Header.Get("Authorization"))
}

func TestNewRequestShouldNotAddTheBasicAuthenticationHeadersWhenTheUsernameIsUnset(t *testing.T) {
api := FlinkRestClient{
BaseURL: "http://localhost:80",
BasicAuthPassword: "password",
Client: &retryablehttp.Client{},
}

req, err := api.newRequest("GET", "jobs", nil)

assert.Nil(t, err)
assert.Equal(t, "", req.Header.Get("Authorization"))
}

func TestNewRequestShouldNotAddTheBasicAuthenticationHeadersWhenThePassworsdIsUnset(t *testing.T) {
api := FlinkRestClient{
BaseURL: "http://localhost:80",
BasicAuthUsername: "username",
Client: &retryablehttp.Client{},
}

req, err := api.newRequest("GET", "jobs", nil)

assert.Nil(t, err)
assert.Equal(t, "", req.Header.Get("Authorization"))
}

func TestNewRequestShouldNotAddTheBasicAuthenticationHeadersWhenBothCredentialsAreUnset(t *testing.T) {
api := FlinkRestClient{
BaseURL: "http://localhost:80",
Client: &retryablehttp.Client{},
}

req, err := api.newRequest("GET", "jobs", nil)

assert.Nil(t, err)
assert.Equal(t, "", req.Header.Get("Authorization"))
}

func createTestServerWithoutBodyCheck(t *testing.T, expectedURL string, status int, body string) *httptest.Server {
return createTestServer(t, expectedURL, false, "", status, body)
}
Expand Down
7 changes: 6 additions & 1 deletion cmd/cli/flink/retrieve_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ type retrieveJobsResponse struct {

// RetrieveJobs returns all the jobs on the Flink cluster
func (c FlinkRestClient) RetrieveJobs() ([]Job, error) {
res, err := c.Client.Get(c.constructURL("jobs/overview"))
req, err := c.newRequest("GET", c.constructURL("jobs/overview"), nil)
if err != nil {
return nil, err
}

res, err := c.Client.Do(req)
if err != nil {
return nil, err
}
Expand Down
15 changes: 12 additions & 3 deletions cmd/cli/flink/retrieve_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ func TestRetrieveJobsReturnsAnErrorWhenTheStatusIsNot200(t *testing.T) {
server := createTestServerWithBodyCheck(t, "/jobs/overview", "", http.StatusAccepted, "{}")
defer server.Close()

api := FlinkRestClient{server.URL, retryablehttp.NewClient()}
api := FlinkRestClient{
BaseURL: server.URL,
Client: retryablehttp.NewClient(),
}
_, err := api.RetrieveJobs()

assert.EqualError(t, err, "Unexpected response status 202 with body {}")
Expand All @@ -22,7 +25,10 @@ func TestRetrieveJobsReturnsAnErrorWhenItCannotDeserializeTheResponseAsJSON(t *t
server := createTestServerWithBodyCheck(t, "/jobs/overview", "", http.StatusOK, `{"jobs: []}`)
defer server.Close()

api := FlinkRestClient{server.URL, retryablehttp.NewClient()}
api := FlinkRestClient{
BaseURL: server.URL,
Client: retryablehttp.NewClient(),
}
_, err := api.RetrieveJobs()

assert.EqualError(t, err, "Unable to parse API response as valid JSON: {\"jobs: []}")
Expand All @@ -32,7 +38,10 @@ func TestRetrieveJobsCorrectlyReturnsAnArrayOfJobs(t *testing.T) {
server := createTestServerWithBodyCheck(t, "/jobs/overview", "", http.StatusOK, `{"jobs":[{"jid": "1", "name": "Job A", "state": "RUNNING"}]}`)
defer server.Close()

api := FlinkRestClient{server.URL, retryablehttp.NewClient()}
api := FlinkRestClient{
BaseURL: server.URL,
Client: retryablehttp.NewClient(),
}
jobs, err := api.RetrieveJobs()

assert.Len(t, jobs, 1)
Expand Down
12 changes: 9 additions & 3 deletions cmd/cli/flink/run_jar.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,23 @@ type runJarRequest struct {

// RunJar executes a specific JAR file with the supplied parameters on the Flink cluster
func (c FlinkRestClient) RunJar(jarID string, entryClass string, jarArgs string, parallelism int, savepointPath string, allowNonRestoredState bool) error {
req := runJarRequest{
runJarRequest := runJarRequest{
EntryClass: entryClass,
ProgramArgs: jarArgs,
Parallelism: parallelism,
AllowNonRestoredState: allowNonRestoredState,
SavepointPath: savepointPath,
}
reqBody := new(bytes.Buffer)
json.NewEncoder(reqBody).Encode(req)
json.NewEncoder(reqBody).Encode(runJarRequest)

res, err := c.Client.Post(c.constructURL(fmt.Sprintf("jars/%v/run", jarID)), "application/json", reqBody)
req, err := c.newRequest("POST", c.constructURL(fmt.Sprintf("jars/%v/run", jarID)), reqBody)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")

res, err := c.Client.Do(req)
if err != nil {
return err
}
Expand Down
10 changes: 8 additions & 2 deletions cmd/cli/flink/run_jar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ func TestRunJarReturnsAnErrorWhenTheStatusIsNot200(t *testing.T) {
server := createTestServerWithBodyCheck(t, "/jars/id/run", `{"entryClass":"MainClass","programArgs":"","parallelism":1,"allowNonRestoredState":false,"savepointPath":"/data/flink"}`, http.StatusAccepted, "{}")
defer server.Close()

api := FlinkRestClient{server.URL, retryablehttp.NewClient()}
api := FlinkRestClient{
BaseURL: server.URL,
Client: retryablehttp.NewClient(),
}
err := api.RunJar("id", "MainClass", "", 1, "/data/flink", false)

assert.EqualError(t, err, "Unexpected response status 202 with body {}")
Expand All @@ -22,7 +25,10 @@ func TestRunJarCorrectlyReturnsNilWhenTheCallSucceeds(t *testing.T) {
server := createTestServerWithBodyCheck(t, "/jars/id/run", `{"entryClass":"MainClass","programArgs":"","parallelism":1,"allowNonRestoredState":false,"savepointPath":"/data/flink"}`, http.StatusOK, "")
defer server.Close()

api := FlinkRestClient{server.URL, retryablehttp.NewClient()}
api := FlinkRestClient{
BaseURL: server.URL,
Client: retryablehttp.NewClient(),
}
err := api.RunJar("id", "MainClass", "", 1, "/data/flink", false)

assert.Nil(t, err)
Expand Down
12 changes: 9 additions & 3 deletions cmd/cli/flink/savepoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,21 @@ type CreateSavepointResponse struct {

// CreateSavepoint creates a savepoint for a job specified by job ID
func (c FlinkRestClient) CreateSavepoint(jobID string, savepointPath string) (CreateSavepointResponse, error) {
req := createSavepointRequest{
createSavepointRequest := createSavepointRequest{
TargetDirectory: savepointPath,
CancelJob: false,
}

reqBody := new(bytes.Buffer)
json.NewEncoder(reqBody).Encode(req)
json.NewEncoder(reqBody).Encode(createSavepointRequest)

res, err := c.Client.Post(c.constructURL(fmt.Sprintf("jobs/%v/savepoints", jobID)), "application/json", reqBody)
req, err := c.newRequest("POST", c.constructURL(fmt.Sprintf("jobs/%v/savepoints", jobID)), reqBody)
if err != nil {
return CreateSavepointResponse{}, err
}
req.Header.Set("Content-Type", "application/json")

res, err := c.Client.Do(req)
if err != nil {
return CreateSavepointResponse{}, err
}
Expand Down
30 changes: 24 additions & 6 deletions cmd/cli/flink/savepoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ func TestCreateSavepointReturnsAnErrorWhenTheStatusIsNot202(t *testing.T) {
server := createTestServerWithBodyCheck(t, "/jobs/1/savepoints", `{"target-directory":"/data/flink","cancel-job":false}`, http.StatusOK, "{}")
defer server.Close()

api := FlinkRestClient{server.URL, retryablehttp.NewClient()}
api := FlinkRestClient{
BaseURL: server.URL,
Client: retryablehttp.NewClient(),
}
_, err := api.CreateSavepoint("1", "/data/flink")

assert.EqualError(t, err, "Unexpected response status 200 with body {}")
Expand All @@ -25,7 +28,10 @@ func TestCreateSavepointReturnsAnErrorWhenItCannotDeserializeTheResponseAsJSON(t
server := createTestServerWithBodyCheck(t, "/jobs/1/savepoints", `{"target-directory":"/data/flink","cancel-job":false}`, http.StatusAccepted, `{"jobs: []}`)
defer server.Close()

api := FlinkRestClient{server.URL, retryablehttp.NewClient()}
api := FlinkRestClient{
BaseURL: server.URL,
Client: retryablehttp.NewClient(),
}
_, err := api.CreateSavepoint("1", "/data/flink")

assert.EqualError(t, err, "Unable to parse API response as valid JSON: {\"jobs: []}")
Expand All @@ -35,7 +41,10 @@ func TestCreateSavepointCorrectlyReturnsARequestID(t *testing.T) {
server := createTestServerWithBodyCheck(t, "/jobs/1/savepoints", `{"target-directory":"/data/flink","cancel-job":false}`, http.StatusAccepted, `{"request-id": "1"}`)
defer server.Close()

api := FlinkRestClient{server.URL, retryablehttp.NewClient()}
api := FlinkRestClient{
BaseURL: server.URL,
Client: retryablehttp.NewClient(),
}
res, err := api.CreateSavepoint("1", "/data/flink")

assert.Equal(t, res.RequestID, "1")
Expand All @@ -49,7 +58,10 @@ func TestMonitorSavepointCreationReturnsAnErrorWhenTheStatusIsNot200(t *testing.
server := createTestServerWithBodyCheck(t, "/jobs/id-1/savepoints/request-id-1", "", http.StatusAccepted, "{}")
defer server.Close()

api := FlinkRestClient{server.URL, retryablehttp.NewClient()}
api := FlinkRestClient{
BaseURL: server.URL,
Client: retryablehttp.NewClient(),
}
_, err := api.MonitorSavepointCreation("id-1", "request-id-1")

assert.EqualError(t, err, "Unexpected response status 202 with body {}")
Expand All @@ -59,7 +71,10 @@ func TestMonitorSavepointCreationReturnsAnErrorWhenItCannotDeserializeTheRespons
server := createTestServerWithBodyCheck(t, "/jobs/id-1/savepoints/request-id-1", "", http.StatusOK, `{"jobs: []}`)
defer server.Close()

api := FlinkRestClient{server.URL, retryablehttp.NewClient()}
api := FlinkRestClient{
BaseURL: server.URL,
Client: retryablehttp.NewClient(),
}
_, err := api.MonitorSavepointCreation("id-1", "request-id-1")

assert.EqualError(t, err, "Unable to parse API response as valid JSON: {\"jobs: []}")
Expand All @@ -69,7 +84,10 @@ func TestMonitorSavepointCreationCorrectlyReturnsARequestID(t *testing.T) {
server := createTestServerWithBodyCheck(t, "/jobs/id-1/savepoints/request-id-1", "", http.StatusOK, `{"status":{"id":"PENDING"}}`)
defer server.Close()

api := FlinkRestClient{server.URL, retryablehttp.NewClient()}
api := FlinkRestClient{
BaseURL: server.URL,
Client: retryablehttp.NewClient(),
}
res, err := api.MonitorSavepointCreation("id-1", "request-id-1")

assert.Equal(t, res.Status.Id, "PENDING")
Expand Down
8 changes: 7 additions & 1 deletion cmd/cli/flink/upload_jar.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,13 @@ func (c FlinkRestClient) constructUploadJarRequest(filename string, url string)
contentType := writer.FormDataContentType()
writer.Close()

return c.Client.Post(url, contentType, buffer)
req, err := c.newRequest("POST", url, buffer)
if err != nil {
return &http.Response{}, err
}
req.Header.Set("Content-Type", contentType)

return c.Client.Do(req)
}

// UploadJar allows for uploading a JAR file to the Flink cluster
Expand Down
Loading

0 comments on commit b540b0a

Please # to comment.