-
Notifications
You must be signed in to change notification settings - Fork 793
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix for multipart integ test failure #5176
Conversation
- Use content-length for range-get in TransferProgressUpdater, when multipart is enabled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we kick off an integ test on the PR as well?
.../sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java
Outdated
Show resolved
Hide resolved
.../sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java
Show resolved
Hide resolved
@@ -196,7 +208,7 @@ private boolean doEmit() { | |||
} | |||
|
|||
private void handleCancelState() { | |||
synchronized (this) { | |||
synchronized (cancelLock) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if !onStreanCalled().get()
is no longer true after we make the check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
onStreanCalled
will only go from false -> true
once and never back to false.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant more that just because onStreamCalled.get() == false
when we evaluate the condition, it doesn't mean it's still false
when we execute the body of the if
. Is that going to be an issue?
if (!onStreamCalled.get()) { // value is false
// value could have changed to true after we made the check
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aaah I see what you mean. Looking at it, yep, it might cause a NPE at line 293. Adding synchronization around the if (onStreamCalled.compareAndSet(false, true))
on cancelLock
there at line 293 should prevent it.
publisherToUpstream.send(byteBuffer).whenComplete((r, t) -> { | ||
if (t != null) { | ||
handleError(t); | ||
return; | ||
} | ||
subscription.request(1); | ||
if (!isCancelled.get()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar question as above, is it possible this is no longer true after we check it? If so, is it an issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above, onStreanCalled
will only go from false -> true
once and never back to false.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need the check here assuming subscription is cancelled properly or will be cancelled at this point?
https://github.com/reactive-streams/reactive-streams-jvm
If a Subscription is cancelled its Subscriber MUST eventually stop being signaled.
Note "eventually" not "right away"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we do need to stop right away in the case of the return future being cancelled, for example when pausing with transfer-manager
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would happen if we don't check it here and continue to send request demand? It should be no-op right?
Yep, already did |
.../main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java
Show resolved
Hide resolved
@@ -118,6 +121,15 @@ private SplittingTransformer(AsyncResponseTransformer<ResponseT, ResultT> upstre | |||
Validate.notNull(maximumBufferSizeInBytes, "maximumBufferSizeInBytes"); | |||
this.maximumBufferInBytes = Validate.isPositive( | |||
maximumBufferSizeInBytes, "maximumBufferSizeInBytes"); | |||
|
|||
this.resultFuture.whenComplete((r, e) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add a test case for this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding a unit test for this. Can we add a end-to-end functional test (wiremock test) to verify the cancellation behavior?
.../sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java
Show resolved
Hide resolved
.../sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java
Show resolved
Hide resolved
.../sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java
Outdated
Show resolved
Hide resolved
.../sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java
Outdated
Show resolved
Hide resolved
publisherToUpstream.send(byteBuffer).whenComplete((r, t) -> { | ||
if (t != null) { | ||
handleError(t); | ||
return; | ||
} | ||
subscription.request(1); | ||
if (!isCancelled.get()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need the check here assuming subscription is cancelled properly or will be cancelled at this point?
https://github.com/reactive-streams/reactive-streams-jvm
If a Subscription is cancelled its Subscriber MUST eventually stop being signaled.
Note "eventually" not "right away"
…zed tests that take tm as argument
Rerunning integ tests after fixing parameterized tests. They are passing on my machine now at least. 🤞🏼 |
Quality Gate failedFailed conditions See analysis details on SonarCloud Catch issues before they fail your Quality Gate with our IDE extension SonarLint |
.../sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/SplittingTransformer.java
Show resolved
Hide resolved
publisherToUpstream.send(byteBuffer).whenComplete((r, t) -> { | ||
if (t != null) { | ||
handleError(t); | ||
return; | ||
} | ||
subscription.request(1); | ||
if (!isCancelled.get()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would happen if we don't check it here and continue to send request demand? It should be no-op right?
@@ -232,14 +267,23 @@ private class IndividualTransformer implements AsyncResponseTransformer<Response | |||
public CompletableFuture<ResponseT> prepare() { | |||
this.individualFuture = new CompletableFuture<>(); | |||
if (preparedCalled.compareAndSet(false, true)) { | |||
if (isCancelled.get()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this logic here?
individualFuture.whenComplete((r, e) -> { | ||
if (isCancelled.get()) { | ||
handleCancelState(); | ||
handleSubscriptionCancel(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you remind me why we need to check isCancelled here for every individual future?
The
S3TransferManagerDownloadPauseResumeIntegrationTest
was failing with S3 Multipart client for 2 reasons..pause()
This PR is to fix both of those issues.
Motivation and Context
Errors were raised during integration test execution.
Modifications
Fix for the errors consist in
Testing
Failing integration test run locally.
Added unit test.