
[FEA] Support MERGE_LISTS and MERGE_SETS for groupby::aggregate #7839

Closed
sperlingxx opened this issue Apr 2, 2021 · 14 comments · Fixed by #8436
Assignees
Labels
feature request New feature or request libcudf Affects libcudf (C++/CUDA) code. Spark Functionality that helps Spark RAPIDS

Comments

@sperlingxx
Contributor

Is your feature request related to a problem? Please describe.
To perform collect aggregations in Spark, as with other aggregation functions, we need to go through two phases. In phase one, we perform the collect aggregation locally within each node, which can be implemented via cuDF group_by with the AggOp collect_list or collect_set. In phase two, we need to merge the partial aggregation results from each node. For collect aggregations, that means concatenating multiple lists/sets into one. For instance,

Node A has a partial result of collect_list in group_by: [('A', [1, 2, 3]), ('B', [10, 20])]
Node B has a result: [('B', [30, 40]), ('C', [100, 200])]
Node C has a result: [('A', [4]), ('C', [300, 400])]

For phase two, we have a combined input table: [('A', [1, 2, 3]), ('B', [10, 20]), ('B', [30, 40]), ('C', [100, 200]), ('A', [4]), ('C', [300, 400])]
Then, we concatenate the lists sharing the same group key, and we expect to get the result: [('A', [1, 2, 3, 4]), ('B', [10, 20, 30, 40]), ('C', [100, 200, 300, 400])]
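The two-phase flow above can be sketched in plain Python (the helper name `merge_collect_list` is hypothetical, purely illustrative, and not a cuDF API):

```python
from collections import defaultdict

def merge_collect_list(partials):
    """Phase two: concatenate partial collect_list results by group key.

    `partials` is a list of per-node results, each a list of
    (key, collected_list) pairs.
    """
    merged = defaultdict(list)
    for partial in partials:
        for key, values in partial:
            merged[key].extend(values)
    return sorted(merged.items())

node_a = [('A', [1, 2, 3]), ('B', [10, 20])]
node_b = [('B', [30, 40]), ('C', [100, 200])]
node_c = [('A', [4]), ('C', [300, 400])]

result = merge_collect_list([node_a, node_b, node_c])
# [('A', [1, 2, 3, 4]), ('B', [10, 20, 30, 40]), ('C', [100, 200, 300, 400])]
```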

I think we need two additional AggOps (concatenate_list and concatenate_set) for this feature.

@sperlingxx sperlingxx added feature request New feature or request Needs Triage Need team to review and classify Spark Functionality that helps Spark RAPIDS labels Apr 2, 2021
@sperlingxx
Contributor Author

I would like to pick up this task if no experienced C++ contributors are free to work on this issue.

@jrhemstad
Contributor

Can't you just concatenate the partial results and do another collect_list or collect_set?

Even if this needs a new feature, this isn't a groupby or an aggregation operation. This is like a "concat by key" or something.

@sperlingxx
Contributor Author

sperlingxx commented Apr 3, 2021

Can't you just concatenate the partial results and do another collect_list or collect_set?

Even if this needs a new feature, this isn't a groupby or an aggregation operation. This is like a "concat by key" or something.

I think if we "concatenate the partial results and do another collect_list or collect_set", then we will get a result like:
[('A', [[1, 2, 3], [4]]), ('B', [[10, 20], [30, 40]]), ('C', [[100, 200], [300, 400]])]
instead of
[('A', [1, 2, 3, 4]), ('B', [10, 20, 30, 40]), ('C', [100, 200, 300, 400])]
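A small sketch of why re-running collect_list over already-collected lists nests the results instead of flattening them (plain Python, illustrative only; not cuDF code):

```python
from collections import defaultdict

def collect_list(pairs):
    # Group values by key, collecting each value into a list as-is.
    out = defaultdict(list)
    for key, value in pairs:
        out[key].append(value)
    return sorted(out.items())

# Concatenated partial results: each value is itself already a list,
# so collecting again wraps it in another level of nesting.
combined = [('A', [1, 2, 3]), ('B', [10, 20]), ('B', [30, 40]),
            ('C', [100, 200]), ('A', [4]), ('C', [300, 400])]

nested = collect_list(combined)
# [('A', [[1, 2, 3], [4]]), ('B', [[10, 20], [30, 40]]),
#  ('C', [[100, 200], [300, 400]])]
```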

And I am not sure whether these concatenation ops should be regarded as groupby operations or not. But it would be nice if they could be used with table::group_by.

@kkraus14 kkraus14 added libcudf Affects libcudf (C++/CUDA) code. and removed Needs Triage Need team to review and classify labels Apr 6, 2021
@ttnghia
Contributor

ttnghia commented May 5, 2021

So we will have something called cudf::lists::concatenate_by_key:

  • Input: a key column (containing keys) and a lists column (containing lists values).
  • Output: A lists column containing the concatenated results

@kkraus14 Is there any API in Pandas that is similar to this? I'm trying to steer the proposed API to satisfy any potential need.

@kkraus14
Collaborator

kkraus14 commented May 5, 2021

No there isn't a similar API. In general Pandas doesn't have many built-in APIs for list handling.

@ttnghia ttnghia changed the title [FEA] support concatenate_collection aggregations in groupby [FEA] Support concatenate lists by key May 5, 2021
@ttnghia ttnghia changed the title [FEA] Support concatenate lists by key [FEA] Support groupby collect_merge May 14, 2021
@ttnghia
Contributor

ttnghia commented May 21, 2021

There have been a couple of changes since my last plan. The previous plan was to execute the groupby operations (including collect_list) on multiple batches of data, then concatenate the collect_list results into a single result column. However, there are several issues with this approach:

  • There may be several groupby operations running together (such as collect_list along with sum, min, max, etc.), and the later operations depend on the first one. It is better to execute those operations at the same time, within one groupby execution.
  • Since all the aggregate operations run together, it makes more sense to have another groupby operation (the final one in the running chain) that merges the intermediate results of the others.

So, we will not have cudf::lists::concatenate_by_key. Instead, we need another groupby aggregation kind, which can be called collect_merge.

PS: Above is my proposed idea, which is WIP. Things may change along the way until I have a final, working implementation.

@jrhemstad
Contributor

I've no issue with the first point. AFAIK, collect_list/collect_set are already supported aggregations for groupby.

it makes more sense to have another groupby operation (the final one of the running chain) that will merge the intermediate results of the others

Why?

For instance, there is no groupby aggregation to merge the intermediate results from any other groupby operation. The typical pattern is to concat intermediate results and do another aggregation.

That would work fine in this case if you explode the intermediate results from the collect_list/collect_set, concat them, and then run another collect_set/list.
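The explode-then-recollect workaround suggested here can be sketched as follows (plain Python, illustrative only; the helper names are hypothetical, not cuDF API):

```python
from collections import defaultdict

def explode(pairs):
    # Flatten each (key, list) pair into one (key, element) row per element.
    return [(key, v) for key, values in pairs for v in values]

def collect_list(rows):
    # Re-collect the exploded rows into one list per key.
    out = defaultdict(list)
    for key, value in rows:
        out[key].append(value)
    return sorted(out.items())

partials = [[('A', [1, 2, 3]), ('B', [10, 20])],
            [('B', [30, 40]), ('C', [100, 200])],
            [('A', [4]), ('C', [300, 400])]]

# Explode each partial result, concatenate the rows, then re-collect.
rows = [row for partial in partials for row in explode(partial)]
result = collect_list(rows)
# [('A', [1, 2, 3, 4]), ('B', [10, 20, 30, 40]), ('C', [100, 200, 300, 400])]
```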

@revans2
Contributor

revans2 commented May 24, 2021

That would work fine in this case if you explode the intermediate results from the collect_list/collect_set, concat them, and then run another collect_set/list.

That is sort of correct. In isolation it is 100% correct, but you have to look at how the groupby API is used. The groupby APIs make no guarantee about the order of the output rows. Also, columns that are not part of a given aggregation cannot be preserved in its output; that is inherent in how groupby aggregations work. As such, multiple aggregations are grouped together, not just for efficiency but so we can do them correctly. Doing what you ask would not only mean exploding the key columns along with the list/set operation, which is already memory- and performance-inefficient; for each list/set operation we would also have to do a join between its result and the results of the other operations. If all we care about is getting an answer, we can do it. But if we want something to actually be fast, we need a better solution.

@revans2
Contributor

revans2 commented May 24, 2021

Perhaps the name is just not great. What we really want is a concat aggregation instead of collect_merge: it takes multiple lists as input, and we concat them together.

@jrhemstad
Contributor

no guarantee on the order of the output rows

I see. I wasn't thinking of merging multiple distributed aggregations and wanting the order to be consistent. That makes more sense.

Perhaps the name is just not great. What we really want is a concat aggregation instead of collect_merge: it takes multiple lists as input, and we concat them together.

Adding a concat aggregation is certainly much more appealing, as it is more intuitive and general purpose. So for a distributed collect_set you'd do a concat + drop_list_duplicates (this is certainly how it'd be implemented internally, so you don't really have to worry about decreased efficiency here)?
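The concat + drop-duplicates composition for a distributed collect_set can be sketched as (plain Python, illustrative only; `drop_list_duplicates` here is just a stand-in mimicking the libcudf API of the same name):

```python
from collections import defaultdict

def concat_by_key(pairs):
    # The proposed concat aggregation: append lists across group members.
    out = defaultdict(list)
    for key, values in pairs:
        out[key].extend(values)
    return sorted(out.items())

def drop_list_duplicates(pairs):
    # Remove duplicates within each list, keeping the first occurrence.
    return [(k, list(dict.fromkeys(v))) for k, v in pairs]

partials = [('A', [1, 2]), ('B', [10]), ('A', [2, 3]), ('B', [10, 20])]
merged_sets = drop_list_duplicates(concat_by_key(partials))
# [('A', [1, 2, 3]), ('B', [10, 20])]
```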

@ttnghia
Contributor

ttnghia commented May 24, 2021

Hmm, I still feel concat is not meaningful enough. It basically only concatenates the intermediate results of other aggregates; it is not a standalone aggregate. In addition, we should use merge, not concat, since we don't just append results of one kind one after another. A longer name like merge_intermediate makes more sense to me, but it may be too long and you may not like it (I'm fine with this name). Any better suggestions would be appreciated.

@jrhemstad
Contributor

I still feel concat is not meaningful enough. It basically only concatenates the intermediate results of other aggregates; it is not a standalone aggregate.

There is no notion of "intermediate results" in libcudf as libcudf is a single GPU library. As such, concat is just what it says: it concatenates the lists across all members of the group.

since we don't just append results of one kind one after another

Why does concat do more than appending the lists from all members of the group?

@revans2
Contributor

revans2 commented May 24, 2021

Why does concat do more than appending the lists from all members of the group?

It shouldn't. I would propose a concat_lists that does exactly what you are suggesting. I would also propose a concat_sets that deduplicates the values as well. We can make it all work with concat_lists, so I would make that the priority.

@ttnghia ttnghia assigned ttnghia and unassigned ttnghia May 26, 2021
@ttnghia ttnghia changed the title [FEA] Support groupby collect_merge [FEA] Support groupby concat_lists May 30, 2021
@ttnghia ttnghia changed the title [FEA] Support groupby concat_lists [FEA] Support groupby merge_lists May 30, 2021
@ttnghia
Contributor

ttnghia commented May 30, 2021

Why does concat do more than appending the lists from all members of the group?

Intuitively, concat is just appending columns together. For example, if it operates on lists columns like [[1, 2], [3], [4, 5, 6]] and [[], [7, 8], [9]] then the result will be [[1, 2], [3], [4, 5, 6], [], [7, 8], [9]] (vertical concat) or [[1, 2], [3, 7, 8], [4, 5, 6, 9]] (horizontal concat).

The proposed (groupby) API here operates on values tables that are keys-values pairs of columns/tables, the results of previous collect_list or collect_set calls. For example, keys = [1, 2, 3], values = [[1, 2], [3], [4, 5, 6]] and keys = [1, 3, 4], values = [[], [7, 8], [9]]. We want to merge those keys-values columns to get a result that looks like keys = [1, 2, 3, 4], values = [[1, 2], [3], [4, 5, 6, 7, 8], [9]]. That's why I previously worked on something called lists::concat_by_keys (which was abandoned because I didn't find other use cases beyond this one).
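The keys-values merge described above can be sketched as (plain Python, illustrative only; `merge_lists` is a hypothetical name, not the libcudf interface):

```python
from collections import defaultdict

def merge_lists(keyed_columns):
    """Merge several (keys, lists) column pairs by key.

    Each input is a (keys, values) pair of equal-length sequences,
    e.g. the output of a previous collect_list call.
    """
    merged = defaultdict(list)
    for keys, values in keyed_columns:
        for key, lst in zip(keys, values):
            merged[key].extend(lst)
    out_keys = sorted(merged)
    return out_keys, [merged[k] for k in out_keys]

batch1 = ([1, 2, 3], [[1, 2], [3], [4, 5, 6]])
batch2 = ([1, 3, 4], [[], [7, 8], [9]])

keys, values = merge_lists([batch1, batch2])
# keys = [1, 2, 3, 4]
# values = [[1, 2], [3], [4, 5, 6, 7, 8], [9]]
```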

You may find this a little confusing, as typically one groupby operation has only one keys table shared by all the requests. In my draft implementation, the values attached to each request form a table of keys-values pairs output from previous collect_list (collect_set) calls, as shown in the example above. As such, I may need a new groupby interface that accepts requests with different numbers of rows in their values tables.

@ttnghia ttnghia changed the title [FEA] Support groupby merge_lists [FEA] Support groupby::merge May 31, 2021
@ttnghia ttnghia changed the title [FEA] Support groupby::merge [FEA] Support MERGE_LISTS and MERGE_SETS for groupby::aggregate Jun 3, 2021
rapids-bot bot pushed a commit that referenced this issue Jun 22, 2021
Groupby aggregations can be performed for distributed computing by the following approach:
 * Divide the dataset into batches
 * Run separate (distributed) aggregations over those batches on the distributed nodes
 * Merge the results of the step above into one final result by calling `groupby::aggregate` a final time on the master node

This PR supports merge operations for the lists resulting from the distributed aggregates `collect_list` and `collect_set`.

Closes #7839.

Authors:
  - Nghia Truong (https://github.com/ttnghia)

Approvers:
  - Jake Hemstad (https://github.com/jrhemstad)
  - Mark Harris (https://github.com/harrism)

URL: #8436