diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index ab1bf1be36926d..be11cbe73403e0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -678,7 +678,7 @@ public void processAlterTable(AlterTableCommand command) throws UserException {
                 DynamicPartitionUtil.checkAlterAllowed(
                         (OlapTable) db.getTableOrMetaException(tableName, TableType.OLAP));
             }
-            Env.getCurrentEnv().addPartition(db, tableName, (AddPartitionOp) alterOp, false, 0, true);
+            Env.getCurrentEnv().addPartition(db, tableName, (AddPartitionOp) alterOp, false, 0, true, null);
         } else if (alterOp instanceof AddPartitionLikeOp) {
             if (!((AddPartitionLikeOp) alterOp).getTempPartition()) {
                 DynamicPartitionUtil.checkAlterAllowed(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 55d7b9bd4c6291..a291899553ac0e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -3479,14 +3479,17 @@ public boolean createTable(CreateTableInfo createTableInfo) throws UserException
      * @param isCreateTable this call is for creating table
      * @param generatedPartitionId the preset partition id for the partition to add
      * @param writeEditLog whether to write an edit log for this addition
-     * @return PartitionPersistInfo to be written to editlog. It may be null if no partitions added.
+     * @param batchPartitions output parameter used to batch-write edit logs outside this function; may be null.
+     *                        each entry's first element is the edit-log PartitionPersistInfo, the second the added Partition
      * @throws DdlException
      */
-    public PartitionPersistInfo addPartition(Database db, String tableName, AddPartitionOp addPartitionOp,
-                                             boolean isCreateTable, long generatedPartitionId,
-                                             boolean writeEditLog) throws DdlException {
-        return getInternalCatalog().addPartition(db, tableName, addPartitionOp,
-                isCreateTable, generatedPartitionId, writeEditLog);
+    public void addPartition(Database db, String tableName, AddPartitionOp addPartitionOp,
+                             boolean isCreateTable, long generatedPartitionId,
+                             boolean writeEditLog,
+                             List<Pair<PartitionPersistInfo, Partition>> batchPartitions)
+            throws DdlException {
+        getInternalCatalog().addPartition(db, tableName, addPartitionOp,
+                isCreateTable, generatedPartitionId, writeEditLog, batchPartitions);
    }
 
     public void addMultiPartitions(Database db, String tableName, AlterMultiPartitionOp multiPartitionOp)
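With the reworked signature above, a caller that passes writeEditLog == false is expected to drain batchPartitions itself. A minimal caller-side sketch of that contract (a hypothetical helper, not part of the patch; the class name and variables are illustrative, the Doris types and calls are those referenced by the diff):

import java.util.List;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.common.Pair;
import org.apache.doris.persist.PartitionPersistInfo;

// Hypothetical sketch: drain the batchPartitions out-parameter so that every
// PartitionPersistInfo is persisted to the edit log before the corresponding
// Partition becomes visible on the table -- the same ordering the patch
// enforces inside InternalCatalog.addPartition() for the writeEditLog case.
class BatchAddPartitionSketch {
    static void logThenAttach(List<Pair<PartitionPersistInfo, Partition>> batchPartitions,
                              OlapTable olapTable) {
        for (Pair<PartitionPersistInfo, Partition> entry : batchPartitions) {
            Env.getCurrentEnv().getEditLog().logAddPartition(entry.first); // persist first
            olapTable.addPartition(entry.second);                          // then publish
        }
    }
}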
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
index f77ec517667e82..495e8f9cdece74 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -774,7 +774,7 @@ public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitio
             cloudBatchBeforeCreatePartitions(executeFirstTime, addPartitionOps, olapTable, indexIds,
                     db, tableName, generatedPartitionIds);
 
-            List<PartitionPersistInfo> partsInfo = new ArrayList<>();
+            List<Pair<PartitionPersistInfo, Partition>> batchPartsInfo = new ArrayList<>();
             for (int i = 0; i < addPartitionOps.size(); i++) {
                 try {
                     boolean needWriteEditLog = true;
@@ -783,15 +783,10 @@ public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitio
                     if (Config.isCloudMode()) {
                         needWriteEditLog = !executeFirstTime;
                     }
-                    PartitionPersistInfo info =
-                            Env.getCurrentEnv().addPartition(db, tableName, addPartitionOps.get(i),
+                    Env.getCurrentEnv().addPartition(db, tableName, addPartitionOps.get(i),
                             executeFirstTime,
                             executeFirstTime && Config.isCloudMode() ? generatedPartitionIds.get(i) : 0,
-                            needWriteEditLog);
-                    if (info == null) {
-                        throw new Exception("null persisted partition returned");
-                    }
-                    partsInfo.add(info);
+                            needWriteEditLog, batchPartsInfo);
                     clearCreatePartitionFailedMsg(olapTable.getId());
                 } catch (Exception e) {
                     recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId());
@@ -802,7 +797,7 @@ public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitio
                     }
                 }
             }
-            cloudBatchAfterCreatePartitions(executeFirstTime, partsInfo,
+            cloudBatchAfterCreatePartitions(executeFirstTime, batchPartsInfo,
                     addPartitionOps, db, olapTable, indexIds, tableName);
 
             // ATTN: Breaking up dynamic partition table scheduling, consuming peak CPU consumption
@@ -822,15 +817,16 @@ public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitio
         }
     }
 
-    private void cloudBatchAfterCreatePartitions(boolean executeFirstTime, List<PartitionPersistInfo> partsInfo,
-                                                 ArrayList<AddPartitionOp> addPartitionOps, Database db,
-                                                 OlapTable olapTable, List<Long> indexIds,
-                                                 String tableName) throws DdlException {
+    private void cloudBatchAfterCreatePartitions(boolean executeFirstTime,
+                                                 List<Pair<PartitionPersistInfo, Partition>> batchPartsInfo,
+                                                 ArrayList<AddPartitionOp> addPartitionOps, Database db,
+                                                 OlapTable olapTable, List<Long> indexIds,
+                                                 String tableName) throws DdlException {
         if (Config.isNotCloudMode()) {
             return;
         }
-        List<Long> succeedPartitionIds = partsInfo.stream().map(partitionPersistInfo
-                -> partitionPersistInfo.getPartition().getId()).collect(Collectors.toList());
+        List<Long> succeedPartitionIds = batchPartsInfo.stream().map(partitionInfo
+                -> partitionInfo.first.getPartition().getId()).collect(Collectors.toList());
         if (!executeFirstTime || addPartitionOps.isEmpty()) {
             LOG.info("cloud commit rpc in batch, {}-{}", !executeFirstTime, addPartitionOps.size());
             return;
@@ -847,7 +843,7 @@ private void cloudBatchAfterCreatePartitions(boolean executeFirstTime, List<Pair
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ ... @@ public Result<..., DdlException> getCurrentPartitionFutur
         }
     }
 
-    public PartitionPersistInfo addPartition(Database db, String tableName, AddPartitionOp addPartitionOp,
-                                             boolean isCreateTable, long generatedPartitionId,
-                                             boolean writeEditLog) throws DdlException {
+    public void addPartition(Database db, String tableName, AddPartitionOp addPartitionOp,
+                             boolean isCreateTable, long generatedPartitionId,
+                             boolean writeEditLog,
+                             List<Pair<PartitionPersistInfo, Partition>> batchPartitions)
+            throws DdlException {
         // in cloud mode, isCreateTable == true, create dynamic partition use, so partitionId must have been generated.
         // isCreateTable == false, other case, partitionId generate in below, must be set 0
         if (!FeConstants.runningUnitTest && Config.isCloudMode()
@@ -1474,7 +1476,7 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti
             if (singlePartitionDesc.isSetIfNotExists()) {
                 LOG.info("table[{}] add partition[{}] which already exists", olapTable.getName(), partitionName);
                 if (!DebugPointUtil.isEnable("InternalCatalog.addPartition.noCheckExists")) {
-                    return null;
+                    return;
                 }
             } else {
                 ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName);
             }
@@ -1641,7 +1643,7 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti
                 db, tableName, olapTable, partitionName, singlePartitionDesc);
         if (ownerFutureOr.isErr()) {
             if (ownerFutureOr.unwrapErr() == null) {
-                return null;
+                return;
             } else {
                 throw ownerFutureOr.unwrapErr();
             }
@@ -1697,7 +1699,7 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti
                 LOG.info("table[{}] add partition[{}] which already exists", olapTable.getName(), partitionName);
                 if (singlePartitionDesc.isSetIfNotExists()) {
                     failedCleanCallback.run();
-                    return null;
+                    return;
                 } else {
                     ErrorReport.reportDdlException(ErrorCode.ERR_SAME_NAME_PARTITION, partitionName);
                 }
@@ -1755,12 +1757,6 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti
             // update partition info
             partitionInfo.handleNewSinglePartitionDesc(singlePartitionDesc, partitionId, isTempPartition);
 
-            if (isTempPartition) {
-                olapTable.addTempPartition(partition);
-            } else {
-                olapTable.addPartition(partition);
-            }
-
             // log
             PartitionPersistInfo info = null;
             if (partitionInfo.getType() == PartitionType.RANGE) {
@@ -1786,11 +1782,16 @@ public PartitionPersistInfo addPartition(Database db, String tableName, AddParti
             }
             if (writeEditLog) {
                 Env.getCurrentEnv().getEditLog().logAddPartition(info);
+                if (isTempPartition) {
+                    olapTable.addTempPartition(partition);
+                } else {
+                    olapTable.addPartition(partition);
+                }
                 LOG.info("succeed in creating partition[{}], temp: {}", partitionId, isTempPartition);
             } else {
+                batchPartitions.add(Pair.of(info, partition));
                 LOG.info("postpone creating partition[{}], temp: {}", partitionId, isTempPartition);
             }
-            return info;
         } finally {
             olapTable.writeUnlock();
         }
@@ -1837,7 +1838,7 @@ public void addMultiPartitions(Database db, String tableName, AlterMultiPartitio
             AddPartitionOp addPartitionOp = new AddPartitionOp(
                     singlePartitionDesc.translateToPartitionDefinition(),
                     null, multiPartitionOp.getProperties(), false);
-            addPartition(db, tableName, addPartitionOp, false, 0, true);
+            addPartition(db, tableName, addPartitionOp, false, 0, true, null);
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
index eee02b62a98d06..924f51ce06463f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPartitionUtil.java
@@ -405,7 +405,7 @@ public static void addPartition(MTMV mtmv, PartitionKeyDesc oldPartitionKeyDesc)
                 mtmv.getDefaultDistributionInfo().toDistributionDesc().toDistributionDescriptor(),
                 partitionProperties, false);
         Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(), mtmv.getName(), addPartitionClause,
-                false, 0, true);
+                false, 0, true, null);
     }
 
     /**
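The essential change in InternalCatalog.addPartition() above is ordering: the partition is attached to the table only after its edit log entry has been written. A toy illustration of that invariant (plain self-contained Java, not Doris code):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy model: a "partition" must be durable (logged) before it is published to
// readers. If publish happens first -- the old code path -- a concurrent reader
// such as INSERT OVERWRITE planning can act on a partition that a replaying
// follower or a restarted master has never heard of.
class PersistBeforePublishSketch {
    private final Map<String, Long> visiblePartitions = new ConcurrentHashMap<>();

    private void appendEditLog(String name, long id) {
        // stand-in for EditLog.logAddPartition(info); this write may be slow
    }

    void addPartition(String name, long id) {
        appendEditLog(name, id);         // 1. persist first
        visiblePartitions.put(name, id); // 2. only then make it visible
    }
}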
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 0102d1a32a710b..4f126fcf21422c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -1666,7 +1666,21 @@ public void logRefreshExternalTableSchema(RefreshExternalTableInfo info) {
     }
 
     public long logAddPartition(PartitionPersistInfo info) {
+        if (DebugPointUtil.isEnable("FE.logAddPartition.slow")) {
+            DebugPointUtil.DebugPoint debugPoint = DebugPointUtil.getDebugPoint("FE.logAddPartition.slow");
+            String pName = debugPoint.param("pName", "");
+            if (info.getPartition().getName().equals(pName)) {
+                int sleepMs = debugPoint.param("sleep", 1000);
+                LOG.info("logAddPartition debug point hit, pName {}, sleep {} ms", pName, sleepMs);
+                try {
+                    Thread.sleep(sleepMs);
+                } catch (InterruptedException e) {
+                    LOG.warn("sleep interrupted", e);
+                }
+            }
+        }
         long logId = logEdit(OperationType.OP_ADD_PARTITION, info);
+        LOG.info("log add partition, logId: {}, info: {}", logId, info.toJson());
         AddPartitionRecord record = new AddPartitionRecord(logId, info);
         Env.getCurrentEnv().getBinlogManager().addAddPartitionRecord(record);
         return logId;
@@ -1674,6 +1688,7 @@ public long logAddPartition(PartitionPersistInfo info) {
 
     public long logDropPartition(DropPartitionInfo info) {
         long logId = logEdit(OperationType.OP_DROP_PARTITION, info);
+        LOG.info("log drop partition, logId: {}, info: {}", logId, info.toJson());
         Env.getCurrentEnv().getBinlogManager().addDropPartitionRecord(info, logId);
         return logId;
     }
@@ -1684,6 +1699,7 @@ public void logErasePartition(long partitionId) {
 
     public void logRecoverPartition(RecoverInfo info) {
         long logId = logEdit(OperationType.OP_RECOVER_PARTITION, info);
+        LOG.info("log recover partition, logId: {}, info: {}", logId, info.toJson());
         Env.getCurrentEnv().getBinlogManager().addRecoverTableRecord(info, logId);
     }
 
@@ -1702,6 +1718,7 @@ public void logBatchModifyPartition(BatchModifyPartitionsInfo info) {
 
     public void logDropTable(DropInfo info) {
         long logId = logEdit(OperationType.OP_DROP_TABLE, info);
+        LOG.info("log drop table, logId: {}, info: {}", logId, info);
         if (Strings.isNullOrEmpty(info.getCtl()) || info.getCtl().equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
             DropTableRecord record = new DropTableRecord(logId, info);
             Env.getCurrentEnv().getBinlogManager().addDropTableRecord(record);
@@ -1714,11 +1731,13 @@ public void logEraseTable(long tableId) {
 
     public void logRecoverTable(RecoverInfo info) {
         long logId = logEdit(OperationType.OP_RECOVER_TABLE, info);
+        LOG.info("log recover table, logId: {}, info: {}", logId, info);
         Env.getCurrentEnv().getBinlogManager().addRecoverTableRecord(info, logId);
     }
 
     public void logDropRollup(DropInfo info) {
         long logId = logEdit(OperationType.OP_DROP_ROLLUP, info);
+        LOG.info("log drop rollup, logId: {}, info: {}", logId, info);
         Env.getCurrentEnv().getBinlogManager().addDropRollup(info, logId);
     }
 
@@ -1835,7 +1854,8 @@ public void logDropRole(PrivInfo info) {
     }
 
     public void logDatabaseRename(DatabaseInfo databaseInfo) {
-        logEdit(OperationType.OP_RENAME_DB, databaseInfo);
+        long logId = logEdit(OperationType.OP_RENAME_DB, databaseInfo);
+        LOG.info("log database rename, logId: {}, info: {}", logId, databaseInfo);
     }
 
     public void logTableRename(TableInfo tableInfo) {
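The FE.logAddPartition.slow hook added above follows the usual DebugPointUtil pattern, so it is inert unless a test injects the point. For reference, the same guard in isolation (the point name "MyClass.myOp.slow" is hypothetical; the import path is assumed from the FE module, and the API calls are the ones visible in the diff):

import org.apache.doris.common.util.DebugPointUtil;

// Hypothetical sketch of the debug-point guard: a no-op in production,
// a configurable delay when the named point is injected by a test.
class DebugPointSleepSketch {
    static void maybeSleep() {
        if (DebugPointUtil.isEnable("MyClass.myOp.slow")) {
            DebugPointUtil.DebugPoint debugPoint = DebugPointUtil.getDebugPoint("MyClass.myOp.slow");
            int sleepMs = debugPoint.param("sleep", 1000); // default 1000 ms
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore interrupt status
            }
        }
    }
}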
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 3a1f56c7672984..1720a27001a377 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -3631,7 +3631,7 @@ public TCreatePartitionResult createPartition(TCreatePartitionRequest request) t
         for (AddPartitionOp addPartitionOp : addPartitionClauseMap.values()) {
             try {
                 // here maybe check and limit created partitions num
-                Env.getCurrentEnv().addPartition(db, olapTable.getName(), addPartitionOp, false, 0, true);
+                Env.getCurrentEnv().addPartition(db, olapTable.getName(), addPartitionOp, false, 0, true, null);
             } catch (DdlException e) {
                 LOG.warn(e);
                 errorStatus.setErrorMsgs(
diff --git a/regression-test/suites/cloud_p0/diffrent_serialize/diffrent_serialize.groovy b/regression-test/suites/cloud_p0/different_serialize/different_serialize.groovy
similarity index 99%
rename from regression-test/suites/cloud_p0/diffrent_serialize/diffrent_serialize.groovy
rename to regression-test/suites/cloud_p0/different_serialize/different_serialize.groovy
index 682a8fff9adaca..9433c97e130672 100644
--- a/regression-test/suites/cloud_p0/diffrent_serialize/diffrent_serialize.groovy
+++ b/regression-test/suites/cloud_p0/different_serialize/different_serialize.groovy
@@ -17,7 +17,7 @@
 
 import org.codehaus.groovy.runtime.IOGroovyMethods
 
-suite ("diffrent_serialize_cloud") {
+suite ("different_serialize_cloud") {
     sql """ DROP TABLE IF EXISTS d_table; """
diff --git a/regression-test/suites/cloud_p0/partition/test_create_partition_and_insert_overwrite_race.groovy b/regression-test/suites/cloud_p0/partition/test_create_partition_and_insert_overwrite_race.groovy
new file mode 100644
index 00000000000000..58c259db816b52
--- /dev/null
+++ b/regression-test/suites/cloud_p0/partition/test_create_partition_and_insert_overwrite_race.groovy
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import com.mysql.cj.jdbc.StatementImpl
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.NodeType
+import org.apache.doris.regression.suite.SuiteCluster
+
+suite("test_create_partition_and_insert_overwrite_race", 'p0, docker') {
+    if (!isCloudMode()) {
+        return
+    }
+    def options = new ClusterOptions()
+    options.enableDebugPoints()
+    // one master, one observer
+    options.setFeNum(2)
+    options.feConfigs.add('sys_log_verbose_modules=org')
+    options.setBeNum(3)
+    options.cloudMode = true
+
+    // 1. connect to observer
+    options.connectToFollower = true
+    docker(options) {
+        sql """set enable_sql_cache=false"""
+        def tbl = 'test_create_partition_and_insert_overwrite_race_tbl'
+        def tbl2 = 'test_create_partition_and_insert_overwrite_race_tbl2'
+        def createTableSql = { String tableName ->
+            sql """
+                CREATE TABLE ${tableName} (
+                    order_id BIGINT,
+                    create_dt datetime,
+                    username VARCHAR(20)
+                )
+                DUPLICATE KEY(order_id)
+                PARTITION BY RANGE(create_dt) ()
+                DISTRIBUTED BY HASH(order_id) BUCKETS 10
+                PROPERTIES(
+                    "dynamic_partition.enable" = "true",
+                    "dynamic_partition.time_unit" = "DAY",
+                    "dynamic_partition.start" = "-5",
+                    "dynamic_partition.end" = "5",
+                    "dynamic_partition.prefix" = "p",
+                    "dynamic_partition.create_history_partition" = "true"
+                );
+            """
+        }
+
+        createTableSql(tbl)
+        createTableSql(tbl2)
+
+        // Generate insert statements with dates: current date -2, -1, 0, +1, +2 days
+        def now = new Date()
+        def dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
+        for (def i = -2; i <= 2; i++) {
+            def targetDate = new Date(now.time + i * 24 * 60 * 60 * 1000L)
+            def dateStr = dateFormat.format(targetDate)
+            def hour = String.format("%02d", Math.abs(i) + 1)
+            def insertDate = "${dateStr} ${hour}:00:00"
+            sql """insert into ${tbl2} values (${i + 3}, '${insertDate}', 'test')"""
+        }
+
+        sql """DROP TABLE ${tbl}"""
+        def partitionNameFormat = new java.text.SimpleDateFormat("yyyyMMdd")
+        def currentPartitionName = "p" + partitionNameFormat.format(now)
+        cluster.injectDebugPoints(NodeType.FE, ['FE.logAddPartition.slow': [pName: currentPartitionName, sleep: 50 * 1000]])
+        def future = thread {
+            for (def i = 0; i < 55; i++) {
+                try_sql """INSERT OVERWRITE TABLE ${tbl} partition(*) select * from ${tbl2}"""
+                sleep(1 * 1000)
+                cluster.checkFeIsAlive(2, true)
+            }
+        }
+        def future1 = thread {
+            createTableSql(tbl)
+        }
+        future.get()
+        future1.get()
+    }
+}