Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ public class PipeSinkSubtask extends PipeAbstractSinkSubtask {
// to trigger the general event transfer function, causing potentially such as
// the random delay of the batch transmission. Therefore, here we inject cron events
// when no event can be pulled.
public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT =
new PipeHeartbeatEvent("cron", false);
public static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT = new PipeHeartbeatEvent(-1, false);

public PipeSinkSubtask(
final String taskID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,12 @@ public class DeletionResource implements PersistentResource {
private volatile Exception cause;

public DeletionResource(
AbstractDeleteDataNode deleteDataNode,
Consumer<DeletionResource> removeHook,
String regionId) {
AbstractDeleteDataNode deleteDataNode, Consumer<DeletionResource> removeHook, int regionId) {
this.deleteDataNode = deleteDataNode;
this.removeHook = removeHook;
this.currentStatus = Status.RUNNING;
this.consensusGroupId =
ConsensusGroupId.Factory.create(
TConsensusGroupType.DataRegion.getValue(), Integer.parseInt(regionId));
ConsensusGroupId.Factory.create(TConsensusGroupType.DataRegion.getValue(), regionId);
this.pipeTaskReferenceCount =
new AtomicInteger(
DataRegionConsensusImpl.getInstance().getReplicationNum(consensusGroupId) - 1);
Expand Down Expand Up @@ -151,7 +148,7 @@ public ByteBuffer serialize() {
}

public static DeletionResource deserialize(
final ByteBuffer buffer, final String regionId, final Consumer<DeletionResource> removeHook)
final ByteBuffer buffer, final int regionId, final Consumer<DeletionResource> removeHook)
throws IOException {
AbstractDeleteDataNode node = DeleteNodeType.deserializeFromDAL(buffer);
return new DeletionResource(node, removeHook, regionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class DeletionResourceManager implements AutoCloseable {
String.format(
"^_(?<%s>\\d+)-(?<%s>\\d+)\\%s$",
REBOOT_TIME, MEM_TABLE_FLUSH_ORDER, DELETION_FILE_SUFFIX);
private final String dataRegionId;
private final int dataRegionId;
private final DeletionBuffer deletionBuffer;
private final File storageDir;
private final Map<AbstractDeleteDataNode, DeletionResource> deleteNode2ResourcesMap =
Expand All @@ -70,7 +70,7 @@ public class DeletionResourceManager implements AutoCloseable {
private final Condition recoveryReadyCondition = recoverLock.newCondition();
private volatile boolean hasCompletedRecovery = false;

private DeletionResourceManager(String dataRegionId) throws IOException {
private DeletionResourceManager(int dataRegionId) throws IOException {
this.dataRegionId = dataRegionId;
this.storageDir =
new File(
Expand Down Expand Up @@ -269,23 +269,23 @@ private boolean isFileProgressCoveredByGivenProgress(

//////////////////////////// singleton ////////////////////////////
private static class DeletionResourceManagerHolder {
private static Map<String, DeletionResourceManager> CONSENSU_GROUP_ID_2_INSTANCE_MAP;
private static Map<Integer, DeletionResourceManager> CONSENSUS_GROUP_ID_2_INSTANCE_MAP;

private DeletionResourceManagerHolder() {}

public static void build() {
if (CONSENSU_GROUP_ID_2_INSTANCE_MAP == null) {
CONSENSU_GROUP_ID_2_INSTANCE_MAP = new ConcurrentHashMap<>();
if (CONSENSUS_GROUP_ID_2_INSTANCE_MAP == null) {
CONSENSUS_GROUP_ID_2_INSTANCE_MAP = new ConcurrentHashMap<>();
}
}
}

public static DeletionResourceManager getInstance(String groupId) {
public static DeletionResourceManager getInstance(int groupId) {
// If consensusImpl is not PipeConsensus.
if (DeletionResourceManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP == null) {
if (DeletionResourceManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP == null) {
return null;
}
return DeletionResourceManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP.computeIfAbsent(
return DeletionResourceManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP.computeIfAbsent(
groupId,
key -> {
try {
Expand All @@ -305,10 +305,10 @@ public static void build() {
}

public static void exit() {
if (DeletionResourceManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP == null) {
if (DeletionResourceManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP == null) {
return;
}
DeletionResourceManagerHolder.CONSENSU_GROUP_ID_2_INSTANCE_MAP.forEach(
DeletionResourceManagerHolder.CONSENSUS_GROUP_ID_2_INSTANCE_MAP.forEach(
(groupId, resourceManager) -> {
resourceManager.close();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class PageCacheDeletionBuffer implements DeletionBuffer {
? 0
: (o1.getProgressIndex().isAfter(o2.getProgressIndex()) ? 1 : -1));
// Data region id
private final String dataRegionId;
private final int dataRegionId;
// directory to store .deletion files
private final String baseDirectory;
// single thread to serialize WALEntry to workingBuffer
Expand All @@ -99,7 +99,7 @@ public class PageCacheDeletionBuffer implements DeletionBuffer {
// maxProgressIndex of each batch increases in the same order as the physical time.
private ProgressIndex maxProgressIndexInCurrentFile = MinimumProgressIndex.INSTANCE;

public PageCacheDeletionBuffer(String dataRegionId, String baseDirectory) {
public PageCacheDeletionBuffer(int dataRegionId, String baseDirectory) {
this.dataRegionId = dataRegionId;
this.baseDirectory = baseDirectory;
allocateBuffers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ public class DeletionReader implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(DeletionReader.class);
private static final int MAGIC_STRING_BYTES_SIZE =
DeletionResourceManager.MAGIC_VERSION_V1.getBytes(StandardCharsets.UTF_8).length;
private final String regionId;
private final int regionId;
private final Consumer<DeletionResource> removeHook;
private final File logFile;
private final FileInputStream fileInputStream;
private final FileChannel fileChannel;

public DeletionReader(File logFile, String regionId, Consumer<DeletionResource> removeHook)
public DeletionReader(File logFile, int regionId, Consumer<DeletionResource> removeHook)
throws IOException {
this.logFile = logFile;
this.regionId = regionId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeHeartbeatEvent.class);

private final String dataRegionId;
private final int dataRegionId;

private long timePublished;
private long timeAssigned;
Expand All @@ -52,17 +52,17 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
// The disruptor is usually nearly empty.
private int disruptorSize;

private int extractorQueueTabletSize;
private int extractorQueueTsFileSize;
private int extractorQueueSize;
private int sourceQueueTabletSize;
private int sourceQueueTsFileSize;
private int sourceQueueSize;

private int connectorQueueTabletSize;
private int connectorQueueTsFileSize;
private int connectorQueueSize;
private int sinkQueueTabletSize;
private int sinkQueueTsFileSize;
private int sinkQueueSize;

private final boolean shouldPrintMessage;

public PipeHeartbeatEvent(final String dataRegionId, final boolean shouldPrintMessage) {
public PipeHeartbeatEvent(final int dataRegionId, final boolean shouldPrintMessage) {
super(null, 0, null, null, null, null, null, null, true, Long.MIN_VALUE, Long.MAX_VALUE);
this.dataRegionId = dataRegionId;
this.shouldPrintMessage = shouldPrintMessage;
Expand All @@ -72,7 +72,7 @@ public PipeHeartbeatEvent(
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
final String dataRegionId,
final int dataRegionId,
final long timePublished,
final boolean shouldPrintMessage) {
super(
Expand Down Expand Up @@ -104,7 +104,7 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa
@Override
public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
// PipeName == null indicates that the event is the raw event at disruptor,
// not the event copied and passed to the extractor
// not the event copied and passed to the source
if (Objects.nonNull(pipeName)) {
PipeDataNodeSinglePipeMetrics.getInstance()
.decreaseHeartbeatEventCount(pipeName, creationTime);
Expand Down Expand Up @@ -208,17 +208,17 @@ public void recordDisruptorSize(final RingBuffer ringBuffer) {

public void recordExtractorQueueSize(final UnboundedBlockingPendingQueue<Event> pendingQueue) {
if (shouldPrintMessage) {
extractorQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
extractorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
extractorQueueSize = pendingQueue.size();
sourceQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
sourceQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
sourceQueueSize = pendingQueue.size();
}
}

public void recordConnectorQueueSize(final UnboundedBlockingPendingQueue<Event> pendingQueue) {
if (shouldPrintMessage) {
connectorQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
connectorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
connectorQueueSize = pendingQueue.size();
sinkQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
sinkQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
sinkQueueSize = pendingQueue.size();
}
}

Expand Down Expand Up @@ -259,19 +259,19 @@ public String toString() {

final String disruptorSizeMessage = Integer.toString(disruptorSize);

final String extractorQueueTabletSizeMessage =
timeAssigned != 0 ? Integer.toString(extractorQueueTabletSize) : unknownMessage;
final String extractorQueueTsFileSizeMessage =
timeAssigned != 0 ? Integer.toString(extractorQueueTsFileSize) : unknownMessage;
final String extractorQueueSizeMessage =
timeAssigned != 0 ? Integer.toString(extractorQueueSize) : unknownMessage;
final String sourceQueueTabletSizeMessage =
timeAssigned != 0 ? Integer.toString(sourceQueueTabletSize) : unknownMessage;
final String sourceQueueTsFileSizeMessage =
timeAssigned != 0 ? Integer.toString(sourceQueueTsFileSize) : unknownMessage;
final String sourceQueueSizeMessage =
timeAssigned != 0 ? Integer.toString(sourceQueueSize) : unknownMessage;

final String connectorQueueTabletSizeMessage =
timeProcessed != 0 ? Integer.toString(connectorQueueTabletSize) : unknownMessage;
final String connectorQueueTsFileSizeMessage =
timeProcessed != 0 ? Integer.toString(connectorQueueTsFileSize) : unknownMessage;
final String connectorQueueSizeMessage =
timeProcessed != 0 ? Integer.toString(connectorQueueSize) : unknownMessage;
final String sinkQueueTabletSizeMessage =
timeProcessed != 0 ? Integer.toString(sinkQueueTabletSize) : unknownMessage;
final String sinkQueueTsFileSizeMessage =
timeProcessed != 0 ? Integer.toString(sinkQueueTsFileSize) : unknownMessage;
final String sinkQueueSizeMessage =
timeProcessed != 0 ? Integer.toString(sinkQueueSize) : unknownMessage;

return "PipeHeartbeatEvent{"
+ "pipeName='"
Expand All @@ -290,18 +290,18 @@ public String toString() {
+ totalTimeMessage
+ ", disruptorSize="
+ disruptorSizeMessage
+ ", extractorQueueTabletSize="
+ extractorQueueTabletSizeMessage
+ ", extractorQueueTsFileSize="
+ extractorQueueTsFileSizeMessage
+ ", extractorQueueSize="
+ extractorQueueSizeMessage
+ ", connectorQueueTabletSize="
+ connectorQueueTabletSizeMessage
+ ", connectorQueueTsFileSize="
+ connectorQueueTsFileSizeMessage
+ ", connectorQueueSize="
+ connectorQueueSizeMessage
+ ", sourceQueueTabletSize="
+ sourceQueueTabletSizeMessage
+ ", sourceQueueTsFileSize="
+ sourceQueueTsFileSizeMessage
+ ", sourceQueueSize="
+ sourceQueueSizeMessage
+ ", sinkQueueTabletSize="
+ sinkQueueTabletSizeMessage
+ ", sinkQueueTsFileSize="
+ sinkQueueTsFileSizeMessage
+ ", sinkQueueSize="
+ sinkQueueSizeMessage
+ "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

public class PipeCompactedTsFileInsertionEvent extends PipeTsFileInsertionEvent {

private final String dataRegionId;
private final int dataRegionId;
private final Set<String> originFilePaths;
private final List<Long> commitIds;

Expand Down Expand Up @@ -70,7 +70,7 @@ public PipeCompactedTsFileInsertionEvent(
anyOfOriginalEvents.getStartTime(),
anyOfOriginalEvents.getEndTime());

this.dataRegionId = String.valueOf(committerKey.getRegionId());
this.dataRegionId = committerKey.getRegionId();
this.originFilePaths =
originalEvents.stream()
.map(PipeTsFileInsertionEvent::getTsFile)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,8 @@ public ProgressIndex forceGetProgressIndex() {
public void eliminateProgressIndex() {
if (Objects.isNull(overridingProgressIndex) && Objects.nonNull(resource)) {
PipeTsFileEpochProgressIndexKeeper.getInstance()
.eliminateProgressIndex(resource.getDataRegionId(), pipeName, resource.getTsFilePath());
.eliminateProgressIndex(
Integer.parseInt(resource.getDataRegionId()), pipeName, resource.getTsFilePath());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public static PipeRealtimeEvent createRealtimeEvent(
}

public static PipeRealtimeEvent createRealtimeEvent(
final String dataRegionId, final boolean shouldPrintMessage) {
final int dataRegionId, final boolean shouldPrintMessage) {
return new PipeRealtimeEvent(
new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null);
}
Expand Down
Loading
Loading