Aggregates

Now that we have computed the sorted, filtered, and paginated rows of the original collection, we can compute any aggregates over those rows.

If the query requests any aggregates, they are computed over the paginated rows by the eval_aggregates function. It evaluates each requested aggregate in turn with eval_aggregate and collects the results to return:

    let aggregates = match &query.aggregates {
        // No aggregates were requested, so there is nothing to compute.
        None => None,
        Some(requested) => Some(eval_aggregates(variables, requested, &paginated)?),
    };

The eval_aggregate function works by pattern matching on the type of the aggregate being computed:

  • A star_count aggregate simply counts all rows,
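  • A column_count aggregate evaluates the named column (following its field path into nested fields, if one is given) for every row and returns the number of non-null values; if distinct is set, only distinct non-null values are counted,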
  • A single_column aggregate is computed by delegating to the eval_aggregate_function function, which computes a custom aggregate operator over the values of the selected column taken from all rows.
/// Evaluates a single aggregate over `rows`.
///
/// - `StarCount` counts every row.
/// - `ColumnCount` evaluates the column (following `field_path` into nested
///   fields when present) for every row and counts the non-null results; when
///   `distinct` is set, only distinct non-null values are counted.
/// - `SingleColumn` evaluates the column for every row and delegates to
///   `eval_aggregate_function` to apply the requested aggregate function.
///
/// # Errors
///
/// Propagates any error from evaluating the column for a row, and any error
/// returned by `eval_aggregate_function`.
fn eval_aggregate(
    variables: &BTreeMap<models::VariableName, serde_json::Value>,
    aggregate: &models::Aggregate,
    rows: &[Row],
) -> Result<serde_json::Value> {
    match aggregate {
        models::Aggregate::StarCount {} => Ok(serde_json::Value::from(rows.len())),
        models::Aggregate::ColumnCount {
            column,
            arguments,
            field_path,
            distinct,
        } => {
            let values = rows
                .iter()
                .map(|row| {
                    eval_column_field_path(variables, row, column, field_path.as_deref(), arguments)
                })
                .collect::<Result<Vec<_>>>()?;

            let non_null_values = values.iter().filter(|value| !value.is_null());

            let count = if *distinct {
                // Distinctness is decided by the compact JSON encoding of each value.
                // `Value`'s `Display` impl writes the same text as `serde_json::to_string`
                // but cannot fail for a `Value`, so no (unreachable) error branch is needed.
                non_null_values
                    .map(ToString::to_string)
                    .collect::<HashSet<_>>()
                    .len()
            } else {
                non_null_values.count()
            };
            // Infallible conversion, consistent with the `StarCount` arm.
            Ok(serde_json::Value::from(count))
        }
        models::Aggregate::SingleColumn {
            column,
            arguments,
            field_path,
            function,
        } => {
            let values = rows
                .iter()
                .map(|row| {
                    eval_column_field_path(variables, row, column, field_path.as_deref(), arguments)
                })
                .collect::<Result<Vec<_>>>()?;
            eval_aggregate_function(function, &values)
        }
    }
}

The eval_aggregate_function function determines the type of the data being aggregated by inspecting the first value. It then dispatches to a function that implements aggregation for that type.

/// Applies the aggregate function `function` to `values`, choosing a
/// type-specific implementation based on the JSON type of the first value.
///
/// When the first value is an integer, every value must be an integer
/// (otherwise a `BAD_REQUEST` "column is not an integer" error is returned),
/// and the work is delegated to `eval_integer_aggregate_function`.
// NOTE(review): only the integer branch is visible here; the handling of other
// value types and of an empty `values` slice is not shown — confirm against
// the full implementation.
fn eval_aggregate_function(
    function: &models::AggregateFunctionName,
    values: &[serde_json::Value],
) -> Result<serde_json::Value> {
    // Only the first value is inspected to pick the typed aggregation.
    if let Some(first_value) = values.iter().next() {
        if first_value.is_i64() {
            // Every value must be an i64; collecting into `Result` stops at the
            // first non-integer value and returns its error.
            let int_values = values
                .iter()
                .map(|value| {
                    value.as_i64().ok_or_else(|| {
                        (
                            StatusCode::BAD_REQUEST,
                            Json(models::ErrorResponse {
                                message: "column is not an integer".into(),
                                details: serde_json::Value::Null,
                            }),
                        )
                    })
                })
                .collect::<Result<Vec<i64>>>()?;

            eval_integer_aggregate_function(function, int_values)
        }
...

For example, integer aggregation is implemented by eval_integer_aggregate_function, which supports the min, max, sum, and avg functions.

/// Applies the integer aggregate function `function` to `int_values`.
///
/// - `min` / `max`: the smallest / largest value, or `null` when there are no values.
/// - `sum`: the sum of all values (`0` when there are no values).
/// - `avg`: the arithmetic mean as a floating-point number, or `null` when
///   there are no values.
///
/// # Errors
///
/// - `BAD_REQUEST` for any other function name.
/// - `INTERNAL_SERVER_ERROR` if the `sum` result does not fit in an `i64`.
///   Without this check, the sum would panic in debug builds and silently wrap
///   in release builds.
// `avg` intentionally converts to `f64`: for very large sums or counts the
// average is only approximate, which is acceptable for a floating-point mean.
#[allow(clippy::cast_precision_loss)]
fn eval_integer_aggregate_function(
    function: &models::AggregateFunctionName,
    int_values: Vec<i64>,
) -> Result<serde_json::Value> {
    match function.as_str() {
        "min" => Ok(serde_json::Value::from(int_values.into_iter().min())),
        "max" => Ok(serde_json::Value::from(int_values.into_iter().max())),
        "sum" => int_values
            .into_iter()
            .try_fold(0_i64, i64::checked_add)
            .map(serde_json::Value::from)
            .ok_or_else(|| {
                (
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Json(models::ErrorResponse {
                        message: "integer overflow in sum aggregation".into(),
                        details: serde_json::Value::Null,
                    }),
                )
            }),
        "avg" => {
            if int_values.is_empty() {
                // The mean of no values is undefined; report it as null.
                return Ok(serde_json::Value::Null);
            }
            let count = int_values.len() as f64;
            // Accumulate in i128: a Vec holds fewer than 2^63 elements, each of
            // magnitude at most 2^63, so this sum cannot overflow.
            let sum: i128 = int_values.into_iter().map(i128::from).sum();
            Ok(serde_json::Value::from(sum as f64 / count))
        }
        _ => Err((
            StatusCode::BAD_REQUEST,
            Json(models::ErrorResponse {
                message: "invalid integer aggregation function".into(),
                details: serde_json::Value::Null,
            }),
        )),
    }
}