[Feature] Added no_wait option for clusters to skip waiting to start on cluster creation (#3953)

## Changes
Added a `no_wait` option for clusters that skips waiting for the cluster to start on creation. When `no_wait = true`, the provider returns as soon as the create request succeeds instead of waiting for the cluster to reach the `RUNNING` state, so cluster startup and library installation continue asynchronously (see the example configuration below).

Fixes #3837

Also required for databricks/cli#1698

## Tests
- [x] `make test` run locally
- [x] relevant change in `docs/` folder
- [x] covered with integration tests in `internal/acceptance`
- [x] relevant acceptance tests are passing
- [x] using Go SDK


Manually ran the following configuration to confirm that everything works:

```hcl
data "databricks_node_type" "smallest" {
  local_disk = true
}

data "databricks_spark_version" "latest_lts" {
  long_term_support = true
}

resource "databricks_cluster" "shared_autoscaling" {
  cluster_name            = "Andrew Nester TF Test"
  spark_version           = data.databricks_spark_version.latest_lts.id
  node_type_id            = data.databricks_node_type.smallest.id
  autotermination_minutes = 20
  autoscale {
    min_workers = 1
    max_workers = 50
  }
  no_wait = true

  library {
    pypi {
      package = "fbprophet==0.6"
    }
  }
}
```
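
With `no_wait = true`, `terraform apply` is expected to return once the create request (and pinning, if configured) succeeds, while cluster startup and library installation continue asynchronously in the workspace.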

---------

Co-authored-by: Pieter Noordhuis <[email protected]>
Co-authored-by: Miles Yucht <[email protected]>
3 people authored Sep 2, 2024
1 parent f316f41 commit e4c36c8
Showing 4 changed files with 206 additions and 6 deletions.
35 changes: 29 additions & 6 deletions clusters/resource_cluster.go
@@ -309,6 +309,17 @@ func (ClusterSpec) CustomizeSchemaResourceSpecific(s *common.CustomizableSchema)
			return old == new
		},
	})
	s.AddNewField("no_wait", &schema.Schema{
		Type: schema.TypeBool,
		Optional: true,
		Default: false,
		DiffSuppressFunc: func(k, old, new string, d *schema.ResourceData) bool {
			if old == "" && new == "false" {
				return true
			}
			return old == new
		},
	})
	s.AddNewField("state", &schema.Schema{
		Type: schema.TypeString,
		Computed: true,
@@ -414,11 +425,8 @@ func resourceClusterCreate(ctx context.Context, d *schema.ResourceData, c *commo
	if err != nil {
		return err
	}
	clusterInfo, err := clusterWaiter.GetWithTimeout(timeout)
	if err != nil {
		return err
	}
	d.SetId(clusterInfo.ClusterId)

	d.SetId(clusterWaiter.ClusterId)
	d.Set("cluster_id", d.Id())
	isPinned, ok := d.GetOk("is_pinned")
	if ok && isPinned.(bool) {
@@ -437,6 +445,20 @@ func resourceClusterCreate(ctx context.Context, d *schema.ResourceData, c *commo
		}); err != nil {
			return err
		}
	}

	// If there is a no_wait flag set to true, don't wait for the cluster to be created
	noWait, ok := d.GetOk("no_wait")
	if ok && noWait.(bool) {
		return nil
	}

	clusterInfo, err := clusterWaiter.GetWithTimeout(timeout)
	if err != nil {
		return err
	}

	if len(cluster.Libraries) > 0 {
		_, err := libraries.WaitForLibrariesInstalledSdk(ctx, w, compute.Wait{
			ClusterID: d.Id(),
			IsRunning: clusterInfo.IsRunningOrResizing(),
@@ -508,7 +530,7 @@ func resourceClusterRead(ctx context.Context, d *schema.ResourceData, c *common.
func hasClusterConfigChanged(d *schema.ResourceData) bool {
	for k := range clusterSchema {
		// TODO: create a map if we'll add more non-cluster config parameters in the future
		if k == "library" || k == "is_pinned" {
		if k == "library" || k == "is_pinned" || k == "no_wait" {
			continue
		}
		if d.HasChange(k) {
@@ -551,6 +573,7 @@ func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, c *commo
	for k := range clusterSchema {
		if k == "library" ||
			k == "is_pinned" ||
			k == "no_wait" ||
			k == "num_workers" ||
			k == "autoscale" {
			continue
155 changes: 155 additions & 0 deletions clusters/resource_cluster_test.go
@@ -432,6 +432,161 @@ func TestResourceClusterCreatePhoton(t *testing.T) {
	assert.Equal(t, "abc", d.Id())
}

func TestResourceClusterCreateNoWait_WithLibraries(t *testing.T) {
	d, err := qa.ResourceFixture{
		Fixtures: []qa.HTTPFixture{
			{
				Method: "POST",
				Resource: "/api/2.1/clusters/create",
				ExpectedRequest: compute.ClusterSpec{
					NumWorkers: 100,
					SparkVersion: "7.1-scala12",
					NodeTypeId: "i3.xlarge",
					AutoterminationMinutes: 60,
				},
				Response: compute.ClusterDetails{
					ClusterId: "abc",
					State: compute.StateUnknown,
				},
			},
			{
				Method: "GET",
				ReuseRequest: true,
				Resource: "/api/2.1/clusters/get?cluster_id=abc",
				Response: compute.ClusterDetails{
					ClusterId: "abc",
					NumWorkers: 100,
					SparkVersion: "7.1-scala12",
					NodeTypeId: "i3.xlarge",
					AutoterminationMinutes: 15,
					State: compute.StateUnknown,
				},
			},
			{
				Method: "POST",
				Resource: "/api/2.1/clusters/events",
				ExpectedRequest: compute.GetEvents{
					ClusterId: "abc",
					Limit: 1,
					Order: compute.GetEventsOrderDesc,
					EventTypes: []compute.EventType{compute.EventTypePinned, compute.EventTypeUnpinned},
				},
				Response: compute.GetEventsResponse{
					Events: []compute.ClusterEvent{},
					TotalCount: 0,
				},
			},
			{
				Method: "POST",
				Resource: "/api/2.0/libraries/install",
				ExpectedRequest: compute.InstallLibraries{
					ClusterId: "abc",
					Libraries: []compute.Library{
						{
							Pypi: &compute.PythonPyPiLibrary{
								Package: "seaborn==1.2.4",
							},
						},
					},
				},
			},
			{
				Method: "GET",
				Resource: "/api/2.0/libraries/cluster-status?cluster_id=abc",
				Response: compute.ClusterLibraryStatuses{
					LibraryStatuses: []compute.LibraryFullStatus{
						{
							Library: &compute.Library{
								Pypi: &compute.PythonPyPiLibrary{
									Package: "seaborn==1.2.4",
								},
							},
							Status: compute.LibraryInstallStatusPending,
						},
					},
				},
			},
		},
		Create: true,
		Resource: ResourceCluster(),
		HCL: `num_workers = 100
		spark_version = "7.1-scala12"
		node_type_id = "i3.xlarge"
		no_wait = true
		library {
			pypi {
				package = "seaborn==1.2.4"
			}
		}`,
	}.Apply(t)
	assert.NoError(t, err)
	assert.Equal(t, "abc", d.Id())
}

func TestResourceClusterCreateNoWait(t *testing.T) {
	d, err := qa.ResourceFixture{
		Fixtures: []qa.HTTPFixture{
			{
				Method: "POST",
				Resource: "/api/2.1/clusters/create",
				ExpectedRequest: compute.ClusterSpec{
					NumWorkers: 100,
					ClusterName: "Shared Autoscaling",
					SparkVersion: "7.1-scala12",
					NodeTypeId: "i3.xlarge",
					AutoterminationMinutes: 15,
				},
				Response: compute.ClusterDetails{
					ClusterId: "abc",
					State: compute.StateUnknown,
				},
			},
			{
				Method: "GET",
				ReuseRequest: true,
				Resource: "/api/2.1/clusters/get?cluster_id=abc",
				Response: compute.ClusterDetails{
					ClusterId: "abc",
					NumWorkers: 100,
					ClusterName: "Shared Autoscaling",
					SparkVersion: "7.1-scala12",
					NodeTypeId: "i3.xlarge",
					AutoterminationMinutes: 15,
					State: compute.StateUnknown,
				},
			},
			{
				Method: "POST",
				Resource: "/api/2.1/clusters/events",
				ExpectedRequest: compute.GetEvents{
					ClusterId: "abc",
					Limit: 1,
					Order: compute.GetEventsOrderDesc,
					EventTypes: []compute.EventType{compute.EventTypePinned, compute.EventTypeUnpinned},
				},
				Response: compute.GetEventsResponse{
					Events: []compute.ClusterEvent{},
					TotalCount: 0,
				},
			},
		},
		Create: true,
		Resource: ResourceCluster(),
		State: map[string]any{
			"autotermination_minutes": 15,
			"cluster_name": "Shared Autoscaling",
			"spark_version": "7.1-scala12",
			"node_type_id": "i3.xlarge",
			"num_workers": 100,
			"is_pinned": false,
			"no_wait": true,
		},
	}.Apply(t)
	assert.NoError(t, err)
	assert.Equal(t, "abc", d.Id())
}

func TestResourceClusterCreate_Error(t *testing.T) {
	d, err := qa.ResourceFixture{
		Fixtures: []qa.HTTPFixture{
1 change: 1 addition & 0 deletions docs/resources/cluster.md
@@ -51,6 +51,7 @@ resource "databricks_cluster" "shared_autoscaling" {
* `custom_tags` - (Optional) Additional tags for cluster resources. Databricks will tag all cluster resources (e.g., AWS EC2 instances and EBS volumes) with these tags in addition to `default_tags`. If a custom cluster tag has the same name as a default cluster tag, the custom tag is prefixed with an `x_` when it is propagated.
* `spark_conf` - (Optional) Map with key-value pairs to fine-tune Spark clusters, where you can provide custom [Spark configuration properties](https://spark.apache.org/docs/latest/configuration.html) in a cluster configuration.
* `is_pinned` - (Optional) boolean value specifying if the cluster is pinned (not pinned by default). You must be a Databricks administrator to use this. The pinned clusters' maximum number is [limited to 100](https://docs.databricks.com/clusters/clusters-manage.html#pin-a-cluster), so `apply` may fail if you have more than that (this number may change over time, so check Databricks documentation for actual number).
* `no_wait` - (Optional) If true, the provider will not wait for the cluster to reach `RUNNING` state when creating the cluster, allowing cluster creation and library installation to continue asynchronously. Defaults to false (the provider will wait for cluster creation and library installation to succeed).

The following example demonstrates how to create an autoscaling cluster with [Delta Cache](https://docs.databricks.com/delta/optimizations/delta-cache.html) enabled:

21 changes: 21 additions & 0 deletions internal/acceptance/cluster_test.go
@@ -108,3 +108,24 @@ func TestAccClusterResource_CreateAndUpdateAwsAttributes(t *testing.T) {
		})
	}
}

func TestAccClusterResource_CreateAndNoWait(t *testing.T) {
	workspaceLevel(t, step{
		Template: `data "databricks_spark_version" "latest" {
		}
		resource "databricks_cluster" "this" {
			cluster_name = "nowait-{var.RANDOM}"
			spark_version = data.databricks_spark_version.latest.id
			instance_pool_id = "{env.TEST_INSTANCE_POOL_ID}"
			num_workers = 1
			autotermination_minutes = 10
			spark_conf = {
				"spark.databricks.cluster.profile" = "serverless"
			}
			custom_tags = {
				"ResourceClass" = "Serverless"
			}
			no_wait = true
		}`,
	})
}
