CCMSG-1133: Make hive table name configurable (#574)

erdody authored Jul 8, 2021
1 parent 05bc6af commit 1cbceff
Showing 18 changed files with 547 additions and 577 deletions.
2 changes: 1 addition & 1 deletion pom.xml
@@ -56,7 +56,7 @@
     <apacheds-jdbm1.version>2.0.0-M2</apacheds-jdbm1.version>
     <kafka.connect.maven.plugin.version>0.11.1</kafka.connect.maven.plugin.version>
     <maven.release.plugin.version>2.5.3</maven.release.plugin.version>
-    <kafka.connect.storage.common.version>10.1.1</kafka.connect.storage.common.version>
+    <kafka.connect.storage.common.version>10.2.0</kafka.connect.storage.common.version>
     <confluent-log4j.version>1.2.17-cp2</confluent-log4j.version>
     <commons.collections.version>3.2.2</commons.collections.version>
     <libthrift.version>0.13.0</libthrift.version>
19 changes: 12 additions & 7 deletions src/main/java/io/confluent/connect/hdfs/DataWriter.java
@@ -303,9 +303,9 @@ private void initializeHiveServices(Configuration hadoopConfiguration) {
   @Override
   public void createTable(
       String database, String tableName, Schema schema,
-      Partitioner partitioner
+      Partitioner partitioner, String topic
   ) {
-    newHiveUtil.createTable(database, tableName, schema, partitioner);
+    newHiveUtil.createTable(database, tableName, schema, partitioner, topic);
     log.debug("Created Hive table {}", tableName);
   }
@@ -338,7 +338,8 @@ private void initializeTopicPartitionWriters(Set<TopicPartition> assignment) {
           schemaFileReader,
           executorService,
           hiveUpdateFutures,
-          time
+          time,
+          connectorConfig.getHiveTableName(tp.topic())
       );
       topicPartitionWriters.put(tp, topicPartitionWriter);
     }
@@ -406,14 +407,17 @@ public void syncWithHive() throws ConnectException {
             connectorConfig,
             path
         );
-        hive.createTable(hiveDatabase, topic, latestSchema, partitioner);
-        List<String> partitions = hiveMetaStore.listPartitions(hiveDatabase, topic, (short) -1);
+        String hiveTableName = connectorConfig.getHiveTableName(topic);
+        hive.createTable(hiveDatabase, hiveTableName, latestSchema, partitioner, topic);
+        List<String> partitions = hiveMetaStore.listPartitions(hiveDatabase,
+            hiveTableName,
+            (short) -1);
         FileStatus[] statuses = FileUtils.getDirectories(storage, new Path(topicDir));
         for (FileStatus status : statuses) {
           String location = status.getPath().toString();
           if (!partitions.contains(location)) {
             String partitionValue = getPartitionValue(location);
-            hiveMetaStore.addPartition(hiveDatabase, topic, partitionValue);
+            hiveMetaStore.addPartition(hiveDatabase, hiveTableName, partitionValue);
           }
         }
       }
@@ -439,7 +443,8 @@ public void open(Collection<TopicPartition> partitions) {
           schemaFileReader,
           executorService,
           hiveUpdateFutures,
-          time
+          time,
+          connectorConfig.getHiveTableName(tp.topic())
       );
       topicPartitionWriters.put(tp, topicPartitionWriter);
       // We need to immediately start recovery to ensure we pause consumption of messages for the
src/main/java/io/confluent/connect/hdfs/HdfsSinkConnectorConfig.java
@@ -71,6 +71,8 @@

 public class HdfsSinkConnectorConfig extends StorageSinkConnectorConfig {

+  private static final String TOPIC_SUBSTITUTION = "${topic}";
+
   // HDFS Group
   // This config is deprecated and will be removed in future releases. Use store.url instead.
   public static final String HDFS_URL_CONFIG = "hdfs.url";
@@ -91,6 +93,12 @@ public class HdfsSinkConnectorConfig extends StorageSinkConnectorConfig {
   private static final String HADOOP_HOME_DOC = "The Hadoop home directory.";
   private static final String HADOOP_HOME_DISPLAY = "Hadoop home directory";

+  public static final String HIVE_TABLE_NAME_CONFIG = "hive.table.name";
+  public static final String HIVE_TABLE_NAME_DEFAULT = TOPIC_SUBSTITUTION;
+  private static final String HIVE_TABLE_NAME_DOC = "The hive table name to use. "
+      + "It must contain '${topic}' to inject the corresponding topic name.";
+  private static final String HIVE_TABLE_NAME_DISPLAY = "Hive table name";
+
   // Storage group
   public static final String TOPIC_CAPTURE_GROUPS_REGEX_CONFIG = "topic.capture.groups.regex";
   public static final String TOPIC_CAPTURE_GROUPS_REGEX_DISPLAY = "Topic Capture Groups Regex";
@@ -244,6 +252,18 @@ public static ConfigDef newConfigDef() {
         Width.SHORT,
         LOGS_DIR_DISPLAY
     );
+
+    configDef.define(
+        HIVE_TABLE_NAME_CONFIG,
+        Type.STRING,
+        HIVE_TABLE_NAME_DEFAULT,
+        Importance.LOW,
+        HIVE_TABLE_NAME_DOC,
+        group,
+        ++orderInGroup,
+        Width.SHORT,
+        HIVE_TABLE_NAME_DISPLAY
+    );
   }

   {
@@ -545,6 +565,17 @@ public String getTopicsDirFromTopic(String topic) {
     return getDirFromTopic(getString(TOPICS_DIR_CONFIG), topic, topicDirGroupsMaxIndex);
   }

+  /**
+   * Performs all substitutions on {@value HIVE_TABLE_NAME_CONFIG} and calculates the final
+   * hive table name for the given topic
+   *
+   * @param topic String - the topic name
+   * @return String the hive table name
+   */
+  public String getHiveTableName(String topic) {
+    return getString(HIVE_TABLE_NAME_CONFIG).replace("${topic}", topic);
+  }
+
   /**
    * Performs all substitutions and calculates the final directory for a topic
    *
@@ -630,6 +661,41 @@ private void validateDirsAndRegex() {

     validateReplacements(TOPICS_DIR_CONFIG);
     validateReplacements(LOGS_DIR_CONFIG);
+    validateHiveTableNameReplacements();
   }

+  private void validateHiveTableNameReplacements() {
+    String config = HIVE_TABLE_NAME_CONFIG;
+    String configValue = getString(config);
+
+    if (!configValue.contains(TOPIC_SUBSTITUTION)) {
+      throw new ConfigException(
+          String.format(
+              "%s: '%s' has to contain topic substitution '%s'.",
+              config,
+              getString(config),
+              TOPIC_SUBSTITUTION
+          )
+      );
+    }
+
+    // remove all valid ${} substitutions
+    String tableName = configValue.replace(TOPIC_SUBSTITUTION, "");
+
+    // check for invalid ${} substitutions
+    Matcher invalidMatcher = INVALID_SUB_PATTERN.matcher(tableName);
+    if (invalidMatcher.find()) {
+      throw new ConfigException(
+          String.format(
+              "%s: '%s' contains an invalid ${} substitution '%s'. "
+                  + "Valid substitution is '%s'",
+              config,
+              getString(config),
+              invalidMatcher.group(),
+              TOPIC_SUBSTITUTION
+          )
+      );
+    }
+  }
+
   /**
@@ -640,18 +706,19 @@ private void validateDirsAndRegex() {
   private void validateReplacements(String config) {
     // remove all valid ${} substitutions
     Matcher partsMatcher = SUBSTITUTION_PATTERN.matcher(getString(config));
-    String dir = partsMatcher.replaceAll("").replace("${topic}", "");
+    String dir = partsMatcher.replaceAll("").replace(TOPIC_SUBSTITUTION, "");

     // check for invalid ${} substitutions
     Matcher invalidMatcher = INVALID_SUB_PATTERN.matcher(dir);
     if (invalidMatcher.find()) {
       throw new ConfigException(
           String.format(
-              "%s: %s contains an invalid ${} substitution %s. Valid substitutions are ${topic} "
+              "%s: %s contains an invalid ${} substitution %s. Valid substitutions are %s "
                   + "and ${n} where n >= 0.",
               config,
               getString(config),
-              invalidMatcher.group()
+              invalidMatcher.group(),
+              TOPIC_SUBSTITUTION
           )
       );
     }
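To make the new config concrete, here is a minimal, self-contained sketch of the substitution and validation rules added above. It is an illustration only: the class name and config values are hypothetical, and the real logic lives in getHiveTableName and validateHiveTableNameReplacements.

    // Hypothetical demo class; mirrors the ${topic} handling added in this commit.
    public class HiveTableNameSketch {
      private static final String TOPIC_SUBSTITUTION = "${topic}";

      // Mirrors getHiveTableName(topic): literal replacement of ${topic}.
      static String resolve(String configValue, String topic) {
        return configValue.replace(TOPIC_SUBSTITUTION, topic);
      }

      public static void main(String[] args) {
        // Valid: contains ${topic}, so each topic maps to its own table.
        System.out.println(resolve("staging_${topic}", "page_views")); // staging_page_views

        // Values rejected by validateHiveTableNameReplacements (per the checks above):
        //   "my_table"           -> missing the mandatory ${topic} substitution
        //   "${topic}_${suffix}" -> '${suffix}' is not a valid substitution for this config
      }
    }

Note that the default value is ${topic} itself, so out of the box each topic keeps mapping to a Hive table of the same name.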
src/main/java/io/confluent/connect/hdfs/TopicPartitionWriter.java
@@ -114,6 +114,7 @@ public class TopicPartitionWriter {
   private final ExecutorService executorService;
   private final Queue<Future<Void>> hiveUpdateFutures;
   private final Set<String> hivePartitions;
+  private final String hiveTableName;

   public TopicPartitionWriter(
       TopicPartition tp,
@@ -141,7 +142,8 @@ public TopicPartitionWriter(
         null,
         null,
         null,
-        time
+        time,
+        tp.topic()
     );
   }
@@ -161,8 +163,10 @@
       schemaFileReader,
       ExecutorService executorService,
       Queue<Future<Void>> hiveUpdateFutures,
-      Time time
+      Time time,
+      String hiveTableName
   ) {
+    this.hiveTableName = hiveTableName;
     this.time = time;
     this.tp = tp;
     this.context = context;
@@ -903,7 +907,7 @@ private void setRetryTimeout(long timeoutMs) {
   private void createHiveTable() {
     Future<Void> future = executorService.submit(() -> {
       try {
-        hive.createTable(hiveDatabase, tp.topic(), currentSchema, partitioner);
+        hive.createTable(hiveDatabase, hiveTableName, currentSchema, partitioner, tp.topic());
       } catch (Throwable e) {
         log.error("Creating Hive table threw unexpected error", e);
       }
@@ -915,7 +919,7 @@ private void createHiveTable() {
   private void alterHiveSchema() {
     Future<Void> future = executorService.submit(() -> {
       try {
-        hive.alterSchema(hiveDatabase, tp.topic(), currentSchema);
+        hive.alterSchema(hiveDatabase, hiveTableName, currentSchema);
       } catch (Throwable e) {
         log.error("Altering Hive schema threw unexpected error", e);
       }
@@ -927,7 +931,7 @@ private void alterHiveSchema() {
   private void addHivePartition(final String location) {
     Future<Void> future = executorService.submit(() -> {
       try {
-        hiveMetaStore.addPartition(hiveDatabase, tp.topic(), location);
+        hiveMetaStore.addPartition(hiveDatabase, hiveTableName, location);
       } catch (Throwable e) {
         log.error("Adding Hive partition threw unexpected error", e);
       }
17 changes: 11 additions & 6 deletions src/main/java/io/confluent/connect/hdfs/avro/AvroHiveUtil.java
@@ -57,9 +57,13 @@ public AvroHiveUtil(
   }

   @Override
-  public void createTable(String database, String tableName, Schema schema, Partitioner partitioner)
-      throws HiveMetaStoreException {
-    Table table = constructAvroTable(database, tableName, schema, partitioner);
+  public void createTable(String database,
+                          String tableName,
+                          Schema schema,
+                          Partitioner partitioner,
+                          String topic)
+      throws HiveMetaStoreException {
+    Table table = constructAvroTable(database, tableName, schema, partitioner, topic);
     hiveMetaStore.createTable(table);
   }
@@ -80,14 +84,15 @@ private Table constructAvroTable(
       String database,
       String tableName,
       Schema schema,
-      Partitioner partitioner
+      Partitioner partitioner,
+      String topic
   )
       throws HiveMetaStoreException {
     Table table = newTable(database, tableName);
     table.setTableType(TableType.EXTERNAL_TABLE);
     table.getParameters().put("EXTERNAL", "TRUE");
-    // tableName is always the topic name
-    String tablePath = hiveDirectoryName(url, config.getTopicsDirFromTopic(tableName), tableName);
+
+    String tablePath = hiveDirectoryName(url, config.getTopicsDirFromTopic(topic), topic);
     table.setDataLocation(new Path(tablePath));
     table.setSerializationLib(AVRO_SERDE);
     try {
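The removed comment "// tableName is always the topic name" marks the key behavioral point of this file: the Hive table name and the HDFS data location are now decoupled. The table is created under the configured name, while hiveDirectoryName still derives the path from the topic. A hedged, self-contained illustration — both helpers below are hypothetical stand-ins, assuming hive.table.name=staging_${topic} and a /topics root:

    public class TableNameVsLocationSketch {
      // Stand-in for connectorConfig.getHiveTableName(topic).
      static String hiveTableName(String topic) {
        return "staging_${topic}".replace("${topic}", topic);
      }

      // Stand-in for the topic-keyed directory fed to hiveDirectoryName.
      static String dataLocation(String topic) {
        return "/topics/" + topic;
      }

      public static void main(String[] args) {
        String topic = "page_views";
        System.out.println(hiveTableName(topic)); // staging_page_views  (configured table name)
        System.out.println(dataLocation(topic));  // /topics/page_views  (still keyed by topic)
      }
    }

The same swap from tableName to topic appears in the ORC and Parquet utilities below.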
11 changes: 6 additions & 5 deletions src/main/java/io/confluent/connect/hdfs/hive/HiveUtil.java
@@ -28,25 +28,26 @@
 public abstract class HiveUtil extends io.confluent.connect.storage.hive.HiveUtil {

   public HiveUtil(HdfsSinkConnectorConfig connectorConfig, HiveMetaStore hiveMetaStore) {
-    super(connectorConfig, hiveMetaStore);
-    this.url = connectorConfig.url();
+    super(connectorConfig, hiveMetaStore, connectorConfig.url());
   }

   @Override
   public void createTable(
       String database,
       String tableName,
       Schema schema,
-      io.confluent.connect.storage.partitioner.Partitioner<FieldSchema> partitioner
+      io.confluent.connect.storage.partitioner.Partitioner<FieldSchema> partitioner,
+      String topic
   ) {
-    createTable(database, tableName, schema, (Partitioner) partitioner);
+    createTable(database, tableName, schema, (Partitioner) partitioner, topic);
   }

   public abstract void createTable(
       String database,
       String tableName,
       Schema schema,
-      Partitioner partitioner
+      Partitioner partitioner,
+      String topic
   );

 }
11 changes: 6 additions & 5 deletions src/main/java/io/confluent/connect/hdfs/orc/OrcHiveUtil.java
@@ -54,25 +54,26 @@ public void createTable(
       String database,
       String tableName,
       Schema schema,
-      Partitioner<FieldSchema> partitioner
+      Partitioner<FieldSchema> partitioner,
+      String topic
   ) throws HiveMetaStoreException {
-    Table table = constructOrcTable(database, tableName, schema, partitioner);
+    Table table = constructOrcTable(database, tableName, schema, partitioner, topic);
     hiveMetaStore.createTable(table);
   }

   private Table constructOrcTable(
       String database,
       String tableName,
       Schema schema,
-      Partitioner<FieldSchema> partitioner
+      Partitioner<FieldSchema> partitioner,
+      String topic
   ) throws HiveMetaStoreException {

     Table table = newTable(database, tableName);
     table.setTableType(TableType.EXTERNAL_TABLE);
     table.getParameters().put("EXTERNAL", "TRUE");

-    // tableName is always the topic name
-    String tablePath = hiveDirectoryName(url, config.getTopicsDirFromTopic(tableName), tableName);
+    String tablePath = hiveDirectoryName(url, config.getTopicsDirFromTopic(topic), topic);
     table.setDataLocation(new Path(tablePath));
     table.setSerializationLib(getHiveOrcSerde());
src/main/java/io/confluent/connect/hdfs/parquet/ParquetHiveUtil.java
@@ -46,9 +46,10 @@ public void createTable(
       String database,
       String tableName,
       Schema schema,
-      Partitioner partitioner
+      Partitioner partitioner,
+      String topic
   ) throws HiveMetaStoreException {
-    Table table = constructParquetTable(database, tableName, schema, partitioner);
+    Table table = constructParquetTable(database, tableName, schema, partitioner, topic);
     hiveMetaStore.createTable(table);
   }
@@ -65,13 +66,14 @@ private Table constructParquetTable(
       String database,
       String tableName,
       Schema schema,
-      Partitioner partitioner
+      Partitioner partitioner,
+      String topic
   ) throws HiveMetaStoreException {
     Table table = newTable(database, tableName);
     table.setTableType(TableType.EXTERNAL_TABLE);
     table.getParameters().put("EXTERNAL", "TRUE");
-    // tableName is always the topic name
-    String tablePath = hiveDirectoryName(url, config.getTopicsDirFromTopic(tableName), tableName);
+
+    String tablePath = hiveDirectoryName(url, config.getTopicsDirFromTopic(topic), topic);
     table.setDataLocation(new Path(tablePath));
     table.setSerializationLib(getHiveParquetSerde());
     try {
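Finally, a hedged sketch of how a deployment might opt into the new setting. The property values are hypothetical; hive.table.name defaults to ${topic}, which preserves the previous behavior.

    import java.util.Properties;

    public class ConnectorConfigSketch {
      public static void main(String[] args) {
        // Hypothetical connector properties; only hive.table.name is introduced by this commit.
        Properties props = new Properties();
        props.put("connector.class", "io.confluent.connect.hdfs.HdfsSinkConnector");
        props.put("hive.integration", "true");         // assumed: Hive integration enabled
        props.put("hive.table.name", "prod_${topic}"); // topic "orders" -> Hive table "prod_orders"
        System.out.println(props);
      }
    }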