Hash Agg Spilling
This post comes courtesy of Boaz from his very clear description in his PR #822 for DRILL-5457: Spill implementation for Hash Aggregate.
This pull-request is for the work on enabling memory spill for the Hash Aggregate Operator.
To assist in reviewing this extensive code change, listed below are various topics/issues that describe the implementation decisions and give some code pointers. The reviewer can read these items and peek into their relevant code, or read all the items first (and comment on the design decisions as well).
The last topic is by no means the least: it describes many issues and solutions related to the need to estimate the memory size of batches (and hash tables, etc.). This work took a significant amount of time, and will need more time to improve.
The code was changed to pass the aggregation phase information (whether this is a Single phase, or 1st of two phase, or 2nd of two phase) from the planner to the HAG operator code. (See HashAggregate.java, AggPrelBase.java, HashAggPrel.java )
The data (rows/groups) coming into the HAG is partitioned into (a power of 2) number of partitions, based on the N least significant bits of the hash value (computed out of the row's key columns).
Each partition can be handled independently of the others. Ideally each partition should fit into the available memory. The number of partitions is initialized from the option "drill.exec.hashagg.num_partitions", and scaled down if the available memory seems too small (each partition needs to hold at least one batch in memory).
The number of partitions is scaled down until the following holds: AVAIL_MEMORY > NUM_PARTITIONS * ( K * EST_BATCH_SIZE + 8M ) (see delayedSetup() ), where K is the option drill.exec.hashagg.min_batches_per_partition (see below).
Computing the number of partitions is delayed until actual data arrives on the incoming (in order to get accurate sizing of varchars). See delayedSetup(). There is also special code for cases where data never arrives (empty batches), hence no partitions (see the beginning of outputCurrentBatch(), cleanUp(), delayedSetup() ).
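The scaling rule can be sketched as follows. This is a minimal illustration, not the code in delayedSetup(): the class and method names are hypothetical, and halving the count at each step is an assumption made here to keep the number of partitions a power of 2.

```java
final class PartitionSizing {
    // The "8M" term of the formula, in bytes.
    static final long RESERVED_PER_PARTITION = 8L * 1024 * 1024;

    /**
     * Halves the partition count (assumed strategy) until
     * availMemory > numPartitions * (k * estBatchSize + 8M) holds,
     * or until only one partition is left.
     */
    static int scaleDownPartitions(int requested, long availMemory, long estBatchSize, int k) {
        int numPartitions = requested; // assumed to already be a power of 2
        while (numPartitions > 1
                && availMemory <= numPartitions * (k * estBatchSize + RESERVED_PER_PARTITION)) {
            numPartitions /= 2;
        }
        return numPartitions;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        // 32 requested partitions, 512MB available, 4MB estimated batch size, K = 3
        System.out.println(scaleDownPartitions(32, 512 * mb, 4 * mb, 3));
    }
}
```

With K = 3 and a 4MB batch estimate, each partition accounts for 20MB, so 32 partitions would need more than 512MB and the count drops to 16.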
Many of the code changes made in order to implement multi-partitions follow the original code, only changing scalar members (of HashAggTemplate) into arrays, like "htable" becomes "htables[]".
Each partition has its own hash table. After each time it is spilled, its hash table is freed and reallocated.
The hash code computation result was extracted from the HashTable (needed for the partitions), and added as a parameter to the put() method. Thus for each new incoming row, first the hash code is computed, and the low N bits are used to select the partition, then the hash code is right shifted by N, and the result is passed back to the put() method.
After spilling, the hash codes are not kept. When reading the rows (groups) from the spill, the hash codes are computed again (and right shifted before use - once per each cycle of spilling - thus repartitioning). (See more about "spilling cycle" below).
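The split of one hash code into a partition index and a hash-table code can be sketched as below. The names bitsInMask, partitionOf() and hashForTable() are hypothetical, and the use of an unsigned shift is an assumption (the text only says "right shifted").

```java
final class PartitionSelect {
    /** Partition index: the low bitsInMask bits of the hash code. */
    static int partitionOf(int hashCode, int bitsInMask) {
        int partitionMask = (1 << bitsInMask) - 1;
        return hashCode & partitionMask;
    }

    /** Value passed on to put(): the hash code with the partition bits shifted out. */
    static int hashForTable(int hashCode, int bitsInMask) {
        // Unsigned shift (assumed) so a negative hash code does not keep its sign bits.
        return hashCode >>> bitsInMask;
    }

    public static void main(String[] args) {
        int hashCode = 0b1011_0110;
        int bits = 3; // 2^3 = 8 partitions
        System.out.println("partition = " + partitionOf(hashCode, bits));
        System.out.println("table hash = " + hashForTable(hashCode, bits));
    }
}
```

Using disjoint bit ranges means the rows inside one partition still spread across the buckets of that partition's hash table.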
The put() method for the hash table was rewritten and simplified. In addition to the hash-code parameter change, it now returns a PutStatus, with two new states. The first, NEW_BATCH_ADDED, notifies the caller that a new batch was created internally, so the caller needs a new batch as well (only relevant for Hash Agg). The prior code derived this by comparing the returned index against the prior number of batches.
A second new state is KEY_ADDED_LAST, which notifies that a batch was just filled, hence it is time for checking memory availability (because a new batch would be allocated soon).
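A caller might react to the two new states roughly as sketched below. Only NEW_BATCH_ADDED and KEY_ADDED_LAST are named in this description; KEY_PRESENT and KEY_ADDED are assumed names for the pre-existing outcomes, and both helper methods are hypothetical.

```java
final class PutStatusSketch {
    // KEY_PRESENT and KEY_ADDED are assumed names; the last two come from the text.
    enum PutStatus { KEY_PRESENT, KEY_ADDED, NEW_BATCH_ADDED, KEY_ADDED_LAST }

    /** True when the hash table created a batch internally, so the caller must add one too. */
    static boolean needsNewValuesBatch(PutStatus status) {
        return status == PutStatus.NEW_BATCH_ADDED;
    }

    /** True when a batch has just been filled, i.e. the moment to check memory availability. */
    static boolean shouldCheckMemory(PutStatus status) {
        return status == PutStatus.KEY_ADDED_LAST;
    }

    public static void main(String[] args) {
        for (PutStatus s : PutStatus.values()) {
            System.out.println(s + ": newBatch=" + needsNewValuesBatch(s)
                    + " checkMemory=" + shouldCheckMemory(s));
        }
    }
}
```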
Similar rewriting was done for the hash table containsKey() method (and addBatchifNeeded() ).
Logically, the place to check for memory pressure is when new memory is needed (i.e., when a new group needs to be created). However, the code structure does not allow this easily (e.g., a new batch is allocated inside the hash table object when a new group is detected, or the hash table structure is doubled in size). Instead, the check is done AFTER a new group was added, in case this was the last group added to that batch (see checkGroupAndAggrValues(), which checks for the new status KEY_ADDED_LAST).
This check tests whether enough memory is left between the amount allocated so far and the limit. Spill is initiated when: MEMORY_USED + MAX_MEMORY_NEEDED > MEMORY_LIMIT (see checkGroupAndAggrValues() ) where the memory needed is: (EST_BATCH_SIZE + 64K * (4+4)) * K * PLANNED_BATCHES + MAX_HASH_TABLES_RESIZE (see K above, under Partitioning, and the rest further below, under memory estimations).
Single phase HAG cannot spill. Also, under memory duress the 2nd phase may end up with only a single partition, which does not allow spilling (no progress would be made). In these two cases the memory check is skipped, and the operator functions like the old code -- if it runs out of memory, it will OOM. A try-catch was added to the code to provide more detail on the OOM (see getOOMErrorMsg() ).
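The spill condition translates directly into arithmetic. The sketch below only restates the two formulas; the class and method names are hypothetical, and the (4+4) factor is carried over as written, since its meaning is not spelled out here.

```java
final class SpillCheck {
    // The "64K" term of the formula.
    static final long MAX_ROWS_PER_BATCH = 64L * 1024;

    /** (EST_BATCH_SIZE + 64K * (4+4)) * K * PLANNED_BATCHES + MAX_HASH_TABLES_RESIZE */
    static long maxMemoryNeeded(long estBatchSize, int k, int plannedBatches,
                                long maxHashTablesResize) {
        long perBatch = estBatchSize + MAX_ROWS_PER_BATCH * (4 + 4);
        return perBatch * k * plannedBatches + maxHashTablesResize;
    }

    /** MEMORY_USED + MAX_MEMORY_NEEDED > MEMORY_LIMIT */
    static boolean shouldSpill(long memoryUsed, long memoryNeeded, long memoryLimit) {
        return memoryUsed + memoryNeeded > memoryLimit;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        long needed = maxMemoryNeeded(4 * mb, 3, 1, 2 * mb);
        System.out.println("needed bytes = " + needed);
        System.out.println("spill = " + shouldSpill(100 * mb, needed, 110 * mb));
    }
}
```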
Also in case of a single partition the allocator's memory limit is set to 10GB, to be compatible with the prior code.
Another "can't spill" situation arises when choosing a partition to spill, but no partition has more than 1 batch (hence no memory can be gained, since after spilling 1 batch the partition needs to be reinitialized with a new batch). See chooseAPartitionToFlush(). In such a case the code "crosses its fingers" and continues without spilling.
The 1st phase HAG does not spill to disk. When the 1st phase detects memory pressure, it picks the current partition (the one whose last batch just got full) and returns that partition downstream (just like a regular return, only early). Afterwards that partition is deallocated and re-initialized. Note the boolean "earlyOutput" in the code, which controls special processing in this case: when turned on, the code switches to output (e.g., innerNext() in HashAggBatch.java), and it is turned off when done (see outputCurrentBatch() ).
Using the SpillSet (like the External Sort does) for the actual IO. Each partition spills into a single file. Changes to SpillSet: Generalize it for any kind of "buffered memory" operator (pass in the operator's type). Also small changes to the spill file name.
When a memory pressure is detected (while reading and processing the incoming), one of the partitions is selected ( see chooseAPartitionToFlush() ) and flushed ( spillAPartition() ), and then its memory is freed and that partition is re-initialized ( reinitPartition() ). The choice of a partition gives some small priority to the current partition (since its last batch is full, unlike the others), and priority by a factor of 4 to partitions that are already spilled (i.e., a spilled partition with 10 batches would be chosen vs a pristine/non-spilled with 39 batches.)
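The selection policy can be approximated as below. This is a simplified sketch, not chooseAPartitionToFlush() itself: the array-based signature is hypothetical, and the "small priority" of the current partition is modeled here as a plain tie-break, which is an assumption.

```java
final class FlushChoice {
    /**
     * Returns the index of the partition to spill, or -1 when no partition
     * holds more than one batch (spilling would then free no memory).
     */
    static int choosePartitionToFlush(int[] batchCounts, boolean[] spilled, int current) {
        int best = -1;
        long bestScore = -1;
        for (int p = 0; p < batchCounts.length; p++) {
            if (batchCounts[p] <= 1) {
                continue; // a single batch would just be re-allocated after the spill
            }
            // Already-spilled partitions count four times their size.
            long score = spilled[p] ? 4L * batchCounts[p] : batchCounts[p];
            // Assumed tie-break: prefer the current partition, whose last batch is full.
            if (score > bestScore || (score == bestScore && p == current)) {
                best = p;
                bestScore = score;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        int[] counts = {39, 10};
        boolean[] spilled = {false, true};
        System.out.println(choosePartitionToFlush(counts, spilled, 0));
    }
}
```

The example in main() mirrors the case in the text: the spilled partition with 10 batches scores 40 and is picked over the non-spilled partition with 39 batches.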
Partition spilling (spillAPartition() ): For each batch in the partition - Allocate an outgoing container, link the values and the keys into this container, and write it to the file/stream.
2nd phase - End of incoming: After the last batch was read from the incoming, the original code ( next() ) returned a status of NONE. The new code cannot return NONE after spilling, so instead it returns a special status of RESTART (see outputCurrentBatch() ). This RESTART is captured by the caller of next() ( innerNext() in HashAggBatch.java ), which continues to drive the aggregation (instead of returning).
After the end of the incoming, all the (partially) spilled partitions finish spilling all their remaining in-memory batches to disk (see outputCurrentBatch() ). This is done to simplify the later processing of each spilled partition, as well as freeing memory which may be needed as partitions are processed. The spilled partitions are added into a list (spilledPartitionsList) to allow for later processing.
Reading of each spilled partition/file is performed like reading the incoming. For this purpose, a new class was added: SpilledRecordbatch. The main method there is next() which reads a batch from the stream -- first time it uses the original readFromStream() method, which creates a new container; subsequent calls use the new readFromStreamWithContainer() method, which is similar - only reuses the prior container. (This was done because many places in the code have references into the container).
Reading a spilled partition "just like incoming" allows for that input to spill again (and again ...); this was termed SECONDARY spilling (and TERTIARY ...). As the spilled partitions are kept in a FIFO list, processing of SECONDARY partitions would start only after all the regular spilled ones, etc. Hence a member "cycleNum" was created, incremented every time that processing the spilled list advances to another "cycle" (see outputCurrentBatch() ).
The "cycleNum" is used for the hash-code computation; the same hash-code is computed at every cycle, but the cycle tells how much to right-shift that code so that different bits would be used (for partitioning and hash-table bucket).
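How the cycle number picks a fresh slice of the same hash code can be sketched as follows. The method name and the guard for shifts of 32 bits or more are assumptions made for this illustration.

```java
final class CycleHash {
    /** Partition index for a row in the given spilling cycle (0 = reading the original input). */
    static int partitionForCycle(int hashCode, int bitsInMask, int cycleNum) {
        int shift = bitsInMask * cycleNum; // bits already consumed by earlier cycles
        if (shift >= Integer.SIZE) {
            return 0; // no fresh bits left (assumed handling; Java would otherwise wrap the shift)
        }
        return (hashCode >>> shift) & ((1 << bitsInMask) - 1);
    }

    public static void main(String[] args) {
        int hashCode = 0b101_110_011;
        for (int cycle = 0; cycle < 3; cycle++) {
            System.out.println("cycle " + cycle + " -> partition "
                    + partitionForCycle(hashCode, 3, cycle));
        }
    }
}
```

Rows that all landed in one partition in cycle 0 share the same low bits, so reusing those bits in the next cycle would send every row to a single partition again; shifting first is what makes repartitioning effective.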
- drill.exec.hashagg.num_partitions: Initial number of partitions in each HAG operator (the number may be adjusted down in case too little memory is available). Default value: 32, allowed range 1 - 128, where a value of 1 means "No spilling" (and thus sets the 10GB limit).
- drill.exec.hashagg.min_batches_per_partition: Range 2--5. Default 3. Used for the internal initial estimate of the number of partitions, and later when predicting the memory needed (to avoid a spill). (A value of 2 may be better, but it triggers a bug which will be addressed separately).
Also using options common to all the "buffered" operators (can be overridden, per operator):
- drill.exec.spill.fs: File system for spilling into.
- drill.exec.spill.directories: (Array of) directories to spill into. (To override, per-operator: for the (managed) External Sort: "drill.exec.sort.external.spill.fs" and "drill.exec.sort.external.spill.directories", and for the Hash Aggregate: "drill.exec.hashagg.spill.fs" and "drill.exec.hashagg.spill.directories")
For testing:
- drill.exec.hashagg.mem_limit: Limit the memory for each HAG operator (also sets this number in the allocator, hence this is a "hard limit").
Also for testing (or for a customer workaround ??):
- planner.force_2phase_aggr: Forces the aggregation to be two phase.
The hash-table stats were modified to sum the stats across all the partitions' hash tables. (This only applies to the first spilling cycle; does not count for SECONDARY, TERTIARY spilling etc.). New metrics added:
- "num partitions" (actual number; may have been scaled down due to memory pressure)
- "spilled partitions" (number that has spilled)
- "MB spilled" (in case of 1st phase - that's the total data returned early). All the above three refer to the end of input into the HAG (does not include handling of spills, secondary spills, etc.)
- "cycle": 0 - no spill (or 1st phase), 1 - regular spill, 2 - Secondary, 3 - Tertiary, ...
Extended for all "buffered operators", not only for Sort. (Hash Join will be added later as well, etc.)
- New TestHashAggSpill: Runs two hash agg queries. One spills some of its partitions (1 out of 2); the other forces a smaller memory limit and hence gets into Secondary and Tertiary spills.
- TestBugFixes.testDRILL4884: This test implicitly relied on rows returned in order (the old Hash agg, plus the Parquet file). With the new division into partitions, that order was broken. Fix: added an "order by".
- TestTpchDistributedConcurrent.testConcurrentQueries: Needed longer timeout (probably spilled).
- After a spill, check again if enough memory was freed, else spill (another partition) again. (Not sure if needed.)
- Suggestion not implemented: Scaling down the initial hash-table sizes by the number of partitions (e.g. when 4 partitions, each hash-table starts with 1/4 of the initial size). Reason for not changing: starting with a small size immediately causes doubling and another doubling etc. Better allocate a little more and save that work.
- The RecordBatchSizer had a recent change to handle MAPs (recursively). Merged this change with the modified measureColumn() which returns an int (the est size).
As described above, we need a good estimate of the memory needs in order to decide initially on the number of partitions, and later to decide, each time a batch gets filled, whether to spill or not. These estimates are complicated by:
(1) Possible changes in the incoming data batches (e.g., varchar(12) in the first batch becomes varchar(200) in the second incoming batch). This may invalidate prior estimates.
(2) Arbitrary setting of length 50 for varchar type (when sizing the allocation of DrillBufs)
(3) Allocation size aligned up to nearest power of 2 (DrillBufs for varchars)
(4) When an internal batch gets filled, and estimation shows ample memory -- a second batch may get filled before the first one's partition allocated a new batch (hence may cause "double booking").
(5) Inserting a single value may cause the "start indices" (the real actual "hash table") to double in size. This structure can get pretty large (few MB).
(6) Is the size of the incoming batch charged against the HAG's memory allocator limit? (Not sure; usually not a problem, as the prior batch is deallocated before the next one comes, unless the next one is "much bigger")
(7) For varchars: The memory is allocated as a power of 2 (e.g. doubled via setSafe() ). This can waste a lot of memory: if the total memory needed for 64K varchars is ~5MB, then 8MB is allocated, wasting 3MB.
(8) The varchar value vector uses an internal "offset vector" that allocates "size+1" entries, hence for 64K values it allocates 512KB, of which 256KB are wasted (see DRILL-5446).
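The numbers in (7) and (8) follow from rounding each allocation up to the next power of 2. The sketch below reproduces that arithmetic; the helper names are hypothetical, and the 4-byte offset entry is inferred from the 256KB figure quoted for 64K values.

```java
final class Pow2Waste {
    /** Smallest power of 2 that is >= n (for n >= 1). */
    static long roundUpToPowerOf2(long n) {
        if (n <= 1) {
            return 1;
        }
        return Long.highestOneBit(n - 1) << 1;
    }

    /** Bytes allocated but not needed. */
    static long wasted(long needed) {
        return roundUpToPowerOf2(needed) - needed;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        // (7): ~5MB of varchar data ends up in an 8MB buffer
        System.out.println("varchar waste bytes = " + wasted(5 * mb));
        // (8): 64K + 1 offsets of 4 bytes each just exceed 256KB
        long offsetBytes = (64L * 1024 + 1) * 4;
        System.out.println("offset vector bytes allocated = " + roundUpToPowerOf2(offsetBytes));
    }
}
```

The single extra offset entry in (8) pushes the request just past 256KB, which is why the allocation doubles to 512KB.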
(1)+(6) above: Monitor the size of each incoming batch. Resize batch size estimate if the incoming batch is bigger (see doWork() )
(5) When estimating memory needs, take into account hash table size doubling in all partitions (using the new hash table method extraMemoryNeededForResize() ).
(4) Track "plannedBatches"; when "promising" a few partitions a new batch each, take this into account when checking for available memory. (Though the "more than 1" situation seems very rare).
(2)+(3) Ideally, tracking the size of EACH varchar column would work better, but it is not simple to implement. Instead, just find the maximum size of any of the incoming columns (for simplicity, not only varchars), and use this value (capped at 50, with a minimum of 8; rounded up to the next power of 2 if needed). This addresses the common situation of multiple short varchar key columns, but not the (very rare) situation of a huge varchar KEY column plus a few short ones.
(7) Update RecordBatchSizer.java -- added a method netRowWidthCap50() which takes into account the rounding up (per each column in a row), plus nulls arrays as needed, for each row (will multiply that by 64K in updateEstMaxBatchSize() ).
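The per-column width estimate from item (2)+(3) can be sketched as below. The method name is hypothetical, and the order of operations (clamp to the 8..50 range first, then round up to a power of 2) is an assumption; the text does not say whether the cap applies before or after rounding.

```java
final class KeyWidthEstimate {
    /** Smallest power of 2 that is >= n (for n >= 1). */
    static int roundUpToPowerOf2(int n) {
        if (n <= 1) {
            return 1;
        }
        return Integer.highestOneBit(n - 1) << 1;
    }

    /** Width used for every column: max incoming width, clamped to [8, 50], then rounded up. */
    static int estimatedColumnWidth(int[] columnWidths) {
        int max = 0;
        for (int width : columnWidths) {
            max = Math.max(max, width);
        }
        int clamped = Math.min(50, Math.max(8, max));
        return roundUpToPowerOf2(clamped);
    }

    public static void main(String[] args) {
        System.out.println(estimatedColumnWidth(new int[]{3, 12}));
        System.out.println(estimatedColumnWidth(new int[]{200, 10}));
    }
}
```

Under this reading, a 200-byte varchar key is still estimated at the capped width, which is the rare case the text says is not addressed.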