-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
+str Add flatmapConcat with parallelism. #32024
base: main
Are you sure you want to change the base?
Conversation
if (isAvailable(out) && inflightSources.isEmpty) { | ||
push(out, single.elem) | ||
} else { | ||
inflightSources.enqueue(new InflightSingleSource(single.elem)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are some allocation here
before:
after:
|
ae99e81
to
0be8cdd
Compare
I wonder if it is possible to write a Buffer.peek(n: Int) which peek from current to n, and by that, is it easy to impl Merge via this, just change |
@GreyPlane How about comment in line, thanks |
0be8cdd
to
6f2c507
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the implementation.
This will allow us to have more than 1 source pulled at a time keeping elements ordered from downstream perspective.
akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
Outdated
Show resolved
Hide resolved
akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
Outdated
Show resolved
Hide resolved
6f2c507
to
135ee1f
Compare
Seems should add another parameter |
Not obvious to me. So I can't see a scenario where you would like to continue after one of the upstream sources cancelled when using this operator. That being said, if you feel that could be useful, let's go 😉 |
@gael-ft that's true. but I saw your PR and you can return And for most case, I think we should make the I was wantted to optimize for |
@He-Pin I might be a bit too much use case centric (i.e. S3 object upload where we don't want any missing part).
So this question can probably be a valid question in some cases. But regarding the default value of In my understanding of Once again, just my opinion, but as long as this is parameter, users can use it as they want. |
@gael-ft Yes, let's just keep it simple for now. And I saw the |
push(out, sinkIn.grab()) | ||
} | ||
} | ||
override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable) removeSource(self) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need take care for the upstream where fails immeditly
akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
Outdated
Show resolved
Hide resolved
akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
Outdated
Show resolved
Hide resolved
akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
Show resolved
Hide resolved
7d4965b
to
9813a33
Compare
Source(List(2, 3, 4)), | ||
Source.future(Future.successful(5)), | ||
Source.lazyFuture(() => Future.successful(6)))) | ||
.flatMapConcat(ThreadLocalRandom.current().nextInt(1, 129), identity) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Random is good for trying it out while developing but means that we'd have no idea what value it fails for if it fails. Parameterize over a few values or use a single random chosen and logged on test class instantiation instead, so failures can be repeated.
akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
Show resolved
Hide resolved
akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
Outdated
Show resolved
Hide resolved
akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
Show resolved
Hide resolved
akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
Outdated
Show resolved
Hide resolved
akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
Outdated
Show resolved
Hide resolved
bd40615
to
92172b4
Compare
a7bf870
to
ca4a14b
Compare
ca4a14b
to
8bd6dc1
Compare
@johanandren How about I spliting the optimization to a dedicated PR from this one, I think this PR is too big |
Yes, I think that was an initial feedback from me, although I also said it might be fine. |
References #31958
@gael-ft Would you like to take a review, thanks?