[Backfill] allow externally partitioned segment uploads for upsert tables #13107

Open · wants to merge 7 commits into master from upsert_backfill

Conversation

rohityadav1993 (Contributor) commented May 8, 2024

Labels: feature, release-notes

Summary

This change will allow users to specify the partitionId for an externally generated segment as part of segment metadata. This enables support for backfilling/bootstrapping an upsert realtime table with externally generated segments:

  1. The upstream (e.g. Kafka) is partitioned with a custom algorithm that Pinot does not support via PartitionFunctionType (Modulo, Murmur, Murmur3, ByteArray, HashCode, BoundedColumnValue).
  2. The partitioning is done on a column that is not indexed in the table, e.g. the upstream defines a compound PK, partitions on the concatenated PK value, and writes to the stream without propagating the concatenated PK, so no single column can be defined in the table config for partitioning.

Background

Why should the data be partitioned
An upsert table requires the upstream data to be partitioned so that all records for a primary key land in a single partition. This ensures the host responsible for the PK --> recordLocation mapping sees every row for its PKs, guaranteeing a unique row per PK.

How does realtime ingestion handle partitions
The stream ingested into Pinot is expected to be partitioned externally. Pinot generates a partition-to-instance mapping and persists it in ZK. Each server instance responsible for a partition consumes from that stream partition and generates realtime segments named using the convention <tableName>__<partitionId>__<seqId>__<createTS>. This naming convention makes it possible to identify a segment's partition when needed for various operations, e.g. recomputing segment assignment during rebalance. Pinot is essentially agnostic of how the partitioning is done upstream (ref: design doc).
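
For illustration, here is a minimal sketch (not Pinot's actual implementation; LLCSegmentName does the real parsing) of how a partition id can be read back from that naming convention:

// Illustrative only: recover the partition id from an LLC-style name
// <tableName>__<partitionId>__<seqId>__<createTS>. Pinot's LLCSegmentName class is the
// real parser; this sketch assumes the table name contains no double underscores.
static int partitionIdFromLlcStyleName(String segmentName) {
  String[] parts = segmentName.split("__");
  if (parts.length != 4) {
    throw new IllegalArgumentException("Not an LLC-style segment name: " + segmentName);
  }
  return Integer.parseInt(parts[1]);
}

// e.g. partitionIdFromLlcStyleName("testtable__0__0__19700101T0000Z") returns 0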

Upsert table backfill support
A previous implementation enhanced the architecture to allow uploading batch-generated segments to upsert tables (ref: design doc). It supports generating a segment from offline data and identifies such segments with Segment.Status = UPLOADED.
Partition information in an uploaded segment is captured using the column partitioning config, which is primarily used for query routing optimisation (pinot docs).

Proposal

Just as streams provide the partitionId during realtime ingestion, uploaded segments should declare the partition they belong to. Changes in this PR:

  1. New segment naming convention for segments being uploaded to upsert table with partition id and sequence id encoded in name UploadedRealtimeSegmentName. Similar to LLCSegmentName.
  2. Use partitionId in the UploadedRealtimeSegmentName to assign segment to instance
  3. Resolve comparison-value ties during addOrReplaceSegment() based on the logic below (see the sketch after this list). Currently this is missing, and replacement is inconsistent if an upsert table has both LLC segments and batch-generated segments.
    a. When both the replacing segment and the current segment are {@link LLCSegmentName}, the PK should resolve to the row in the segment with the higher sequence id.
    b. When both the replacing segment and the current segment are {@link UploadedRealtimeSegmentName}, the PK should resolve to the row in the segment with the higher sequence id, then creation time.
    c. When only one is of type {@link UploadedRealtimeSegmentName}, resolve on creation time; if the creation times are the same (a rare scenario), give preference to the uploaded segment.
  4. Augment SegmentUtils.getRealtimeSegmentPartitionId() to assign an uploaded realtime segment correctly.
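
A rough sketch of the tie-break rules in point 3 (illustrative only; the method and parameter names here are hypothetical, and the real logic lives in ConcurrentMapPartitionUpsertMetadataManager):

// Hypothetical sketch of tie-break rules 3a-3c above; inputs are assumed to be
// pre-parsed from the segment names (LLCSegmentName / UploadedRealtimeSegmentName).
static boolean shouldReplaceOnTie(boolean newIsLlc, boolean curIsLlc, boolean newIsUploaded,
    boolean curIsUploaded, int newSeqId, int curSeqId, long newCreationMs, long curCreationMs) {
  if (newIsLlc && curIsLlc) {
    // 3a: both LLC segments -> higher sequence id wins
    return newSeqId > curSeqId;
  }
  if (newIsUploaded && curIsUploaded) {
    // 3b: both uploaded segments -> higher sequence id, then creation time
    return newSeqId != curSeqId ? newSeqId > curSeqId : newCreationMs > curCreationMs;
  }
  // 3c: mixed -> later creation time wins; on a tie, prefer the uploaded segment
  return newCreationMs != curCreationMs ? newCreationMs > curCreationMs : newIsUploaded;
}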

Changes

  1. Add a field _uploadedSegmentPartitionId to the SegmentGeneratorConfig class to provide an external partition id for segment generation. It defaults to -1 when not set and is used to infer the segment name generator when one is not explicitly provided.
  2. New segment name generator UploadedRealtimeSegmentNameGenerator
  3. Update getRealtimeSegmentPartitionId() to parse uploaded segment partitionId from segment name.
  4. SegmentGeneratorConfig uses _uploadedSegmentPartitionId to infer the UploadedRealtimeSegmentName generator and to generate segments with the provided partitionId.

Note: The uploaded segment naming convention is uploaded__{tableName}__{partitionId}__{sequenceId}__{creationTime}__{optionalSuffix}. This keeps the naming similar to LLCSegmentName, removes the need for ZK lookups in various critical paths, lets the uploaded prefix identify the segment type, and allows an optional suffix to encode additional info, e.g. the source of the segment (spark, flink, minion, etc.).
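
For illustration, a minimal sketch of composing a name under this convention (simplified; the actual generation is done by UploadedRealtimeSegmentNameGenerator, and table names containing double underscores are not considered here):

// Illustrative only: compose an uploaded-segment name following the convention above.
// suffix may be null, in which case it is omitted.
static String composeUploadedSegmentName(String tableName, int partitionId, int sequenceId,
    long creationTimeMs, String suffix) {
  String base = com.google.common.base.Joiner.on("__")
      .join("uploaded", tableName, partitionId, sequenceId, creationTimeMs);
  return suffix == null ? base : base + "__" + suffix;
}

// composeUploadedSegmentName("upsertMeetupRsvp", 1, 3, 1717357812344L, "spark")
//   -> uploaded__upsertMeetupRsvp__1__3__1717357812344__spark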

Alternatives

  1. Use LLCSegmentName for offline generated segments to upload:
    There are several assumptions tied to LLCSegmentName (not exhaustively checked), but a basic one shows up when we try to load a segment named as an LLCSegmentName:
    Got invalid segment metadata when adding segment: testtable_REALTIME__0__0__19700101T0000Z for table: testtable_REALTIME, reason: New uploaded LLC segment must have start/end offset in the segment metadata
    
    PinotLLCRealtimeSegmentManager#ensureAllPartitionsConsuming() also assumes all LLCSegmentNames are realtime-generated segments and tries to create a consuming segment.
    Modifying these assumptions around LLCSegmentName would make managing an uploaded segment considerably more complex.
  2. Extend segment metadata to capture uploaded segments partitionId, persist in ZK.
    a. Increases calls to ZK for getting partitionId.
  3. [Implemented] A naming scheme for uploaded segments, e.g. table_name_{starttime}{endtime}{partitionId}_{seqId}.
    This has a similar implementation effort but introduces implicit logic for handling uploaded segments. Currently, offline-generated segments are not restricted to any naming convention, and users can resolve segment name clashes, if any, themselves.

Testing plan

  1. Unit tests
  2. Functional tests:
    Plan:
    a. Use upsertMeetupRsvp example table to backfill
    b. Generate a segment with CreateSegmentCommand using the config below (a fuller generation sketch follows the observations), segmentName = uploaded__upsertMeetupRsvp__1__3__1717357812344
    ....
    segmentGeneratorConfig.setUploadedSegmentPartitionId(1);
    
    c. Upload segment using: localhost:9001/segments?tableName=upsertMeetupRsvp&tableType=REALTIME&enableParallelPushProtection=false&allowRefresh=false
    Observations
    a. Segment uploaded in Healthy state:
    b. ZkMetadata:
    {
      "id": "uploaded__upsertMeetupRsvp__1__3__1717357812344",
      "simpleFields": {
        "segment.crc": "1901096753",
        "segment.creation.time": "1717357812344",
        "segment.download.url": "http://127.0.0.1:9001/segments/upsertMeetupRsvp/uploaded__upsertMeetupRsvp__1__3__1717357812344",
        "segment.end.time": "1712859397739",
        "segment.end.time.raw": "1712859397739",
        "segment.index.version": "v3",
        "segment.partition.metadata": "{\"columnPartitionMap\":{\"event_id\":{\"partitions\":[0],\"functionName\":\"HashCode\",\"functionConfig\":null,\"numPartitions\":2}}}",
        "segment.push.time": "1717358306969",
        "segment.realtime.status": "UPLOADED",
        "segment.size.in.bytes": "2151",
        "segment.start.time": "1712859397739",
        "segment.start.time.raw": "1712859397739",
        "segment.time.unit": "MILLISECONDS",
        "segment.total.docs": "1"
      },
      "mapFields": {},
      "listFields": {}
    }
    
    c. Uploaded segment in query response:

(screenshot: query response including results from the uploaded segment)
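
For reference, a hedged sketch of how step 2.b could be wired up in code. The setUploadedSegmentPartitionId() setter is the one added by this PR, and the UploadedRealtimeSegmentNameGenerator constructor follows the (tableName, partitionId, creationTimeMillis, suffix) signature discussed in the review below; treat the exact wiring as an assumption, not the definitive procedure:

// Hedged sketch: generate an uploaded realtime segment for partition 1.
// Assumes the new setUploadedSegmentPartitionId() and UploadedRealtimeSegmentNameGenerator
// from this PR; tableConfig, schema, inputFilePath and outputDir are placeholders.
SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
segmentGeneratorConfig.setInputFilePath(inputFilePath);
segmentGeneratorConfig.setOutDir(outputDir);
segmentGeneratorConfig.setUploadedSegmentPartitionId(1);
segmentGeneratorConfig.setSegmentNameGenerator(
    new UploadedRealtimeSegmentNameGenerator("upsertMeetupRsvp", 1, 1717357812344L, null));

SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
driver.init(segmentGeneratorConfig);
driver.build();
// expected name: uploaded__upsertMeetupRsvp__1__<sequenceId>__1717357812344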

Relates to: #12987, #10896

codecov-commenter commented May 9, 2024

Codecov Report

Attention: Patch coverage is 68.75000% with 35 lines in your changes missing coverage. Please review.

Project coverage is 62.03%. Comparing base (59551e4) to head (b7621b1).
Report is 556 commits behind head on master.

Files Patch % Lines
...inot/common/utils/UploadedRealtimeSegmentName.java 73.01% 16 Missing and 1 partial ⚠️
...tion/batch/common/SegmentGenerationTaskRunner.java 9.09% 10 Missing ⚠️
...t/ConcurrentMapPartitionUpsertMetadataManager.java 64.70% 0 Missing and 6 partials ⚠️
...ot/segment/spi/creator/SegmentGeneratorConfig.java 87.50% 1 Missing ⚠️
...tor/name/UploadedRealtimeSegmentNameGenerator.java 90.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #13107      +/-   ##
============================================
+ Coverage     61.75%   62.03%   +0.28%     
+ Complexity      207      198       -9     
============================================
  Files          2436     2542     +106     
  Lines        133233   139698    +6465     
  Branches      20636    21616     +980     
============================================
+ Hits          82274    86658    +4384     
- Misses        44911    46543    +1632     
- Partials       6048     6497     +449     
Flag Coverage Δ
custom-integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration <0.01% <0.00%> (-0.01%) ⬇️
integration1 <0.01% <0.00%> (-0.01%) ⬇️
integration2 0.00% <0.00%> (ø)
java-11 61.99% <68.75%> (+0.28%) ⬆️
java-21 61.91% <68.75%> (+0.28%) ⬆️
skip-bytebuffers-false 62.02% <68.75%> (+0.27%) ⬆️
skip-bytebuffers-true 61.89% <68.75%> (+34.16%) ⬆️
temurin 62.03% <68.75%> (+0.28%) ⬆️
unittests 62.02% <68.75%> (+0.28%) ⬆️
unittests1 46.56% <64.35%> (-0.33%) ⬇️
unittests2 27.74% <10.71%> (+0.01%) ⬆️


@Jackie-Jiang added the feature and release-notes labels on May 11, 2024
@Jackie-Jiang Jackie-Jiang requested a review from klsince May 11, 2024 06:17
@rohityadav1993 force-pushed the upsert_backfill branch 2 times, most recently from c767baf to 83fc22e on May 12, 2024
@rohityadav1993 rohityadav1993 marked this pull request as ready for review May 13, 2024 05:44
satishd (Member) left a comment:

Thanks @rohityadav1993 for the PR, left a couple of minor comments.

@@ -48,6 +53,21 @@ public SegmentPartitionMetadata(
@Nonnull @JsonProperty("columnPartitionMap") Map<String, ColumnPartitionMetadata> columnPartitionMap) {
Preconditions.checkNotNull(columnPartitionMap);
_columnPartitionMap = columnPartitionMap;
_uploadedSegmentPartitionId = -1;
Member:

Good to define a constant that can be used at other places directly checking for -1, may be with a better name.

public static final int NON_EXTERNAL_PARTITION_ID = -1;

@@ -35,4 +47,32 @@ public void testGetSegmentCreationTimeMs() {
segmentZKMetadata.setPushTime(2000L);
assertEquals(SegmentUtils.getSegmentCreationTimeMs(segmentZKMetadata), 2000L);
}

@Test
public void testGetUploadedRealtimeSegmentPartitionId() {
Member:

Good to add a UT that checks for SegmentPartitionMetadata.getUploadedSegmentPartitionId -1 when it is not externally partitioned.

Jackie-Jiang (Contributor):

cc @snleee @swaminathanmanish to take a look

@JsonCreator
public SegmentPartitionMetadata(
@Nullable @JsonProperty("columnPartitionMap") Map<String, ColumnPartitionMetadata> columnPartitionMap,
@Nullable @JsonProperty(value = "uploadedSegmentPartitionId", defaultValue = "-1")
Contributor:

@rohityadav1993 : we could just upload segments with the appropriate name format right? Can you add some reasoning in the PR Description about why that doesn't work?

Contributor Author:

Updated the description with an Alternatives section. There are assumptions in multiple places in the Pinot codebase around LLCSegmentName, which would require much more complex handling to distinguish uploaded segments from realtime segments.

Contributor:

there are a few places that directly get columnPartitionMap and iterate it to get the partitionId (you can find those places by searching usages of getColumnPartitionMap() in this class), and this new field might break those places.

Contributor Author:

@klsince, I validated that getColumnPartitionMap() is being called from broker code, which is needed for query routing. This should still hold good if partition column is configured for the table.

The additional metadata field for partitionId is to make sure the uploaded segments are placed together with the stream partitions assigned to hosts by the controller.

Contributor Author:

I see one usage in SegmentAssignmentUtils.getPartitionId() used by ReplicaGroupSegmentAssignmentStrategy for assigning offline segments.

I think we can fail fast here if both the column partition config and uploadedSegmentPartitionId are provided: uploadedSegmentPartitionId is meant to keep the assignment aligned with the stream partitions for upserts, and it does not make sense for an offline table.

Open to better name suggestion for uploadedSegmentPartitionId in case it sounds misleading.

Contributor:

make sense. thanks for the explanation.

Another concern is that the tie-breaking comparison logic (as in ConcurrentMapPartitionUpsertMetadataManager) is not deterministic for segments that don't follow the LLC naming pattern. While processing such segments, the ones added first are favored, but we cannot control which segments get processed first the next time the server restarts.

Contributor:

I see you have proposed in description that we can use a new naming pattern for uploaded segment. Similarly, I'd propose something a bit simpler: uploaded_{tableName}__{partitionId}__{creationTime}, with 3 fields as separated by __ (double underscore), in order to provide {partitionId} info for segment assignment and help break tie with {creationTime}.

The uploaded_ prefix can be anything or none, but better to have it to lookup uploaded segments easily, e.g. MergeRollup task names the segments it generates with a prefix merged_ when uploading them to the table.

Contributor Author:

uploaded_{tableName}__{partitionId}__{creationTime}

We will have to put a seqId in case multiple segments are to be uploaded. Something like uploaded_{tableName}__{partitionId}__{sequenceId}__{creationTime}

Do we enforce naming conventions? I only see LLCSegmentName. I can add one for uploaded realtime segments but wasn't preferring this approach if this is not something strictly enforced.

Additionally, we will have to add a SegmentNameGenerator implementation so that the user workload generating and uploading segments can create these segments.

Contributor:

I actually dropped {sequenceId} intentionally, otherwise, the segment name becomes LLC format, and would cause the complexities you mentioned in another alternative proposal, like the checks on start/end offsets. In fact, the endOffset of latest LLC segment (the one with max seqId) is used to resume stream consumption, and we probably don't want to affect that logic with uploaded segments.

Do we enforce naming conventions? ... ... add a SegmentNameGenerator implementation ...

I don't think so. We may need one just for uploading segments to upsert tables, due to its special requirements on partition id and to break comparison ties. And good point for the need of a new name generator.

btw, from what I learnt while reviewing this PR, there seems to be a design choice for RT segments that the {partitionId} encoded in the segment name is the source of truth, rather than the one kept in ZK metadata, as there might be no partition info at all in ZK metadata. So following that design principle, I'd prefer to keep encoding the {partitionId} in the segment name for uploaded segments as well. In this way, we can 1) avoid the cost of reading ZK metadata every time we need the partitionId for uploaded segments; 2) avoid changes to persistent segment metadata in ZK or on disk, which might make things a bit easier when considering upgrade/downgrade.

Contributor Author:

Summarising the chat discussion with @klsince here:
Plan to use naming convention: uploaded_{tableName}_{partitionId}_{sequenceId}_{creationTime}

  1. The sequenceId is needed for scenarios where bulk data is loaded per day and the creation time is constant, e.g. backfilling data per day (creation time set to the last second of the day), where each partition has multiple segments differentiated by sequenceId.
  2. The _ delimiter is used to avoid LLCSegmentName-like treatment of uploaded segments.

Updating the PR accordingly.

Comment on lines 68 to 70
_segmentName =
UPLOADED_PREFIX + SEPARATOR + tableName + SEPARATOR + partitionId + SEPARATOR + sequenceId + SEPARATOR
+ creationTime;
Contributor:

nit: we can use Joiner for better readability.

Comment on lines +172 to +173
Preconditions.checkState(segmentGeneratorConfig.getUploadedSegmentPartitionId() != -1,
"Valid partition id must be set for uploaded realtime segment name generator");
Contributor:

can we also add sequence_id here?

Contributor Author:

Do you mean for validation?

The sequenceId is sent as part of SegmentNameGenerator method interface

String generateSegmentName(int sequenceId, @Nullable Object minTimeValue, @Nullable Object maxTimeValue);

@@ -44,6 +44,16 @@ public static Integer getRealtimeSegmentPartitionId(String segmentName, String r
if (llcSegmentName != null) {
return llcSegmentName.getPartitionGroupId();
}

try {
UploadedRealtimeSegmentName uploadedRealtimeSegmentName = new UploadedRealtimeSegmentName(segmentName);
Contributor:

we may add a util method UploadedRealtimeSegmentName.of(segmentName) like LLCSegmentName.of()
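
For context, the suggested factory might look roughly like this (hypothetical body; LLCSegmentName.of() follows the same return-null-on-invalid pattern):

// Hypothetical sketch of the suggested of() factory: return null instead of throwing
// when the name does not follow the uploaded-segment convention.
public static UploadedRealtimeSegmentName of(String segmentName) {
  try {
    return new UploadedRealtimeSegmentName(segmentName);
  } catch (Exception e) {
    return null;
  }
}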

*/
public class UploadedRealtimeSegmentName implements Comparable<UploadedRealtimeSegmentName> {

public static final String UPLOADED_REALTIME_SEGMENT_NAME_REGEX = "^uploaded_(.+)_(\\d+)_(\\d+)_(\\d+)$";
Contributor:

hmm. why not just split the fields out from the segment name via StringUtils.split()? In case the table name might include _, we can get the required fields as counted from the end of name. And for simplicity, we can enforce to have seqId field (which seems optional in this regex pattern).

Contributor Author:

The regex helped avoid parsing numbers during validation. A separator-based approach becomes more complex once number parsing has to be handled.

Regex uploaded_(.+)_(\\d+)_(\\d+)_(\\d+)$ is handling sequenceId as well.

Contributor Author:

@klsince, I see LLC segment name does not validate the integer and long fields encoded in name:

public static boolean isLLCSegment(String segmentName) {
    int numSeparators = 0;
    int index = 0;
    while ((index = segmentName.indexOf(SEPARATOR, index)) != -1) {
      numSeparators++;
      index += 2; // SEPARATOR.length()
    }
    return numSeparators == 3;
  }

We can have similar string-split-based checks and parsing for uniformity, but I think it would be good to validate the int and long fields as well since they get used later in the code. The changes would look like this:

public UploadedRealtimeSegmentName(String segmentName) {

    // split the segment name by the separator and get creation time, sequence id, partition id and table name from
    // the end and validate segment name starts with prefix uploaded_
    try {
      String[] parts = segmentName.split(SEPARATOR);
      Preconditions.checkState(parts.length >= 5 && parts[0].equals(UPLOADED_PREFIX),
          "Uploaded segment name must be of the format uploaded_{tableName}_{partitionId}_{sequenceId}_{creationTime}");
      _creationTime = Long.parseLong(parts[parts.length - 1]);
      _sequenceId = Integer.parseInt(parts[parts.length - 2]);
      _partitionId = Integer.parseInt(parts[parts.length - 3]);
      _tableName = Joiner.on(SEPARATOR).join(Arrays.copyOfRange(parts, 1, parts.length - 3));
      _segmentName = segmentName;
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Invalid segment name: " + segmentName, e);
    }
  }
public static boolean isUploadedRealtimeSegmentName(String segmentName) {
    String[] parts = segmentName.split(SEPARATOR);
    if(parts.length < 5) {
      return false;
    }
    if (!parts[0].equals(UPLOADED_PREFIX)) {
      return false;
    }
    // return false if parts[-1] is not an integer
    try {
      Long.parseLong(parts[parts.length - 1]);
      Integer.parseInt(parts[parts.length - 2]);
      Integer.parseInt(parts[parts.length - 3]);
    } catch (NumberFormatException e) {
      return false;
    }
    return true;
  }

Contributor:

I'd be a bit concerned on the cost of evaluating regex, as the method is used on the upsert comparison logic path as well. In worst case, the regex may be evaluated for every doc ingested (as the upload segment might be the winner all the time). We may do a micro benchmark on the two ways: regex vs. split (the code changes you proposed above, but better use StringUtils.split() as String.split() does regex internally as well...)

Contributor:

As to the naming pattern, I wonder if it may be more flexible to make uploaded an optional suffix and not enforce the suffix value.

e.g. {tableName}_{partId}_{seqId}_{ctime}_{optional suffix}. e.g. I may want some minion tasks to upload segments but to differentiate from those uploaded manually, I may use the task name as the suffix. wdyt?

Contributor Author:

I did a quick benchmark with 10 million strings; the String.split() method takes roughly 0.57x the time of the regex-based approach. Sounds good to go with the String.split() based approach. (A rough sketch of the timing loop follows the results below.)

Test 1

Method 1 (String.split()) took: 2253611542 ns
Method 2 (regex match) took: 3891154500 ns
Method 2 is 0.5791626988853822 times faster than Method 1

Test 2

Method 1 (String.split()) took: 2156181167 ns
Method 2 (regex match) took: 3929024375 ns
Method 2 is 0.5487828430690253 times faster than Method 1

Test 3

Method 1 (String.split()) took: 2167354959 ns
Method 2 (regex match) took: 3646892125 ns
Method 2 is 0.5943019109730316 times faster than Method 1
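
For reference, a simplistic timing loop of the kind used above might look like this (illustrative only, not JMH; the sample name and iteration count are assumptions, and it requires java.util.regex.Pattern):

// Illustrative timing loop comparing String.split() vs. regex matching on a segment name.
Pattern pattern = Pattern.compile("^uploaded_(.+)_(\\d+)_(\\d+)_(\\d+)$");
String name = "uploaded_upsertMeetupRsvp_1_3_1717357812344";
int iterations = 10_000_000;

long start = System.nanoTime();
for (int i = 0; i < iterations; i++) {
  String[] parts = name.split("_");                    // Method 1: String.split()
}
long splitNs = System.nanoTime() - start;

start = System.nanoTime();
for (int i = 0; i < iterations; i++) {
  boolean matches = pattern.matcher(name).matches();   // Method 2: regex match
}
long regexNs = System.nanoTime() - start;

System.out.println("split: " + splitNs + " ns, regex: " + regexNs + " ns");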


private boolean shouldReplaceOnComparisonTie(String segmentName, String currentSegmentName,
long segmentCreationTimeMs, long currentSegmentCreationTimeMs) {

if (LLCSegmentName.isLLCSegment(segmentName) && LLCSegmentName.isLLCSegment(currentSegmentName)
Contributor:

I think the comparison of seqIds shouldn't be part of the if-check, as that'd miss a case when both are LLC but seqIds are not a > b.

if (LLC && LLC) {
   return seqId_a > seqId_b;
}

And we can do LLC.of() and check nulls to save some parsing cost, as both isLLCSegment() and getSequenceNumber() do parsing inside. Same for the checks on uploaded names below: we can do Uploaded.of() and check nulls to save some cost of regex evaluation.

Contributor Author:

Thanks, the correction and optimization sounds good. Will update.

@rohityadav1993 force-pushed the upsert_backfill branch 2 times, most recently from 61ba36e to 05c076b on May 30, 2024
}

public static boolean isUploadedRealtimeSegmentNameMethod1(String segmentName) {
String[] parts = segmentName.split(SEPARATOR);
Contributor:

what about StringUtils.split()? because String.split() does some regex matches internally too.

Contributor Author:

Got you, I wasn't aware of the subtlety. This file was also not supposed to be checked in; removing.
Did some benchmark with 3 methods:

Method 1 (String.split()) took: 2266343500 ns
Method 2 (regex match) took: 3760987625 ns
Method 3 (StringUtils.split()) took: 2412549250 ns

Method 3 is 0.939397817474607 times faster than Method 1
Method 3 is 1.5589267763134784 times faster than Method 2
Method 1 (String.split()) took: 2462188708 ns
Method 2 (regex match) took: 4130721000 ns
Method 3 (StringUtils.split()) took: 2082269750 ns

Method 3 is 1.1824542463818628 times faster than Method 1
Method 3 is 1.983758828557155 times faster than Method 2
Method 1 (String.split()) took: 2275362125 ns
Method 2 (regex match) took: 4017714167 ns
Method 3 (StringUtils.split()) took: 2003299125 ns

Method 3 is 1.1358074770785915 times faster than Method 1
Method 3 is 2.0055488053986945 times faster than Method 2



/**
* Class to represent segment names like: uploaded_{tableName}_{partitionId}_{sequenceId}_{creationTime}
Contributor:

this comment might be missed: #13107 (comment)

basically, I think it can be more flexible to use an optional suffix and not enforce the value of the suffix, e.g. {tableName}_{partitionId}_{sequenceId}_{creationTime}_{optional suffix} as users can use suffix value to differentiate segments uploaded in different ways (e.g. via minion tasks)

Contributor Author:

The reason to enforce the prefix was to quickly identify an uploaded segment. With an optional suffix we would have to fall back to the segment status in ZK metadata to know whether it is an uploaded segment (do suggest if there is a better way).

It may even be complex to parse the segment name to figure out partitionId and segmentId.

Contributor Author:

Offline discussion: the final segment name convention has been decided as uploaded__{tableName}__{partitionId}__{sequenceId}__{creationTime}__{optionalSuffix}.
This keeps the naming similar to LLCSegmentName, removes the need for ZK lookups in various critical paths, the uploaded prefix identifies the segment type, and an optional suffix can be used to encode additional info, e.g. the source of the segment (spark, flink, minion, etc.).

@rohityadav1993 force-pushed the upsert_backfill branch 3 times, most recently from e858faa to 1e3e2e0 on June 3, 2024
if (parts.length == 6) {
_suffix = parts[idx--];
}
_creationTime = parts[idx--];
Contributor:

It seems more intuitive to use index number directly, assuming no double underscore in table name (as done in LLCSegmentName.java)

_tableName = parts[1];
_partitionGroupId = Integer.parseInt(parts[2]);
...

If assuming table name may contain double underscores, the checkState() on 5 or 6 fields would already fail.

* @param suffix
*/
public UploadedRealtimeSegmentName(String tableName, int partitionId, int sequenceId, long msSinceEpoch,
String suffix) {
Contributor:

@nullable on suffix


@Override
public int compareTo(UploadedRealtimeSegmentName other) {
Preconditions.checkState(_tableName.equals(other._tableName));
Contributor:

Preconditions.checkArgument(_tableName.equals(other._tableName),
        "Cannot compare segment names from different table: %s, %s", _segmentName, other.getSegmentName()); 

(borrowed from LLCSegmentName class)

}

// create a enum for source: externalUplaod, minion
public enum Source {
Contributor:

no need for this? as this would limit the potential suffix user wants to set

Contributor Author:

yes, no need, slipped from testing.

UploadedRealtimeSegmentName currentUploadedSegmentName = UploadedRealtimeSegmentName.of(currentSegmentName);

if (uploadedSegmentName != null && currentUploadedSegmentName != null) {
int comparisonResult = uploadedSegmentName.compareTo(currentUploadedSegmentName);
Contributor:

we may skip compareTo(), since we didn't do this for LLCSegmentName either. The compareTo() checks table name, partition id which are checked while the segment is uploaded already. I think we can just compare seqId and ctime here.

* should resolve to row in segment with higher sequence id, creation time.
* <li> When either is of type {@link UploadedRealtimeSegmentName} then resolve on creation time, if same(rare
* scenario) then give preference to uploaded time
*
Contributor:

I'd propose, for all possible comparison between LLC, Uploaded, Unknown:

  1. if both LLC, compare seqId
  2. if both Uploaded, compare seqid+ctime
  3. if one segment is Uploaded and one is LLC, compare ctime but upon tie, favor Uploaded
  4. if there is a segment with Unknown format, return false, i.e. favor the one already set in metadata as of today's comparison logic. But we may favor the segment with known format, unless both segments are of Unknown format.

thoughts?

Contributor Author:

afaik, we can create segments using SegmentIndexCreationDriverImpl and creationTime is always populated. The segment name format may be unknown but the segment is still uploaded and can be for say, data correction/addition.

So we can rely on creation time as first comparison criterion followed by below criteria:
LLC, Unknown: creation time check or prefer unknown as this is an uploaded segment too (deviates from current behaviour)
Uploaded, Unknown: creation time check or prefer known name format
Unknown, Unknown: creation time check or false

Contributor:

sgtm, basically:

  1. if both LLC, compare seqId
  2. if both Uploaded, compare ctime, then seqId (btw, I refined this point based on one unit test in this PR. I commented on that unit test as well.)
  3. otherwise, simply check ctimes. If ctimes are same, favor uploaded segment

Contributor Author:

if both Uploaded, compare ctime, then seqId (btw, I refined this point based on one unit test in this PR. I commented on that unit test as well.)

This makes sense, thinking from the backfill perspective(corrective/bootstrap backfill). Batch jobs run at a time T and the segments for that time T will be generated sequentially.

The naming convention java docs can call this out. Let me update the UTs accordingly.

* convention has been kept similar to {@LLCSegmentName} to but differentiates between stream generated LLCSegments
* based on the prefix "uploaded" and an optional suffix.
*/
public class UploadedRealtimeSegmentName implements Comparable<UploadedRealtimeSegmentName> {
Contributor:

It might be helpful to note that "Unlike the seqId in LLCSegmentName, the seqId here is not guaranteed to monotonically increase; it is more for avoiding name conflicts when uploading segments with the same ctime. As to why one would use the same ctime: it would allow overwriting previous segments more easily."

Contributor Author:

Agreed, the sequence numbers are externally generated and may not have same meaning. Updating.

Contributor:

typo: s/uplaoded/uploaded

int comparisonResult =
Integer.compare(uploadedSegmentName.getSequenceId(), currentUploadedSegmentName.getSequenceId());
if (comparisonResult == 0) {
Long.compare(segmentCreationTimeMs, currentSegmentCreationTimeMs);
Contributor:

missing a return?

btw, can inline into return res == 0? Long.compare(segmentCreationTimeMs, currentSegmentCreationTimeMs) : res > 0


public UploadedRealtimeSegmentNameGenerator(String tableName, int partitionId, long creationTimeMillis,
String suffix) {
Preconditions.checkState(creationTimeMillis > 0, "Creation time must be positive");
Contributor:

nit: use checkArgument() as those are params passed in

and also check on the partitionid value to be >=0?

Contributor:

bump ^ as this might be missed.

trackedSegments.add(uploadedSegment3);

// newUploadedSegment2: 2 -> {2, 120}, 3 -> {3, 80}
// segment3: 0 -> {0, 100}, 1 -> {1, 120}
Contributor:

For 0 -> {0, 100} segment3 won over newUploadedSegment2? But segment3 had smaller ctime (1000)

So I wonder if we want to compare ctime then seqId for uploaded realtime segments, as seqId is used to break ties (or avoid name conflicts) among uploaded realtime segments with the same creation time, and it's not really comparable across uploaded segments with different ctimes.

Contributor Author:

This makes sense, responded here: #13107 (comment)

klsince (Contributor) left a comment:

LGTM


* <li> When the replacing segment and current segment are of {@link UploadedRealtimeSegmentName} then the PK
* should resolve to row in segment with higher creation time followed by sequence id.
* <li> For other cases resolve based on creation time of segment. In case the creation time is same, give
* preference to an uplaoded segment. A segment which is not LLCSegment can be assumed to be uploaded segment and
Contributor:

typo: uplaoded

}

if (creationTimeComparisonRes == 0) {
return llcSegmentName == null || uploadedSegmentName != null;
Contributor:

nit: comment that uploadedSegmentName != null is to favor the segment with formatted name

Labels: feature, ingestion, release-notes, upsert

7 participants