Skip to content

Commit

Permalink
refactor: dynamic properties (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle authored Dec 10, 2024
1 parent 6123d22 commit 08ffd6c
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.plugin.hightouch;

import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.Task;

import java.io.IOException;
Expand All @@ -14,6 +15,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;

import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
Expand All @@ -35,13 +37,12 @@ abstract class AbstractHightouchConnection extends Task {
title = "API Bearer token"
)
@NotNull
@PluginProperty(dynamic = true)
String token;
Property<String> token;

private final static String BASE_URL = "https://api.hightouch.com";
private final static ObjectMapper OBJECT_MAPPER = JacksonMapper.ofJson();

protected <REQ, RES> RES request(String method, String path, String body, Class<RES> responseType) throws IOException, InterruptedException {
protected <REQ, RES> RES request(String method, String path, String body, Class<RES> responseType, RunContext runContext) throws IOException, InterruptedException {

HttpClient httpClient = HttpClient.newBuilder().build();

Expand All @@ -51,7 +52,7 @@ protected <REQ, RES> RES request(String method, String path, String body, Class<
HttpRequest request = HttpRequest.newBuilder(fullPath)
.method(method, HttpRequest.BodyPublishers.ofString(body, StandardCharsets.UTF_8))
.header("Content-Type", "application/json")
.header("Authorization", "Bearer " + token)
.header("Authorization", "Bearer " + runContext.render(token).as(String.class).orElseThrow())
.build();

HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
Expand All @@ -64,7 +65,7 @@ protected <REQ, RES> RES request(String method, String path, String body, Class<
"Request to '" + fullPath.getPath() + "' failed with status '" + response.statusCode() + "' and body '" + response.body() + "'"
);
}
} catch (ConnectException | MalformedURLException e) {
} catch (ConnectException | MalformedURLException | IllegalVariableEvaluationException e) {
throw new RuntimeException(e);
}
}
Expand Down
33 changes: 16 additions & 17 deletions src/main/java/io/kestra/plugin/hightouch/Sync.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.Await;
Expand Down Expand Up @@ -47,7 +47,7 @@
- id: sync
type: io.kestra.plugin.hightouch.Sync
token: YOUR_API_TOKEN
syncId: 1127166
syncId: 1127166
"""
)
}
Expand All @@ -65,30 +65,26 @@ public class Sync extends AbstractHightouchConnection implements RunnableTask<Sy
title = "The sync id to trigger run"
)
@NotNull
@PluginProperty(dynamic = true)
private Long syncId;
private Property<Long> syncId;

@Schema(
title = "Whether to do a full resynchronization"
)
@PluginProperty(dynamic = true)
@Builder.Default
private Boolean fullResynchronization = false;
private Property<Boolean> fullResynchronization = Property.of(false);

@Schema(
title = "Whether to wait for the end of the run.",
description = "Allowing to capture run status and logs"
)
@PluginProperty
@Builder.Default
private Boolean wait = true;
private Property<Boolean> wait = Property.of(true);

@Schema(
title = "The max total wait duration"
)
@PluginProperty
@Builder.Default
private Duration maxDuration = Duration.ofMinutes(5);
private Property<Duration> maxDuration = Property.of(Duration.ofMinutes(5));

@Builder.Default
@Getter(AccessLevel.NONE)
Expand All @@ -98,14 +94,15 @@ public class Sync extends AbstractHightouchConnection implements RunnableTask<Sy
public Sync.Output run(RunContext runContext) throws Exception {
Logger logger = runContext.logger();

final String syncId = runContext.render(this.syncId.toString());
final String syncId = runContext.render(this.syncId).as(Long.class).orElseThrow().toString();

// Get details of sync to display slug
SyncDetailsResponse syncDetails = this.request(
"GET",
String.format("/api/v1/syncs/%s", syncId),
"{}",
SyncDetailsResponse.class
SyncDetailsResponse.class,
runContext
);

// Trigger sync run
Expand All @@ -114,15 +111,16 @@ public Sync.Output run(RunContext runContext) throws Exception {
String.format("/api/v1/syncs/%s/trigger", syncId),
String.format(
"{\"fullResync\": %s}",
runContext.render(this.fullResynchronization.toString())
runContext.render(runContext.render(this.fullResynchronization).as(Boolean.class).orElseThrow().toString())
),
Run.class
Run.class,
runContext
);

Long runId = jobInfoRead.getId();
logger.info("[syncId={}] {}: Job triggered with runId {}", syncDetails.getId(), syncDetails.getSlug(), runId);

if (!this.wait) {
if (!runContext.render(wait).as(Boolean.class).orElseThrow()) {
return Output.builder()
.runId(runId)
.build();
Expand All @@ -135,7 +133,8 @@ public Sync.Output run(RunContext runContext) throws Exception {
"GET",
String.format("/api/v1/syncs/%s/runs?runId=%s", syncId, runId),
"{}",
RunDetailsResponse.class
RunDetailsResponse.class,
runContext
);

// Check we correctly get one run
Expand All @@ -158,7 +157,7 @@ public Sync.Output run(RunContext runContext) throws Exception {
return null;
}),
STATUS_REFRESH_RATE,
this.maxDuration
runContext.render(this.maxDuration).as(Duration.class).orElseThrow()
);

// handle failure
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.plugin.hightouch;

import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.micronaut.context.annotation.Value;
Expand Down Expand Up @@ -31,8 +32,8 @@ void run() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());

Sync task = Sync.builder()
.token(this.token)
.syncId(this.invalidSyncId)
.token(Property.of(this.token))
.syncId(Property.of(this.invalidSyncId))
.build();

Throwable exception = assertThrows(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.plugin.hightouch;

import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.micronaut.context.annotation.Value;
Expand Down Expand Up @@ -31,8 +32,8 @@ void run() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());

Sync task = Sync.builder()
.token(this.token)
.syncId(this.syncId)
.token(Property.of(this.token))
.syncId(Property.of(this.syncId))
.build();

Throwable exception = assertThrows(
Expand Down
5 changes: 3 additions & 2 deletions src/test/java/io/kestra/plugin/hightouch/SyncTest.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.plugin.hightouch;

import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.micronaut.context.annotation.Value;
Expand Down Expand Up @@ -30,8 +31,8 @@ void run() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());

Sync task = Sync.builder()
.token(this.token)
.syncId(this.syncId)
.token(Property.of(this.token))
.syncId(Property.of(this.syncId))
.build();

Sync.Output runOutput = task.run(runContext);
Expand Down

0 comments on commit 08ffd6c

Please sign in to comment.