diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java index 96c519ce6361e..91bccc6cbda0c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java @@ -62,8 +62,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask { // to trigger the general event transfer function, causing potentially such as // the random delay of the batch transmission. Therefore, here we inject cron events // when no event can be pulled. - public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT = - new PipeHeartbeatEvent("cron", false); + public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT = new PipeHeartbeatEvent(-1, false); public PipeSinkSubtask( final String taskID, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java index 3bf5c1287b762..520b57a229ff6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java @@ -58,15 +58,12 @@ public class DeletionResource implements PersistentResource { private volatile Exception cause; public DeletionResource( - AbstractDeleteDataNode deleteDataNode, - Consumer removeHook, - String regionId) { + AbstractDeleteDataNode deleteDataNode, Consumer removeHook, int regionId) { this.deleteDataNode = deleteDataNode; this.removeHook = removeHook; this.currentStatus = Status.RUNNING; this.consensusGroupId = - ConsensusGroupId.Factory.create( - TConsensusGroupType.DataRegion.getValue(), Integer.parseInt(regionId)); + ConsensusGroupId.Factory.create(TConsensusGroupType.DataRegion.getValue(), regionId); this.pipeTaskReferenceCount = new AtomicInteger( DataRegionConsensusImpl.getInstance().getReplicationNum(consensusGroupId) - 1); @@ -151,7 +148,7 @@ public ByteBuffer serialize() { } public static DeletionResource deserialize( - final ByteBuffer buffer, final String regionId, final Consumer removeHook) + final ByteBuffer buffer, final int regionId, final Consumer removeHook) throws IOException { AbstractDeleteDataNode node = DeleteNodeType.deserializeFromDAL(buffer); return new DeletionResource(node, removeHook, regionId); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java index ff868ffd4451c..8e1dc8bb47096 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java @@ -61,7 +61,7 @@ public class DeletionResourceManager implements AutoCloseable { String.format( "^_(?<%s>\\d+)-(?<%s>\\d+)\\%s$", REBOOT_TIME, MEM_TABLE_FLUSH_ORDER, DELETION_FILE_SUFFIX); - private final String dataRegionId; + private final int dataRegionId; private final DeletionBuffer deletionBuffer; private final File storageDir; private final Map deleteNode2ResourcesMap = @@ -70,7 +70,7 @@ public class DeletionResourceManager implements AutoCloseable { private final Condition recoveryReadyCondition = recoverLock.newCondition(); private volatile boolean hasCompletedRecovery = false; - private DeletionResourceManager(String dataRegionId) throws IOException { + private DeletionResourceManager(int dataRegionId) throws IOException { this.dataRegionId = dataRegionId; this.storageDir = new File( @@ -269,23 +269,23 @@ private boolean isFileProgressCoveredByGivenProgress( //////////////////////////// singleton //////////////////////////// private static class DeletionResourceManagerHolder { - private static Map CONSENSU_GROUP_ID_2_INSTANCE_MAP; + private static Map CONSENSUS_GROUP_ID_2_INSTANCE_MAP; private DeletionResourceManagerHolder() {} public static void build() { - if (CONSENSU_GROUP_ID_2_INSTANCE_MAP == null) { - CONSENSU_GROUP_ID_2_INSTANCE_MAP = new ConcurrentHashMap<>(); + if (CONSENSUS_GROUP_ID_2_INSTANCE_MAP == null) { + CONSENSUS_GROUP_ID_2_INSTANCE_MAP = new ConcurrentHashMap<>(); } } } - public static DeletionResourceManager getInstance(String groupId) { + public static DeletionResourceManager getInstance(int groupId) { // If consensusImpl is not PipeConsensus. - if (DeletionResourceManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP == null) { + if (DeletionResourceManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP == null) { return null; } - return DeletionResourceManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP.computeIfAbsent( + return DeletionResourceManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP.computeIfAbsent( groupId, key -> { try { @@ -305,10 +305,10 @@ public static void build() { } public static void exit() { - if (DeletionResourceManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP == null) { + if (DeletionResourceManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP == null) { return; } - DeletionResourceManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP.forEach( + DeletionResourceManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP.forEach( (groupId, resourceManager) -> { resourceManager.close(); }); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java index f401cbbc76a4f..b77105720ba1b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/persist/PageCacheDeletionBuffer.java @@ -75,7 +75,7 @@ public class PageCacheDeletionBuffer implements DeletionBuffer { ? 0 : (o1.getProgressIndex().isAfter(o2.getProgressIndex()) ? 1 : -1)); // Data region id - private final String dataRegionId; + private final int dataRegionId; // directory to store .deletion files private final String baseDirectory; // single thread to serialize WALEntry to workingBuffer @@ -99,7 +99,7 @@ public class PageCacheDeletionBuffer implements DeletionBuffer { // maxProgressIndex of each batch increases in the same order as the physical time. private ProgressIndex maxProgressIndexInCurrentFile = MinimumProgressIndex.INSTANCE; - public PageCacheDeletionBuffer(String dataRegionId, String baseDirectory) { + public PageCacheDeletionBuffer(int dataRegionId, String baseDirectory) { this.dataRegionId = dataRegionId; this.baseDirectory = baseDirectory; allocateBuffers(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java index 943693e7b0073..1fa00cd0a0e65 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/recover/DeletionReader.java @@ -40,13 +40,13 @@ public class DeletionReader implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(DeletionReader.class); private static final int MAGIC_STRING_BYTES_SIZE = DeletionResourceManager.MAGIC_VERSION_V1.getBytes(StandardCharsets.UTF_8).length; - private final String regionId; + private final int regionId; private final Consumer removeHook; private final File logFile; private final FileInputStream fileInputStream; private final FileChannel fileChannel; - public DeletionReader(File logFile, String regionId, Consumer removeHook) + public DeletionReader(File logFile, int regionId, Consumer removeHook) throws IOException { this.logFile = logFile; this.regionId = regionId; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java index 468292b8ecc4e..8ed7317d2bc5e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java @@ -41,7 +41,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent { private static final Logger LOGGER = LoggerFactory.getLogger(PipeHeartbeatEvent.class); - private final String dataRegionId; + private final int dataRegionId; private long timePublished; private long timeAssigned; @@ -52,17 +52,17 @@ public class PipeHeartbeatEvent extends EnrichedEvent { // The disruptor is usually nearly empty. private int disruptorSize; - private int extractorQueueTabletSize; - private int extractorQueueTsFileSize; - private int extractorQueueSize; + private int sourceQueueTabletSize; + private int sourceQueueTsFileSize; + private int sourceQueueSize; - private int connectorQueueTabletSize; - private int connectorQueueTsFileSize; - private int connectorQueueSize; + private int sinkQueueTabletSize; + private int sinkQueueTsFileSize; + private int sinkQueueSize; private final boolean shouldPrintMessage; - public PipeHeartbeatEvent(final String dataRegionId, final boolean shouldPrintMessage) { + public PipeHeartbeatEvent(final int dataRegionId, final boolean shouldPrintMessage) { super(null, 0, null, null, null, null, null, null, true, Long.MIN_VALUE, Long.MAX_VALUE); this.dataRegionId = dataRegionId; this.shouldPrintMessage = shouldPrintMessage; @@ -72,7 +72,7 @@ public PipeHeartbeatEvent( final String pipeName, final long creationTime, final PipeTaskMeta pipeTaskMeta, - final String dataRegionId, + final int dataRegionId, final long timePublished, final boolean shouldPrintMessage) { super( @@ -104,7 +104,7 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa @Override public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) { // PipeName == null indicates that the event is the raw event at disruptor, - // not the event copied and passed to the extractor + // not the event copied and passed to the source if (Objects.nonNull(pipeName)) { PipeDataNodeSinglePipeMetrics.getInstance() .decreaseHeartbeatEventCount(pipeName, creationTime); @@ -208,17 +208,17 @@ public void recordDisruptorSize(final RingBuffer ringBuffer) { public void recordExtractorQueueSize(final UnboundedBlockingPendingQueue pendingQueue) { if (shouldPrintMessage) { - extractorQueueTabletSize = pendingQueue.getTabletInsertionEventCount(); - extractorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount(); - extractorQueueSize = pendingQueue.size(); + sourceQueueTabletSize = pendingQueue.getTabletInsertionEventCount(); + sourceQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount(); + sourceQueueSize = pendingQueue.size(); } } public void recordConnectorQueueSize(final UnboundedBlockingPendingQueue pendingQueue) { if (shouldPrintMessage) { - connectorQueueTabletSize = pendingQueue.getTabletInsertionEventCount(); - connectorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount(); - connectorQueueSize = pendingQueue.size(); + sinkQueueTabletSize = pendingQueue.getTabletInsertionEventCount(); + sinkQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount(); + sinkQueueSize = pendingQueue.size(); } } @@ -259,19 +259,19 @@ public String toString() { final String disruptorSizeMessage = Integer.toString(disruptorSize); - final String extractorQueueTabletSizeMessage = - timeAssigned != 0 ? Integer.toString(extractorQueueTabletSize) : unknownMessage; - final String extractorQueueTsFileSizeMessage = - timeAssigned != 0 ? Integer.toString(extractorQueueTsFileSize) : unknownMessage; - final String extractorQueueSizeMessage = - timeAssigned != 0 ? Integer.toString(extractorQueueSize) : unknownMessage; + final String sourceQueueTabletSizeMessage = + timeAssigned != 0 ? Integer.toString(sourceQueueTabletSize) : unknownMessage; + final String sourceQueueTsFileSizeMessage = + timeAssigned != 0 ? Integer.toString(sourceQueueTsFileSize) : unknownMessage; + final String sourceQueueSizeMessage = + timeAssigned != 0 ? Integer.toString(sourceQueueSize) : unknownMessage; - final String connectorQueueTabletSizeMessage = - timeProcessed != 0 ? Integer.toString(connectorQueueTabletSize) : unknownMessage; - final String connectorQueueTsFileSizeMessage = - timeProcessed != 0 ? Integer.toString(connectorQueueTsFileSize) : unknownMessage; - final String connectorQueueSizeMessage = - timeProcessed != 0 ? Integer.toString(connectorQueueSize) : unknownMessage; + final String sinkQueueTabletSizeMessage = + timeProcessed != 0 ? Integer.toString(sinkQueueTabletSize) : unknownMessage; + final String sinkQueueTsFileSizeMessage = + timeProcessed != 0 ? Integer.toString(sinkQueueTsFileSize) : unknownMessage; + final String sinkQueueSizeMessage = + timeProcessed != 0 ? Integer.toString(sinkQueueSize) : unknownMessage; return "PipeHeartbeatEvent{" + "pipeName='" @@ -290,18 +290,18 @@ public String toString() { + totalTimeMessage + ", disruptorSize=" + disruptorSizeMessage - + ", extractorQueueTabletSize=" - + extractorQueueTabletSizeMessage - + ", extractorQueueTsFileSize=" - + extractorQueueTsFileSizeMessage - + ", extractorQueueSize=" - + extractorQueueSizeMessage - + ", connectorQueueTabletSize=" - + connectorQueueTabletSizeMessage - + ", connectorQueueTsFileSize=" - + connectorQueueTsFileSizeMessage - + ", connectorQueueSize=" - + connectorQueueSizeMessage + + ", sourceQueueTabletSize=" + + sourceQueueTabletSizeMessage + + ", sourceQueueTsFileSize=" + + sourceQueueTsFileSizeMessage + + ", sourceQueueSize=" + + sourceQueueSizeMessage + + ", sinkQueueTabletSize=" + + sinkQueueTabletSizeMessage + + ", sinkQueueTsFileSize=" + + sinkQueueTsFileSizeMessage + + ", sinkQueueSize=" + + sinkQueueSizeMessage + "}"; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java index bf3f2c97acb2f..cdcb87345032a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java @@ -38,7 +38,7 @@ public class PipeCompactedTsFileInsertionEvent extends PipeTsFileInsertionEvent { - private final String dataRegionId; + private final int dataRegionId; private final Set originFilePaths; private final List commitIds; @@ -70,7 +70,7 @@ public PipeCompactedTsFileInsertionEvent( anyOfOriginalEvents.getStartTime(), anyOfOriginalEvents.getEndTime()); - this.dataRegionId = String.valueOf(committerKey.getRegionId()); + this.dataRegionId = committerKey.getRegionId(); this.originFilePaths = originalEvents.stream() .map(PipeTsFileInsertionEvent::getTsFile) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 6b3e505d2eafb..2904e3e37cc4c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -400,7 +400,8 @@ public ProgressIndex forceGetProgressIndex() { public void eliminateProgressIndex() { if (Objects.isNull(overridingProgressIndex) && Objects.nonNull(resource)) { PipeTsFileEpochProgressIndexKeeper.getInstance() - .eliminateProgressIndex(resource.getDataRegionId(), pipeName, resource.getTsFilePath()); + .eliminateProgressIndex( + Integer.parseInt(resource.getDataRegionId()), pipeName, resource.getTsFilePath()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java index acc62f7e7a4da..540589326ffec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java @@ -57,7 +57,7 @@ public static PipeRealtimeEvent createRealtimeEvent( } public static PipeRealtimeEvent createRealtimeEvent( - final String dataRegionId, final boolean shouldPrintMessage) { + final int dataRegionId, final boolean shouldPrintMessage) { return new PipeRealtimeEvent( new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/source/PipeAssignerMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/source/PipeAssignerMetrics.java index 7f89cdc5372b8..59e4e892d8e51 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/source/PipeAssignerMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/source/PipeAssignerMetrics.java @@ -41,7 +41,7 @@ public class PipeAssignerMetrics implements IMetricSet { private AbstractMetricService metricService; - private final Map assignerMap = new HashMap<>(); + private final Map assignerMap = new HashMap<>(); //////////////////////////// bindTo & unbindFrom (metric framework) //////////////////////////// @@ -49,44 +49,44 @@ public class PipeAssignerMetrics implements IMetricSet { public void bindTo(AbstractMetricService metricService) { this.metricService = metricService; synchronized (this) { - for (String dataRegionId : assignerMap.keySet()) { + for (int dataRegionId : assignerMap.keySet()) { createMetrics(dataRegionId); } } } - private void createMetrics(String dataRegionId) { + private void createMetrics(int dataRegionId) { createAutoGauge(dataRegionId); } - private void createAutoGauge(String dataRegionId) { + private void createAutoGauge(int dataRegionId) { metricService.createAutoGauge( Metric.UNASSIGNED_HEARTBEAT_COUNT.toString(), MetricLevel.IMPORTANT, assignerMap.get(dataRegionId), PipeDataRegionAssigner::getPipeHeartbeatEventCount, Tag.REGION.toString(), - dataRegionId); + Integer.toString(dataRegionId)); metricService.createAutoGauge( Metric.UNASSIGNED_TABLET_COUNT.toString(), MetricLevel.IMPORTANT, assignerMap.get(dataRegionId), PipeDataRegionAssigner::getTabletInsertionEventCount, Tag.REGION.toString(), - dataRegionId); + Integer.toString(dataRegionId)); metricService.createAutoGauge( Metric.UNASSIGNED_TSFILE_COUNT.toString(), MetricLevel.IMPORTANT, assignerMap.get(dataRegionId), PipeDataRegionAssigner::getTsFileInsertionEventCount, Tag.REGION.toString(), - dataRegionId); + Integer.toString(dataRegionId)); } @Override public void unbindFrom(AbstractMetricService metricService) { - ImmutableSet dataRegionIds = ImmutableSet.copyOf(assignerMap.keySet()); - for (String dataRegionId : dataRegionIds) { + ImmutableSet dataRegionIds = ImmutableSet.copyOf(assignerMap.keySet()); + for (int dataRegionId : dataRegionIds) { deregister(dataRegionId); } if (!assignerMap.isEmpty()) { @@ -94,32 +94,32 @@ public void unbindFrom(AbstractMetricService metricService) { } } - private void removeMetrics(String dataRegionId) { + private void removeMetrics(int dataRegionId) { removeAutoGauge(dataRegionId); } - private void removeAutoGauge(String dataRegionId) { + private void removeAutoGauge(int dataRegionId) { metricService.remove( MetricType.AUTO_GAUGE, Metric.UNASSIGNED_HEARTBEAT_COUNT.toString(), Tag.REGION.toString(), - dataRegionId); + Integer.toString(dataRegionId)); metricService.remove( MetricType.AUTO_GAUGE, Metric.UNASSIGNED_TABLET_COUNT.toString(), Tag.REGION.toString(), - dataRegionId); + Integer.toString(dataRegionId)); metricService.remove( MetricType.AUTO_GAUGE, Metric.UNASSIGNED_TSFILE_COUNT.toString(), Tag.REGION.toString(), - dataRegionId); + Integer.toString(dataRegionId)); } //////////////////////////// register & deregister (pipe integration) //////////////////////////// public void register(PipeDataRegionAssigner pipeDataRegionAssigner) { - String dataRegionId = pipeDataRegionAssigner.getDataRegionId(); + int dataRegionId = pipeDataRegionAssigner.getDataRegionId(); synchronized (this) { assignerMap.putIfAbsent(dataRegionId, pipeDataRegionAssigner); if (Objects.nonNull(metricService)) { @@ -128,7 +128,7 @@ public void register(PipeDataRegionAssigner pipeDataRegionAssigner) { } } - public void deregister(String dataRegionId) { + public void deregister(final int dataRegionId) { synchronized (this) { if (!assignerMap.containsKey(dataRegionId)) { LOGGER.warn( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java index ab74d09474020..8aac47b01c7d4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java @@ -498,7 +498,7 @@ public synchronized void start() { extractTsFiles(dataRegion, startHistoricalExtractionTime, originalResourceList); } if (shouldExtractDeletion) { - Optional.ofNullable(DeletionResourceManager.getInstance(String.valueOf(dataRegionId))) + Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionId)) .ifPresent(manager -> extractDeletions(manager, originalResourceList)); } @@ -946,7 +946,7 @@ private Event supplyDeletionEvent(final DeletionResource deletionResource) { dataRegionId, event); } else { - Optional.ofNullable(DeletionResourceManager.getInstance(String.valueOf(dataRegionId))) + Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionId)) .ifPresent( manager -> event.setDeletionResource( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java index cd00e2975a0aa..de0ee2644736e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java @@ -91,7 +91,7 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor { protected String pipeName; protected long creationTime; - protected String dataRegionId; + protected int dataRegionId = -1; protected PipeTaskMeta pipeTaskMeta; protected boolean shouldExtractInsertion; @@ -214,7 +214,7 @@ public void customize( shouldExtractDeletion = insertionDeletionListeningOptionPair.getRight(); pipeName = environment.getPipeName(); - dataRegionId = String.valueOf(environment.getRegionId()); + dataRegionId = environment.getRegionId(); pipeTaskMeta = environment.getPipeTaskMeta(); // Metrics related to TsFileEpoch are managed in PipeExtractorMetrics. These metrics are @@ -317,7 +317,7 @@ public void start() throws Exception { @Override public void close() throws Exception { - if (Objects.nonNull(dataRegionId)) { + if (dataRegionId >= 0) { PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(dataRegionId, this); PipeTimePartitionListener.getInstance().stopListen(dataRegionId, this); } @@ -555,7 +555,7 @@ public boolean isSkipIfNoPrivileges() { return skipIfNoPrivileges; } - public final String getDataRegionId() { + public final int getDataRegionId() { return dataRegionId; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java index 8ea973fbf0a3a..0f4193af7a9ea 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -58,32 +58,32 @@ public class PipeDataRegionAssigner implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataRegionAssigner.class); /** - * The {@link PipeDataRegionMatcher} is used to match the event with the extractor based on the + * The {@link PipeDataRegionMatcher} is used to match the event with the source based on the * pattern. */ private final PipeDataRegionMatcher matcher; - /** The {@link DisruptorQueue} is used to assign the event to the extractor. */ + /** The {@link DisruptorQueue} is used to assign the event to the source. */ private final DisruptorQueue disruptor; - private final String dataRegionId; + private final int dataRegionId; private Boolean isTableModel; private final PipeEventCounter eventCounter = new PipeDataRegionEventCounter(); - public String getDataRegionId() { + public int getDataRegionId() { return dataRegionId; } - public PipeDataRegionAssigner(final String dataRegionId) { + public PipeDataRegionAssigner(final int dataRegionId) { this.matcher = new CachedSchemaPatternMatcher(); - this.disruptor = new DisruptorQueue(this::assignToExtractor, this::onAssignedHook); + this.disruptor = new DisruptorQueue(this::assignToSource, this::onAssignedHook); this.dataRegionId = dataRegionId; PipeAssignerMetrics.getInstance().register(this); final DataRegion dataRegion = - StorageEngine.getInstance().getDataRegion(new DataRegionId(Integer.parseInt(dataRegionId))); + StorageEngine.getInstance().getDataRegion(new DataRegionId(dataRegionId)); if (Objects.nonNull(dataRegion)) { final String databaseName = dataRegion.getDatabaseName(); if (Objects.nonNull(databaseName)) { @@ -128,7 +128,7 @@ private void onAssignedHook(final PipeRealtimeEvent realtimeEvent) { eventCounter.decreaseEventCount(innerEvent); } - private void assignToExtractor( + private void assignToSource( final PipeRealtimeEvent event, final long sequence, final boolean endOfBatch) { if (disruptor.isClosed()) { return; @@ -140,17 +140,15 @@ private void assignToExtractor( matchedAndUnmatched .getLeft() .forEach( - extractor -> { + source -> { if (disruptor.isClosed()) { return; } - if (event.getEvent().isGeneratedByPipe() && !extractor.isForwardingPipeRequests()) { + if (event.getEvent().isGeneratedByPipe() && !source.isForwardingPipeRequests()) { final ProgressReportEvent reportEvent = new ProgressReportEvent( - extractor.getPipeName(), - extractor.getCreationTime(), - extractor.getPipeTaskMeta()); + source.getPipeName(), source.getCreationTime(), source.getPipeTaskMeta()); reportEvent.bindProgressIndex(event.getProgressIndex()); if (!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) { LOGGER.warn( @@ -158,33 +156,33 @@ private void assignToExtractor( reportEvent); return; } - extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent)); + source.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent)); return; } final PipeRealtimeEvent copiedEvent = event.shallowCopySelfAndBindPipeTaskMetaForProgressReport( - extractor.getPipeName(), - extractor.getCreationTime(), - extractor.getPipeTaskMeta(), - extractor.getTreePattern(), - extractor.getTablePattern(), - String.valueOf(extractor.getUserId()), - extractor.getUserName(), - extractor.getCliHostname(), - extractor.isSkipIfNoPrivileges(), - extractor.getRealtimeDataExtractionStartTime(), - extractor.getRealtimeDataExtractionEndTime()); + source.getPipeName(), + source.getCreationTime(), + source.getPipeTaskMeta(), + source.getTreePattern(), + source.getTablePattern(), + String.valueOf(source.getUserId()), + source.getUserName(), + source.getCliHostname(), + source.isSkipIfNoPrivileges(), + source.getRealtimeDataExtractionStartTime(), + source.getRealtimeDataExtractionEndTime()); final EnrichedEvent innerEvent = copiedEvent.getEvent(); // if using IoTV2, assign a replicateIndex for this realtime event if (DataRegionConsensusImpl.getInstance() instanceof PipeConsensus && PipeConsensusProcessor.isShouldReplicate(innerEvent)) { innerEvent.setReplicateIndexForIoTV2( ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2( - extractor.getPipeName())); + source.getPipeName())); LOGGER.debug( "[{}]Set {} for realtime event {}", - extractor.getPipeName(), + source.getPipeName(), innerEvent.getReplicateIndexForIoTV2(), innerEvent); } @@ -192,15 +190,14 @@ private void assignToExtractor( if (innerEvent instanceof PipeTsFileInsertionEvent) { final PipeTsFileInsertionEvent tsFileInsertionEvent = (PipeTsFileInsertionEvent) innerEvent; - tsFileInsertionEvent.disableMod4NonTransferPipes( - extractor.isShouldTransferModFile()); + tsFileInsertionEvent.disableMod4NonTransferPipes(source.isShouldTransferModFile()); } if (innerEvent instanceof PipeDeleteDataNodeEvent) { final PipeDeleteDataNodeEvent deleteDataNodeEvent = (PipeDeleteDataNodeEvent) innerEvent; final DeletionResourceManager manager = - DeletionResourceManager.getInstance(extractor.getDataRegionId()); + DeletionResourceManager.getInstance(source.getDataRegionId()); // increase deletion resource's reference and bind real deleteEvent if (Objects.nonNull(manager) && DeletionResource.isDeleteNodeGeneratedInLocalByIoTV2( @@ -217,13 +214,13 @@ private void assignToExtractor( copiedEvent); return; } - extractor.extract(copiedEvent); + source.extract(copiedEvent); }); matchedAndUnmatched .getRight() .forEach( - extractor -> { + source -> { if (disruptor.isClosed()) { return; } @@ -233,9 +230,7 @@ private void assignToExtractor( || innerEvent instanceof TsFileInsertionEvent) { final ProgressReportEvent reportEvent = new ProgressReportEvent( - extractor.getPipeName(), - extractor.getCreationTime(), - extractor.getPipeTaskMeta()); + source.getPipeName(), source.getCreationTime(), source.getPipeTaskMeta()); reportEvent.bindProgressIndex(event.getProgressIndex()); if (!reportEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) { LOGGER.warn( @@ -243,17 +238,17 @@ private void assignToExtractor( reportEvent); return; } - extractor.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent)); + source.extract(PipeRealtimeEventFactory.createRealtimeEvent(reportEvent)); } }); } - public void startAssignTo(final PipeRealtimeDataRegionSource extractor) { - matcher.register(extractor); + public void startAssignTo(final PipeRealtimeDataRegionSource source) { + matcher.register(source); } - public void stopAssignTo(final PipeRealtimeDataRegionSource extractor) { - matcher.deregister(extractor); + public void stopAssignTo(final PipeRealtimeDataRegionSource source) { + matcher.deregister(source); } public void invalidateCache() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java index ff7d90c377d10..bf15dcdc5475a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java @@ -32,11 +32,11 @@ public class PipeTsFileEpochProgressIndexKeeper { // data region id -> pipeName -> tsFile path -> max progress index - private final Map>> progressIndexKeeper = + private final Map>> progressIndexKeeper = new ConcurrentHashMap<>(); public synchronized void registerProgressIndex( - final String dataRegionId, final String pipeName, final TsFileResource resource) { + final int dataRegionId, final String pipeName, final TsFileResource resource) { progressIndexKeeper .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>()) .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>()) @@ -44,7 +44,7 @@ public synchronized void registerProgressIndex( } public synchronized void eliminateProgressIndex( - final String dataRegionId, final @Nonnull String pipeName, final String filePath) { + final int dataRegionId, final @Nonnull String pipeName, final String filePath) { progressIndexKeeper .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>()) .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>()) @@ -52,7 +52,7 @@ public synchronized void eliminateProgressIndex( } public synchronized boolean isProgressIndexAfterOrEquals( - final String dataRegionId, + final int dataRegionId, final String pipeName, final String tsFilePath, final ProgressIndex progressIndex) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java index 8c49dd0a3dd8d..3cce521a51e0b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java @@ -48,7 +48,7 @@ * will filter events and assign them to different PipeRealtimeEventDataRegionExtractors. */ public class PipeInsertionDataNodeListener { - private final ConcurrentMap dataRegionId2Assigner = + private final ConcurrentMap dataRegionId2Assigner = new ConcurrentHashMap<>(); private final AtomicInteger listenToTsFileExtractorCount = new AtomicInteger(0); @@ -57,7 +57,7 @@ public class PipeInsertionDataNodeListener { //////////////////////////// start & stop //////////////////////////// public synchronized void startListenAndAssign( - final String dataRegionId, final PipeRealtimeDataRegionSource extractor) { + final int dataRegionId, final PipeRealtimeDataRegionSource extractor) { dataRegionId2Assigner .computeIfAbsent(dataRegionId, o -> new PipeDataRegionAssigner(dataRegionId)) .startAssignTo(extractor); @@ -71,7 +71,7 @@ public synchronized void startListenAndAssign( } public synchronized void stopListenAndAssign( - final String dataRegionId, final PipeRealtimeDataRegionSource extractor) { + final int dataRegionId, final PipeRealtimeDataRegionSource extractor) { final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId); if (assigner == null) { return; @@ -97,7 +97,7 @@ public synchronized void stopListenAndAssign( //////////////////////////// listen to events //////////////////////////// public void listenToTsFile( - final String dataRegionId, + final int dataRegionId, final String databaseName, final TsFileResource tsFileResource, final boolean isLoaded) { @@ -118,7 +118,7 @@ public void listenToTsFile( } public void listenToInsertNode( - final String dataRegionId, + final int dataRegionId, final String databaseName, final InsertNode insertNode, final TsFileResource tsFileResource) { @@ -139,7 +139,7 @@ public void listenToInsertNode( } public DeletionResource listenToDeleteData( - final String regionId, final AbstractDeleteDataNode node) { + final int regionId, final AbstractDeleteDataNode node) { final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(regionId); // only events from registered data region will be extracted if (assigner == null diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java index c5e98cd1b2ceb..7c674a3bd1d7c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeTimePartitionListener.java @@ -30,19 +30,18 @@ public class PipeTimePartitionListener { - private final Map> dataRegionId2Extractors = + private final Map> dataRegionId2Sources = new ConcurrentHashMap<>(); // This variable is used to record the upper and lower bounds that each data region's time // partition ID has ever reached. - private final Map> dataRegionId2TimePartitionIdBound = + private final Map> dataRegionId2TimePartitionIdBound = new ConcurrentHashMap<>(); //////////////////////////// start & stop //////////////////////////// - public synchronized void startListen( - String dataRegionId, PipeRealtimeDataRegionSource extractor) { - dataRegionId2Extractors + public synchronized void startListen(int dataRegionId, PipeRealtimeDataRegionSource extractor) { + dataRegionId2Sources .computeIfAbsent(dataRegionId, o -> new HashMap<>()) .put(extractor.getTaskID(), extractor); // Assign the previously recorded upper and lower bounds of time partition to the extractor that @@ -53,22 +52,21 @@ public synchronized void startListen( } } - public synchronized void stopListen(String dataRegionId, PipeRealtimeDataRegionSource extractor) { - Map extractors = - dataRegionId2Extractors.get(dataRegionId); + public synchronized void stopListen(int dataRegionId, PipeRealtimeDataRegionSource extractor) { + Map extractors = dataRegionId2Sources.get(dataRegionId); if (Objects.isNull(extractors)) { return; } extractors.remove(extractor.getTaskID()); if (extractors.isEmpty()) { - dataRegionId2Extractors.remove(dataRegionId); + dataRegionId2Sources.remove(dataRegionId); } } //////////////////////////// listen to changes //////////////////////////// public synchronized void listenToTimePartitionGrow( - String dataRegionId, Pair newTimePartitionIdBound) { + int dataRegionId, Pair newTimePartitionIdBound) { boolean shouldBroadcastTimePartitionChange = false; Pair oldTimePartitionIdBound = dataRegionId2TimePartitionIdBound.get(dataRegionId); @@ -86,8 +84,7 @@ public synchronized void listenToTimePartitionGrow( } if (shouldBroadcastTimePartitionChange) { - Map extractors = - dataRegionId2Extractors.get(dataRegionId); + Map extractors = dataRegionId2Sources.get(dataRegionId); if (Objects.isNull(extractors)) { return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index ef4a4910e1b85..20c78fd0b8c3a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -2019,7 +2019,7 @@ public void deleteFolder(String systemDir) { } public void deleteDALFolderAndClose() { - Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionIdString)) + Optional.ofNullable(DeletionResourceManager.getInstance(dataRegionId.getId())) .ifPresent( manager -> { manager.close(); @@ -2833,7 +2833,8 @@ public void deleteByDevice(final MeasurementPath pattern, final DeleteDataNode n deleteDataInUnsealedFiles(unsealedTsFileResource, deletion, sealedTsFileResource); // capture deleteDataNode and wait it to be persisted to DAL. DeletionResource deletionResource = - PipeInsertionDataNodeListener.getInstance().listenToDeleteData(dataRegionIdString, node); + PipeInsertionDataNodeListener.getInstance() + .listenToDeleteData(dataRegionId.getId(), node); // just get result. We have already waited for result in `listenToDeleteData` if (deletionResource != null && deletionResource.waitForResult() == Status.FAILURE) { throw deletionResource.getCause(); @@ -2936,7 +2937,8 @@ public void deleteByTable(RelationalDeleteDataNode node) throws IOException { // capture deleteDataNode and wait it to be persisted to DAL. DeletionResource deletionResource = - PipeInsertionDataNodeListener.getInstance().listenToDeleteData(dataRegionIdString, node); + PipeInsertionDataNodeListener.getInstance() + .listenToDeleteData(dataRegionId.getId(), node); // just get result. We have already waited for result in `listenToDeleteData` if (deletionResource != null && deletionResource.waitForResult() == Status.FAILURE) { throw deletionResource.getCause(); @@ -2993,7 +2995,8 @@ public void deleteDataDirectly(MeasurementPath pathToDelete, DeleteDataNode node deleteDataDirectlyInFile(unsealedTsFileResource, deletion); // capture deleteDataNode and wait it to be persisted to DAL. DeletionResource deletionResource = - PipeInsertionDataNodeListener.getInstance().listenToDeleteData(dataRegionIdString, node); + PipeInsertionDataNodeListener.getInstance() + .listenToDeleteData(dataRegionId.getId(), node); // just get result. We have already waited for result in `listenToDeleteData` if (deletionResource != null && deletionResource.waitForResult() == Status.FAILURE) { throw deletionResource.getCause(); @@ -4164,7 +4167,7 @@ private boolean loadTsFileToUnSequence( // Listen before the tsFile is added into tsFile manager to avoid it being compacted PipeInsertionDataNodeListener.getInstance() - .listenToTsFile(dataRegionIdString, databaseName, tsFileResource, true); + .listenToTsFile(dataRegionId.getId(), databaseName, tsFileResource, true); tsFileManager.add(tsFileResource, false); @@ -4343,6 +4346,10 @@ public String getDataRegionIdString() { return dataRegionIdString; } + public int getDataRegionId() { + return dataRegionId.getId(); + } + /** * Get the storageGroupPath with dataRegionId. * diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 78b33c02c0bdb..518df5322a9d3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -342,7 +342,7 @@ public void insert(InsertRowNode insertRowNode, long[] infoForMetrics) } PipeInsertionDataNodeListener.getInstance() .listenToInsertNode( - dataRegionInfo.getDataRegion().getDataRegionIdString(), + dataRegionInfo.getDataRegion().getDataRegionId(), dataRegionInfo.getDataRegion().getDatabaseName(), insertRowNode, tsFileResource); @@ -439,7 +439,7 @@ public void insertRows(InsertRowsNode insertRowsNode, long[] infoForMetrics) } PipeInsertionDataNodeListener.getInstance() .listenToInsertNode( - dataRegionInfo.getDataRegion().getDataRegionIdString(), + dataRegionInfo.getDataRegion().getDataRegionId(), dataRegionInfo.getDataRegion().getDatabaseName(), insertRowsNode, tsFileResource); @@ -612,7 +612,7 @@ public void insertTablet( } PipeInsertionDataNodeListener.getInstance() .listenToInsertNode( - dataRegionInfo.getDataRegion().getDataRegionIdString(), + dataRegionInfo.getDataRegion().getDataRegionId(), dataRegionInfo.getDataRegion().getDatabaseName(), insertTabletNode, tsFileResource); @@ -1752,7 +1752,7 @@ private void endFile() throws IOException, TsFileProcessorException { // before resource serialization to avoid missing hardlink after restart PipeInsertionDataNodeListener.getInstance() .listenToTsFile( - dataRegionInfo.getDataRegion().getDataRegionIdString(), + dataRegionInfo.getDataRegion().getDataRegionId(), dataRegionInfo.getDataRegion().getDatabaseName(), tsFileResource, false); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java index 2e1161977f8b5..5ded82a36ba00 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java @@ -67,7 +67,7 @@ public void registerTimePartitionInfo(TimePartitionInfo timePartitionInfo) { // PipeInsertionDataNodeListener.listenToInsertNode. PipeTimePartitionListener.getInstance() .listenToTimePartitionGrow( - String.valueOf(timePartitionInfo.dataRegionId.getId()), + timePartitionInfo.dataRegionId.getId(), new Pair<>( timePartitionInfoMapForRegion.firstKey(), timePartitionInfoMapForRegion.lastKey())); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java index 06f823c0e23fb..b69f2d85ef750 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java @@ -42,7 +42,7 @@ import java.util.Collections; public class DeletionRecoverTest { - private static final String[] FAKE_DATA_REGION_IDS = {"2", "3"}; + private static final int[] FAKE_DATA_REGION_IDS = {2, 3}; private static final int THIS_DATANODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); private static final String DELETION_BASE_DIR = @@ -50,7 +50,7 @@ public class DeletionRecoverTest { private final int deletionCount = 10; private DeletionResourceManager deletionResourceManager; - public void setUp(boolean isRelational, String FAKE_DATA_REGION_ID) throws Exception { + public void setUp(boolean isRelational, int FAKE_DATA_REGION_ID) throws Exception { File baseDir = new File(DELETION_BASE_DIR + File.separator + FAKE_DATA_REGION_ID); if (baseDir.exists()) { FileUtils.deleteFileOrDirectory(baseDir); @@ -84,7 +84,7 @@ public void setUp(boolean isRelational, String FAKE_DATA_REGION_ID) throws Excep @After public void tearDown() throws Exception { - for (String FAKE_DATA_REGION_ID : FAKE_DATA_REGION_IDS) { + for (int FAKE_DATA_REGION_ID : FAKE_DATA_REGION_IDS) { File baseDir = new File(DELETION_BASE_DIR + File.separator + FAKE_DATA_REGION_ID); if (baseDir.exists()) { FileUtils.deleteFileOrDirectory(baseDir); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java index f94d909f94bd1..8b4ace5ec2210 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java @@ -66,7 +66,7 @@ import java.util.stream.Stream; public class DeletionResourceTest { - private static final String[] FAKE_DATA_REGION_IDS = {"2", "3", "4", "5", "6"}; + private static final int[] FAKE_DATA_REGION_IDS = {2, 3, 4, 5, 6}; private static final String DELETION_BASE_DIR = IoTDBDescriptor.getInstance().getConfig().getIotConsensusV2DeletionFileDir(); private static final int THIS_DATANODE_ID = 0; @@ -84,7 +84,7 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { IoTDBDescriptor.getInstance().getConfig().setDataNodeId(previousDataNodeId); - for (String FAKE_DATA_REGION_ID : FAKE_DATA_REGION_IDS) { + for (int FAKE_DATA_REGION_ID : FAKE_DATA_REGION_IDS) { File baseDir = new File(DELETION_BASE_DIR + File.separator + FAKE_DATA_REGION_ID); if (baseDir.exists()) { FileUtils.deleteFileOrDirectory(baseDir); @@ -212,7 +212,7 @@ public void deletionRemove(final boolean isRelational, final int initialIndex) new PipeDeleteDataNodeEvent( deleteDataNode, "Test", 10, null, null, null, null, null, null, true, true); deletionEvent.setCommitterKeyAndCommitId( - new CommitterKey("Test", 10, Integer.parseInt(FAKE_DATA_REGION_IDS[3]), 0), i + 1); + new CommitterKey("Test", 10, FAKE_DATA_REGION_IDS[3], 0), i + 1); deletionEvents.add(deletionEvent); final DeletionResource deletionResource = @@ -227,8 +227,7 @@ public void deletionRemove(final boolean isRelational, final int initialIndex) // for event commit to invoke onCommit() to removeDAL if (initialIndex == 0) { - PipeEventCommitManager.getInstance() - .register("Test", 10, Integer.parseInt(FAKE_DATA_REGION_IDS[3]), "Test"); + PipeEventCommitManager.getInstance().register("Test", 10, FAKE_DATA_REGION_IDS[3], "Test"); } deletionEvents.forEach(deletionEvent -> deletionEvent.increaseReferenceCount("test")); final List paths = @@ -265,8 +264,7 @@ public void testWaitForResult() throws Exception { }); final PipeTaskRuntimeConfiguration configuration = new PipeTaskRuntimeConfiguration( - new PipeTaskSourceRuntimeEnvironment( - "1", 1, Integer.parseInt(FAKE_DATA_REGION_IDS[4]), null)); + new PipeTaskSourceRuntimeEnvironment("1", 1, FAKE_DATA_REGION_IDS[4], null)); extractor.customize(parameters, configuration); Assert.assertTrue(extractor.shouldExtractDeletion()); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java index 59aec4b1f7b05..9e07e91e9839b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/PipeRealtimeExtractTest.java @@ -70,8 +70,8 @@ public class PipeRealtimeExtractTest { private static final Logger LOGGER = LoggerFactory.getLogger(PipeRealtimeExtractTest.class); - private final String dataRegion1 = "1"; - private final String dataRegion2 = "2"; + private final int dataRegion1 = 1; + private final int dataRegion2 = 2; private final String pattern1 = "root.sg.d"; private final String pattern2 = "root.sg.d.a"; private final String[] device = new String[] {"root", "sg", "d"}; @@ -151,31 +151,19 @@ public void testRealtimeExtractProcess() { final PipeTaskRuntimeConfiguration configuration0 = new PipeTaskRuntimeConfiguration( new PipeTaskSourceRuntimeEnvironment( - "1", - 1, - Integer.parseInt(dataRegion1), - new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1))); + "1", 1, dataRegion1, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1))); final PipeTaskRuntimeConfiguration configuration1 = new PipeTaskRuntimeConfiguration( new PipeTaskSourceRuntimeEnvironment( - "1", - 1, - Integer.parseInt(dataRegion1), - new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1))); + "1", 1, dataRegion1, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1))); final PipeTaskRuntimeConfiguration configuration2 = new PipeTaskRuntimeConfiguration( new PipeTaskSourceRuntimeEnvironment( - "1", - 1, - Integer.parseInt(dataRegion2), - new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1))); + "1", 1, dataRegion2, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1))); final PipeTaskRuntimeConfiguration configuration3 = new PipeTaskRuntimeConfiguration( new PipeTaskSourceRuntimeEnvironment( - "1", - 1, - Integer.parseInt(dataRegion2), - new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1))); + "1", 1, dataRegion2, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1))); // Some parameters of extractor are validated and initialized during the validation process. extractor0.validate(new PipeParameterValidator(parameters0)); @@ -274,7 +262,7 @@ public void testRealtimeExtractProcess() { } private Future write2DataRegion( - final int writeNum, final String dataRegionId, final int startNum) { + final int writeNum, final int dataRegionId, final int startNum) { final File dataRegionDir = new File(tsFileDir.getPath() + File.separator + dataRegionId + File.separator + "0"); final boolean ignored = dataRegionDir.mkdirs(); @@ -305,7 +293,7 @@ private Future write2DataRegion( PipeInsertionDataNodeListener.getInstance() .listenToInsertNode( dataRegionId, - dataRegionId, + Integer.toString(dataRegionId), new InsertRowNode( new PlanNodeId(String.valueOf(i)), new PartialPath(device), @@ -319,7 +307,7 @@ private Future write2DataRegion( PipeInsertionDataNodeListener.getInstance() .listenToInsertNode( dataRegionId, - dataRegionId, + Integer.toString(dataRegionId), new InsertRowNode( new PlanNodeId(String.valueOf(i)), new PartialPath(device), @@ -331,7 +319,7 @@ private Future write2DataRegion( false), resource); PipeInsertionDataNodeListener.getInstance() - .listenToTsFile(dataRegionId, dataRegionId, resource, false); + .listenToTsFile(dataRegionId, Integer.toString(dataRegionId), resource, false); } }); }