diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java index 0ea533ad..d243ccd4 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/client/TablesClient.java @@ -2,6 +2,7 @@ import com.linkedin.openhouse.datalayout.persistence.StrategiesDaoTableProps; import com.linkedin.openhouse.datalayout.strategy.DataLayoutStrategy; +import com.linkedin.openhouse.jobs.util.AppConstants; import com.linkedin.openhouse.jobs.util.DatabaseTableFilter; import com.linkedin.openhouse.jobs.util.DirectoryMetadata; import com.linkedin.openhouse.jobs.util.ReplicationConfig; @@ -113,6 +114,11 @@ private Optional> getTableReplication(GetTableResponseBo .cluster(rc.getDestination()) .tableOwner(response.getTableCreator()) .schedule(rc.getCronSchedule()) + .enableSetup( + Boolean.parseBoolean( + response + .getTableProperties() + .getOrDefault(AppConstants.REPLICATION_SETUP_KEY, null))) .build())); // since replicationConfigList is initialized, it cannot be null. 
return Optional.of(replicationConfigList); diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java index 8185d660..e4f803ab 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/AppConstants.java @@ -47,6 +47,7 @@ public final class AppConstants { public static final String JOB_ID = "job_id"; public static final String QUEUED_TIME = "queued_time"; public static final String DATABASE_NAME = "database_name"; + public static final String REPLICATION_SETUP_KEY = "replication.enableSetup"; private AppConstants() {} } diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/ReplicationConfig.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/ReplicationConfig.java index 15c6094a..30cd6eb2 100644 --- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/ReplicationConfig.java +++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/ReplicationConfig.java @@ -14,4 +14,5 @@ public class ReplicationConfig { private final String schedule; private final String tableOwner; private final String cluster; + private final boolean enableSetup; } diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java index 223faf1c..24967984 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/InternalRepositoryUtils.java @@ -26,6 +26,7 @@ public final class InternalRepositoryUtils { protected static final String POLICIES_KEY = "policies"; + protected static final String REPLICATION_SETUP_KEY = "replication.enableSetup"; private static final 
Set EXCLUDE_PROPERTIES_LIST = new HashSet<>(Collections.singletonList(POLICIES_KEY)); diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java index ef04d9e2..06e65e1b 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/repository/impl/OpenHouseInternalRepositoryImpl.java @@ -18,6 +18,7 @@ import com.linkedin.openhouse.common.schema.IcebergSchemaHelper; import com.linkedin.openhouse.internal.catalog.SnapshotsUtil; import com.linkedin.openhouse.internal.catalog.fileio.FileIOManager; +import com.linkedin.openhouse.tables.api.spec.v0.request.components.Policies; import com.linkedin.openhouse.tables.common.TableType; import com.linkedin.openhouse.tables.dto.mapper.TablesMapper; import com.linkedin.openhouse.tables.dto.mapper.iceberg.PartitionSpecMapper; @@ -31,6 +32,7 @@ import io.micrometer.core.instrument.MeterRegistry; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; @@ -136,10 +138,16 @@ public TableDto save(TableDto tableDto) { doUpdateSchemaIfNeeded(transaction, writeSchema, table.schema(), tableDto); UpdateProperties updateProperties = transaction.updateProperties(); + // check if replicationPolicy has any change. 
update property replication.enableSetup + // accordingly + if (checkIfReplicationPolicyUpdated(table.properties(), tableDto.getPolicies())) { + updateProperties.set(REPLICATION_SETUP_KEY, Boolean.TRUE.toString()); + } boolean propsUpdated = doUpdateUserPropsIfNeeded(updateProperties, tableDto, table); boolean snapshotsUpdated = doUpdateSnapshotsIfNeeded(updateProperties, tableDto); boolean policiesUpdated = doUpdatePoliciesIfNeeded(updateProperties, tableDto, table.properties()); + // TODO remove tableTypeAdded after all existing tables have been back-filled to have a // tableType boolean tableTypeAdded = checkIfTableTypeAdded(updateProperties, table.properties()); @@ -160,6 +168,25 @@ public TableDto save(TableDto tableDto) { table, fileIOManager, partitionSpecMapper, policiesMapper, tableTypeMapper); } + private boolean checkIfReplicationPolicyUpdated( + Map existingTableProps, Policies policyFromTableDTO) { + String existingPolicies = existingTableProps.getOrDefault(POLICIES_KEY, ""); + // If both are empty or null, no update + if (existingPolicies.isEmpty() && policyFromTableDTO == null) { + return false; + } + // If existing policies exist and policyFromTableDTO is not null, compare replication + if (!existingPolicies.isEmpty() && policyFromTableDTO != null) { + Policies existingPoliciesObj = + new GsonBuilder().create().fromJson(existingPolicies, Policies.class); + return !Objects.equals( + existingPoliciesObj.getReplication(), policyFromTableDTO.getReplication()); + } + // If existing policies are empty but policyFromTableDTO is not null, update needed if + // replication is set + return existingPolicies.isEmpty() && policyFromTableDTO.getReplication() != null; + } + private boolean skipEligibilityCheck( Map existingTableProps, Map newTableprops) { TableType existingTableType = @@ -298,6 +325,9 @@ private Map computePropsForTableCreation(TableDto tableDto) { // Populate policies String policiesString = policiesMapper.toPoliciesJsonString(tableDto); 
propertiesMap.put(InternalRepositoryUtils.POLICIES_KEY, policiesString); + if (tableDto.getPolicies() != null && tableDto.getPolicies().getReplication() != null) { + propertiesMap.put(REPLICATION_SETUP_KEY, Boolean.TRUE.toString()); + } if (!CollectionUtils.isEmpty(tableDto.getJsonSnapshots())) { meterRegistry.counter(MetricsConstant.REPO_TABLE_CREATED_WITH_DATA_CTR).increment(); diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java index 71ffcb8e..862a2737 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/RepositoryTest.java @@ -13,6 +13,7 @@ import com.linkedin.openhouse.internal.catalog.model.HouseTablePrimaryKey; import com.linkedin.openhouse.internal.catalog.repository.HouseTableRepository; import com.linkedin.openhouse.tables.api.spec.v0.request.components.ClusteringColumn; +import com.linkedin.openhouse.tables.api.spec.v0.request.components.ReplicationConfig; import com.linkedin.openhouse.tables.api.spec.v0.request.components.TimePartitionSpec; import com.linkedin.openhouse.tables.common.TableType; import com.linkedin.openhouse.tables.model.TableDto; @@ -24,6 +25,7 @@ import com.linkedin.openhouse.tables.repository.impl.InternalRepositoryUtils; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -442,6 +444,100 @@ public void testCreateTableWithTableTypeProperty() { } } + @Test + public void testCreateTableWithReplicationProperty() { + String replicationKey = "replication.enableSetup"; + TableDto tableDTO = + TABLE_DTO + .toBuilder() + .policies(TABLE_POLICIES.toBuilder().replication(null).build()) + .tableVersion(INITIAL_TABLE_VERSION) + .build(); + + TableDto createdDTO = 
openHouseInternalRepository.save(tableDTO); + Assertions.assertFalse(createdDTO.getTableProperties().containsKey(replicationKey)); + + TableDtoPrimaryKey primaryKey = + TableDtoPrimaryKey.builder() + .tableId(TABLE_DTO.getTableId()) + .databaseId(TABLE_DTO.getDatabaseId()) + .build(); + openHouseInternalRepository.deleteById(primaryKey); + + // create table with some replication config and assert that tblProperty has key + TableDto tableDTOWithReplicationPolicy = + TABLE_DTO.toBuilder().policies(TABLE_POLICIES).tableVersion(INITIAL_TABLE_VERSION).build(); + + TableDto createdDTOWithReplicationPolicy = + openHouseInternalRepository.save(tableDTOWithReplicationPolicy); + Assertions.assertTrue( + createdDTOWithReplicationPolicy.getTableProperties().containsKey(replicationKey)); + Assertions.assertTrue( + Boolean.parseBoolean( + createdDTOWithReplicationPolicy.getTableProperties().get(replicationKey))); + + Map modifiedProperties = + new HashMap<>(createdDTOWithReplicationPolicy.getTableProperties()); + modifiedProperties.put(replicationKey, "false"); + + // update tblProperty, setting to false + TableDto tableDTOWithTblProperties = + createdDTOWithReplicationPolicy + .toBuilder() + .tableType(TableType.PRIMARY_TABLE) + .tableVersion(createdDTOWithReplicationPolicy.getTableLocation()) + .tableProperties(modifiedProperties) + .build(); + + TableDto createdDTOWithTblProps = openHouseInternalRepository.save(tableDTOWithTblProperties); + Assertions.assertTrue(createdDTOWithTblProps.getTableProperties().containsKey(replicationKey)); + Assertions.assertFalse( + Boolean.parseBoolean(createdDTOWithTblProps.getTableProperties().get(replicationKey))); + + // Update retention policy (replication unchanged) and assert that tblProperty value is still + // set to false + TableDto tableDTOWithUpdatedRetentionPolicy = + createdDTOWithTblProps + .toBuilder() + .tableVersion(createdDTOWithTblProps.getTableLocation()) + 
.policies(TABLE_POLICIES.toBuilder().retention(RETENTION_POLICY).build()) + .build(); + + TableDto createdDTOWithUpdatedRetentionPolicy = + openHouseInternalRepository.save(tableDTOWithUpdatedRetentionPolicy); + Assertions.assertTrue( + createdDTOWithUpdatedRetentionPolicy.getTableProperties().containsKey(replicationKey)); + Assertions.assertFalse( + Boolean.parseBoolean( + createdDTOWithUpdatedRetentionPolicy.getTableProperties().get(replicationKey))); + + // Update replication policy to different values and assert that tblProperty values is set back + // to true + ArrayList configs = new ArrayList<>(); + configs.add(ReplicationConfig.builder().destination("CLUSTER1").interval("15H").build()); + TableDto tableDTOWithUpdatedReplicationPolicy = + createdDTOWithUpdatedRetentionPolicy + .toBuilder() + .tableVersion(createdDTOWithUpdatedRetentionPolicy.getTableLocation()) + .policies( + TABLE_POLICIES + .toBuilder() + .replication(REPLICATION_POLICY.toBuilder().config(configs).build()) + .build()) + .build(); + + TableDto createdDTOWithUpdatedReplicationPolicy = + openHouseInternalRepository.save(tableDTOWithUpdatedReplicationPolicy); + Assertions.assertTrue( + createdDTOWithUpdatedReplicationPolicy.getTableProperties().containsKey(replicationKey)); + Assertions.assertTrue( + Boolean.parseBoolean( + createdDTOWithUpdatedReplicationPolicy.getTableProperties().get(replicationKey))); + + openHouseInternalRepository.deleteById(primaryKey); + Assertions.assertFalse(openHouseInternalRepository.existsById(primaryKey)); + } + @Test void testSchemaEvolutionBasic() { Schema oldSchema =