Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add intermediate bookmarking implementation #40

Open
wants to merge 12 commits into
base: master
Choose a base branch
from

Conversation

RushiT0122
Copy link

@RushiT0122 RushiT0122 commented May 15, 2024

Description of change

Support ticket TDL-25734:
Tap was writing stream bookmark after complete extraction of all stream records. Because of this issue, in some cases we observed that long running historical syncs, after interruption were restarting from the start data and re-extracting duplicate records.

To fix this issue,

  • For incremental steams, we will order the response and bookmark the max replication value after every offset_bookmark_limit=1000 records.
  • For full table streams, we will save the offset after every offset_bookmark_limit=1000 records

This will allow interrupted extractions resume from the last bookmark value and reduce duplicate record extraction.

Manual QA steps

  • Verified the fix with locally for incremental and full table syncs
  • Verified fix with different offset_bookmark_limit values

Risks

Rollback steps

  • revert this branch

@shantanu73
Copy link
Contributor

Please add associated JIRA ticket with this PR


if next(iter(self.replication_keys or []), None):
return state.get('bookmarks', {}).get(stream, default), 0
else:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

won't need this else statement

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

self.stream_name, bookmark_field, last_datetime))
now_datetime = utils.now()
last_dttm = strptime_to_utc(last_datetime)
LOGGER.info('stream: {}, bookmark_field: {}, last_datetime: {}, offset: {}'.format(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use f-strings instead of format.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

@RushiT0122
Copy link
Author

Addressed the review comments.

@shantanu73
Copy link
Contributor

Verified interrupted sync and intermediate bookmarking at my end. Apart from my previous comments everything looks fine.
Approving this.
Please merge after the CCi passes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants