Skip to content

Commit

Permalink
Improve AppName.java to fix Airflow issue with inconsistent data runs
Browse files Browse the repository at this point in the history
  • Loading branch information
muttcg committed Jul 11, 2024
1 parent 36d49be commit fab5f99
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,40 @@
package org.gbif.pipelines.common.airflow;

import java.util.Arrays;
import java.util.Locale;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.gbif.api.model.pipelines.StepType;
import org.gbif.pipelines.common.PipelinesException;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class AppName {

private static final Set<String> IGNORE_SET = Set.of("to");

/**
* Builds a name by joining the shortened step type, the dataset key and the attempt number with
* '-'.
*
* <p>A lowercase RFC 1123 subdomain must consist of lower case alphanumeric characters, '-' or '.'.
* Must start and end with an alphanumeric character and its max length is 64 characters.
*
* <p>NOTE(review): the length check below rejects names of 64 characters or more, so the longest
* accepted name is 63 characters, while the exception message says "> 64" - confirm the intended
* limit.
*
* @param type step type; shortened via shortenType before joining
* @param datasetKey dataset key, included via its toString() form
* @param attempt attempt number, appended as a decimal string
* @return the joined name
* @throws PipelinesException if the joined name is 64 characters or longer
*/
public static String get(StepType type, UUID datasetKey, int attempt) {
// NOTE(review): this text looks like a rendered diff without +/- markers; the next line appears
// to be the pre-change implementation that the commit removes - confirm.
return String.join("_", type.name(), datasetKey.toString(), String.valueOf(attempt));
String joined =
String.join("-", shortenType(type), datasetKey.toString(), String.valueOf(attempt));
// Fail loudly instead of truncating the name.
if (joined.length() >= 64) {
throw new PipelinesException("Spark name can't be normalized, cause run_id length > 64 char");
}
return joined;
}

/**
 * Shortens a step type name into a lowercase, '-'-separated token.
 *
 * <p>The enum name is split on '_', each part is lowercased and cut to at most 5 characters, parts
 * listed in IGNORE_SET (currently "to") are dropped, and the remainder is joined with '-'. For
 * example, VERBATIM_TO_INTERPRETED becomes "verba-inter".
 *
 * @param type step type whose enum name is shortened
 * @return the shortened, lowercase representation of the step type
 */
private static String shortenType(StepType type) {
  return Arrays.stream(type.name().split("_"))
      // Locale.ROOT keeps the output ASCII whatever the JVM default locale is; with a Turkish
      // default locale a plain toLowerCase() turns 'I' into a dotless 'ı'.
      .map(part -> part.toLowerCase(Locale.ROOT))
      .map(part -> part.substring(0, Math.min(part.length(), 5)))
      .filter(part -> !IGNORE_SET.contains(part))
      .collect(Collectors.joining("-"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,21 +101,19 @@ private AirflowBody getAirflowBody(String dagId) {

public Optional<Status> submitAwait() {
try {

String normalizedAppName = normalize(sparkAppName);
AirflowBody airflowBody = getAirflowBody(normalizedAppName);
AirflowBody airflowBody = getAirflowBody(sparkAppName);

log.info("Running Airflow DAG ID {}: {}", airflowBody.getDagRunId(), airflowBody);
Retry.decorateFunction(AIRFLOW_RETRY, airflowClient::createRun).apply(airflowBody);

Optional<Status> status = getStatusByName(normalizedAppName);
Optional<Status> status = getStatusByName(sparkAppName);

log.info("Waiting Airflow DAG ID {} to finish", airflowBody.getDagRunId());
while (status.isPresent()
&& Status.COMPLETED != status.get()
&& Status.FAILED != status.get()) {
TimeUnit.SECONDS.sleep(airflowConfiguration.apiCheckDelaySec);
status = getStatusByName(normalizedAppName);
status = getStatusByName(sparkAppName);
}
log.info(
"Airflow DAG ID {} is finished with status - {}",
Expand Down Expand Up @@ -159,13 +157,4 @@ public Optional<Status> getStatusByName(String dagId) {
}
return Optional.empty();
}

/**
* Normalizes the given name: lowercases it, replaces every "_to_" with "-", then replaces every
* remaining "_" with "-", and cuts the result to its first 63 characters when it is 64 characters
* or longer.
*
* <p>A lowercase RFC 1123 subdomain must consist of lower case alphanumeric characters, '-' or '.'.
* Must start and end with an alphanumeric character and its max length is 64 characters.
*
* @param sparkAppName name to normalize
* @return the normalized name, at most 63 characters long
*/
private static String normalize(String sparkAppName) {
String v = sparkAppName.toLowerCase().replace("_to_", "-").replace("_", "-");
// NOTE(review): cutting at a fixed index can leave a trailing '-' and can make different inputs
// collide on the same name - confirm whether callers rely on this.
return v.length() >= 64 ? v.substring(0, 63) : v;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,40 @@ public void getTest() {
// State
StepType dwca = StepType.DWCA_TO_VERBATIM;
UUID uuid = UUID.fromString("ad2ef207-969e-418a-ab4f-102e8d9bf7ac");
int attempt = 1;
int attempt = 1_000_000;

// When
String string = AppName.get(dwca, uuid, attempt);

// Should
Assert.assertEquals("DWCA_TO_VERBATIM_ad2ef207-969e-418a-ab4f-102e8d9bf7ac_1", string);
Assert.assertEquals("dwca-verba-ad2ef207-969e-418a-ab4f-102e8d9bf7ac-1000000", string);
}

@Test
public void getOccurrenceTest() {
  // Given: the VERBATIM_TO_INTERPRETED step, a fixed dataset key and a seven-digit attempt
  UUID datasetKey = UUID.fromString("ad2ef207-969e-418a-ab4f-102e8d9bf7ac");
  StepType stepType = StepType.VERBATIM_TO_INTERPRETED;
  int attemptNumber = 1_000_000;

  // When: the name is generated
  String appName = AppName.get(stepType, datasetKey, attemptNumber);

  // Then: the step type is shortened and all parts are joined with '-'
  Assert.assertEquals("verba-inter-ad2ef207-969e-418a-ab4f-102e8d9bf7ac-1000000", appName);
}

@Test
public void getEventTest() {
  // Given: the EVENTS_VERBATIM_TO_INTERPRETED step, a fixed dataset key and a seven-digit attempt
  UUID datasetKey = UUID.fromString("ad2ef207-969e-418a-ab4f-102e8d9bf7ac");
  StepType stepType = StepType.EVENTS_VERBATIM_TO_INTERPRETED;
  int attemptNumber = 1_000_000;

  // When: the name is generated
  String appName = AppName.get(stepType, datasetKey, attemptNumber);

  // Then: each part of the step type is shortened and all parts are joined with '-'
  Assert.assertEquals("event-verba-inter-ad2ef207-969e-418a-ab4f-102e8d9bf7ac-1000000", appName);
}
}

0 comments on commit fab5f99

Please sign in to comment.