CHIP: Incremental Batch Feature Aggregation
Problem Statement
Chronon reads data partitions based on window sizes to compute features for event sources, for example sum_clicks_past_30days and count_visits_past_90days. When computing aggregations, reading data from disk is one of the most time-consuming steps. If a feature needs a 90-day lookback, Chronon reads 90 days of event data to compute it.
This CHIP proposes adding support for incremental aggregation, which avoids reading the full dataset on every run.
Proposed Solution
Divide the aggregate computation into two parts:
- Daily Aggregation: Read a single day's partition and store the partial aggregates.
- Final Aggregation: Read all partial aggregates and compute the final aggregates.
The two steps work as follows.
Daily Aggregation IRs
Given a data source:
- Read the latest partition and generate partial aggregates, also called intermediate representations (IRs).
- Store these values in a new intermediate table.
- The intermediate table's schema holds IRs, so it differs from the final GroupBy output schema.
The above diagram shows the daily IRs for count aggregation.
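As an illustration of the daily step for count, the plain-Python sketch below turns one day's partition into per-key IR rows. It is not Chronon code: the function name daily_count_irs, the (key, value) row shape, and the (key, ds, ir) layout of the intermediate table are all assumptions.

```python
from collections import defaultdict
from datetime import date
from typing import Dict, Iterable, List, Tuple


def daily_count_irs(ds: date, partition: Iterable[Tuple[str, object]]) -> List[Tuple[str, date, int]]:
    """Aggregate one day's partition into per-key COUNT IRs (hypothetical helper).

    partition: rows of (key, input_value) from a single day.
    Returns (key, ds, count_ir) rows for the intermediate table, sorted by key.
    """
    counts: Dict[str, int] = defaultdict(int)
    for key, _value in partition:
        counts[key] += 1  # the COUNT IR is the number of rows seen for the key that day
    return [(key, ds, ir) for key, ir in sorted(counts.items())]


rows = [("u1", 10), ("u2", 5), ("u1", 7)]
daily_rows = daily_count_irs(date(2024, 1, 1), rows)
```

Only one partition is scanned per run; older days are never re-read by this step.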
Final Aggregation (Final feature values)
- Read the intermediate table and compute the final aggregates.
- Number of partitions read depends on the window size.
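For count, merging IRs is plain addition, so final aggregation reduces to summing the IR rows whose partition falls inside the window. The sketch below assumes that a window of window_days ending at as_of covers partitions as_of - (window_days - 1) through as_of; final_count and the row layout are hypothetical.

```python
from collections import defaultdict
from datetime import date, timedelta
from typing import Dict, Iterable, Tuple


def final_count(ir_rows: Iterable[Tuple[str, date, int]], as_of: date, window_days: int) -> Dict[str, int]:
    """Merge daily COUNT IRs into a windowed COUNT as of `as_of` (hypothetical helper)."""
    start = as_of - timedelta(days=window_days - 1)  # assumed inclusive window start
    totals: Dict[str, int] = defaultdict(int)
    for key, ds, ir in ir_rows:
        if start <= ds <= as_of:  # only IR partitions inside the window contribute
            totals[key] += ir  # merging COUNT IRs is addition
    return dict(totals)


irs = [
    ("u1", date(2024, 1, 1), 2),
    ("u1", date(2024, 1, 3), 1),
    ("u2", date(2024, 1, 3), 4),
    ("u1", date(2023, 12, 30), 5),  # outside a 3-day window ending 2024-01-03
]
window_totals = final_count(irs, date(2024, 1, 3), 3)
```

The number of IR partitions touched grows with window_days, which matches the point that the partitions read depend on the window size.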
Pros
- Daily aggregates reduce the amount of data read from disk.
- Final aggregation is done on the intermediate table, which is smaller than the original data source.
Cons
- An intermediate table is created for each GroupBy, which increases storage cost.
- Any change to the data source requires recomputing the intermediate table.
Optimizations
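The proposed solution is a good start, but it can be improved further for invertible operators. An operator is invertible (deletable) if a value already added to its aggregate can later be removed without re-reading the other inputs. For example, count and sum are invertible: removing an event means subtracting 1 from the count or subtracting the event's value from the sum.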
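To make the idea of deletion concrete, the plain-Python sketch below pairs each operator with an update function and, where one exists, a delete function. The Operator tuple and its fields are hypothetical, not Chronon's classes; the example shows that sum and count can undo a value while max cannot.

```python
from typing import Callable, Iterable, NamedTuple, Optional


class Operator(NamedTuple):
    """Hypothetical operator: update folds a value in, delete removes it if possible."""
    name: str
    update: Callable[[int, int], int]
    delete: Optional[Callable[[int, int], int]]


SUM = Operator("sum", lambda ir, x: ir + x, lambda ir, x: ir - x)
COUNT = Operator("count", lambda ir, x: ir + 1, lambda ir, x: ir - 1)
# max keeps only the current maximum, so a deleted maximum cannot be undone.
MAX = Operator("max", lambda ir, x: max(ir, x), None)


def is_invertible(op: Operator) -> bool:
    return op.delete is not None


def fold(op: Operator, init: int, values: Iterable[int]) -> int:
    """Fold values into an IR starting from init."""
    ir = init
    for value in values:
        ir = op.update(ir, value)
    return ir


values = [4, 9, 2]
total = fold(SUM, 0, values)
after_delete = SUM.delete(total, 9)  # equals fold(SUM, 0, [4, 2])
```

Deleting 9 from a max IR of 9 would require knowing the next-largest value, 4, which the IR does not store. That is why max falls under the non-deletable case.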
Requirements
- The previous day's GroupBy output. (GroupBy values are essentially ML features, so this document refers to this output as the feature store.)
- The user activity table, with both the latest and historical partitions.
Use cases
Case 1: Unwindowed deletable
E.g.: How many listings the user has purchased since signup.
These features are not windowed, so they are computed over the user's entire history.
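Because nothing ever leaves an unwindowed aggregate, a lifetime count can be rolled forward from the previous day's feature value and the latest partition alone, with no deletion step. A minimal sketch, assuming yesterday's values arrive as a key-to-count dict and the latest partition as one key per event (update_lifetime_count is a hypothetical name):

```python
from typing import Dict, Iterable


def update_lifetime_count(previous: Dict[str, int], latest_keys: Iterable[str]) -> Dict[str, int]:
    """Roll a lifetime COUNT forward by one day (hypothetical helper).

    previous: yesterday's feature values per key, read from the feature store.
    latest_keys: the key of each event in the latest partition.
    """
    updated = dict(previous)  # keys with no new activity keep their value
    for key in latest_keys:
        updated[key] = updated.get(key, 0) + 1  # add today's events
    return updated


lifetime = update_lifetime_count({"u1": 5, "u2": 1}, ["u1", "u3", "u1"])
```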
Case 2: Windowed deletable
These features are windowed and have an inverse operator.
E.g.: The number of listings purchased by the user in the past 90 days (COUNT aggregation).
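```python
# Assumes Chronon's Python API; table_name is a placeholder for the event source.
from ai.chronon.group_by import Accuracy, Aggregation, GroupBy, Operation, TimeUnit, Window

GroupBy(
    sources=[table_name],
    keys=["key_col1", "key_col2"],
    aggregations=[
        Aggregation(
            input_column="inp_col",
            operation=Operation.COUNT,  # COUNT is invertible
            windows=[
                Window(length=3, timeUnit=TimeUnit.DAYS),
                Window(length=10, timeUnit=TimeUnit.DAYS),
            ],
        )
    ],
    accuracy=Accuracy.SNAPSHOT,
)
```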
To compute the above GroupBy incrementally:
- Read the GroupBy output table (the feature store) to get the previous day's aggregated values.
- Read the day 0 partition to add the latest activity.
- Read the user activity on day 4 and day 11 to delete activity that has fallen out of the 3-day and 10-day windows.
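The add-then-delete update for a windowed count can be sketched in plain Python. This is a sketch under an assumed date convention, not Chronon's implementation: a window of window_days as of date D is taken to cover partitions D - window_days + 1 through D, so moving from D - 1 to D adds partition D and deletes partition D - window_days. The function name and the read_partition callback are hypothetical. For the GroupBy above, it would be called once for the 3-day window and once for the 10-day window.

```python
from collections import Counter
from datetime import date, timedelta
from typing import Callable, Dict, Iterable

# Hypothetical callback: returns the key of each event in one date's partition.
ReadPartition = Callable[[date], Iterable[str]]


def update_windowed_count(previous: Dict[str, int], as_of: date, window_days: int,
                          read_partition: ReadPartition) -> Dict[str, int]:
    """Roll a windowed COUNT forward from (as_of - 1 day) to as_of."""
    updated = Counter(previous)                        # yesterday's feature values
    updated.update(read_partition(as_of))              # add the latest day
    expired = as_of - timedelta(days=window_days)      # partition leaving the window
    updated.subtract(read_partition(expired))          # delete its events
    return {key: n for key, n in updated.items() if n != 0}  # drop keys that reached 0


events = {
    date(2024, 1, 1): ["u1", "u2"],
    date(2024, 1, 2): ["u1"],
    date(2024, 1, 3): ["u2", "u2"],
    date(2024, 1, 4): ["u1"],
}


def read(d: date) -> Iterable[str]:
    return events.get(d, [])


def full_recompute(as_of: date, window_days: int) -> Dict[str, int]:
    """Baseline: read every partition in the window."""
    return dict(Counter(k for i in range(window_days) for k in read(as_of - timedelta(days=i))))
```

Only two source partitions are read per window, regardless of window length; full_recompute is included as a baseline to compare against.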
Case 3: Windowed non-deletable
These features are windowed and do not have an inverse operator.
E.g.: The maximum purchase price paid by the user in the past 30 days.
For non-deletable operators, we keep Chronon's current behavior and load all partitions needed to compute the feature.
Pros
- For invertible operators, drastically reduces the amount of data to read.
- No need to read daily aggregated IRs.
Cons
- Applicable only to invertible operators.
- If any operator in a GroupBy is non-invertible, all partitions in the window must be read.
- This introduces two paths for computing a GroupBy:
  - Invertible: read only the latest/specific partitions and the feature store.
  - Non-invertible: read all partitions.
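The choice between the two paths can be sketched as a partition planner. This plain-Python sketch assumes the same date convention as a w-day window covering as_of - w + 1 through as_of; INVERTIBLE_OPS (using operator names as strings) and partitions_to_read are hypothetical, and the set lists only the invertible examples named in this CHIP.

```python
from datetime import date, timedelta
from typing import List, Sequence

INVERTIBLE_OPS = {"SUM", "COUNT"}  # illustrative subset from this CHIP's examples


def partitions_to_read(operations: Sequence[str], window_days: Sequence[int], as_of: date) -> List[date]:
    """Return the source partitions needed to compute a GroupBy as of `as_of`."""
    if all(op in INVERTIBLE_OPS for op in operations):
        # Invertible path: the latest partition plus the one expiring from each window.
        # The previous day's values come from the feature store, not from these partitions.
        needed = {as_of} | {as_of - timedelta(days=w) for w in window_days}
    else:
        # Non-invertible path: a single non-invertible operator forces reading
        # every partition in the longest window.
        longest = max(window_days)
        needed = {as_of - timedelta(days=i) for i in range(longest)}
    return sorted(needed)


fast_path = partitions_to_read(["COUNT"], [3, 10], date(2024, 1, 20))
slow_path = partitions_to_read(["COUNT", "MAX"], [3, 10], date(2024, 1, 20))
```

Adding MAX to an otherwise invertible GroupBy grows the read set from three partitions to ten in this example, which is the cost the cons above describe.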
Implementation
API Changes
Add incremental_agg=True to the GroupBy API when the feature computation should happen incrementally.
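```python
# Assumes Chronon's Python API; incremental_agg is the flag proposed by this CHIP.
from ai.chronon.group_by import Accuracy, GroupBy

GroupBy(
    sources=[table_name],           # placeholder for the event source
    keys=["key_col1", "key_col2"],
    aggregations=[...],             # aggregations elided
    incremental_agg=True,           # proposed flag: compute this GroupBy incrementally
    accuracy=Accuracy.SNAPSHOT,
)
```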
The flag also requires a corresponding change to the Thrift definitions.
Benefits
The proposed solution provides several benefits:
- Less data read from disk, resulting in faster computation.
- Cheaper final aggregation, since it reads partial aggregates from an intermediate table that is smaller than the original data source.
- For invertible operators, expired data can be deleted from the previous day's output, so only a few partitions need to be read.
Limitations
The proposed solution has some limitations:
- The feature-store optimization applies only to invertible operators.
- The intermediate table requires additional storage.
- Any change to the data source requires recomputing the intermediate table.
Future Work
Future work includes:
- Implementing the proposed solution in Chronon.
- Testing and evaluating the performance of the proposed solution.
- Exploring ways to improve the solution for non-invertible operators.
Q&A: CHIP - Incremental Batch Feature Aggregation
Q: What is the main goal of the CHIP?
A: The main goal of the CHIP is to add support for incremental aggregation, which will avoid reading the full dataset every time.
Q: How does the proposed solution work?
A: The proposed solution divides the aggregate computation into two parts: Daily Aggregation and Final Aggregation. Daily Aggregation reads a single day's partition and stores partial aggregates, while Final Aggregation reads the partial aggregates within the window and computes the final aggregates.
Q: What are the benefits of the proposed solution?
A: The main benefits are less data read from disk, faster computation, and, for invertible operators, the ability to delete expired data instead of re-reading the full window.
Q: What are the limitations of the proposed solution?
A: The feature-store optimization applies only to invertible operators, the intermediate table requires additional storage, and any change to the data source requires recomputing the intermediate table.
Q: How does the proposed solution handle non-invertible operators?
A: For non-invertible operators, the proposed solution keeps Chronon's current behavior and loads all partitions needed to compute the feature.
Q: What are the use cases for the proposed solution?
A: The CHIP considers three use cases: unwindowed deletable features, windowed deletable features, and windowed non-deletable features.
Q: How does the proposed solution improve storage efficiency?
A: It does not reduce total storage; the intermediate table adds storage cost. What it improves is read efficiency: final aggregation reads partial aggregates from the intermediate table, which is smaller than the original data source.
Q: What are the API changes required for the proposed solution?
A: The proposed solution requires adding incremental_agg=True to the GroupBy API when the feature computation should happen incrementally.
Q: What are the future work plans for the proposed solution?
A: The future work plans for the proposed solution include implementing the proposed solution in Chronon, testing and evaluating the performance of the proposed solution, and exploring ways to improve the solution for non-invertible operators.
Q: How does the proposed solution reduce data read from disk?
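A: Each daily job reads only the latest partition, and final aggregation reads compact partial aggregates instead of raw events for the whole window. For invertible operators, the optimization goes further: it reads only the previous day's output, the latest partition, and the partitions that fall out of each window.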
Q: What are the implications of the proposed solution on data processing?
A: Computation time drops because less data is read, at the cost of extra storage for the intermediate table and two separate computation paths for invertible and non-invertible operators.
Q: How does the proposed solution support invertible operators?
A: For invertible operators, it starts from the previous day's GroupBy output (the feature store), adds the latest partition, and deletes the partitions that fall out of each window, so the daily IRs do not need to be read.
Q: What are the challenges in implementing the proposed solution?
A: The main challenges are implementing the solution in Chronon, testing and evaluating its performance, and finding ways to improve it for non-invertible operators.