Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Support modify 'fast_schema_evolution' property for newly created tables #42894

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/alter/AlterJobMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,8 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_SIZE) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_FOREIGN_KEY_CONSTRAINT) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_UNIQUE_CONSTRAINT) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_PRIMARY_INDEX_CACHE_EXPIRE_SEC));
properties.containsKey(PropertyAnalyzer.PROPERTIES_PRIMARY_INDEX_CACHE_EXPIRE_SEC) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_USE_FAST_SCHEMA_EVOLUTION));

olapTable = (OlapTable) db.getTable(tableName);
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_INMEMORY)) {
Expand All @@ -717,6 +718,15 @@ public void processAlterTable(AlterTableStmt stmt) throws UserException {
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_PRIMARY_INDEX_CACHE_EXPIRE_SEC)) {
schemaChangeHandler.updateTableMeta(db, tableName, properties,
TTabletMetaType.PRIMARY_INDEX_CACHE_EXPIRE_SEC);
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_USE_FAST_SCHEMA_EVOLUTION)) {
if (!olapTable.canUseFastSchemaEvolution()) {
String s = "table: " + olapTable.getName() + " maybe created in old version and does not support"
+ " modify meta: FAST_SCHEMA_EVOLUTION";
LOG.warn(s);
throw new DdlException(s);
}
schemaChangeHandler.updateTableMeta(db, tableName, properties,
TTabletMetaType.FAST_SCHEMA_EVOLUTION);
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_ENABLE) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_TTL) ||
properties.containsKey(PropertyAnalyzer.PROPERTIES_BINLOG_MAX_SIZE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,10 @@ private boolean processAddColumns(AddColumnsClause alterClause, OlapTable olapTa
}
}

boolean ligthSchemaChange = olapTable.getUseFastSchemaEvolution();
boolean fastSchemaEvolution = olapTable.enableFastSchemaEvolution();
if (alterClause.getGeneratedColumnPos() == null) {
for (Column column : columns) {
ligthSchemaChange &= addColumnInternal(olapTable, column, null, targetIndexId, baseIndexId, indexSchemaMap,
fastSchemaEvolution &= addColumnInternal(olapTable, column, null, targetIndexId, baseIndexId, indexSchemaMap,
newColNameSet);
}
} else {
Expand All @@ -251,10 +251,10 @@ private boolean processAddColumns(AddColumnsClause alterClause, OlapTable olapTa
addColumnInternal(olapTable, column, alterClause.getGeneratedColumnPos(),
targetIndexId, baseIndexId, indexSchemaMap, newColNameSet);
// add a generated column need to rewrite data, can not use light schema change
ligthSchemaChange = false;
fastSchemaEvolution = false;
}
}
return ligthSchemaChange;
return fastSchemaEvolution;
}

