Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use In AWS Lambda #28

Open
future-mano opened this issue May 16, 2020 · 2 comments
Open

Use In AWS Lambda #28

future-mano opened this issue May 16, 2020 · 2 comments

Comments

@future-mano
Copy link

future-mano commented May 16, 2020

I want to use this library with AWS Lambda but producer cannot be reused after stop.
Below example code occuers Unable to Put record. Producer is already stopped when runs at second times.

package main

import (
	"fmt"
	"github.com/a8m/kinesis-producer"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/kinesis"
	"golang.org/x/sync/errgroup"
	"os"
)

var pr = producer.New(&producer.Config{
		StreamName: os.Getenv("KINESIS_STREAM"),
		Client:     kinesis.New(session.Must(session.NewSession())),
	})

func handle(e events.KinesisEvent) error {
	eg := errgroup.Group{}

	pr.Start()
	eg.Go(func() error {
		for r := range pr.NotifyFailures() {
			return r
		}
		return nil
	})

	for _, r := range e.Records {
		// Any logic for each records
		if err := pr.Put(r.Kinesis.Data, r.Kinesis.PartitionKey); err != nil {
			return err
		}
	}
	pr.Stop()
	return eg.Wait()
}

func main() {
	lambda.Start(handle)
}

Of course, if I generate Producer every time, it works well but I want to reuse the producer as much as possible.

var kc = kinesis.New(session.Must(session.NewSession()))

func handle(e events.KinesisEvent) error {
	var pr = producer.New(&producer.Config{
		StreamName: os.Getenv("KINESIS_STREAM"),
		Client:     kc,
	})

	eg := errgroup.Group{}

	pr.Start()
	eg.Go(func() error {
		for r := range pr.NotifyFailures() {
			return r
		}
		return nil
	})

	for _, r := range e.Records {
		if err := pr.Put(r.Kinesis.Data, r.Kinesis.PartitionKey); err != nil {
			return err
		}
	}
	pr.Stop() 
	return eg.Wait()
}

Is it possible to make the Producer discretion by making the flush() method of the Producer public or by Stop() and then Start() again?

@timesking
Copy link

I only do pr.Flush() in handle function, which is good enough

@tiny-dancer
Copy link

FYI: #24

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants