Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Venice Primitive keys #83

Merged
merged 2 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions deploy/samples/venicedb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ spec:
connector = venice
storeName = {{table}}
partial-update-mode = true
key.fields-prefix = KEY_
key.fields = {{keys}}
key.fields-prefix = {{keyPrefix:}}
key.fields = {{keys:KEY}}
key.type = {{keyType:PRIMITIVE}}
value.fields-include: EXCEPT_KEY
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class K8sConnector implements Connector<Source> {
@Override
public Map<String, String> configure(Source source) throws SQLException {
Template.Environment env =
Template.Environment.EMPTY.with("name", source.database() + "-" + source.table().toLowerCase(Locale.ROOT))
new Template.SimpleEnvironment().with("name", source.database() + "-" + source.table().toLowerCase(Locale.ROOT))
.with("database", source.database())
.with("table", source.table())
.with(source.options());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class K8sJobDeployer extends K8sYamlDeployer<Job> {
@Override
public List<String> specify(Job job) throws SQLException {
Function<SqlDialect, String> sql = job.sql();
Template.Environment env = Template.Environment.EMPTY.with("name",
Template.Environment env = new Template.SimpleEnvironment().with("name",
job.sink().database() + "-" + job.sink().table().toLowerCase(Locale.ROOT))
.with("database", job.sink().database())
.with("schema", job.sink().schema())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class K8sSourceDeployer extends K8sYamlDeployer<Source> {
@Override
public List<String> specify(Source source) throws SQLException {
Template.Environment env =
Template.Environment.EMPTY.with("name", source.database() + "-" + source.table().toLowerCase(Locale.ROOT))
new Template.SimpleEnvironment().with("name", source.database() + "-" + source.table().toLowerCase(Locale.ROOT))
.with("database", source.database())
.with("schema", source.schema())
.with("table", source.table())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public SimpleTemplate(String template) {
public String render(Environment env) {
StringBuffer sb = new StringBuffer();
Pattern p =
Pattern.compile("([\\s\\-\\#]*)\\{\\{\\s*([\\w_\\-\\.]+)\\s*(:([\\w_\\-\\.]+))?\\s*((\\w+\\s*)*)\\s*\\}\\}");
Pattern.compile("([\\s\\-\\#]*)\\{\\{\\s*([\\w_\\-\\.]+)\\s*(:([\\w_\\-\\.]*))?\\s*((\\w+\\s*)*)\\s*\\}\\}");
Matcher m = p.matcher(template);
while (m.find()) {
String prefix = m.group(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.calcite.util.Litmus;
import org.apache.calcite.util.Pair;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;

import com.linkedin.hoptimator.Deployable;
Expand All @@ -44,6 +45,8 @@ public interface PipelineRel extends RelNode {

Convention CONVENTION = new Convention.Impl("PIPELINE", PipelineRel.class);
String KEY_OPTION = "keys";
String KEY_PREFIX_OPTION = "keyPrefix";
String KEY_TYPE_OPTION = "keyType";
String KEY_PREFIX = "KEY_";

void implement(Implementor implementor) throws SQLException;
Expand Down Expand Up @@ -95,7 +98,8 @@ public void setSink(String database, List<String> path, RelDataType rowType, Map
this.sinkOptions = addKeysAsOption(options, rowType);
}

private Map<String, String> addKeysAsOption(Map<String, String> options, RelDataType rowType) {
@VisibleForTesting
static Map<String, String> addKeysAsOption(Map<String, String> options, RelDataType rowType) {
Map<String, String> newOptions = new LinkedHashMap<>(options);

RelDataType flattened = DataTypeUtils.flatten(rowType, new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT));
Expand All @@ -104,12 +108,15 @@ private Map<String, String> addKeysAsOption(Map<String, String> options, RelData
if (newOptions.containsKey(KEY_OPTION)) {
return newOptions;
}

String keyString = flattened.getFieldList().stream()
.map(x -> x.getName().replaceAll("\\$", "_"))
.filter(name -> name.startsWith(KEY_PREFIX))
.collect(Collectors.joining(";"));
if (!keyString.isEmpty()) {
newOptions.put(KEY_OPTION, keyString);
newOptions.put(KEY_PREFIX_OPTION, KEY_PREFIX);
newOptions.put(KEY_TYPE_OPTION, "RECORD");
}
return newOptions;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.linkedin.hoptimator.util;

import java.util.Arrays;
import java.util.List;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class TestTemplate {

@Test
public void testRender() {
Template.Environment env = new Template.SimpleEnvironment()
.with("name", "name")
.with("nameUpper", "name")
.with("nameLower", "NAME")
.with("multiline", "1\n2\n3\n")
.with("multilineUpper", "a\nb\nc\n")
.with("other", "test");

String template = "{{keys:KEY}}\n"
+ "{{keyPrefix:}}\n"
+ "{{name:default}}\n"
+ "{{nameUpper toUpperCase}}\n"
+ "{{nameLower toLowerCase}}\n"
+ "{{multiline concat}}\n"
+ "{{multilineUpper concat toUpperCase}}\n"
+ "{{other unknown}}\n";

String renderedTemplate = new Template.SimpleTemplate(template).render(env);
List<String> renderedTemplates = Arrays.asList(renderedTemplate.split("\n"));
assertEquals(8, renderedTemplates.size());
assertEquals("KEY", renderedTemplates.get(0));
assertEquals("", renderedTemplates.get(1));
assertEquals("name", renderedTemplates.get(2));
assertEquals("NAME", renderedTemplates.get(3));
assertEquals("name", renderedTemplates.get(4));
assertEquals("123", renderedTemplates.get(5));
assertEquals("ABC", renderedTemplates.get(6));
assertEquals("test", renderedTemplates.get(7));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.linkedin.hoptimator.util.planner;

import java.util.HashMap;
import java.util.Map;

import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;
import org.junit.jupiter.api.Test;

import static com.linkedin.hoptimator.util.planner.PipelineRel.Implementor.addKeysAsOption;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TestPipelineRel {

@Test
public void testKeyOptions() {
RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
RelDataTypeFactory.Builder primitiveKeyBuilder = new RelDataTypeFactory.Builder(typeFactory);
primitiveKeyBuilder.add("KEY", SqlTypeName.VARCHAR);
primitiveKeyBuilder.add("intField", SqlTypeName.INTEGER);
Map<String, String> keyOptions = addKeysAsOption(new HashMap<>(), primitiveKeyBuilder.build());
assertTrue(keyOptions.isEmpty());

RelDataTypeFactory.Builder keyBuilder = new RelDataTypeFactory.Builder(typeFactory);
keyBuilder.add("keyInt", SqlTypeName.INTEGER);
keyBuilder.add("keyString", SqlTypeName.VARCHAR);
RelDataTypeFactory.Builder recordBuilder = new RelDataTypeFactory.Builder(typeFactory);
recordBuilder.add("intField", SqlTypeName.INTEGER);
recordBuilder.add("KEY", keyBuilder.build());
keyOptions = addKeysAsOption(new HashMap<>(), recordBuilder.build());
assertEquals(3, keyOptions.size());
assertEquals("KEY_keyInt;KEY_keyString", keyOptions.get("keys"));
assertEquals("KEY_", keyOptions.get("keyPrefix"));
assertEquals("RECORD", keyOptions.get("keyType"));
}
}
Loading