
Implement groupby::merge for collect_list and collect_set #8407

Closed
ttnghia wants to merge 3 commits

Conversation

@ttnghia (Contributor) commented May 31, 2021

Groupby aggregations can be computed in a distributed manner using the following approach:

  • Divide the dataset into batches
  • Run separate (distributed) aggregations on those batches
  • Merge the results of the step above into one final result

This PR adds support for merging operations for collect_list and collect_set (a sketch of the overall flow follows below).

Closes #7839.

PS: This is a WIP and not ready for review.

@ttnghia requested a review from @sperlingxx May 31, 2021 15:04
@ttnghia self-assigned this May 31, 2021
@ttnghia requested review from a team as code owners May 31, 2021 15:04
@github-actions bot added labels: CMake (CMake build issue), libcudf (Affects libcudf (C++/CUDA) code) May 31, 2021
@ttnghia marked this pull request as draft May 31, 2021 15:04
@ttnghia added the Spark (Functionality that helps Spark RAPIDS) label May 31, 2021
@ttnghia requested a review from @jlowe May 31, 2021 15:05
@ttnghia changed the title from "Implement groupby::merge" to "Implement groupby::merge for collect_list and collect_set" May 31, 2021
Comment on lines +621 to +629
/**
* @brief Indicates whether the specified aggregation operation can be computed
* with a hash-based implementation.
*
* @param t The aggregation operation to verify
* @return true `t` is valid for a hash based groupby
* @return false `t` is invalid for a hash based groupby
*/
bool is_hash_aggregation(aggregation::Kind t) { return array_contains(hash_aggregations, t); }
Contributor (review comment)

Separating the definition makes this no longer constexpr. If you want it available in the header, move its definition there as well and keep it constexpr.

@sperlingxx (Contributor) commented Jun 2, 2021

The implementation looks good, but I have a concern about the interface: how can we perform this mergeOp along with other AggOps (like Sum/Count/Max)?

In the JNI wrapper of groupByAggregate, we combine multiple aggregation requests with the help of groupBy::aggregate (code link). Perhaps we can separate all merge requests from the other requests, handle them separately with groupBy::merge, and then combine the results of groupBy::aggregate and groupBy::merge. I am not sure whether that is a good approach.

In addition, if this merge API is specialized for spark-rapids, the concatenation of multiple keys and values looks unnecessary, because spark-rapids will concatenate all partial results (also with cudf::concatenate) before the phase 2 aggregation (merging partial results). code link
Therefore, in terms of spark-rapids, it looks better if we can run mergeOp with groupBy::aggregate instead of groupBy::merge.

@revans2 (Contributor) commented Jun 2, 2021

> The implementation looks good, but I have a concern about the interface: how can we perform this mergeOp along with other AggOps (like Sum/Count/Max)?

I totally agree; this is why I wanted just a concat aggregation, not some new merge-op API. Just a concat that is a regular aggregation, like collect_list or collect_set.

@ttnghia (Contributor, Author) commented Jun 2, 2021

> The implementation looks good, but I have a concern about the interface: how can we perform this mergeOp along with other AggOps (like Sum/Count/Max)?

> I totally agree; this is why I wanted just a concat aggregation, not some new merge-op API. Just a concat that is a regular aggregation, like collect_list or collect_set.

Initially I did exactly that. But then I found that the new API would need to operate on (merge) multiple pairs of keys and values (the grouped keys and lists columns resulting from the previous collect_list), and those results may have very different numbers of rows. Since groupby::aggregate requires all the input requests to have the same number of rows, I decided to resolve this with a new groupby::merge API that can accept inputs of arbitrary sizes.

Do you guys have any suggestions for this issue (mismatched numbers of input rows among the collect_list results to be merged)?

@revans2 (Contributor) commented Jun 2, 2021

To merge the outputs, we concat them into a single table and then call another aggregation on them. (This is actually a simplified description, because in practice it involves multiple machines shuffling the data around.) We don't need a new merge API, just concat_list and concat_set aggregations. You don't need to worry about how the data gets chopped up and redistributed; you just need to worry about adding the desired aggregations, and we will handle the rest. We already do.

@jlowe (Member) commented Jun 2, 2021

> Do you guys have any suggestions for this issue (mismatched numbers of input rows among the collect_list results to be merged)?

It's easier to think of this as its own standalone type of aggregation, i.e., given a table, aggregate the list columns associated with a particular key by concatenating the list values together into a new list (and removing duplicates if it's a collect_set merge).

As @sperlingxx said,

> the concatenation of multiple keys and values looks unnecessary, because spark-rapids will concatenate all partial results (also with cudf::concatenate) before the phase 2 aggregation (merging partial results).

Therefore, there's no need to consider multiple tables or multiple collect_list results. It's just an aggregation like any other, so there's no issue with mismatched result row counts. For this aggregation, there's just one input table: some key columns and a list column, the latter of which will be aggregated with this new concat_lists aggregation (or whatever we want to call the operation). There's no "mismatch of row counts" because there are just keys and lists associated with keys. Sort the table by the key columns and concatenate all the sequential rows associated with the same key. There may be a different number of rows associated with one key than with another, but that's no different from what collect_list or other aggregations need to handle.

@ttnghia (Contributor, Author) commented Jun 2, 2021

Got it. Thanks, all; I'm starting a new PR according to your suggestions.

@ttnghia closed this Jun 2, 2021
@ttnghia deleted the groupby_concat_lists branch July 12, 2021
Labels
CMake (CMake build issue), libcudf (Affects libcudf (C++/CUDA) code), Spark (Functionality that helps Spark RAPIDS)
Development

Successfully merging this pull request may close these issues.

[FEA] Support MERGE_LISTS and MERGE_SETS for groupby::aggregate
6 participants