
Does Kinesis connector support reading compressed payloads on stream? #105

Open
sudeshnasengupta opened this issue Nov 10, 2021 · 1 comment

@sudeshnasengupta

Hi,
We currently use the Qubole open-source Kinesis connector in a Spark Structured Streaming application on AWS to read payloads from a Kinesis stream and generate Parquet files. Is it possible to use the Qubole Kinesis connector to read compressed payloads (say, in GZip format) that were posted to the stream, with the reading side (i.e. the Kinesis Qubole source) recognizing the same compression format? Thanks.
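For context, a minimal sketch of the producer side this question assumes: records are GZip-compressed before being put on the stream via boto3. The stream name, region, and partition key below are placeholders, not values from this thread.

import gzip
import json
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")  # placeholder region

payload = json.dumps({"event": "example"}).encode("utf8")
kinesis.put_record(
    StreamName="my-stream",       # placeholder stream name
    Data=gzip.compress(payload),  # GZip-compressed bytes on the wire
    PartitionKey="example-key",   # placeholder partition key
)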


d4r3topk commented Jan 2, 2022

Hello! I have been using this connector and I haven't seen a configuration option for automatically decompressing record payloads. If you expect GZIP-compressed data from Kinesis, you can use the following snippet to decompress it in Python.

import gzip
from io import BytesIO
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BinaryType, StringType

def decompress(row):
    # Inflate the GZIP payload: bytes in, bytes out
    return gzip.GzipFile(mode="rb", fileobj=BytesIO(row)).read()

def decode_fn(row):
    # Decode the decompressed bytes as UTF-8 text
    return row.decode("utf8")

# decompress returns bytes, so its UDF must be declared BinaryType;
# decode_fn then produces a string, hence StringType
decompress_udf = udf(decompress, BinaryType())
decode_udf = udf(decode_fn, StringType())

kinesis_stream_name = "xxxx"
kinesis_endpoint_url = "xxxx"

data_source = (
        spark.readStream.format("kinesis")
        .option("streamName", kinesis_stream_name)
        .option("endpointUrl", kinesis_endpoint_url)
        .option("startingPosition", "TRIM_HORIZON")
        .load()
    )

# The stream's metadata columns, e.g. ApproximateArrivalTimestamp, are not
# compressed and remain available here via col('ApproximateArrivalTimestamp')
data_source = data_source.select(
        decode_udf(decompress_udf(col("data"))).alias("data")
    )

# Your data is now uncompressed, parallelized according to the number of partitions
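Since the question mentions generating Parquet files, the decompressed frame can then be written out with Spark's standard Structured Streaming Parquet sink. A minimal sketch; the output and checkpoint paths are placeholders, not values from this thread:

# Write the decompressed records out as Parquet files
query = (
    data_source.writeStream
    .format("parquet")
    .option("path", "s3://my-bucket/output/")              # placeholder path
    .option("checkpointLocation", "s3://my-bucket/ckpt/")  # placeholder path
    .start()
)
query.awaitTermination()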
