Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove datanode optimization #13559

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
public enum CnToDnRequestType {

// Node Maintenance
DISABLE_DATA_NODE,
CLEAN_DATA_NODE_CACHE,
STOP_DATA_NODE,

FLUSH,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.confignode.client.CnToDnRequestType;
import org.apache.iotdb.mpp.rpc.thrift.TCleanDataNodeCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
Expand Down Expand Up @@ -119,8 +119,8 @@ private Object executeSyncRequest(
return client.deleteRegion((TConsensusGroupId) req);
case INVALIDATE_PERMISSION_CACHE:
return client.invalidatePermissionCache((TInvalidatePermissionCacheReq) req);
case DISABLE_DATA_NODE:
return client.disableDataNode((TDisableDataNodeReq) req);
case CLEAN_DATA_NODE_CACHE:
return client.cleanDataNodeCache((TCleanDataNodeCacheReq) req);
case STOP_DATA_NODE:
return client.stopDataNode();
case SET_SYSTEM_STATUS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,14 @@
import org.apache.iotdb.confignode.procedure.ProcedureMetrics;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
import org.apache.iotdb.confignode.procedure.env.RemoveDataNodeManager;
import org.apache.iotdb.confignode.procedure.impl.cq.CreateCQProcedure;
import org.apache.iotdb.confignode.procedure.impl.model.CreateModelProcedure;
import org.apache.iotdb.confignode.procedure.impl.model.DropModelProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.RemoveAINodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodesProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure;
Expand All @@ -75,6 +76,7 @@
import org.apache.iotdb.confignode.procedure.impl.pipe.task.StopPipeProcedureV2;
import org.apache.iotdb.confignode.procedure.impl.region.CreateRegionGroupsProcedure;
import org.apache.iotdb.confignode.procedure.impl.region.RegionMigrateProcedure;
import org.apache.iotdb.confignode.procedure.impl.region.RegionMigrationPlan;
import org.apache.iotdb.confignode.procedure.impl.schema.AlterLogicalViewProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.DeactivateTemplateProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.DeleteDatabaseProcedure;
Expand Down Expand Up @@ -142,6 +144,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -577,17 +580,23 @@ public void removeConfigNode(RemoveConfigNodePlan removeConfigNodePlan) {
}

/**
* Generate {@link RemoveDataNodeProcedure}s, and serially execute all the {@link
* RemoveDataNodeProcedure}s.
* Generate {@link RemoveDataNodesProcedure}s, and serially execute all the {@link
* RemoveDataNodesProcedure}s.
*/
public boolean removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
public synchronized boolean removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
Map<Integer, NodeStatus> nodeStatusMap = new HashMap<>();
removeDataNodePlan
.getDataNodeLocations()
.forEach(
tDataNodeLocation -> {
this.executor.submitProcedure(new RemoveDataNodeProcedure(tDataNodeLocation));
LOGGER.info("Submit RemoveDataNodeProcedure successfully, {}", tDataNodeLocation);
});
datanode ->
nodeStatusMap.put(
datanode.getDataNodeId(),
configManager.getLoadManager().getNodeStatus(datanode.getDataNodeId())));
this.executor.submitProcedure(
new RemoveDataNodesProcedure(removeDataNodePlan.getDataNodeLocations(), nodeStatusMap));
LOGGER.info(
"Submit RemoveDataNodeProcedure successfully, {}",
removeDataNodePlan.getDataNodeLocations());
return true;
}

Expand All @@ -598,14 +607,109 @@ public boolean removeAINode(RemoveAINodePlan removeAINodePlan) {
return true;
}

