diff --git a/cmd/sparkctl/README.md b/cmd/sparkctl/README.md index 7954a1cc3d..5633548b8c 100644 --- a/cmd/sparkctl/README.md +++ b/cmd/sparkctl/README.md @@ -1,6 +1,6 @@ # sparkctl -`sparkctl` is a command-line tool of the Spark Operator for creating, listing, checking status of, getting logs of, and deleting `SparkApplication`s. It can also do port forwarding from a local port to the Spark web UI port for accessing the Spark web UI on the driver. Each function is implemented as a sub-command of `sparkctl`. +`sparkctl` is a command-line tool of the Spark Operator for creating, listing, checking status of, getting logs of, and deleting `SparkApplication` objects. It can also do port forwarding from a local port to the Spark web UI port for accessing the Spark web UI on the driver. Each function is implemented as a sub-command of `sparkctl`. To build the `sparkctl` binary, run the following command in the root directory of the project: @@ -10,28 +10,28 @@ make build-sparkctl Then the `sparkctl` binary can be found in the `bin` directory: -```bash -$ bin/sparkctl --help -sparkctl is the command-line tool for working with the Spark Operator. It supports creating, deleting and - checking status of SparkApplication objects. It also supports fetching application logs. +```text +$ bin/sparkctl --help +sparkctl is the command-line tool for working with the Spark Operator. +It supports creating, deleting and checking status of SparkApplication objects. It also supports fetching application logs. Usage: sparkctl [command] Available Commands: completion Generate the autocompletion script for the specified shell - create Create a SparkApplication object - delete Delete a SparkApplication object - event Shows SparkApplication events + create Create a SparkApplication from file or ScheduledSparkApplication + delete Delete a SparkApplication + event Show events associated with a SparkApplication forward Start to forward a local port to the remote port of the driver UI + get Get status of a SparkApplication help Help about any command - list List SparkApplication objects - log log is a sub-command of sparkctl that fetches logs of a Spark application. - status Check status of a SparkApplication + list List SparkApplications in a given namespace. + log Fetch logs of the driver pod of a SparkApplication Flags: -h, --help help for sparkctl - -k, --kubeconfig string The path to the local Kubernetes configuration file (default "/Users/chenyi/.kube/config") + --kubeconfig string Paths to a kubeconfig. Only required if out-of-cluster. -n, --namespace string The namespace in which the SparkApplication is to be created (default "default") Use "sparkctl [command] --help" for more information about a command. @@ -41,9 +41,8 @@ Use "sparkctl [command] --help" for more information about a command. The following global flags are available for all the sub commands: -* `--namespace`: the Kubernetes namespace of the `SparkApplication`(s). Defaults to `default`. -* `--kubeconfig`: the path to the file storing configuration for accessing the Kubernetes API server. Defaults to -`$HOME/.kube/config` +- `-n` or `--namespace`: the Kubernetes namespace of the `SparkApplication`(s). Defaults to `default`. +- `--kubeconfig`: the path to the file storing configuration for accessing the Kubernetes API server. Defaults to `$HOME/.kube/config` ## Available Commands @@ -109,14 +108,14 @@ SDK uses the default credential provider chain to find AWS credentials. 
The SDK uses the first provider in the chain that returns credentials without an error. The default provider chain looks for credentials in the following order: -* Environment variables +- Environment variables - ``` + ```text AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY ``` -- Shared credentials file (.aws/credentials) +- Shared credentials file (`.aws/credentials`) For more information about AWS SDK authentication, please check [Specifying Credentials](https://docs.aws.amazon.com/sdk-for-go/v1/developer-guide/configuring-sdk.html#specifying-credentials). @@ -169,23 +168,22 @@ Publicly available files are referenced through URIs in the default form `https: Usage: ```bash -sparkctl list +sparkctl list [-n <namespace>] ``` -### Status +### Get -`status` is a sub command of `sparkctl` for checking and printing the status of a `SparkApplication` in the namespace specified by `--namespace`. +`get` is a sub-command of `sparkctl` for checking and printing the status of a `SparkApplication` in the namespace specified by `-n` or `--namespace`. Usage: ```bash -sparkctl status <name> +sparkctl get <name> [-n <namespace>] ``` ### Event -`event` is a sub command of `sparkctl` for listing `SparkApplication` events in the namespace -specified by `--namespace`. +`event` is a sub-command of `sparkctl` for listing `SparkApplication` events in the namespace specified by `-n` or `--namespace`. The `event` command also supports streaming the events with the `--follow` or `-f` flag. The command will display events since last creation of the `SparkApplication` for the specific `name`, and will continue to stream events even if `ResourceVersion` changes. @@ -193,29 +191,29 @@ The command will display events since last creation of the `SparkApplication` fo Usage: ```bash -sparkctl event <name> [-f] +sparkctl event <name> [-n <namespace>] [-f] ``` ### Log -`log` is a sub command of `sparkctl` for fetching the logs of a pod of `SparkApplication` with the given name in the namespace specified by `--namespace`. The command by default fetches the logs of the driver pod. To make it fetch logs of an executor pod instead, use the flag `--executor` or `-e` to specify the ID of the executor whose logs should be fetched. +`log` is a sub-command of `sparkctl` for fetching the logs of a pod of a `SparkApplication` with the given name in the namespace specified by `-n` or `--namespace`. The command by default fetches the logs of the driver pod. To make it fetch logs of an executor pod instead, use the flag `--executor` or `-e` to specify the ID of the executor whose logs should be fetched. The `log` command also supports streaming the driver or executor logs with the `--follow` or `-f` flag. It works in the same way as `kubectl logs -f`, i.e., it streams logs until no more logs are available. Usage: ```bash -sparkctl log <name> [-e <executor id>] [-f] +sparkctl log <name> [-n <namespace>] [-e <executor id>] [-f] ``` ### Delete -`delete` is a sub command of `sparkctl` for deleting a `SparkApplication` with the given name in the namespace specified by `--namespace`. +`delete` is a sub-command of `sparkctl` for deleting a `SparkApplication` with the given name in the namespace specified by `-n` or `--namespace`. Usage: ```bash -sparkctl delete <name> +sparkctl delete <name> [-n <namespace>] ``` ### Forward @@ -225,7 +223,7 @@ sparkctl delete <name> Usage: ```bash -sparkctl forward <name> [--local-port <local port>] [--remote-port <remote port>] +sparkctl forward <name> [-n <namespace>] [--local-port <local port>] [--remote-port <remote port>] ``` Once port forwarding starts, users can open `127.0.0.1:<local port>` or `localhost:<local port>` in a browser to access the Spark web UI. Forwarding continues until it is interrupted or the driver pod terminates.
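For reference, the sub-commands documented above compose into a simple end-to-end workflow. The following sketch is illustrative only: the manifest `spark-pi.yaml`, the application name `spark-pi`, and the `default` namespace are placeholder values.

```bash
# Submit the application described in a local manifest, then check its status.
sparkctl create spark-pi.yaml -n default
sparkctl get spark-pi -n default

# Stream the driver log and forward the Spark web UI to localhost:4040.
sparkctl log spark-pi -n default -f
sparkctl forward spark-pi -n default --local-port 4040 --remote-port 4040

# Clean up once the application has finished.
sparkctl delete spark-pi -n default
```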
diff --git a/cmd/sparkctl/app/client.go b/cmd/sparkctl/app/client.go deleted file mode 100644 index 0ab01ad876..0000000000 --- a/cmd/sparkctl/app/client.go +++ /dev/null @@ -1,77 +0,0 @@ -/* -Copyright 2017 Google LLC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package app - -import ( - "context" - "os" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" - - "github.com/kubeflow/spark-operator/api/v1beta2" - crdclientset "github.com/kubeflow/spark-operator/pkg/client/clientset/versioned" -) - -func buildConfig(kubeConfig string) (*rest.Config, error) { - // Check if kubeConfig exist - if _, err := os.Stat(kubeConfig); os.IsNotExist(err) { - // Try InClusterConfig for sparkctl running in a pod - config, err := rest.InClusterConfig() - if err != nil { - return nil, err - } - - return config, nil - } - - return clientcmd.BuildConfigFromFlags("", kubeConfig) -} - -func getKubeClient() (clientset.Interface, error) { - config, err := buildConfig(KubeConfig) - if err != nil { - return nil, err - } - return getKubeClientForConfig(config) -} - -func getKubeClientForConfig(config *rest.Config) (clientset.Interface, error) { - return clientset.NewForConfig(config) -} - -func getSparkApplicationClient() (crdclientset.Interface, error) { - config, err := buildConfig(KubeConfig) - if err != nil { - return nil, err - } - return getSparkApplicationClientForConfig(config) -} - -func getSparkApplicationClientForConfig(config *rest.Config) (crdclientset.Interface, error) { - return crdclientset.NewForConfig(config) -} - -func getSparkApplication(name string, crdClientset crdclientset.Interface) (*v1beta2.SparkApplication, error) { - app, err := crdClientset.SparkoperatorV1beta2().SparkApplications(Namespace).Get(context.TODO(), name, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return app, nil -} diff --git a/cmd/sparkctl/app/create_test.go b/cmd/sparkctl/app/create_test.go deleted file mode 100644 index 28d62fddac..0000000000 --- a/cmd/sparkctl/app/create_test.go +++ /dev/null @@ -1,182 +0,0 @@ -/* -Copyright 2017 Google LLC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package app - -import ( - "strings" - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/kubeflow/spark-operator/api/v1beta2" -) - -func TestIsLocalFile(t *testing.T) { - type testcase struct { - file string - isLocal bool - } - - testFn := func(test testcase, t *testing.T) { - isLocal, err := isLocalFile(test.file) - if err != nil { - t.Fatal(err) - } - assert.Equal(t, test.isLocal, isLocal, "%s: expected %v got %v", test.file, test.isLocal, isLocal) - } - - testcases := []testcase{ - {file: "/path/to/file", isLocal: true}, - {file: "file:///path/to/file", isLocal: true}, - {file: "local:///path/to/file", isLocal: false}, - {file: "http://localhost/path/to/file", isLocal: false}, - } - - for _, test := range testcases { - testFn(test, t) - } -} - -func TestFilterLocalFiles(t *testing.T) { - files := []string{ - "path/to/file", - "/path/to/file", - "file:///file/to/path", - "http://localhost/path/to/file", - "hdfs://localhost/path/to/file", - "gs://bucket/path/to/file", - } - - expected := []string{ - "path/to/file", - "/path/to/file", - "file:///file/to/path", - } - - actual, err := filterLocalFiles(files) - if err != nil { - t.Fatal(err) - } - assert.Equal(t, expected, actual) -} - -func TestValidateSpec(t *testing.T) { - type testcase struct { - name string - spec v1beta2.SparkApplicationSpec - expectsValidationError bool - } - - testFn := func(test testcase, t *testing.T) { - err := validateSpec(test.spec) - if test.expectsValidationError { - assert.Error(t, err, "%s: expected error got nothing", test.name) - } else { - assert.NoError(t, err, "%s: did not expect error got %v", test.name, err) - } - } - - image := "spark" - remoteMainAppFile := "https://localhost/path/to/main/app/file" - containerLocalMainAppFile := "local:///path/to/main/app/file" - testcases := []testcase{ - { - name: "application with spec.image set", - spec: v1beta2.SparkApplicationSpec{ - Image: &image, - }, - expectsValidationError: false, - }, - { - name: "application with no spec.image and spec.driver.image", - spec: v1beta2.SparkApplicationSpec{ - Executor: v1beta2.ExecutorSpec{ - SparkPodSpec: v1beta2.SparkPodSpec{ - Image: &image, - }, - }, - }, - expectsValidationError: true, - }, - { - name: "application with no spec.image and spec.executor.image", - spec: v1beta2.SparkApplicationSpec{ - Driver: v1beta2.DriverSpec{ - SparkPodSpec: v1beta2.SparkPodSpec{ - Image: &image, - }, - }, - }, - expectsValidationError: true, - }, - { - name: "application with no spec.image but spec.driver.image and spec.executor.image", - spec: v1beta2.SparkApplicationSpec{ - MainApplicationFile: &containerLocalMainAppFile, - Driver: v1beta2.DriverSpec{ - SparkPodSpec: v1beta2.SparkPodSpec{ - Image: &image, - }, - }, - Executor: v1beta2.ExecutorSpec{ - SparkPodSpec: v1beta2.SparkPodSpec{ - Image: &image, - }, - }, - }, - expectsValidationError: false, - }, - { - name: "application with remote main file and spec.image", - spec: v1beta2.SparkApplicationSpec{ - Image: &image, - MainApplicationFile: &remoteMainAppFile, - }, - expectsValidationError: false, - }, - } - - for _, test := range testcases { - testFn(test, t) - } -} - -func TestLoadFromYAML(t *testing.T) { - app, err := loadFromYAML("testdata/test-app.yaml") - if err != nil { - t.Fatal(err) - } - - assert.Equal(t, "example", app.Name) - assert.Equal(t, "org.examples.SparkExample", *app.Spec.MainClass) - assert.Equal(t, "local:///path/to/example.jar", *app.Spec.MainApplicationFile) - assert.Equal(t, "spark", *app.Spec.Driver.Image) - assert.Equal(t, 
"spark", *app.Spec.Executor.Image) - assert.Equal(t, 1, int(*app.Spec.Executor.Instances)) -} - -func TestHandleHadoopConfiguration(t *testing.T) { - configMap, err := buildHadoopConfigMap("test", "testdata/hadoop-conf") - if err != nil { - t.Fatal(err) - } - - assert.Equal(t, "test-hadoop-config", configMap.Name) - assert.Len(t, configMap.BinaryData, 1) - assert.Len(t, configMap.Data, 1) - assert.True(t, strings.Contains(configMap.Data["core-site.xml"], "fs.gs.impl")) -} diff --git a/cmd/sparkctl/app/delete.go b/cmd/sparkctl/app/delete.go deleted file mode 100644 index 06021b2716..0000000000 --- a/cmd/sparkctl/app/delete.go +++ /dev/null @@ -1,64 +0,0 @@ -/* -Copyright 2017 Google LLC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package app - -import ( - "context" - "fmt" - "os" - - "github.com/spf13/cobra" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - crdclientset "github.com/kubeflow/spark-operator/pkg/client/clientset/versioned" -) - -var deleteCmd = &cobra.Command{ - Use: "delete ", - Short: "Delete a SparkApplication object", - Long: `Delete a SparkApplication object with a given name`, - Run: func(_ *cobra.Command, args []string) { - if len(args) != 1 { - fmt.Fprintln(os.Stderr, "must specify a SparkApplication name") - return - } - - crdClientset, err := getSparkApplicationClient() - if err != nil { - fmt.Fprintf(os.Stderr, "failed to get SparkApplication client: %v\n", err) - return - } - - if err := doDelete(args[0], crdClientset); err != nil { - fmt.Fprintf(os.Stderr, "failed to delete SparkApplication %s: %v\n", args[0], err) - } - }, -} - -func doDelete(name string, crdClientset crdclientset.Interface) error { - if err := deleteSparkApplication(name, crdClientset); err != nil { - return err - } - - fmt.Printf("SparkApplication \"%s\" deleted\n", name) - - return nil -} - -func deleteSparkApplication(name string, crdClientset crdclientset.Interface) error { - return crdClientset.SparkoperatorV1beta2().SparkApplications(Namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) -} diff --git a/cmd/sparkctl/app/forward.go b/cmd/sparkctl/app/forward.go deleted file mode 100644 index 43b817daf8..0000000000 --- a/cmd/sparkctl/app/forward.go +++ /dev/null @@ -1,174 +0,0 @@ -/* -Copyright 2017 Google LLC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package app - -import ( - "context" - "fmt" - "net/http" - "net/url" - "os" - "os/signal" - "time" - - "github.com/spf13/cobra" - - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - clientset "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/portforward" - "k8s.io/client-go/transport/spdy" - - crdclientset "github.com/kubeflow/spark-operator/pkg/client/clientset/versioned" -) - -var LocalPort int32 -var RemotePort int32 - -var forwardCmd = &cobra.Command{ - Use: "forward <name> [--local-port <local port>] [--remote-port <remote port>]", - Short: "Start to forward a local port to the remote port of the driver UI", - Long: `Start to forward a local port to the remote port of the driver UI so the UI can be accessed locally.`, - Run: func(cmd *cobra.Command, args []string) { - if len(args) != 1 { - fmt.Fprintln(os.Stderr, "must specify a SparkApplication name") - return - } - - config, err := buildConfig(KubeConfig) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to get kubeconfig: %v\n", err) - return - } - - crdClientset, err := getSparkApplicationClientForConfig(config) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to get SparkApplication client: %v\n", err) - return - } - - kubeClientset, err := getKubeClientForConfig(config) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to get REST client: %v\n", err) - return - } - restClient := kubeClientset.CoreV1().RESTClient() - - driverPodURL, driverPodName, err := getDriverPodURLAndName(args[0], restClient, crdClientset) - if err != nil { - fmt.Fprintf(os.Stderr, - "failed to get an API server URL of the driver pod of SparkApplication %s: %v\n", - args[0], err) - return - } - - stopCh := make(chan struct{}, 1) - readyCh := make(chan struct{}) - - forwarder, err := newPortForwarder(config, driverPodURL, stopCh, readyCh) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to get a port forwarder: %v\n", err) - return - } - - fmt.Printf("forwarding from %d -> %d\n", LocalPort, RemotePort) - if err = runPortForward(driverPodName, stopCh, forwarder, kubeClientset); err != nil { - fmt.Fprintf(os.Stderr, "failed to run port forwarding: %v\n", err) - } - }, -} - -func init() { - forwardCmd.Flags().Int32VarP(&LocalPort, "local-port", "l", 4040, - "local port to forward from") - forwardCmd.Flags().Int32VarP(&RemotePort, "remote-port", "r", 4040, - "remote port to forward to") -} - -func newPortForwarder( - config *rest.Config, - url *url.URL, - stopCh chan struct{}, - readyCh chan struct{}) (*portforward.PortForwarder, error) { - transport, upgrader, err := spdy.RoundTripperFor(config) - if err != nil { - return nil, err - } - - dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url) - ports := []string{fmt.Sprintf("%d:%d", LocalPort, RemotePort)} - fw, err := portforward.New(dialer, ports, stopCh, readyCh, nil, os.Stderr) - if err != nil { - return nil, err - } - - return fw, nil -} - -func getDriverPodURLAndName( - name string, - restClient rest.Interface, - crdClientset crdclientset.Interface) (*url.URL, string, error) { - app, err := getSparkApplication(name, crdClientset) - if err != nil { - return nil, "", fmt.Errorf("failed to get SparkApplication %s: %v", name, err) - } - - if app.Status.DriverInfo.PodName != "" { - request := restClient.Post(). - Resource("pods"). - Namespace(Namespace). - Name(app.Status.DriverInfo.PodName).
- SubResource("portforward") - return request.URL(), app.Status.DriverInfo.PodName, nil - } - - return nil, "", fmt.Errorf("driver pod name of SparkApplication %s is not available yet", name) -} - -func runPortForward( - driverPodName string, - stopCh chan struct{}, - forwarder *portforward.PortForwarder, - kubeClientset clientset.Interface) error { - signals := make(chan os.Signal, 1) - signal.Notify(signals, os.Interrupt) - defer signal.Stop(signals) - - go func() { - defer close(stopCh) - for { - pod, err := kubeClientset.CoreV1().Pods(Namespace).Get(context.TODO(), driverPodName, metav1.GetOptions{}) - if err != nil { - break - } - if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { - break - } - time.Sleep(1 * time.Second) - } - fmt.Println("stopping forwarding as the driver pod has terminated") - }() - - go func() { - <-signals - close(stopCh) - }() - - return forwarder.ForwardPorts() -} diff --git a/cmd/sparkctl/app/list.go b/cmd/sparkctl/app/list.go deleted file mode 100644 index 63c2bf3b8a..0000000000 --- a/cmd/sparkctl/app/list.go +++ /dev/null @@ -1,67 +0,0 @@ -/* -Copyright 2017 Google LLC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package app - -import ( - "context" - "fmt" - "os" - - "github.com/olekukonko/tablewriter" - "github.com/spf13/cobra" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - crdclientset "github.com/kubeflow/spark-operator/pkg/client/clientset/versioned" -) - -var listCmd = &cobra.Command{ - Use: "list", - Short: "List SparkApplication objects", - Long: `List SparkApplication objects in a given namespaces.`, - Run: func(_ *cobra.Command, args []string) { - crdClientset, err := getSparkApplicationClient() - if err != nil { - fmt.Fprintf(os.Stderr, "failed to get SparkApplication client: %v\n", err) - return - } - - if err = doList(crdClientset); err != nil { - fmt.Fprintf(os.Stderr, "failed to list SparkApplications: %v\n", err) - } - }, -} - -func doList(crdClientset crdclientset.Interface) error { - apps, err := crdClientset.SparkoperatorV1beta2().SparkApplications(Namespace).List(context.TODO(), metav1.ListOptions{}) - if err != nil { - return err - } - - table := tablewriter.NewWriter(os.Stdout) - table.SetHeader([]string{"Name", "State", "Submission Age", "Termination Age"}) - for _, app := range apps.Items { - table.Append([]string{ - app.Name, - string(app.Status.AppState.State), - getSinceTime(app.Status.LastSubmissionAttemptTime), - getSinceTime(app.Status.TerminationTime), - }) - } - table.Render() - - return nil -} diff --git a/cmd/sparkctl/app/log.go b/cmd/sparkctl/app/log.go deleted file mode 100644 index 9ccdf4a8fe..0000000000 --- a/cmd/sparkctl/app/log.go +++ /dev/null @@ -1,161 +0,0 @@ -/* -Copyright 2017 Google LLC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package app - -import ( - "context" - "fmt" - "io" - "os" - "time" - - "github.com/spf13/cobra" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - clientset "k8s.io/client-go/kubernetes" - - crdclientset "github.com/kubeflow/spark-operator/pkg/client/clientset/versioned" -) - -var ExecutorID int32 -var FollowLogs bool - -var logCommand = &cobra.Command{ - Use: "log <name>", - Short: "log is a sub-command of sparkctl that fetches logs of a Spark application.", - Long: ``, - Run: func(cmd *cobra.Command, args []string) { - if len(args) != 1 { - fmt.Fprintln(os.Stderr, "must specify a SparkApplication name") - return - } - - kubeClientset, err := getKubeClient() - if err != nil { - fmt.Fprintf(os.Stderr, "failed to get Kubernetes client: %v\n", err) - return - } - - crdClientset, err := getSparkApplicationClient() - if err != nil { - fmt.Fprintf(os.Stderr, "failed to get SparkApplication client: %v\n", err) - return - } - - if err := doLog(args[0], FollowLogs, kubeClientset, crdClientset); err != nil { - fmt.Fprintf(os.Stderr, "failed to get driver logs of SparkApplication %s: %v\n", args[0], err) - } - }, -} - -func init() { - logCommand.Flags().Int32VarP(&ExecutorID, "executor", "e", -1, - "id of the executor to fetch logs for") - logCommand.Flags().BoolVarP(&FollowLogs, "follow", "f", false, "whether to stream the logs") -} - -func doLog( - name string, - followLogs bool, - kubeClient clientset.Interface, - crdClient crdclientset.Interface) error { - timeout := 30 * time.Second - - podNameChannel := getPodNameChannel(name, crdClient) - var podName string - - select { - case podName = <-podNameChannel: - case <-time.After(timeout): - return fmt.Errorf("not found pod name") - } - - waitLogsChannel := waitForLogsFromPodChannel(podName, kubeClient, crdClient) - - select { - case <-waitLogsChannel: - case <-time.After(timeout): - return fmt.Errorf("timeout to fetch logs from pod \"%s\"", podName) - } - - if followLogs { - return streamLogs(os.Stdout, kubeClient, podName) - } - return printLogs(os.Stdout, kubeClient, podName) -} - -func getPodNameChannel( - sparkApplicationName string, - crdClient crdclientset.Interface) chan string { - channel := make(chan string, 1) - go func() { - for { - app, _ := crdClient.SparkoperatorV1beta2().SparkApplications(Namespace).Get( - context.TODO(), - sparkApplicationName, - metav1.GetOptions{}) - - if app.Status.DriverInfo.PodName != "" { - channel <- app.Status.DriverInfo.PodName - break - } - } - }() - return channel -} - -func waitForLogsFromPodChannel( - podName string, - kubeClient clientset.Interface, - _ crdclientset.Interface) chan bool { - channel := make(chan bool, 1) - go func() { - for { - _, err := kubeClient.CoreV1().Pods(Namespace).GetLogs(podName, &corev1.PodLogOptions{}).Do(context.TODO()).Raw() - - if err == nil { - channel <- true - break - } - } - }() - return channel -} - -// printLogs is a one time operation that prints the fetched logs of the given pod.
-func printLogs(out io.Writer, kubeClientset clientset.Interface, podName string) error { - rawLogs, err := kubeClientset.CoreV1().Pods(Namespace).GetLogs(podName, &corev1.PodLogOptions{}).Do(context.TODO()).Raw() - if err != nil { - return err - } - fmt.Fprintln(out, string(rawLogs)) - return nil -} - -// streamLogs streams the logs of the given pod until there are no more logs available. -func streamLogs(out io.Writer, kubeClientset clientset.Interface, podName string) error { - request := kubeClientset.CoreV1().Pods(Namespace).GetLogs(podName, &corev1.PodLogOptions{Follow: true}) - reader, err := request.Stream(context.TODO()) - if err != nil { - return err - } - defer reader.Close() - if _, err := io.Copy(out, reader); err != nil { - return err - } - return nil -} diff --git a/cmd/sparkctl/app/root.go b/cmd/sparkctl/app/root.go deleted file mode 100644 index e845b8be89..0000000000 --- a/cmd/sparkctl/app/root.go +++ /dev/null @@ -1,58 +0,0 @@ -/* -Copyright 2017 Google LLC - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package app - -import ( - "fmt" - "os" - - "github.com/spf13/cobra" -) - -func getKubeConfigPath() string { - var kubeConfigEnv = os.Getenv("KUBECONFIG") - if len(kubeConfigEnv) == 0 { - return os.Getenv("HOME") + "/.kube/config" - } - return kubeConfigEnv -} - -var defaultKubeConfig = getKubeConfigPath() - -var Namespace string -var KubeConfig string - -var rootCmd = &cobra.Command{ - Use: "sparkctl", - Short: "sparkctl is the command-line tool for working with the Spark Operator", - Long: `sparkctl is the command-line tool for working with the Spark Operator. It supports creating, deleting and - checking status of SparkApplication objects. It also supports fetching application logs.`, -} - -func init() { - rootCmd.PersistentFlags().StringVarP(&Namespace, "namespace", "n", "default", - "The namespace in which the SparkApplication is to be created") - rootCmd.PersistentFlags().StringVarP(&KubeConfig, "kubeconfig", "k", defaultKubeConfig, - "The path to the local Kubernetes configuration file") - rootCmd.AddCommand(createCmd, deleteCmd, eventCommand, statusCmd, logCommand, listCmd, forwardCmd) -} - -func Execute() { - if err := rootCmd.Execute(); err != nil { - fmt.Fprintf(os.Stderr, "%v", err) - } -} diff --git a/cmd/sparkctl/app/create.go b/cmd/sparkctl/create/create.go similarity index 52% rename from cmd/sparkctl/app/create.go rename to cmd/sparkctl/create/create.go index 9509b9d7e4..1bb7ad17a8 100644 --- a/cmd/sparkctl/app/create.go +++ b/cmd/sparkctl/create/create.go @@ -1,11 +1,11 @@ /* -Copyright 2017 Google LLC +Copyright 2024 The Kubeflow authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at - https://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package app +package create import ( "context" @@ -26,110 +26,106 @@ import ( "unicode/utf8" "github.com/spf13/cobra" + "github.com/spf13/viper" "gocloud.dev/blob" "gocloud.dev/gcerrors" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/yaml" - clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/client" "github.com/kubeflow/spark-operator/api/v1beta2" - crdclientset "github.com/kubeflow/spark-operator/pkg/client/clientset/versioned" + "github.com/kubeflow/spark-operator/pkg/common" + "github.com/kubeflow/spark-operator/pkg/util" ) -const bufferSize = 1024 - -var DeleteIfExists bool -var LogsEnabled bool -var RootPath string -var UploadToPath string -var UploadToEndpoint string -var UploadToRegion string -var Public bool -var S3ForcePathStyle bool -var Override bool -var From string - -var createCmd = &cobra.Command{ - Use: "create <yaml file>", - Short: "Create a SparkApplication object", - Long: `Create a SparkApplication from a given YAML file storing the application specification.`, - Run: func(cmd *cobra.Command, args []string) { - if From != "" && len(args) != 1 { - fmt.Fprintln(os.Stderr, "must specify the name of a ScheduledSparkApplication") - return - } +const ( + bufferSize = 1024 +) - if len(args) != 1 { - fmt.Fprintln(os.Stderr, "must specify a YAML file of a SparkApplication") - return - } +var ( + k8sClient client.Client + clientset kubernetes.Interface + + Namespace string + DeleteIfExists bool + RootPath string + UploadToPath string + UploadToEndpoint string + UploadToRegion string + Public bool + S3ForcePathStyle bool + Override bool + From string +) - kubeClient, err := getKubeClient() - if err != nil { - fmt.Fprintf(os.Stderr, "failed to get Kubernetes client: %v\n", err) - return - } +func NewCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "create", + Short: "Create a SparkApplication from file or ScheduledSparkApplication", + RunE: func(cmd *cobra.Command, args []string) error { + if From != "" && len(args) != 1 { + return fmt.Errorf("must specify the name of a ScheduledSparkApplication") + } - crdClient, err := getSparkApplicationClient() - if err != nil { - fmt.Fprintf(os.Stderr, "failed to get SparkApplication client: %v\n", err) - return - } + if len(args) != 1 { + return fmt.Errorf("must specify a YAML file of a SparkApplication") + } + + Namespace = viper.GetString("namespace") - if From != "" { - if err := createFromScheduledSparkApplication(args[0], kubeClient, crdClient); err != nil { - fmt.Fprintf(os.Stderr, "%v\n", err) + var err error + k8sClient, err = util.GetK8sClient() + if err != nil { + return fmt.Errorf("failed to create Kubernetes client: %v", err) } - } else { - if err := createFromYaml(args[0], kubeClient, crdClient); err != nil { - fmt.Fprintf(os.Stderr, "%v\n", err) + + clientset, err = util.GetClientset() + if err != nil { + return fmt.Errorf("failed to create Kubernetes clientset: %v", err) } - } - }, -} -func init() { - createCmd.Flags().BoolVarP(&DeleteIfExists, "delete", "d", false, - "delete the
SparkApplication if already exists") - createCmd.Flags().BoolVarP(&LogsEnabled, "logs", "l", false, - "watch the SparkApplication logs") - createCmd.Flags().StringVarP(&UploadToPath, "upload-to", "u", "", - "the name of the bucket where local application dependencies are to be uploaded") - createCmd.Flags().StringVarP(&RootPath, "upload-prefix", "p", "", - "the prefix to use for the dependency uploads") - createCmd.Flags().StringVarP(&UploadToRegion, "upload-to-region", "r", "", - "the GCS or S3 storage region for the bucket") - createCmd.Flags().StringVarP(&UploadToEndpoint, "upload-to-endpoint", "e", - "https://storage.googleapis.com", "the GCS or S3 storage api endpoint url") - createCmd.Flags().BoolVarP(&Public, "public", "c", false, - "whether to make uploaded files publicly available") - createCmd.Flags().BoolVar(&S3ForcePathStyle, "s3-force-path-style", false, - "whether to force path style URLs for S3 objects") - createCmd.Flags().BoolVarP(&Override, "override", "o", false, - "whether to override remote files with the same names") - createCmd.Flags().StringVarP(&From, "from", "f", "", - "the name of ScheduledSparkApplication from which a forced SparkApplication run is created") -} + if From != "" { + if err := createFromScheduledSparkApplication(args[0]); err != nil { + return fmt.Errorf("failed to create SparkApplication %q from ScheduledSparkApplication %q: %v", args[0], From, err) + } + return nil + } -func createFromYaml(yamlFile string, kubeClient clientset.Interface, crdClient crdclientset.Interface) error { - app, err := loadFromYAML(yamlFile) - if err != nil { - return fmt.Errorf("failed to read a SparkApplication from %s: %v", yamlFile, err) - } + app, err := util.LoadSparkApplicationFromFile(args[0]) + if err != nil { + return fmt.Errorf("failed to read SparkApplication from file %s: %v", args[0], err) + } - if err := createSparkApplication(app, kubeClient, crdClient); err != nil { - return fmt.Errorf("failed to create SparkApplication %s: %v", app.Name, err) + if err := createSparkApplication(app); err != nil { + return fmt.Errorf("failed to create SparkApplication %s: %v", app.Name, err) + } + + return nil + }, } - return nil + cmd.Flags().BoolVarP(&DeleteIfExists, "delete", "d", false, "delete the SparkApplication if already exists") + cmd.Flags().StringVarP(&UploadToPath, "upload-to", "u", "", "the name of the bucket where local application dependencies are to be uploaded") + cmd.Flags().StringVarP(&RootPath, "upload-prefix", "p", "", "the prefix to use for the dependency uploads") + cmd.Flags().StringVarP(&UploadToRegion, "upload-to-region", "r", "", "the GCS or S3 storage region for the bucket") + cmd.Flags().StringVarP(&UploadToEndpoint, "upload-to-endpoint", "e", "https://storage.googleapis.com", "the GCS or S3 storage api endpoint url") + cmd.Flags().BoolVarP(&Public, "public", "c", false, "whether to make uploaded files publicly available") + cmd.Flags().BoolVar(&S3ForcePathStyle, "s3-force-path-style", false, "whether to force path style URLs for S3 objects") + cmd.Flags().BoolVarP(&Override, "override", "o", false, "whether to override remote files with the same names") + cmd.Flags().StringVarP(&From, "from", "f", "", "the name of ScheduledSparkApplication from which a forced SparkApplication run is created") + + return cmd } -func createFromScheduledSparkApplication(name string, kubeClient clientset.Interface, crdClient crdclientset.Interface) error { - sapp, err := crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(Namespace).Get(context.TODO(), 
From, metav1.GetOptions{}) - if err != nil { +func createFromScheduledSparkApplication(name string) error { + key := client.ObjectKey{ + Namespace: Namespace, + Name: From, + } + scheduledApp := &v1beta2.ScheduledSparkApplication{} + if err := k8sClient.Get(context.TODO(), key, scheduledApp); err != nil { return fmt.Errorf("failed to get ScheduledSparkApplication %s: %v", From, err) } @@ -141,25 +137,25 @@ func createFromScheduledSparkApplication(name string, kubeClient clientset.Inter { APIVersion: v1beta2.SchemeGroupVersion.String(), Kind: reflect.TypeOf(v1beta2.ScheduledSparkApplication{}).Name(), - Name: sapp.Name, - UID: sapp.UID, + Name: scheduledApp.Name, + UID: scheduledApp.UID, }, }, }, - Spec: *sapp.Spec.Template.DeepCopy(), + Spec: *scheduledApp.Spec.Template.DeepCopy(), } - if err := createSparkApplication(app, kubeClient, crdClient); err != nil { + if err := createSparkApplication(app); err != nil { return fmt.Errorf("failed to create SparkApplication %s: %v", app.Name, err) } return nil } -func createSparkApplication(app *v1beta2.SparkApplication, kubeClient clientset.Interface, crdClient crdclientset.Interface) error { +func createSparkApplication(app *v1beta2.SparkApplication) error { if DeleteIfExists { - if err := deleteSparkApplication(app.Name, crdClient); err != nil { - return err + if err := k8sClient.Delete(context.TODO(), app); err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("failed to delete SparkApplication %s: %v", app.Name, err) } } @@ -172,49 +168,21 @@ func createSparkApplication(app *v1beta2.SparkApplication, kubeClient clientset. return err } - if hadoopConfDir := os.Getenv("HADOOP_CONF_DIR"); hadoopConfDir != "" { - fmt.Println("creating a ConfigMap for Hadoop configuration files in HADOOP_CONF_DIR") - if err := handleHadoopConfiguration(app, hadoopConfDir, kubeClient); err != nil { + if hadoopConfDir := os.Getenv(common.EnvHadoopConfDir); hadoopConfDir != "" { + fmt.Printf("Creating a ConfigMap for Hadoop configuration files in %s\n", hadoopConfDir) + if err := handleHadoopConf(app, hadoopConfDir); err != nil { return err } } - if _, err := crdClient.SparkoperatorV1beta2().SparkApplications(Namespace).Create( - context.TODO(), - app, - metav1.CreateOptions{}, - ); err != nil { + if err := k8sClient.Create(context.TODO(), app); err != nil { return err } - - fmt.Printf("SparkApplication \"%s\" created\n", app.Name) - - if LogsEnabled { - if err := doLog(app.Name, true, kubeClient, crdClient); err != nil { - return nil - } - } + fmt.Printf("sparkapplication %q created\n", app.Name) return nil } -func loadFromYAML(yamlFile string) (*v1beta2.SparkApplication, error) { - file, err := os.Open(yamlFile) - if err != nil { - return nil, err - } - defer file.Close() - - decoder := yaml.NewYAMLOrJSONDecoder(file, bufferSize) - app := &v1beta2.SparkApplication{} - err = decoder.Decode(app) - if err != nil { - return nil, err - } - - return app, nil -} - func validateSpec(spec v1beta2.SparkApplicationSpec) error { if spec.Image == nil && (spec.Driver.Image == nil || spec.Executor.Image == nil) { return fmt.Errorf("'spec.driver.image' and 'spec.executor.image' cannot be empty when 'spec.image' " + @@ -225,22 +193,25 @@ func validateSpec(spec v1beta2.SparkApplicationSpec) error { } func handleLocalDependencies(app *v1beta2.SparkApplication) error { - if app.Spec.MainApplicationFile != nil { - isMainAppFileLocal, err := isLocalFile(*app.Spec.MainApplicationFile) + // Upload the main application file to the cloud if it is a local file. 
+ mainAppFile := app.Spec.MainApplicationFile + if mainAppFile != nil { + isMainAppFileLocal, err := util.IsLocalFile(*mainAppFile) if err != nil { return err } if isMainAppFileLocal { - uploadedMainFile, err := uploadLocalDependencies(app, []string{*app.Spec.MainApplicationFile}) + uploadedMainFile, err := uploadLocalDependencies(app, []string{*mainAppFile}) if err != nil { return fmt.Errorf("failed to upload local main application file: %v", err) } - app.Spec.MainApplicationFile = &uploadedMainFile[0] + app.Spec.MainApplicationFile = &uploadedMainFile[0] } } - localJars, err := filterLocalFiles(app.Spec.Deps.Jars) + // Filter out local jars and upload them to the cloud. + localJars, err := util.FilterLocalFiles(app.Spec.Deps.Jars) if err != nil { return fmt.Errorf("failed to filter local jars: %v", err) } @@ -253,7 +224,8 @@ func handleLocalDependencies(app *v1beta2.SparkApplication) error { app.Spec.Deps.Jars = uploadedJars } - localFiles, err := filterLocalFiles(app.Spec.Deps.Files) + // Filter out local files and upload them to the cloud. + localFiles, err := util.FilterLocalFiles(app.Spec.Deps.Files) if err != nil { return fmt.Errorf("failed to filter local files: %v", err) } @@ -266,7 +238,8 @@ func handleLocalDependencies(app *v1beta2.SparkApplication) error { app.Spec.Deps.Files = uploadedFiles } - localPyFiles, err := filterLocalFiles(app.Spec.Deps.PyFiles) + // Filter out local python files and upload them to the cloud. + localPyFiles, err := util.FilterLocalFiles(app.Spec.Deps.PyFiles) if err != nil { return fmt.Errorf("failed to filter local pyfiles: %v", err) } @@ -282,30 +255,73 @@ func handleLocalDependencies(app *v1beta2.SparkApplication) error { return nil } -func filterLocalFiles(files []string) ([]string, error) { - var localFiles []string - for _, file := range files { - if isLocal, err := isLocalFile(file); err != nil { - return nil, err - } else if isLocal { - localFiles = append(localFiles, file) - } +func handleHadoopConf(app *v1beta2.SparkApplication, hadoopConfDir string) error { + configMap, err := buildHadoopConfigMap(app.Name, app.Namespace, hadoopConfDir) + if err != nil { + return fmt.Errorf("failed to build ConfigMap from files in %s: %v", hadoopConfDir, err) + } + + if err := k8sClient.Delete(context.TODO(), configMap); err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("failed to delete existing ConfigMap %s: %v", configMap.Name, err) } - return localFiles, nil + if err := k8sClient.Create(context.TODO(), configMap); err != nil { + return fmt.Errorf("failed to create ConfigMap %s: %v", configMap.Name, err) + } + + app.Spec.HadoopConfigMap = &configMap.Name + return nil } -func isLocalFile(file string) (bool, error) { - fileURL, err := url.Parse(file) +func buildHadoopConfigMap(appName string, appNamespace string, hadoopConfDir string) (*corev1.ConfigMap, error) { + info, err := os.Stat(hadoopConfDir) + if err != nil { + return nil, err + } + + if !info.IsDir() { + return nil, fmt.Errorf("%s is not a directory", hadoopConfDir) + } + + files, err := os.ReadDir(hadoopConfDir) if err != nil { - return false, err + return nil, err + } + + if len(files) == 0 { + return nil, fmt.Errorf("no Hadoop configuration file found in %s", hadoopConfDir) } - if fileURL.Scheme == "file" || fileURL.Scheme == "" { - return true, nil + data := make(map[string]string) + binaryData := make(map[string][]byte) + for _, file := range files { + if file.IsDir() { + continue + } + + filename := file.Name() + bytes, err := os.ReadFile(filepath.Join(hadoopConfDir, filename)) + if err != nil { +
return nil, err + } + + if utf8.Valid(bytes) { + data[filename] = string(bytes) + } else { + binaryData[filename] = bytes + } + } + + configMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-hadoop-conf", appName), + Namespace: appNamespace, + }, + Data: data, + BinaryData: binaryData, } - return false, nil + return configMap, nil } type blobHandler interface { @@ -425,76 +441,3 @@ func uploadLocalDependencies(app *v1beta2.SparkApplication, files []string) ([]s return uploadedFilePaths, nil } - -func handleHadoopConfiguration( - app *v1beta2.SparkApplication, - hadoopConfDir string, - kubeClientset clientset.Interface) error { - configMap, err := buildHadoopConfigMap(app.Name, hadoopConfDir) - if err != nil { - return fmt.Errorf("failed to create a ConfigMap for Hadoop configuration files in %s: %v", - hadoopConfDir, err) - } - - err = kubeClientset.CoreV1().ConfigMaps(Namespace).Delete(context.TODO(), configMap.Name, metav1.DeleteOptions{}) - if err != nil && !errors.IsNotFound(err) { - return fmt.Errorf("failed to delete existing ConfigMap %s: %v", configMap.Name, err) - } - - if configMap, err = kubeClientset.CoreV1().ConfigMaps(Namespace).Create(context.TODO(), configMap, metav1.CreateOptions{}); err != nil { - return fmt.Errorf("failed to create ConfigMap %s: %v", configMap.Name, err) - } - - app.Spec.HadoopConfigMap = &configMap.Name - - return nil -} - -func buildHadoopConfigMap(appName string, hadoopConfDir string) (*corev1.ConfigMap, error) { - info, err := os.Stat(hadoopConfDir) - if err != nil { - return nil, err - } - - if !info.IsDir() { - return nil, fmt.Errorf("%s is not a directory", hadoopConfDir) - } - - files, err := os.ReadDir(hadoopConfDir) - if err != nil { - return nil, err - } - - if len(files) == 0 { - return nil, fmt.Errorf("no Hadoop configuration file found in %s", hadoopConfDir) - } - - hadoopStringConfigFiles := make(map[string]string) - hadoopBinaryConfigFiles := make(map[string][]byte) - for _, file := range files { - if file.IsDir() { - continue - } - content, err := os.ReadFile(filepath.Join(hadoopConfDir, file.Name())) - if err != nil { - return nil, err - } - - if utf8.Valid(content) { - hadoopStringConfigFiles[file.Name()] = string(content) - } else { - hadoopBinaryConfigFiles[file.Name()] = content - } - } - - configMap := &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: appName + "-hadoop-config", - Namespace: Namespace, - }, - Data: hadoopStringConfigFiles, - BinaryData: hadoopBinaryConfigFiles, - } - - return configMap, nil -} diff --git a/cmd/sparkctl/app/utils.go b/cmd/sparkctl/create/create_test.go similarity index 56% rename from cmd/sparkctl/app/utils.go rename to cmd/sparkctl/create/create_test.go index 0786c8a5ba..6c59b5abf5 100644 --- a/cmd/sparkctl/app/utils.go +++ b/cmd/sparkctl/create/create_test.go @@ -14,26 +14,23 @@ See the License for the specific language governing permissions and limitations under the License. */ -package app +package create import ( - "time" + "strings" + "testing" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/duration" + "github.com/stretchr/testify/assert" ) -func getSinceTime(timestamp metav1.Time) string { - if timestamp.IsZero() { - return "N.A." 
+func TestHandleHadoopConfiguration(t *testing.T) { + configMap, err := buildHadoopConfigMap("test", "default", "testdata/hadoop-conf") + if err != nil { + t.Fatal(err) } - return duration.ShortHumanDuration(time.Since(timestamp.Time)) -} - -func formatNotAvailable(info string) string { - if info == "" { - return "N.A." - } - return info + assert.Equal(t, "test-hadoop-conf", configMap.Name) + assert.Len(t, configMap.BinaryData, 1) + assert.Len(t, configMap.Data, 1) + assert.True(t, strings.Contains(configMap.Data["core-site.xml"], "fs.gs.impl")) } diff --git a/cmd/sparkctl/app/gcs.go b/cmd/sparkctl/create/gcs.go similarity index 99% rename from cmd/sparkctl/app/gcs.go rename to cmd/sparkctl/create/gcs.go index 5601497afb..22bd82f58c 100644 --- a/cmd/sparkctl/app/gcs.go +++ b/cmd/sparkctl/create/gcs.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package app +package create import ( "fmt" diff --git a/cmd/sparkctl/app/s3.go b/cmd/sparkctl/create/s3.go similarity index 99% rename from cmd/sparkctl/app/s3.go rename to cmd/sparkctl/create/s3.go index 4bdbeaa6c9..4cfbb25ece 100644 --- a/cmd/sparkctl/app/s3.go +++ b/cmd/sparkctl/create/s3.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package app +package create import ( "context" diff --git a/cmd/sparkctl/app/testdata/hadoop-conf/binary.dat b/cmd/sparkctl/create/testdata/hadoop-conf/binary.dat similarity index 100% rename from cmd/sparkctl/app/testdata/hadoop-conf/binary.dat rename to cmd/sparkctl/create/testdata/hadoop-conf/binary.dat diff --git a/cmd/sparkctl/app/testdata/hadoop-conf/core-site.xml b/cmd/sparkctl/create/testdata/hadoop-conf/core-site.xml similarity index 100% rename from cmd/sparkctl/app/testdata/hadoop-conf/core-site.xml rename to cmd/sparkctl/create/testdata/hadoop-conf/core-site.xml diff --git a/cmd/sparkctl/app/testdata/test-app.yaml b/cmd/sparkctl/create/testdata/test-app.yaml similarity index 100% rename from cmd/sparkctl/app/testdata/test-app.yaml rename to cmd/sparkctl/create/testdata/test-app.yaml diff --git a/cmd/sparkctl/delete/delete.go b/cmd/sparkctl/delete/delete.go new file mode 100644 index 0000000000..be1127441e --- /dev/null +++ b/cmd/sparkctl/delete/delete.go @@ -0,0 +1,62 @@ +/* +Copyright 2024 The Kubeflow authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/ + +package delete + +import ( + "context" + "fmt" + + "github.com/spf13/cobra" + "github.com/spf13/viper" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/kubeflow/spark-operator/api/v1beta2" + "github.com/kubeflow/spark-operator/pkg/util" +) + +func NewCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "delete <name>", + Short: "Delete a SparkApplication", + Long: "Delete a SparkApplication object with a given name", + Args: cobra.ExactArgs(1), + RunE: func(_ *cobra.Command, args []string) error { + name := args[0] + namespace := viper.GetString("namespace") + + k8sClient, err := util.GetK8sClient() + if err != nil { + return fmt.Errorf("failed to create client: %v", err) + } + + app := &v1beta2.SparkApplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + if err := k8sClient.Delete(context.TODO(), app); err != nil { + return fmt.Errorf("failed to delete SparkApplication %s: %v", name, err) + } + + fmt.Printf("sparkapplication \"%s\" deleted\n", name) + return nil + }, + } + + return cmd +} diff --git a/cmd/sparkctl/app/event.go b/cmd/sparkctl/event/event.go similarity index 59% rename from cmd/sparkctl/app/event.go rename to cmd/sparkctl/event/event.go index 37ced1ad0a..2ba3e4de69 100644 --- a/cmd/sparkctl/app/event.go +++ b/cmd/sparkctl/event/event.go @@ -1,11 +1,11 @@ /* -Copyright 2018 Google LLC +Copyright 2024 The Kubeflow authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - https://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License.
*/ -package app +package event import ( "context" @@ -25,64 +25,68 @@ import ( "github.com/olekukonko/tablewriter" "github.com/spf13/cobra" + "github.com/spf13/viper" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" clientWatch "k8s.io/client-go/tools/watch" "k8s.io/kubernetes/pkg/util/interrupt" + "sigs.k8s.io/controller-runtime/pkg/client" - crdclientset "github.com/kubeflow/spark-operator/pkg/client/clientset/versioned" + "github.com/kubeflow/spark-operator/api/v1beta2" + "github.com/kubeflow/spark-operator/pkg/util" ) -var FollowEvents bool - -var eventCommand = &cobra.Command{ - Use: "event <name>", - Short: "Shows SparkApplication events", - Long: `Shows events associated with SparkApplication of a given name`, - Run: func(cmd *cobra.Command, args []string) { - if len(args) != 1 { - fmt.Fprintln(os.Stderr, "must specify a SparkApplication name") - return - } +var ( + followEvents bool ) - crdClientset, err := getSparkApplicationClient() - if err != nil { - fmt.Fprintf(os.Stderr, "failed to get SparkApplication client: %v\n", err) - return - } +func NewCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "event <name>", + Short: "Show events associated with a SparkApplication", + Long: "Show events associated with a SparkApplication of the given name", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + name := args[0] + namespace := viper.GetString("namespace") + + k8sClient, err := util.GetK8sClient() + if err != nil { + return fmt.Errorf("failed to get Kubernetes client: %v", err) + } - kubeClientset, err := getKubeClient() - if err != nil { - fmt.Fprintf(os.Stderr, "failed to get KubeClient: %v\n", err) - return - } + clientset, err := util.GetClientset() + if err != nil { + return fmt.Errorf("failed to get Kubernetes clientset: %v", err) + } - if err := doShowEvents(args[0], crdClientset, kubeClientset); err != nil { - fmt.Fprintf(os.Stderr, "failed to check events of SparkApplication %s: %v\n", args[0], err) - } - }, -} + if err := doShowEvents(name, namespace, k8sClient, clientset); err != nil { + return err + } -func init() { - eventCommand.Flags().BoolVarP(&FollowEvents, "follow", "f", false, - "whether to stream the events for the specified SparkApplication name") + return nil + }, + } + cmd.Flags().BoolVarP(&followEvents, "follow", "f", false, "whether to stream the events for the specified SparkApplication name") + return cmd } -func doShowEvents(name string, crdClientset crdclientset.Interface, kubeClientset kubernetes.Interface) error { - app, err := getSparkApplication(name, crdClientset) - if err != nil { +func doShowEvents(name string, namespace string, k8sClient client.Client, clientset kubernetes.Interface) error { + key := types.NamespacedName{Namespace: namespace, Name: name} + app := v1beta2.SparkApplication{} + if err := k8sClient.Get(context.TODO(), key, &app); err != nil { return fmt.Errorf("failed to get SparkApplication %s: %v", name, err) } - app.Kind = "SparkApplication" - eventsInterface := kubeClientset.CoreV1().Events(Namespace) - if FollowEvents { + app.Kind = "SparkApplication" + eventsClient := clientset.CoreV1().Events(namespace) + if followEvents { // watch for all events for this specific SparkApplication name - selector := eventsInterface.GetFieldSelector(&app.Name, &app.Namespace, &app.Kind, nil) + selector := eventsClient.GetFieldSelector(&app.Name, &app.Namespace, &app.Kind, nil) options := metav1.ListOptions{FieldSelector:
selector.String(), Watch: true} - events, err := eventsInterface.Watch(context.TODO(), options) + events, err := eventsClient.Watch(context.TODO(), options) if err != nil { return err } @@ -91,10 +95,10 @@ func doShowEvents(name string, crdClientset crdclientset.Interface, kubeClientse } } else { // print only events for current SparkApplication UID - stringUID := string(app.UID) - selector := eventsInterface.GetFieldSelector(&app.Name, &app.Namespace, &app.Kind, &stringUID) + uid := string(app.UID) + selector := eventsClient.GetFieldSelector(&app.Name, &app.Namespace, &app.Kind, &uid) options := metav1.ListOptions{FieldSelector: selector.String()} - events, err := eventsInterface.List(context.TODO(), options) + events, err := eventsClient.List(context.TODO(), options) if err != nil { return err } @@ -106,38 +110,6 @@ func doShowEvents(name string, crdClientset crdclientset.Interface, kubeClientse return nil } -func prepareNewTable() *tablewriter.Table { - table := tablewriter.NewWriter(os.Stdout) - table.SetColMinWidth(0, 10) - table.SetColMinWidth(1, 6) - table.SetColMinWidth(2, 50) - - return table -} - -func prepareEventsHeader(table *tablewriter.Table) *tablewriter.Table { - table.SetBorders(tablewriter.Border{Left: true, Top: true, Right: true, Bottom: true}) - table.SetHeader([]string{"Type", "Age", "Message"}) - table.SetHeaderLine(true) - return table -} - -func printEvents(events *corev1.EventList) error { - // Render all event rows - table := prepareNewTable() - table = prepareEventsHeader(table) - for _, event := range events.Items { - table.Append([]string{ - event.Type, - getSinceTime(event.LastTimestamp), - strings.TrimSpace(event.Message), - }) - } - - table.Render() - return nil -} - func streamEvents(events watch.Interface, streamSince int64) error { // Render just table header, without an additional header line as we stream table := prepareNewTable() @@ -148,11 +120,11 @@ func streamEvents(events watch.Interface, streamSince int64) error { // Start rendering contents of the table without table header as it is already printed table = prepareNewTable() table.SetBorders(tablewriter.Border{Left: true, Top: false, Right: true, Bottom: false}) - ctx := context.TODO() - ctx, cancel := context.WithTimeout(ctx, watchExpire) + ctx, cancel := context.WithTimeout(context.TODO(), watchExpire) defer cancel() - _, err := clientWatch.UntilWithoutRetry(ctx, events, func(ev watch.Event) (bool, error) { - if event, isEvent := ev.Object.(*corev1.Event); isEvent { + + _, err := clientWatch.UntilWithoutRetry(ctx, events, func(e watch.Event) (bool, error) { + if event, ok := e.Object.(*corev1.Event); ok { // Ensure to display events which are newer than last creation time of SparkApplication // for this specific application name if streamSince <= event.CreationTimestamp.Unix() { @@ -164,13 +136,13 @@ func streamEvents(events watch.Interface, streamSince int64) error { table.ClearRows() table.Append([]string{ event.Type, - getSinceTime(event.LastTimestamp), + util.GetSinceTime(event.LastTimestamp), strings.TrimSpace(event.Message), }) table.Render() } } else { - fmt.Printf("info: %v", ev.Object) + fmt.Printf("info: %v", e.Object) } return false, nil @@ -178,3 +150,34 @@ func streamEvents(events watch.Interface, streamSince int64) error { return err }) } + +func printEvents(events *corev1.EventList) error { + // Render all event rows + table := prepareNewTable() + table = prepareEventsHeader(table) + for _, event := range events.Items { + table.Append([]string{ + event.Type, +
+
+func prepareNewTable() *tablewriter.Table {
+	table := tablewriter.NewWriter(os.Stdout)
+	table.SetColMinWidth(0, 10)
+	table.SetColMinWidth(1, 6)
+	table.SetColMinWidth(2, 50)
+	return table
+}
+
+func prepareEventsHeader(table *tablewriter.Table) *tablewriter.Table {
+	table.SetBorders(tablewriter.Border{Left: true, Top: true, Right: true, Bottom: true})
+	table.SetHeader([]string{"Type", "Age", "Message"})
+	table.SetHeaderLine(true)
+	return table
+}
diff --git a/cmd/sparkctl/forward/forward.go b/cmd/sparkctl/forward/forward.go
new file mode 100644
index 0000000000..2e7359f9e6
--- /dev/null
+++ b/cmd/sparkctl/forward/forward.go
@@ -0,0 +1,183 @@
+/*
+Copyright 2024 The Kubeflow authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package forward
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"net/url"
+	"os"
+	"os/signal"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/spf13/cobra"
+	"github.com/spf13/viper"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/portforward"
+	"k8s.io/client-go/transport/spdy"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	"github.com/kubeflow/spark-operator/api/v1beta2"
+	"github.com/kubeflow/spark-operator/pkg/util"
+)
+
+var (
+	localPort  int32
+	remotePort int32
+)
+
+func NewCommand() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "forward <name> [--local-port <local port>] [--remote-port <remote port>]",
+		Short: "Start to forward a local port to the remote port of the driver UI",
+		Long:  `Start to forward a local port to the remote port of the driver UI so the UI can be accessed locally.`,
+		Args:  cobra.ExactArgs(1),
+		RunE: func(cmd *cobra.Command, args []string) error {
+			name := args[0]
+			namespace := viper.GetString("namespace")
+
+			config, err := ctrl.GetConfig()
+			if err != nil {
+				return fmt.Errorf("failed to get rest config: %v", err)
+			}
+
+			k8sClient, err := util.GetK8sClient()
+			if err != nil {
+				return fmt.Errorf("failed to get Kubernetes client: %v", err)
+			}
+
+			clientset, err := util.GetClientset()
+			if err != nil {
+				return fmt.Errorf("failed to get Kubernetes clientset: %v", err)
+			}
+
+			return doPortForward(name, namespace, config, k8sClient, clientset)
+		},
+	}
+
+	cmd.Flags().Int32VarP(&localPort, "local-port", "l", 4040, "local port to forward from")
+	cmd.Flags().Int32VarP(&remotePort, "remote-port", "r", 4040, "remote port to forward to")
+	return cmd
+}
+
+func doPortForward(
+	name string,
+	namespace string,
+	config *rest.Config,
+	k8sClient client.Client,
+	clientset kubernetes.Interface,
+) error {
+	key := types.NamespacedName{Namespace: namespace, Name: name}
+	app := &v1beta2.SparkApplication{}
+	if err := k8sClient.Get(context.TODO(), key, app); err != nil {
+		return fmt.Errorf("failed to get SparkApplication %s: %v", name, err)
+	}
+
+	driverPodName := app.Status.DriverInfo.PodName
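+	// The operator fills in DriverInfo.PodName once the driver pod has been
+	// created; an empty name means there is nothing to forward to yet.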
+	if driverPodName == "" {
+		return fmt.Errorf("driver pod not found")
+	}
+
+	url := clientset.CoreV1().
+		RESTClient().
+		Post().
+		Resource("pods").
+		Namespace(namespace).
+		Name(driverPodName).
+		SubResource("portforward").
+		URL()
+	if url == nil {
+		return fmt.Errorf("failed to get URL for port forwarding")
+	}
+
+	stopCh := make(chan struct{}, 1)
+	readyCh := make(chan struct{})
+
+	forwarder, err := newPortForwarder(config, url, stopCh, readyCh)
+	if err != nil {
+		return fmt.Errorf("failed to get port forwarder: %v", err)
+	}
+
+	fmt.Printf("Forwarding from %d -> %d\n", localPort, remotePort)
+	if err = runPortForward(driverPodName, namespace, forwarder, clientset, stopCh); err != nil {
+		return fmt.Errorf("failed to do port forwarding: %v", err)
+	}
+
+	return nil
+}
+
+func newPortForwarder(
+	config *rest.Config,
+	url *url.URL,
+	stopCh chan struct{},
+	readyCh chan struct{},
+) (*portforward.PortForwarder, error) {
+	transport, upgrader, err := spdy.RoundTripperFor(config)
+	if err != nil {
+		return nil, err
+	}
+
+	dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", url)
+	ports := []string{fmt.Sprintf("%d:%d", localPort, remotePort)}
+	forwarder, err := portforward.New(dialer, ports, stopCh, readyCh, nil, os.Stderr)
+	if err != nil {
+		return nil, err
+	}
+
+	return forwarder, nil
+}
+
+func runPortForward(
+	driverPodName string,
+	namespace string,
+	forwarder *portforward.PortForwarder,
+	clientset kubernetes.Interface,
+	stopCh chan struct{},
+) error {
+	// SIGSTOP cannot be caught, so only trap SIGTERM and SIGINT here.
+	signals := make(chan os.Signal, 1)
+	signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
+	defer signal.Stop(signals)
+
+	// Both the pod watcher and the signal handler may stop the forwarder,
+	// so guard the channel close with a sync.Once to avoid a double close.
+	var once sync.Once
+	stop := func() { once.Do(func() { close(stopCh) }) }
+
+	go func() {
+		defer stop()
+		for {
+			pod, err := clientset.CoreV1().Pods(namespace).Get(context.TODO(), driverPodName, metav1.GetOptions{})
+			if err != nil {
+				break
+			}
+			if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
+				break
+			}
+			time.Sleep(1 * time.Second)
+		}
+		fmt.Println("Stop forwarding as the driver pod has terminated")
+	}()
+
+	go func() {
+		<-signals
+		stop()
+	}()
+
+	return forwarder.ForwardPorts()
+}
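+
+// Usage (illustrative; "spark-pi" is a hypothetical application name):
+//
+//	sparkctl forward spark-pi --local-port 8080 --remote-port 4040
+//
+// Once forwarding starts, the driver UI is reachable at localhost:8080, and
+// forwarding continues until it is interrupted or the driver pod terminates.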
diff --git a/cmd/sparkctl/app/status.go b/cmd/sparkctl/get/get.go
similarity index 52%
rename from cmd/sparkctl/app/status.go
rename to cmd/sparkctl/get/get.go
index 59d2a05faa..3f134cc11c 100644
--- a/cmd/sparkctl/app/status.go
+++ b/cmd/sparkctl/get/get.go
@@ -1,11 +1,11 @@
 /*
-Copyright 2017 Google LLC
+Copyright 2024 The Kubeflow authors.

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at

-    https://www.apache.org/licenses/LICENSE-2.0
+    http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,62 +14,61 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-package app
+package get

 import (
+	"context"
 	"fmt"
 	"os"

 	"github.com/olekukonko/tablewriter"
 	"github.com/spf13/cobra"
+	"github.com/spf13/viper"
+	"k8s.io/apimachinery/pkg/types"

 	"github.com/kubeflow/spark-operator/api/v1beta2"
-	crdclientset "github.com/kubeflow/spark-operator/pkg/client/clientset/versioned"
+	"github.com/kubeflow/spark-operator/pkg/util"
 )

-var statusCmd = &cobra.Command{
-	Use:   "status <name>",
-	Short: "Check status of a SparkApplication",
-	Long:  `Check status of a SparkApplication with a given name`,
-	Run: func(_ *cobra.Command, args []string) {
-		if len(args) != 1 {
-			fmt.Fprintln(os.Stderr, "must specify a SparkApplication name")
-			return
-		}
-
-		crdClientset, err := getSparkApplicationClient()
-		if err != nil {
-			fmt.Fprintf(os.Stderr, "failed to get SparkApplication client: %v\n", err)
-			return
-		}
-
-		if err := doStatus(args[0], crdClientset); err != nil {
-			fmt.Fprintf(os.Stderr, "failed to check status of SparkApplication %s: %v\n", args[0], err)
-		}
-	},
-}
-
-func doStatus(name string, crdClientset crdclientset.Interface) error {
-	app, err := getSparkApplication(name, crdClientset)
-	if err != nil {
-		return fmt.Errorf("failed to get SparkApplication %s: %v", name, err)
+func NewCommand() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "get <name>",
+		Short: "Get status of a SparkApplication",
+		Long:  "Get status of a SparkApplication with the given name",
+		Args:  cobra.ExactArgs(1),
+		RunE: func(_ *cobra.Command, args []string) error {
+			name := args[0]
+			namespace := viper.GetString("namespace")
+
+			k8sClient, err := util.GetK8sClient()
+			if err != nil {
+				return fmt.Errorf("failed to create Kubernetes client: %v", err)
+			}
+
+			key := types.NamespacedName{Namespace: namespace, Name: name}
+			app := &v1beta2.SparkApplication{}
+			if err := k8sClient.Get(context.TODO(), key, app); err != nil {
+				return fmt.Errorf("failed to get SparkApplication %s: %v", name, err)
+			}
+
+			printStatus(app)
+
+			return nil
+		},
 	}
-
-	printStatus(app)
-
-	return nil
+	return cmd
 }

 func printStatus(app *v1beta2.SparkApplication) {
 	fmt.Println("application state:")
 	table := tablewriter.NewWriter(os.Stdout)
-	table.SetHeader([]string{"State", "Submission Age", "Completion Age", "Driver Pod", "Driver UI", "SubmissionAttempts", "ExecutionAttempts"})
+	table.SetHeader([]string{"State", "Submission Age", "Completion Age", "Driver Pod", "Driver UI", "Submission Attempts", "Execution Attempts"})
 	table.Append([]string{
 		string(app.Status.AppState.State),
-		getSinceTime(app.Status.LastSubmissionAttemptTime),
-		getSinceTime(app.Status.TerminationTime),
-		formatNotAvailable(app.Status.DriverInfo.PodName),
-		formatNotAvailable(app.Status.DriverInfo.WebUIAddress),
+		util.GetSinceTime(app.Status.LastSubmissionAttemptTime),
+		util.GetSinceTime(app.Status.TerminationTime),
+		util.FormatNotAvailable(app.Status.DriverInfo.PodName),
+		util.FormatNotAvailable(app.Status.DriverInfo.WebUIAddress),
 		fmt.Sprintf("%v", app.Status.SubmissionAttempts),
 		fmt.Sprintf("%v", app.Status.ExecutionAttempts),
 	})
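+	// Ages are rendered with util.GetSinceTime and values that are not yet
+	// set show up as "N.A.", e.g. (illustrative):
+	//
+	//	STATE      SUBMISSION AGE  COMPLETION AGE  DRIVER POD       DRIVER UI
+	//	COMPLETED  10m             2m              spark-pi-driver  N.A.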
diff --git a/cmd/sparkctl/list/list.go b/cmd/sparkctl/list/list.go
new file mode 100644
index 0000000000..b731a308dd
--- /dev/null
+++ b/cmd/sparkctl/list/list.go
@@ -0,0 +1,98 @@
+/*
+Copyright 2024 The Kubeflow authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package list
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"text/tabwriter"
+
+	"github.com/spf13/cobra"
+	"github.com/spf13/viper"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	"github.com/kubeflow/spark-operator/api/v1beta2"
+	"github.com/kubeflow/spark-operator/pkg/util"
+)
+
+const (
+	TimeLayout = "2006-01-02T15:04:05Z"
+)
+
+var (
+	k8sClient     client.Client
+	allNamespaces bool
+)
+
+func NewCommand() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "list",
+		Short: "List SparkApplications in a given namespace",
+		RunE: func(_ *cobra.Command, args []string) error {
+			namespace := viper.GetString("namespace")
+
+			var err error
+			k8sClient, err = util.GetK8sClient()
+			if err != nil {
+				return fmt.Errorf("failed to create Kubernetes client: %v", err)
+			}
+
+			return doList(namespace)
+		},
+	}
+
+	cmd.Flags().BoolVarP(&allNamespaces, "all-namespaces", "A", false, "If present, list the SparkApplications across all namespaces.")
+
+	return cmd
+}
+
+func doList(namespace string) error {
+	apps := v1beta2.SparkApplicationList{}
+	listOptions := []client.ListOption{}
+	if !allNamespaces {
+		listOptions = append(listOptions, client.InNamespace(namespace))
+	}
+	if err := k8sClient.List(context.TODO(), &apps, listOptions...); err != nil {
+		return fmt.Errorf("failed to list SparkApplications: %v", err)
+	}
+
+	writer := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
+	defer writer.Flush()
+	fmt.Fprintf(writer, "Name\tStatus\tSubmission Attempts\tSubmission Time\tExecution Attempts\tTermination Time\n")
+	for _, app := range apps.Items {
+		var lastSubmissionTime, terminationTime string
+		if !app.Status.LastSubmissionAttemptTime.IsZero() {
+			lastSubmissionTime = app.Status.LastSubmissionAttemptTime.Format(TimeLayout)
+		}
+		if !app.Status.TerminationTime.IsZero() {
+			terminationTime = app.Status.TerminationTime.Format(TimeLayout)
+		}
+
+		fmt.Fprintf(writer,
+			"%s\t%s\t%d\t%s\t%d\t%s\n",
+			app.Name,
+			string(app.Status.AppState.State),
+			app.Status.SubmissionAttempts,
+			lastSubmissionTime,
+			app.Status.ExecutionAttempts,
+			terminationTime,
+		)
+	}
+
+	return nil
+}
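+
+// Example (illustrative output; "spark-pi" is a hypothetical application):
+//
+//	$ sparkctl list -A
+//	Name      Status     Submission Attempts  Submission Time       Execution Attempts  Termination Time
+//	spark-pi  COMPLETED  1                    2024-06-01T10:00:00Z  1                   2024-06-01T10:05:00Z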
diff --git a/cmd/sparkctl/log/log.go b/cmd/sparkctl/log/log.go
new file mode 100644
index 0000000000..da7f20dc42
--- /dev/null
+++ b/cmd/sparkctl/log/log.go
@@ -0,0 +1,140 @@
+/*
+Copyright 2024 The Kubeflow authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package log
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"time"
+
+	"github.com/spf13/cobra"
+	"github.com/spf13/viper"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/kubernetes"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+
+	"github.com/kubeflow/spark-operator/api/v1beta2"
+	"github.com/kubeflow/spark-operator/pkg/util"
+)
+
+var (
+	k8sClient  client.Client
+	clientset  kubernetes.Interface
+	executorID int32
+	followLogs bool
+	timeout    time.Duration
+)
+
+func NewCommand() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "log <name>",
+		Short: "Fetch logs of the driver pod of a SparkApplication",
+		Args:  cobra.ExactArgs(1),
+		RunE: func(cmd *cobra.Command, args []string) error {
+			name := args[0]
+			namespace := viper.GetString("namespace")
+
+			var err error
+			k8sClient, err = util.GetK8sClient()
+			if err != nil {
+				return fmt.Errorf("failed to create Kubernetes client: %v", err)
+			}
+
+			clientset, err = util.GetClientset()
+			if err != nil {
+				return fmt.Errorf("failed to create Kubernetes clientset: %v", err)
+			}
+
+			if err := doLog(name, namespace, followLogs); err != nil {
+				return err
+			}
+
+			return nil
+		},
+	}
+
+	cmd.Flags().Int32VarP(&executorID, "executor", "e", -1, "Executor id to fetch logs from.")
+	cmd.Flags().BoolVarP(&followLogs, "follow", "f", false, "Specify if the logs should be streamed.")
+	cmd.Flags().DurationVar(&timeout, "timeout", 30*time.Second, "Timeout for fetching logs.")
+
+	return cmd
+}
+
+func doLog(name string, namespace string, stream bool) error {
+	key := types.NamespacedName{Namespace: namespace, Name: name}
+	app := &v1beta2.SparkApplication{}
+	if err := k8sClient.Get(context.TODO(), key, app); err != nil {
+		return fmt.Errorf("failed to get SparkApplication %s: %v", name, err)
+	}
+
+	driverPodName := app.Status.DriverInfo.PodName
+	if driverPodName == "" {
+		return fmt.Errorf("driver pod not found")
+	}
+
+	if stream {
+		return streamLogs(driverPodName, namespace, os.Stdout)
+	}
+	return printLogs(driverPodName, namespace, os.Stdout)
+}
+
+func getPodNameChannel(name string, namespace string, k8sClient client.Client) chan string {
+	key := types.NamespacedName{Namespace: namespace, Name: name}
+	channel := make(chan string, 1)
+	go func() {
+		for {
+			app := &v1beta2.SparkApplication{}
+			if err := k8sClient.Get(context.TODO(), key, app); err == nil {
+				if app.Status.DriverInfo.PodName != "" {
+					channel <- app.Status.DriverInfo.PodName
+					break
+				}
+			}
+			// Poll once per second instead of busy-waiting on the API server.
+			time.Sleep(1 * time.Second)
+		}
+	}()
+	return channel
+}
+
+// printLogs is a one-time operation that prints the fetched logs of the given pod.
+func printLogs(name string, namespace string, out io.Writer) error {
+	rawLogs, err := clientset.CoreV1().Pods(namespace).GetLogs(name, &corev1.PodLogOptions{}).Do(context.TODO()).Raw()
+	if err != nil {
+		return err
+	}
+
+	fmt.Fprintln(out, string(rawLogs))
+	return nil
+}
+
+// streamLogs streams the logs of the given pod until there are no more logs available.
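+// It behaves like `kubectl logs -f`: the call blocks while the pod keeps
+// producing output and returns once the log stream is closed.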
+func streamLogs(name string, namespace string, out io.Writer) error { + request := clientset.CoreV1().Pods(namespace).GetLogs(name, &corev1.PodLogOptions{Follow: true}) + reader, err := request.Stream(context.TODO()) + if err != nil { + return err + } + defer reader.Close() + + if _, err := io.Copy(out, reader); err != nil { + return err + } + return nil +} diff --git a/cmd/sparkctl/main.go b/cmd/sparkctl/main.go index 4f0e00e654..2d1d45e21c 100644 --- a/cmd/sparkctl/main.go +++ b/cmd/sparkctl/main.go @@ -1,11 +1,11 @@ /* -Copyright 2017 Google LLC +Copyright 2024 The Kubeflow authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - https://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -17,11 +17,53 @@ limitations under the License. package main import ( + "flag" + "fmt" + "os" + + "github.com/spf13/cobra" + "github.com/spf13/viper" _ "k8s.io/client-go/plugin/pkg/client/auth" + ctrl "sigs.k8s.io/controller-runtime" - "github.com/kubeflow/spark-operator/cmd/sparkctl/app" + "github.com/kubeflow/spark-operator/cmd/sparkctl/create" + "github.com/kubeflow/spark-operator/cmd/sparkctl/delete" + "github.com/kubeflow/spark-operator/cmd/sparkctl/event" + "github.com/kubeflow/spark-operator/cmd/sparkctl/forward" + "github.com/kubeflow/spark-operator/cmd/sparkctl/get" + "github.com/kubeflow/spark-operator/cmd/sparkctl/list" + "github.com/kubeflow/spark-operator/cmd/sparkctl/log" ) +func NewCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "sparkctl", + Short: "sparkctl is the command-line tool for working with the Spark Operator", + Long: `sparkctl is the command-line tool for working with the Spark Operator. +It supports creating, deleting and checking status of SparkApplication objects. It also supports fetching application logs.`, + } + + cmd.PersistentFlags().StringP("namespace", "n", "default", "The namespace in which the SparkApplication is to be created") + viper.BindPFlag("namespace", cmd.PersistentFlags().Lookup("namespace")) + + flagSet := flag.NewFlagSet("controller", flag.ExitOnError) + ctrl.RegisterFlags(flagSet) + cmd.Flags().AddGoFlagSet(flagSet) + + cmd.AddCommand(get.NewCommand()) + cmd.AddCommand(list.NewCommand()) + cmd.AddCommand(event.NewCommand()) + cmd.AddCommand(log.NewCommand()) + cmd.AddCommand(forward.NewCommand()) + cmd.AddCommand(create.NewCommand()) + cmd.AddCommand(delete.NewCommand()) + + return cmd +} + func main() { - app.Execute() + if err := NewCommand().Execute(); err != nil { + fmt.Fprintf(os.Stderr, "%v\n", err) + os.Exit(1) + } } diff --git a/pkg/common/spark.go b/pkg/common/spark.go index 94ae2c51da..64bf7ba2cf 100644 --- a/pkg/common/spark.go +++ b/pkg/common/spark.go @@ -16,10 +16,21 @@ limitations under the License. package common +// Hadoop environment variables. +const ( + // EnvHadoopConfDir is the environment variable to add to the driver and executor Pods that point + // to the directory where the Hadoop ConfigMap is mounted. + EnvHadoopConfDir = "HADOOP_CONF_DIR" +) + // Spark environment variables. const ( EnvSparkHome = "SPARK_HOME" + // EnvSparkConfDir is the environment variable to add to the driver and executor Pods that point + // to the directory where the Spark ConfigMap is mounted. 
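+	// When set, Spark loads configuration such as spark-defaults.conf and
+	// spark-env.sh from this directory.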
+ EnvSparkConfDir = "SPARK_CONF_DIR" + EnvKubernetesServiceHost = "KUBERNETES_SERVICE_HOST" EnvKubernetesServicePort = "KUBERNETES_SERVICE_PORT" @@ -278,14 +289,6 @@ const ( // HadoopConfigMapVolumeName is the name of the ConfigMap volume of Hadoop configuration files. HadoopConfigMapVolumeName = "hadoop-configmap-volume" - - // EnvSparkConfDir is the environment variable to add to the driver and executor Pods that point - // to the directory where the Spark ConfigMap is mounted. - EnvSparkConfDir = "SPARK_CONF_DIR" - - // EnvHadoopConfDir is the environment variable to add to the driver and executor Pods that point - // to the directory where the Hadoop ConfigMap is mounted. - EnvHadoopConfDir = "HADOOP_CONF_DIR" ) const ( diff --git a/pkg/util/client.go b/pkg/util/client.go new file mode 100644 index 0000000000..5b47666f9b --- /dev/null +++ b/pkg/util/client.go @@ -0,0 +1,59 @@ +/* +Copyright 2024 The Kubeflow authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/kubernetes" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/kubeflow/spark-operator/api/v1beta1" + "github.com/kubeflow/spark-operator/api/v1beta2" +) + +var ( + scheme = runtime.NewScheme() +) + +func init() { + utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + + utilruntime.Must(v1beta1.AddToScheme(scheme)) + utilruntime.Must(v1beta2.AddToScheme(scheme)) + // +kubebuilder:scaffold:scheme +} + +func GetK8sClient() (client.Client, error) { + cfg, err := ctrl.GetConfig() + if err != nil { + return nil, err + } + + return client.New(cfg, client.Options{Scheme: scheme}) +} + +func GetClientset() (kubernetes.Interface, error) { + cfg, err := ctrl.GetConfig() + if err != nil { + return nil, err + } + + return kubernetes.NewForConfig(cfg) +} diff --git a/pkg/util/sparkapplication.go b/pkg/util/sparkapplication.go index 65bdb26922..0581ffddbd 100644 --- a/pkg/util/sparkapplication.go +++ b/pkg/util/sparkapplication.go @@ -19,6 +19,7 @@ package util import ( "crypto/md5" "fmt" + "os" "reflect" "strings" "time" @@ -27,11 +28,16 @@ import ( networkingv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/yaml" "github.com/kubeflow/spark-operator/api/v1beta2" "github.com/kubeflow/spark-operator/pkg/common" ) +const ( + bufferSize = 1024 +) + // GetDriverPodName returns name of the driver pod of the given spark application. 
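// If spec.driver.podName is not set, the name typically comes from the
// "spark.kubernetes.driver.pod.name" property or defaults to
// "<app name>-driver".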
func GetDriverPodName(app *v1beta2.SparkApplication) string { name := app.Spec.Driver.PodName @@ -495,3 +501,18 @@ func GetInitialExecutorNumber(app *v1beta2.SparkApplication) int32 { return initialNumExecutors } + +func LoadSparkApplicationFromFile(filePath string) (*v1beta2.SparkApplication, error) { + file, err := os.Open(filePath) + if err != nil { + return nil, err + } + defer file.Close() + + decoder := yaml.NewYAMLOrJSONDecoder(file, bufferSize) + app := &v1beta2.SparkApplication{} + if err := decoder.Decode(app); err != nil { + return nil, err + } + return app, nil +} diff --git a/pkg/util/util.go b/pkg/util/util.go index 25f664dbca..dc72745fa2 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -18,11 +18,15 @@ package util import ( "fmt" + "net/url" "os" "path/filepath" "strings" + "time" "golang.org/x/mod/semver" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/duration" "sigs.k8s.io/yaml" "github.com/kubeflow/spark-operator/pkg/common" @@ -118,3 +122,44 @@ func WriteObjectToFile(obj interface{}, filePath string) error { return nil } + +func GetSinceTime(timestamp metav1.Time) string { + if timestamp.IsZero() { + return "N.A." + } + + return duration.ShortHumanDuration(time.Since(timestamp.Time)) +} + +func FormatNotAvailable(info string) string { + if info == "" { + return "N.A." + } + return info +} + +func FilterLocalFiles(files []string) ([]string, error) { + var localFiles []string + for _, file := range files { + if isLocal, err := IsLocalFile(file); err != nil { + return nil, err + } else if isLocal { + localFiles = append(localFiles, file) + } + } + + return localFiles, nil +} + +func IsLocalFile(file string) (bool, error) { + url, err := url.Parse(file) + if err != nil { + return false, err + } + + if url.Scheme == "" || url.Scheme == "file" { + return true, nil + } + + return false, nil +} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 5f24d4a372..bf6cc4d7f0 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -18,9 +18,12 @@ package util_test import ( "os" + "testing" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + + "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -201,3 +204,52 @@ spec: Expect(os.Remove(file)).NotTo(HaveOccurred()) }) }) + +func TestIsLocalFile(t *testing.T) { + type testcase struct { + file string + isLocal bool + } + + testFn := func(test testcase, t *testing.T) { + isLocal, err := util.IsLocalFile(test.file) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, test.isLocal, isLocal, "%s: expected %v got %v", test.file, test.isLocal, isLocal) + } + + testcases := []testcase{ + {file: "/path/to/file", isLocal: true}, + {file: "file:///path/to/file", isLocal: true}, + {file: "local:///path/to/file", isLocal: false}, + {file: "http://localhost/path/to/file", isLocal: false}, + } + + for _, test := range testcases { + testFn(test, t) + } +} + +func TestFilterLocalFiles(t *testing.T) { + files := []string{ + "path/to/file", + "/path/to/file", + "file:///file/to/path", + "http://localhost/path/to/file", + "hdfs://localhost/path/to/file", + "gs://bucket/path/to/file", + } + + expected := []string{ + "path/to/file", + "/path/to/file", + "file:///file/to/path", + } + + actual, err := util.FilterLocalFiles(files) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, expected, actual) +}
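+
+// Note: IsLocalFile treats a URI with an empty scheme or the "file" scheme as
+// local; schemes such as "local", "http", "hdfs" and "gs" are considered
+// remote and are left untouched.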