
pubsub: identified memory leak in receiver #10094

Closed
itizir opened this issue May 3, 2024 · 8 comments · May be fixed by #10173
Labels: api: pubsub, priority: p2, type: bug

Comments

itizir (Contributor) commented May 3, 2024

Client

PubSub

Environment

Any

Go Environment

go version 1.21
Likely irrelevant

Expected behavior

A long-running process making repeated calls to Subscription.Receive() does not leak resources.

Actual behavior

gRPC StreamingPull streams remain open in the background and accumulate over time.

Root cause

pullStream.cancel() never gets called, and CloseSend() is not enough to actually terminate the underlying stream.

Code

A minimal fix would be e.g.

--- a/pubsub/iterator.go
+++ b/pubsub/iterator.go
@@ -157,6 +157,9 @@ func (it *messageIterator) stop() {
        it.checkDrained()
        it.mu.Unlock()
        it.wg.Wait()
+       if it.ps != nil {
+               it.ps.cancel()
+       }
 }
 
 // checkDrained closes the drained channel if the iterator has been stopped and all
@@ -243,6 +246,9 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
        }
        // Any error here is fatal.
        if err != nil {
+               if status.Code(err) == codes.Canceled {
+                       err = io.EOF
+               }
                return nil, it.fail(err)
        }
        recordStat(it.ctx, PullCount, int64(len(rmsgs)))

Alternatively, this could be done further up or down the call chain. For instance, Subscription.Receive could cancel the stream after everything else is done, so that the cancellation error never surfaces earlier (though that might introduce an unnecessary wait?).

Additional context

This could well be the same issue as #5888 and #5265; however, those were closed, so I thought it would make more sense to open a new one with the full diagnostics.

itizir added the triage me label on May 3, 2024
product-auto-label bot added the api: pubsub label on May 3, 2024
hongalex added the status: investigating label and removed the triage me label on May 6, 2024
hongalex (Member) commented May 6, 2024

Can you clarify if this is an issue you encountered in production, or how you came across this issue?

I haven't fully verified the reason why pullStream.cancel() is not called, but comparing memory utilization using go tool pprof with an application that constantly opens and closes streams does not show a memory leak based on my initial testing.

itizir (Contributor, Author) commented May 7, 2024

Hi! Yup, we noticed because it started showing up in production. The memory leak there is only really measurable over a scale of hours or days; however, if I run something pathological like this, it's very apparent:

	//
	// set up client and subscription
	//

	for i := 0; ; i++ {
		ctx, cancel := context.WithCancel(context.Background())
		cancel()
		err = subscription.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
			m.Ack()
		})
		if err != nil {
			log.Fatal(err)
		}
		if i%1000 == 0 {
			log.Println("num goroutines:", runtime.NumGoroutine())
		}
	}

But even with something more reasonable (i.e. not cancelling the context straight away, but only after receiving some messages and/or after some sensible timeout), it's very clear that the program accumulates gRPC stream goroutines.

Can you reproduce the above? What tests were you running?

itizir (Contributor, Author) commented May 7, 2024

Oh! And btw, this is what the pprof heap diff looks like (when we noticed the issue in our prod):

[pprof heap diff screenshot attached in the original issue]

hongalex (Member) commented May 7, 2024

I was doing something like this:

for {
	ctx, cancel := context.WithCancel(ctx)
	go func() {
		time.Sleep(15 * time.Second)
		cancel()
	}()
	err = sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
		m.Ack()
	})
	fmt.Printf("got err from receive: %v\n", err)
	time.Sleep(10 * time.Second)
}

I was not actively receiving messages on the stream when I cancelled it, though. I've been running this for about 24 hours but have not noticed any memory leak in pprof (total memory is about 2 MB).

I'll try with your code to see if I can reproduce this issue.
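
For reference, one way to watch heap and goroutine growth during a soak test like this is the standard net/http/pprof endpoint. A minimal sketch, not part of the original test; the port is arbitrary and select{} merely stands in for the Receive loop:

package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof/* handlers on the default mux
)

func main() {
	// Serve the profiling endpoints in the background.
	go func() {
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()

	// Snapshots can then be inspected with, e.g.:
	//   go tool pprof http://localhost:6060/debug/pprof/heap
	//   go tool pprof http://localhost:6060/debug/pprof/goroutine
	select {} // placeholder standing in for the long-running Receive loop
}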

itizir (Contributor, Author) commented May 7, 2024

Hmm, I tried your test and dug into it a bit more: it's indeed somewhat subtle. The reason you don't see the leak is that there's a race when stopping Receive: Recv on the underlying stream (via iter.receive(maxToPull)) may or may not get called one last time after shutdown is initiated. If it is, the stream gets the EOF and actually shuts down; otherwise the code returns earlier, e.g. in the case <-ctx.Done() just above or below.

I haven't fully figured out the ins and outs, but in your example the 15s receive time seems to make Recv always 'win' the race, so the stream shuts down. In other cases, e.g. a shorter receive window, or cancelling the context from within the message handler, it's the other way around: bailing out before Recv is called enough may consistently 'win', leaving the gRPC streams hanging around...
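
For illustration, a minimal runnable sketch of the "cancel from within the message handler" case just described, assuming a client and subscription set up as in the earlier snippet (the project and subscription IDs are placeholders, not from the original thread):

package main

import (
	"context"
	"log"
	"runtime"

	"cloud.google.com/go/pubsub"
)

func main() {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, "my-project") // placeholder project ID
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	sub := client.Subscription("my-subscription") // placeholder subscription ID

	for i := 0; ; i++ {
		rctx, cancel := context.WithCancel(ctx)
		// Cancel from inside the handler after the first message; this
		// assumes messages are flowing on the subscription.
		err := sub.Receive(rctx, func(_ context.Context, m *pubsub.Message) {
			m.Ack()
			cancel()
		})
		cancel()
		if err != nil {
			log.Fatal(err)
		}
		if i%100 == 0 {
			// With the leak present, this count keeps growing across iterations.
			log.Println("num goroutines:", runtime.NumGoroutine())
		}
	}
}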

Just recalling the relevant docs for reference:

To ensure resources are not leaked due to the stream returned, one of the following
actions must be performed:

 1. Call Close on the ClientConn.
 2. Cancel the context provided.
 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
    client-streaming RPC, for instance, might use the helper function
    CloseAndRecv (note that CloseSend does not Recv, therefore is not
    guaranteed to release all resources).
 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.

If none of the above happen, a goroutine and a context will be leaked, and grpc
will not call the optionally-configured stats handler with a stats.End message.

(So basically, in the current code, option 3 may happen and clear the resources, but it's not guaranteed, and in some cases it seems guaranteed not to happen. My suggested 'fix' above is simply to ensure option 2 happens...)
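
To make option 2 concrete, here is a self-contained sketch; pullStreamClient and openStream are hypothetical stand-ins for a gRPC-generated streaming client and the call that opens it, not the actual pubsub internals:

package main

import (
	"context"
	"errors"
	"io"
	"log"
)

// pullStreamClient is a hypothetical stand-in for a gRPC-generated
// bidirectional streaming client.
type pullStreamClient interface {
	Recv() (interface{}, error)
	CloseSend() error
}

// openStream is a placeholder for the generated RPC call that opens the stream.
func openStream(ctx context.Context) (pullStreamClient, error) {
	return nil, errors.New("placeholder: no real stream in this sketch")
}

// consume ties the stream to a cancellable context and always cancels it on
// return (option 2 above), so the stream's goroutine and context are released
// even if Recv is never driven to a non-nil error (option 3) and CloseSend
// alone is not enough.
func consume(parent context.Context) error {
	ctx, cancel := context.WithCancel(parent)
	defer cancel() // always release the stream's resources

	stream, err := openStream(ctx)
	if err != nil {
		return err
	}
	defer stream.CloseSend() // closes only the send direction, not the stream

	for {
		if _, err := stream.Recv(); err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}
	}
}

func main() {
	if err := consume(context.Background()); err != nil {
		log.Println(err)
	}
}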

hongalex (Member) commented:

Yeah, so after using your change, I was able to see a slow growth of memory usage from 3 MB to 22 MB over the course of 2 hours.

Yeah given that the change is fairly small and shouldn't have any adverse effects, I would be happy to merge in a PR for this if you want to create one. Otherwise, I can create a PR for this next week and credit you.

itizir (Contributor, Author) commented May 10, 2024

Ah, sure, can do! I'll push that change up as it is, then, but feel free to solve the issue some other way if you prefer.

N.B. If you cancel the context before the call to sub.Receive() and don't sleep between iterations, the increase isn't so slow... ;)

hongalex added the type: bug and priority: p2 labels and removed the status: investigating label on May 13, 2024
hongalex (Member) commented:

Side note, the docs you linked are very relevant and probably what contributed to this issue in the first place. I think one issue is that CloseSend was deferred in the sender goroutine, but that doesn't actually close the stream (which contradicts the docs of the underlying method CloseSend):

// CloseSend closes the send direction of the stream. It closes the stream
// when non-nil error is met. It is also not safe to call CloseSend
// concurrently with SendMsg.

I confirmed that calling it.ps.cancel there also works to fix this issue, but I think stop is a better place for it anyway, so your PR looks good.
