Test peas in pod #227

Open · wants to merge 2 commits into main
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,5 +1,5 @@
# Build the manager binary
FROM golang:1.20 as builder
FROM golang:1.20 AS builder

WORKDIR /workspace

29 changes: 23 additions & 6 deletions api/v1alpha2/minicluster_types.go
@@ -50,6 +50,11 @@ type MiniClusterSpec struct {
// +optional
Interactive bool `json:"interactive"`

// Allow >1 Flux running (oversubscribing resources)
// +kubebuilder:default=false
// +optional
Oversubscribe bool `json:"oversubscribe"`

// Flux options for the broker, shared across cluster
// +optional
Flux FluxSpec `json:"flux"`
@@ -821,6 +826,11 @@ func (f *MiniCluster) Validate() bool {
// Count the FluxRunners
if container.RunFlux {
fluxRunners += 1
}

// Give all flux containers a name, if not provided
if container.Name == "" {

// Non flux-runners are required to have a name
} else {
if container.Name == "" {
fmt.Printf("😥️ %s is missing a name\n", name)
return false
// Maintain previous behavior to have name == main flux runner
if i == 0 {
container.Name = f.Name
}
container.Name = fmt.Sprintf("%s-%d", container.Name, i)
}

// If a custom script is provided AND a command, no go
@@ -836,7 +844,16 @@ func (f *MiniCluster) Validate() bool {
return false
}
}
if fluxRunners != 1 {

// If we have more than one flux runner, must explicitly oversubscribe
if fluxRunners > 1 && !f.Spec.Oversubscribe {
fmt.Printf("😥️ More than one flux runner requires oversubscribe: true\n")
valid = false
}

// More than one container can run Flux (and the brokers see the same resources)
// But we need at least one!
if fluxRunners < 1 {
valid = false
}

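Because the rendered diff above interleaves removed and added lines, the final shape of this logic can be hard to read. Here is a hedged sketch of how the name defaulting and the new oversubscribe check might fit together after the patch; the enclosing Validate() method and its `valid` flag are assumed from the hunk headers, not copied verbatim from the branch:

```go
// Sketch only: body of the container loop and checks in (f *MiniCluster) Validate().
fluxRunners := 0
for i := range f.Spec.Containers {
	// Index into the slice so the defaulted name persists on the spec
	container := &f.Spec.Containers[i]

	// Count the FluxRunners
	if container.RunFlux {
		fluxRunners += 1
	}

	// Give all containers a name, if not provided
	if container.Name == "" {
		// Maintain previous behavior: the first container is named for the cluster
		if i == 0 {
			container.Name = f.Name
		}
		container.Name = fmt.Sprintf("%s-%d", container.Name, i)
	}
}

// If we have more than one flux runner, the user must explicitly oversubscribe
if fluxRunners > 1 && !f.Spec.Oversubscribe {
	fmt.Printf("😥️ More than one flux runner requires oversubscribe: true\n")
	valid = false
}

// More than one container can run Flux, but we need at least one
if fluxRunners < 1 {
	valid = false
}
```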
5 changes: 5 additions & 0 deletions api/v1alpha2/swagger.json
@@ -622,6 +622,11 @@
"default": {},
"$ref": "#/definitions/Network"
},
"oversubscribe": {
"description": "Allow \u003e1 Flux running (oversubscribing resources)",
"type": "boolean",
"default": false
},
"pod": {
"description": "Pod spec details",
"default": {},
8 changes: 8 additions & 0 deletions api/v1alpha2/zz_generated.openapi.go

(Generated file; diff not rendered.)

4 changes: 4 additions & 0 deletions chart/templates/minicluster-crd.yaml
@@ -520,6 +520,10 @@ spec:
description: Name for cluster headless service
type: string
type: object
oversubscribe:
default: false
description: Allow >1 Flux running (oversubscribing resources)
type: boolean
pod:
description: Pod spec details
properties:
4 changes: 4 additions & 0 deletions config/crd/bases/flux-framework.org_miniclusters.yaml
@@ -523,6 +523,10 @@ spec:
description: Name for cluster headless service
type: string
type: object
oversubscribe:
default: false
description: Allow >1 Flux running (oversubscribing resources)
type: boolean
pod:
description: Pod spec details
properties:
6 changes: 4 additions & 2 deletions controllers/flux/containers.go
@@ -53,7 +53,7 @@ func getFluxContainer(

func getContainers(
specs []api.MiniClusterContainer,
defaultName string,
customName string,
mounts []corev1.VolumeMount,
serviceContainer bool,
) ([]corev1.Container, error) {
@@ -82,9 +82,11 @@ func getContainers(
// wait.sh path corresponds to container identifier
waitScript := fmt.Sprintf("/flux_operator/wait-%d.sh", i)
command = []string{"/bin/bash", waitScript}
containerName = defaultName
}

if customName != "" {
containerName = customName
}
// A container not running flux can only have pre/post sections
// in a custom script if we know the entrypoint.
if container.GenerateEntrypoint() && !serviceContainer {
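Tying this to the job.go change below: containers now arrive with names already defaulted by Validate(), and only service containers pass a non-empty customName. A minimal sketch of the resulting precedence, assuming the surrounding getContainers loop:

```go
// Sketch only: name resolution inside the getContainers loop.
containerName := container.Name // already defaulted in Validate()

if container.RunFlux {
	// wait.sh path corresponds to container identifier
	waitScript := fmt.Sprintf("/flux_operator/wait-%d.sh", i)
	command = []string{"/bin/bash", waitScript}
}

// A non-empty customName (service containers only) overrides the spec name
if customName != "" {
	containerName = customName
}
```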
4 changes: 3 additions & 1 deletion controllers/flux/job.go
@@ -98,9 +98,11 @@ func NewMiniClusterJob(cluster *api.MiniCluster) (*batchv1.Job, error) {
}

// Prepare listing of containers for the MiniCluster
// We don't provide a default name because defaults are provided in Validate()
// Only service containers have a custom name here
containers, err := getContainers(
cluster.Spec.Containers,
cluster.Name,
"",
mounts,
false,
)
10 changes: 10 additions & 0 deletions docs/getting_started/custom-resource-definition.md
@@ -92,6 +92,16 @@ This would be equivalent to giving a start command of `sleep infinity` however o
(e.g., if there is a flux shutdown from within the Flux instance) the sleep command would
not exit with a failed code.

### oversubscribe

By default, we treat your single application container (or the single container in a MiniCluster pod designated to "runFlux") as the only Flux broker. When oversubscribe is set to true, you can define more than one "runFlux" container, meaning that multiple brokers will share the same resources.

```yaml
oversubscribe: true
```

We created this use case with the intention of having a service container running fluxion alongside the MiniCluster to orchestrate the N containers. This is considered an advanced use case, so use it with caution!

### launcher

If you are using an executor that launches Flux Jobs (e.g., workflow managers such as Snakemake and Nextflow do!)
1 change: 1 addition & 0 deletions docs/tutorials/index.md
@@ -56,6 +56,7 @@ The following tutorials are provided from their respective directories (and are

These examples show how to interact with your flux queue from a sidecar container (that has access to the flux broker of the pod):

- [multiple-applications-per-pod](https://github.com/flux-framework/flux-operator/tree/main/examples/experimental/multiple-applications-per-pod): Allow multiple pods to be scheduled per node (controlled by cgroups)
- [flux-sidecar](https://github.com/flux-framework/flux-operator/blob/main/examples/tests/flux-sidecar) to see a sleep job in the main application queue

### Services
4 changes: 4 additions & 0 deletions examples/dist/flux-operator-arm.yaml
@@ -529,6 +529,10 @@ spec:
description: Name for cluster headless service
type: string
type: object
oversubscribe:
default: false
description: Allow >1 Flux running (oversubscribing resources)
type: boolean
pod:
description: Pod spec details
properties:
4 changes: 4 additions & 0 deletions examples/dist/flux-operator.yaml
@@ -529,6 +529,10 @@ spec:
description: Name for cluster headless service
type: string
type: object
oversubscribe:
default: false
description: Allow >1 Flux running (oversubscribing resources)
type: boolean
pod:
description: Pod spec details
properties:
72 changes: 72 additions & 0 deletions examples/experimental/multiple-applications-per-pod/minicluster.yaml
@@ -0,0 +1,72 @@
apiVersion: flux-framework.org/v1alpha2
kind: MiniCluster
metadata:
name: flux-sample
spec:
size: 4
interactive: true

# shell in, and then:
# source /mnt/flux/flux-view.sh
# flux proxy $fluxsocket bash
# flux resource list

# 1. Create resource graph independent of what hwloc does.
# 2. Will need to pass JGF into fluxion when it starts (try this out for R and starting the broker)
# 3. Create a resource graph with a multiple of the resources we want: if we want N brokers, increase by a factor of N.
# 4. Each broker needs to get 1/N of that resource graph.
# 5. When we submit a jobspec, we need to submit with the entire resource graph (request 4x the resources we want);
#    the entire resource graph is allocated to that job.
#    For the executable we have to specify which part of the graph we are executing.
#    All brokers are at the node level.

# Note that:
# 1. all the containers here have an ubuntu 20.04 base!
# 2. The non-flux runners also need a name.
# 3. Since we control the logic of the sidecars, we need to add
# an entrypoint that keeps them running. Otherwise, it jumps to
# "not ready"
# 4. The issue we will run into is that the job won't complete when
# the main flux runner shuts down. It will need to be deleted.
flux:
container:
image: ghcr.io/converged-computing/flux-view-ubuntu:tag-focal

# This ensures that fluxion is running as a service to the MiniCluster
services:
- image: ghcr.io/converged-computing/fluxion:latest
command: /code/bin/server --host 0.0.0.0
name: fluxion

# This starts the flux broker without a command (interactive)
interactive: true

# A required marker from the user that they want multiple runFlux
# to work. This is considered an advanced use case.
oversubscribe: true

containers:

# This is a faux "queue only" broker container. It will be
# the interface to which we submit jobs. We don't run Flux
# but we will orchestrate running things.
- image: rockylinux:9
name: queue

# TODO we will need to allow the other ones to still see flux
- image: ghcr.io/rse-ops/lammps-matrix:mpich-ubuntu-20.04-amd64
name: lammps
workingDir: /opt/lammps/examples/reaxff/HNS
command: run_interactive_cluster
runFlux: true

# command: lmp -v x 2 -v y 2 -v z 2 -in in.reaxc.hns -nocite
- image: ghcr.io/converged-computing/metric-ior:latest
name: ior
command: run_interactive_cluster
runFlux: true

- image: ghcr.io/converged-computing/metric-chatterbug:latest
name: chatterbug
command: run_interactive_cluster
runFlux: true
10 changes: 2 additions & 8 deletions examples/experimental/multiple-pods-per-node/README.md
@@ -27,18 +27,12 @@ gcloud container clusters create test-cluster \

### Install the Flux Operator

We are going to install the Flux operator from the refactor branch (with the feature added to disable affinity).
As follows:

```bash
git clone -b test-refactor-modular
cd test-refactor-modular

# You might need other dependencies, etc. here or to specify your own registry you can push to.
make test-deploy-recreate
kubectl apply -f https://raw.githubusercontent.com/flux-framework/flux-operator/main/examples/dist/flux-operator.yaml
```

If/when this is released, you can install from a release.

### Experiments

Then create the flux operator pods.
2 changes: 1 addition & 1 deletion examples/tests/osu-benchmarks/minicluster.yaml
@@ -21,4 +21,4 @@ spec:
containers:
- image: ghcr.io/converged-computing/metric-osu-benchmark:latest
workingDir: /opt/osu-benchmark/build.openmpi/libexec/osu-micro-benchmarks/mpi/one-sided
command: ./osu_get_latency
command: ./osu_get_latency
8 changes: 7 additions & 1 deletion pkg/flux/config.go
@@ -34,7 +34,13 @@ func getRandomToken(requested string) string {
}

// generateHostlist for a specific size given the cluster namespace and a size
func generateHostlist(cluster *api.MiniCluster, size int32) string {
// Note that we don't yet customize at the level of the container, but
// hostlists are generated separately in anticipation of slightly different setups.
func generateHostlist(
cluster *api.MiniCluster,
container api.MiniClusterContainer,
size int32,
) string {

var hosts string
if cluster.Spec.Flux.Bursting.Hostlist != "" {
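Passing the container into generateHostlist means each broker set can be addressed by its own, now unique, container name. A hypothetical helper to illustrate the idea; the bracketed range is standard Flux hostlist notation, but the exact format produced by this patch is not shown in the diff:

```go
// Hypothetical: build a bracketed host range for one container's brokers,
// e.g. exampleHostRange("flux-sample-0", 4) -> "flux-sample-0-[0-3]".
func exampleHostRange(containerName string, size int32) string {
	return fmt.Sprintf("%s-[0-%d]", containerName, size-1)
}
```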
30 changes: 18 additions & 12 deletions pkg/flux/entrypoint.go
@@ -37,7 +37,7 @@ func GenerateEntrypoints(cluster *api.MiniCluster) (map[string]string, error) {
// Custom logic for a sidecar container alongside flux
if container.GenerateEntrypoint() {
startScriptID := fmt.Sprintf("start-%d", i)
startScript, err := generateServiceEntrypoint(cluster, container)
startScript, err := generateServiceEntrypoint(cluster, container, i)
if err != nil {
return data, err
}
@@ -58,11 +58,16 @@
}

// generateServiceEntrypoint generates an entrypoint for a service container
func generateServiceEntrypoint(cluster *api.MiniCluster, container api.MiniClusterContainer) (string, error) {
func generateServiceEntrypoint(
cluster *api.MiniCluster,
container api.MiniClusterContainer,
containerIndex int) (string, error) {

st := ServiceTemplate{
ViewBase: cluster.Spec.Flux.Container.MountPath,
Container: container,
Spec: cluster.Spec,
ViewBase: cluster.Spec.Flux.Container.MountPath,
Container: container,
ContainerIndex: containerIndex,
Spec: cluster.Spec,
}

// Wrap the named template to identify it later
@@ -88,7 +93,7 @@ func generateEntrypointScript(
) (string, error) {

container := cluster.Spec.Containers[containerIndex]
mainHost := fmt.Sprintf("%s-0", cluster.Name)
mainHost := fmt.Sprintf("%s-0", container.Name)

// Ensure if we have a batch command, it gets split up
batchCommand := strings.Split(container.Command, "\n")
Expand All @@ -99,12 +104,13 @@ func generateEntrypointScript(

// The token uuid is the same across images
wt := WaitTemplate{
RequiredRanks: requiredRanks,
ViewBase: cluster.Spec.Flux.Container.MountPath,
Container: container,
MainHost: mainHost,
Spec: cluster.Spec,
Batch: batchCommand,
RequiredRanks: requiredRanks,
ViewBase: cluster.Spec.Flux.Container.MountPath,
ContainerIndex: containerIndex,
Container: container,
MainHost: mainHost,
Spec: cluster.Spec,
Batch: batchCommand,
}

// Wrap the named template to identify it later
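One consequence of the unique container names, visible in the mainHost change above: the rank-0 host each wait script waits on is now derived from the container's name rather than the cluster's. For example (the concrete hostname here is an assumption based on the naming sketch earlier):

```go
// Before: mainHost := fmt.Sprintf("%s-0", cluster.Name) // always "<cluster>-0"
// After: per-container, e.g. "flux-sample-0-0" for the first container
mainHost := fmt.Sprintf("%s-0", container.Name)
```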
12 changes: 8 additions & 4 deletions pkg/flux/scripts.go
@@ -29,9 +29,10 @@ var startComponents string

// ServiceTemplate is for a separate service container
type ServiceTemplate struct {
ViewBase string // Where the mounted view with flux is expected to be
Container api.MiniClusterContainer
Spec api.MiniClusterSpec
ViewBase string // Where the mounted view with flux is expected to be
Container api.MiniClusterContainer
ContainerIndex int
Spec api.MiniClusterSpec
}

// WaitTemplate populates wait.sh for an application container entrypoint
@@ -40,7 +41,10 @@ type WaitTemplate struct {
MainHost string // Main host identifier
FluxToken string // Token to log into the UI, should be consistent across containers
Container api.MiniClusterContainer
Spec api.MiniClusterSpec

// Index for container, for generation of unique socket path
ContainerIndex int
Spec api.MiniClusterSpec

// Broker initial quorum that must be online to start
// This is used if the cluster MaxSize > Size
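Per the comment on ContainerIndex, the field exists so each container's broker can get a unique socket path. A purely hypothetical illustration; the wait.sh template that would consume the field is not part of this diff, and the `-%d` suffix is an assumption:

```go
// Hypothetical: derive a per-container broker socket from the index.
socket := fmt.Sprintf("local://%s/view/run/flux/local-%d",
	wt.ViewBase, wt.ContainerIndex)
```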