Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to track if offset needs to honoured or not in Kinesis #13112

Open
wants to merge 8 commits into
base: master
Choose a base branch
from

Conversation

KKcorps
Copy link
Contributor

@KKcorps KKcorps commented May 8, 2024

This will allow us to resume from LATEST or EARLIEST instead of always starting from a sequence number

@KKcorps
Copy link
Contributor Author

KKcorps commented May 8, 2024

cc: @Jackie-Jiang

@KKcorps KKcorps requested a review from Jackie-Jiang May 8, 2024 19:17
@codecov-commenter
Copy link

codecov-commenter commented May 9, 2024

Codecov Report

Attention: Patch coverage is 33.33333% with 12 lines in your changes are missing coverage. Please review.

Project coverage is 46.61%. Comparing base (59551e4) to head (3198ee4).
Report is 488 commits behind head on master.

Files Patch % Lines
...not/common/metadata/segment/SegmentZKMetadata.java 33.33% 3 Missing and 1 partial ⚠️
...e/pinot/spi/config/table/SegmentZKPropsConfig.java 0.00% 3 Missing ⚠️
...egment/spi/index/metadata/SegmentMetadataImpl.java 33.33% 2 Missing ⚠️
...a/manager/realtime/RealtimeSegmentDataManager.java 75.00% 1 Missing ⚠️
...ment/creator/impl/SegmentColumnarIndexCreator.java 0.00% 1 Missing ⚠️
...pache/pinot/spi/stream/PartitionGroupConsumer.java 0.00% 1 Missing ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##             master   #13112       +/-   ##
=============================================
- Coverage     61.75%   46.61%   -15.15%     
- Complexity      207     1103      +896     
=============================================
  Files          2436     1928      -508     
  Lines        133233   102058    -31175     
  Branches      20636    16459     -4177     
=============================================
- Hits          82274    47571    -34703     
- Misses        44911    50975     +6064     
+ Partials       6048     3512     -2536     
Flag Coverage Δ
custom-integration1 ?
integration ?
integration1 ?
integration2 ?
java-11 46.61% <33.33%> (-15.10%) ⬇️
java-21 ?
skip-bytebuffers-false 46.61% <33.33%> (-15.14%) ⬇️
skip-bytebuffers-true ?
temurin 46.61% <33.33%> (-15.15%) ⬇️
unittests 46.61% <33.33%> (-15.14%) ⬇️
unittests1 46.61% <33.33%> (-0.28%) ⬇️
unittests2 ?

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

*/
package org.apache.pinot.plugin.stream.kinesis;

public enum OffsetStartStatus {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically we want to have a flag to mark the first segment of a partition. I think adding a flag in the ZK metadata is more general. Without the flag, we can treat the segment with sequence id 0 as the first segment. The reason why we still want to have a flag:

  • When consumption failed on segment 0, we might recreate segment 1 which is also the start of the partition
  • We can check if the flag exist to determine whether we are migrating from the inclusive offset to exclusive offset for existing tables

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really getting this point. If this is persisted in ZK, it serves the same purpose right?

@@ -40,10 +40,20 @@
public class KinesisPartitionGroupOffset implements StreamPartitionMsgOffset {
private final String _shardId;
private final String _sequenceNumber;
private final OffsetStartStatus _startStatus;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems it is not persistent to the offset string. Is this intentional?
We need to think about how to migrate the current table to this new format

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a miss. Made the change.
I have ensured backward compatibility so for old table we don't need to worry. the start status would always be RESUME for them which is the current behavior as well.

@KKcorps
Copy link
Contributor Author

KKcorps commented May 16, 2024

@Jackie-Jiang please do another review of this

@@ -323,6 +324,15 @@ public void setEndOffset(String endOffset) {
setValue(Segment.Realtime.END_OFFSET, endOffset);
}

public StreamContinuationMode getContinuationMode() {
return _znRecord.getEnumField(Segment.Realtime.CONTINUATION_MODE, StreamContinuationMode.class,
StreamContinuationMode.RESUME);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you see we might add another mode in the future? I feel a boolean field of whether it is the first segment of a streaming partition is good enough.

@@ -73,11 +74,12 @@ public KinesisConsumer(KinesisConfig config, KinesisClient kinesisClient) {
* Fetch records from the Kinesis stream between the start and end KinesisCheckpoint
*/
@Override
public KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) {
public KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we get #12806 in first if it works well?

Copy link
Contributor Author

@KKcorps KKcorps May 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I already tested it, merging it now.

requestBuilder = requestBuilder.shardIteratorType(_config.getShardIteratorType());
break;
}
default: //
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is bad practice to leave unexpected mode unhandled. Throw exception instead

@@ -41,18 +41,25 @@ public class KinesisPartitionGroupOffset implements StreamPartitionMsgOffset {
private final String _shardId;
private final String _sequenceNumber;

public static final String STATUS_SEPARATOR = "::";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert the changes in this file?

@@ -55,15 +56,15 @@ default void start(StreamPartitionMsgOffset startOffset) {
* @throws TimeoutException If the operation could not be completed within timeout
* @return A batch of messages from the stream partition group
*/
default MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, int timeoutMs)
default MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, CommonConstants.Segment.Realtime.StreamContinuationMode continuationMode, int timeoutMs)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel a boolean is easier to understand, and I don't see other possible mode

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also update the javadoc

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(MAJOR) This is backward incompatible (this is a public facing interface). We need to add default impl for the new added API and not changing existing signature

@@ -430,7 +431,8 @@ protected boolean consumeLoop()
// Update _currentOffset upon return from this method
MessageBatch messageBatch;
try {
messageBatch = _partitionGroupConsumer.fetchMessages(_currentOffset, _streamConfig.getFetchTimeoutMillis());
messageBatch = _partitionGroupConsumer.fetchMessages(_currentOffset, _segmentZKMetadata.getContinuationMode(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(MAJOR) Only the first batch should be count as ingested from new stream.
Within the fetchMessages() API, we should just a boolean inclusive to mark whether we should consume the current offset

@Jackie-Jiang
Copy link
Contributor

@swaminathanmanish

@KKcorps KKcorps force-pushed the offset_start_tracking branch 3 times, most recently from 3ef926c to ebb7eda Compare May 20, 2024 16:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants