Skip to content

Commit

Permalink
broke the process into Source and sink
Browse files Browse the repository at this point in the history
  • Loading branch information
asishupadhyay committed Feb 10, 2025
1 parent fc997e3 commit cc932cf
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 180 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.redis.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

import java.util.List;

public class RedisBaseOptions {

public static final String CONNECTOR_IDENTITY = "Redis";

public enum RedisMode {
SINGLE,
CLUSTER;
}

public static final Option<String> HOST =
Options.key("host")
.stringType()
.noDefaultValue()
.withDescription("redis hostname or ip");

public static final Option<Integer> PORT =
Options.key("port").intType().noDefaultValue().withDescription("redis port");

public static final Option<String> AUTH =
Options.key("auth")
.stringType()
.noDefaultValue()
.withDescription(
"redis authentication password, you need it when you connect to an encrypted cluster");

public static final Option<Integer> DB_NUM =
Options.key("db_num")
.intType()
.defaultValue(0)
.withDescription(
"Redis database index id, it is connected to db 0 by default");

public static final Option<String> USER =
Options.key("user")
.stringType()
.noDefaultValue()
.withDescription(
"redis authentication user, you need it when you connect to an encrypted cluster");

public static final Option<String> KEY_PATTERN =
Options.key("keys")
.stringType()
.noDefaultValue()
.withDescription(
"keys pattern, redis source connector support fuzzy key matching, user needs to ensure that the matched keys are the same type");

public static final Option<String> KEY =
Options.key("key")
.stringType()
.noDefaultValue()
.withDescription("The value of key you want to write to redis.");

public static final Option<RedisDataType> DATA_TYPE =
Options.key("data_type")
.enumType(RedisDataType.class)
.noDefaultValue()
.withDescription("redis data types, support string hash list set zset.");

public static final Option<RedisBaseOptions.Format> FORMAT =
Options.key("format")
.enumType(RedisBaseOptions.Format.class)
.defaultValue(RedisBaseOptions.Format.JSON)
.withDescription(
"the format of upstream data, now only support json and text, default json.");

public static final Option<RedisBaseOptions.RedisMode> MODE =
Options.key("mode")
.enumType(RedisBaseOptions.RedisMode.class)
.defaultValue(RedisMode.SINGLE)
.withDescription(
"redis mode, support single or cluster, default value is single");

public static final Option<List<String>> NODES =
Options.key("nodes")
.listType()
.noDefaultValue()
.withDescription(
"redis nodes information, used in cluster mode, must like as the following format: [host1:port1, host2:port2]");

public static final Option<Integer> BATCH_SIZE =
Options.key("batch_size")
.intType()
.defaultValue(10)
.withDescription(
"batch_size is used to control the size of a batch of data during read and write operations"
+ ",default 10");

public enum Format {
JSON,
// TEXT will be supported later
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ public class RedisParameters implements Serializable {
private String keysPattern;
private String keyField;
private RedisDataType redisDataType;
private RedisSinkOptions.RedisMode mode;
private RedisSinkOptions.HashKeyParseMode hashKeyParseMode;
private RedisBaseOptions.RedisMode mode;
private RedisSourceOptions.HashKeyParseMode hashKeyParseMode;
private List<String> redisNodes = Collections.emptyList();
private long expire = RedisSinkOptions.EXPIRE.defaultValue();
private int batchSize = RedisSinkOptions.BATCH_SIZE.defaultValue();
private int batchSize = RedisBaseOptions.BATCH_SIZE.defaultValue();
private Boolean supportCustomKey;
private String valueField;
private String hashKeyField;
Expand All @@ -66,41 +66,41 @@ public class RedisParameters implements Serializable {

public void buildWithConfig(ReadonlyConfig config) {
// set host
this.host = config.get(RedisSinkOptions.HOST);
this.host = config.get(RedisBaseOptions.HOST);
// set port
this.port = config.get(RedisSinkOptions.PORT);
this.port = config.get(RedisBaseOptions.PORT);
// set db_num
this.dbNum = config.get(RedisSinkOptions.DB_NUM);
this.dbNum = config.get(RedisBaseOptions.DB_NUM);
// set hash key mode
this.hashKeyParseMode = config.get(RedisSinkOptions.HASH_KEY_PARSE_MODE);
this.hashKeyParseMode = config.get(RedisSourceOptions.HASH_KEY_PARSE_MODE);
// set expire
this.expire = config.get(RedisSinkOptions.EXPIRE);
// set auth
if (config.getOptional(RedisSinkOptions.AUTH).isPresent()) {
this.auth = config.get(RedisSinkOptions.AUTH);
if (config.getOptional(RedisBaseOptions.AUTH).isPresent()) {
this.auth = config.get(RedisBaseOptions.AUTH);
}
// set user
if (config.getOptional(RedisSinkOptions.USER).isPresent()) {
this.user = config.get(RedisSinkOptions.USER);
if (config.getOptional(RedisBaseOptions.USER).isPresent()) {
this.user = config.get(RedisBaseOptions.USER);
}
// set mode
this.mode = config.get(RedisSinkOptions.MODE);
this.mode = config.get(RedisBaseOptions.MODE);
// set redis nodes information
if (config.getOptional(RedisSinkOptions.NODES).isPresent()) {
this.redisNodes = config.get(RedisSinkOptions.NODES);
if (config.getOptional(RedisBaseOptions.NODES).isPresent()) {
this.redisNodes = config.get(RedisBaseOptions.NODES);
}
// set key
if (config.getOptional(RedisSinkOptions.KEY).isPresent()) {
this.keyField = config.get(RedisSinkOptions.KEY);
if (config.getOptional(RedisBaseOptions.KEY).isPresent()) {
this.keyField = config.get(RedisBaseOptions.KEY);
}
// set keysPattern
if (config.getOptional(RedisSinkOptions.KEY_PATTERN).isPresent()) {
this.keysPattern = config.get(RedisSinkOptions.KEY_PATTERN);
if (config.getOptional(RedisBaseOptions.KEY_PATTERN).isPresent()) {
this.keysPattern = config.get(RedisBaseOptions.KEY_PATTERN);
}
// set redis data type verification factory createAndPrepareSource
this.redisDataType = config.get(RedisSinkOptions.DATA_TYPE);
this.redisDataType = config.get(RedisBaseOptions.DATA_TYPE);
// Indicates the number of keys to attempt to return per iteration.default 10
this.batchSize = config.get(RedisSinkOptions.BATCH_SIZE);
this.batchSize = config.get(RedisBaseOptions.BATCH_SIZE);
// set support custom key
if (config.getOptional(RedisSinkOptions.SUPPORT_CUSTOM_KEY).isPresent()) {
this.supportCustomKey = config.get(RedisSinkOptions.SUPPORT_CUSTOM_KEY);
Expand All @@ -122,7 +122,7 @@ public void buildWithConfig(ReadonlyConfig config) {
public RedisClient buildRedisClient() {
Jedis jedis = this.buildJedis();
this.redisVersion = extractRedisVersion(jedis);
if (mode.equals(RedisSinkOptions.RedisMode.SINGLE)) {
if (mode.equals(RedisBaseOptions.RedisMode.SINGLE)) {
return new RedisSingleClient(this, jedis, redisVersion);
} else {
return new RedisClusterClient(this, jedis, redisVersion);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,146 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.redis.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

import java.util.List;

public class RedisSinkOptions {

public static final String CONNECTOR_IDENTITY = "Redis";

public enum RedisMode {
SINGLE,
CLUSTER;
}

public enum HashKeyParseMode {
ALL,
KV;
}

public static final Option<String> HOST =
Options.key("host")
.stringType()
.noDefaultValue()
.withDescription("redis hostname or ip");

public static final Option<Integer> PORT =
Options.key("port").intType().noDefaultValue().withDescription("redis port");

public static final Option<String> AUTH =
Options.key("auth")
.stringType()
.noDefaultValue()
.withDescription(
"redis authentication password, you need it when you connect to an encrypted cluster");

public static final Option<Integer> DB_NUM =
Options.key("db_num")
.intType()
.defaultValue(0)
.withDescription(
"Redis database index id, it is connected to db 0 by default");

public static final Option<String> USER =
Options.key("user")
.stringType()
.noDefaultValue()
.withDescription(
"redis authentication user, you need it when you connect to an encrypted cluster");

public static final Option<String> KEY_PATTERN =
Options.key("keys")
.stringType()
.noDefaultValue()
.withDescription(
"keys pattern, redis source connector support fuzzy key matching, user needs to ensure that the matched keys are the same type");

public static final Option<String> KEY =
Options.key("key")
.stringType()
.noDefaultValue()
.withDescription("The value of key you want to write to redis.");

public static final Option<RedisDataType> DATA_TYPE =
Options.key("data_type")
.enumType(RedisDataType.class)
.noDefaultValue()
.withDescription("redis data types, support string hash list set zset.");

public static final Option<RedisSinkOptions.Format> FORMAT =
Options.key("format")
.enumType(RedisSinkOptions.Format.class)
.defaultValue(RedisSinkOptions.Format.JSON)
.withDescription(
"the format of upstream data, now only support json and text, default json.");

public static final Option<RedisSinkOptions.RedisMode> MODE =
Options.key("mode")
.enumType(RedisSinkOptions.RedisMode.class)
.defaultValue(RedisMode.SINGLE)
.withDescription(
"redis mode, support single or cluster, default value is single");

public static final Option<List<String>> NODES =
Options.key("nodes")
.listType()
.noDefaultValue()
.withDescription(
"redis nodes information, used in cluster mode, must like as the following format: [host1:port1, host2:port2]");

public static final Option<RedisSinkOptions.HashKeyParseMode> HASH_KEY_PARSE_MODE =
Options.key("hash_key_parse_mode")
.enumType(RedisSinkOptions.HashKeyParseMode.class)
.defaultValue(HashKeyParseMode.ALL)
.withDescription(
"hash key parse mode, support all or kv, default value is all");
public class RedisSinkOptions extends RedisBaseOptions {

public static final Option<Long> EXPIRE =
Options.key("expire")
.longType()
.defaultValue(-1L)
.withDescription("Set redis expiration time.");

public static final Option<Integer> BATCH_SIZE =
Options.key("batch_size")
.intType()
.defaultValue(10)
.withDescription(
"batch_size is used to control the size of a batch of data during read and write operations"
+ ",default 10");

public static final Option<Boolean> SUPPORT_CUSTOM_KEY =
Options.key("support_custom_key")
.booleanType()
.defaultValue(false)
.withDescription(
"if true, the key can be customized by the field value in the upstream data.");

public static final Option<String> VALUE_FIELD =
Options.key("value_field")
.stringType()
.noDefaultValue()
.withDescription(
"The field of value you want to write to redis, support string list set zset");

public static final Option<String> HASH_KEY_FIELD =
Options.key("hash_key_field")
.stringType()
Expand All @@ -152,9 +34,4 @@ public enum HashKeyParseMode {
.stringType()
.noDefaultValue()
.withDescription("The field of hash value you want to write to redis");

public enum Format {
JSON,
// TEXT will be supported later
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.apache.seatunnel.connectors.seatunnel.redis.config;

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

public class RedisSourceOptions extends RedisBaseOptions {
public enum HashKeyParseMode {
ALL,
KV;
}

public static final Option<HashKeyParseMode> HASH_KEY_PARSE_MODE =
Options.key("hash_key_parse_mode")
.enumType(HashKeyParseMode.class)
.defaultValue(HashKeyParseMode.ALL)
.withDescription(
"hash key parse mode, support all or kv, default value is all");
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisBaseOptions;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisParameters;
import org.apache.seatunnel.connectors.seatunnel.redis.config.RedisSinkOptions;

import java.io.IOException;
import java.util.Optional;
Expand All @@ -46,7 +46,7 @@ public RedisSink(ReadonlyConfig config, CatalogTable table) {

@Override
public String getPluginName() {
return RedisSinkOptions.CONNECTOR_IDENTITY;
return RedisBaseOptions.CONNECTOR_IDENTITY;
}

@Override
Expand Down
Loading

0 comments on commit cc932cf

Please sign in to comment.