Skip to content

Commit

Permalink
Respect file extension setting for S3 destinations in Firehose provider
Browse files Browse the repository at this point in the history
The Firehose delivery stream implementation did not take the configured file
extension setting into account when the destination was an S3 bucket. This fix
ensures that the configured file extension is appended to the S3 object key
when data is written. The `_get_s3_object_path` method now accepts a
`file_extension` parameter to support this behavior.
  • Loading branch information
ozgenbaris1 committed Apr 12, 2024
1 parent a3b9ade commit 0162d87
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions localstack/services/firehose/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@ def _put_records_to_s3_bucket(
):
bucket = s3_bucket_name(s3_destination_description["BucketARN"])
prefix = s3_destination_description.get("Prefix", "")
file_extension = s3_destination_description.get("FileExtension", "")

if role_arn := s3_destination_description.get("RoleARN"):
factory = connect_to.with_assumed_role(
Expand All @@ -777,15 +778,15 @@ def _put_records_to_s3_bucket(
)
batched_data = b"".join([base64.b64decode(r.get("Data") or r.get("data")) for r in records])

obj_path = self._get_s3_object_path(stream_name, prefix)
obj_path = self._get_s3_object_path(stream_name, prefix, file_extension)
try:
LOG.debug("Publishing to S3 destination: %s. Data: %s", bucket, batched_data)
s3.put_object(Bucket=bucket, Key=obj_path, Body=batched_data)
except Exception as e:
LOG.exception(f"Unable to put records {records} to s3 bucket.")
raise e

def _get_s3_object_path(self, stream_name, prefix):
def _get_s3_object_path(self, stream_name, prefix, file_extension):
# See https://aws.amazon.com/kinesis/data-firehose/faqs/#Data_delivery
# Path prefix pattern: myApp/YYYY/MM/DD/HH/
# Object name pattern: DeliveryStreamName-DeliveryStreamVersion-YYYY-MM-DD-HH-MM-SS-RandomString
Expand All @@ -794,6 +795,10 @@ def _get_s3_object_path(self, stream_name, prefix):
pattern = "{pre}%Y/%m/%d/%H/{name}-%Y-%m-%d-%H-%M-%S-{rand}"
path = pattern.format(pre=prefix, name=stream_name, rand=str(uuid.uuid4()))
path = timestamp(format=path)

if file_extension:
path += file_extension

return path

def _put_to_redshift(
Expand Down

0 comments on commit 0162d87

Please sign in to comment.