Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accept an interface instead of []byte in producer's Put method #11

Open
owais opened this issue Nov 19, 2018 · 4 comments
Open

Accept an interface instead of []byte in producer's Put method #11

owais opened this issue Nov 19, 2018 · 4 comments

Comments

@owais
Copy link
Contributor

owais commented Nov 19, 2018

Put expects records to be written to Kinesis as []byte. This works fine for simple cases but often a more complex layer wraps kinesis-producer and needs more control/flexibility over how different cases (such as failures) are handled. For example, a program might want to handle failures in a special way but right now the FailureResult struct only contains the partition key and the raw data as bytes. There is no other identifying information that might help identify the records.

Accepting an interface instead of raw bytes would make the library a lot more flexible. For example,

type Record struct {
      id string
      data []byte
      failure_count int
}

func (r Record) Bytes() []byte {
     return r.data
}

func process(r Record) {
      producer.Put(r, "partition-key")
}

func processErrors() {
   for f := producer.NotifyFailures() {
      r := f.Item.(Record)
      if (r.failure_count > 3) {
           // log failure 
           return
      }
      r.failure_count++
      process(r)
   }
}

Or a record could marshal itself as bytes that kinesis-producer could use internally by calling the Read() method and then wouldn't have to unmarshal the bytes on every failure in order to be able to inspect the record (for specialized logging, retries, failure handling etc).

kinesis-producer API would add an interface definition

type Record interface {
     Read() []byte
}

producer.Put(r Record, k string)

type FailureRecord struct {
     Record Record
     ParitionKey string
}

Or partition key could be rolled into the record as well like:

type Record interface {
      Read() []byte
      PartitionKey() string
}

producer.Put(r Record)

In this case, failure channel would just return failed Records and client side implementation could look like:

type Record struct {
    ID string
    UserID string
    Timestamp time.Time
    // any other number of fields
}

func (r Record) Read() []byte {
    // marshal r and return []bytes
}

func (r Record) PartitionKey() string {
     return r.ID
}

The library could also ship with a simple implementation to cover simple cases. Usage would look like:

pr.Put(producer.Record{myData, myPartitionKey})
@a8m
Copy link
Owner

a8m commented Nov 19, 2018

I think the general idea is good and useful. I’ll give it more attention tomorrow morning.

@guyest
Copy link

guyest commented Aug 20, 2019

Has there been any progress on this front? It would be a great feature addition.

@a8m
Copy link
Owner

a8m commented Aug 25, 2019

I didn't have any progress on this, but I'm in favor of adding it, since it's actually needed for other uses cases as well, like #17.

@tiny-dancer
Copy link

tiny-dancer commented Dec 3, 2019

this would be quite useful, will look at submitting a PR.

quick question: do we think this should be added using the existing Put or as a new function, e.g. PutRecord?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants