I'm currently configuring Vector sinks to send data to an AWS Kinesis Stream. My input data consists of JSON objects with nested fields, and I need to use one of these nested fields (specifically _session_id) as the partition key for the Kinesis Stream. Here is a sample of the data:
Previously, I had partition_key_field set to "timestamp", and the configuration worked fine: my data was delivered to Kinesis correctly. However, I now need to use a nested field, specifically "data[0].attributes._session_id", as the partition key.
In my scenario, the data field arrives as a list, but it only ever contains a single item, which is why I can safely use data[0] to reference that record. When I made this change, the sink failed with an error indicating that the partition key does not exist:
2024-04-12T15:32:21.306393Z ERROR sink{component_kind="sink" component_id=kinesis_sink component_type=aws_kinesis_streams}: vector::internal_events::aws_kinesis: Partition key does not exist. partition_key_field=data[0].attributes._session_id error_type="parser_failed" stage="processing" internal_log_rate_limit=true
Is there a way to configure Vector sinks to properly parse nested fields for the Kinesis Stream partition key? Alternatively, are there any adjustments needed in my Vector configuration or transformation logic to ensure the correct parsing of nested fields?
Or, if the Kinesis sink config is not capable of resolving _session_id from a nested path in partition_key_field, is it possible to change the json_parser transform to extract _session_id from .message and add it to the top level alongside ingest_time and server_ingest_time?
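To make that second option concrete, here is a minimal, untested sketch of what it might look like. It assumes json_parser is a remap transform and that .message holds the raw JSON string. The input name my_source, the stream name, the region, and the top-level field name session_id are all hypothetical placeholders, not values from the real configuration.

```toml
# Sketch only: assumes json_parser is a `remap` transform.
[transforms.json_parser]
type = "remap"
inputs = ["my_source"]            # hypothetical upstream component
source = '''
# Assumes .message contains the raw JSON document as a string.
parsed = parse_json!(.message)

# Copy the nested value to a top-level field (name is a placeholder).
# The quoted segment avoids any ambiguity around the leading underscore.
.session_id = parsed.data[0].attributes."_session_id"
'''

[sinks.kinesis_sink]
type = "aws_kinesis_streams"
inputs = ["json_parser"]
stream_name = "my-stream"         # hypothetical
region = "us-east-1"              # hypothetical
encoding.codec = "json"
# Point the sink at the flat, top-level field instead of the nested path.
partition_key_field = "session_id"
```

The idea is that the sink would then only need to look up a flat field, the same way it did with "timestamp". Whether the sink can also resolve the nested path directly is the open part of the question.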
Any insights or suggestions would be greatly appreciated. Thank you!