Emit disk spill and merge buffer utilisation metrics for GroupBy queries #17360

Merged · 18 commits · Nov 22, 2024
@@ -55,6 +55,7 @@
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
import org.apache.druid.query.groupby.GroupByStatsProvider;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
@@ -373,15 +374,17 @@ public String getFormatString()
};

final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(mergePool);
final GroupByResourcesReservationPool groupByResourcesReservationPool =
new GroupByResourcesReservationPool(mergePool, config);
new GroupByResourcesReservationPool(mergePool, config, groupByStatsProvider);
final GroupingEngine groupingEngine = new GroupingEngine(
druidProcessingConfig,
configSupplier,
groupByResourcesReservationPool,
TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
QueryBenchmarkUtil.NOOP_QUERYWATCHER,
groupByStatsProvider
);

factory = new GroupByQueryRunnerFactory(
@@ -81,6 +81,7 @@
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
import org.apache.druid.query.groupby.GroupByStatsProvider;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.planning.DataSourceAnalysis;
@@ -357,15 +358,17 @@ private static GroupByQueryRunnerFactory makeGroupByQueryRunnerFactory(
bufferSupplier,
processingConfig.getNumMergeBuffers()
);
final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(mergeBufferPool);
final GroupByResourcesReservationPool groupByResourcesReservationPool =
new GroupByResourcesReservationPool(mergeBufferPool, config);
new GroupByResourcesReservationPool(mergeBufferPool, config, groupByStatsProvider);
final GroupingEngine groupingEngine = new GroupingEngine(
processingConfig,
configSupplier,
groupByResourcesReservationPool,
mapper,
mapper,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
groupByStatsProvider
);
final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool);
return new GroupByQueryRunnerFactory(groupingEngine, toolChest, bufferPool);
@@ -66,6 +66,7 @@
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
import org.apache.druid.query.groupby.GroupByStatsProvider;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
@@ -490,15 +491,17 @@ public String getFormatString()
};

final Supplier<GroupByQueryConfig> configSupplier = Suppliers.ofInstance(config);
final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(mergePool);
final GroupByResourcesReservationPool groupByResourcesReservationPool =
new GroupByResourcesReservationPool(mergePool, config);
new GroupByResourcesReservationPool(mergePool, config, groupByStatsProvider);
final GroupingEngine groupingEngine = new GroupingEngine(
druidProcessingConfig,
configSupplier,
groupByResourcesReservationPool,
TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
QueryBenchmarkUtil.NOOP_QUERYWATCHER,
groupByStatsProvider
);

factory = new GroupByQueryRunnerFactory(
2 changes: 1 addition & 1 deletion docs/configuration/index.md
@@ -401,7 +401,7 @@ Metric monitoring is an essential part of Druid operations. The following monito
|`org.apache.druid.java.util.metrics.CgroupV2MemoryMonitor`| **EXPERIMENTAL** Reports memory usage from `memory.current` and `memory.max` files. Only applicable to `cgroupv2`.|
|`org.apache.druid.server.metrics.HistoricalMetricsMonitor`|Reports statistics on Historical services. Available only on Historical services.|
|`org.apache.druid.server.metrics.SegmentStatsMonitor` | **EXPERIMENTAL** Reports statistics about segments on Historical services. Available only on Historical services. Not to be used when lazy loading is configured.|
|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.|
|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted. It also reports stats for group by queries.|
Contributor:
Suggested change
|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted. It also reports stats for group by queries.|
|`org.apache.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted. It also reports statistics for the group by queries.|

Contributor:
Should it be made as a separate monitor? I don't see any downside of merging the QueryCountStatsMonitor with the group by statistics, but at the same time, I don't think there's any benefit to it.
However, merging group-by statistics with "query count stats" feels incongruent to me.

Contributor Author:
I don't have strong opinions either; the only reason I merged them was the existing mergeBuffer metric in this monitor. I think a separate monitor would be better due to the added flexibility.

|`org.apache.druid.server.metrics.SubqueryCountStatsMonitor`|Reports how many subqueries have been materialized as rows or bytes and various other statistics related to the subquery execution|
|`org.apache.druid.server.emitter.HttpEmittingMonitor`|Reports internal metrics of `http` or `parametrized` emitter (see below). Must not be used with another emitter type. See the description of the metrics here: <https://github.com/apache/druid/pull/4973>.|
|`org.apache.druid.server.metrics.TaskCountStatsMonitor`|Reports how many ingestion tasks are currently running/pending/waiting and also the number of successful/failed tasks per emission period.|
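
For context, monitors are enabled per service through the standard `druid.monitoring.monitors` runtime property, so a deployment that wants these groupBy statistics would list the monitor under discussion, for example:

```
druid.monitoring.monitors=["org.apache.druid.server.metrics.QueryCountStatsMonitor"]
```

If the groupBy statistics end up in a separate monitor, as suggested in the review thread above, that monitor's class name would be listed here instead.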
9 changes: 9 additions & 0 deletions docs/operations/metrics.md
@@ -86,6 +86,9 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`subquery/fallback/unknownReason/count`|Number of subqueries which cannot be materialized as frames due to other reasons.|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`query/rowLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given row limit|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`query/byteLimit/exceeded/count`|Number of queries whose inlined subquery results exceeded the given byte limit|This metric is only available if the `SubqueryCountStatsMonitor` module is included.| |
|`mergeBuffer/acquiredCount`|Number of merge buffers acquired from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`groupBy/acquisitionTimeNs`|Average time in nanoseconds to acquire resources for groupBy queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`groupBy/spilledBytes`|Number of bytes spilled to disk by groupBy queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
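
As an illustration only (the exact envelope depends on which emitter is configured, and the values below are made up), an event emitted for one of these metrics might look roughly like:

```json
{
  "feed": "metrics",
  "timestamp": "2024-11-22T00:00:00.000Z",
  "service": "druid/broker",
  "host": "localhost:8082",
  "metric": "groupBy/spilledBytes",
  "value": 1048576
}
```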

### Historical

@@ -104,6 +107,9 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`mergeBuffer/acquiredCount`|Number of merge buffers acquired from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`groupBy/acquisitionTimeNs`|Average time in nanoseconds to acquire resources for groupBy queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`groupBy/spilledBytes`|Number of bytes spilled to disk by groupBy queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |

### Real-time

@@ -120,6 +126,9 @@ Most metric values reset each emission period, as specified in `druid.monitoring
|`query/interrupted/count`|Number of queries interrupted due to cancellation.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`query/timeout/count`|Number of timed out queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`mergeBuffer/pendingRequests`|Number of requests waiting to acquire a batch of buffers from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.||
|`mergeBuffer/acquiredCount`|Number of merge buffers acquired from the merge buffer pool.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`groupBy/acquisitionTimeNs`|Average time in nanoseconds to acquire resources for groupBy queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |
|`groupBy/spilledBytes`|Number of bytes spilled to disk by groupBy queries.|This metric is only available if the `QueryCountStatsMonitor` module is included.| |

### Jetty

@@ -20,6 +20,7 @@
package org.apache.druid.segment;

import com.google.common.collect.ImmutableList;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.collections.StupidPool;
import org.apache.druid.common.config.NullHandling;
@@ -44,6 +45,7 @@
import org.apache.druid.query.groupby.GroupByQueryQueryToolChest;
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
import org.apache.druid.query.groupby.GroupByStatsProvider;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
@@ -70,8 +72,14 @@ public void setup() throws IOException
{
final IncrementalIndex incrementalIndex = MapVirtualColumnTestBase.generateIndex();
final GroupByQueryConfig config = new GroupByQueryConfig();

final BlockingPool<ByteBuffer> mergePool =
new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1024), 1);
final GroupByStatsProvider groupByStatsProvider = new GroupByStatsProvider(mergePool);

final GroupByResourcesReservationPool groupByResourcesReservationPool =
new GroupByResourcesReservationPool(new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1024), 1), config);
new GroupByResourcesReservationPool(mergePool, config, groupByStatsProvider);

final GroupingEngine groupingEngine = new GroupingEngine(
new DruidProcessingConfig()
{
@@ -103,7 +111,8 @@ public int getNumThreads()
groupByResourcesReservationPool,
TestHelper.makeJsonMapper(),
new DefaultObjectMapper(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
groupByStatsProvider
);

final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
@@ -42,6 +42,7 @@
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
import org.apache.druid.query.groupby.GroupByStatsProvider;

import java.nio.ByteBuffer;

@@ -135,9 +136,10 @@ public BlockingPool<ByteBuffer> getMergeBufferPool(Task task, DruidProcessingCon
@Merging
public GroupByResourcesReservationPool getGroupByResourcesReservationPool(
@Merging BlockingPool<ByteBuffer> mergeBufferPool,
GroupByQueryConfig groupByQueryConfig
GroupByQueryConfig groupByQueryConfig,
GroupByStatsProvider groupByStatsProvider
)
{
return new GroupByResourcesReservationPool(mergeBufferPool, groupByQueryConfig);
return new GroupByResourcesReservationPool(mergeBufferPool, groupByQueryConfig, groupByStatsProvider);
}
}
@@ -49,4 +49,9 @@ public interface BlockingPool<T>
* @return count of pending requests
*/
long getPendingRequests();

/**
* @return number of used buffers from the pool
*/
long getUsedBufferCount();
Contributor:
BlockingPool is generic. It shouldn't reference buffers.

Suggested change
long getUsedBufferCount();
long getUsedResourcesCount();

}
@@ -129,6 +129,12 @@ public long getPendingRequests()
return pendingRequests.get();
}

@Override
public long getUsedBufferCount()
{
return maxSize - objects.size();
}

private List<T> pollObjects(int elementNum) throws InterruptedException
{
final List<T> list = new ArrayList<>(elementNum);
@@ -61,4 +61,10 @@ public long getPendingRequests()
{
return 0;
}

@Override
public long getUsedBufferCount()
{
return 0;
}
}
@@ -86,26 +86,30 @@ public class GroupByResourcesReservationPool
/**
* Map of query's resource id -> group by resources reserved for the query to execute
*/
final ConcurrentHashMap<QueryResourceId, AtomicReference<GroupByQueryResources>> pool = new ConcurrentHashMap<>();
private final ConcurrentHashMap<QueryResourceId, AtomicReference<GroupByQueryResources>> pool = new ConcurrentHashMap<>();

/**
* Buffer pool from where the merge buffers are picked and reserved
*/
final BlockingPool<ByteBuffer> mergeBufferPool;
private final BlockingPool<ByteBuffer> mergeBufferPool;

/**
* Group by query config of the server
*/
final GroupByQueryConfig groupByQueryConfig;
private final GroupByQueryConfig groupByQueryConfig;

private final GroupByStatsProvider groupByStatsProvider;

@Inject
public GroupByResourcesReservationPool(
@Merging BlockingPool<ByteBuffer> mergeBufferPool,
GroupByQueryConfig groupByQueryConfig
GroupByQueryConfig groupByQueryConfig,
GroupByStatsProvider groupByStatsProvider
)
{
this.mergeBufferPool = mergeBufferPool;
this.groupByQueryConfig = groupByQueryConfig;
this.groupByStatsProvider = groupByStatsProvider;
}

/**
@@ -114,6 +118,7 @@ public GroupByResourcesReservationPool(
*/
public void reserve(QueryResourceId queryResourceId, GroupByQuery groupByQuery, boolean willMergeRunner)
{
long startNs = System.nanoTime();
if (queryResourceId == null) {
throw DruidException.defensive("Query resource id must be populated");
}
@@ -145,6 +150,8 @@ public void reserve(QueryResourceId queryResourceId, GroupByQuery groupByQuery,
// Resources have been allocated, spot has been reserved. The reference would ALWAYS refer to 'null'. Refer the
// allocated resources from it
reference.compareAndSet(null, resources);

groupByStatsProvider.groupByResourceAcquisitionTimeNs(System.nanoTime() - startNs);
}

/**
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.groupby;

import org.apache.druid.collections.BlockingPool;
import org.apache.druid.guice.annotations.Merging;
import org.apache.druid.query.groupby.epinephelinae.LimitedTemporaryStorage;

import javax.inject.Inject;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

/**
* Collects stats for group by queries such as used merge buffer count, spilled bytes, and group by resource acquisition time.
*/
public class GroupByStatsProvider
{
private final AtomicLong groupByResourceAcquisitionTimeNs = new AtomicLong(0);
private final AtomicLong groupByResourceAcquisitionCount = new AtomicLong(0);

private final BlockingPool<ByteBuffer> blockingPool;
private final ConcurrentLinkedQueue<LimitedTemporaryStorage> temporaryStorages;

@Inject
Contributor:
nit: We shouldn't need this

public GroupByStatsProvider(@Merging BlockingPool<ByteBuffer> blockingPool)
{
this.blockingPool = blockingPool;
this.temporaryStorages = new ConcurrentLinkedQueue<>();
}

public synchronized void groupByResourceAcquisitionTimeNs(long delayNs)
{
groupByResourceAcquisitionTimeNs.addAndGet(delayNs);
groupByResourceAcquisitionCount.incrementAndGet();
}

public synchronized long getAndResetGroupByResourceAcquisitionStats()
Contributor:
This doesn't seem correct. We are reporting the average as a metric. A better way would be to report the sum, as well as the count. This will allow a better weighted-average than directly reporting the average from the monitor. While we are amortizing a lot of metrics by emitting it from the monitor, I think it's better to report count and sum separately.

Contributor Author:
I couldn't find a way wherein I could report the sum and count together (so that any metric aggregation platform could automatically compute the averages).
Are you suggesting to report two different metrics groupBy/acquisitionTime & groupBy/acquisitionCount?

{
long average = (groupByResourceAcquisitionTimeNs.get() / groupByResourceAcquisitionCount.get());

groupByResourceAcquisitionTimeNs.set(0);
groupByResourceAcquisitionCount.set(0);

return average;
}
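
Note that the averaging above performs a long division by `groupByResourceAcquisitionCount`, which throws `ArithmeticException` if no acquisitions happened during an emission period. That is one more argument for the reviewer's suggestion of reporting the sum and the count separately; a minimal, hypothetical sketch of that alternative (names are illustrative and not part of this PR) could look like:

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical sketch: accumulate resource-acquisition time and count separately
 * so a monitor can emit two metrics (for example groupBy/acquisitionTimeNs as a sum
 * and groupBy/acquisitionCount) and let the downstream metrics system compute a
 * properly weighted average.
 */
public class AcquisitionStatsAccumulator
{
  /** Immutable snapshot of the accumulated values. */
  public static class Snapshot
  {
    public final long totalTimeNs;
    public final long count;

    public Snapshot(long totalTimeNs, long count)
    {
      this.totalTimeNs = totalTimeNs;
      this.count = count;
    }
  }

  private final AtomicLong totalTimeNs = new AtomicLong(0);
  private final AtomicLong count = new AtomicLong(0);

  /** Called once per resource acquisition with the elapsed nanoseconds. */
  public void record(long elapsedNs)
  {
    totalTimeNs.addAndGet(elapsedNs);
    count.incrementAndGet();
  }

  /** Snapshot and reset. No division happens here, so an idle period simply yields 0 and 0. */
  public Snapshot getAndReset()
  {
    return new Snapshot(totalTimeNs.getAndSet(0), count.getAndSet(0));
  }
}
```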

public long getAcquiredMergeBufferCount()
{
return blockingPool.getUsedBufferCount();
}

public void registerTemporaryStorage(LimitedTemporaryStorage temporaryStorage)
Contributor:
How often does this method need to be called per group by query, and what does this rely upon? I feel that there's an inbuilt expense with tracking the spilled bytes this way. I think it is still fine, but I wonder if a more efficient way would be as follows:

  1. Have a running counter of the size occupied
  2. LimitedTemporaryStorage implements closeable. Modify wherever the LimitedTemporaryStorage is getting closed to subtract the value of the storage from the counter. There can be a set inclusion check where a hashset of the opened temporary storages being tracked by the running counter are stored, and the code confirms that the removed temporary storage was indeed being tracked by the counter.

Note: My approach seems more convoluted, so if there isn't much performance downside of the current version, I think its fine as is.

Contributor Author:
> there's an inbuilt expense with tracking the spilled bytes this way

What is the expense you are referring to? The temporary storage object is added to a queue, and the cost of adding an element should in most cases be O(1).

> LimitedTemporaryStorage implements closeable. Modify wherever the LimitedTemporaryStorage is getting closed to subtract the value of the storage from the counter. There can be a set inclusion check where a hashset of the opened temporary storages being tracked by the running counter are stored, and the code confirms that the removed temporary storage was indeed being tracked by the counter.

This approach also requires keeping track of the temporary storages, thus incurring similar overhead to the existing implementation. Additionally, it requires incrementing the "running counter" whenever bytes are written out to the disk.

Contributor (@LakshSingla, Oct 18, 2024):
> What is the expense you are referring to

Iterating while calculating the stored bytes.

> keeping track of the temporary storages

That is true, but calculating the stored bytes won't require an iteration over the registered temporary storages.

Contributor Author:
> Iterating while calculating the stored bytes.

I understand. Given that the monitor will run at a fixed interval (every minute or so), is the overhead significant?
I am weighing the implementation complexity against the performance impact.

{
temporaryStorages.add(temporaryStorage);
}

public long getSpilledBytes()
Contributor:
What if a registered "limited temporary storage" is not closed between two invocations of getSpilledBytes()? Would the code double-count the stored bytes? Perhaps that is the intended behavior. If that's the case, we should figure out the intent behind the metric and take a call on whether we want to do that or not.

Contributor:
Regardless of this, I think there should be a metric (either this one after repurposing, or a new one) which indicates the total size of the spilled data per query. This would allow admins to estimate whether the queries need a larger merge buffer, and by how much. WDYT?

Contributor Author:
> Would the code double-count the stored bytes? Perhaps that is the intended behavior.

Yes, that is the intention. It is basically reporting the amount of bytes spilled at the moment the metric is emitted.

> Regardless of this, I think that there should be a metric (either this one after repurposing or a new one) which indicates the total size of the spilled data, per query. This will allow the admins to estimate whether the queries need a larger merge buffer and by how much.

Makes sense, but would this be reported at the end of every query?

Contributor:
Yes, the query can update the monitor at the end.

{
long spilledBytes = 0;

Iterator<LimitedTemporaryStorage> iterator = temporaryStorages.iterator();

while (iterator.hasNext()) {
LimitedTemporaryStorage limitedTemporaryStorage = iterator.next();

spilledBytes += limitedTemporaryStorage.currentSize();

if (limitedTemporaryStorage.isClosed()) {
iterator.remove();
}
}

return spilledBytes;
}
}
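
For reference, a minimal sketch of the running-counter alternative outlined in the review thread above: bytes are added to a counter as they are spilled and subtracted when the owning storage is closed, so reading the total is O(1) and does not iterate over registered storages. Apart from `LimitedTemporaryStorage` and its `currentSize()` method, everything here is hypothetical and not part of this PR:

```java
import org.apache.druid.query.groupby.epinephelinae.LimitedTemporaryStorage;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical sketch of the running-counter approach discussed in review.
 * Spilled bytes are added as they are written and subtracted when a storage is
 * closed, so the current total is an O(1) read.
 */
public class SpilledBytesTracker
{
  private final AtomicLong spilledBytes = new AtomicLong(0);

  // Storages currently contributing to the counter; guards against subtracting
  // a storage that was never (or has already been) accounted for.
  private final Set<LimitedTemporaryStorage> tracked = ConcurrentHashMap.newKeySet();

  /** Called wherever a temporary storage writes bytes out to disk. */
  public void addSpilledBytes(LimitedTemporaryStorage storage, long bytes)
  {
    tracked.add(storage);
    spilledBytes.addAndGet(bytes);
  }

  /**
   * Called wherever the LimitedTemporaryStorage is closed. Assumes all bytes spilled by
   * this storage were reported through addSpilledBytes, so subtracting currentSize()
   * cancels out its contribution.
   */
  public void storageClosed(LimitedTemporaryStorage storage)
  {
    if (tracked.remove(storage)) {
      spilledBytes.addAndGet(-storage.currentSize());
    }
  }

  /** Current total of spilled bytes across live storages. */
  public long getSpilledBytes()
  {
    return spilledBytes.get();
  }
}
```

A per-query spilled-bytes total, as also suggested in the thread, could reuse the same idea keyed by query id and be emitted when the query completes.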