Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(reduce transform): New setting for reduce transform: end_every_period_ms #20440

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog.d/reduce_end_every_period.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
A new configuration option, `end_every_period_ms`, is available on reduce transforms.
If supplied, every time this interval elapses for a given grouping, the reduced value
for that grouping is flushed. The interval is checked every `flush_period_ms`.

authors: charlesconnell
10 changes: 10 additions & 0 deletions lib/vector-config/src/external/serde_with.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::cell::RefCell;

use vector_config_common::{attributes::CustomAttribute, constants};

use crate::schema::generate_optional_schema;
use crate::{
num::NumberClass,
schema::{generate_number_schema, SchemaGenerator, SchemaObject},
Expand Down Expand Up @@ -132,3 +133,12 @@ impl Configurable for serde_with::DurationMilliSeconds<u64, serde_with::formats:
Ok(generate_number_schema::<u64>())
}
}

/// Schema support for an optional, strictly-formatted millisecond duration
/// (`Option<DurationMilliSeconds<u64, Strict>>`).
impl Configurable for Option<serde_with::DurationMilliSeconds<u64, serde_with::formats::Strict>> {
    /// Builds the schema by handing the `u64` configurable reference to
    /// `generate_optional_schema`, mirroring the non-optional impl, which
    /// describes the value with a `u64` number schema.
    ///
    /// NOTE(review): presumably the resulting schema accepts either an
    /// unsigned integer or an absent/null value; confirm against
    /// `generate_optional_schema`.
    fn generate_schema(gen: &RefCell<SchemaGenerator>) -> Result<SchemaObject, GenerateError>
    where
        Self: Sized,
    {
        // The duration is described by its underlying millisecond count.
        let millis_ref = u64::as_configurable_ref();
        generate_optional_schema(&millis_ref, gen)
    }
}
7 changes: 7 additions & 0 deletions src/transforms/reduce/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ pub struct ReduceConfig {
#[configurable(metadata(docs::human_name = "Expire After"))]
pub expire_after_ms: Duration,

/// If supplied, every time this interval elapses for a given grouping, the reduced value
/// for that grouping is flushed. Checked every flush_period_ms.
#[serde_as(as = "Option<serde_with::DurationMilliSeconds<u64>>")]
#[derivative(Default(value = "Option::None"))]
#[configurable(metadata(docs::human_name = "End-Every Period"))]
pub end_every_period_ms: Option<Duration>,

/// The interval to check for and flush any expired events, in milliseconds.
#[serde(default = "default_flush_period_ms")]
#[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
Expand Down
10 changes: 10 additions & 0 deletions src/transforms/reduce/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ struct ReduceState {
events: usize,
fields: HashMap<KeyString, Box<dyn ReduceValueMerger>>,
stale_since: Instant,
creation: Instant,
metadata: EventMetadata,
}

Expand All @@ -35,6 +36,7 @@ impl ReduceState {
Self {
events: 0,
stale_since: Instant::now(),
creation: Instant::now(),
fields,
metadata,
}
Expand Down Expand Up @@ -93,6 +95,7 @@ impl ReduceState {
pub struct Reduce {
expire_after: Duration,
flush_period: Duration,
end_every_period: Option<Duration>,
group_by: Vec<String>,
merge_strategies: IndexMap<KeyString, MergeStrategy>,
reduce_merge_states: HashMap<Discriminant, ReduceState>,
Expand Down Expand Up @@ -126,6 +129,7 @@ impl Reduce {
Ok(Reduce {
expire_after: config.expire_after_ms,
flush_period: config.flush_period_ms,
end_every_period: config.end_every_period_ms,
group_by,
merge_strategies: config.merge_strategies.clone(),
reduce_merge_states: HashMap::new(),
Expand All @@ -139,6 +143,12 @@ impl Reduce {
let mut flush_discriminants = Vec::new();
let now = Instant::now();
for (k, t) in &self.reduce_merge_states {
if let Some(period) = self.end_every_period {
if (now - t.creation) >= period {
flush_discriminants.push(k.clone());
}
}

if (now - t.stale_since) >= self.expire_after {
flush_discriminants.push(k.clone());
}
Expand Down
10 changes: 10 additions & 0 deletions website/cue/reference/components/transforms/base/reduce.cue
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ base: components: transforms: reduce: configuration: {
required: false
type: condition: {}
}
end_every_period_ms: {
description: """
If supplied, every time this interval elapses for a given grouping, the reduced value
for that grouping is flushed. Checked every flush_period_ms.
"""
required: false
type: uint: {
unit: "milliseconds"
}
}
expire_after_ms: {
description: """
The maximum period of time to wait after the last event is received, in milliseconds, before
Expand Down