diff --git a/Dockerfile b/Dockerfile index 26fb6345..7ed8cb61 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # Build the manager binary -FROM golang:1.20 as builder +FROM golang:1.20 AS builder WORKDIR /workspace diff --git a/api/v1alpha2/minicluster_types.go b/api/v1alpha2/minicluster_types.go index bd62bea3..2d3f982f 100644 --- a/api/v1alpha2/minicluster_types.go +++ b/api/v1alpha2/minicluster_types.go @@ -50,6 +50,11 @@ type MiniClusterSpec struct { // +optional Interactive bool `json:"interactive"` + // Allow >1 Flux running (oversubscribing resources) + // +kubebuilder:default=false + // +optional + Oversubscribe bool `json:"oversubscribe"` + // Flux options for the broker, shared across cluster // +optional Flux FluxSpec `json:"flux"` @@ -821,13 +826,16 @@ func (f *MiniCluster) Validate() bool { // Count the FluxRunners if container.RunFlux { fluxRunners += 1 + } + + // Give all flux containers a name, if not provided + if container.Name == "" { - // Non flux-runners are required to have a name - } else { - if container.Name == "" { - fmt.Printf("😥️ %s is missing a name\n", name) - return false + // Maintain previous behavior to have name == main flux runner + if i == 0 { + container.Name = f.Name } + container.Name = fmt.Sprintf("%s-%d", container.Name, i) } // If a custom script is provided AND a command, no go @@ -836,7 +844,16 @@ func (f *MiniCluster) Validate() bool { return false } } - if fluxRunners != 1 { + + // If we have more than one flux runner, must explicitly oversubscribe + if fluxRunners > 1 && !f.Spec.Oversubscribe { + fmt.Printf("😥️ More than one flux runner requires oversubscribe: true\n") + valid = false + } + + // More than one container can run Flux (and the brokers see the same resources) + // But we need at least one! 
+ if fluxRunners < 1 { valid = false } diff --git a/api/v1alpha2/swagger.json b/api/v1alpha2/swagger.json index 7e57f3fe..a778d3e6 100644 --- a/api/v1alpha2/swagger.json +++ b/api/v1alpha2/swagger.json @@ -622,6 +622,11 @@ "default": {}, "$ref": "#/definitions/Network" }, + "oversubscribe": { + "description": "Allow \u003e1 Flux running (oversubscribing resources)", + "type": "boolean", + "default": false + }, "pod": { "description": "Pod spec details", "default": {}, diff --git a/api/v1alpha2/zz_generated.openapi.go b/api/v1alpha2/zz_generated.openapi.go index b2b352d7..a299f742 100644 --- a/api/v1alpha2/zz_generated.openapi.go +++ b/api/v1alpha2/zz_generated.openapi.go @@ -1077,6 +1077,14 @@ func schema_flux_framework_flux_operator_api_v1alpha2_MiniClusterSpec(ref common Format: "", }, }, + "oversubscribe": { + SchemaProps: spec.SchemaProps{ + Description: "Allow >1 Flux running (oversubscribing resources)", + Default: false, + Type: []string{"boolean"}, + Format: "", + }, + }, "flux": { SchemaProps: spec.SchemaProps{ Description: "Flux options for the broker, shared across cluster", diff --git a/chart/templates/minicluster-crd.yaml b/chart/templates/minicluster-crd.yaml index 290d5a1c..ffc74fc5 100644 --- a/chart/templates/minicluster-crd.yaml +++ b/chart/templates/minicluster-crd.yaml @@ -520,6 +520,10 @@ spec: description: Name for cluster headless service type: string type: object + oversubscribe: + default: false + description: Allow >1 Flux running (oversubscribing resources) + type: boolean pod: description: Pod spec details properties: diff --git a/config/crd/bases/flux-framework.org_miniclusters.yaml b/config/crd/bases/flux-framework.org_miniclusters.yaml index d9f163a0..76e43c41 100644 --- a/config/crd/bases/flux-framework.org_miniclusters.yaml +++ b/config/crd/bases/flux-framework.org_miniclusters.yaml @@ -523,6 +523,10 @@ spec: description: Name for cluster headless service type: string type: object + oversubscribe: + default: false + description: Allow >1 Flux running (oversubscribing resources) + type: boolean pod: description: Pod spec details properties: diff --git a/controllers/flux/containers.go b/controllers/flux/containers.go index 7d8051d3..4d7e25a2 100644 --- a/controllers/flux/containers.go +++ b/controllers/flux/containers.go @@ -53,7 +53,7 @@ func getFluxContainer( func getContainers( specs []api.MiniClusterContainer, - defaultName string, + customName string, mounts []corev1.VolumeMount, serviceContainer bool, ) ([]corev1.Container, error) { @@ -82,9 +82,11 @@ func getContainers( // wait.sh path corresponds to container identifier waitScript := fmt.Sprintf("/flux_operator/wait-%d.sh", i) command = []string{"/bin/bash", waitScript} - containerName = defaultName } + if customName != "" { + containerName = customName + } // A container not running flux can only have pre/post sections // in a custom script if we know the entrypoint. 
 		if container.GenerateEntrypoint() && !serviceContainer {
diff --git a/controllers/flux/job.go b/controllers/flux/job.go
index ee79eeb0..bbf9295f 100644
--- a/controllers/flux/job.go
+++ b/controllers/flux/job.go
@@ -98,9 +98,11 @@ func NewMiniClusterJob(cluster *api.MiniCluster) (*batchv1.Job, error) {
 	}
 
 	// Prepare listing of containers for the MiniCluster
+	// We don't provide a default name because defaults are provided in Validate()
+	// Only service containers have a custom name here
 	containers, err := getContainers(
 		cluster.Spec.Containers,
-		cluster.Name,
+		"",
 		mounts,
 		false,
 	)
diff --git a/docs/getting_started/custom-resource-definition.md b/docs/getting_started/custom-resource-definition.md
index 484ef37a..ae3fb70b 100644
--- a/docs/getting_started/custom-resource-definition.md
+++ b/docs/getting_started/custom-resource-definition.md
@@ -92,6 +92,16 @@ This would be equivalent to giving a start command of `sleep infinity` however o
 (e.g., if there is a flux shutdown from within the Flux instance) the sleep command would not exit with a failed code.
 
+### oversubscribe
+
+By default, we treat your single application container _or_ the single container in a MiniCluster pod designated to "runFlux" as the only Flux broker. When oversubscribe is set to true, you may define more than one "runFlux" container, meaning that multiple brokers will share the same resources.
+
+```yaml
+  oversubscribe: true
+```
+
+We created this use case with the intention of having a service container running fluxion alongside the MiniCluster to orchestrate the N containers. This is considered an advanced use case and you should use it with caution!
+
 ### launcher
 
 If you are using an executor that launches Flux Jobs (e.g., workflow managers such as Snakemake and Nextflow do!)
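To make the oversubscribe section above concrete, here is a minimal sketch of a spec that needs the flag. This sketch is illustrative only; the `demo` name and `example.org` images are placeholders, not part of this changeset:

```yaml
apiVersion: flux-framework.org/v1alpha2
kind: MiniCluster
metadata:
  name: demo
spec:
  size: 2
  # Required: more than one runFlux container is defined below
  oversubscribe: true
  containers:
    - image: example.org/app-a:latest
      name: app-a
      runFlux: true
    - image: example.org/app-b:latest
      name: app-b
      runFlux: true
```

With `oversubscribe: false` (the default), Validate() rejects a spec like this: more than one flux runner now requires explicit opt-in.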
diff --git a/docs/tutorials/index.md b/docs/tutorials/index.md
index 7210b2fa..352cbfc3 100644
--- a/docs/tutorials/index.md
+++ b/docs/tutorials/index.md
@@ -56,6 +56,7 @@ The following tutorials are provided from their respective directories (and are
 These examples show how to interact with your flux queue from a sidecar container (that has access to the flux broker of the pod):
 
+ - [multiple-applications-per-pod](https://github.com/flux-framework/flux-operator/tree/main/examples/experimental/multiple-applications-per-pod): Run multiple applications, each with its own Flux broker, in the same MiniCluster pods (requires oversubscribe)
  - [flux-sidecar](https://github.com/flux-framework/flux-operator/blob/main/examples/tests/flux-sidecar) to see a sleep job in the main application queue
 
 ### Services
diff --git a/examples/dist/flux-operator-arm.yaml b/examples/dist/flux-operator-arm.yaml
index 493eeee4..03c37c05 100644
--- a/examples/dist/flux-operator-arm.yaml
+++ b/examples/dist/flux-operator-arm.yaml
@@ -529,6 +529,10 @@ spec:
               description: Name for cluster headless service
               type: string
             type: object
+          oversubscribe:
+            default: false
+            description: Allow >1 Flux running (oversubscribing resources)
+            type: boolean
           pod:
             description: Pod spec details
             properties:
diff --git a/examples/dist/flux-operator.yaml b/examples/dist/flux-operator.yaml
index 8c01f467..7885d6e4 100644
--- a/examples/dist/flux-operator.yaml
+++ b/examples/dist/flux-operator.yaml
@@ -529,6 +529,10 @@ spec:
               description: Name for cluster headless service
               type: string
             type: object
+          oversubscribe:
+            default: false
+            description: Allow >1 Flux running (oversubscribing resources)
+            type: boolean
           pod:
             description: Pod spec details
             properties:
diff --git a/examples/experimental/multiple-applications-per-pod/minicluster.yaml b/examples/experimental/multiple-applications-per-pod/minicluster.yaml
new file mode 100644
index 00000000..63c356af
--- /dev/null
+++ b/examples/experimental/multiple-applications-per-pod/minicluster.yaml
@@ -0,0 +1,71 @@
+apiVersion: flux-framework.org/v1alpha2
+kind: MiniCluster
+metadata:
+  name: flux-sample
+spec:
+  size: 4
+
+  # shell in, and then:
+  # source /mnt/flux/flux-view.sh
+  # flux proxy $fluxsocket bash
+  # flux resource list
+
+# 1. Create resource graph independent of what hwloc does.
+# 2. Will need to pass JGF into fluxion when it starts (try this out for R and starting the broker)
+# 3. Create the resource graph with double the number of resources we want. If we want N brokers, increase by a factor of N.
+# 4. Each broker needs to get 1/N of that resource graph
+# 5. When we submit a jobspec, we need to submit with the entire resource graph (request 4x the number of resources we want)
+#    the entire resource graph is allocated to that job
+#    for the executable we have to specify which part of the graph we are executing.
+#    all brokers at node level
+
+  # Note that:
+  # 1. all the containers here have an ubuntu 20.04 base!
+  # 2. The non-flux runners also need a name.
+  # 3. Since we control the logic of the sidecars, we need to add
+  #    an entrypoint that keeps them running. Otherwise, it jumps to
+  #    "not ready"
+  # 4. The issue we will run into is that the job won't complete when
+  #    the main flux runner shuts down. It will need to be deleted.
+  flux:
+    container:
+      image: ghcr.io/converged-computing/flux-view-ubuntu:tag-focal
+
+  # This ensures that fluxion is running as a service to the MiniCluster
+  services:
+    - image: ghcr.io/converged-computing/fluxion:latest
+      command: /code/bin/server --host 0.0.0.0
+      name: fluxion
+
+  # This starts the flux broker without a command (interactive)
+  interactive: true
+
+  # A required marker from the user to allow more than one runFlux
+  # container. This is considered an advanced use case.
+  oversubscribe: true
+
+  containers:
+
+    # This is a faux "queue only" broker container. It will be
+    # the interface to which we submit jobs. We don't run Flux
+    # but we will orchestrate running things.
+    - image: rockylinux:9
+      name: queue
+
+    # TODO we will need to allow the other ones to still see flux
+    - image: ghcr.io/rse-ops/lammps-matrix:mpich-ubuntu-20.04-amd64
+      name: lammps
+      workingDir: /opt/lammps/examples/reaxff/HNS
+      command: run_interactive_cluster
+      runFlux: true
+
+    # command: lmp -v x 2 -v y 2 -v z 2 -in in.reaxc.hns -nocite
+    - image: ghcr.io/converged-computing/metric-ior:latest
+      name: ior
+      command: run_interactive_cluster
+      runFlux: true
+
+    - image: ghcr.io/converged-computing/metric-chatterbug:latest
+      name: chatterbug
+      command: run_interactive_cluster
+      runFlux: true
\ No newline at end of file
diff --git a/examples/experimental/multiple-pods-per-node/README.md b/examples/experimental/multiple-pods-per-node/README.md
index 51f85939..63c8a8fd 100644
--- a/examples/experimental/multiple-pods-per-node/README.md
+++ b/examples/experimental/multiple-pods-per-node/README.md
@@ -27,18 +27,12 @@ gcloud container clusters create test-cluster \
 
 ### Install the Flux Operator
 
-We are going to install the Flux operator from the refactor branch (with the feature added to disable affinity).
+Install the operator from the development manifest on the main branch:
 
 ```bash
-git clone -b test-refactor-modular
-cd test-refactor-modular
-
-# You might need other dependencies, etc. here or to specify your own registry you can push to.
-make test-deploy-recreate
+kubectl apply -f https://raw.githubusercontent.com/flux-framework/flux-operator/main/examples/dist/flux-operator.yaml
 ```
 
-If/when this is released, you can install from a release.
-
 ### Experiments
 
 Then create the flux operator pods.
diff --git a/examples/tests/osu-benchmarks/minicluster.yaml b/examples/tests/osu-benchmarks/minicluster.yaml
index bf6a3d9f..f03a6dff 100644
--- a/examples/tests/osu-benchmarks/minicluster.yaml
+++ b/examples/tests/osu-benchmarks/minicluster.yaml
@@ -21,4 +21,4 @@ spec:
   containers:
     - image: ghcr.io/converged-computing/metric-osu-benchmark:latest
       workingDir: /opt/osu-benchmark/build.openmpi/libexec/osu-micro-benchmarks/mpi/one-sided
-      command: ./osu_get_latency
\ No newline at end of file
+      command: ./osu_get_latency
diff --git a/pkg/flux/config.go b/pkg/flux/config.go
index 10f607a0..df2918d5 100644
--- a/pkg/flux/config.go
+++ b/pkg/flux/config.go
@@ -34,7 +34,13 @@ func getRandomToken(requested string) string {
 }
 
 // generateHostlist for a specific size given the cluster namespace and a size
-func generateHostlist(cluster *api.MiniCluster, size int32) string {
+// Note that we don't yet customize the hostlist per container, but the lists
+// are generated separately in anticipation of slightly different setups.
+func generateHostlist( + cluster *api.MiniCluster, + container api.MiniClusterContainer, + size int32, +) string { var hosts string if cluster.Spec.Flux.Bursting.Hostlist != "" { diff --git a/pkg/flux/entrypoint.go b/pkg/flux/entrypoint.go index c9b50a90..d30e9e9c 100644 --- a/pkg/flux/entrypoint.go +++ b/pkg/flux/entrypoint.go @@ -37,7 +37,7 @@ func GenerateEntrypoints(cluster *api.MiniCluster) (map[string]string, error) { // Custom logic for a sidecar container alongside flux if container.GenerateEntrypoint() { startScriptID := fmt.Sprintf("start-%d", i) - startScript, err := generateServiceEntrypoint(cluster, container) + startScript, err := generateServiceEntrypoint(cluster, container, i) if err != nil { return data, err } @@ -58,11 +58,16 @@ func GenerateEntrypoints(cluster *api.MiniCluster) (map[string]string, error) { } // generateServiceEntrypoint generates an entrypoint for a service container -func generateServiceEntrypoint(cluster *api.MiniCluster, container api.MiniClusterContainer) (string, error) { +func generateServiceEntrypoint( + cluster *api.MiniCluster, + container api.MiniClusterContainer, + containerIndex int) (string, error) { + st := ServiceTemplate{ - ViewBase: cluster.Spec.Flux.Container.MountPath, - Container: container, - Spec: cluster.Spec, + ViewBase: cluster.Spec.Flux.Container.MountPath, + Container: container, + ContainerIndex: containerIndex, + Spec: cluster.Spec, } // Wrap the named template to identify it later @@ -88,7 +93,7 @@ func generateEntrypointScript( ) (string, error) { container := cluster.Spec.Containers[containerIndex] - mainHost := fmt.Sprintf("%s-0", cluster.Name) + mainHost := fmt.Sprintf("%s-0", container.Name) // Ensure if we have a batch command, it gets split up batchCommand := strings.Split(container.Command, "\n") @@ -99,12 +104,13 @@ func generateEntrypointScript( // The token uuid is the same across images wt := WaitTemplate{ - RequiredRanks: requiredRanks, - ViewBase: cluster.Spec.Flux.Container.MountPath, - Container: container, - MainHost: mainHost, - Spec: cluster.Spec, - Batch: batchCommand, + RequiredRanks: requiredRanks, + ViewBase: cluster.Spec.Flux.Container.MountPath, + ContainerIndex: containerIndex, + Container: container, + MainHost: mainHost, + Spec: cluster.Spec, + Batch: batchCommand, } // Wrap the named template to identify it later diff --git a/pkg/flux/scripts.go b/pkg/flux/scripts.go index 45b798f1..81f30b23 100644 --- a/pkg/flux/scripts.go +++ b/pkg/flux/scripts.go @@ -29,9 +29,10 @@ var startComponents string // ServiceTemplate is for a separate service container type ServiceTemplate struct { - ViewBase string // Where the mounted view with flux is expected to be - Container api.MiniClusterContainer - Spec api.MiniClusterSpec + ViewBase string // Where the mounted view with flux is expected to be + Container api.MiniClusterContainer + ContainerIndex int + Spec api.MiniClusterSpec } // WaitTemplate populates wait.sh for an application container entrypoint @@ -40,7 +41,10 @@ type WaitTemplate struct { MainHost string // Main host identifier FluxToken string // Token to log into the UI, should be consistent across containers Container api.MiniClusterContainer - Spec api.MiniClusterSpec + + // Index for container, for generation of unique socket path + ContainerIndex int + Spec api.MiniClusterSpec // Broker initial quorum that must be online to start // This is used if the cluster MaxSize > Size diff --git a/pkg/flux/templates/components.sh b/pkg/flux/templates/components.sh index c246d285..6df90bf2 100644 --- 
a/pkg/flux/templates/components.sh
+++ b/pkg/flux/templates/components.sh
@@ -63,6 +63,43 @@ command="/bin/bash ./custom-entrypoint.sh"
 {{end}}
 {{end}}
 
+{{define "broker"}}
+brokerOptions="-Scron.directory=/etc/flux/system/cron-{{ .ContainerIndex }}.d \
+  -Stbon.fanout=256 \
+  -Srundir=${viewroot}/run/flux {{ if .Spec.Interactive }}-Sbroker.rc2_none {{ end }} \
+  -Sstatedir=${STATE_DIR} \
+  -Slocal-uri=local://$viewroot/run/flux/local-{{ .ContainerIndex }} \
+{{ if .Spec.Flux.ConnectTimeout }}-Stbon.connect_timeout={{ .Spec.Flux.ConnectTimeout }}{{ end }} \
+{{ if .RequiredRanks }}-Sbroker.quorum={{ .RequiredRanks }}{{ end }} \
+{{ if .Spec.Logging.Zeromq }}-Stbon.zmqdebug=1{{ end }} \
+{{ if not .Spec.Logging.Quiet }} -Slog-stderr-level={{or .Spec.Flux.LogLevel 6}} {{ else }} -Slog-stderr-level=0 {{ end }} \
+  -Slog-stderr-mode=local"
+
+
+# Run an interactive cluster, giving no command to flux start
+function run_interactive_cluster() {
+    echo "🌀 flux broker --config-path ${cfg} ${brokerOptions}"
+    flux broker --config-path ${cfg} ${brokerOptions}
+}
+{{end}}
+
+{{define "worker-broker"}}
+cfg="${viewroot}/etc/flux/config-{{ .ContainerIndex }}"
+brokerOptions="-Stbon.fanout=256 \
+  -Srundir=${viewroot}/run/flux {{ if .Spec.Interactive }}-Sbroker.rc2_none {{ end }} \
+  -Slocal-uri=local://$viewroot/run/flux/local-{{ .ContainerIndex }} \
+{{ if .Spec.Flux.ConnectTimeout }}-Stbon.connect_timeout={{ .Spec.Flux.ConnectTimeout }}{{ end }} \
+{{ if .Spec.Logging.Zeromq }}-Stbon.zmqdebug=1{{ end }} \
+{{ if not .Spec.Logging.Quiet }} -Slog-stderr-level={{or .Spec.Flux.LogLevel 6}} {{ else }} -Slog-stderr-level=0 {{ end }} \
+  -Slog-stderr-mode=local"
+
+# This is provided as an optional function for a worker
+function run_interactive_cluster() {
+    echo "🌀 flux broker --config-path ${cfg} ${brokerOptions}"
+    flux broker --config-path ${cfg} ${brokerOptions}
+}
+{{end}}
+
 {{define "paths"}}
 foundroot=$(find $viewroot -maxdepth 2 -type d -path $viewroot/lib/python3\*) {{ if .Spec.Logging.Quiet }}> /dev/null 2>&1{{ end }}
 pythonversion=$(basename ${foundroot})
@@ -92,7 +129,7 @@ cat <<EOT >> ${viewbase}/flux-view.sh
 export PATH=$PATH
 export PYTHONPATH=$PYTHONPATH
 export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$viewroot/lib
-export fluxsocket=local://${viewroot}/run/flux/local
+export fluxsocket=local://${viewroot}/run/flux/local-{{ .ContainerIndex }}
 EOT
 {{end}}
 {{define "ensure-pip"}}
diff --git a/pkg/flux/templates/start.sh b/pkg/flux/templates/start.sh
index 3c984d4e..755dafce 100644
--- a/pkg/flux/templates/start.sh
+++ b/pkg/flux/templates/start.sh
@@ -18,7 +18,7 @@
 {{ .Container.Commands.ServicePre}} {{ if .Spec.Logging.Quiet }}> /dev/null 2>&1{{ end }}
 
 # Ensure socket path is envar for user
-fluxsocket=${viewroot}/run/flux/local
+fluxsocket=${viewroot}/run/flux/local-{{ .ContainerIndex }}
 
 # Wait for it to exist (application is running)
 {{ if .Spec.Flux.NoWaitSocket }}{{ else }}goshare-wait-fs -p ${fluxsocket} {{ if .Spec.Logging.Quiet }}> /dev/null 2>&1{{ end }}{{ end }}
@@ -29,6 +29,8 @@ fluxsocket="local://$fluxsocket"
 
 # Is a custom script provided?
{{template "custom-script" .}} +{{template "worker-broker" .}} + {{ .Container.Command }} {{ .Container.Commands.Post}} diff --git a/pkg/flux/templates/wait.sh b/pkg/flux/templates/wait.sh index dc489d45..4873fdf7 100644 --- a/pkg/flux/templates/wait.sh +++ b/pkg/flux/templates/wait.sh @@ -19,7 +19,7 @@ fluxuser=$(whoami) fluxuid=$(id -u $fluxuser) # Variables we can use again -cfg="${viewroot}/etc/flux/config" +cfg="${viewroot}/etc/flux/config-{{ .ContainerIndex }}" command="{{ .Container.Command }}" # Is a custom script provided? This will override command @@ -55,18 +55,18 @@ chown -R ${fluxuid} ${curvepath} # If we have disabled the view, we need to use the flux here to generate resources {{ if .Spec.Flux.Container.Disable }} -hosts=$(cat ${viewroot}/etc/flux/system/hostlist) +hosts=$(cat ${viewroot}/etc/flux/system/hostlist-{{ .ContainerIndex }}) {{ if not .Spec.Logging.Quiet }} echo echo "📦 Resources" echo "flux R encode --hosts=${hosts} --local" {{ end }} -flux R encode --hosts=${hosts} --local > ${viewroot}/etc/flux/system/R -{{ if not .Spec.Logging.Quiet }}cat ${viewroot}/etc/flux/system/R{{ end }} +flux R encode --hosts=${hosts} --local > ${viewroot}/etc/flux/system/R-{{ .ContainerIndex }} +{{ if not .Spec.Logging.Quiet }}cat ${viewroot}/etc/flux/system/R-{{ .ContainerIndex }}{{ end }} {{ end }} # Put the state directory in /var/lib on shared view -export STATE_DIR=${viewroot}/var/lib/flux +export STATE_DIR=${viewroot}/var/lib/flux-{{ .ContainerIndex }} export FLUX_OUTPUT_DIR={{ if .Container.Logs }}{{.Container.Logs}}{{ else }}/tmp/fluxout{{ end }} mkdir -p ${STATE_DIR} ${FLUX_OUTPUT_DIR} @@ -82,23 +82,7 @@ echo "The working directory is ${workdir}, contents include:" ls . {{ end }} -brokerOptions="-Scron.directory=/etc/flux/system/cron.d \ - -Stbon.fanout=256 \ - -Srundir=${viewroot}/run/flux {{ if .Spec.Interactive }}-Sbroker.rc2_none {{ end }} \ - -Sstatedir=${STATE_DIR} \ - -Slocal-uri=local://$viewroot/run/flux/local \ -{{ if .Spec.Flux.ConnectTimeout }}-Stbon.connect_timeout={{ .Spec.Flux.ConnectTimeout }}{{ end }} \ -{{ if .RequiredRanks }}-Sbroker.quorum={{ .RequiredRanks }}{{ end }} \ -{{ if .Spec.Logging.Zeromq }}-Stbon.zmqdebug=1{{ end }} \ -{{ if not .Spec.Logging.Quiet }} -Slog-stderr-level={{or .Spec.Flux.LogLevel 6}} {{ else }} -Slog-stderr-level=0 {{ end }} \ - -Slog-stderr-mode=local" - - -# Run an interactive cluster, giving no command to flux start -function run_interactive_cluster() { - echo "🌀 flux broker --config-path ${cfg} ${brokerOptions}" - flux broker --config-path ${cfg} ${brokerOptions} -} +{{template "broker" .}} # if we are given an archive to use, load first, not required to exist # Note that we ask the user to dump in interactive mode - I am not @@ -180,13 +164,13 @@ else {{ .Container.Commands.WorkerPre}} {{ if .Spec.Logging.Quiet }}> /dev/null 2>&1{{ end }} # We basically sleep/wait until the lead broker is ready - echo "🌀 flux start {{ if .Spec.Flux.Wrap }}--wrap={{ .Spec.Flux.Wrap }} {{ end }} -o --config ${viewroot}/etc/flux/config ${brokerOptions}" + echo "🌀 flux start {{ if .Spec.Flux.Wrap }}--wrap={{ .Spec.Flux.Wrap }} {{ end }} -o --config ${cfg} ${brokerOptions}" # We can keep trying forever, don't care if worker is successful or not # Unless retry count is set, in which case we stop after retries while true do - flux start -o --config ${viewroot}/etc/flux/config ${brokerOptions} + flux start -o --config ${cfg} ${brokerOptions} retval=$? 
         if [[ "${retval}" -eq 0 ]] || [[ "{{ .Spec.Flux.CompleteWorkers }}" == "true" ]]; then
             echo "The follower worker exited cleanly. Goodbye!"
diff --git a/pkg/flux/view.go b/pkg/flux/view.go
index 6061facc..9c9db030 100644
--- a/pkg/flux/view.go
+++ b/pkg/flux/view.go
@@ -29,8 +29,8 @@ func generateHostBlock(hosts string, cluster *api.MiniCluster) string {
 
 	// Unless we have a bursting broker address
 	if cluster.Spec.Flux.Bursting.LeadBroker.Address != "" {
-		hostTemplate = `hosts = [{host="%s", bind="tcp://eth0:%s", connect="tcp://%s:%s"},
-	{host="%s"}]`
+		hostTemplate = `hosts = [{host="%s", bind="tcp://eth0:%s", connect="tcp://%s:%d"},
+	{host="%s"}]`
 
 		hostBlock = fmt.Sprintf(
 			hostTemplate,
@@ -43,12 +43,18 @@ func generateHostBlock(hosts string, cluster *api.MiniCluster) string {
 	return hostBlock
 }
 
-func generateBrokerConfig(cluster *api.MiniCluster, hosts string) string {
+func generateBrokerConfig(
+	cluster *api.MiniCluster,
+	hosts string,
+	containerIndex int,
+) string {
 
 	if cluster.Spec.Flux.BrokerConfig != "" {
 		return cluster.Spec.Flux.BrokerConfig
 	}
 
+	// Port assembled based on index, so each runFlux container gets a unique default port
+	defaultPort := 8050 + containerIndex
 	hostBlock := generateHostBlock(hosts, cluster)
 	fqdn := fmt.Sprintf("%s.%s.svc.cluster.local", cluster.Spec.Network.HeadlessName, cluster.Namespace)
@@ -62,17 +68,17 @@ allow-root-owner = true
 
 # Point to resource definition generated with flux-R(1).
 [resource]
-path = "%s/view/etc/flux/system/R"
+path = "%s/view/etc/flux/system/R-%d"
 
 [bootstrap]
 curve_cert = "%s/view/curve/curve.cert"
-default_port = 8050
+default_port = %d
 default_bind = "%s"
 default_connect = "%s"
 %s
 
 [archive]
-dbpath = "%s/view/var/lib/flux/job-archive.sqlite"
+dbpath = "%s/view/var/lib/flux/job-archive-%d.sqlite"
 period = "1m"
 busytimeout = "50s"
 
@@ -82,11 +88,14 @@ queue-policy = "%s"
 	return fmt.Sprintf(
 		template,
 		cluster.Spec.Flux.Container.MountPath,
+		containerIndex,
 		cluster.Spec.Flux.Container.MountPath,
+		defaultPort,
 		defaultBind,
 		defaultConnect,
 		hostBlock,
 		cluster.Spec.Flux.Container.MountPath,
+		containerIndex,
 		cluster.Spec.Flux.Scheduler.QueuePolicy,
 	)
 
@@ -96,37 +105,28 @@ queue-policy = "%s"
 // This is run inside of the flux container that will be copied to the empty volume
 // If the flux container is disabled, we still add an init container with
 // the broker config, etc., but we don't expect a flux view there.
-func GenerateFluxEntrypoint(cluster *api.MiniCluster) (string, error) {
+func GenerateFluxEntrypoint(
+	cluster *api.MiniCluster,
+) (string, error) {
 
 	// fluxRoot for the view is in /opt/view/lib
 	// This must be consistent between the flux-view containers
 	// github.com:converged-computing/flux-views.git
 	fluxRoot := "/opt/view"
 
-	mainHost := fmt.Sprintf("%s-0", cluster.Name)
-
-	// Generate hostlists, this is the lead broker
-	hosts := generateHostlist(cluster, cluster.Spec.MaxSize)
-	brokerConfig := generateBrokerConfig(cluster, hosts)
-
 	// If we are disabling the view, it won't have flux (or extra spack copies)
 	// We copy our faux flux config directory (not a symlink) to the mount path
 	spackView := fmt.Sprintf(`mkdir -p $viewroot/software
-cp -R /opt/view/* %s/view`,
+	cp -R /opt/view/* %s/view`,
 		cluster.Spec.Flux.Container.MountPath,
 	)
 
 	generateHosts := `echo '📦 Flux view disabled, not generating resources here.'
-mkdir -p ${fluxroot}/etc/flux/system
-`
-	if !cluster.Spec.Flux.Container.Disable {
-		generateHosts = `
-echo "flux R encode --hosts=${hosts} --local"
-flux R encode --hosts=${hosts} --local > ${fluxroot}/etc/flux/system/R
+	mkdir -p ${fluxroot}/etc/flux/system
+	`
 
-echo
-echo "📦 Resources"
-cat ${fluxroot}/etc/flux/system/R`
+	// Create a different broker.toml for each runFlux container
+	if !cluster.Spec.Flux.Container.Disable {
 		spackView = `# Now prepare to copy finished spack view over
 		echo "Moving content from /opt/view to be in shared volume at %s"
@@ -143,9 +143,57 @@ cp -R /opt/software $viewroot/
 `
 	}
 
+	// Generate a broker config for each potential running flux container
+	brokerConfigs := ""
+	for i, container := range cluster.Spec.Containers {
+		if !container.RunFlux {
+			continue
+		}
+
+		// Generate hostlists, this is the lead broker
+		hosts := generateHostlist(cluster, container, cluster.Spec.MaxSize)
+
+		// Create a different broker.toml for each runFlux container
+		if !cluster.Spec.Flux.Container.Disable {
+			generateHosts = fmt.Sprintf(`
+echo "flux R encode --hosts=${hosts} --local"
+flux R encode --hosts=${hosts} --local > ${fluxroot}/etc/flux/system/R-%d
+
+echo
+echo "📦 Resources"
+cat ${fluxroot}/etc/flux/system/R-%d`, i, i)
+		}
+
+		brokerConfig := generateBrokerConfig(cluster, hosts, i)
+		brokerConfigs += fmt.Sprintf(`
+# Write the broker configuration
+mkdir -p ${fluxroot}/etc/flux/config-%d
+
+cat <<EOT >> ${fluxroot}/etc/flux/config-%d/broker.toml
+%s
+EOT
+
+# These actions need to happen on all hosts
+mkdir -p $fluxroot/etc/flux/system
+hosts="%s"
+
+# Echo hosts here in case the main container needs to generate
+echo "${hosts}" > ${fluxroot}/etc/flux/system/hostlist-%d
+%s
+
+# Cron directory
+mkdir -p $fluxroot/etc/flux/system/cron-%d.d
+mkdir -p $fluxroot/var/lib/flux
+
+# The rundir needs to be created first, and owned by user flux
+# Along with the state directory and curve certificate
+mkdir -p ${fluxroot}/run/flux ${fluxroot}/etc/curve
+
+`, i, i, brokerConfig, hosts, i, generateHosts, i)
+	}
+
 	setup := `#!/bin/sh
 fluxroot=%s
-mainHost=%s
 
 echo "Hello I am hostname $(hostname) running setup."
 
 # Always use verbose, no reason to not here
@@ -158,31 +206,14 @@ export PATH=/opt/view/bin:$PATH
 
 # If the view doesn't exist, ensure basic paths do
 mkdir -p $fluxroot/bin
 
-# Cron directory
-mkdir -p $fluxroot/etc/flux/system/cron.d
-mkdir -p $fluxroot/var/lib/flux
-
-# These actions need to happen on all hosts
-mkdir -p $fluxroot/etc/flux/system
-hosts="%s"
-
-# Echo hosts here in case the main container needs to generate
-echo "${hosts}" > ${fluxroot}/etc/flux/system/hostlist
-%s
-
-# Write the broker configuration
-mkdir -p ${fluxroot}/etc/flux/config
-cat <<EOT >> ${fluxroot}/etc/flux/config/broker.toml
 %s
-EOT
 
 echo
 echo "🐸 Broker Configuration"
-cat ${fluxroot}/etc/flux/config/broker.toml
-
-# The rundir needs to be created first, and owned by user flux
-# Along with the state directory and curve certificate
-mkdir -p ${fluxroot}/run/flux ${fluxroot}/etc/curve
+for filename in $(find ${fluxroot}/etc/flux -name broker.toml)
+  do
+    cat $filename
+done
 
 # View the curve certificate
 echo "🌟️ Curve Certificate"
@@ -201,10 +232,7 @@ echo "Application is done."
return fmt.Sprintf( setup, fluxRoot, - mainHost, - hosts, - generateHosts, - brokerConfig, + brokerConfigs, cluster.Spec.Flux.Container.MountPath, spackView, ), nil diff --git a/sdk/python/v1alpha2/docs/MiniClusterSpec.md b/sdk/python/v1alpha2/docs/MiniClusterSpec.md index 74b4c855..c8693e38 100644 --- a/sdk/python/v1alpha2/docs/MiniClusterSpec.md +++ b/sdk/python/v1alpha2/docs/MiniClusterSpec.md @@ -16,6 +16,7 @@ Name | Type | Description | Notes **max_size** | **int** | MaxSize (maximum number of pods to allow scaling to) | [optional] **min_size** | **int** | MinSize (minimum number of pods that must be up for Flux) Note that this option does not edit the number of tasks, so a job could run with fewer (and then not start) | [optional] **network** | [**Network**](Network.md) | | [optional] +**oversubscribe** | **bool** | Allow >1 Flux running (oversubscribing resources) | [optional] [default to False] **pod** | [**PodSpec**](PodSpec.md) | | [optional] **services** | [**list[MiniClusterContainer]**](MiniClusterContainer.md) | Services are one or more service containers to bring up alongside the MiniCluster. | [optional] **share_process_namespace** | **bool** | Share process namespace? | [optional] [default to False] diff --git a/sdk/python/v1alpha2/fluxoperator/models/mini_cluster_spec.py b/sdk/python/v1alpha2/fluxoperator/models/mini_cluster_spec.py index ae98903e..0b1aeeba 100644 --- a/sdk/python/v1alpha2/fluxoperator/models/mini_cluster_spec.py +++ b/sdk/python/v1alpha2/fluxoperator/models/mini_cluster_spec.py @@ -47,6 +47,7 @@ class MiniClusterSpec(object): 'max_size': 'int', 'min_size': 'int', 'network': 'Network', + 'oversubscribe': 'bool', 'pod': 'PodSpec', 'services': 'list[MiniClusterContainer]', 'share_process_namespace': 'bool', @@ -66,6 +67,7 @@ class MiniClusterSpec(object): 'max_size': 'maxSize', 'min_size': 'minSize', 'network': 'network', + 'oversubscribe': 'oversubscribe', 'pod': 'pod', 'services': 'services', 'share_process_namespace': 'shareProcessNamespace', @@ -73,7 +75,7 @@ class MiniClusterSpec(object): 'tasks': 'tasks' } - def __init__(self, archive=None, cleanup=False, containers=None, deadline_seconds=31500000, flux=None, interactive=False, job_labels=None, logging=None, max_size=None, min_size=None, network=None, pod=None, services=None, share_process_namespace=False, size=1, tasks=1, local_vars_configuration=None): # noqa: E501 + def __init__(self, archive=None, cleanup=False, containers=None, deadline_seconds=31500000, flux=None, interactive=False, job_labels=None, logging=None, max_size=None, min_size=None, network=None, oversubscribe=False, pod=None, services=None, share_process_namespace=False, size=1, tasks=1, local_vars_configuration=None): # noqa: E501 """MiniClusterSpec - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration.get_default_copy() @@ -90,6 +92,7 @@ def __init__(self, archive=None, cleanup=False, containers=None, deadline_second self._max_size = None self._min_size = None self._network = None + self._oversubscribe = None self._pod = None self._services = None self._share_process_namespace = None @@ -118,6 +121,8 @@ def __init__(self, archive=None, cleanup=False, containers=None, deadline_second self.min_size = min_size if network is not None: self.network = network + if oversubscribe is not None: + self.oversubscribe = oversubscribe if pod is not None: self.pod = pod if services is not None: @@ -376,6 +381,29 @@ def network(self, network): self._network = network + @property + 
def oversubscribe(self): + """Gets the oversubscribe of this MiniClusterSpec. # noqa: E501 + + Allow >1 Flux running (oversubscribing resources) # noqa: E501 + + :return: The oversubscribe of this MiniClusterSpec. # noqa: E501 + :rtype: bool + """ + return self._oversubscribe + + @oversubscribe.setter + def oversubscribe(self, oversubscribe): + """Sets the oversubscribe of this MiniClusterSpec. + + Allow >1 Flux running (oversubscribing resources) # noqa: E501 + + :param oversubscribe: The oversubscribe of this MiniClusterSpec. # noqa: E501 + :type oversubscribe: bool + """ + + self._oversubscribe = oversubscribe + @property def pod(self): """Gets the pod of this MiniClusterSpec. # noqa: E501
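As a quick check of the regenerated Python model above, a minimal usage sketch (assuming the `fluxoperator` package built from this SDK is installed):

```python
from fluxoperator.models.mini_cluster_spec import MiniClusterSpec

# oversubscribe defaults to False; set it to True to permit
# more than one "runFlux" container in the MiniCluster spec
spec = MiniClusterSpec(size=4, oversubscribe=True)
assert spec.oversubscribe is True
```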