Aggregation_Scalability
The speed of aggregation (particularly daily aggregation) has been slowing as the number of jobs grows. Aggregation of a single day takes roughly 30s on tas-db1 and roughly 17.5s on a 2-core cloud instance with 24 GB of memory. Examine ways to speed up aggregation, especially when aggregating many periods.
When aggregating a given period, we need to identify the following jobs:
```
Period:                      |----------|
(1) Period encloses job:        |---|
(2) Job ends in period:    |----|
(3) Job starts in period:           |-----|
(4) Job encloses period:   |-----------------------|
```
The indexes on a job are based on the job start and end times. For the database to use an index for a range access, the key column must appear on the left-hand side of the WHERE condition (e.g., `Key = Value` or `Key BETWEEN V1 AND V2`), and the condition must select a small enough portion of the data that using the index is cheaper than a full table scan. Given a period defined as `P = (P_start, P_end)` and a job defined as `J = (J_start, J_end)`, we need to identify the jobs that overlap any portion of the period so we can calculate each job's contribution to that period. Ideally, each of the 4 overlap conditions above would be expressed so that the database can perform a range access using `J_start` and/or `J_end`. Conditions (1) and (3) are satisfied by `J_start BETWEEN P_start AND P_end` and condition (2) by `J_end BETWEEN P_start AND P_end`, both of which support range access. However, condition (4) requires `J_start < P_start AND J_end > P_end`, which does not. All 4 conditions can be represented by the single condition `J_start <= P_end AND J_end >= P_start`, which also does not allow range access. As the number of jobs grows, calculating daily aggregation periods that require full table scans approaches more than 30s per period and does not scale when processing larger numbers of days.
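That the single combined predicate covers exactly the union of the 4 overlap cases can be checked with a small brute-force sketch (Python, using hypothetical integer timestamps; this is an illustration of the logic, not code from the ETL implementation):

```python
def overlaps_combined(j_start, j_end, p_start, p_end):
    # Single predicate covering all 4 overlap cases; not index-friendly,
    # since neither column alone bounds the result to a small range.
    return j_start <= p_end and j_end >= p_start

def overlaps_cases(j_start, j_end, p_start, p_end):
    enclosed  = p_start <= j_start and j_end <= p_end   # (1) period encloses job
    ends_in   = p_start <= j_end <= p_end               # (2) job ends in period
    starts_in = p_start <= j_start <= p_end             # (3) job starts in period
    encloses  = j_start < p_start and j_end > p_end     # (4) job encloses period
    return enclosed or ends_in or starts_in or encloses

# Exhaustively compare the two formulations over a small integer range.
P_START, P_END = 3, 7
for j_start in range(0, 11):
    for j_end in range(j_start, 11):
        assert (overlaps_combined(j_start, j_end, P_START, P_END)
                == overlaps_cases(j_start, j_end, P_START, P_END))
print("predicates agree")
```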
To reduce the number of full table scans, we can perform aggregation in batches: select a range of dates (i.e., multiple aggregation periods) with a single full table scan, then aggregate against these essentially cached copies. The algorithm is as follows:
- Get the list of periods (e.g., day, month, etc.) needing aggregation
- If the number of periods N > Threshold (default = 50), enable aggregation chunking
- Modify the Query object used to construct the aggregation query by replacing the FROM table with a temporary cache table. Save the original FROM table so it can be restored if the next aggregation run falls below the threshold.
- Slice the aggregation period list into chunks of size C (default = 10)
- Generate the cache table by selecting only the jobs that meet the original criteria from the aggregation query and fall within the aggregation periods in this slice
- Aggregate jobs against the cache table, greatly reducing the time required per period since the full table scan is shared by the whole chunk
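The batching loop above can be sketched as follows (Python, with hypothetical helper names; the real implementation rewrites an ETL Query object and runs SQL, but the control flow is the same):

```python
BATCH_THRESHOLD_DEFAULT = 50  # periods needed before chunking is enabled
CHUNK_SIZE_DEFAULT = 10       # periods cached per full table scan

def chunk(items, size):
    """Slice a list into consecutive chunks of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

def aggregate_periods(periods, aggregate_one, build_cache_table,
                      threshold=BATCH_THRESHOLD_DEFAULT,
                      chunk_size=CHUNK_SIZE_DEFAULT):
    """Aggregate each period, batching through a cache table when there
    are enough periods to amortize one full table scan per chunk."""
    if len(periods) <= threshold:
        # Below the threshold: aggregate directly against the source table.
        for p in periods:
            aggregate_one(p, source="jobs")
        return
    for batch in chunk(periods, chunk_size):
        # One full table scan populates the cache for the whole chunk...
        build_cache_table(batch)
        # ...then each period aggregates against the much smaller cache.
        for p in batch:
            aggregate_one(p, source="jobs_cache")
```

With 5041 daily periods and C = 10, this performs roughly 505 full table scans instead of 5041.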
Note that using `-k year` may cause issues with recovery on the postgres server, so `-k month` is slower but recommended.
```
./etl_overseer.php -c ../../../etc/etl/etl.json -s "2010-01-01" -e "2016-11-07" -a XdcdbJobRecordIngestor -x TACC-WRANGLER -x OSG -k month -v info
./etl_overseer.php -c ../../../etc/etl/etl.json -s "2010-01-01" -e "2016-11-07" -a XdcdbPostIngestJobUpdates -x TACC-WRANGLER -x OSG -v info
./etl_overseer.php -c ../../../etc/etl/etl.json -m "2016-11-07 12:26:14" -y "2016-11-07 12:30:00" -a XdcdbJobRecordAggregator -x TACC-WRANGLER -x OSG -v info
```
Average 17.5s/period. 5041 periods (days) = ~24.5 hours
```
2016-11-08 09:59:45 [info] Aggregate over 5041 days
2016-11-08 13:12:29 [info] Aggregated day (1 of 5041) 201600305 records = 2116, time = 17.62s
2016-11-08 13:12:46 [info] Aggregated day (2 of 5041) 201600304 records = 1382, time = 17.73s
2016-11-08 13:13:04 [info] Aggregated day (3 of 5041) 201600303 records = 1463, time = 17.44s
2016-11-08 13:13:21 [info] Aggregated day (4 of 5041) 201600302 records = 2146, time = 17.35s
2016-11-08 13:13:39 [info] Aggregated day (5 of 5041) 201600301 records = 2238, time = 17.58s
...
```
```
./etl_overseer.php -c ../../../etc/etl/etl.json -a XdcdbJobRecordAggregator -o "experimental_enable_batch_aggregation=true" -o "truncate_destination=true" -v info -m "2016-11-07"
```
Average 2.62s/period. 5041 periods (days) = ~3.68 hours
```
2016-11-08 13:03:55 [info] Setup for batch (day_id 201600296 - 201600305): 18.56s
2016-11-08 13:03:56 [info] Aggregated day (1 of 5041) 201600305 records = 2116, time = 0.78s
2016-11-08 13:03:56 [info] Aggregated day (2 of 5041) 201600304 records = 1382, time = 0.83s
2016-11-08 13:03:57 [info] Aggregated day (3 of 5041) 201600303 records = 1463, time = 0.61s
2016-11-08 13:03:58 [info] Aggregated day (4 of 5041) 201600302 records = 2146, time = 0.57s
2016-11-08 13:03:58 [info] Aggregated day (5 of 5041) 201600301 records = 2238, time = 0.76s
2016-11-08 13:03:59 [info] Aggregated day (6 of 5041) 201600300 records = 2233, time = 0.61s
2016-11-08 13:03:59 [info] Aggregated day (7 of 5041) 201600299 records = 1281, time = 0.26s
2016-11-08 13:04:00 [info] Aggregated day (8 of 5041) 201600298 records = 1635, time = 0.34s
2016-11-08 13:04:00 [info] Aggregated day (9 of 5041) 201600297 records = 1337, time = 0.36s
2016-11-08 13:04:00 [info] Aggregated day (10 of 5041) 201600296 records = 1410, time = 0.46s
2016-11-08 13:04:00 [info] Total time for batch (day_id 201600296 - 201600305): 24.16s
2016-11-08 13:04:20 [info] Setup for batch (day_id 201600286 - 201600295): 19.43s
2016-11-08 13:04:20 [info] Aggregated day (11 of 5041) 201600295 records = 1867, time = 0.69s
2016-11-08 13:04:21 [info] Aggregated day (12 of 5041) 201600294 records = 1850, time = 0.93s
2016-11-08 13:04:22 [info] Aggregated day (13 of 5041) 201600293 records = 1970, time = 1.03s
2016-11-08 13:04:24 [info] Aggregated day (14 of 5041) 201600292 records = 1993, time = 1.37s
2016-11-08 13:04:25 [info] Aggregated day (15 of 5041) 201600291 records = 1958, time = 1.39s
2016-11-08 13:04:26 [info] Aggregated day (16 of 5041) 201600290 records = 1311, time = 0.85s
2016-11-08 13:04:26 [info] Aggregated day (17 of 5041) 201600289 records = 1481, time = 0.45s
2016-11-08 13:04:27 [info] Aggregated day (18 of 5041) 201600288 records = 2006, time = 0.5s
2016-11-08 13:04:28 [info] Aggregated day (19 of 5041) 201600287 records = 2265, time = 0.66s
2016-11-08 13:04:29 [info] Aggregated day (20 of 5041) 201600286 records = 2104, time = 1.12s
2016-11-08 13:04:29 [info] Total time for batch (day_id 201600286 - 201600295): 28.43s
```
```
./etl_overseer.php -c ../../../etc/etl/etl.json -a XdcdbJobRecordAggregator -o "experimental_enable_batch_aggregation=true" -o "experimental_batch_aggregation_bin_size=15" -o "truncate_destination=true" -v info -m "2016-11-07"
```
Average 2.18s/period. 5041 periods (days) = ~3.05 hours
```
2016-11-08 13:06:56 [info] Setup for batch (day_id 201600291 - 201600305): 19.97s
2016-11-08 13:06:57 [info] Aggregated day (1 of 5041) 201600305 records = 2116, time = 0.9s
2016-11-08 13:06:58 [info] Aggregated day (2 of 5041) 201600304 records = 1382, time = 0.98s
2016-11-08 13:06:59 [info] Aggregated day (3 of 5041) 201600303 records = 1463, time = 0.74s
2016-11-08 13:06:59 [info] Aggregated day (4 of 5041) 201600302 records = 2146, time = 0.7s
2016-11-08 13:07:00 [info] Aggregated day (5 of 5041) 201600301 records = 2238, time = 0.89s
2016-11-08 13:07:01 [info] Aggregated day (6 of 5041) 201600300 records = 2233, time = 0.73s
2016-11-08 13:07:02 [info] Aggregated day (7 of 5041) 201600299 records = 1281, time = 0.38s
2016-11-08 13:07:02 [info] Aggregated day (8 of 5041) 201600298 records = 1635, time = 0.44s
2016-11-08 13:07:02 [info] Aggregated day (9 of 5041) 201600297 records = 1337, time = 0.47s
2016-11-08 13:07:03 [info] Aggregated day (10 of 5041) 201600296 records = 1410, time = 0.65s
2016-11-08 13:07:04 [info] Aggregated day (11 of 5041) 201600295 records = 1867, time = 0.78s
2016-11-08 13:07:05 [info] Aggregated day (12 of 5041) 201600294 records = 1850, time = 1s
2016-11-08 13:07:06 [info] Aggregated day (13 of 5041) 201600293 records = 1970, time = 1.13s
2016-11-08 13:07:07 [info] Aggregated day (14 of 5041) 201600292 records = 1993, time = 1.45s
2016-11-08 13:07:09 [info] Aggregated day (15 of 5041) 201600291 records = 1958, time = 1.54s
2016-11-08 13:07:09 [info] Total time for batch (day_id 201600291 - 201600305): 32.76s
```
```
./etl_overseer.php -c ../../../etc/etl/etl.json -a XdcdbJobRecordAggregator -o "experimental_enable_batch_aggregation=true" -o "experimental_batch_aggregation_bin_size=20" -o "truncate_destination=true" -v info -m "2016-11-07"
```
Average 1.95s/period. 5041 periods (days) = ~2.73 hours
```
2016-11-08 13:05:25 [info] Setup for batch (day_id 201600286 - 201600305): 20.97s
2016-11-08 13:05:26 [info] Aggregated day (1 of 5041) 201600305 records = 2116, time = 1.06s
2016-11-08 13:05:27 [info] Aggregated day (2 of 5041) 201600304 records = 1382, time = 1.14s
...
2016-11-08 13:05:43 [info] Total time for batch (day_id 201600286 - 201600305): 39.1s
```
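The timings above fit a simple amortization model: each batch pays a roughly fixed setup cost (one full table scan to build the cache, about 20s in these runs) plus a small per-period cost against the cache, so the average time per period falls as the bin size C grows. A rough sketch using approximate values read off the logs above (assumed constants, not exact measurements):

```python
# Approximate costs observed in the runs above (assumptions for illustration).
SETUP_S = 20.0      # per-batch cache setup: one full table scan
PER_PERIOD_S = 0.7  # per-period aggregation against the cache table

def avg_seconds_per_period(bin_size):
    """Average cost per period when one setup cost is shared by bin_size periods."""
    return SETUP_S / bin_size + PER_PERIOD_S

for c in (10, 15, 20):
    print(f"bin size {c:2d}: ~{avg_seconds_per_period(c):.2f}s/period")
```

This predicts roughly 2.7s, 2.0s, and 1.7s per period for bin sizes of 10, 15, and 20, in line with the measured averages of 2.62s, 2.18s, and 1.95s; larger bins amortize the scan further, at the cost of a larger cache table per batch.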