// region region migration
public TSStatus checkRemoveDataNodes(List<TDataNodeLocation> dataNodeLocations) {
// 1. Only one RemoveDataNodesProcedure is allowed in the cluster
Optional<Procedure<ConfigNodeProcedureEnv>> anotherRemoveProcedure =
this.executor.getProcedures().values().stream()
.filter(
procedure -> {
if (procedure instanceof RemoveDataNodesProcedure) {
return !procedure.isFinished();
}
return false;
})
.findAny();

String failMessage = null;
if (anotherRemoveProcedure.isPresent()) {
List<TDataNodeLocation> anotherRemoveDataNodes =
((RemoveDataNodesProcedure) anotherRemoveProcedure.get()).getRemovedDataNodes();
failMessage =
String.format(
"Submit RemoveDataNodesProcedure failed, "
+ "because another RemoveDataNodesProcedure %s is already in processing. "
+ "IoTDB is able to have at most 1 RemoveDataNodesProcedure at the same time. "
+ "For further information, please search [pid%d] in log. ",
anotherRemoveDataNodes, anotherRemoveProcedure.get().getProcId());
}

// 2. Check if the RemoveDataNodesProcedure conflicts with the RegionMigrateProcedure
RemoveDataNodeManager manager = env.getRemoveDataNodeManager();
Set<TConsensusGroupId> removedDataNodesRegionSet =
manager.getRemovedDataNodesRegionSet(dataNodeLocations);
Optional<Procedure<ConfigNodeProcedureEnv>> conflictRegionMigrateProcedure =
this.executor.getProcedures().values().stream()
.filter(
procedure -> {
if (procedure instanceof RegionMigrateProcedure) {
RegionMigrateProcedure regionMigrateProcedure =
(RegionMigrateProcedure) procedure;
if (regionMigrateProcedure.isFinished()) {
return false;
}
return removedDataNodesRegionSet.contains(
regionMigrateProcedure.getConsensusGroupId())
|| dataNodeLocations.contains(regionMigrateProcedure.getDestDataNode());
}
return false;
})
.findAny();
if (conflictRegionMigrateProcedure.isPresent()) {
failMessage =
String.format(
"Submit RemoveDataNodesProcedure failed, "
+ "because another RegionMigrateProcedure %s is already in processing which conflicts with this RemoveDataNodesProcedure. "
+ "The RegionMigrateProcedure is migrating the region %s to the DataNode %s. "
+ "For further information, please search [pid%d] in log. ",
conflictRegionMigrateProcedure.get().getProcId(),
((RegionMigrateProcedure) conflictRegionMigrateProcedure.get()).getConsensusGroupId(),
((RegionMigrateProcedure) conflictRegionMigrateProcedure.get()).getDestDataNode(),
conflictRegionMigrateProcedure.get().getProcId());
}
// 3. Check if the RegionMigrateProcedure generated by RemoveDataNodesProcedure conflicts with
// each other
List<RegionMigrationPlan> regionMigrationPlans =
manager.getRegionMigrationPlans(dataNodeLocations);
removedDataNodesRegionSet.clear();
for (RegionMigrationPlan regionMigrationPlan : regionMigrationPlans) {
if (removedDataNodesRegionSet.contains(regionMigrationPlan.getRegionId())) {
failMessage =
String.format(
"Submit RemoveDataNodesProcedure failed, "
+ "because the RegionMigrateProcedure generated by this RemoveDataNodesProcedure conflicts with each other. "
+ "The conflict region id is %s . ",
regionMigrationPlan.getRegionId());
break;
}
removedDataNodesRegionSet.add(regionMigrationPlan.getRegionId());
}
if (failMessage != null) {
LOGGER.warn(failMessage);
TSStatus failStatus = new TSStatus(TSStatusCode.REMOVE_DATANODE_ERROR.getStatusCode());
failStatus.setMessage(failMessage);
return failStatus;
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}

/**
* Checks whether region migration is allowed.
*
* @param migrateRegionReq the migration request details
* @param regionGroupId the ID of the consensus group for the region
* @param originalDataNode the original DataNode location from which the region is being migrated
* @param destDataNode the destination DataNode location to which the region is being migrated
* @param coordinatorForAddPeer the DataNode location acting as the coordinator for adding a peer
* @return the status of the migration request (TSStatus)
*/
private TSStatus checkRegionMigrate(
TMigrateRegionReq migrateRegionReq,
TConsensusGroupId regionGroupId,
TDataNodeLocation originalDataNode,
TDataNodeLocation destDataNode,
TDataNodeLocation coordinatorForAddPeer) {
String failMessage = null;
// 1. Check if the RegionMigrateProcedure has conflict with another RegionMigrateProcedure
Optional<Procedure<ConfigNodeProcedureEnv>> anotherMigrateProcedure =
this.executor.getProcedures().values().stream()
.filter(
Expand Down Expand Up @@ -685,6 +789,49 @@ private TSStatus checkRegionMigrate(
"Submit RegionMigrateProcedure failed, because the destDataNode %s is ReadOnly or Unknown.",
migrateRegionReq.getToId());
}

// 2. Check if the RegionMigrateProcedure has conflict with RemoveDataNodesProcedure
Optional<Procedure<ConfigNodeProcedureEnv>> conflictRemoveDataNodesProcedure =
this.executor.getProcedures().values().stream()
.filter(
procedure -> {
if (procedure instanceof RemoveDataNodesProcedure) {
return !procedure.isFinished();
}
return false;
})
.findAny();

if (conflictRemoveDataNodesProcedure.isPresent()) {
RemoveDataNodeManager manager = env.getRemoveDataNodeManager();
List<TDataNodeLocation> removedDataNodes =
((RemoveDataNodesProcedure) conflictRemoveDataNodesProcedure.get()).getRemovedDataNodes();
Set<TConsensusGroupId> removedDataNodesRegionSet =
manager.getRemovedDataNodesRegionSet(removedDataNodes);
if (removedDataNodesRegionSet.contains(regionGroupId)) {
failMessage =
String.format(
"Submit RegionMigrateProcedure failed, "
+ "because another RemoveDataNodesProcedure %s is already in processing which conflicts with this RegionMigrateProcedure. "
+ "The RemoveDataNodesProcedure is removing the DataNodes %s which contains the region %s. "
+ "For further information, please search [pid%d] in log. ",
conflictRemoveDataNodesProcedure.get().getProcId(),
removedDataNodes,
regionGroupId,
conflictRemoveDataNodesProcedure.get().getProcId());
} else if (removedDataNodes.contains(destDataNode)) {
failMessage =
String.format(
"Submit RegionMigrateProcedure failed, "
+ "because another RemoveDataNodesProcedure %s is already in processing which conflicts with this RegionMigrateProcedure. "
+ "The RemoveDataNodesProcedure is removing the target DataNode %s. "
+ "For further information, please search [pid%d] in log. ",
conflictRemoveDataNodesProcedure.get().getProcId(),
destDataNode,
conflictRemoveDataNodesProcedure.get().getProcId());
}
}

if (failMessage != null) {
LOGGER.warn(failMessage);
TSStatus failStatus = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
Expand All @@ -695,6 +842,7 @@ private TSStatus checkRegionMigrate(
}

public synchronized TSStatus migrateRegion(TMigrateRegionReq migrateRegionReq) {
env.getSubmitRegionMigrateLock().lock();
TConsensusGroupId regionGroupId;
Optional<TConsensusGroupId> optional =
configManager
Expand All @@ -720,7 +868,7 @@ public synchronized TSStatus migrateRegion(TMigrateRegionReq migrateRegionReq) {
.getRegisteredDataNode(migrateRegionReq.getToId())
.getLocation();
// select coordinator for adding peer
RegionMaintainHandler handler = new RegionMaintainHandler(configManager);
RegionMaintainHandler handler = env.getRegionMaintainHandler();
// TODO: choose the DataNode which has lowest load
final TDataNodeLocation coordinatorForAddPeer =
handler
Expand All @@ -742,21 +890,25 @@ public synchronized TSStatus migrateRegion(TMigrateRegionReq migrateRegionReq) {
return status;
}

// finally, submit procedure
this.executor.submitProcedure(
new RegionMigrateProcedure(
regionGroupId,
originalDataNode,
destDataNode,
coordinatorForAddPeer,
coordinatorForRemovePeer));
LOGGER.info(
"Submit RegionMigrateProcedure successfully, Region: {}, Origin DataNode: {}, Dest DataNode: {}, Add Coordinator: {}, Remove Coordinator: {}",
regionGroupId,
originalDataNode,
destDataNode,
coordinatorForAddPeer,
coordinatorForRemovePeer);
try {
// finally, submit procedure
this.executor.submitProcedure(
new RegionMigrateProcedure(
regionGroupId,
originalDataNode,
destDataNode,
coordinatorForAddPeer,
coordinatorForRemovePeer));
LOGGER.info(
"Submit RegionMigrateProcedure successfully, Region: {}, Origin DataNode: {}, Dest DataNode: {}, Add Coordinator: {}, Remove Coordinator: {}",
regionGroupId,
originalDataNode,
destDataNode,
coordinatorForAddPeer,
coordinatorForRemovePeer);
} finally {
env.getSubmitRegionMigrateLock().unlock();
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeRegisterResp;
import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp;
import org.apache.iotdb.confignode.manager.ClusterManager;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.TTLManager;
import org.apache.iotdb.confignode.manager.TriggerManager;
Expand All @@ -75,7 +74,7 @@
import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager;
import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
import org.apache.iotdb.confignode.procedure.env.RemoveDataNodeManager;
import org.apache.iotdb.confignode.rpc.thrift.TAINodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TAINodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TAINodeRestartReq;
Expand Down Expand Up @@ -371,17 +370,20 @@ public TDataNodeRestartResp updateDataNodeIfNecessary(TDataNodeRestartReq req) {
}

/**
* Remove DataNodes.
* Removes the specified DataNodes.
*
* @param removeDataNodePlan removeDataNodePlan
* @return DataNodeToStatusResp, The TSStatus will be SUCCEED_STATUS if the request is accepted,
* DATANODE_NOT_EXIST when some datanode does not exist.
* @param removeDataNodePlan the plan detailing which DataNodes to remove
* @return DataNodeToStatusResp, where the TSStatus will be SUCCEED_STATUS if the request is
* accepted, or DATANODE_NOT_EXIST if any DataNode does not exist.
*/
public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
public synchronized DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
configManager.getProcedureManager().getEnv().getSubmitRegionMigrateLock().lock();
LOGGER.info("NodeManager start to remove DataNode {}", removeDataNodePlan);

RegionMaintainHandler handler = new RegionMaintainHandler((ConfigManager) configManager);
DataNodeToStatusResp preCheckStatus = handler.checkRemoveDataNodeRequest(removeDataNodePlan);
// Checks if the RemoveDataNode request is valid
RemoveDataNodeManager manager =
configManager.getProcedureManager().getEnv().getRemoveDataNodeManager();
DataNodeToStatusResp preCheckStatus = manager.checkRemoveDataNodeRequest(removeDataNodePlan);
if (preCheckStatus.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.error(
"The remove DataNode request check failed. req: {}, check result: {}",
Expand All @@ -400,17 +402,22 @@ public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
return dataSet;
}

// Add request to queue, then return to client
boolean removeSucceed = configManager.getProcedureManager().removeDataNode(removeDataNodePlan);
TSStatus status;
if (removeSucceed) {
status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
status.setMessage("Server accepted the request");
} else {
status = new TSStatus(TSStatusCode.REMOVE_DATANODE_ERROR.getStatusCode());
status.setMessage("Server rejected the request, maybe requests are too many");
try {
// Add request to queue, then return to client
boolean removeSucceed =
configManager.getProcedureManager().removeDataNode(removeDataNodePlan);
TSStatus status;
if (removeSucceed) {
status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
status.setMessage("Server accepted the request");
} else {
status = new TSStatus(TSStatusCode.REMOVE_DATANODE_ERROR.getStatusCode());
status.setMessage("Server rejected the request, maybe requests are too many");
}
dataSet.setStatus(status);
} finally {
configManager.getProcedureManager().getEnv().getSubmitRegionMigrateLock().unlock();
}
dataSet.setStatus(status);

LOGGER.info(
"NodeManager submit RemoveDataNodePlan finished, removeDataNodePlan: {}",
Expand Down
Loading
Loading