Skip to content

Commit e4ad5ed

Browse files
committed
initial stab
1 parent 541fdfb commit e4ad5ed

File tree

13 files changed

+408
-16
lines changed

13 files changed

+408
-16
lines changed

datafusion/catalog/src/default_table_source.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ impl TableSource for DefaultTableSource {
7373
&self,
7474
filter: &[&Expr],
7575
) -> datafusion_common::Result<Vec<TableProviderFilterPushDown>> {
76+
#[allow(deprecated)]
7677
self.table_provider.supports_filters_pushdown(filter)
7778
}
7879

datafusion/catalog/src/table.rs

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,14 @@ use arrow::datatypes::SchemaRef;
2525
use async_trait::async_trait;
2626
use datafusion_common::Result;
2727
use datafusion_common::{not_impl_err, Constraints, Statistics};
28-
use datafusion_expr::Expr;
28+
use datafusion_expr::{Expr, SortExpr};
2929

3030
use datafusion_expr::dml::InsertOp;
3131
use datafusion_expr::{
3232
CreateExternalTable, LogicalPlan, TableProviderFilterPushDown, TableType,
3333
};
3434
use datafusion_physical_plan::ExecutionPlan;
35+
use itertools::Itertools;
3536

3637
/// A table which can be queried and modified.
3738
///
@@ -163,6 +164,7 @@ pub trait TableProvider: Debug + Sync + Send {
163164
/// because inexact filters do not guarantee that every filtered row is
164165
/// removed, so applying the limit could lead to too few rows being available
165166
/// to return as a final result.
167+
#[deprecated(since = "50.0.0", note = "Use `scan_with_options` instead")]
166168
async fn scan(
167169
&self,
168170
state: &dyn Session,
@@ -171,6 +173,25 @@ pub trait TableProvider: Debug + Sync + Send {
171173
limit: Option<usize>,
172174
) -> Result<Arc<dyn ExecutionPlan>>;
173175

176+
async fn scan_with_options(
177+
&self,
178+
state: &dyn Session,
179+
options: ScanArgs,
180+
) -> Result<ScanResult> {
181+
let ScanArgs {
182+
preferred_ordering: _,
183+
filters,
184+
projection,
185+
limit,
186+
} = options;
187+
let filters = filters.unwrap_or_default();
188+
#[allow(deprecated)]
189+
let plan = self
190+
.scan(state, projection.as_ref(), &filters, limit)
191+
.await?;
192+
Ok(ScanResult::new(plan, vec![]))
193+
}
194+
174195
/// Specify if DataFusion should provide filter expressions to the
175196
/// TableProvider to apply *during* the scan.
176197
///
@@ -251,6 +272,7 @@ pub trait TableProvider: Debug + Sync + Send {
251272
/// }
252273
/// }
253274
/// ```
275+
#[deprecated(since = "50.0.0", note = "Use `scan_with_options` instead")]
254276
fn supports_filters_pushdown(
255277
&self,
256278
filters: &[&Expr],
@@ -299,6 +321,74 @@ pub trait TableProvider: Debug + Sync + Send {
299321
}
300322
}
301323

324+
#[derive(Debug, Clone, Default)]
325+
pub struct ScanArgs {
326+
preferred_ordering: Option<Vec<SortExpr>>,
327+
filters: Option<Vec<Expr>>,
328+
projection: Option<Vec<usize>>,
329+
limit: Option<usize>,
330+
}
331+
332+
impl ScanArgs {
333+
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
334+
self.projection = projection;
335+
self
336+
}
337+
338+
pub fn projection(&self) -> Option<Vec<usize>> {
339+
self.projection.clone()
340+
}
341+
342+
pub fn with_filters(mut self, filters: Option<Vec<Expr>>) -> Self {
343+
self.filters = filters;
344+
self
345+
}
346+
347+
pub fn filters(&self) -> Option<&[Expr]> {
348+
self.filters.as_deref()
349+
}
350+
351+
pub fn with_limit(mut self, limit: Option<usize>) -> Self {
352+
self.limit = limit;
353+
self
354+
}
355+
356+
pub fn limit(&self) -> Option<usize> {
357+
self.limit
358+
}
359+
360+
pub fn with_preferred_ordering(mut self, ordering: Option<Vec<SortExpr>>) -> Self {
361+
self.preferred_ordering = ordering;
362+
self
363+
}
364+
365+
pub fn preferred_ordering(&self) -> Option<&[SortExpr]> {
366+
self.preferred_ordering.as_deref()
367+
}
368+
}
369+
370+
#[derive(Debug, Clone)]
371+
pub struct ScanResult {
372+
/// The ExecutionPlan to run.
373+
plan: Arc<dyn ExecutionPlan>,
374+
/// Remaining filters that were not completely evaluated during `scan_with_options()`.
375+
filters: Vec<Expr>,
376+
}
377+
378+
impl ScanResult {
379+
pub fn new(plan: Arc<dyn ExecutionPlan>, filters: Vec<Expr>) -> Self {
380+
Self { plan, filters }
381+
}
382+
383+
pub fn plan(&self) -> Arc<dyn ExecutionPlan> {
384+
Arc::clone(&self.plan)
385+
}
386+
387+
pub fn filters(&self) -> &[Expr] {
388+
&self.filters
389+
}
390+
}
391+
302392
/// A factory which creates [`TableProvider`]s at runtime given a URL.
303393
///
304394
/// For example, this can be used to create a table "on the fly"

datafusion/core/src/datasource/listing/table.rs

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use crate::{
2929
use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
3030
use arrow_schema::Schema;
3131
use async_trait::async_trait;
32-
use datafusion_catalog::{Session, TableProvider};
32+
use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
3333
use datafusion_common::{
3434
config_datafusion_err, config_err, internal_err, plan_err, project_schema,
3535
stats::Precision, Constraints, DataFusionError, Result, SchemaExt,
@@ -1166,6 +1166,22 @@ impl TableProvider for ListingTable {
11661166
filters: &[Expr],
11671167
limit: Option<usize>,
11681168
) -> Result<Arc<dyn ExecutionPlan>> {
1169+
let options = ScanArgs::default()
1170+
.with_projection(projection.cloned())
1171+
.with_filters(Some(filters.to_vec()))
1172+
.with_limit(limit);
1173+
Ok(self.scan_with_options(state, options).await?.plan())
1174+
}
1175+
1176+
async fn scan_with_options(
1177+
&self,
1178+
state: &dyn Session,
1179+
options: ScanArgs,
1180+
) -> Result<ScanResult> {
1181+
let projection = options.projection();
1182+
let filters = options.filters().map(|f| f.to_vec()).unwrap_or_default();
1183+
let limit = options.limit();
1184+
11691185
// extract types of partition columns
11701186
let table_partition_cols = self
11711187
.options
@@ -1195,21 +1211,36 @@ impl TableProvider for ListingTable {
11951211

11961212
// if no files need to be read, return an `EmptyExec`
11971213
if partitioned_file_lists.is_empty() {
1198-
let projected_schema = project_schema(&self.schema(), projection)?;
1199-
return Ok(Arc::new(EmptyExec::new(projected_schema)));
1214+
let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
1215+
return Ok(ScanResult::new(
1216+
Arc::new(EmptyExec::new(projected_schema)),
1217+
filters.clone(),
1218+
));
12001219
}
12011220

1202-
let output_ordering = self.try_create_output_ordering()?;
1221+
let known_file_ordering = self.try_create_output_ordering()?;
1222+
let desired_file_ordering = match options.preferred_ordering() {
1223+
Some(ordering) if !ordering.is_empty() => {
1224+
// Prefer the ordering requested by the query to any inherent file ordering
1225+
create_ordering(&self.table_schema, &[ordering.to_vec()])?
1226+
.first()
1227+
.cloned()
1228+
}
1229+
Some(_) | None => {
1230+
// If the query did not request a specific ordering, fall back to any inherent file ordering
1231+
known_file_ordering.first().cloned()
1232+
}
1233+
};
12031234
match state
12041235
.config_options()
12051236
.execution
12061237
.split_file_groups_by_statistics
12071238
.then(|| {
1208-
output_ordering.first().map(|output_ordering| {
1239+
desired_file_ordering.map(|ordering| {
12091240
FileScanConfig::split_groups_by_statistics_with_target_partitions(
12101241
&self.table_schema,
12111242
&partitioned_file_lists,
1212-
output_ordering,
1243+
&ordering,
12131244
self.options.target_partitions,
12141245
)
12151246
})
@@ -1230,13 +1261,17 @@ impl TableProvider for ListingTable {
12301261
let Some(object_store_url) =
12311262
self.table_paths.first().map(ListingTableUrl::object_store)
12321263
else {
1233-
return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty()))));
1264+
return Ok(ScanResult::new(
1265+
Arc::new(EmptyExec::new(Arc::new(Schema::empty()))),
1266+
filters.clone(),
1267+
));
12341268
};
12351269

12361270
let file_source = self.create_file_source_with_schema_adapter()?;
12371271

12381272
// create the execution plan
1239-
self.options
1273+
let plan = self
1274+
.options
12401275
.format
12411276
.create_physical_plan(
12421277
state,
@@ -1248,14 +1283,16 @@ impl TableProvider for ListingTable {
12481283
.with_file_groups(partitioned_file_lists)
12491284
.with_constraints(self.constraints.clone())
12501285
.with_statistics(statistics)
1251-
.with_projection(projection.cloned())
1286+
.with_projection(projection)
12521287
.with_limit(limit)
1253-
.with_output_ordering(output_ordering)
1288+
.with_output_ordering(known_file_ordering)
12541289
.with_table_partition_cols(table_partition_cols)
12551290
.with_expr_adapter(self.expr_adapter_factory.clone())
12561291
.build(),
12571292
)
1258-
.await
1293+
.await?;
1294+
1295+
Ok(ScanResult::new(plan, filters.clone()))
12591296
}
12601297

12611298
fn supports_filters_pushdown(

datafusion/core/src/physical_planner.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ use crate::schema_equivalence::schema_satisfied_by;
6060
use arrow::array::{builder::StringBuilder, RecordBatch};
6161
use arrow::compute::SortOptions;
6262
use arrow::datatypes::{Schema, SchemaRef};
63+
use datafusion_catalog::ScanArgs;
6364
use datafusion_common::display::ToStringifiedPlan;
6465
use datafusion_common::tree_node::{
6566
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
@@ -85,7 +86,7 @@ use datafusion_expr::{
8586
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
8687
use datafusion_physical_expr::expressions::{Column, Literal};
8788
use datafusion_physical_expr::{
88-
create_physical_sort_exprs, LexOrdering, PhysicalSortExpr,
89+
conjunction, create_physical_sort_exprs, LexOrdering, PhysicalSortExpr,
8990
};
9091
use datafusion_physical_optimizer::PhysicalOptimizerRule;
9192
use datafusion_physical_plan::empty::EmptyExec;
@@ -452,16 +453,22 @@ impl DefaultPhysicalPlanner {
452453
projection,
453454
filters,
454455
fetch,
456+
preferred_ordering,
455457
..
456458
}) => {
457459
let source = source_as_provider(source)?;
458460
// Remove all qualifiers from the scan as the provider
459461
// doesn't know (nor should care) how the relation was
460462
// referred to in the query
461463
let filters = unnormalize_cols(filters.iter().cloned());
462-
source
463-
.scan(session_state, projection.as_ref(), &filters, *fetch)
464-
.await?
464+
let opts = ScanArgs::default()
465+
.with_projection(projection.clone())
466+
.with_filters(Some(filters).clone())
467+
.with_preferred_ordering(preferred_ordering.clone())
468+
.with_limit(*fetch);
469+
let res = source.scan_with_options(session_state, opts).await?;
470+
// TODO: move FilterExec wrapping logic from filter pushdown rule to here?
471+
res.plan()
465472
}
466473
LogicalPlan::Values(Values { values, schema }) => {
467474
let exec_schema = schema.as_ref().to_owned().into();

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2522,6 +2522,8 @@ pub struct TableScan {
25222522
pub filters: Vec<Expr>,
25232523
/// Optional number of rows to read
25242524
pub fetch: Option<usize>,
2525+
/// Preferred read ordering of the table
2526+
pub preferred_ordering: Option<Vec<SortExpr>>,
25252527
}
25262528

25272529
impl Debug for TableScan {
@@ -2640,8 +2642,14 @@ impl TableScan {
26402642
projected_schema,
26412643
filters,
26422644
fetch,
2645+
preferred_ordering: None,
26432646
})
26442647
}
2648+
2649+
pub fn with_preferred_ordering(mut self, ordering: Option<Vec<SortExpr>>) -> Self {
2650+
self.preferred_ordering = ordering;
2651+
self
2652+
}
26452653
}
26462654

26472655
// Repartition the plan based on a partitioning scheme.
@@ -4823,6 +4831,7 @@ mod tests {
48234831
projected_schema: Arc::clone(&schema),
48244832
filters: vec![],
48254833
fetch: None,
4834+
preferred_ordering: None,
48264835
}));
48274836
let col = schema.field_names()[0].clone();
48284837

@@ -4853,6 +4862,7 @@ mod tests {
48534862
projected_schema: Arc::clone(&unique_schema),
48544863
filters: vec![],
48554864
fetch: None,
4865+
preferred_ordering: None,
48564866
}));
48574867
let col = schema.field_names()[0].clone();
48584868

datafusion/expr/src/logical_plan/tree_node.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,7 @@ impl LogicalPlan {
599599
projected_schema,
600600
filters,
601601
fetch,
602+
preferred_ordering,
602603
}) => filters.map_elements(f)?.update_data(|filters| {
603604
LogicalPlan::TableScan(TableScan {
604605
table_name,
@@ -607,6 +608,7 @@ impl LogicalPlan {
607608
projected_schema,
608609
filters,
609610
fetch,
611+
preferred_ordering,
610612
})
611613
}),
612614
LogicalPlan::Distinct(Distinct::On(DistinctOn {

datafusion/optimizer/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ pub mod optimizer;
5858
pub mod propagate_empty_relation;
5959
pub mod push_down_filter;
6060
pub mod push_down_limit;
61+
pub mod push_down_sort;
6162
pub mod replace_distinct_aggregate;
6263
pub mod scalar_subquery_to_join;
6364
pub mod simplify_expressions;

datafusion/optimizer/src/optimize_projections/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ fn optimize_projections(
242242
filters,
243243
fetch,
244244
projected_schema: _,
245+
preferred_ordering,
245246
} = table_scan;
246247

247248
// Get indices referred to in the original (schema with all fields)
@@ -257,6 +258,7 @@ fn optimize_projections(
257258
filters,
258259
fetch,
259260
)
261+
.map(|s| s.with_preferred_ordering(preferred_ordering))
260262
.map(LogicalPlan::TableScan)
261263
.map(Transformed::yes);
262264
}

datafusion/optimizer/src/optimizer.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ use crate::plan_signature::LogicalPlanSignature;
5151
use crate::propagate_empty_relation::PropagateEmptyRelation;
5252
use crate::push_down_filter::PushDownFilter;
5353
use crate::push_down_limit::PushDownLimit;
54+
use crate::push_down_sort::PushDownSort;
5455
use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
5556
use crate::scalar_subquery_to_join::ScalarSubqueryToJoin;
5657
use crate::simplify_expressions::SimplifyExpressions;
@@ -243,6 +244,8 @@ impl Optimizer {
243244
// Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit
244245
Arc::new(PushDownLimit::new()),
245246
Arc::new(PushDownFilter::new()),
247+
// Push down sort requirements to TableScan preferred_ordering
248+
Arc::new(PushDownSort::new()),
246249
Arc::new(SingleDistinctToGroupBy::new()),
247250
// The previous optimizations added expressions and projections,
248251
// that might benefit from the following rules

0 commit comments

Comments
 (0)