feat: add SystemSchemaProvider to QueryExecutor (#24990)
A shell for the `system` table provider was added to the QueryExecutorImpl.
It does not do anything yet, but will enable us to tie the different system
table providers into it.

The QueryLog was elevated from the `Database`, i.e., the namespace provider,
to the QueryExecutorImpl, so that it lives across queries.
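
A minimal sketch of the resulting ownership, using stand-in types (only the `Arc`-sharing pattern mirrors the commit; `QueryLog` here is a placeholder for the real iox query log):

use std::sync::Arc;

// Placeholder for the real query log: a bounded buffer of query records.
struct QueryLog;

struct QueryExecutorImpl {
    // Built once with the executor, so it outlives any single query.
    query_log: Arc<QueryLog>,
}

struct Database {
    // Previously each Database created its own short-lived log; now it
    // receives a clone of the executor's handle.
    query_log: Arc<QueryLog>,
}

impl QueryExecutorImpl {
    fn namespace(&self) -> Database {
        Database { query_log: Arc::clone(&self.query_log) }
    }
}

fn main() {
    let exec = QueryExecutorImpl { query_log: Arc::new(QueryLog) };
    let (a, b) = (exec.namespace(), exec.namespace());
    // Every namespace shares the same log instance.
    assert!(Arc::ptr_eq(&a.query_log, &b.query_log));
}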
hiltontj committed May 17, 2024
1 parent 2381cc6 commit 1cb3652
Showing 1 changed file with 67 additions and 25 deletions: influxdb3_server/src/query_executor.rs
@@ -55,6 +55,7 @@ pub struct QueryExecutorImpl<W> {
     exec: Arc<Executor>,
     datafusion_config: Arc<HashMap<String, String>>,
     query_execution_semaphore: Arc<InstrumentedAsyncSemaphore>,
+    query_log: Arc<QueryLog>,
 }
 
 impl<W: WriteBuffer> QueryExecutorImpl<W> {
@@ -72,12 +73,19 @@ impl<W: WriteBuffer> QueryExecutorImpl<W> {
         ));
         let query_execution_semaphore =
             Arc::new(semaphore_metrics.new_semaphore(concurrent_query_limit));
+        // TODO Fine tune this number or make configurable
+        const QUERY_LOG_LIMIT: usize = 1_000;
+        let query_log = Arc::new(QueryLog::new(
+            QUERY_LOG_LIMIT,
+            Arc::new(iox_time::SystemProvider::new()),
+        ));
         Self {
             catalog,
             write_buffer,
             exec,
             datafusion_config,
             query_execution_semaphore,
+            query_log,
         }
     }
 }
@@ -282,7 +290,7 @@ impl<W: WriteBuffer> QueryDatabase for QueryExecutorImpl<W> {
         &self,
         name: &str,
         span: Option<Span>,
-        _include_debug_info_tables: bool,
+        include_debug_info_tables: bool,
     ) -> Result<Option<Arc<dyn QueryNamespace>>, DataFusionError> {
         let _span_recorder = SpanRecorder::new(span);
 
@@ -297,6 +305,8 @@ impl<W: WriteBuffer> QueryDatabase for QueryExecutorImpl<W> {
             Arc::clone(&self.write_buffer) as _,
             Arc::clone(&self.exec),
             Arc::clone(&self.datafusion_config),
+            Arc::clone(&self.query_log),
+            include_debug_info_tables,
         ))))
     }
 
@@ -312,13 +322,14 @@ impl<W: WriteBuffer> QueryDatabase for QueryExecutorImpl<W> {
     }
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct Database<B> {
     db_schema: Arc<DatabaseSchema>,
     write_buffer: Arc<B>,
     exec: Arc<Executor>,
     datafusion_config: Arc<HashMap<String, String>>,
     query_log: Arc<QueryLog>,
+    system_schema_provider: Arc<SystemSchemaProvider>,
 }
 
 impl<B: WriteBuffer> Database<B> {
@@ -327,20 +338,32 @@ impl<B: WriteBuffer> Database<B> {
         write_buffer: Arc<B>,
         exec: Arc<Executor>,
         datafusion_config: Arc<HashMap<String, String>>,
+        query_log: Arc<QueryLog>,
+        include_debug_info_tables: bool,
     ) -> Self {
-        // TODO Fine tune this number
-        const QUERY_LOG_LIMIT: usize = 10;
-
-        let query_log = Arc::new(QueryLog::new(
-            QUERY_LOG_LIMIT,
-            Arc::new(iox_time::SystemProvider::new()),
+        let system_schema_provider = Arc::new(SystemSchemaProvider::new(
+            write_buffer.catalog(),
+            Arc::clone(&query_log),
+            include_debug_info_tables,
         ));
         Self {
             db_schema,
             write_buffer,
             exec,
             datafusion_config,
             query_log,
+            system_schema_provider,
         }
     }
+
+    fn from_namespace(db: &Self) -> Self {
+        Self {
+            db_schema: Arc::clone(&db.db_schema),
+            write_buffer: Arc::clone(&db.write_buffer),
+            exec: Arc::clone(&db.exec),
+            datafusion_config: Arc::clone(&db.datafusion_config),
+            query_log: Arc::clone(&db.query_log),
+            system_schema_provider: Arc::clone(&db.system_schema_provider),
+        }
+    }
 
@@ -404,17 +427,10 @@ impl<B: WriteBuffer> QueryNamespace for Database<B> {
         span_ctx: Option<SpanContext>,
         _config: Option<&QueryConfig>,
     ) -> IOxSessionContext {
-        let qdb = Self::new(
-            Arc::clone(&self.db_schema),
-            Arc::clone(&self.write_buffer),
-            Arc::clone(&self.exec),
-            Arc::clone(&self.datafusion_config),
-        );
-
         let mut cfg = self
             .exec
             .new_session_config()
-            .with_default_catalog(Arc::new(qdb))
+            .with_default_catalog(Arc::new(Self::from_namespace(self)))
             .with_span_context(span_ctx);
 
         for (k, v) in self.datafusion_config.as_ref() {
@@ -437,15 +453,8 @@ impl<B: WriteBuffer> CatalogProvider for Database<B> {
 
     fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
         info!("CatalogProvider schema {}", name);
-        let qdb = Self::new(
-            Arc::clone(&self.db_schema),
-            Arc::clone(&self.write_buffer),
-            Arc::clone(&self.exec),
-            Arc::clone(&self.datafusion_config),
-        );
-
         match name {
-            DEFAULT_SCHEMA => Some(Arc::new(qdb)),
+            DEFAULT_SCHEMA => Some(Arc::new(Self::from_namespace(self))),
             _ => None,
         }
     }
@@ -486,7 +495,6 @@ impl<B: WriteBuffer> QueryTable<B> {
         filters: &[Expr],
         _limit: Option<usize>,
     ) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
-        // TODO - this is only pulling from write buffer, and not parquet?
         self.write_buffer.get_table_chunks(
             &self.db_schema.name,
             self.name.as_ref(),
@@ -545,3 +553,37 @@ impl<B: WriteBuffer> TableProvider for QueryTable<B> {
         provider.scan(ctx, projection, &filters, limit).await
     }
 }
+
+const _QUERIES_TABLE: &str = "queries";
+const _PARQUET_FILES_TABLE: &str = "parquet_files";
+
+struct SystemSchemaProvider {
+    tables: HashMap<&'static str, Arc<dyn TableProvider>>,
+}
+
+impl std::fmt::Debug for SystemSchemaProvider {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut keys = self.tables.keys().copied().collect::<Vec<_>>();
+        keys.sort_unstable();
+
+        f.debug_struct("SystemSchemaProvider")
+            .field("tables", &keys.join(", "))
+            .finish()
+    }
+}
+
+impl SystemSchemaProvider {
+    fn new(_catalog: Arc<Catalog>, _query_log: Arc<QueryLog>, include_debug_info: bool) -> Self {
+        let tables = HashMap::new();
+        if include_debug_info {
+            // Using todo!() here causes gRPC integration tests to fail, likely because they
+            // enable debug mode by default, thus entering this if block. So, just leaving this
+            // here in lieu of todo!().
+            //
+            // Eventually, we will implement the queries and parquet_files tables and they will
+            // be injected to the provider's table hashmap here...
+            info!("TODO - gather system tables");
+        }
+        Self { tables }
+    }
+}
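
Beyond this commit, a hedged sketch of how the provider's table map could later be surfaced to DataFusion by implementing `SchemaProvider` (not part of this change; the trait's exact shape varies across DataFusion releases, e.g. `table` returns `Result<Option<_>>` in newer versions and a bare `Option<_>` in older ones):

use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::datasource::TableProvider;
use datafusion::error::DataFusionError;

#[async_trait]
impl SchemaProvider for SystemSchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    // Advertise whatever system tables were registered ("queries",
    // "parquet_files", ...), in a stable order.
    fn table_names(&self) -> Vec<String> {
        let mut names: Vec<String> = self.tables.keys().map(|s| s.to_string()).collect();
        names.sort_unstable();
        names
    }

    // Assumes the Result-returning signature of recent DataFusion versions.
    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
        Ok(self.tables.get(name).map(Arc::clone))
    }

    fn table_exist(&self, name: &str) -> bool {
        self.tables.contains_key(name)
    }
}

With such an impl in place, `Database::schema` could map a `system` schema name to `Arc::clone(&self.system_schema_provider)` alongside the existing `DEFAULT_SCHEMA` arm.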
