Skip to content

Commit

Permalink
Revert "[HUDI-4324] Remove use_jdbc config from hudi sync (apache#6072)…
Browse files Browse the repository at this point in the history
…" (apache#6160)

This reverts commit 046044c.
  • Loading branch information
xushiyan authored Jul 23, 2022
1 parent a36762a commit d5c7c79
Show file tree
Hide file tree
Showing 20 changed files with 72 additions and 29 deletions.
2 changes: 1 addition & 1 deletion conf/hudi-defaults.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

# Example:
# hoodie.datasource.hive_sync.jdbcurl jdbc:hive2://localhost:10000
# hoodie.datasource.hive_sync.mode jdbc
# hoodie.datasource.hive_sync.use_jdbc true
# hoodie.datasource.hive_sync.support_timestamp false
# hoodie.index.type BLOOM
# hoodie.metadata.enable false
3 changes: 1 addition & 2 deletions docker/demo/config/hoodie-incr.properties
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,5 @@ hoodie.deltastreamer.source.hoodieincr.path=/docker_hoodie_sync_valid_test
hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=true
# hive sync
hoodie.datasource.hive_sync.table=docker_hoodie_sync_valid_test_2
hoodie.datasource.hive_sync.mode=jdbc
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000
hoodie.datasource.hive_sync.partition_fields=partition
hoodie.datasource.hive_sync.partition_fields=partition
2 changes: 0 additions & 2 deletions docker/demo/sparksql-incremental.commands
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor").
option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor").
option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
option(HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), "jdbc").
option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
Expand Down Expand Up @@ -80,7 +79,6 @@ spark.sql("select key, `_hoodie_partition_path` as datestr, symbol, ts, open, cl
option(HoodieWriteConfig.TBL_NAME.key(), "stock_ticks_derived_mor_bs").
option(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), "stock_ticks_derived_mor_bs").
option(HoodieSyncConfig.META_SYNC_DATABASE_NAME.key(), "default").
option(HiveSyncConfigHolder.HIVE_SYNC_MODE.key(), "jdbc").
option(HiveSyncConfigHolder.HIVE_URL.key(), "jdbc:hive2://hiveserver:10000").
option(HiveSyncConfigHolder.HIVE_USER.key(), "hive").
option(HiveSyncConfigHolder.HIVE_PASS.key(), "hive").
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void testLoadGlobalConfFile() {
DFSPropertiesConfiguration.refreshGlobalProps();
assertEquals(5, DFSPropertiesConfiguration.getGlobalProps().size());
assertEquals("jdbc:hive2://localhost:10000", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.jdbcurl"));
assertEquals("jdbc", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.mode"));
assertEquals("true", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.use_jdbc"));
assertEquals("false", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.datasource.hive_sync.support_timestamp"));
assertEquals("BLOOM", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.index.type"));
assertEquals("true", DFSPropertiesConfiguration.getGlobalProps().get("hoodie.metadata.enable"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

# Example:
hoodie.datasource.hive_sync.jdbcurl jdbc:hive2://localhost:10000
hoodie.datasource.hive_sync.mode jdbc
hoodie.datasource.hive_sync.use_jdbc true
hoodie.datasource.hive_sync.support_timestamp false
hoodie.index.type BLOOM
hoodie.metadata.enable true
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,12 @@ private FlinkOptions() {
.defaultValue(false)
.withDescription("Assume partitioning is yyyy/mm/dd, default false");

public static final ConfigOption<Boolean> HIVE_SYNC_USE_JDBC = ConfigOptions
.key("hive_sync.use_jdbc")
.booleanType()
.defaultValue(true)
.withDescription("Use JDBC when hive synchronization is enabled, default true");

public static final ConfigOption<Boolean> HIVE_SYNC_AUTO_CREATE_DB = ConfigOptions
.key("hive_sync.auto_create_db")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_TABLE_SERDE_PROPERTIES;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_PRE_APACHE_INPUT_FORMAT;
import static org.apache.hudi.hive.HiveSyncConfigHolder.METASTORE_URIS;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION;
Expand Down Expand Up @@ -104,6 +105,7 @@ public static Properties buildSyncConfig(Configuration conf) {
props.setPropertyIfNonNull(HIVE_TABLE_SERDE_PROPERTIES.key(), conf.getString(FlinkOptions.HIVE_SYNC_TABLE_SERDE_PROPERTIES));
props.setPropertyIfNonNull(META_SYNC_PARTITION_FIELDS.key(), String.join(",", FilePathUtils.extractHivePartitionFields(conf)));
props.setPropertyIfNonNull(META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), conf.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME));
props.setPropertyIfNonNull(HIVE_USE_JDBC.key(), String.valueOf(conf.getBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC)));
props.setPropertyIfNonNull(META_SYNC_USE_FILE_LISTING_FROM_METADATA.key(), String.valueOf(conf.getBoolean(FlinkOptions.METADATA_ENABLED)));
props.setPropertyIfNonNull(HIVE_IGNORE_EXCEPTIONS.key(), String.valueOf(conf.getBoolean(FlinkOptions.HIVE_SYNC_IGNORE_EXCEPTIONS)));
props.setPropertyIfNonNull(HIVE_SUPPORT_TIMESTAMP_TYPE.key(), String.valueOf(conf.getBoolean(FlinkOptions.HIVE_SYNC_SUPPORT_TIMESTAMP)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,9 @@ public class FlinkStreamerConfig extends Configuration {
@Parameter(names = {"--hive-sync-assume-date-partitioning"}, description = "Assume partitioning is yyyy/mm/dd, default false")
public Boolean hiveSyncAssumeDatePartition = false;

@Parameter(names = {"--hive-sync-use-jdbc"}, description = "Use JDBC when hive synchronization is enabled, default true")
public Boolean hiveSyncUseJdbc = true;

@Parameter(names = {"--hive-sync-auto-create-db"}, description = "Auto create hive database if it does not exist, default true")
public Boolean hiveSyncAutoCreateDb = true;

Expand Down Expand Up @@ -417,6 +420,7 @@ public static org.apache.flink.configuration.Configuration toFlinkConfig(FlinkSt
conf.setString(FlinkOptions.HIVE_SYNC_PARTITION_FIELDS, config.hiveSyncPartitionFields);
conf.setString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS_NAME, config.hiveSyncPartitionExtractorClass);
conf.setBoolean(FlinkOptions.HIVE_SYNC_ASSUME_DATE_PARTITION, config.hiveSyncAssumeDatePartition);
conf.setBoolean(FlinkOptions.HIVE_SYNC_USE_JDBC, config.hiveSyncUseJdbc);
conf.setBoolean(FlinkOptions.HIVE_SYNC_AUTO_CREATE_DB, config.hiveSyncAutoCreateDb);
conf.setBoolean(FlinkOptions.HIVE_SYNC_IGNORE_EXCEPTIONS, config.hiveSyncIgnoreExceptions);
conf.setBoolean(FlinkOptions.HIVE_SYNC_SKIP_RO_SUFFIX, config.hiveSyncSkipRoSuffix);
Expand Down
1 change: 1 addition & 0 deletions hudi-kafka-connect/demo/config-sink-hive.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"hoodie.datasource.hive_sync.table": "huditesttopic",
"hoodie.datasource.hive_sync.partition_fields": "date",
"hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
"hoodie.datasource.hive_sync.use_jdbc": "false",
"hoodie.datasource.hive_sync.mode": "hms",
"dfs.client.use.datanode.hostname": "true",
"hive.metastore.uris": "thrift://hivemetastore:9083",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ public String getHadoopConfHome() {
public static final String HIVE_URL = "hoodie.datasource.hive_sync.jdbcurl";
public static final String HIVE_PARTITION_FIELDS = "hoodie.datasource.hive_sync.partition_fields";
public static final String HIVE_PARTITION_EXTRACTOR_CLASS = "hoodie.datasource.hive_sync.partition_extractor_class";
public static final String HIVE_USE_JDBC = "hoodie.datasource.hive_sync.use_jdbc";
public static final String HIVE_SYNC_MODE = "hoodie.datasource.hive_sync.mode";
public static final String HIVE_AUTO_CREATE_DATABASE = "hoodie.datasource.hive_sync.auto_create_database";
public static final String HIVE_IGNORE_EXCEPTIONS = "hoodie.datasource.hive_sync.ignore_exceptions";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,10 @@ object DataSourceWriteOptions {
val HIVE_ASSUME_DATE_PARTITION: ConfigProperty[String] = HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION
@Deprecated
val HIVE_USE_PRE_APACHE_INPUT_FORMAT: ConfigProperty[String] = HiveSyncConfigHolder.HIVE_USE_PRE_APACHE_INPUT_FORMAT

/** @deprecated Use {@link HIVE_SYNC_MODE} instead of this config from 0.9.0 */
@Deprecated
val HIVE_USE_JDBC: ConfigProperty[String] = HiveSyncConfigHolder.HIVE_USE_JDBC
@Deprecated
val HIVE_AUTO_CREATE_DATABASE: ConfigProperty[String] = HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE
@Deprecated
Expand Down Expand Up @@ -496,6 +500,9 @@ object DataSourceWriteOptions {
/** @deprecated Use {@link HIVE_USE_PRE_APACHE_INPUT_FORMAT} and its methods instead */
@Deprecated
val HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY = HiveSyncConfigHolder.HIVE_USE_PRE_APACHE_INPUT_FORMAT.key()
/** @deprecated Use {@link HIVE_USE_JDBC} and its methods instead */
@Deprecated
val HIVE_USE_JDBC_OPT_KEY = HiveSyncConfigHolder.HIVE_USE_JDBC.key()
/** @deprecated Use {@link HIVE_AUTO_CREATE_DATABASE} and its methods instead */
@Deprecated
val HIVE_AUTO_CREATE_DATABASE_OPT_KEY = HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE.key()
Expand Down Expand Up @@ -686,6 +693,9 @@ object DataSourceWriteOptions {
val DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL = HoodieSyncConfig.META_SYNC_ASSUME_DATE_PARTITION.defaultValue()
@Deprecated
val DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL = "false"
/** @deprecated Use {@link HIVE_USE_JDBC} and its methods instead */
@Deprecated
val DEFAULT_HIVE_USE_JDBC_OPT_VAL = HiveSyncConfigHolder.HIVE_USE_JDBC.defaultValue()
/** @deprecated Use {@link HIVE_AUTO_CREATE_DATABASE} and its methods instead */
@Deprecated
val DEFAULT_HIVE_AUTO_CREATE_DATABASE_OPT_KEY = HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE.defaultValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ object HoodieWriterUtils {
hoodieConfig.setDefaultValue(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS)
hoodieConfig.setDefaultValue(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
hoodieConfig.setDefaultValue(HIVE_STYLE_PARTITIONING)
hoodieConfig.setDefaultValue(HiveSyncConfigHolder.HIVE_USE_JDBC)
hoodieConfig.setDefaultValue(HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE)
hoodieConfig.setDefaultValue(HiveSyncConfigHolder.HIVE_SYNC_AS_DATA_SOURCE_TABLE)
hoodieConfig.setDefaultValue(ASYNC_COMPACT_ENABLE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_TABLE_SERDE_PROPERTIES;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USER;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_PRE_APACHE_INPUT_FORMAT;
import static org.apache.hudi.hive.HiveSyncConfigHolder.METASTORE_URIS;

Expand Down Expand Up @@ -94,6 +95,9 @@ public static class HiveSyncConfigParams {
+ "com.uber.hoodie to org.apache.hudi. Stop using this after you migrated the table definition to "
+ "org.apache.hudi input format.")
public Boolean usePreApacheInputFormat;
@Deprecated
@Parameter(names = {"--use-jdbc"}, description = "Use JDBC when hive synchronization is enabled")
public Boolean useJdbc;
@Parameter(names = {"--metastore-uris"}, description = "Hive metastore uris")
public String metastoreUris;
@Parameter(names = {"--sync-mode"}, description = "Mode to choose for Hive ops. Valid values are hms,glue,jdbc and hiveql")
Expand Down Expand Up @@ -138,6 +142,7 @@ public TypedProperties toProps() {
props.setPropertyIfNonNull(HIVE_PASS.key(), hivePass);
props.setPropertyIfNonNull(HIVE_URL.key(), jdbcUrl);
props.setPropertyIfNonNull(HIVE_USE_PRE_APACHE_INPUT_FORMAT.key(), usePreApacheInputFormat);
props.setPropertyIfNonNull(HIVE_USE_JDBC.key(), useJdbc);
props.setPropertyIfNonNull(HIVE_SYNC_MODE.key(), syncMode);
props.setPropertyIfNonNull(METASTORE_URIS.key(), metastoreUris);
props.setPropertyIfNonNull(HIVE_AUTO_CREATE_DATABASE.key(), autoCreateDatabase);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ public class HiveSyncConfigHolder {
.withDocumentation("Flag to choose InputFormat under com.uber.hoodie package instead of org.apache.hudi package. "
+ "Use this when you are in the process of migrating from "
+ "com.uber.hoodie to org.apache.hudi. Stop using this after you migrated the table definition to org.apache.hudi input format");
/**
* @deprecated Use {@link #HIVE_SYNC_MODE} instead of this config from 0.9.0
*/
@Deprecated
public static final ConfigProperty<String> HIVE_USE_JDBC = ConfigProperty
.key("hoodie.datasource.hive_sync.use_jdbc")
.defaultValue("true")
.deprecatedAfter("0.9.0")
.withDocumentation("Use JDBC when hive synchronization is enabled");
public static final ConfigProperty<String> METASTORE_URIS = ConfigProperty
.key("hoodie.datasource.hive_sync.metastore.uris")
.defaultValue("thrift://localhost:9083")
Expand Down Expand Up @@ -100,7 +109,7 @@ public class HiveSyncConfigHolder {
.withDocumentation("The number of partitions one batch when synchronous partitions to hive.");
public static final ConfigProperty<String> HIVE_SYNC_MODE = ConfigProperty
.key("hoodie.datasource.hive_sync.mode")
.defaultValue("jdbc")
.noDefaultValue()
.withDocumentation("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql.");
public static final ConfigProperty<Boolean> HIVE_SYNC_BUCKET_SYNC = ConfigProperty
.key("hoodie.datasource.hive_sync.bucket_sync")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.hive.ddl.DDLExecutor;
import org.apache.hudi.hive.ddl.HMSDDLExecutor;
Expand Down Expand Up @@ -48,6 +49,7 @@

import static org.apache.hudi.hadoop.utils.HoodieHiveUtils.GLOBALLY_CONSISTENT_READ_TIMESTAMP;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_USE_JDBC;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_DATABASE_NAME;
import static org.apache.hudi.sync.common.util.TableUtils.tableId;

Expand All @@ -70,19 +72,23 @@ public HoodieHiveSyncClient(HiveSyncConfig config) {
// Support JDBC, HiveQL and metastore based implementations for backwards compatibility. Future users should
// disable jdbc and depend on metastore client for all hive registrations
try {
HiveSyncMode syncMode = HiveSyncMode.of(config.getStringOrDefault(HIVE_SYNC_MODE));
switch (syncMode) {
case HMS:
ddlExecutor = new HMSDDLExecutor(config);
break;
case HIVEQL:
ddlExecutor = new HiveQueryDDLExecutor(config);
break;
case JDBC:
ddlExecutor = new JDBCExecutor(config);
break;
default:
throw new HoodieHiveSyncException("Invalid sync mode given " + config.getString(HIVE_SYNC_MODE));
if (!StringUtils.isNullOrEmpty(config.getString(HIVE_SYNC_MODE))) {
HiveSyncMode syncMode = HiveSyncMode.of(config.getString(HIVE_SYNC_MODE));
switch (syncMode) {
case HMS:
ddlExecutor = new HMSDDLExecutor(config);
break;
case HIVEQL:
ddlExecutor = new HiveQueryDDLExecutor(config);
break;
case JDBC:
ddlExecutor = new JDBCExecutor(config);
break;
default:
throw new HoodieHiveSyncException("Invalid sync mode given " + config.getString(HIVE_SYNC_MODE));
}
} else {
ddlExecutor = config.getBoolean(HIVE_USE_JDBC) ? new JDBCExecutor(config) : new HiveQueryDDLExecutor(config);
}
this.client = Hive.get(config.getHiveConf()).getMSC();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import static org.apache.hudi.sync.common.util.TableUtils.tableId;

/**
* This class offers DDL executor backed by the HiveQL Driver.
* This class offers DDL executor backed by the hive.ql Driver. This class preserves the old useJDBC = false way of doing things.
*/
public class HiveQueryDDLExecutor extends QueryBasedDDLExecutor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import static org.apache.hudi.hive.util.HiveSchemaUtil.HIVE_ESCAPE_CHARACTER;

/**
* This class offers DDL executor backed by the jdbc.
* This class offers DDL executor backed by JDBC. This class preserves the old useJDBC = true way of doing things.
*/
public class JDBCExecutor extends QueryBasedDDLExecutor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.io.InputStream;
import java.util.Properties;

import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_SYNC_MODE;
import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_BASE_PATH;

Expand Down Expand Up @@ -93,7 +92,6 @@ Properties mkGlobalHiveSyncProps(boolean forRemote) {
String jdbcUrl = forRemote ? loadedProps.getProperty(REMOTE_HIVE_SERVER_JDBC_URLS)
: loadedProps.getProperty(LOCAL_HIVE_SERVER_JDBC_URLS, loadedProps.getProperty(HIVE_URL.key()));
props.setPropertyIfNonNull(HIVE_URL.key(), jdbcUrl);
props.setProperty(HIVE_SYNC_MODE.key(), "jdbc");
LOG.info("building hivesync config forRemote: " + forRemote + " " + jdbcUrl + " "
+ basePath);
return props;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private static Iterable<Object[]> syncModeAndSchemaFromCommitMetadataAndManagedT
return opts;
}

// (useSchemaFromCommitMetadata, syncAsDataSource, syncMode)
// (useJdbc, useSchemaFromCommitMetadata, syncAsDataSource)
private static Iterable<Object[]> syncDataSourceTableParams() {
List<Object[]> opts = new ArrayList<>();
for (Object mode : SYNC_MODES) {
Expand Down
Loading

0 comments on commit d5c7c79

Please sign in to comment.