Skip to content

Commit

Permalink
Emit a counter for when aws s3 and azure blob storage requests are made
Browse files Browse the repository at this point in the history
  • Loading branch information
flavioc committed Apr 30, 2024
1 parent e4e0ea5 commit b6c40da
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 8 deletions.
24 changes: 24 additions & 0 deletions src/internal_events/aws_s3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use metrics::counter;
use vector_lib::internal_event::InternalEvent;

#[derive(Debug)]
pub struct AwsS3RequestEvent<'a> {
pub bytes_size: usize,
pub events: usize,
pub bucket: &'a str,
pub s3_key: &'a str,
}

impl InternalEvent for AwsS3RequestEvent<'_> {
fn emit(self) {
debug!(
message = "Sending events.",
bytes = self.bytes_size,
events_len = self.events,
s3_key = self.s3_key,
bucket = self.bucket,
);
counter!("aws_s3_requests_sent_total", 1,
"bucket" => self.bucket.to_string());
}
}
24 changes: 24 additions & 0 deletions src/internal_events/azure_blob.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use metrics::counter;
use vector_lib::internal_event::InternalEvent;

#[derive(Debug)]
pub struct AzureBlobRequestEvent<'a> {
pub bytes_size: usize,
pub events: usize,
pub container_name: &'a str,
pub partition_key: &'a str,
}

impl InternalEvent for AzureBlobRequestEvent<'_> {
fn emit(self) {
debug!(
message = "Sending events.",
bytes = self.bytes_size,
events_len = self.events,
blob = self.partition_key,
container = self.container_name,
);
counter!("azure_blob_requests_sent_total", 1,
"container_name" => self.container_name.to_string());
}
}
8 changes: 8 additions & 0 deletions src/internal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ mod aws_ecs_metrics;
mod aws_kinesis;
#[cfg(feature = "sources-aws_kinesis_firehose")]
mod aws_kinesis_firehose;
#[cfg(feature = "sources-aws_s3")]
mod aws_s3;
#[cfg(any(feature = "sources-aws_s3", feature = "sources-aws_sqs",))]
mod aws_sqs;
#[cfg(feature = "sinks-azure_blob")]
mod azure_blob;
mod batch;
mod codecs;
mod common;
Expand Down Expand Up @@ -164,8 +168,12 @@ pub(crate) use self::aws_ecs_metrics::*;
pub(crate) use self::aws_kinesis::*;
#[cfg(feature = "sources-aws_kinesis_firehose")]
pub(crate) use self::aws_kinesis_firehose::*;
#[cfg(feature = "sources-aws_s3")]
pub(crate) use self::aws_s3::*;
#[cfg(any(feature = "sources-aws_s3", feature = "sources-aws_sqs",))]
pub(crate) use self::aws_sqs::*;
#[cfg(feature = "sinks-azure_blob")]
pub(crate) use self::azure_blob::*;
pub(crate) use self::codecs::*;
#[cfg(feature = "sources-datadog_agent")]
pub(crate) use self::datadog_agent::*;
Expand Down
12 changes: 11 additions & 1 deletion src/sinks/aws_s3/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::io;

use bytes::Bytes;
use chrono::{FixedOffset, Utc};
use crate::internal_events::AwsS3RequestEvent;
use uuid::Uuid;
use vector_lib::codecs::encoding::Framer;
use vector_lib::event::Finalizable;
Expand Down Expand Up @@ -103,8 +104,17 @@ impl RequestBuilder<(S3PartitionKey, Vec<Event>)> for S3RequestOptions {

s3metadata.s3_key = format_s3_key(&s3metadata.s3_key, &filename, &extension);

let blob_data = payload.into_payload();

emit!(AwsS3RequestEvent {
bytes_size: blob_data.len(),
events: request_metadata.event_count(),
bucket: &self.bucket,
s3_key: &s3metadata.s3_key,
});

S3Request {
body: payload.into_payload(),
body: blob_data,
bucket: self.bucket.clone(),
metadata: s3metadata,
request_metadata,
Expand Down
14 changes: 7 additions & 7 deletions src/sinks/azure_blob/request_builder.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::internal_events::AzureBlobRequestEvent;
use bytes::Bytes;
use chrono::Utc;
use uuid::Uuid;
Expand Down Expand Up @@ -82,13 +83,12 @@ impl RequestBuilder<(String, Vec<Event>)> for AzureBlobRequestOptions {

let blob_data = payload.into_payload();

debug!(
message = "Sending events.",
bytes = ?blob_data.len(),
events_len = ?azure_metadata.count,
blob = ?azure_metadata.partition_key,
container = ?self.container_name,
);
emit!(AzureBlobRequestEvent {
bytes_size: blob_data.len(),
events: azure_metadata.count,
container_name: &self.container_name,
partition_key: &azure_metadata.partition_key
});

AzureBlobRequest {
blob_data,
Expand Down

0 comments on commit b6c40da

Please sign in to comment.