Skip to content

Commit

Permalink
Update PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
Browse files Browse the repository at this point in the history
  • Loading branch information
SteveYurongSu committed Sep 30, 2024
1 parent 8d49608 commit fa2a329
Showing 1 changed file with 20 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,26 @@ private void extractDeletions(
originalDeletionCount);
}

@Override
public synchronized Event supply() {
if (!hasBeenStarted && StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
start();
}

if (Objects.isNull(pendingQueue)) {
return null;
}

final PersistentResource resource = pendingQueue.poll();
if (resource == null) {
return supplyTerminateEvent();
} else if (resource instanceof TsFileResource) {
return supplyTsFileEvent((TsFileResource) resource);
} else {
return supplyDeletionEvent((DeletionResource) resource);
}
}

private Event supplyTsFileEvent(TsFileResource resource) {
final PipeTsFileInsertionEvent event =
new PipeTsFileInsertionEvent(
Expand Down Expand Up @@ -708,26 +728,6 @@ private Event supplyTerminateEvent() {
return terminateEvent;
}

@Override
public synchronized Event supply() {
if (!hasBeenStarted && StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
start();
}

if (Objects.isNull(pendingQueue)) {
return null;
}

final PersistentResource resource = pendingQueue.poll();
if (resource == null) {
return supplyTerminateEvent();
} else if (resource instanceof TsFileResource) {
return supplyTsFileEvent((TsFileResource) resource);
} else {
return supplyDeletionEvent((DeletionResource) resource);
}
}

@Override
public synchronized boolean hasConsumedAll() {
// If the pendingQueues are null when the function is called, it implies that the extractor only
Expand Down

0 comments on commit fa2a329

Please sign in to comment.