Skip to content
This repository has been archived by the owner on Nov 12, 2024. It is now read-only.

Commit

Permalink
feat: support for tier and tag override configurations (#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
laxmanchekka authored Aug 25, 2021
1 parent d7308a0 commit 46edd83
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ public class PinotTableSpec {
private String brokerTenant;
private String serverTenant;

// Tag override configs
@Optional private Config tagOverrideConfigs;

// Tier configs
@Optional private List<Config> tierConfigs;

// Task configs
@Optional private Config taskConfigs;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,16 @@
import static org.hypertrace.core.viewcreator.pinot.PinotViewCreatorConfig.PINOT_OFFLINE_CONFIGS_KEY;
import static org.hypertrace.core.viewcreator.pinot.PinotViewCreatorConfig.PINOT_REALTIME_CONFIGS_KEY;
import static org.hypertrace.core.viewcreator.pinot.PinotViewCreatorConfig.PINOT_REST_URI_TABLES;
import static org.hypertrace.core.viewcreator.pinot.PinotViewCreatorConfig.PINOT_RT_COMPLETED_TAG_KEY;
import static org.hypertrace.core.viewcreator.pinot.PinotViewCreatorConfig.PINOT_RT_CONSUMING_TAG_KEY;
import static org.hypertrace.core.viewcreator.pinot.PinotViewCreatorConfig.PINOT_SCHEMA_MAP_KEYS_SUFFIX;
import static org.hypertrace.core.viewcreator.pinot.PinotViewCreatorConfig.PINOT_SCHEMA_MAP_VALUES_SUFFIX;
import static org.hypertrace.core.viewcreator.pinot.PinotViewCreatorConfig.PINOT_STREAM_CONFIGS_KEY;
import static org.hypertrace.core.viewcreator.pinot.PinotViewCreatorConfig.PINOT_TIER_NAME;
import static org.hypertrace.core.viewcreator.pinot.PinotViewCreatorConfig.PINOT_TIER_SEGMENT_AGE;
import static org.hypertrace.core.viewcreator.pinot.PinotViewCreatorConfig.PINOT_TIER_SEGMENT_SELECTOR_TYPE;
import static org.hypertrace.core.viewcreator.pinot.PinotViewCreatorConfig.PINOT_TIER_SERVER_TAG;
import static org.hypertrace.core.viewcreator.pinot.PinotViewCreatorConfig.PINOT_TIER_STORAGE_TYPE;
import static org.hypertrace.core.viewcreator.pinot.PinotViewCreatorConfig.PINOT_TRANSFORM_COLUMN_FUNCTION;
import static org.hypertrace.core.viewcreator.pinot.PinotViewCreatorConfig.PINOT_TRANSFORM_COLUMN_NAME;
import static org.hypertrace.core.viewcreator.pinot.PinotViewCreatorConfig.SIMPLE_AVRO_MESSAGE_DECODER;
Expand Down Expand Up @@ -47,6 +54,8 @@
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TagOverrideConfig;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
Expand Down Expand Up @@ -320,6 +329,9 @@ public static TableConfig buildPinotTableConfig(
// Tenant configs
.setBrokerTenant(pinotTableSpec.getBrokerTenant())
.setServerTenant(pinotTableSpec.getServerTenant())
.setTagOverrideConfig(toTagOverrideConfig(pinotTableSpec.getTagOverrideConfigs()))
// Tier configs
.setTierConfigList(getTableTierConfigs(pinotTableSpec))
// Task configurations
.setTaskConfig(toTableTaskConfig(pinotTableSpec.getTaskConfigs()))
// ingestion configurations
Expand Down Expand Up @@ -354,6 +366,37 @@ private static IngestionConfig getTableIngestionConfig(@Nullable PinotTableSpec
return new IngestionConfig(null, null, null, tableTransformConfigs);
}

private static TagOverrideConfig toTagOverrideConfig(Config tenantTagOverrideConfig) {
if (tenantTagOverrideConfig == null) {
return null;
}

String realtimeConsuming =
getOptionalString(tenantTagOverrideConfig, PINOT_RT_CONSUMING_TAG_KEY, null);
String realtimeCompleted =
getOptionalString(tenantTagOverrideConfig, PINOT_RT_COMPLETED_TAG_KEY, null);

return new TagOverrideConfig(realtimeConsuming, realtimeCompleted);
}

private static List<TierConfig> getTableTierConfigs(@Nullable PinotTableSpec tableSpec) {
List<Config> tierConfigs = tableSpec.getTierConfigs();
List<TierConfig> tableTierConfigs = null;
if (tierConfigs != null) {
tableTierConfigs = new ArrayList<>();
for (Config tierConfig : tierConfigs) {
tableTierConfigs.add(
new TierConfig(
tierConfig.getString(PINOT_TIER_NAME),
tierConfig.getString(PINOT_TIER_SEGMENT_SELECTOR_TYPE),
getOptionalString(tierConfig, PINOT_TIER_SEGMENT_AGE, null),
tierConfig.getString(PINOT_TIER_STORAGE_TYPE),
getOptionalString(tierConfig, PINOT_TIER_SERVER_TAG, null)));
}
}
return tableTierConfigs;
}

private static TableTaskConfig toTableTaskConfig(@Nullable Config allTasksConfigs) {
if (allTasksConfigs == null) {
return null;
Expand All @@ -380,6 +423,7 @@ public static boolean sendPinotTableCreationRequest(
tableConfig.toJsonString(),
tableConfig.getTableName());
}

/** Utility for sending Pinot Table Creation/Update request. */
public static boolean sendPinotTableCreationRequest(
String controllerHost,
Expand Down Expand Up @@ -442,6 +486,7 @@ public static int sendRequest(String requestMethod, String urlString, String pay
}
return HTTP_OK;
}

/** Utility for reading from an InputStream and converting it to String */
private static String readInputStream(InputStream inputStream) {
final StringBuilder sb = new StringBuilder();
Expand Down Expand Up @@ -469,6 +514,7 @@ private static String getControllerAddressForTableCreate(
String controllerHost, String controllerPort) {
return String.format("http://%s:%s/%s", controllerHost, controllerPort, PINOT_REST_URI_TABLES);
}

/**
* Utility for preparing the api-endpoint for Pinot Table Update
*
Expand All @@ -483,11 +529,17 @@ private static String getControllerAddressForTableUpdate(
"http://%s:%s/%s/%s", controllerHost, controllerPort, PINOT_REST_URI_TABLES, tableName);
}

private static String getOptionalString(Config config, String key, String defaultValue) {
if (config.hasPath(key)) {
return config.getString(key);
}
return defaultValue;
}

private static Config getOptionalConfig(Config config, String key) {
if (config.hasPath(key)) {
return config.getConfig(key);
} else {
return ConfigFactory.empty();
}
return ConfigFactory.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,21 @@ public class PinotViewCreatorConfig {
public static final String PINOT_TRANSFORM_COLUMN_NAME = "columnName";
public static final String PINOT_TRANSFORM_COLUMN_FUNCTION = "transformFunction";

/////////////
// Pinot tier configurations
/////////////
public static final String PINOT_TIER_NAME = "name";
public static final String PINOT_TIER_SEGMENT_SELECTOR_TYPE = "segmentSelectorType";
public static final String PINOT_TIER_SEGMENT_AGE = "segmentAge";
public static final String PINOT_TIER_STORAGE_TYPE = "storageType";
public static final String PINOT_TIER_SERVER_TAG = "serverTag";

/////////////
// Pinot REALTIME table configurations
/////////////
public static final String PINOT_REALTIME_CONFIGS_KEY = "pinotRealtime";
public static final String PINOT_RT_CONSUMING_TAG_KEY = "realtimeConsuming";
public static final String PINOT_RT_COMPLETED_TAG_KEY = "realtimeCompleted";
public static final String PINOT_STREAM_CONFIGS_KEY = "streamConfigs";
public static final String STREAM_KAFKA_DECODER_CLASS_NAME_KEY =
"stream.kafka.decoder.class.name";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TenantConfig;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
Expand Down Expand Up @@ -128,8 +130,20 @@ public void testBuildRealtimeTableConfig() {
"largest", actualStreamConfigs.get("stream.kafka.consumer.prop.auto.offset.reset"));

// Verify tenant configs
assertEquals("defaultBroker", tableConfig.getTenantConfig().getBroker());
assertEquals("defaultServer", tableConfig.getTenantConfig().getServer());
TenantConfig tenantConfig = tableConfig.getTenantConfig();
assertEquals("defaultBroker", tenantConfig.getBroker());
assertEquals("defaultServer", tenantConfig.getServer());
assertEquals("tier-for-consuming", tenantConfig.getTagOverrideConfig().getRealtimeConsuming());
assertEquals("tier-for-completed", tenantConfig.getTagOverrideConfig().getRealtimeCompleted());

// Verify tier configs
assertEquals(1, tableConfig.getTierConfigsList().size());
TierConfig tierConfig = tableConfig.getTierConfigsList().get(0);
assertEquals("hot-data-tier", tierConfig.getName());
assertEquals("time", tierConfig.getSegmentSelectorType());
assertEquals("5d", tierConfig.getSegmentAge());
assertEquals("pinot_server", tierConfig.getStorageType());
assertEquals("tier-for-hot-data", tierConfig.getServerTag());

// Verify indexing related configs
assertTrue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,23 @@ pinotRealtime = {
}
}

transformConfigs= [{
transformConfigs = [{
columnName = "bucket_start_time_millis"
transformFunction = "round(start_time_millis, 3600000)"
}]

tagOverrideConfigs = {
realtimeConsuming = "tier-for-consuming"
realtimeCompleted = "tier-for-completed"
}

tierConfigs = [{
name = "hot-data-tier"
segmentSelectorType = "time"
segmentAge = "5d"
storageType = "pinot_server"
serverTag = "tier-for-hot-data"
}]
}

pinotOffline = {
Expand Down

0 comments on commit 46edd83

Please sign in to comment.