Skip to content

Commit

Permalink
feat(apps): add app config in run record
Browse files Browse the repository at this point in the history
- changed returned schema type of "status" to "AppRunRecord"!
- added a "config" field in the app record
- added a "pending" stage so that "external" app status can be mapped ti internal app status
  • Loading branch information
sushi30 committed Jan 27, 2025
1 parent fa6d727 commit 35f9ad9
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public ResultList<App> list(
mediaType = "application/json",
schema = @Schema(implementation = AppRunList.class)))
})
public Response listAppRuns(
public ResultList<AppRunRecord> listAppRuns(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Name of the App", schema = @Schema(type = "string"))
Expand Down Expand Up @@ -281,9 +281,7 @@ public Response listAppRuns(
Long endTs) {
App installation = repository.getByName(uriInfo, name, repository.getFields("id,pipelines"));
if (installation.getAppType().equals(AppType.Internal)) {
return Response.status(Response.Status.OK)
.entity(repository.listAppRuns(installation, limitParam, offset))
.build();
return repository.listAppRuns(installation, limitParam, offset);
}
if (!installation.getPipelines().isEmpty()) {
EntityReference pipelineRef = installation.getPipelines().get(0);
Expand All @@ -292,13 +290,27 @@ public Response listAppRuns(
IngestionPipeline ingestionPipeline =
ingestionPipelineRepository.get(
uriInfo, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNERS));
return Response.ok(
ingestionPipelineRepository.listPipelineStatus(
ingestionPipeline.getFullyQualifiedName(), startTs, endTs),
MediaType.APPLICATION_JSON_TYPE)
.build();
return ingestionPipelineRepository
.listPipelineStatus(ingestionPipeline.getFullyQualifiedName(), startTs, endTs)
.map(pipelineStatus -> convertPipelineStatus(installation, pipelineStatus));
}
throw new IllegalArgumentException("App does not have an associated pipeline.");
throw new IllegalArgumentException("App does not have a scheduled deployment");
}

private static AppRunRecord convertPipelineStatus(App app, PipelineStatus pipelineStatus) {
return new AppRunRecord()
.withAppId(app.getId())
.withAppName(app.getName())
.withExecutionTime(pipelineStatus.getStartDate())
.withEndTime(pipelineStatus.getEndDate())
.withStatus(
switch (pipelineStatus.getPipelineState()) {
case QUEUED -> AppRunRecord.Status.PENDING;
case SUCCESS -> AppRunRecord.Status.SUCCESS;
case FAILED, PARTIAL_SUCCESS -> AppRunRecord.Status.FAILED;
case RUNNING -> AppRunRecord.Status.RUNNING;
})
.withConfig(pipelineStatus.getConfig());
}

@GET
Expand Down Expand Up @@ -617,7 +629,7 @@ public Response create(
limits.enforceLimits(
securityContext,
getResourceContext(),
new OperationContext(Entity.APPLICATION, MetadataOperation.CREATE));
new OperationContext(APPLICATION, MetadataOperation.CREATE));
if (SCHEDULED_TYPES.contains(app.getScheduleType())) {
ApplicationHandler.getInstance()
.installApplication(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.validation.constraints.NotNull;
import org.openmetadata.schema.system.EntityError;
import org.openmetadata.schema.type.Paging;
Expand Down Expand Up @@ -95,6 +97,11 @@ public ResultList(List<T> data, Integer offset, int total) {
paging = new Paging().withBefore(null).withAfter(null).withTotal(total).withOffset(offset);
}

/* Conveniently map the data to another type without the need to create a new ResultList */
public <S> ResultList<S> map(Function<T, S> mapper) {
return new ResultList<>(data.stream().map(mapper).collect(Collectors.toList()), paging);
}

public ResultList(List<T> data, Integer offset, Integer limit, Integer total) {
this.data = data;
paging =
Expand All @@ -106,6 +113,17 @@ public ResultList(List<T> data, Integer offset, Integer limit, Integer total) {
.withLimit(limit);
}

public ResultList(List<T> data, Paging other) {
this.data = data;
paging =
new Paging()
.withBefore(null)
.withAfter(null)
.withTotal(other.getTotal())
.withOffset(other.getOffset())
.withLimit(other.getLimit());
}

public ResultList(
List<T> data, List<EntityError> errors, String beforeCursor, String afterCursor, int total) {
this.data = data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
"active",
"activeError",
"stopped",
"success"
"success",
"pending"
]
},
"runType": {
Expand Down Expand Up @@ -63,6 +64,10 @@
},
"scheduleInfo": {
"$ref": "./app.json#/definitions/appSchedule"
},
"config": {
"description": "The configuration used for this application run. It's type will be based on the application type.",
"$ref": "../../type/basic.json#/definitions/map"
}
},
"additionalProperties": false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
"status": {
"description": "Ingestion Pipeline summary status. Informed at the end of the execution.",
"$ref": "status.json#/definitions/ingestionStatus"
},
"config": {
"description": "Pipeline configuration for this particular execution.",
"$ref": "../../../type/basic.json#/definitions/map"
}
},
"additionalProperties": false
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 Collate.
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -10,9 +10,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/


/**
/**
* App Run Record.
*/
export interface AppRunRecord {
Expand All @@ -24,6 +22,11 @@ export interface AppRunRecord {
* Name of the application.
*/
appName?: string;
/**
* The configuration used for this application run. It's type will be based on the
* application type.
*/
config?: { [key: string]: any };
/**
* End time of the job status.
*/
Expand Down Expand Up @@ -91,6 +94,7 @@ export enum Status {
ActiveError = "activeError",
Completed = "completed",
Failed = "failed",
Pending = "pending",
Running = "running",
Started = "started",
Stopped = "stopped",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2024 Collate.
* Copyright 2025 Collate.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -10,9 +10,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/


/**
/**
* Ingestion Pipeline Config is used to set up a DAG and deploy. This entity is used to
* setup metadata/quality pipelines on Apache Airflow.
*/
Expand Down Expand Up @@ -386,7 +384,7 @@ export interface OpenMetadataConnection {
/**
* SSL Configuration for OpenMetadata Server
*/
sslConfig?: SchemaRegistrySSLClass;
sslConfig?: ConsumerConfigSSLClass;
/**
* If set to true, when creating a service during the ingestion we will store its Service
* Connection. Otherwise, the ingestion will create a bare service without connection
Expand Down Expand Up @@ -490,12 +488,15 @@ export interface OpenMetadataJWTClientConfig {
*
* SSL Configuration details.
*
* Consumer Config SSL Config. Configuration for enabling SSL for the Consumer Config
* connection.
*
* Schema Registry SSL Config. Configuration for enabling SSL for the Schema Registry
* connection.
*
* OpenMetadata Client configured to validate SSL certificates.
*/
export interface SchemaRegistrySSLClass {
export interface ConsumerConfigSSLClass {
/**
* The CA certificate used for SSL validation.
*/
Expand Down Expand Up @@ -536,6 +537,10 @@ export enum VerifySSL {
* This defines runtime status of Pipeline.
*/
export interface PipelineStatus {
/**
* Pipeline configuration for this particular execution.
*/
config?: { [key: string]: any };
/**
* endDate of the pipeline run for this particular execution.
*/
Expand Down Expand Up @@ -2802,10 +2807,11 @@ export interface ConfigClass {
*
* Http/Https connection scheme
*/
scheme?: string;
supportsDatabase?: boolean;
supportsDataDiff?: boolean;
supportsDBTExtraction?: boolean;
scheme?: string;
supportsDatabase?: boolean;
supportsDataDiff?: boolean;
supportsDBTExtraction?: boolean;
supportsIncrementalMetadataExtraction?: boolean;
/**
* Supports Lineage Extraction.
*/
Expand Down Expand Up @@ -3172,6 +3178,11 @@ export interface ConfigClass {
* Confluent Redpanda Consumer Config
*/
consumerConfig?: { [key: string]: any };
/**
* Consumer Config SSL Config. Configuration for enabling SSL for the Consumer Config
* connection.
*/
consumerConfigSSL?: ConsumerConfigSSLClass;
/**
* sasl.mechanism Consumer Config property
*/
Expand All @@ -3195,7 +3206,7 @@ export interface ConfigClass {
* Schema Registry SSL Config. Configuration for enabling SSL for the Schema Registry
* connection.
*/
schemaRegistrySSL?: SchemaRegistrySSLClass;
schemaRegistrySSL?: ConsumerConfigSSLClass;
/**
* Schema Registry Topic Suffix Name. The suffix to be appended to the topic name to get
* topic schema from registry.
Expand Down Expand Up @@ -3992,7 +4003,7 @@ export interface SSLCertificatesByPath {
* Qlik Authentication Certificate File Path
*/
export interface QlikCertificatesBy {
sslConfig?: SchemaRegistrySSLClass;
sslConfig?: ConsumerConfigSSLClass;
/**
* Client Certificate
*/
Expand Down Expand Up @@ -4395,6 +4406,9 @@ export enum ConnectionScheme {
*
* SSL Configuration details.
*
* Consumer Config SSL Config. Configuration for enabling SSL for the Consumer Config
* connection.
*
* Schema Registry SSL Config. Configuration for enabling SSL for the Schema Registry
* connection.
*/
Expand Down Expand Up @@ -4611,7 +4625,7 @@ export interface HiveMetastoreConnectionDetails {
/**
* SSL Configuration details.
*/
sslConfig?: SchemaRegistrySSLClass;
sslConfig?: ConsumerConfigSSLClass;
sslMode?: SSLMode;
supportsDatabase?: boolean;
supportsDataDiff?: boolean;
Expand Down Expand Up @@ -4837,6 +4851,9 @@ export enum KafkaSecurityProtocol {
*
* SSL Configuration details.
*
* Consumer Config SSL Config. Configuration for enabling SSL for the Consumer Config
* connection.
*
* Schema Registry SSL Config. Configuration for enabling SSL for the Schema Registry
* connection.
*
Expand Down

0 comments on commit 35f9ad9

Please sign in to comment.