Concurrent table scans #373

Draft · sdd wants to merge 4 commits into main from scan-concurrent
Conversation

@sdd (Contributor) commented May 13, 2024

This is a bit of an experiment to see how things could look if we tried to:

  • process Manifest Lists and Manifest Files concurrently rather than sequentially
  • process the stream of file scan tasks concurrently, streaming record batches from multiple files at the same time.

I'd like to add some unit tests to confirm that this behaves as expected, beyond the existing tests that we have for TableScan, and to add an integration / performance test that can quantify any performance improvements (or regressions 😅) that we get from these changes.

Let me know what you all think.

@marvinlanhenke (Contributor) left a comment

Thanks @sdd for doing this draft.

Just in case you missed it, we had a similar discussion in #124 about a possible approach.
Left some comments (mostly questions).

In general, I think we should wait for the runtime and the Evaluators (Manifest, Expression, etc.) to land, add more tests in scan.rs, and then refactor into the async/multi-threaded version. Curious for more comments on this, though.

```rust
partition_spec_id,
&context,
)?;
spawn(async move {
```
@marvinlanhenke (Contributor):

Do we need to spawn here, or is the try_for_each_concurrent in run(...) already enough?

@sdd (Contributor, Author):

I went with using an mpsc channel to avoid having two nested try_for_each_concurrent calls. On reflection, the inner try_for_each_concurrent is probably unnecessary, since there's only one async operation needed for each manifest file, despite that manifest file producing n DataFileTasks. So I can probably ditch the channel too. The channel in reader.rs is also overkill, as I've only got a single try_for_each_concurrent there.

I just wanted to get the ball rolling on this while we're waiting for the filtering code to get signed off; it's fun writing this kind of stuff 😁
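For readers following along, here is a minimal sketch of the channel-plus-spawned-task pattern under discussion, assuming the tokio, futures, and tokio-stream crates. The names (file_scan_task_stream, the FileScanTask stand-in, the channel capacity of 32) are illustrative, not the actual iceberg-rust API:

```rust
// Hypothetical sketch: a spawned task walks the manifest files with bounded
// concurrency and sends each resulting scan task into an mpsc channel; the
// receiver end backs the stream handed to the caller.
use futures::stream::{self, TryStreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

const CONCURRENCY_LIMIT_MANIFEST_FILES: usize = 4;

#[derive(Debug)]
struct FileScanTask(String); // stand-in for the real FileScanTask

fn file_scan_task_stream(
    manifest_files: Vec<String>, // stand-in for entries from the ManifestList
) -> impl futures::Stream<Item = FileScanTask> + Unpin {
    let (tx, rx) = mpsc::channel(32);

    tokio::spawn(async move {
        // A single try_for_each_concurrent bounds how many manifest files
        // are processed at once; entries within each file stay sequential.
        let _ = stream::iter(manifest_files.into_iter().map(Ok::<_, ()>))
            .try_for_each_concurrent(CONCURRENCY_LIMIT_MANIFEST_FILES, |mf| {
                let tx = tx.clone();
                async move {
                    // One async send per produced task; a send error just
                    // means the receiver was dropped, so we stop early.
                    tx.send(FileScanTask(mf)).await.map_err(|_| ())
                }
            })
            .await;
    });

    ReceiverStream::new(rx)
}
```

A side benefit of the bounded channel in this sketch: it provides natural backpressure, and dropping the receiver makes subsequent sends fail, which winds down the producer task cleanly.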

@sdd (Contributor, Author):

I reverted reader.rs to be essentially the same as before, without any concurrent processing of batches from within the same file. I removed the nested try_for_each_concurrent from scan.rs but kept the mpsc channel.

```rust
}

#[derive(Debug)]
struct ConcurrentFileScanStreamContext {
```
@marvinlanhenke (Contributor):

I think once we have a runtime and the async approach is approved, we can get rid of the FileScanStreamContext and merge the struct into ConcurrentFileScanStreamContext.

@sdd (Contributor, Author):

Yeah, completely agree. Kept them separate for now as it made it easier to have the old non-concurrent version in the code base at the same time.

```rust
CONCURRENCY_LIMIT_MANIFEST_FILES,
Self::process_manifest_file,
)
.await
```
@marvinlanhenke (Contributor):

Will this yield the first FileScanTask when it's available, or do we have to iterate over the complete stream before we can return any result?

@sdd (Contributor, Author):

Yes, everything proceeds fully concurrently: whichever manifest is ready first will start yielding tasks to the stream first.
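To make "yields as available" concrete, here is a hypothetical consumer of the file_scan_task_stream helper sketched earlier (illustrative only, not the PR's actual API):

```rust
use futures::StreamExt;

#[tokio::main]
async fn main() {
    // Stand-ins for entries read from the snapshot's ManifestList.
    let manifests: Vec<String> = vec!["m1.avro".into(), "m2.avro".into()];
    let mut tasks = file_scan_task_stream(manifests);

    // Each next().await resolves as soon as any in-flight manifest file has
    // produced a task; we never wait for the whole manifest list to finish.
    while let Some(task) = tasks.next().await {
        println!("{task:?}");
    }
}
```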

@marvinlanhenke (Contributor):

Thanks for explaining.

```rust
CONCURRENCY_LIMIT_MANIFEST_ENTRIES,
Self::process_manifest_entry,
)
.await
```
@marvinlanhenke (Contributor):

We had a discussion on this in #124 (in case you missed it) about not going overboard with the task spawning.

@sdd (Contributor, Author) commented May 14, 2024

I've updated this to ditch the concurrency when processing ManifestEntry items within a single Manifest, producing them asynchronously but sequentially instead. I've kept the limited concurrency when processing ManifestFiles within the scan's snapshot's ManifestList.

I've kept the approach of using an mpsc channel with a spawned task, with that task using try_for_each_concurrent to achieve the concurrency. This is because without the channel and spawned task, we'd need to use an async closure, which is unstable Rust. With the spawned task we only need an async block, which is stable Rust.
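A minimal sketch of that stable-Rust constraint, assuming tokio and futures; this is a simplified illustration, not the PR's code. try_for_each_concurrent takes a closure returning a future: a plain closure whose body is an async move block compiles on stable, while an `async |n| { ... }` closure does not:

```rust
use futures::stream::{self, TryStreamExt};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(8);

    let producer = tokio::spawn(async move {
        stream::iter((0..10u32).map(Ok::<_, ()>))
            // NOT `async |n| { ... }` (unstable); instead a normal closure
            // that returns an async block, which works on stable Rust.
            .try_for_each_concurrent(4, |n| {
                let tx = tx.clone();
                async move { tx.send(n * n).await.map_err(|_| ()) }
            })
            .await
    });

    // Results arrive on the channel as each item completes.
    while let Some(v) = rx.recv().await {
        println!("got {v}");
    }
    let _ = producer.await;
}
```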

@sdd force-pushed the scan-concurrent branch 3 times, most recently from 6cb340c to 3947796 on May 22, 2024.