Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;

import java.util.List;
import java.util.Optional;

public class ConsensusConfig {

private final TEndPoint thisNodeEndPoint;
private final int thisNodeId;
private final String storageDir;
private final List<String> recvSnapshotDirs;
private final TConsensusGroupType consensusGroupType;
private final RatisConfig ratisConfig;
private final IoTConsensusConfig iotConsensusConfig;
Expand All @@ -38,13 +40,15 @@ private ConsensusConfig(
TEndPoint thisNode,
int thisNodeId,
String storageDir,
List<String> recvSnapshotDirs,
TConsensusGroupType consensusGroupType,
RatisConfig ratisConfig,
IoTConsensusConfig iotConsensusConfig,
PipeConsensusConfig pipeConsensusConfig) {
this.thisNodeEndPoint = thisNode;
this.thisNodeId = thisNodeId;
this.storageDir = storageDir;
this.recvSnapshotDirs = recvSnapshotDirs;
this.consensusGroupType = consensusGroupType;
this.ratisConfig = ratisConfig;
this.iotConsensusConfig = iotConsensusConfig;
Expand All @@ -63,6 +67,10 @@ public String getStorageDir() {
return storageDir;
}

public List<String> getRecvSnapshotDirs() {
return recvSnapshotDirs;
}

public TConsensusGroupType getConsensusGroupType() {
return consensusGroupType;
}
Expand All @@ -88,6 +96,7 @@ public static class Builder {
private TEndPoint thisNode;
private int thisNodeId;
private String storageDir;
private List<String> recvSnapshotDirs;
private TConsensusGroupType consensusGroupType;
private RatisConfig ratisConfig;
private IoTConsensusConfig iotConsensusConfig;
Expand All @@ -98,6 +107,7 @@ public ConsensusConfig build() {
thisNode,
thisNodeId,
storageDir,
recvSnapshotDirs,
consensusGroupType,
Optional.ofNullable(ratisConfig).orElseGet(() -> RatisConfig.newBuilder().build()),
Optional.ofNullable(iotConsensusConfig)
Expand All @@ -121,6 +131,11 @@ public Builder setStorageDir(String storageDir) {
return this;
}

public Builder setRecvSnapshotDirs(List<String> recvSnapshotDirs) {
this.recvSnapshotDirs = recvSnapshotDirs;
return this;
}

public Builder setConsensusGroupType(TConsensusGroupType groupType) {
this.consensusGroupType = groupType;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.RegisterManager;
import org.apache.iotdb.commons.utils.FileUtils;
Expand Down Expand Up @@ -92,6 +93,7 @@ public class IoTConsensus implements IConsensus {
private final TEndPoint thisNode;
private final int thisNodeId;
private final File storageDir;
private final List<String> recvSnapshotDirs;
private final IStateMachine.Registry registry;
private final Map<ConsensusGroupId, IoTConsensusServerImpl> stateMachineMap =
new ConcurrentHashMap<>();
Expand All @@ -108,6 +110,7 @@ public IoTConsensus(ConsensusConfig config, Registry registry) {
this.thisNode = config.getThisNodeEndPoint();
this.thisNodeId = config.getThisNodeId();
this.storageDir = new File(config.getStorageDir());
this.recvSnapshotDirs = config.getRecvSnapshotDirs();
this.config = config.getIotConsensusConfig();
this.registry = registry;
this.service = new IoTConsensusRPCService(thisNode, config.getIotConsensusConfig());
Expand Down Expand Up @@ -174,6 +177,7 @@ private void initAndRecover() throws IOException {
IoTConsensusServerImpl consensus =
new IoTConsensusServerImpl(
path.toString(),
recvSnapshotDirs,
new Peer(consensusGroupId, thisNodeId, thisNode),
new TreeSet<>(),
registry.apply(consensusGroupId),
Expand All @@ -183,6 +187,8 @@ private void initAndRecover() throws IOException {
config);
stateMachineMap.put(consensusGroupId, consensus);
}
} catch (DiskSpaceInsufficientException e) {
throw new IOException(e);
}
}
if (correctPeerListBeforeStart != null) {
Expand Down Expand Up @@ -278,16 +284,22 @@ public void createLocalPeer(ConsensusGroupId groupId, List<Peer> peers)
return null;
}

IoTConsensusServerImpl impl =
new IoTConsensusServerImpl(
path,
new Peer(groupId, thisNodeId, thisNode),
new TreeSet<>(peers),
registry.apply(groupId),
backgroundTaskService,
clientManager,
syncClientManager,
config);
IoTConsensusServerImpl impl = null;
try {
impl =
new IoTConsensusServerImpl(
path,
recvSnapshotDirs,
new Peer(groupId, thisNodeId, thisNode),
new TreeSet<>(peers),
registry.apply(groupId),
backgroundTaskService,
clientManager,
syncClientManager,
config);
} catch (DiskSpaceInsufficientException e) {
throw new RuntimeException(e);
}
impl.start();
return impl;
}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
import org.apache.iotdb.commons.consensus.index.impl.IoTProgressIndex;
import org.apache.iotdb.commons.disk.FolderManager;
import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
Expand Down Expand Up @@ -113,6 +116,7 @@ public class IoTConsensusServerImpl {
private final Lock stateMachineLock = new ReentrantLock();
private final Condition stateMachineCondition = stateMachineLock.newCondition();
private final String storageDir;
private FolderManager recvFolderManager = null;
private final TreeSet<Peer> configuration;
private final AtomicLong searchIndex;
private final LogDispatcher logDispatcher;
Expand All @@ -130,15 +134,29 @@ public class IoTConsensusServerImpl {

public IoTConsensusServerImpl(
String storageDir,
List<String> recvSnapshotDirs,
Peer thisNode,
TreeSet<Peer> configuration,
IStateMachine stateMachine,
ScheduledExecutorService backgroundTaskService,
IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager,
IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager,
IoTConsensusConfig config) {
IoTConsensusConfig config)
throws DiskSpaceInsufficientException {
this.active = true;
this.storageDir = storageDir;
List<String> snapshotDirs = new ArrayList<>();
if (recvSnapshotDirs != null) {
for (String dir : recvSnapshotDirs) {
snapshotDirs.add(dir + File.separator + SNAPSHOT_DIR_NAME);
}
} else {
snapshotDirs.add(storageDir);
}

this.recvFolderManager =
new FolderManager(
snapshotDirs, DirectoryStrategyType.MIN_FOLDER_OCCUPIED_SPACE_FIRST_STRATEGY);
this.thisNode = thisNode;
this.stateMachine = stateMachine;
this.cacheQueueMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -361,18 +379,22 @@ public void receiveSnapshotFragment(
throws ConsensusGroupModifyPeerException {
try {
String targetFilePath = calculateSnapshotPath(snapshotId, originalFilePath);
File targetFile = getSnapshotPath(targetFilePath);
Path parentDir = Paths.get(targetFile.getParent());
if (!Files.exists(parentDir)) {
Files.createDirectories(parentDir);
}
try (FileOutputStream fos = new FileOutputStream(targetFile.getAbsolutePath(), true);
FileChannel channel = fos.getChannel()) {
channel.write(fileChunk.slice(), fileOffset);
}
} catch (IOException e) {
recvFolderManager.getNextWithRetry(
folder -> {
File targetFile = getSnapshotPath(folder, targetFilePath);
Path parentDir = Paths.get(targetFile.getParent());
if (!Files.exists(parentDir)) {
Files.createDirectories(parentDir);
}
try (FileOutputStream fos = new FileOutputStream(targetFile.getAbsolutePath(), true);
FileChannel channel = fos.getChannel()) {
channel.write(fileChunk.slice(), fileOffset);
}
return null;
});
} catch (DiskSpaceInsufficientException e) {
throw new ConsensusGroupModifyPeerException(
String.format("error when receiving snapshot %s", snapshotId), e);
String.format("Error when receiving snapshot %s", snapshotId), e);
}
}

Expand Down Expand Up @@ -408,12 +430,17 @@ private void clearOldSnapshot() {

public void loadSnapshot(String snapshotId) {
// TODO: (xingtanzjr) throw exception if the snapshot load failed
stateMachine.loadSnapshot(getSnapshotPath(snapshotId));
recvFolderManager
.getFolders()
.forEach(
dir -> {
stateMachine.loadSnapshot(getSnapshotPath(dir, snapshotId));
});
}

private File getSnapshotPath(String snapshotRelativePath) {
File storageDirFile = new File(storageDir);
File snapshotDir = new File(storageDir, snapshotRelativePath);
private File getSnapshotPath(String curStorageDir, String snapshotRelativePath) {
File storageDirFile = new File(curStorageDir);
File snapshotDir = new File(curStorageDir, snapshotRelativePath);
try {
if (!snapshotDir
.getCanonicalFile()
Expand Down Expand Up @@ -839,15 +866,19 @@ public void cleanupRemoteSnapshot(Peer targetPeer) throws ConsensusGroupModifyPe
}

public void cleanupSnapshot(String snapshotId) throws ConsensusGroupModifyPeerException {
File snapshotDir = getSnapshotPath(snapshotId);
if (snapshotDir.exists()) {
try {
FileUtils.deleteDirectory(snapshotDir);
} catch (IOException e) {
throw new ConsensusGroupModifyPeerException(e);
List<String> allDirs = new ArrayList<>(Collections.singletonList(storageDir));
allDirs.addAll(recvFolderManager.getFolders());
for (String dir : allDirs) {
File snapshotDir = getSnapshotPath(dir, snapshotId);
if (snapshotDir.exists()) {
try {
FileUtils.deleteDirectory(snapshotDir);
} catch (IOException e) {
throw new ConsensusGroupModifyPeerException(e);
}
} else {
logger.info("File not exist: {}", snapshotDir);
}
} else {
logger.info("File not exist: {}", snapshotDir);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,21 @@ public class ReplicateTest {
new File("target" + File.separator + "2"),
new File("target" + File.separator + "3"));

private final List<List<String>> peersRecvSnapshotDirs =
Arrays.asList(
Arrays.asList(
"target" + File.separator + "1-1",
"target" + File.separator + "1-2",
"target" + File.separator + "1-3"),
Arrays.asList(
"target" + File.separator + "2-1",
"target" + File.separator + "2-2",
"target" + File.separator + "2-3"),
Arrays.asList(
"target" + File.separator + "3-1",
"target" + File.separator + "3-2",
"target" + File.separator + "3-3"));

private final ConsensusGroup group = new ConsensusGroup(gid, peers);
private final List<IoTConsensus> servers = new ArrayList<>();
private final List<TestStateMachine> stateMachines = new ArrayList<>();
Expand All @@ -81,6 +96,7 @@ public void setUp() throws Exception {
file.mkdirs();
stateMachines.add(new TestStateMachine());
}
peersRecvSnapshotDirs.forEach(innerList -> innerList.forEach(dir -> new File(dir).mkdirs()));
initServer();
}

Expand All @@ -90,6 +106,16 @@ public void tearDown() throws Exception {
for (File file : peersStorage) {
FileUtils.deleteFully(file);
}
peersRecvSnapshotDirs.forEach(
innerList ->
innerList.forEach(
dir -> {
try {
FileUtils.deleteFully(new File(dir));
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
}

private void initServer() throws IOException {
Expand All @@ -105,6 +131,7 @@ private void initServer() throws IOException {
.setThisNodeId(peers.get(i).getNodeId())
.setThisNode(peers.get(i).getEndpoint())
.setStorageDir(peersStorage.get(i).getAbsolutePath())
.setRecvSnapshotDirs(peersRecvSnapshotDirs.get(i))
.setConsensusGroupType(TConsensusGroupType.DataRegion)
.build(),
groupId -> stateMachines.get(finalI))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -60,6 +61,12 @@ public class StabilityTest {

private final File storageDir = new File("target" + java.io.File.separator + "stability");

private final List<String> recvSnapshotDirs =
Arrays.asList(
"target" + File.separator + "1-1",
"target" + File.separator + "1-2",
"target" + File.separator + "1-3");

private IoTConsensus consensusImpl;

private final int basePort = 6667;
Expand All @@ -73,6 +80,7 @@ public void constructConsensus() throws IOException {
.setThisNodeId(1)
.setThisNode(new TEndPoint("0.0.0.0", basePort))
.setStorageDir(storageDir.getAbsolutePath())
.setRecvSnapshotDirs(recvSnapshotDirs)
.setConsensusGroupType(TConsensusGroupType.DataRegion)
.build(),
gid -> new TestStateMachine())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,8 @@ public class IoTDBConfig {

private boolean includeNullValueInWriteThroughputMetric = false;

private boolean keepSameDiskWhenLoadingSnapshot = false;

private ConcurrentHashMap<String, EncryptParameter> tsFileDBToEncryptMap =
new ConcurrentHashMap<>(
Collections.singletonMap("root.__audit", new EncryptParameter("UNENCRYPTED", null)));
Expand Down Expand Up @@ -4257,6 +4259,14 @@ public void setPasswordLockTimeMinutes(int passwordLockTimeMinutes) {
this.passwordLockTimeMinutes = passwordLockTimeMinutes;
}

public boolean isKeepSameDiskWhenLoadingSnapshot() {
return keepSameDiskWhenLoadingSnapshot;
}

public void setKeepSameDiskWhenLoadingSnapshot(boolean keepSameDiskWhenLoadingSnapshot) {
this.keepSameDiskWhenLoadingSnapshot = keepSameDiskWhenLoadingSnapshot;
}

public ConcurrentHashMap<String, EncryptParameter> getTSFileDBToEncryptMap() {
return tsFileDBToEncryptMap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1173,6 +1173,12 @@ private void loadIoTConsensusProps(TrimProperties properties) throws IOException
"region_migration_speed_limit_bytes_per_second",
ConfigurationFileUtils.getConfigurationDefaultValue(
"region_migration_speed_limit_bytes_per_second"))));
conf.setKeepSameDiskWhenLoadingSnapshot(
Boolean.parseBoolean(
properties.getProperty(
"keep_same_disk_when_loading_snapshot",
ConfigurationFileUtils.getConfigurationDefaultValue(
"keep_same_disk_when_loading_snapshot"))));
}

private void loadIoTConsensusV2Props(TrimProperties properties) throws IOException {
Expand Down
Loading
Loading