diff --git a/apis/flinkcluster/v1beta1/flinkcluster_default.go b/apis/flinkcluster/v1beta1/flinkcluster_default.go index 6834e2fa..e2ae463e 100644 --- a/apis/flinkcluster/v1beta1/flinkcluster_default.go +++ b/apis/flinkcluster/v1beta1/flinkcluster_default.go @@ -27,6 +27,7 @@ import ( ) const ( + DefaultParallelism = -1 DefaultJobManagerReplicas = 1 DefaultTaskManagerReplicas = 3 ForceTearDownAfter = time.Second * 10 diff --git a/apis/flinkcluster/v1beta1/flinkcluster_validate.go b/apis/flinkcluster/v1beta1/flinkcluster_validate.go index cac603a1..5f962e02 100644 --- a/apis/flinkcluster/v1beta1/flinkcluster_validate.go +++ b/apis/flinkcluster/v1beta1/flinkcluster_validate.go @@ -533,8 +533,8 @@ func (v *Validator) validateJob(jobSpec *JobSpec) error { return fmt.Errorf("job jarFile or pythonFile or pythonModule is unspecified") } - if jobSpec.Parallelism != nil && *jobSpec.Parallelism < 1 { - return fmt.Errorf("job parallelism must be >= 1") + if jobSpec.Parallelism != nil && *jobSpec.Parallelism < 1 && *jobSpec.Parallelism != DefaultParallelism { + return fmt.Errorf("job parallelism must be -1 (adaptive) or >= 1") } if jobSpec.RestartPolicy == nil {