/**
Expand All @@ -267,7 +267,7 @@ private boolean processAddColumns(AddColumnsClause alterClause, OlapTable olapTa
*/
private boolean processDropColumn(DropColumnClause alterClause, OlapTable olapTable,
Map<Long, LinkedList<Column>> indexSchemaMap, List<Index> indexes) throws DdlException {
boolean fastSchemaEvolution = olapTable.getUseFastSchemaEvolution();
boolean fastSchemaEvolution = olapTable.enableFastSchemaEvolution();
String dropColName = alterClause.getColName();
String targetIndexName = alterClause.getRollupName();
checkIndexExists(olapTable, targetIndexName);
Expand Down Expand Up @@ -795,7 +795,7 @@ private boolean addColumnInternal(OlapTable olapTable, Column newColumn, ColumnP
throw new DdlException("unsupported default expr:" + newColumn.getDefaultExpr().getExpr());
}

boolean fastSchemaEvolution = olapTable.getUseFastSchemaEvolution();
boolean fastSchemaEvolution = olapTable.enableFastSchemaEvolution();
// if column is generated column, need to rewrite table data, so we can not use light schema change
if (newColumn.isAutoIncrement() || newColumn.isGeneratedColumn()) {
fastSchemaEvolution = false;
Expand Down Expand Up @@ -1512,8 +1512,7 @@ public AlterJobV2 analyzeAndCreateJob(List<AlterClause> alterClauses, Database d
if (olapTable == null) {
throw new DdlException("olapTable is null");
}
boolean fastSchemaEvolution = olapTable.getUseFastSchemaEvolution();
//for multi add colmuns clauses
boolean fastSchemaEvolution = olapTable.enableFastSchemaEvolution();
IntSupplier colUniqueIdSupplier = new IntSupplier() {
private int pendingMaxColUniqueId = olapTable.getMaxColUniqueId();

Expand Down Expand Up @@ -1912,6 +1911,11 @@ public void updateTableMeta(Database db, String tableName, Map<String, String> p
if (primaryIndexCacheExpireSec == olapTable.primaryIndexCacheExpireSec()) {
return;
}
} else if (metaType == TTabletMetaType.FAST_SCHEMA_EVOLUTION) {
metaValue = Boolean.parseBoolean(properties.get(PropertyAnalyzer.PROPERTIES_USE_FAST_SCHEMA_EVOLUTION));
if (metaValue == olapTable.getUseFastSchemaEvolution()) {
return;
}
} else {
LOG.warn("meta type: {} does not support", metaType);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public enum RefreshMoment {
}

@Override
public boolean getUseFastSchemaEvolution() {
public boolean enableFastSchemaEvolution() {
return false;
}

Expand Down
15 changes: 14 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2806,8 +2806,21 @@ public void setForeignKeyConstraints(List<ForeignKeyConstraint> foreignKeyConstr
tableProperty.setForeignKeyConstraints(foreignKeyConstraints);
}

public boolean canUseFastSchemaEvolution() {
return maxColUniqueId > Column.COLUMN_UNIQUE_ID_INIT_VALUE;
}

public boolean getUseFastSchemaEvolution() {
if (hasRowStorageType()) {
if (tableProperty != null) {
return tableProperty.getUseFastSchemaEvolution();
}
return false;
}

// if table can use fast schema evolution and `fast_schema_evolution` in property is true,
// enableFastSchemaEvolution return true
public boolean enableFastSchemaEvolution() {
if (!canUseFastSchemaEvolution() || hasRowStorageType()) {
// row storage type does not support fast schema evolution currently
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,8 @@ public void readFields(DataInput in) throws IOException {
case OperationType.OP_MODIFY_ENABLE_PERSISTENT_INDEX:
case OperationType.OP_MODIFY_PRIMARY_INDEX_CACHE_EXPIRE_SEC:
case OperationType.OP_ALTER_TABLE_PROPERTIES:
case OperationType.OP_MODIFY_TABLE_CONSTRAINT_PROPERTY: {
case OperationType.OP_MODIFY_TABLE_CONSTRAINT_PROPERTY:
case OperationType.OP_MODIFY_FAST_SCHEMA_EVOLUTION: {
data = ModifyTablePropertyOperationLog.read(in);
isRead = true;
break;
Expand Down
2 changes: 1 addition & 1 deletion fe/fe-core/src/main/java/com/starrocks/lake/LakeTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public void gsonPostProcess() throws IOException {
}

@Override
public boolean getUseFastSchemaEvolution() {
public boolean enableFastSchemaEvolution() {
return !hasRowStorageType() && Config.enable_fast_schema_evolution_in_share_data_mode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,8 @@ public void loadJournal(GlobalStateMgr globalStateMgr, JournalEntity journal)
case OperationType.OP_MODIFY_ENABLE_PERSISTENT_INDEX:
case OperationType.OP_MODIFY_PRIMARY_INDEX_CACHE_EXPIRE_SEC:
case OperationType.OP_ALTER_TABLE_PROPERTIES:
case OperationType.OP_MODIFY_TABLE_CONSTRAINT_PROPERTY: {
case OperationType.OP_MODIFY_TABLE_CONSTRAINT_PROPERTY:
case OperationType.OP_MODIFY_FAST_SCHEMA_EVOLUTION: {
ModifyTablePropertyOperationLog modifyTablePropertyOperationLog =
(ModifyTablePropertyOperationLog) journal.getData();
globalStateMgr.getLocalMetastore().replayModifyTableProperty(opCode, modifyTablePropertyOperationLog);
Expand Down Expand Up @@ -1720,6 +1721,10 @@ public void logModifyPrimaryIndexCacheExpireSec(ModifyTablePropertyOperationLog
logEdit(OperationType.OP_MODIFY_PRIMARY_INDEX_CACHE_EXPIRE_SEC, info);
}

public void logModifyFastSchemaEvolution(ModifyTablePropertyOperationLog info) {
logEdit(OperationType.OP_MODIFY_FAST_SCHEMA_EVOLUTION, info);
}

public void logModifyWriteQuorum(ModifyTablePropertyOperationLog info) {
logEdit(OperationType.OP_MODIFY_WRITE_QUORUM, info);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,8 @@ public class OperationType {
public static final short OP_DISABLE_TABLE_RECOVERY = 13510;
@IgnorableOnReplayFailed
public static final short OP_DISABLE_PARTITION_RECOVERY = 13511;
@IgnorableOnReplayFailed
public static final short OP_MODIFY_FAST_SCHEMA_EVOLUTION = 13512;

/**
* NOTICE: OperationType cannot use a value exceeding 20000, and an error will be reported if it exceeds
Expand Down
20 changes: 20 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java
Original file line number Diff line number Diff line change
Expand Up @@ -4659,6 +4659,22 @@ public void modifyTablePrimaryIndexCacheExpireSec(Database db, OlapTable table,
GlobalStateMgr.getCurrentState().getEditLog().logModifyPrimaryIndexCacheExpireSec(info);
}

public void modifyTableFastSchemaEvolution(Database db, OlapTable table, Map<String, String> properties) {
Locker locker = new Locker();
Preconditions.checkArgument(locker.isDbWriteLockHeldByCurrentThread(db));
TableProperty tableProperty = table.getTableProperty();
if (tableProperty == null) {
tableProperty = new TableProperty(properties);
table.setTableProperty(tableProperty);
} else {
tableProperty.modifyTableProperties(properties);
}
tableProperty.buildUseFastSchemaEvolution();
ModifyTablePropertyOperationLog info = new ModifyTablePropertyOperationLog(db.getId(), table.getId(),
properties);
GlobalStateMgr.getCurrentState().getEditLog().logModifyFastSchemaEvolution(info);
}

public void modifyTableMeta(Database db, OlapTable table, Map<String, String> properties,
TTabletMetaType metaType) {
if (metaType == TTabletMetaType.INMEMORY) {
Expand All @@ -4673,6 +4689,8 @@ public void modifyTableMeta(Database db, OlapTable table, Map<String, String> pr
modifyTableAutomaticBucketSize(db, table, properties);
} else if (metaType == TTabletMetaType.PRIMARY_INDEX_CACHE_EXPIRE_SEC) {
modifyTablePrimaryIndexCacheExpireSec(db, table, properties);
} else if (metaType == TTabletMetaType.FAST_SCHEMA_EVOLUTION) {
modifyTableFastSchemaEvolution(db, table, properties);
}
}

Expand Down Expand Up @@ -4793,6 +4811,8 @@ public void replayModifyTableProperty(short opCode, ModifyTablePropertyOperation
if (!olapTable.isBinlogEnabled()) {
olapTable.clearBinlogAvailableVersion();
}
} else if (opCode == OperationType.OP_MODIFY_FAST_SCHEMA_EVOLUTION) {
olapTable.setUseFastSchemaEvolution(tableProperty.getUseFastSchemaEvolution());
}
}
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,16 +265,15 @@ public Table createTable(LocalMetastore metastore, Database db, CreateTableStmt
throw new DdlException(e.getMessage());
}
table.setUseFastSchemaEvolution(useFastSchemaEvolution);

List<Integer> sortKeyUniqueIds = new ArrayList<>();
for (Column column : baseSchema) {
column.setUniqueId(table.incAndGetMaxColUniqueId());
LOG.debug("table: {}, newColumn: {}, uniqueId: {}", table.getName(), column.getName(),
column.getUniqueId());
}
List<Integer> sortKeyUniqueIds = new ArrayList<>();
if (useFastSchemaEvolution) {
for (Integer idx : sortKeyIdxes) {
sortKeyUniqueIds.add(baseSchema.get(idx).getUniqueId());
}
} else {
LOG.debug("table: {} doesn't use light schema change", table.getName());
for (Integer idx : sortKeyIdxes) {
sortKeyUniqueIds.add(baseSchema.get(idx).getUniqueId());
}

// analyze bloom filter columns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,15 @@ public Void visitModifyTablePropertiesClause(ModifyTablePropertiesClause clause,
ErrorReport.reportSemanticException(ErrorCode.ERR_COMMON_ERROR, e.getMessage());
}
clause.setNeedTableStable(false);
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_USE_FAST_SCHEMA_EVOLUTION)) {
if (!properties.get(PropertyAnalyzer.PROPERTIES_USE_FAST_SCHEMA_EVOLUTION).equalsIgnoreCase("true") &&
!properties.get(PropertyAnalyzer.PROPERTIES_USE_FAST_SCHEMA_EVOLUTION).equalsIgnoreCase("false")) {
ErrorReport.reportSemanticException(ErrorCode.ERR_COMMON_ERROR,
"Property " + PropertyAnalyzer.PROPERTIES_USE_FAST_SCHEMA_EVOLUTION +
" must be bool type(false/true)");
}
clause.setNeedTableStable(false);
clause.setOpType(AlterOpType.MODIFY_TABLE_PROPERTY_SYNC);
} else {
ErrorReport.reportSemanticException(ErrorCode.ERR_COMMON_ERROR, "Unknown properties: " + properties);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ protected void runBeforeAll() throws Exception {

createTable(createPKTblStmtStr);

String createDupTbl3StmtStr = "CREATE TABLE IF NOT EXISTS test.sc_dup3 (\n" + "timestamp DATETIME,\n"
+ "type INT,\n" + "error_code INT,\n" + "error_msg VARCHAR(1024),\n" + "op_id BIGINT,\n"
+ "op_time DATETIME)\n" + "DUPLICATE KEY(timestamp, type)\n" + "DISTRIBUTED BY HASH(type) BUCKETS 1\n"
+ "PROPERTIES ('replication_num' = '1', 'fast_schema_evolution' = 'true');";

createTable(createDupTbl3StmtStr);

}

private void waitAlterJobDone(Map<Long, AlterJobV2> alterJobs) throws Exception {
Expand Down Expand Up @@ -437,4 +444,66 @@ public void testSetPrimaryIndexCacheExpireSec() throws Exception {
Assert.assertTrue(e.getMessage().contains("Property primary_index_cache_expire_sec must be integer"));
}
}

@Test
public void testModifyTableFastSchemaEvolution() throws Exception {
LOG.info("dbName: {}", GlobalStateMgr.getCurrentState().getLocalMetastore().listDbNames());

Database db = GlobalStateMgr.getCurrentState().getDb("test");
OlapTable tbl = (OlapTable) db.getTable("sc_dup3");
Locker locker = new Locker();
locker.lockDatabase(db, LockType.READ);
try {
Assertions.assertNotNull(tbl);
System.out.println(tbl.getName());
Assertions.assertEquals("StarRocks", tbl.getEngine());
Assertions.assertEquals(6, tbl.getBaseSchema().size());
} finally {
locker.unLockDatabase(db, LockType.READ);
}

Assertions.assertTrue(tbl.canUseFastSchemaEvolution());
Assertions.assertTrue(tbl.enableFastSchemaEvolution());
// process set property
String modifyStmtStr = "alter table test.sc_dup3 set ('fast_schema_evolution' = 'false');";
AlterTableStmt modifyStmt = (AlterTableStmt) parseAndAnalyzeStmt(modifyStmtStr);
GlobalStateMgr.getCurrentState().getAlterJobMgr().processAlterTable(modifyStmt);
Assertions.assertTrue(tbl.canUseFastSchemaEvolution());
Assertions.assertFalse(tbl.enableFastSchemaEvolution());

int maxColUniqueId = tbl.getMaxColUniqueId();
tbl.setMaxColUniqueId(-1);

Assertions.assertFalse(tbl.canUseFastSchemaEvolution());
Assertions.assertFalse(tbl.enableFastSchemaEvolution());
try {
String modifyStmtStr2 = "alter table test.sc_dup3 set ('fast_schema_evolution' = 'true');";
AlterTableStmt modifyStmt2 = (AlterTableStmt) parseAndAnalyzeStmt(modifyStmtStr2);
GlobalStateMgr.getCurrentState().getAlterJobMgr().processAlterTable(modifyStmt);
} catch (Exception e) {
LOG.warn(e.getMessage());
Assert.assertTrue(e.getMessage().contains("not support modify meta: FAST_SCHEMA_EVOLUTION"));
}

tbl.setMaxColUniqueId(maxColUniqueId);

String addValColStmtStr = "alter table test.sc_dup3 add column new_v1 int default '0'";
AlterTableStmt addValColStmt = (AlterTableStmt) parseAndAnalyzeStmt(addValColStmtStr);
GlobalStateMgr.getCurrentState().getAlterJobMgr().processAlterTable(addValColStmt);
jobSize++;
Map<Long, AlterJobV2> alterJobs = GlobalStateMgr.getCurrentState().getSchemaChangeHandler().getAlterJobsV2();

waitAlterJobDone(alterJobs);
locker.lockDatabase(db, LockType.READ);
try {
Assertions.assertEquals(7, tbl.getBaseSchema().size());
String baseIndexName = tbl.getIndexNameById(tbl.getBaseIndexId());
Assertions.assertEquals(baseIndexName, tbl.getName());
MaterializedIndexMeta indexMeta = tbl.getIndexMetaByIndexId(tbl.getBaseIndexId());
Assertions.assertNotNull(indexMeta);
Assertions.assertEquals(maxColUniqueId + 1, tbl.getMaxColUniqueId());
} finally {
locker.unLockDatabase(db, LockType.READ);
}
}
}
3 changes: 2 additions & 1 deletion gensrc/thrift/AgentService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@ enum TTabletMetaType {
BINLOG_CONFIG,
BUCKET_SIZE,
PRIMARY_INDEX_CACHE_EXPIRE_SEC,
STORAGE_TYPE
STORAGE_TYPE,
FAST_SCHEMA_EVOLUTION
}

struct TTabletMetaInfo {
Expand Down
Loading
Loading