
merge 3.0.5 changes from master (#1118)
* Use singleton VocabularyService to avoid memory issues

* Fix sex vocab prefilter not to remove zeros

* Apply spotless

* Add extra logging

* Fix extra logging

* Improve Retry function to avoid counting some issues as retry exceptions

* Remove Retry waitDuration in favor of intervalFunction

* Skip error if file doesn't exist for cases when old export data is used

* Don't fail if errors occur during releasing

* [maven-release-plugin] prepare release pipelines-parent-3.0.2

* Remove withoutSharding to improve writing speed

* Bump the project version

* Build fix with change from dev

* [maven-release-plugin] prepare release pipelines-parent-3.0.3

* [maven-release-plugin] prepare for next development iteration

* Revert FileVocabularyFactory changes from the dev branch merge

* [maven-release-plugin] prepare release pipelines-parent-3.0.5

* [maven-release-plugin] prepare for next development iteration

---------

Co-authored-by: nvolik <[email protected]>
Co-authored-by: Marcos Lopez Gonzalez <[email protected]>
Co-authored-by: gbif-jenkins <[email protected]>
4 people authored Feb 7, 2025
1 parent 681bd38 commit 3ead8c9
Showing 56 changed files with 177 additions and 124 deletions.
2 changes: 1 addition & 1 deletion gbif/coordinator/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>gbif</artifactId>
<version>3.0.2-SNAPSHOT</version>
<version>3.0.6-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion gbif/coordinator/tasks-integration-tests-hbase/pom.xml
@@ -4,7 +4,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>coordinator</artifactId>
<version>3.0.2-SNAPSHOT</version>
<version>3.0.6-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion gbif/coordinator/tasks-integration-tests/pom.xml
@@ -4,7 +4,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>coordinator</artifactId>
<version>3.0.2-SNAPSHOT</version>
<version>3.0.6-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion gbif/coordinator/tasks/pom.xml
@@ -4,7 +4,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>coordinator</artifactId>
<version>3.0.2-SNAPSHOT</version>
<version>3.0.6-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

@@ -2,6 +2,7 @@

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.github.resilience4j.core.IntervalFunction;
@@ -13,6 +14,7 @@
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import lombok.AccessLevel;
import lombok.Getter;
@@ -33,6 +35,7 @@ public class GbifApi {
"apiCall",
RetryConfig.custom()
.maxAttempts(7)
.retryExceptions(JsonParseException.class, IOException.class, TimeoutException.class)
.intervalFunction(IntervalFunction.ofExponentialBackoff(Duration.ofSeconds(6)))
.build());

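For context, the hunks in this and the following files converge on the same resilience4j pattern: a bounded number of attempts, retries limited to transient failures (JSON parse, I/O, timeout), and exponential backoff supplied through an interval function rather than a fixed waitDuration. Below is a minimal, self-contained sketch of that pattern, not the project's code; the wrapped Supplier and its return value are placeholders.

import com.fasterxml.jackson.core.JsonParseException;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class RetrySketch {
  public static void main(String[] args) {
    Retry retry =
        Retry.of(
            "apiCall",
            RetryConfig.custom()
                .maxAttempts(7)
                // Only transient failures are retried; anything else fails immediately.
                .retryExceptions(
                    JsonParseException.class, IOException.class, TimeoutException.class)
                // Exponential backoff starting at 6s replaces the old fixed waitDuration.
                .intervalFunction(IntervalFunction.ofExponentialBackoff(Duration.ofSeconds(6)))
                .build());

    // Placeholder for the real HTTP/registry call wrapped by the retry.
    Supplier<String> call = () -> "ok";
    System.out.println(Retry.decorateSupplier(retry, call).get());
  }
}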
@@ -1,12 +1,15 @@
package org.gbif.pipelines.common.process;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonNode;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import java.io.IOException;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.Builder;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -25,6 +28,7 @@ public class AirflowSparkLauncher {
"airflowApiCall",
RetryConfig.custom()
.maxAttempts(7)
.retryExceptions(JsonParseException.class, IOException.class, TimeoutException.class)
.intervalFunction(IntervalFunction.ofExponentialBackoff(Duration.ofSeconds(6)))
.build());

@@ -2,6 +2,7 @@

import static org.gbif.common.messaging.api.messages.OccurrenceDeletionReason.NOT_SEEN_IN_LAST_CRAWL;

import com.fasterxml.jackson.core.JsonParseException;
import com.google.common.annotations.VisibleForTesting;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
@@ -19,6 +20,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -73,6 +75,7 @@ public class PipelinesCallback<I extends PipelineBasedMessage, O extends Pipelin
"registryCall",
RetryConfig.custom()
.maxAttempts(7)
.retryExceptions(JsonParseException.class, IOException.class, TimeoutException.class)
.intervalFunction(IntervalFunction.ofExponentialBackoff(Duration.ofSeconds(6)))
.build());

@@ -81,6 +84,7 @@ public class PipelinesCallback<I extends PipelineBasedMessage, O extends Pipelin
"runningExecutionCall",
RetryConfig.custom()
.maxAttempts(7)
.retryExceptions(JsonParseException.class, IOException.class, TimeoutException.class)
.intervalFunction(IntervalFunction.ofExponentialBackoff(Duration.ofSeconds(6)))
.retryOnResult(Objects::isNull)
.build());
@@ -1,13 +1,16 @@
package org.gbif.pipelines.tasks;

import com.fasterxml.jackson.core.JsonParseException;
import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import java.io.IOException;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -29,6 +32,7 @@ public class Validations {
"validatorCall",
RetryConfig.custom()
.maxAttempts(7)
.retryExceptions(JsonParseException.class, IOException.class, TimeoutException.class)
.intervalFunction(IntervalFunction.ofExponentialBackoff(Duration.ofSeconds(6)))
.build());

@@ -15,6 +15,7 @@
import org.gbif.common.messaging.api.messages.PipelinesInterpretationMessage;
import org.gbif.common.messaging.api.messages.PipelinesInterpretedMessage;
import org.gbif.dwc.terms.DwcTerm;
import org.gbif.pipelines.common.PipelinesException;
import org.gbif.pipelines.common.PipelinesVariables.Metrics;
import org.gbif.pipelines.common.airflow.AppName;
import org.gbif.pipelines.common.process.AirflowSparkLauncher;
@@ -113,6 +114,7 @@ private void runDistributed(PipelinesInterpretationMessage message, BeamParamete
.metaFileName(metaFileName)
.metricName(Metrics.BASIC_RECORDS_COUNT + Metrics.ATTEMPTED)
.alternativeMetricName(Metrics.UNIQUE_GBIF_IDS_COUNT + Metrics.ATTEMPTED)
.skipIf(true)
.build()
.get();

@@ -124,10 +126,21 @@ private void runDistributed(PipelinesInterpretationMessage message, BeamParamete
.metaFileName(new DwcaToAvroConfiguration().metaFileName)
.metricName(Metrics.ARCHIVE_TO_OCC_COUNT)
.alternativeMetricName(Metrics.ARCHIVE_TO_ER_COUNT)
.skipIf(true)
.build()
.get();

if (interpretationRecordsNumber == 0 && dwcaRecordsNumber == 0) {
throw new PipelinesException(
"interpretationRecordsNumber and dwcaRecordsNumber are 0, check meta yaml files");
}

long recordsNumber = Math.min(dwcaRecordsNumber, interpretationRecordsNumber);
if (interpretationRecordsNumber == 0) {
recordsNumber = dwcaRecordsNumber;
} else if (dwcaRecordsNumber == 0) {
recordsNumber = interpretationRecordsNumber;
}

log.info("Calculate job's settings based on {} records", recordsNumber);
boolean useMemoryExtraCoef =
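The new block above picks a record count from two meta files: when both counts are present it takes the smaller one, when only one is present it uses that one, and when both are zero it fails fast. A hypothetical stand-alone version of that logic is sketched below for clarity; the class, method, and exception names are illustrative (the real code throws PipelinesException), not taken from the repository.

public class RecordsNumberSketch {

  static long resolveRecordsNumber(long interpretationRecordsNumber, long dwcaRecordsNumber) {
    if (interpretationRecordsNumber == 0 && dwcaRecordsNumber == 0) {
      // The real callback throws PipelinesException here.
      throw new IllegalStateException(
          "interpretationRecordsNumber and dwcaRecordsNumber are 0, check meta yaml files");
    }
    if (interpretationRecordsNumber == 0) {
      return dwcaRecordsNumber;
    }
    if (dwcaRecordsNumber == 0) {
      return interpretationRecordsNumber;
    }
    return Math.min(dwcaRecordsNumber, interpretationRecordsNumber);
  }

  public static void main(String[] args) {
    System.out.println(resolveRecordsNumber(120, 100)); // 100 - both present, take the minimum
    System.out.println(resolveRecordsNumber(0, 100));   // 100 - fall back to the dwca count
    System.out.println(resolveRecordsNumber(120, 0));   // 120 - fall back to the interpretation count
  }
}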
@@ -16,6 +16,7 @@
import org.gbif.common.messaging.api.MessagePublisher;
import org.gbif.common.messaging.api.messages.PipelinesIndexedMessage;
import org.gbif.common.messaging.api.messages.PipelinesInterpretedMessage;
import org.gbif.pipelines.common.PipelinesException;
import org.gbif.pipelines.common.PipelinesVariables.Metrics;
import org.gbif.pipelines.common.airflow.AppName;
import org.gbif.pipelines.common.indexing.IndexSettings;
@@ -130,6 +131,7 @@ public Runnable createRunnable(PipelinesInterpretedMessage message) {
.metaFileName(new InterpreterConfiguration().metaFileName)
.metricName(Metrics.BASIC_RECORDS_COUNT + Metrics.ATTEMPTED)
.alternativeMetricName(Metrics.UNIQUE_GBIF_IDS_COUNT + Metrics.ATTEMPTED)
.skipIf(true)
.build()
.get();

@@ -141,10 +143,21 @@ public Runnable createRunnable(PipelinesInterpretedMessage message) {
.metaFileName(new DwcaToAvroConfiguration().metaFileName)
.metricName(Metrics.ARCHIVE_TO_OCC_COUNT)
.alternativeMetricName(Metrics.ARCHIVE_TO_ER_COUNT)
.skipIf(true)
.build()
.get();

if (interpretationRecordsNumber == 0 && dwcaRecordsNumber == 0) {
throw new PipelinesException(
"interpretationRecordsNumber and dwcaRecordsNumber are 0, check meta yaml files");
}

long recordsNumber = Math.min(dwcaRecordsNumber, interpretationRecordsNumber);
if (interpretationRecordsNumber == 0) {
recordsNumber = dwcaRecordsNumber;
} else if (dwcaRecordsNumber == 0) {
recordsNumber = interpretationRecordsNumber;
}

IndexSettings indexSettings =
IndexSettings.create(
2 changes: 1 addition & 1 deletion gbif/identifiers/diagnostics/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>identifiers</artifactId>
<version>3.0.2-SNAPSHOT</version>
<version>3.0.6-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion gbif/identifiers/identifiers-integration-tests/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>identifiers</artifactId>
<version>3.0.2-SNAPSHOT</version>
<version>3.0.6-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion gbif/identifiers/keygen/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>identifiers</artifactId>
<version>3.0.2-SNAPSHOT</version>
<version>3.0.6-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion gbif/identifiers/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>gbif</artifactId>
<version>3.0.2-SNAPSHOT</version>
<version>3.0.6-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

2 changes: 1 addition & 1 deletion gbif/ingestion/ingest-gbif-beam/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>org.gbif.pipelines</groupId>
<artifactId>ingestion</artifactId>
<version>3.0.2-SNAPSHOT</version>
<version>3.0.6-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

@@ -167,55 +167,51 @@ public static void run(

uniqueRawRecords
.apply("Interpret event identifiers", identifierTransform.interpret())
.apply(
"Write event identifiers to avro", identifierTransform.write(pathFn).withoutSharding());
.apply("Write event identifiers to avro", identifierTransform.write(pathFn));

uniqueRawRecords
.apply("Check event core transform", eventCoreTransform.check(types))
.apply("Interpret event core", eventCoreTransform.interpret())
.apply("Write event core to avro", eventCoreTransform.write(pathFn).withoutSharding());
.apply("Write event core to avro", eventCoreTransform.write(pathFn));

uniqueRawRecords
.apply("Check event temporal transform", temporalTransform.check(types))
.apply("Interpret event temporal", temporalTransform.interpret())
.apply("Write event temporal to avro", temporalTransform.write(pathFn).withoutSharding());
.apply("Write event temporal to avro", temporalTransform.write(pathFn));

uniqueRawRecords
.apply("Check event taxonomy transform", taxonomyTransform.check(types))
.apply("Interpret event taxonomy", taxonomyTransform.interpret())
.apply("Write event taxon to avro", taxonomyTransform.write(pathFn).withoutSharding());
.apply("Write event taxon to avro", taxonomyTransform.write(pathFn));

uniqueRawRecords
.apply("Check event multimedia transform", multimediaTransform.check(types))
.apply("Interpret event multimedia", multimediaTransform.interpret())
.apply(
"Write event multimedia to avro", multimediaTransform.write(pathFn).withoutSharding());
.apply("Write event multimedia to avro", multimediaTransform.write(pathFn));

uniqueRawRecords
.apply("Check event audubon transform", audubonTransform.check(types))
.apply("Interpret event audubon", audubonTransform.interpret())
.apply("Write event audubon to avro", audubonTransform.write(pathFn).withoutSharding());
.apply("Write event audubon to avro", audubonTransform.write(pathFn));

uniqueRawRecords
.apply("Check event image transform", imageTransform.check(types))
.apply("Interpret event image", imageTransform.interpret())
.apply("Write event image to avro", imageTransform.write(pathFn).withoutSharding());
.apply("Write event image to avro", imageTransform.write(pathFn));

uniqueRawRecords
.apply("Check location transform", locationTransform.check(types))
.apply("Interpret event location", locationTransform.interpret(metadataView))
.apply("Write event location to avro", locationTransform.write(pathFn).withoutSharding());
.apply("Write event location to avro", locationTransform.write(pathFn));

uniqueRawRecords
.apply("Check event measurementOrFact", measurementOrFactTransform.check(types))
.apply("Interpret event measurementOrFact", measurementOrFactTransform.interpret())
.apply(
"Write event measurementOrFact to avro",
measurementOrFactTransform.write(pathFn).withoutSharding());
.apply("Write event measurementOrFact to avro", measurementOrFactTransform.write(pathFn));

uniqueRawRecords
.apply("Check event verbatim transform", verbatimTransform.check(types))
.apply("Write event verbatim to avro", verbatimTransform.write(pathFn).withoutSharding());
.apply("Write event verbatim to avro", verbatimTransform.write(pathFn));

log.info("Running the pipeline");
PipelineResult result = p.run();
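These hunks implement the "Remove withoutSharding to improve writing speed" change: dropping .withoutSharding() lets Beam write one shard per bundle in parallel instead of funnelling everything into a single output file. The sketch below illustrates the general Beam behaviour with TextIO and placeholder paths; it is not the project's Avro transforms.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class ShardingSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply("Create sample records", Create.of("r1", "r2", "r3"))
        // Sharded write (what the pipelines now do): workers emit
        // out-00000-of-0000N files in parallel.
        .apply("Write sharded", TextIO.write().to("/tmp/sharded/out"));

    p.apply("Create more sample records", Create.of("r1", "r2", "r3"))
        // Single-file write (what was removed): .withoutSharding() forces
        // exactly one output file and serializes the write.
        .apply("Write single file", TextIO.write().to("/tmp/single/out").withoutSharding());

    p.run().waitUntilFinish();
  }
}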
@@ -104,18 +104,17 @@ public static void run(

idsTuple
.get(uniqueIdTransform.getTag())
.apply("Write GBIF ids to avro", idTransform.write(pathFn).withoutSharding());
.apply("Write GBIF ids to avro", idTransform.write(pathFn));

idsTuple
.get(uniqueIdTransform.getInvalidTag())
.apply(
"Write invalid GBIF IDs to avro", idTransform.writeInvalid(pathFn).withoutSharding());
.apply("Write invalid GBIF IDs to avro", idTransform.writeInvalid(pathFn));

idCollection
.get(tupleTransform.getAbsentTag())
.apply(
"Write absent GBIF ids to avro",
idTransform.write(pathFn.apply(idTransform.getAbsentName())).withoutSharding());
idTransform.write(pathFn.apply(idTransform.getAbsentName())));

log.info("Running the pipeline");
PipelineResult result = p.run();