diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 89bc48b1e634..445356324106 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -255,7 +255,7 @@ async fn sort_preserving_merge() { "| plan_type | plan |", "+---------------+--------------------------------------------------------------------------------------------------------------------------+", "| logical_plan | Sort: t.a ASC NULLS LAST, t.b ASC NULLS LAST, fetch=10 |", - "| | TableScan: t projection=[a, b] |", + "| | TableScan: t projection=[a, b] preferred_ordering=[t.a ASC NULLS LAST, t.b ASC NULLS LAST] |", "| physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], fetch=10 |", "| | DataSourceExec: partitions=2, partition_sizes=[5, 5], fetch=10, output_ordering=a@0 ASC NULLS LAST, b@1 ASC NULLS LAST |", "| | |", @@ -303,7 +303,7 @@ async fn sort_spill_reservation() { "| plan_type | plan |", "+---------------+-------------------------------------------------------------------------------------------------------------+", "| logical_plan | Sort: t.a ASC NULLS LAST, t.b DESC NULLS FIRST |", - "| | TableScan: t projection=[a, b] |", + "| | TableScan: t projection=[a, b] preferred_ordering=[t.a ASC NULLS LAST, t.b DESC NULLS FIRST] |", "| physical_plan | SortExec: expr=[a@0 ASC NULLS LAST, b@1 DESC], preserve_partitioning=[false] |", "| | DataSourceExec: partitions=1, partition_sizes=[5], output_ordering=a@0 ASC NULLS LAST, b@1 ASC NULLS LAST |", "| | |", diff --git a/datafusion/core/tests/user_defined/user_defined_plan.rs b/datafusion/core/tests/user_defined/user_defined_plan.rs index f0bf15d3483b..1bf43a98be30 100644 --- a/datafusion/core/tests/user_defined/user_defined_plan.rs +++ b/datafusion/core/tests/user_defined/user_defined_plan.rs @@ -262,8 +262,9 @@ async fn topk_query() -> Result<()> { async fn topk_plan() -> Result<()> { let ctx = 
setup_table(make_topk_context()).await?; + #[rustfmt::skip] let mut expected = ["| logical_plan after topk | TopK: k=3 |", - "| | TableScan: sales projection=[customer_id,revenue] |"].join("\n"); + "| | TableScan:salesprojection=[customer_id,revenue]preferred_ordering=[sales.revenueDESCNULLSFIRST] |"].join("\n"); let explain_query = format!("EXPLAIN VERBOSE {QUERY}"); let actual_output = exec_sql(&ctx, &explain_query).await?; diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index 7de2fd117487..41e9467d6b27 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -41,8 +41,9 @@ pub use plan::{ projection_schema, Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct, DistinctOn, EmptyRelation, Explain, ExplainOption, Extension, FetchType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, - Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery, - SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window, + Projection, RecursiveQuery, Repartition, ScanOrdering, SkipType, Sort, + StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, + Unnest, Values, Window, }; pub use statement::{ Deallocate, Execute, Prepare, SetVariable, Statement, TransactionAccessMode, @@ -54,3 +55,5 @@ pub use datafusion_common::format::ExplainFormat; pub use display::display_schema; pub use extension::{UserDefinedLogicalNode, UserDefinedLogicalNodeCore}; + +pub use tree_node::LogicalPlanContext; diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index b8200ab8a48c..d325be67fd32 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -1757,6 +1757,7 @@ impl LogicalPlan { ref projection, ref filters, ref fetch, + ref ordering, .. 
}) => { let projected_fields = match projection { @@ -1824,6 +1825,20 @@ impl LogicalPlan { write!(f, ", fetch={n}")?; } + if let Some(ordering) = ordering { + if let Some(preferred_ordering) = &ordering.preferred_ordering { + write!( + f, + " preferred_ordering=[{}]", + preferred_ordering + .iter() + .map(|e| e.to_string()) + .collect::>() + .join(", ") + )?; + } + } + Ok(()) } LogicalPlan::Projection(Projection { ref expr, .. }) => { @@ -2593,6 +2608,68 @@ impl PartialOrd for Window { } } +/// Communicates the desired ordering of the output of a scan operation. +/// +/// Preferred orderings can potentially help DataFusion optimize queries, even in cases +/// when the output does not completely follow that order. This is information passed +/// to the scan about what might help. +/// +/// For example, for a query with `ORDER BY time DESC LIMIT 10`, DataFusion's dynamic +/// predicates and TopK operator will work better if the data is roughly ordered by descending +/// time (more recent data first). +/// +/// Implementers of [`TableProvider`] should use this information to optimize the order in which data is output from the scan. +/// +/// It is a hint and not a requirement: +/// - If this information is completely ignored, e.g. data is scanned randomly, the query will still be correct because a sort will be applied to the data. +/// - Partially ordered data will also be re-sorted but this may result in optimizations like early stopping, additional data pruning, reduced memory usage during the sort, etc. +/// - If the scan produces exactly the requested ordering, and sets its properties to reflect this, upstream sorts may be optimized away. +/// +/// Actually removing unnecessary sorts is done at the physical plan level: logical operators like a join may or may not preserve ordering +/// depending on what physical operator is chosen (e.g. HashJoin vs. SortMergeJoin). 
+/// If you as a [`TableProvider`] implementer would like to eliminate unnecessary sorts you should make sure the [`ExecutionPlan`] +/// you produce reflects the ordering in its properties. +/// +/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProvider.html +/// [`ExecutionPlan`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html +#[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Default)] +pub struct ScanOrdering { + /// Optional preferred ordering for the scan that matches the output order of upstream query nodes. + /// It is optional / best effort for the scan to produce this ordering. + /// If the scan produces this exact ordering and sets its properties to reflect this, upstream sorts may be optimized away. + /// Otherwise the sorts may remain in place but partial ordering may be exploited e.g. to do early stopping or reduce complexity of the sort. + /// Thus it is recommended for the scan to also do a best effort to produce partially sorted data if possible. + pub preferred_ordering: Option>, +} + +impl ScanOrdering { + /// Attach a preferred ordering to the scan ordering. + /// See [`ScanOrdering`] for details on how this is used. 
+ pub fn with_preferred_ordering(mut self, preferred_ordering: Vec) -> Self { + self.preferred_ordering = Some(preferred_ordering); + self + } +} + +impl Debug for ScanOrdering { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + let ordering_display = self + .preferred_ordering + .as_ref() + .map(|ordering| { + ordering + .iter() + .map(|e| e.to_string()) + .collect::>() + .join(", ") + }) + .unwrap_or_else(|| "None".to_string()); + f.debug_struct("ScanOrdering") + .field("preferred_ordering", &ordering_display) + .finish_non_exhaustive() + } +} + /// Produces rows from a table provider by reference or from the context #[derive(Clone)] pub struct TableScan { @@ -2608,6 +2685,8 @@ pub struct TableScan { pub filters: Vec, /// Optional number of rows to read pub fetch: Option, + /// Ordering for the scan + pub ordering: Option, } impl Debug for TableScan { @@ -2619,6 +2698,7 @@ impl Debug for TableScan { .field("projected_schema", &self.projected_schema) .field("filters", &self.filters) .field("fetch", &self.fetch) + .field("ordering", &self.ordering) .finish_non_exhaustive() } } @@ -2630,6 +2710,7 @@ impl PartialEq for TableScan { && self.projected_schema == other.projected_schema && self.filters == other.filters && self.fetch == other.fetch + && self.ordering == other.ordering } } @@ -2649,18 +2730,22 @@ impl PartialOrd for TableScan { pub filters: &'a Vec, /// Optional number of rows to read pub fetch: &'a Option, + /// Ordering information passed from the query to the scan. 
+ pub ordering: &'a Option, } let comparable_self = ComparableTableScan { table_name: &self.table_name, projection: &self.projection, filters: &self.filters, fetch: &self.fetch, + ordering: &self.ordering, }; let comparable_other = ComparableTableScan { table_name: &other.table_name, projection: &other.projection, filters: &other.filters, fetch: &other.fetch, + ordering: &other.ordering, }; comparable_self .partial_cmp(&comparable_other) @@ -2676,6 +2761,7 @@ impl Hash for TableScan { self.projected_schema.hash(state); self.filters.hash(state); self.fetch.hash(state); + self.ordering.hash(state); } } @@ -2729,8 +2815,16 @@ impl TableScan { projected_schema, filters, fetch, + ordering: None, }) } + + /// Sets the ordering information for the scan. + /// See [`ScanOrdering`] for details on how this is used. + pub fn with_ordering(mut self, ordering: ScanOrdering) -> Self { + self.ordering = Some(ordering); + self + } } // Repartition the plan based on a partitioning scheme. @@ -4823,6 +4917,7 @@ mod tests { projected_schema: Arc::clone(&schema), filters: vec![], fetch: None, + ordering: None, })); let col = schema.field_names()[0].clone(); @@ -4853,6 +4948,7 @@ mod tests { projected_schema: Arc::clone(&unique_schema), filters: vec![], fetch: None, + ordering: None, })); let col = schema.field_names()[0].clone(); diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 47088370a1d9..3119f9e60052 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -599,6 +599,7 @@ impl LogicalPlan { projected_schema, filters, fetch, + ordering, }) => filters.map_elements(f)?.update_data(|filters| { LogicalPlan::TableScan(TableScan { table_name, @@ -607,6 +608,7 @@ impl LogicalPlan { projected_schema, filters, fetch, + ordering, }) }), LogicalPlan::Distinct(Distinct::On(DistinctOn { @@ -868,3 +870,106 @@ impl LogicalPlan { }) } } + +/// A node context object beneficial 
for writing optimizer rules. +/// This context encapsulates a [`LogicalPlan`] node with a payload. +/// +/// Since each wrapped node has its children within both the [`LogicalPlanContext.plan.inputs()`], +/// as well as separately within the [`LogicalPlanContext.children`] (which are child nodes wrapped in the context), +/// it's important to keep these child plans in sync when performing mutations. +/// +/// Since there are two ways to access child plans directly — it's recommended +/// to perform mutable operations via [`Self::update_plan_from_children`]. +/// After mutating the `LogicalPlanContext.children`, or after creating the `LogicalPlanContext`, +/// call `update_plan_from_children` to sync. +/// +/// See also: +/// - [`datafusion_common::tree_node::TreeNode`] trait for tree traversal and mutation utilities. +/// - [`datafusion_common::tree_node::ConcreteTreeNode`] trait for integrating with the tree node utilities. +/// - [`datafusion::physical_plan::tree_node::PlanContext`] for a similar context for physical plans. +/// +/// [`datafusion::physical_plan::tree_node::PlanContext`]: https://docs.rs/datafusion/latest/datafusion/physical_plan/tree_node/struct.PlanContext.html +#[derive(Debug, Clone)] +pub struct LogicalPlanContext { + /// The logical plan associated with this context. + pub plan: LogicalPlan, + /// Custom data payload of the node. + pub data: T, + /// Child contexts of this node. + pub children: Vec, +} + +impl LogicalPlanContext { + pub fn new(plan: LogicalPlan, data: T, children: Vec) -> Self { + Self { + plan, + data, + children, + } + } + + /// Update the [`LogicalPlanContext.plan.inputs()`] from the [`LogicalPlanContext.children`], + /// if the `LogicalPlanContext.children` have been changed. 
+ pub fn update_plan_from_children(mut self) -> Result { + // Get the plans from all children + let children_plans = self + .children + .iter() + .map(|c| c.plan.clone()) + .collect::>(); + + // Use TreeNode's map_children to reconstruct the plan with new children + let mut child_iter = children_plans.into_iter(); + self.plan = self + .plan + .clone() + .map_children(|_| { + // Replace each child with the corresponding child from our context + child_iter.next().map(Transformed::no).ok_or_else(|| { + datafusion_common::DataFusionError::Internal( + "Mismatch between plan children and context children".to_string(), + ) + }) + })? + .data; + + Ok(self) + } +} + +impl LogicalPlanContext { + pub fn new_default(plan: LogicalPlan) -> Self { + let children = plan + .inputs() + .into_iter() + .cloned() + .map(Self::new_default) + .collect(); + Self::new(plan, Default::default(), children) + } +} + +impl std::fmt::Display for LogicalPlanContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let node_string = self.plan.display_indent(); + write!(f, "Node plan: {node_string}")?; + write!(f, "Node data: {}", self.data)?; + write!(f, "") + } +} + +impl datafusion_common::tree_node::ConcreteTreeNode for LogicalPlanContext { + fn children(&self) -> &[Self] { + &self.children + } + + fn take_children(mut self) -> (Self, Vec) { + let children = std::mem::take(&mut self.children); + (self, children) + } + + fn with_new_children(mut self, children: Vec) -> Result { + self.children = children; + self.update_plan_from_children() + } +} diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index 85fa9493f449..bcb2e68eabdd 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -58,6 +58,7 @@ pub mod optimizer; pub mod propagate_empty_relation; pub mod push_down_filter; pub mod push_down_limit; +pub mod push_down_sort; pub mod replace_distinct_aggregate; pub mod scalar_subquery_to_join; pub mod 
simplify_expressions; diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 5db71417bc8f..ce41dfc7db2e 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -260,6 +260,7 @@ fn optimize_projections( filters, fetch, projected_schema: _, + ordering, } = table_scan; // Get indices referred to in the original (schema with all fields) @@ -275,6 +276,10 @@ fn optimize_projections( filters, fetch, ) + .map(|s| match ordering { + Some(ordering) => s.with_ordering(ordering), + None => s, + }) .map(LogicalPlan::TableScan) .map(Transformed::yes); } diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 084152d40e92..ce27d1537da7 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -51,6 +51,7 @@ use crate::plan_signature::LogicalPlanSignature; use crate::propagate_empty_relation::PropagateEmptyRelation; use crate::push_down_filter::PushDownFilter; use crate::push_down_limit::PushDownLimit; +use crate::push_down_sort::PushDownSort; use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate; use crate::scalar_subquery_to_join::ScalarSubqueryToJoin; use crate::simplify_expressions::SimplifyExpressions; @@ -247,6 +248,8 @@ impl Optimizer { Arc::new(EliminateOuterJoin::new()), // Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit Arc::new(PushDownLimit::new()), + // Sort pushdown should happen before filter pushdown to maximize optimization opportunities + Arc::new(PushDownSort::new()), Arc::new(PushDownFilter::new()), Arc::new(SingleDistinctToGroupBy::new()), // The previous optimizations added expressions and projections, diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index a8251d669002..9781d6c6965e 100644 --- 
a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -24,9 +24,7 @@ use arrow::datatypes::DataType; use indexmap::IndexSet; use itertools::Itertools; -use datafusion_common::tree_node::{ - Transformed, TransformedResult, TreeNode, TreeNodeRecursion, -}; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion}; use datafusion_common::{ internal_err, plan_err, qualified_name, Column, DFSchema, Result, }; @@ -42,9 +40,14 @@ use datafusion_expr::{ use crate::optimizer::ApplyOrder; use crate::simplify_expressions::simplify_predicates; -use crate::utils::{has_all_column_refs, is_restrict_null_predicate}; +use crate::utils::{ + build_schema_remapping, has_all_column_refs, is_restrict_null_predicate, +}; use crate::{OptimizerConfig, OptimizerRule}; +// Re-export from here for backwards compatibility; it was moved from this module into `utils`. +pub use crate::utils::replace_cols_by_name; + /// Optimizer rule for pushing (moving) filter expressions down in a plan so /// they are applied as early as possible. 
/// @@ -836,17 +839,10 @@ impl OptimizerRule for PushDownFilter { insert_below(LogicalPlan::Sort(sort), new_filter) } LogicalPlan::SubqueryAlias(subquery_alias) => { - let mut replace_map = HashMap::new(); - for (i, (qualifier, field)) in - subquery_alias.input.schema().iter().enumerate() - { - let (sub_qualifier, sub_field) = - subquery_alias.schema.qualified_field(i); - replace_map.insert( - qualified_name(sub_qualifier, sub_field.name()), - Expr::Column(Column::new(qualifier.cloned(), field.name())), - ); - } + let replace_map = build_schema_remapping( + &subquery_alias.schema, + subquery_alias.input.schema(), + ); let new_predicate = replace_cols_by_name(filter.predicate, &replace_map)?; let new_filter = LogicalPlan::Filter(Filter::try_new( @@ -951,15 +947,8 @@ impl OptimizerRule for PushDownFilter { LogicalPlan::Union(ref union) => { let mut inputs = Vec::with_capacity(union.inputs.len()); for input in &union.inputs { - let mut replace_map = HashMap::new(); - for (i, (qualifier, field)) in input.schema().iter().enumerate() { - let (union_qualifier, union_field) = - union.schema.qualified_field(i); - replace_map.insert( - qualified_name(union_qualifier, union_field.name()), - Expr::Column(Column::new(qualifier.cloned(), field.name())), - ); - } + let replace_map = + build_schema_remapping(&union.schema, input.schema()); let push_predicate = replace_cols_by_name(filter.predicate.clone(), &replace_map)?; @@ -1388,24 +1377,6 @@ impl PushDownFilter { } } -/// replaces columns by its name on the projection. -pub fn replace_cols_by_name( - e: Expr, - replace_map: &HashMap, -) -> Result { - e.transform_up(|expr| { - Ok(if let Expr::Column(c) = &expr { - match replace_map.get(&c.flat_name()) { - Some(new_c) => Transformed::yes(new_c.clone()), - None => Transformed::no(expr), - } - } else { - Transformed::no(expr) - }) - }) - .data() -} - /// check whether the expression uses the columns in `check_map`. 
fn contain(e: &Expr, check_map: &HashMap) -> bool { let mut is_contain = false; @@ -3123,6 +3094,7 @@ mod tests { projection, source: Arc::new(test_provider), fetch: None, + ordering: None, }); Ok(LogicalPlanBuilder::from(table_scan)) diff --git a/datafusion/optimizer/src/push_down_sort.rs b/datafusion/optimizer/src/push_down_sort.rs new file mode 100644 index 000000000000..eaf62e667e40 --- /dev/null +++ b/datafusion/optimizer/src/push_down_sort.rs @@ -0,0 +1,804 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`PushDownSort`] pushes sort expressions into table scans to enable +//! sort pushdown optimizations by table providers + +use std::collections::HashMap; + +use crate::optimizer::ApplyOrder; +use crate::utils::{build_schema_remapping, replace_cols_by_name}; +use crate::{OptimizerConfig, OptimizerRule}; + +use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::{qualified_name, Result}; +use datafusion_expr::logical_plan::LogicalPlan; +use datafusion_expr::{Expr, LogicalPlanContext, ScanOrdering, SortExpr}; + +/// Optimization rule that pushes sort expressions down to table scans +/// when the sort can potentially be optimized by the table provider. 
+/// +/// This rule carries sort expressions down through nodes that we know are safe to push +/// sorts through, such as `Projection`, `Filter`, `SubqueryAlias`, `Repartition` and `Limit`. +/// It stops when it hits a `TableScan` (where it attaches the sort expressions to the scan) +/// or any other node type that cannot pass down sort expressions (e.g. `Aggregate`, `Join`, `Union`, etc). +/// +/// The optimizer preserves the original `Sort` node; this optimizer does not remove any sorts. +/// This means that the [`TableProvider`] can choose to ignore the preferred ordering +/// or only partially satisfy it. The original `Sort` node ensures that the final output +/// ordering is always correct. +/// +/// Physical optimizer rules can later remove redundant sorts if they can prove +/// that the output is already sorted as required. +/// +/// # Examples +/// +/// ```text +/// Before optimization: +/// Sort: test.a ASC NULLS LAST +/// TableScan: test +/// +/// After optimization: +/// Sort: test.a ASC NULLS LAST +/// TableScan: test preferred_ordering=[test.a ASC NULLS LAST] +/// ``` +/// +/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html +#[derive(Default, Debug)] +pub struct PushDownSort {} + +impl PushDownSort { + /// Creates a new instance of the `PushDownSort` optimizer rule. + /// + /// # Returns + /// + /// A new `PushDownSort` optimizer rule that can be added to the optimization pipeline. + pub fn new() -> Self { + Self {} + } +} + +impl OptimizerRule for PushDownSort { + fn supports_rewrite(&self) -> bool { + true + } + + fn apply_order(&self) -> Option { + Some(ApplyOrder::TopDown) + } + + /// Recursively push down sort expressions through the logical plan tree. + /// + /// We stop when we hit: + /// 1. A TableScan leaf. In this case we bind the preferred ordering + /// to the TableScan node and return a new plan tree. + /// 2. Any node that is not a Filter, Projection, SubqueryAlias, Repartition or Limit. 
In this case + /// we clear the sort expressions and continue the recursion with no preferred + /// ordering. + /// 3. A Sort node. In this case we replace the current sort expressions + /// with the new ones and continue the recursion. + /// + /// # Arguments + /// + /// * `plan` - The current logical plan node being processed. + /// * `sort_exprs` - The current list of sort expressions to push down. + /// + /// # Returns + /// + /// A `Result` containing the transformed logical plan with sort expressions + /// pushed down where possible. + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result> { + let ctx = SortPushdownContext::new_default(plan); + ctx.transform_down(|mut ctx| { + match &ctx.plan { + LogicalPlan::TableScan(table_scan) => { + if let Some(sort_exprs) = &ctx.data { + // Create new TableScan with preferred ordering + let new_table_scan = table_scan.clone().with_ordering( + ScanOrdering::default() + .with_preferred_ordering(sort_exprs.to_vec()), + ); + // Return new TableScan with preferred ordering + return Ok(Transformed::yes(SortPushdownContext::new_default( + LogicalPlan::TableScan(new_table_scan), + ))); + } + // No sort expressions to push down or cannot push down, return original plan + Ok(Transformed::no(ctx)) + } + LogicalPlan::Sort(ref sort) => { + // Update current sort expressions to the new ones + ctx.data = Some(sort.expr.clone()); + // Propagate sort expressions to all children + let sort_exprs = ctx.data.clone(); + for child in ctx.children.iter_mut() { + child.data = sort_exprs.clone(); + } + // Continue recursion with updated sort expressions + Ok(Transformed::no(ctx)) + } + LogicalPlan::Projection(ref projection) => { + // We can only push down sort expressions through a projection if the expression we are sorting on was not created by the projection itself. + // We may also need to re-write sort expressions to reverse aliasing done by the projection. 
+ + if let Some(sort_exprs) = &ctx.data { + // Build projection mapping: output column name -> underlying input expression + let projection_map: HashMap = projection + .schema + .iter() + .zip(projection.expr.iter()) + .map(|((qualifier, field), expr)| { + // Strip alias, as they should not be part of sort expressions + ( + qualified_name(qualifier, field.name()), + expr.clone().unalias(), + ) + }) + .collect(); + + // Rewrite sort expressions through the projection, stopping at first failure. + // We push down whatever prefix we can, but if any expression cannot be rewritten + // we drop them from the sort pushdown. + // For example, given the projection `a as a, b as b, c + d + 1 as cd1` and the sort expression `a, cd1, b` + // we will only be able to push down the sort expression `a,` but not `cd1` as it is not a simple column reference and not `b` as it comes after `cd1`. + let mut rewritten_sorts = Vec::new(); + for sort_expr in sort_exprs { + match replace_cols_by_name( + sort_expr.expr.clone(), + &projection_map, + ) { + Ok(rewritten_expr) => { + // Successfully rewritten, keep it in the pushdown list + rewritten_sorts.push(SortExpr { + expr: rewritten_expr, + asc: sort_expr.asc, + nulls_first: sort_expr.nulls_first, + }); + } + Err(_) => { + // Cannot rewrite this expression, stop here (partial pushdown) + break; + } + } + } + + // Update context with the rewritten sort expressions (or None if empty) + ctx.data = if rewritten_sorts.is_empty() { + None + } else { + Some(rewritten_sorts.clone()) + }; + + // Propagate rewritten sort expressions to children + let data_to_propagate = ctx.data.clone(); + for child in ctx.children.iter_mut() { + child.data = data_to_propagate.clone(); + } + + return Ok(Transformed::no(ctx)); + } + + // Continue recursion with potentially updated sort expressions + Ok(Transformed::no(ctx)) + } + LogicalPlan::SubqueryAlias(ref subquery_alias) => { + // Similar to Projection, we need to rewrite sort expressions through the 
SubqueryAlias + // by replacing column references that use the alias qualifier with the underlying + // table qualifier. + + if let Some(sort_exprs) = &ctx.data { + // Build mapping: alias.field_name -> underlying Column(qualifier, field_name) + let replace_map = build_schema_remapping( + &subquery_alias.schema, + subquery_alias.input.schema(), + ); + + // Rewrite sort expressions to use underlying qualifiers + let mut rewritten_sorts = Vec::new(); + for sort_expr in sort_exprs { + match replace_cols_by_name( + sort_expr.expr.clone(), + &replace_map, + ) { + Ok(rewritten_expr) => { + rewritten_sorts.push(SortExpr { + expr: rewritten_expr, + asc: sort_expr.asc, + nulls_first: sort_expr.nulls_first, + }); + } + Err(_) => { + // Cannot rewrite, stop here + break; + } + } + } + + // Update context with rewritten sort expressions + ctx.data = if rewritten_sorts.is_empty() { + None + } else { + Some(rewritten_sorts.clone()) + }; + + // Propagate rewritten sort expressions to children + let data_to_propagate = ctx.data.clone(); + for child in ctx.children.iter_mut() { + child.data = data_to_propagate.clone(); + } + } else { + // No sort expressions to propagate + for child in ctx.children.iter_mut() { + child.data = None; + } + } + + Ok(Transformed::no(ctx)) + } + LogicalPlan::Filter(_) + | LogicalPlan::Repartition(_) + | LogicalPlan::Limit(_) => { + // Propagate current sort expressions to children without modification + let current_data = ctx.data.clone(); + for child in ctx.children.iter_mut() { + child.data = current_data.clone(); + } + // Continue recursion without modifying current sort expressions + Ok(Transformed::no(ctx)) + } + _ => { + // Cannot push sort expressions through this node type + // Clear sort expressions from both context and children + ctx.data = None; + for child in ctx.children.iter_mut() { + child.data = None; + } + Ok(Transformed::no(ctx)) + } + } + }) + .and_then(|transformed_ctx| transformed_ctx.map_data(|ctx| Ok(ctx.plan))) + } + + fn 
name(&self) -> &str { + "push_down_sort" + } +} + +type SortPushdownContext = LogicalPlanContext>>; + +#[cfg(test)] +mod tests { + use super::*; + use crate::assert_optimized_plan_eq_snapshot; + use crate::test::*; + use crate::OptimizerContext; + use datafusion_common::Result; + use datafusion_expr::test::function_stub::sum; + use datafusion_expr::{ + col, lit, logical_plan::builder::LogicalPlanBuilder, Expr, ExprFunctionExt, + }; + use std::sync::Arc; + + macro_rules! assert_optimized_plan_equal { + ( + $plan:expr, + @ $expected:literal $(,)? + ) => {{ + let optimizer_ctx = OptimizerContext::new().with_max_passes(1); + let rules: Vec> = + vec![Arc::new(PushDownSort::new())]; + assert_optimized_plan_eq_snapshot!( + optimizer_ctx, + rules, + $plan, + @ $expected, + ) + }}; + } + + // ===== Basic Sort Pushdown Tests ===== + + #[test] + fn sort_before_table_scan() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![col("a").sort(true, false)])? + .build()?; + + assert_optimized_plan_equal!( + plan, + @r" + Sort: test.a ASC NULLS LAST + TableScan: test preferred_ordering=[test.a ASC NULLS LAST] + " + ) + } + + #[test] + fn sort_with_multiple_columns() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![ + col("a").sort(true, false), + col("b").sort(false, true), + col("c").sort(true, true), + ])? + .build()?; + + assert_optimized_plan_equal!( + plan, + @r" + Sort: test.a ASC NULLS LAST, test.b DESC NULLS FIRST, test.c ASC NULLS FIRST + TableScan: test preferred_ordering=[test.a ASC NULLS LAST, test.b DESC NULLS FIRST, test.c ASC NULLS FIRST] + " + ) + } + + #[test] + fn sort_with_options() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![ + col("a").sort(false, true), // DESC NULLS FIRST + col("b").sort(true, true), // ASC NULLS FIRST + ])? 
+ .build()?; + + assert_optimized_plan_equal!( + plan, + @r" + Sort: test.a DESC NULLS FIRST, test.b ASC NULLS FIRST + TableScan: test preferred_ordering=[test.a DESC NULLS FIRST, test.b ASC NULLS FIRST] + " + ) + } + + #[test] + fn no_sort_no_pushdown() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan).build()?; + + assert_optimized_plan_equal!( + plan, + @"TableScan: test" + ) + } + + // ===== Sort Through Projection Tests ===== + + #[test] + fn sort_through_simple_projection() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a"), col("b")])? + .sort(vec![col("a").sort(true, false)])? + .build()?; + + assert_optimized_plan_equal!( + plan, + @r" + Sort: test.a ASC NULLS LAST + Projection: test.a, test.b + TableScan: test preferred_ordering=[test.a ASC NULLS LAST] + " + ) + } + + #[test] + fn sort_through_projection_with_alias() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a").alias("b"), col("c")])? + .sort(vec![col("b").sort(true, false)])? + .build()?; + + // Sort on aliased column 'b' should be rewritten to sort on 'a' + assert_optimized_plan_equal!( + plan, + @r" + Sort: b ASC NULLS LAST + Projection: test.a AS b, test.c + TableScan: test preferred_ordering=[test.a ASC NULLS LAST] + " + ) + } + + #[test] + fn sort_through_complex_projection() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a"), (col("b") + col("c")).alias("bc")])? + .sort(vec![col("bc").sort(true, false)])? 
+ .build()?; + + // Sort on computed column can push down as complex expression + assert_optimized_plan_equal!( + plan, + @r" + Sort: bc ASC NULLS LAST + Projection: test.a, test.b + test.c AS bc + TableScan: test preferred_ordering=[test.b + test.c ASC NULLS LAST] + " + ) + } + + #[test] + fn sort_through_multiple_projections() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a"), col("b"), col("c")])? + .project(vec![col("a"), col("c")])? + .sort(vec![col("a").sort(true, false)])? + .build()?; + + assert_optimized_plan_equal!( + plan, + @r" + Sort: test.a ASC NULLS LAST + Projection: test.a, test.c + Projection: test.a, test.b, test.c + TableScan: test preferred_ordering=[test.a ASC NULLS LAST] + " + ) + } + + #[test] + fn sort_rewrites_expression_to_source_columns() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![ + col("a"), + (col("b") + col("c") + lit(1)).alias("bc1"), + col("c"), + ])? + .sort(vec![ + col("bc1").sort(true, false), + col("c").sort(true, false), + ])? + .build()?; + + // We can successfully rewrite the sort expression `bc1` to `b + c + 1` and push it down + assert_optimized_plan_equal!( + plan, + @r" + Sort: bc1 ASC NULLS LAST, test.c ASC NULLS LAST + Projection: test.a, test.b + test.c + Int32(1) AS bc1, test.c + TableScan: test preferred_ordering=[test.b + test.c + Int32(1) ASC NULLS LAST, test.c ASC NULLS LAST] + " + ) + } + + // ===== Sort Through Filter Tests ===== + + #[test] + fn sort_through_filter() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(col("a").gt(lit(10i64)))? + .sort(vec![col("a").sort(true, false)])? 
+ .build()?; + + assert_optimized_plan_equal!( + plan, + @r" + Sort: test.a ASC NULLS LAST + Filter: test.a > Int64(10) + TableScan: test preferred_ordering=[test.a ASC NULLS LAST] + " + ) + } + + #[test] + fn sort_through_filter_and_projection() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a"), col("b")])? + .filter(col("a").gt(lit(10i64)))? + .sort(vec![col("a").sort(true, false)])? + .build()?; + + assert_optimized_plan_equal!( + plan, + @r" + Sort: test.a ASC NULLS LAST + Filter: test.a > Int64(10) + Projection: test.a, test.b + TableScan: test preferred_ordering=[test.a ASC NULLS LAST] + " + ) + } + + // ===== Sort Through Repartition Tests ===== + + #[test] + fn sort_through_repartition() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .repartition(datafusion_expr::Partitioning::RoundRobinBatch(4))? + .sort(vec![col("a").sort(true, false)])? + .build()?; + + assert_optimized_plan_equal!( + plan, + @r" + Sort: test.a ASC NULLS LAST + Repartition: RoundRobinBatch partition_count=4 + TableScan: test preferred_ordering=[test.a ASC NULLS LAST] + " + ) + } + + // ===== Sort Replacement Tests ===== + + #[test] + fn multiple_sorts() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![col("a").sort(true, false)])? + .sort(vec![col("b").sort(false, true)])? + .build()?; + + // The innermost sort should be what gets pushed down + assert_optimized_plan_equal!( + plan, + @r" + Sort: test.b DESC NULLS FIRST + Sort: test.a ASC NULLS LAST + TableScan: test preferred_ordering=[test.a ASC NULLS LAST] + " + ) + } + + #[test] + fn sort_updates_existing_sort() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![col("a").sort(true, false), col("b").sort(true, false)])? 
+ .project(vec![col("a"), col("b"), col("c")])? + .sort(vec![col("c").sort(false, true)])? + .build()?; + + // Innermost sort should be what gets pushed down + assert_optimized_plan_equal!( + plan, + @r" + Sort: test.c DESC NULLS FIRST + Projection: test.a, test.b, test.c + Sort: test.a ASC NULLS LAST, test.b ASC NULLS LAST + TableScan: test preferred_ordering=[test.a ASC NULLS LAST, test.b ASC NULLS LAST] + " + ) + } + + // ===== Boundary Case Tests ===== + + #[test] + fn sort_blocked_by_limit() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .limit(0, Some(10))? + .sort(vec![col("a").sort(true, false)])? + .build()?; + + // Sort can push through limit + assert_optimized_plan_equal!( + plan, + @r" + Sort: test.a ASC NULLS LAST + Limit: skip=0, fetch=10 + TableScan: test preferred_ordering=[test.a ASC NULLS LAST] + " + ) + } + + #[test] + fn sort_blocked_by_aggregate() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .aggregate(vec![col("a")], vec![sum(col("b"))])? + .sort(vec![col("a").sort(true, false)])? + .build()?; + + // Sort cannot push through aggregate + assert_optimized_plan_equal!( + plan, + @r" + Sort: test.a ASC NULLS LAST + Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b)]] + TableScan: test + " + ) + } + + #[test] + fn sort_blocked_by_join() -> Result<()> { + let left = test_table_scan()?; + let right = test_table_scan_with_name("test2")?; + let plan = LogicalPlanBuilder::from(left) + .join( + right, + datafusion_expr::JoinType::Inner, + ( + vec![datafusion_common::Column::from_name("a")], + vec![datafusion_common::Column::from_name("a")], + ), + None, + )? + .sort(vec![col("test.a").sort(true, false)])? 
+ .build()?; + + // Sort cannot push through join + assert_optimized_plan_equal!( + plan, + @r" + Sort: test.a ASC NULLS LAST + Inner Join: test.a = test2.a + TableScan: test + TableScan: test2 + " + ) + } + + #[test] + fn sort_blocked_by_window() -> Result<()> { + let table_scan = test_table_scan()?; + + let window = Expr::from(datafusion_expr::expr::WindowFunction::new( + datafusion_expr::WindowFunctionDefinition::WindowUDF( + datafusion_functions_window::rank::rank_udwf(), + ), + vec![], + )) + .partition_by(vec![col("a")]) + .order_by(vec![col("b").sort(true, true)]) + .build() + .unwrap(); + + let plan = LogicalPlanBuilder::from(table_scan) + .window(vec![window])? + .sort(vec![col("a").sort(true, false)])? + .build()?; + + // Sort cannot push through window + assert_optimized_plan_equal!( + plan, + @r" + Sort: test.a ASC NULLS LAST + WindowAggr: windowExpr=[[rank() PARTITION BY [test.a] ORDER BY [test.b ASC NULLS FIRST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] + TableScan: test + " + ) + } + + // ===== Edge Case Tests ===== + + #[test] + fn sort_with_special_column_names() -> Result<()> { + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_expr::logical_plan::table_scan; + + let schema = Schema::new(vec![ + Field::new("$a", DataType::UInt32, false), + Field::new("$b", DataType::UInt32, false), + ]); + let table_scan = table_scan(Some("test"), &schema, None)?.build()?; + + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![col("$a").sort(true, false)])? + .build()?; + + assert_optimized_plan_equal!( + plan, + @r" + Sort: test.$a ASC NULLS LAST + TableScan: test preferred_ordering=[test.$a ASC NULLS LAST] + " + ) + } + + #[test] + fn sort_through_subquery_alias() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .alias("subquery")? + .sort(vec![col("a").sort(true, false)])? 
+ .build()?; + + // Sort should pass through SubqueryAlias + assert_optimized_plan_equal!( + plan, + @r" + Sort: subquery.a ASC NULLS LAST + SubqueryAlias: subquery + TableScan: test preferred_ordering=[test.a ASC NULLS LAST] + " + ) + } + + #[test] + fn sort_with_union() -> Result<()> { + let left = test_table_scan()?; + let right = test_table_scan_with_name("test2")?; + let plan = LogicalPlanBuilder::from(left) + .union(right)? + .sort(vec![col("a").sort(true, false)])? + .build()?; + + // Sort cannot push through union + assert_optimized_plan_equal!( + plan, + @r" + Sort: a ASC NULLS LAST + Union + TableScan: test + TableScan: test2 + " + ) + } + + // ===== Integration Tests ===== + + #[test] + fn complex_plan_with_sort() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a"), col("b"), col("c")])? + .filter(col("b").gt(lit(5i64)))? + .project(vec![col("a").alias("x"), col("c")])? + .sort(vec![col("x").sort(true, false), col("c").sort(false, true)])? + .build()?; + + // Sort should push through multiple projections and filter + assert_optimized_plan_equal!( + plan, + @r" + Sort: x ASC NULLS LAST, test.c DESC NULLS FIRST + Projection: test.a AS x, test.c + Filter: test.b > Int64(5) + Projection: test.a, test.b, test.c + TableScan: test preferred_ordering=[test.a ASC NULLS LAST, test.c DESC NULLS FIRST] + " + ) + } + + #[test] + fn sort_preserves_original_node() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![col("a").sort(true, false)])? + .build()?; + + // Verify the original Sort node is preserved + let optimized = { + let optimizer_ctx = OptimizerContext::new().with_max_passes(1); + let rules: Vec> = + vec![Arc::new(PushDownSort::new())]; + let optimizer = crate::Optimizer::with_rules(rules); + optimizer.optimize(plan, &optimizer_ctx, |_, _| {})? 
+ }; + + // Check that the optimized plan still has a Sort node at the top + assert!(matches!(optimized, LogicalPlan::Sort(_))); + + Ok(()) + } +} diff --git a/datafusion/optimizer/src/utils.rs b/datafusion/optimizer/src/utils.rs index 81763fa0552f..3e4af69a644f 100644 --- a/datafusion/optimizer/src/utils.rs +++ b/datafusion/optimizer/src/utils.rs @@ -23,8 +23,11 @@ use crate::analyzer::type_coercion::TypeCoercionRewriter; use arrow::array::{new_null_array, Array, RecordBatch}; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::cast::as_boolean_array; +use datafusion_common::tree_node::Transformed; use datafusion_common::tree_node::{TransformedResult, TreeNode}; -use datafusion_common::{Column, DFSchema, Result, ScalarValue}; +use datafusion_common::{ + qualified_name, Column, DFSchema, DFSchemaRef, Result, ScalarValue, +}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::expr_rewriter::replace_col; use datafusion_expr::{logical_plan::LogicalPlan, ColumnarValue, Expr}; @@ -151,6 +154,73 @@ fn coerce(expr: Expr, schema: &DFSchema) -> Result { expr.rewrite(&mut expr_rewrite).data() } +/// Replaces columns by their name in the provided expression. +/// +/// Replaces all column references in the expression tree by looking up their names +/// in the provided HashMap and replacing them with the corresponding expression. 
+/// +/// # Arguments +/// +/// * `e` - The expression to transform +/// * `replace_map` - A map from column names (flat_name) to replacement expressions +/// +/// # Returns +/// +/// The transformed expression with columns replaced according to the map +pub fn replace_cols_by_name( + e: Expr, + replace_map: &HashMap, +) -> Result { + e.transform_up(|expr| { + Ok(if let Expr::Column(c) = &expr { + match replace_map.get(&c.flat_name()) { + Some(new_c) => Transformed::yes(new_c.clone()), + None => Transformed::no(expr), + } + } else { + Transformed::no(expr) + }) + }) + .data() +} + +/// Builds a replace map for rewriting column qualifiers from an output schema to an input schema. +/// +/// This function creates a mapping from qualified column names in the output schema +/// to their corresponding Column expressions in the input schema. This is useful when +/// pushing expressions through operators like SubqueryAlias or Union that change +/// column qualifiers. +/// +/// # Arguments +/// +/// * `output_schema` - The schema with output qualifiers (e.g., alias or union name) +/// * `input_schema` - The schema with input qualifiers (e.g., underlying table name) +/// +/// # Returns +/// +/// A HashMap mapping qualified output column names to input Column expressions +/// +/// # Example +/// +/// For a SubqueryAlias "subquery" over table "test": +/// - Input: "test.a" (from input_schema) +/// - Output: "subquery.a" (from output_schema) +/// - Map: {"subquery.a" -> Column("test", "a")} +pub(crate) fn build_schema_remapping( + output_schema: &DFSchemaRef, + input_schema: &DFSchemaRef, +) -> HashMap { + let mut replace_map = HashMap::new(); + for (i, (input_qualifier, input_field)) in input_schema.iter().enumerate() { + let (output_qualifier, output_field) = output_schema.qualified_field(i); + replace_map.insert( + qualified_name(output_qualifier, output_field.name()), + Expr::Column(Column::new(input_qualifier.cloned(), input_field.name())), + ); + } + replace_map +} + 
#[cfg(test)] mod tests { use super::*; diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index ee9ac0e7902d..ee51094c9603 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -82,6 +82,10 @@ message SortExprNodeCollection { repeated SortExprNode sort_expr_nodes = 1; } +message ScanOrderingNode { + SortExprNodeCollection preferred_ordering = 1; +} + message ListingTableScanNode { reserved 1; // was string table_name TableReference table_name = 14; @@ -101,6 +105,7 @@ message ListingTableScanNode { datafusion_common.ArrowFormat arrow = 16; } repeated SortExprNodeCollection file_sort_order = 13; + ScanOrderingNode ordering = 17; } message ViewTableScanNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 29967d812000..a1b759073417 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -10766,6 +10766,9 @@ impl serde::Serialize for ListingTableScanNode { if !self.file_sort_order.is_empty() { len += 1; } + if self.ordering.is_some() { + len += 1; + } if self.file_format_type.is_some() { len += 1; } @@ -10800,6 +10803,9 @@ impl serde::Serialize for ListingTableScanNode { if !self.file_sort_order.is_empty() { struct_ser.serialize_field("fileSortOrder", &self.file_sort_order)?; } + if let Some(v) = self.ordering.as_ref() { + struct_ser.serialize_field("ordering", v)?; + } if let Some(v) = self.file_format_type.as_ref() { match v { listing_table_scan_node::FileFormatType::Csv(v) => { @@ -10845,6 +10851,7 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode { "targetPartitions", "file_sort_order", "fileSortOrder", + "ordering", "csv", "parquet", "avro", @@ -10864,6 +10871,7 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode { CollectStat, TargetPartitions, FileSortOrder, + Ordering, Csv, Parquet, Avro, @@ -10900,6 +10908,7 @@ impl<'de> serde::Deserialize<'de> 
for ListingTableScanNode { "collectStat" | "collect_stat" => Ok(GeneratedField::CollectStat), "targetPartitions" | "target_partitions" => Ok(GeneratedField::TargetPartitions), "fileSortOrder" | "file_sort_order" => Ok(GeneratedField::FileSortOrder), + "ordering" => Ok(GeneratedField::Ordering), "csv" => Ok(GeneratedField::Csv), "parquet" => Ok(GeneratedField::Parquet), "avro" => Ok(GeneratedField::Avro), @@ -10934,6 +10943,7 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode { let mut collect_stat__ = None; let mut target_partitions__ = None; let mut file_sort_order__ = None; + let mut ordering__ = None; let mut file_format_type__ = None; while let Some(k) = map_.next_key()? { match k { @@ -10999,6 +11009,12 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode { } file_sort_order__ = Some(map_.next_value()?); } + GeneratedField::Ordering => { + if ordering__.is_some() { + return Err(serde::de::Error::duplicate_field("ordering")); + } + ordering__ = map_.next_value()?; + } GeneratedField::Csv => { if file_format_type__.is_some() { return Err(serde::de::Error::duplicate_field("csv")); @@ -11047,6 +11063,7 @@ impl<'de> serde::Deserialize<'de> for ListingTableScanNode { collect_stat: collect_stat__.unwrap_or_default(), target_partitions: target_partitions__.unwrap_or_default(), file_sort_order: file_sort_order__.unwrap_or_default(), + ordering: ordering__, file_format_type: file_format_type__, }) } @@ -20078,6 +20095,98 @@ impl<'de> serde::Deserialize<'de> for ScanLimit { deserializer.deserialize_struct("datafusion.ScanLimit", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for ScanOrderingNode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if self.preferred_ordering.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.ScanOrderingNode", len)?; + if let Some(v) = 
self.preferred_ordering.as_ref() { + struct_ser.serialize_field("preferredOrdering", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for ScanOrderingNode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "preferred_ordering", + "preferredOrdering", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + PreferredOrdering, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "preferredOrdering" | "preferred_ordering" => Ok(GeneratedField::PreferredOrdering), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = ScanOrderingNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.ScanOrderingNode") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut preferred_ordering__ = None; + while let Some(k) = map_.next_key()? 
{ + match k { + GeneratedField::PreferredOrdering => { + if preferred_ordering__.is_some() { + return Err(serde::de::Error::duplicate_field("preferredOrdering")); + } + preferred_ordering__ = map_.next_value()?; + } + } + } + Ok(ScanOrderingNode { + preferred_ordering: preferred_ordering__, + }) + } + } + deserializer.deserialize_struct("datafusion.ScanOrderingNode", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for SelectionExecNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index d3b5f566e98b..d2ba30d01e27 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -102,6 +102,11 @@ pub struct SortExprNodeCollection { pub sort_expr_nodes: ::prost::alloc::vec::Vec, } #[derive(Clone, PartialEq, ::prost::Message)] +pub struct ScanOrderingNode { + #[prost(message, optional, tag = "1")] + pub preferred_ordering: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] pub struct ListingTableScanNode { #[prost(message, optional, tag = "14")] pub table_name: ::core::option::Option, @@ -123,6 +128,8 @@ pub struct ListingTableScanNode { pub target_partitions: u32, #[prost(message, repeated, tag = "13")] pub file_sort_order: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "17")] + pub ordering: ::core::option::Option, #[prost( oneof = "listing_table_scan_node::FileFormatType", tags = "10, 11, 12, 15, 16" diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index fd9e07914b07..855a31552277 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -57,8 +57,8 @@ use datafusion::{ }; use datafusion_common::file_options::file_type::FileType; use datafusion_common::{ - context, internal_datafusion_err, internal_err, not_impl_err, plan_err, Result, - TableReference, ToDFSchema, + 
context, internal_datafusion_err, internal_err, not_impl_err, plan_err, + DataFusionError, Result, TableReference, ToDFSchema, }; use datafusion_expr::{ dml, @@ -271,6 +271,7 @@ fn from_table_source( projected_schema, filters: vec![], fetch: None, + ordering: None, }); LogicalPlanNode::try_from_logical_plan(&r, extension_codec) @@ -494,13 +495,30 @@ impl AsLogicalPlan for LogicalPlanNode { projection = Some(column_indices); } - LogicalPlanBuilder::scan_with_filters( + let ordering = scan + .ordering + .as_ref() + .and_then(|o| o.preferred_ordering.as_ref()) + .map(|so| { + from_proto::parse_sorts(&so.sort_expr_nodes, ctx, extension_codec) + }) + .transpose()?; + + let mut scan = TableScan::try_new( table_name, provider_as_source(Arc::new(provider)), projection, filters, - )? - .build() + None, + )?; + if let Some(preferred_ordering) = ordering { + let scan_ordering = datafusion_expr::logical_plan::ScanOrdering { + preferred_ordering: Some(preferred_ordering), + }; + scan = scan.with_ordering(scan_ordering); + } + + Ok(LogicalPlan::TableScan(scan)) } LogicalPlanType::CustomScan(scan) => { let schema: Schema = convert_required!(scan.schema)?; @@ -1002,6 +1020,7 @@ impl AsLogicalPlan for LogicalPlanNode { source, filters, projection, + ordering, .. 
}) => { let provider = source_as_provider(source)?; @@ -1117,6 +1136,26 @@ impl AsLogicalPlan for LogicalPlanNode { }) .collect::>>()?; + let ordering = ordering + .as_ref() + .map(|ordering| { + Ok::<_, DataFusionError>(protobuf::ScanOrderingNode { + preferred_ordering: ordering + .preferred_ordering + .as_ref() + .map(|order| { + Ok::<_, DataFusionError>(SortExprNodeCollection { + sort_expr_nodes: serialize_sorts( + order, + extension_codec, + )?, + }) + }) + .transpose()?, + }) + }) + .transpose()?; + Ok(LogicalPlanNode { logical_plan_type: Some(LogicalPlanType::ListingScan( protobuf::ListingTableScanNode { @@ -1135,6 +1174,7 @@ impl AsLogicalPlan for LogicalPlanNode { filters, target_partitions: options.target_partitions as u32, file_sort_order: exprs_vec, + ordering, }, )), }) diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt index 1e6183f48bac..cb49d4143fb4 100644 --- a/datafusion/sqllogictest/test_files/create_external_table.slt +++ b/datafusion/sqllogictest/test_files/create_external_table.slt @@ -252,7 +252,7 @@ EXPLAIN SELECT id FROM t ORDER BY id ASC; ---- logical_plan 01)Sort: t.id ASC NULLS LAST -02)--TableScan: t projection=[id] +02)--TableScan: t projection=[id] preferred_ordering=[t.id ASC NULLS LAST] physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet ## Test a DESC order and verify that output_ordering is ASC from the previous OBRDER BY @@ -261,7 +261,7 @@ EXPLAIN SELECT id FROM t ORDER BY id DESC; ---- logical_plan 01)Sort: t.id DESC NULLS FIRST -02)--TableScan: t projection=[id] +02)--TableScan: t projection=[id] preferred_ordering=[t.id DESC NULLS FIRST] physical_plan 01)SortExec: expr=[id@0 DESC], preserve_partitioning=[false] 02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id], output_ordering=[id@0 ASC NULLS LAST], file_type=parquet diff --git a/datafusion/sqllogictest/test_files/csv_files.slt b/datafusion/sqllogictest/test_files/csv_files.slt index 5a7fa309dbfa..bfbdb6f332f5 100644 --- a/datafusion/sqllogictest/test_files/csv_files.slt +++ b/datafusion/sqllogictest/test_files/csv_files.slt @@ -161,7 +161,7 @@ EXPLAIN SELECT * FROM partitioned_table ORDER BY int_col; ---- logical_plan 01)Sort: partitioned_table.int_col ASC NULLS LAST -02)--TableScan: partitioned_table projection=[int_col, string_col, bigint_col, partition_col] +02)--TableScan: partitioned_table projection=[int_col, string_col, bigint_col, partition_col] preferred_ordering=[partitioned_table.int_col ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [int_col@0 ASC NULLS LAST] 02)--SortExec: expr=[int_col@0 ASC NULLS LAST], preserve_partitioning=[true] diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt index a581bcb539a9..14c9873463a5 100644 --- a/datafusion/sqllogictest/test_files/cte.slt +++ b/datafusion/sqllogictest/test_files/cte.slt @@ -1040,7 +1040,7 @@ logical_plan 04)------Projection: Int64(0) AS k, Int64(0) AS v 05)--------EmptyRelation: rows=1 06)------Sort: r.v ASC NULLS LAST, fetch=1 -07)--------TableScan: r projection=[k, v] +07)--------TableScan: r projection=[k, v] preferred_ordering=[r.v ASC NULLS LAST] physical_plan 01)GlobalLimitExec: skip=0, fetch=5 02)--RecursiveQueryExec: name=r, is_distinct=false diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index a3b6d40aea2d..456ce2510006 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -76,7 +76,7 @@ explain SELECT c1 FROM aggregate_test_100_with_order order by c1 ASC limit 10 ---- logical_plan 01)Sort: aggregate_test_100_with_order.c1 ASC NULLS LAST, 
fetch=10 -02)--TableScan: aggregate_test_100_with_order projection=[c1] +02)--TableScan: aggregate_test_100_with_order projection=[c1] preferred_ordering=[aggregate_test_100_with_order.c1 ASC NULLS LAST] physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_test_100_order_by_c1_asc.csv]]}, projection=[c1], limit=10, output_ordering=[c1@0 ASC NULLS LAST], file_type=csv, has_header=true ## explain_physical_plan_only @@ -161,7 +161,7 @@ logical_plan 01)Dml: op=[Insert Into] table=[sink_table] 02)--Projection: aggregate_test_100.c1 AS c1, aggregate_test_100.c2 AS c2, aggregate_test_100.c3 AS c3, aggregate_test_100.c4 AS c4, aggregate_test_100.c5 AS c5, aggregate_test_100.c6 AS c6, aggregate_test_100.c7 AS c7, aggregate_test_100.c8 AS c8, aggregate_test_100.c9 AS c9, aggregate_test_100.c10 AS c10, aggregate_test_100.c11 AS c11, aggregate_test_100.c12 AS c12, aggregate_test_100.c13 AS c13 03)----Sort: aggregate_test_100.c1 ASC NULLS LAST -04)------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13] +04)------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13] preferred_ordering=[aggregate_test_100.c1 ASC NULLS LAST] physical_plan 01)DataSinkExec: sink=StreamWrite { location: "../../testing/data/csv/aggregate_test_100.csv", batch_size: 8192, encoding: Csv, header: true, .. 
} 02)--SortExec: expr=[c1@0 ASC NULLS LAST], preserve_partitioning=[false] @@ -194,6 +194,7 @@ logical_plan after eliminate_one_union SAME TEXT AS ABOVE logical_plan after filter_null_join_keys SAME TEXT AS ABOVE logical_plan after eliminate_outer_join SAME TEXT AS ABOVE logical_plan after push_down_limit SAME TEXT AS ABOVE +logical_plan after push_down_sort SAME TEXT AS ABOVE logical_plan after push_down_filter SAME TEXT AS ABOVE logical_plan after single_distinct_aggregation_to_group_by SAME TEXT AS ABOVE logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE @@ -216,6 +217,7 @@ logical_plan after eliminate_one_union SAME TEXT AS ABOVE logical_plan after filter_null_join_keys SAME TEXT AS ABOVE logical_plan after eliminate_outer_join SAME TEXT AS ABOVE logical_plan after push_down_limit SAME TEXT AS ABOVE +logical_plan after push_down_sort SAME TEXT AS ABOVE logical_plan after push_down_filter SAME TEXT AS ABOVE logical_plan after single_distinct_aggregation_to_group_by SAME TEXT AS ABOVE logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE @@ -558,6 +560,7 @@ logical_plan after eliminate_one_union SAME TEXT AS ABOVE logical_plan after filter_null_join_keys SAME TEXT AS ABOVE logical_plan after eliminate_outer_join SAME TEXT AS ABOVE logical_plan after push_down_limit SAME TEXT AS ABOVE +logical_plan after push_down_sort SAME TEXT AS ABOVE logical_plan after push_down_filter SAME TEXT AS ABOVE logical_plan after single_distinct_aggregation_to_group_by SAME TEXT AS ABOVE logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE @@ -580,6 +583,7 @@ logical_plan after eliminate_one_union SAME TEXT AS ABOVE logical_plan after filter_null_join_keys SAME TEXT AS ABOVE logical_plan after eliminate_outer_join SAME TEXT AS ABOVE logical_plan after push_down_limit SAME TEXT AS ABOVE +logical_plan after push_down_sort SAME TEXT AS ABOVE logical_plan after push_down_filter SAME TEXT AS ABOVE logical_plan after 
single_distinct_aggregation_to_group_by SAME TEXT AS ABOVE logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE diff --git a/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt b/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt index a09d8ce26ddf..659ff8492502 100644 --- a/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt +++ b/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt @@ -35,7 +35,7 @@ ORDER BY "date", "time"; logical_plan 01)Sort: data.date ASC NULLS LAST, data.time ASC NULLS LAST 02)--Filter: data.ticker = Utf8View("A") -03)----TableScan: data projection=[date, ticker, time] +03)----TableScan: data projection=[date, ticker, time] preferred_ordering=[data.date ASC NULLS LAST, data.time ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [date@0 ASC NULLS LAST, time@2 ASC NULLS LAST] 02)--CoalesceBatchesExec: target_batch_size=8192 @@ -52,7 +52,7 @@ ORDER BY "time" logical_plan 01)Sort: data.time ASC NULLS LAST 02)--Filter: data.ticker = Utf8View("A") AND CAST(data.time AS Date32) = data.date -03)----TableScan: data projection=[date, ticker, time] +03)----TableScan: data projection=[date, ticker, time] preferred_ordering=[data.time ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [time@2 ASC NULLS LAST] 02)--CoalesceBatchesExec: target_batch_size=8192 @@ -69,7 +69,7 @@ ORDER BY "date" logical_plan 01)Sort: data.date ASC NULLS LAST 02)--Filter: data.ticker = Utf8View("A") AND CAST(data.time AS Date32) = data.date -03)----TableScan: data projection=[date, ticker, time] +03)----TableScan: data projection=[date, ticker, time] preferred_ordering=[data.date ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [date@0 ASC NULLS LAST] 02)--CoalesceBatchesExec: target_batch_size=8192 @@ -86,7 +86,7 @@ ORDER BY "ticker" logical_plan 01)Sort: data.ticker ASC NULLS LAST 02)--Filter: data.ticker = Utf8View("A") AND CAST(data.time AS Date32) = data.date -03)----TableScan: data 
projection=[date, ticker, time] +03)----TableScan: data projection=[date, ticker, time] preferred_ordering=[data.ticker ASC NULLS LAST] physical_plan 01)CoalescePartitionsExec 02)--CoalesceBatchesExec: target_batch_size=8192 @@ -103,7 +103,7 @@ ORDER BY "time", "date"; logical_plan 01)Sort: data.time ASC NULLS LAST, data.date ASC NULLS LAST 02)--Filter: data.ticker = Utf8View("A") AND CAST(data.time AS Date32) = data.date -03)----TableScan: data projection=[date, ticker, time] +03)----TableScan: data projection=[date, ticker, time] preferred_ordering=[data.time ASC NULLS LAST, data.date ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [time@2 ASC NULLS LAST, date@0 ASC NULLS LAST] 02)--CoalesceBatchesExec: target_batch_size=8192 @@ -121,7 +121,7 @@ ORDER BY "time" logical_plan 01)Sort: data.time ASC NULLS LAST 02)--Filter: data.ticker = Utf8View("A") AND CAST(data.time AS Date32) != data.date -03)----TableScan: data projection=[date, ticker, time] +03)----TableScan: data projection=[date, ticker, time] preferred_ordering=[data.time ASC NULLS LAST] # no relation between time & date # should also be pipeline breaking @@ -133,7 +133,7 @@ ORDER BY "time" logical_plan 01)Sort: data.time ASC NULLS LAST 02)--Filter: data.ticker = Utf8View("A") -03)----TableScan: data projection=[date, ticker, time] +03)----TableScan: data projection=[date, ticker, time] preferred_ordering=[data.time ASC NULLS LAST] # query query TT @@ -144,7 +144,7 @@ ORDER BY "ticker", "time"; logical_plan 01)Sort: data.ticker ASC NULLS LAST, data.time ASC NULLS LAST 02)--Filter: data.date = Date32("2006-01-02") -03)----TableScan: data projection=[date, ticker, time] +03)----TableScan: data projection=[date, ticker, time] preferred_ordering=[data.ticker ASC NULLS LAST, data.time ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [ticker@1 ASC NULLS LAST, time@2 ASC NULLS LAST] 02)--CoalesceBatchesExec: target_batch_size=8192 diff --git a/datafusion/sqllogictest/test_files/group_by.slt 
b/datafusion/sqllogictest/test_files/group_by.slt index b72f73d44698..e1cd6ffb0cc5 100644 --- a/datafusion/sqllogictest/test_files/group_by.slt +++ b/datafusion/sqllogictest/test_files/group_by.slt @@ -2066,7 +2066,7 @@ EXPLAIN SELECT a FROM multiple_ordered_table ORDER BY b DESC; logical_plan 01)Projection: multiple_ordered_table.a 02)--Sort: multiple_ordered_table.b DESC NULLS FIRST -03)----TableScan: multiple_ordered_table projection=[a, b] +03)----TableScan: multiple_ordered_table projection=[a, b] preferred_ordering=[multiple_ordered_table.b DESC NULLS FIRST] physical_plan 01)ProjectionExec: expr=[a@0 as a] 02)--SortExec: expr=[b@1 DESC], preserve_partitioning=[false] @@ -2080,7 +2080,7 @@ EXPLAIN SELECT a FROM multiple_ordered_table ORDER BY c ASC; logical_plan 01)Projection: multiple_ordered_table.a 02)--Sort: multiple_ordered_table.c ASC NULLS LAST -03)----TableScan: multiple_ordered_table projection=[a, c] +03)----TableScan: multiple_ordered_table projection=[a, c] preferred_ordering=[multiple_ordered_table.c ASC NULLS LAST] physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a], output_ordering=[a@0 ASC NULLS LAST], file_type=csv, has_header=true # Final plan shouldn't have SortExec a ASC, b ASC, @@ -2091,7 +2091,7 @@ EXPLAIN SELECT a FROM multiple_ordered_table ORDER BY a ASC, b ASC; logical_plan 01)Projection: multiple_ordered_table.a 02)--Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST -03)----TableScan: multiple_ordered_table projection=[a, b] +03)----TableScan: multiple_ordered_table projection=[a, b] preferred_ordering=[multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST] physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a], output_ordering=[a@0 ASC NULLS LAST], file_type=csv, has_header=true # test_window_agg_sort @@ -2248,7 +2248,7 @@ 
ORDER BY a, b, d; ---- logical_plan 01)Sort: annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.d ASC NULLS LAST -02)--TableScan: annotated_data_infinite2 projection=[a0, a, b, c, d] +02)--TableScan: annotated_data_infinite2 projection=[a0, a, b, c, d] preferred_ordering=[annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.d ASC NULLS LAST] physical_plan 01)PartialSortExec: expr=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, d@4 ASC NULLS LAST], common_prefix_length=[2] 02)--StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST] @@ -2261,7 +2261,7 @@ LIMIT 50; ---- logical_plan 01)Sort: annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.d ASC NULLS LAST, fetch=50 -02)--TableScan: annotated_data_infinite2 projection=[a0, a, b, c, d] +02)--TableScan: annotated_data_infinite2 projection=[a0, a, b, c, d] preferred_ordering=[annotated_data_infinite2.a ASC NULLS LAST, annotated_data_infinite2.b ASC NULLS LAST, annotated_data_infinite2.d ASC NULLS LAST] physical_plan 01)PartialSortExec: TopK(fetch=50), expr=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, d@4 ASC NULLS LAST], common_prefix_length=[2] 02)--StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST] @@ -2273,7 +2273,7 @@ ORDER BY a, b, d; ---- logical_plan 01)Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST, multiple_ordered_table.d ASC NULLS LAST -02)--TableScan: multiple_ordered_table projection=[a0, a, b, c, d] +02)--TableScan: multiple_ordered_table projection=[a0, a, b, c, d] preferred_ordering=[multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST, 
multiple_ordered_table.d ASC NULLS LAST] physical_plan 01)SortExec: expr=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, d@4 ASC NULLS LAST], preserve_partitioning=[false] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_orderings=[[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], [c@3 ASC NULLS LAST]], file_type=csv, has_header=true @@ -2535,7 +2535,7 @@ logical_plan 02)--Aggregate: groupBy=[[s.country]], aggr=[[array_agg(s.amount) ORDER BY [s.amount DESC NULLS FIRST], sum(CAST(s.amount AS Float64))]] 03)----SubqueryAlias: s 04)------Sort: sales_global.country ASC NULLS LAST, fetch=10 -05)--------TableScan: sales_global projection=[country, amount] +05)--------TableScan: sales_global projection=[country, amount] preferred_ordering=[sales_global.country ASC NULLS LAST] physical_plan 01)ProjectionExec: expr=[country@0 as country, array_agg(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@1 as amounts, sum(s.amount)@2 as sum1] 02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[array_agg(s.amount) ORDER BY [s.amount DESC NULLS FIRST], sum(s.amount)], ordering_mode=Sorted @@ -2573,7 +2573,7 @@ logical_plan 02)--Aggregate: groupBy=[[s.country, s.zip_code]], aggr=[[array_agg(s.amount) ORDER BY [s.amount DESC NULLS FIRST], sum(CAST(s.amount AS Float64))]] 03)----SubqueryAlias: s 04)------Sort: sales_global.country ASC NULLS LAST, fetch=10 -05)--------TableScan: sales_global projection=[zip_code, country, amount] +05)--------TableScan: sales_global projection=[zip_code, country, amount] preferred_ordering=[sales_global.country ASC NULLS LAST] physical_plan 01)ProjectionExec: expr=[country@0 as country, zip_code@1 as zip_code, array_agg(s.amount) ORDER BY [s.amount DESC NULLS FIRST]@2 as amounts, sum(s.amount)@3 as sum1] 02)--AggregateExec: mode=Single, gby=[country@1 as country, zip_code@0 as zip_code], aggr=[array_agg(s.amount) ORDER BY [s.amount DESC NULLS FIRST], sum(s.amount)], 
ordering_mode=PartiallySorted([0]) @@ -2610,7 +2610,7 @@ logical_plan 02)--Aggregate: groupBy=[[s.country]], aggr=[[array_agg(s.amount) ORDER BY [s.country DESC NULLS FIRST], sum(CAST(s.amount AS Float64))]] 03)----SubqueryAlias: s 04)------Sort: sales_global.country ASC NULLS LAST, fetch=10 -05)--------TableScan: sales_global projection=[country, amount] +05)--------TableScan: sales_global projection=[country, amount] preferred_ordering=[sales_global.country ASC NULLS LAST] physical_plan 01)ProjectionExec: expr=[country@0 as country, array_agg(s.amount) ORDER BY [s.country DESC NULLS FIRST]@1 as amounts, sum(s.amount)@2 as sum1] 02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[array_agg(s.amount) ORDER BY [s.country DESC NULLS FIRST], sum(s.amount)], ordering_mode=Sorted @@ -2646,7 +2646,7 @@ logical_plan 02)--Aggregate: groupBy=[[s.country]], aggr=[[array_agg(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST], sum(CAST(s.amount AS Float64))]] 03)----SubqueryAlias: s 04)------Sort: sales_global.country ASC NULLS LAST, fetch=10 -05)--------TableScan: sales_global projection=[country, amount] +05)--------TableScan: sales_global projection=[country, amount] preferred_ordering=[sales_global.country ASC NULLS LAST] physical_plan 01)ProjectionExec: expr=[country@0 as country, array_agg(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST]@1 as amounts, sum(s.amount)@2 as sum1] 02)--AggregateExec: mode=Single, gby=[country@0 as country], aggr=[array_agg(s.amount) ORDER BY [s.country DESC NULLS FIRST, s.amount DESC NULLS FIRST], sum(s.amount)], ordering_mode=Sorted @@ -3906,7 +3906,7 @@ ORDER BY c ASC; ---- logical_plan 01)Sort: multiple_ordered_table.c ASC NULLS LAST -02)--TableScan: multiple_ordered_table projection=[c] +02)--TableScan: multiple_ordered_table projection=[c] preferred_ordering=[multiple_ordered_table.c ASC NULLS LAST] physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c], output_ordering=[c@0 ASC NULLS LAST], file_type=csv, has_header=true statement ok @@ -4392,7 +4392,7 @@ EXPLAIN SELECT name, date_bin('15 minutes', ts) as time_chunks logical_plan 01)Sort: unbounded_csv_with_timestamps2.name DESC NULLS FIRST, time_chunks DESC NULLS FIRST, fetch=5 02)--Projection: unbounded_csv_with_timestamps2.name, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }"), unbounded_csv_with_timestamps2.ts) AS time_chunks -03)----TableScan: unbounded_csv_with_timestamps2 projection=[name, ts] +03)----TableScan: unbounded_csv_with_timestamps2 projection=[name, ts] preferred_ordering=[unbounded_csv_with_timestamps2.name DESC NULLS FIRST, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }"), unbounded_csv_with_timestamps2.ts) DESC NULLS FIRST] physical_plan 01)SortPreservingMergeExec: [name@0 DESC, time_chunks@1 DESC], fetch=5 02)--ProjectionExec: expr=[name@0 as name, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@1) as time_chunks] diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt index 9a3c959884aa..e113e153c6d1 100644 --- a/datafusion/sqllogictest/test_files/insert.slt +++ b/datafusion/sqllogictest/test_files/insert.slt @@ -215,7 +215,7 @@ logical_plan 01)Dml: op=[Insert Into] table=[table_without_values] 02)--Projection: aggregate_test_100.c1 AS c1 03)----Sort: aggregate_test_100.c1 ASC NULLS LAST -04)------TableScan: aggregate_test_100 projection=[c1] +04)------TableScan: aggregate_test_100 projection=[c1] preferred_ordering=[aggregate_test_100.c1 ASC NULLS LAST] physical_plan 01)DataSinkExec: sink=MemoryTable (partitions=1) 02)--SortExec: expr=[c1@0 ASC NULLS LAST], preserve_partitioning=[false] diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt 
b/datafusion/sqllogictest/test_files/insert_to_external.slt index 075256ae4b92..67d692c12195 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -521,7 +521,7 @@ logical_plan 01)Dml: op=[Insert Into] table=[table_without_values] 02)--Projection: aggregate_test_100.c1 AS c1 03)----Sort: aggregate_test_100.c1 ASC NULLS LAST -04)------TableScan: aggregate_test_100 projection=[c1] +04)------TableScan: aggregate_test_100 projection=[c1] preferred_ordering=[aggregate_test_100.c1 ASC NULLS LAST] physical_plan 01)DataSinkExec: sink=ParquetSink(file_groups=[]) 02)--SortExec: expr=[c1@0 ASC NULLS LAST], preserve_partitioning=[false] diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt index 96d2bad086e6..8997b5740452 100644 --- a/datafusion/sqllogictest/test_files/joins.slt +++ b/datafusion/sqllogictest/test_files/joins.slt @@ -3890,7 +3890,7 @@ logical_plan 03)----TableScan: left_table_no_nulls projection=[a, b] 04)--SubqueryAlias: rhs 05)----Sort: right_table_no_nulls.b ASC NULLS LAST, fetch=10 -06)------TableScan: right_table_no_nulls projection=[a, b] +06)------TableScan: right_table_no_nulls projection=[a, b] preferred_ordering=[right_table_no_nulls.b ASC NULLS LAST] physical_plan 01)ProjectionExec: expr=[a@2 as a, b@3 as b, a@0 as a, b@1 as b] 02)--CoalesceBatchesExec: target_batch_size=3 @@ -4008,7 +4008,7 @@ logical_plan 03)----TableScan: left_table_no_nulls projection=[a, b] 04)--SubqueryAlias: rhs 05)----Sort: right_table_no_nulls.b ASC NULLS LAST, fetch=10 -06)------TableScan: right_table_no_nulls projection=[a, b] +06)------TableScan: right_table_no_nulls projection=[a, b] preferred_ordering=[right_table_no_nulls.b ASC NULLS LAST] physical_plan 01)ProjectionExec: expr=[a@2 as a, b@3 as b, a@0 as a, b@1 as b] 02)--CoalesceBatchesExec: target_batch_size=3 diff --git a/datafusion/sqllogictest/test_files/limit.slt 
b/datafusion/sqllogictest/test_files/limit.slt index ae82aee5e155..d92e9de36bec 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -562,7 +562,7 @@ explain SELECT * FROM data ORDER BY column1 LIMIT 3,3; logical_plan 01)Limit: skip=3, fetch=3 02)--Sort: data.column1 ASC NULLS LAST, fetch=6 -03)----TableScan: data projection=[column1, column2] +03)----TableScan: data projection=[column1, column2] preferred_ordering=[data.column1 ASC NULLS LAST] physical_plan 01)GlobalLimitExec: skip=3, fetch=3 02)--StreamingTableExec: partition_sizes=1, projection=[column1, column2], infinite_source=true, fetch=6, output_ordering=[column1@0 ASC NULLS LAST, column2@1 ASC NULLS LAST] @@ -848,7 +848,7 @@ logical_plan 03)----Projection: Int64(1) AS foo, selection.part_key 04)------SubqueryAlias: selection 05)--------Sort: test_limit_with_partitions.part_key ASC NULLS LAST, fetch=1 -06)----------TableScan: test_limit_with_partitions projection=[part_key] +06)----------TableScan: test_limit_with_partitions projection=[part_key] preferred_ordering=[test_limit_with_partitions.part_key ASC NULLS LAST] physical_plan 01)ProjectionExec: expr=[1 as foo] 02)--SortPreservingMergeExec: [part_key@0 ASC NULLS LAST], fetch=1 diff --git a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt index 9c806cfa0d8a..fd091dbdbf80 100644 --- a/datafusion/sqllogictest/test_files/monotonic_projection_test.slt +++ b/datafusion/sqllogictest/test_files/monotonic_projection_test.slt @@ -42,7 +42,7 @@ ORDER BY a_big ASC, b ASC; logical_plan 01)Sort: a_big ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST 02)--Projection: CAST(multiple_ordered_table.a AS Int64) AS a_big, multiple_ordered_table.b -03)----TableScan: multiple_ordered_table projection=[a, b] +03)----TableScan: multiple_ordered_table projection=[a, b] preferred_ordering=[CAST(multiple_ordered_table.a AS Int64) ASC 
NULLS LAST, multiple_ordered_table.b ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [a_big@0 ASC NULLS LAST, b@1 ASC NULLS LAST] 02)--ProjectionExec: expr=[CAST(a@0 AS Int64) as a_big, b@1 as b] @@ -58,7 +58,7 @@ ORDER BY a ASC, b ASC; logical_plan 01)Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST 02)--Projection: multiple_ordered_table.a, CAST(multiple_ordered_table.a AS Int64) AS a_big, multiple_ordered_table.b -03)----TableScan: multiple_ordered_table projection=[a, b] +03)----TableScan: multiple_ordered_table projection=[a, b] preferred_ordering=[multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST, b@2 ASC NULLS LAST] 02)--ProjectionExec: expr=[a@0 as a, CAST(a@0 AS Int64) as a_big, b@1 as b] @@ -79,7 +79,7 @@ ORDER BY a_big ASC, b ASC; logical_plan 01)Sort: a_big ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST 02)--Projection: multiple_ordered_table.a, CAST(multiple_ordered_table.a AS Int64) AS a_big, multiple_ordered_table.b -03)----TableScan: multiple_ordered_table projection=[a, b] +03)----TableScan: multiple_ordered_table projection=[a, b] preferred_ordering=[CAST(multiple_ordered_table.a AS Int64) ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [a_big@1 ASC NULLS LAST, b@2 ASC NULLS LAST] 02)--ProjectionExec: expr=[a@0 as a, CAST(a@0 AS Int64) as a_big, b@1 as b] @@ -96,7 +96,7 @@ ORDER BY a_big ASC, b ASC; logical_plan 01)Sort: a_big ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST 02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b -03)----TableScan: multiple_ordered_table projection=[a, b] +03)----TableScan: multiple_ordered_table projection=[a, b] preferred_ordering=[multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST] physical_plan 01)ProjectionExec: expr=[a@0 as a, 
a@0 as a_big, b@1 as b] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=csv, has_header=true @@ -110,7 +110,7 @@ ORDER BY a ASC, b ASC; logical_plan 01)Sort: multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST 02)--Projection: multiple_ordered_table.a, multiple_ordered_table.a AS a_big, multiple_ordered_table.b -03)----TableScan: multiple_ordered_table projection=[a, b] +03)----TableScan: multiple_ordered_table projection=[a, b] preferred_ordering=[multiple_ordered_table.a ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST] physical_plan 01)ProjectionExec: expr=[a@0 as a, a@0 as a_big, b@1 as b] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=csv, has_header=true @@ -130,7 +130,7 @@ ORDER BY a_str ASC, b ASC; logical_plan 01)Sort: a_str ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST 02)--Projection: CAST(multiple_ordered_table.a AS Utf8View) AS a_str, multiple_ordered_table.b -03)----TableScan: multiple_ordered_table projection=[a, b] +03)----TableScan: multiple_ordered_table projection=[a, b] preferred_ordering=[CAST(multiple_ordered_table.a AS Utf8View) ASC NULLS LAST, multiple_ordered_table.b ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [a_str@0 ASC NULLS LAST, b@1 ASC NULLS LAST] 02)--SortExec: expr=[a_str@0 ASC NULLS LAST, b@1 ASC NULLS LAST], preserve_partitioning=[true] @@ -149,7 +149,7 @@ ORDER BY a + b ASC; ---- logical_plan 01)Sort: multiple_ordered_table.a + multiple_ordered_table.b ASC NULLS LAST -02)--TableScan: multiple_ordered_table projection=[a, b] +02)--TableScan: multiple_ordered_table projection=[a, b] preferred_ordering=[multiple_ordered_table.a + multiple_ordered_table.b ASC NULLS LAST] physical_plan 
01)SortExec: expr=[a@0 + b@1 ASC NULLS LAST], preserve_partitioning=[false] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=csv, has_header=true @@ -165,7 +165,7 @@ ORDER BY sum_expr ASC; logical_plan 01)Sort: sum_expr ASC NULLS LAST 02)--Projection: CAST(multiple_ordered_table.a + multiple_ordered_table.b AS Int64) AS sum_expr, multiple_ordered_table.a, multiple_ordered_table.b -03)----TableScan: multiple_ordered_table projection=[a, b] +03)----TableScan: multiple_ordered_table projection=[a, b] preferred_ordering=[CAST(multiple_ordered_table.a + multiple_ordered_table.b AS Int64) ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [sum_expr@0 ASC NULLS LAST] 02)--SortExec: expr=[sum_expr@0 ASC NULLS LAST], preserve_partitioning=[true] diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 04a7615c764b..c6b22581ee2d 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -254,7 +254,7 @@ explain SELECT c1, c2 FROM aggregate_test_100 ORDER BY c2, c3, c2 logical_plan 01)Projection: aggregate_test_100.c1, aggregate_test_100.c2 02)--Sort: aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST -03)----TableScan: aggregate_test_100 projection=[c1, c2, c3] +03)----TableScan: aggregate_test_100 projection=[c1, c2, c3] preferred_ordering=[aggregate_test_100.c2 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST] physical_plan 01)ProjectionExec: expr=[c1@0 as c1, c2@1 as c2] 02)--SortExec: expr=[c2@1 ASC NULLS LAST, c3@2 ASC NULLS LAST], preserve_partitioning=[false] @@ -557,7 +557,7 @@ ORDER BY result; logical_plan 01)Sort: result ASC NULLS LAST 02)--Projection: multiple_ordered_table.b + multiple_ordered_table.a + multiple_ordered_table.c AS result -03)----TableScan: multiple_ordered_table 
projection=[a, b, c] +03)----TableScan: multiple_ordered_table projection=[a, b, c] preferred_ordering=[multiple_ordered_table.b + multiple_ordered_table.a + multiple_ordered_table.c ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [result@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[b@1 + a@0 + c@2 as result] @@ -588,7 +588,7 @@ ORDER BY db15; logical_plan 01)Sort: db15 ASC NULLS LAST 02)--Projection: date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }"), csv_with_timestamps.ts, TimestampNanosecond(1659537600000000000, None)) AS db15 -03)----TableScan: csv_with_timestamps projection=[ts] +03)----TableScan: csv_with_timestamps projection=[ts] preferred_ordering=[date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }"), csv_with_timestamps.ts, TimestampNanosecond(1659537600000000000, None)) ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [db15@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@0, 1659537600000000000) as db15] @@ -603,7 +603,7 @@ ORDER BY dt_day; logical_plan 01)Sort: dt_day ASC NULLS LAST 02)--Projection: date_trunc(Utf8("DAY"), csv_with_timestamps.ts) AS dt_day -03)----TableScan: csv_with_timestamps projection=[ts] +03)----TableScan: csv_with_timestamps projection=[ts] preferred_ordering=[date_trunc(Utf8("DAY"), csv_with_timestamps.ts) ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [dt_day@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[date_trunc(DAY, ts@0) as dt_day] @@ -646,7 +646,7 @@ ORDER BY atan_c11; logical_plan 01)Sort: atan_c11 ASC NULLS LAST 02)--Projection: atan(aggregate_test_100.c11) AS atan_c11 -03)----TableScan: aggregate_test_100 projection=[c11] +03)----TableScan: aggregate_test_100 projection=[c11] preferred_ordering=[atan(aggregate_test_100.c11) ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [atan_c11@0 ASC NULLS LAST] 
02)--ProjectionExec: expr=[atan(c11@0) as atan_c11] @@ -661,7 +661,7 @@ ORDER BY ceil_c11; logical_plan 01)Sort: ceil_c11 ASC NULLS LAST 02)--Projection: ceil(aggregate_test_100.c11) AS ceil_c11 -03)----TableScan: aggregate_test_100 projection=[c11] +03)----TableScan: aggregate_test_100 projection=[c11] preferred_ordering=[ceil(aggregate_test_100.c11) ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [ceil_c11@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[ceil(c11@0) as ceil_c11] @@ -676,7 +676,7 @@ query TT logical_plan 01)Sort: log_c11_base_c12 ASC NULLS LAST 02)--Projection: log(aggregate_test_100.c12, CAST(aggregate_test_100.c11 AS Float64)) AS log_c11_base_c12 -03)----TableScan: aggregate_test_100 projection=[c11, c12] +03)----TableScan: aggregate_test_100 projection=[c11, c12] preferred_ordering=[log(aggregate_test_100.c12, CAST(aggregate_test_100.c11 AS Float64)) ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [log_c11_base_c12@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[log(c12@1, CAST(c11@0 AS Float64)) as log_c11_base_c12] @@ -691,7 +691,7 @@ ORDER BY log_c12_base_c11 DESC NULLS LAST; logical_plan 01)Sort: log_c12_base_c11 DESC NULLS LAST 02)--Projection: log(CAST(aggregate_test_100.c11 AS Float64), aggregate_test_100.c12) AS log_c12_base_c11 -03)----TableScan: aggregate_test_100 projection=[c11, c12] +03)----TableScan: aggregate_test_100 projection=[c11, c12] preferred_ordering=[log(CAST(aggregate_test_100.c11 AS Float64), aggregate_test_100.c12) DESC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [log_c12_base_c11@0 DESC NULLS LAST] 02)--ProjectionExec: expr=[log(CAST(c11@0 AS Float64), c12@1) as log_c12_base_c11] @@ -757,7 +757,7 @@ EXPLAIN SELECT o_orderkey, o_orderstatus FROM orders ORDER BY o_orderkey ASC ---- logical_plan 01)Sort: orders.o_orderkey ASC NULLS LAST -02)--TableScan: orders projection=[o_orderkey, o_orderstatus] +02)--TableScan: orders projection=[o_orderkey, o_orderstatus] preferred_ordering=[orders.o_orderkey 
ASC NULLS LAST] physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/tpch-csv/orders.csv]]}, projection=[o_orderkey, o_orderstatus], output_ordering=[o_orderkey@0 ASC NULLS LAST], file_type=csv, has_header=true @@ -801,7 +801,7 @@ EXPLAIN SELECT * FROM t1 ORDER BY id DESC, id, name, id ASC; ---- logical_plan 01)Sort: t1.id DESC NULLS FIRST, t1.name ASC NULLS LAST -02)--TableScan: t1 projection=[id, name] +02)--TableScan: t1 projection=[id, name] preferred_ordering=[t1.id DESC NULLS FIRST, t1.name ASC NULLS LAST] physical_plan 01)SortExec: expr=[id@0 DESC, name@1 ASC NULLS LAST], preserve_partitioning=[false] 02)--DataSourceExec: partitions=1, partition_sizes=[1] @@ -820,7 +820,7 @@ EXPLAIN SELECT * FROM t1 ORDER BY id ASC, id, name, id DESC; ---- logical_plan 01)Sort: t1.id ASC NULLS LAST, t1.name ASC NULLS LAST -02)--TableScan: t1 projection=[id, name] +02)--TableScan: t1 projection=[id, name] preferred_ordering=[t1.id ASC NULLS LAST, t1.name ASC NULLS LAST] physical_plan 01)SortExec: expr=[id@0 ASC NULLS LAST, name@1 ASC NULLS LAST], preserve_partitioning=[false] 02)--DataSourceExec: partitions=1, partition_sizes=[1] @@ -1140,7 +1140,7 @@ limit 5; logical_plan 01)Sort: c_str ASC NULLS LAST, fetch=5 02)--Projection: CAST(ordered_table.c AS Utf8View) AS c_str -03)----TableScan: ordered_table projection=[c] +03)----TableScan: ordered_table projection=[c] preferred_ordering=[CAST(ordered_table.c AS Utf8View) ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [c_str@0 ASC NULLS LAST], fetch=5 02)--SortExec: TopK(fetch=5), expr=[c_str@0 ASC NULLS LAST], preserve_partitioning=[true] @@ -1171,7 +1171,7 @@ limit 5; logical_plan 01)Sort: c_bigint ASC NULLS LAST, fetch=5 02)--Projection: CAST(ordered_table.c AS Int64) AS c_bigint -03)----TableScan: ordered_table projection=[c] +03)----TableScan: ordered_table projection=[c] preferred_ordering=[CAST(ordered_table.c AS Int64) ASC NULLS LAST] physical_plan 
01)SortPreservingMergeExec: [c_bigint@0 ASC NULLS LAST], fetch=5 02)--ProjectionExec: expr=[CAST(c@0 AS Int64) as c_bigint] @@ -1206,7 +1206,7 @@ limit 5; logical_plan 01)Sort: abs_c ASC NULLS LAST, fetch=5 02)--Projection: abs(ordered_table.c) AS abs_c -03)----TableScan: ordered_table projection=[c] +03)----TableScan: ordered_table projection=[c] preferred_ordering=[abs(ordered_table.c) ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [abs_c@0 ASC NULLS LAST], fetch=5 02)--SortExec: TopK(fetch=5), expr=[abs_c@0 ASC NULLS LAST], preserve_partitioning=[true] @@ -1241,7 +1241,7 @@ limit 5; logical_plan 01)Sort: abs_c ASC NULLS LAST, fetch=5 02)--Projection: abs(ordered_table.c) AS abs_c -03)----TableScan: ordered_table projection=[c] +03)----TableScan: ordered_table projection=[c] preferred_ordering=[abs(ordered_table.c) ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [abs_c@0 ASC NULLS LAST], fetch=5 02)--ProjectionExec: expr=[abs(c@0) as abs_c] @@ -1267,7 +1267,7 @@ EXPLAIN SELECT CAST((inc_col>desc_col) as integer) as c from annotated_data_fini logical_plan 01)Sort: c ASC NULLS LAST 02)--Projection: CAST(annotated_data_finite.inc_col > annotated_data_finite.desc_col AS Int32) AS c -03)----TableScan: annotated_data_finite projection=[inc_col, desc_col] +03)----TableScan: annotated_data_finite projection=[inc_col, desc_col] preferred_ordering=[CAST(annotated_data_finite.inc_col > annotated_data_finite.desc_col AS Int32) ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [c@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[CAST(inc_col@0 > desc_col@1 AS Int32) as c] @@ -1291,7 +1291,7 @@ EXPLAIN SELECT a + b as sum1 FROM (SELECT a, b logical_plan 01)Projection: ordered_table.a + ordered_table.b AS sum1 02)--Sort: ordered_table.a ASC NULLS LAST, fetch=1 -03)----TableScan: ordered_table projection=[a, b] +03)----TableScan: ordered_table projection=[a, b] preferred_ordering=[ordered_table.a ASC NULLS LAST] physical_plan 01)ProjectionExec: expr=[a@0 
+ b@1 as sum1] 02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 @@ -1311,7 +1311,7 @@ EXPLAIN SELECT a + b as sum1 FROM (SELECT a, b logical_plan 01)Projection: ordered_table.a + ordered_table.b AS sum1 02)--Sort: ordered_table.a ASC NULLS LAST, fetch=1 -03)----TableScan: ordered_table projection=[a, b] +03)----TableScan: ordered_table projection=[a, b] preferred_ordering=[ordered_table.a ASC NULLS LAST] physical_plan 01)ProjectionExec: expr=[a@0 + b@1 as sum1] 02)--SortExec: TopK(fetch=1), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] @@ -1330,7 +1330,7 @@ EXPLAIN SELECT a + b as sum1 FROM (SELECT a, b logical_plan 01)Projection: ordered_table.a + ordered_table.b AS sum1 02)--Sort: ordered_table.a ASC NULLS LAST, fetch=1 -03)----TableScan: ordered_table projection=[a, b] +03)----TableScan: ordered_table projection=[a, b] preferred_ordering=[ordered_table.a ASC NULLS LAST] physical_plan 01)ProjectionExec: expr=[a@0 + b@1 as sum1] 02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 @@ -1444,7 +1444,7 @@ EXPLAIN SELECT c1, c2 FROM table_with_ordered_pk ORDER BY c1, c2; ---- logical_plan 01)Sort: table_with_ordered_pk.c1 ASC NULLS LAST, table_with_ordered_pk.c2 ASC NULLS LAST -02)--TableScan: table_with_ordered_pk projection=[c1, c2] +02)--TableScan: table_with_ordered_pk projection=[c1, c2] preferred_ordering=[table_with_ordered_pk.c1 ASC NULLS LAST, table_with_ordered_pk.c2 ASC NULLS LAST] physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2], output_ordering=[c1@0 ASC NULLS LAST], constraints=[PrimaryKey([0])], file_type=csv, has_header=true statement ok diff --git a/datafusion/sqllogictest/test_files/parquet.slt b/datafusion/sqllogictest/test_files/parquet.slt index e722005bf0f0..ed681f568833 100644 --- a/datafusion/sqllogictest/test_files/parquet.slt +++ b/datafusion/sqllogictest/test_files/parquet.slt @@ -87,7 
+87,7 @@ ORDER BY string_col, int_col; ---- logical_plan 01)Sort: test_table.string_col ASC NULLS LAST, test_table.int_col ASC NULLS LAST -02)--TableScan: test_table projection=[int_col, string_col] +02)--TableScan: test_table projection=[int_col, string_col] preferred_ordering=[test_table.string_col ASC NULLS LAST, test_table.int_col ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST] 02)--SortExec: expr=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST], preserve_partitioning=[true] @@ -117,7 +117,7 @@ ORDER BY string_col, int_col; ---- logical_plan 01)Sort: test_table.string_col ASC NULLS LAST, test_table.int_col ASC NULLS LAST -02)--TableScan: test_table projection=[int_col, string_col] +02)--TableScan: test_table projection=[int_col, string_col] preferred_ordering=[test_table.string_col ASC NULLS LAST, test_table.int_col ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST] 02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet]]}, projection=[int_col, string_col], output_ordering=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST], file_type=parquet @@ -138,7 +138,7 @@ ORDER BY string_col, int_col; ---- logical_plan 01)Sort: test_table.string_col ASC NULLS LAST, test_table.int_col ASC NULLS LAST -02)--TableScan: test_table projection=[int_col, string_col] +02)--TableScan: test_table projection=[int_col, string_col] preferred_ordering=[test_table.string_col ASC NULLS LAST, test_table.int_col ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST] 02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet, 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/2.parquet]]}, projection=[int_col, string_col], output_ordering=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST], file_type=parquet diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index 6dc2c264aeb8..8a005353abaf 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -91,7 +91,7 @@ logical_plan 01)Sort: t.a ASC NULLS LAST 02)--Projection: t.a 03)----Filter: t.b > Int32(2) -04)------TableScan: t projection=[a, b], partial_filters=[t.b > Int32(2)] +04)------TableScan: t projection=[a, b], partial_filters=[t.b > Int32(2)] preferred_ordering=[t.a ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST] 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] @@ -107,7 +107,7 @@ logical_plan 01)Sort: t_pushdown.a ASC NULLS LAST 02)--Projection: t_pushdown.a 03)----Filter: t_pushdown.b > Int32(2) -04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2)] +04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2)] preferred_ordering=[t_pushdown.a ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST] 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] @@ -130,7 +130,7 @@ logical_plan 01)Projection: t.a 02)--Sort: t.b ASC NULLS LAST 03)----Filter: t.b = Int32(2) -04)------TableScan: t projection=[a, b], partial_filters=[t.b = Int32(2)] +04)------TableScan: t projection=[a, b], partial_filters=[t.b = Int32(2)] preferred_ordering=[t.b ASC NULLS LAST] physical_plan 01)CoalescePartitionsExec 02)--ProjectionExec: expr=[a@0 as a] @@ -146,7 +146,7 @@ logical_plan 01)Projection: 
t_pushdown.a 02)--Sort: t_pushdown.b ASC NULLS LAST 03)----Filter: t_pushdown.b = Int32(2) -04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b = Int32(2)] +04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b = Int32(2)] preferred_ordering=[t_pushdown.b ASC NULLS LAST] physical_plan 01)CoalescePartitionsExec 02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 = 2, pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <= b_max@1, required_guarantees=[b in (2)] @@ -180,7 +180,7 @@ logical_plan 01)Sort: t.a ASC NULLS LAST 02)--Projection: t.a 03)----Filter: t.b > Int32(2) -04)------TableScan: t projection=[a, b], partial_filters=[t.b > Int32(2)] +04)------TableScan: t projection=[a, b], partial_filters=[t.b > Int32(2)] preferred_ordering=[t.a ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST] 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] @@ -193,7 +193,7 @@ logical_plan 01)Sort: t_pushdown.a ASC NULLS LAST 02)--Projection: t_pushdown.a 03)----Filter: t_pushdown.b > Int32(2) -04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2)] +04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2)] preferred_ordering=[t_pushdown.a ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST] 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] @@ -216,7 +216,7 @@ logical_plan 01)Projection: t.a 02)--Sort: t.b ASC NULLS LAST 03)----Filter: t.b = Int32(2) -04)------TableScan: t projection=[a, b], partial_filters=[t.b = Int32(2)] +04)------TableScan: t projection=[a, b], partial_filters=[t.b = 
Int32(2)] preferred_ordering=[t.b ASC NULLS LAST] physical_plan 01)CoalescePartitionsExec 02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 = 2, pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <= b_max@1, required_guarantees=[b in (2)] @@ -228,7 +228,7 @@ logical_plan 01)Projection: t_pushdown.a 02)--Sort: t_pushdown.b ASC NULLS LAST 03)----Filter: t_pushdown.b = Int32(2) -04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b = Int32(2)] +04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b = Int32(2)] preferred_ordering=[t_pushdown.b ASC NULLS LAST] physical_plan 01)CoalescePartitionsExec 02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 = 2, pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <= b_max@1, required_guarantees=[b in (2)] @@ -262,7 +262,7 @@ logical_plan 01)Sort: t.a ASC NULLS LAST 02)--Projection: t.a 03)----Filter: t.b > Int32(2) -04)------TableScan: t projection=[a, b], partial_filters=[t.b > Int32(2)] +04)------TableScan: t projection=[a, b], partial_filters=[t.b > Int32(2)] preferred_ordering=[t.a ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST] 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] @@ -278,7 +278,7 @@ logical_plan 01)Sort: t_pushdown.a ASC NULLS LAST 02)--Projection: t_pushdown.a 03)----Filter: t_pushdown.b > Int32(2) -04)------TableScan: t_pushdown projection=[a, b], 
partial_filters=[t_pushdown.b > Int32(2)] +04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2)] preferred_ordering=[t_pushdown.a ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST] 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] @@ -301,7 +301,7 @@ logical_plan 01)Projection: t.a 02)--Sort: t.b ASC NULLS LAST 03)----Filter: t.b = Int32(2) -04)------TableScan: t projection=[a, b], partial_filters=[t.b = Int32(2)] +04)------TableScan: t projection=[a, b], partial_filters=[t.b = Int32(2)] preferred_ordering=[t.b ASC NULLS LAST] physical_plan 01)CoalescePartitionsExec 02)--ProjectionExec: expr=[a@0 as a] @@ -317,7 +317,7 @@ logical_plan 01)Projection: t_pushdown.a 02)--Sort: t_pushdown.b ASC NULLS LAST 03)----Filter: t_pushdown.b = Int32(2) -04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b = Int32(2)] +04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b = Int32(2)] preferred_ordering=[t_pushdown.b ASC NULLS LAST] physical_plan 01)CoalescePartitionsExec 02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 = 2, pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <= b_max@1, required_guarantees=[b in (2)] @@ -340,7 +340,7 @@ logical_plan 01)Sort: t.a ASC NULLS LAST 02)--Projection: t.a 03)----Filter: t.b > Int32(2) -04)------TableScan: t projection=[a, b], partial_filters=[t.b > Int32(2)] +04)------TableScan: t projection=[a, b], partial_filters=[t.b > Int32(2)] preferred_ordering=[t.a ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST] 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] @@ -361,7 +361,7 @@ 
logical_plan 01)Projection: t_pushdown.a 02)--Sort: t_pushdown.b ASC NULLS LAST 03)----Filter: t_pushdown.b = Int32(2) -04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b = Int32(2)] +04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b = Int32(2)] preferred_ordering=[t_pushdown.b ASC NULLS LAST] physical_plan 01)CoalescePartitionsExec 02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 = 2, pruning_predicate=b_null_count@2 != row_count@3 AND b_min@0 <= 2 AND 2 <= b_max@1, required_guarantees=[b in (2)] @@ -380,7 +380,7 @@ logical_plan 01)Sort: t_pushdown.a ASC NULLS LAST 02)--Projection: t_pushdown.a 03)----Filter: t_pushdown.b > Int32(2) AND t_pushdown.a IS NOT NULL -04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2), t_pushdown.a IS NOT NULL] +04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.b > Int32(2), t_pushdown.a IS NOT NULL] preferred_ordering=[t_pushdown.a ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST] 02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true] @@ -399,7 +399,7 @@ logical_plan 01)Sort: t_pushdown.b ASC NULLS LAST 02)--Projection: t_pushdown.b 03)----Filter: t_pushdown.a = Utf8View("bar") -04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.a = Utf8View("bar")] +04)------TableScan: t_pushdown projection=[a, b], partial_filters=[t_pushdown.a = Utf8View("bar")] preferred_ordering=[t_pushdown.b ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [b@0 ASC NULLS LAST] 02)--SortExec: expr=[b@0 ASC NULLS LAST], preserve_partitioning=[true] diff --git 
a/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt b/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt index fe909e70ffb0..01362a9cc29c 100644 --- a/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt +++ b/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt @@ -117,7 +117,7 @@ ORDER BY int_col, bigint_col, nulls_first_col NULLS FIRST, nulls_last_col NULLS ---- logical_plan 01)Sort: test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST, test_table.nulls_first_col ASC NULLS FIRST, test_table.nulls_last_col ASC NULLS LAST -02)--TableScan: test_table projection=[int_col, bigint_col, nulls_first_col, nulls_last_col] +02)--TableScan: test_table projection=[int_col, bigint_col, nulls_first_col, nulls_last_col] preferred_ordering=[test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST, test_table.nulls_first_col ASC NULLS FIRST, test_table.nulls_last_col ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST, nulls_first_col@2 ASC, nulls_last_col@3 ASC NULLS LAST] 02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[int_col, bigint_col, nulls_first_col, nulls_last_col], output_ordering=[int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST, nulls_first_col@2 ASC, nulls_last_col@3 ASC NULLS LAST], file_type=parquet @@ -133,7 +133,7 @@ logical_plan 01)Projection: test_table.string_col 02)--Sort: test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST, test_table.nulls_first_col ASC NULLS FIRST, test_table.nulls_last_col ASC NULLS LAST 
03)----Projection: test_table.string_col, test_table.int_col, test_table.bigint_col, test_table.nulls_first_col, test_table.nulls_last_col -04)------TableScan: test_table projection=[int_col, string_col, bigint_col, nulls_first_col, nulls_last_col] +04)------TableScan: test_table projection=[int_col, string_col, bigint_col, nulls_first_col, nulls_last_col] preferred_ordering=[test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST, test_table.nulls_first_col ASC NULLS FIRST, test_table.nulls_last_col ASC NULLS LAST] physical_plan 01)ProjectionExec: expr=[string_col@0 as string_col] 02)--SortPreservingMergeExec: [int_col@1 ASC NULLS LAST, bigint_col@2 ASC NULLS LAST, nulls_first_col@3 ASC, nulls_last_col@4 ASC NULLS LAST] @@ -168,7 +168,7 @@ ORDER BY descending_col DESC NULLS LAST, bigint_col ASC NULLS LAST; ---- logical_plan 01)Sort: test_table.descending_col DESC NULLS LAST, test_table.bigint_col ASC NULLS LAST -02)--TableScan: test_table projection=[descending_col, bigint_col] +02)--TableScan: test_table projection=[descending_col, bigint_col] preferred_ordering=[test_table.descending_col DESC NULLS LAST, test_table.bigint_col ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [descending_col@0 DESC NULLS LAST, bigint_col@1 ASC NULLS LAST] 02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[descending_col, bigint_col], output_ordering=[descending_col@0 DESC NULLS LAST, bigint_col@1 ASC NULLS LAST], file_type=parquet @@ -204,7 +204,7 @@ ORDER BY partition_col, int_col, bigint_col; ---- logical_plan 01)Sort: test_table.partition_col ASC NULLS LAST, test_table.int_col ASC 
NULLS LAST, test_table.bigint_col ASC NULLS LAST -02)--TableScan: test_table projection=[int_col, bigint_col, partition_col] +02)--TableScan: test_table projection=[int_col, bigint_col, partition_col] preferred_ordering=[test_table.partition_col ASC NULLS LAST, test_table.int_col ASC NULLS LAST, test_table.bigint_col ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [partition_col@2 ASC NULLS LAST, int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST] 02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[int_col, bigint_col, partition_col], output_ordering=[partition_col@2 ASC NULLS LAST, int_col@0 ASC NULLS LAST, bigint_col@1 ASC NULLS LAST], file_type=parquet @@ -237,7 +237,7 @@ ORDER BY overlapping_col; ---- logical_plan 01)Sort: test_table.overlapping_col ASC NULLS LAST -02)--TableScan: test_table projection=[int_col, bigint_col, overlapping_col] +02)--TableScan: test_table projection=[int_col, bigint_col, overlapping_col] preferred_ordering=[test_table.overlapping_col ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [overlapping_col@2 ASC NULLS LAST] 02)--DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[int_col, bigint_col, overlapping_col], output_ordering=[overlapping_col@2 ASC NULLS LAST], 
file_type=parquet @@ -271,7 +271,7 @@ ORDER BY constant_col; ---- logical_plan 01)Sort: test_table.constant_col ASC NULLS LAST -02)--TableScan: test_table projection=[constant_col] +02)--TableScan: test_table projection=[constant_col] preferred_ordering=[test_table.constant_col ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [constant_col@0 ASC NULLS LAST] 02)--SortExec: expr=[constant_col@0 ASC NULLS LAST], preserve_partitioning=[true] diff --git a/datafusion/sqllogictest/test_files/references.slt b/datafusion/sqllogictest/test_files/references.slt index 0e72c5e5a29e..607db17f1e7f 100644 --- a/datafusion/sqllogictest/test_files/references.slt +++ b/datafusion/sqllogictest/test_files/references.slt @@ -103,7 +103,7 @@ EXPLAIN (SELECT "....", "...." AS c3 FROM test ORDER BY "...."); logical_plan 01)Sort: test..... ASC NULLS LAST 02)--Projection: test....., test..... AS c3 -03)----TableScan: test projection=[....] +03)----TableScan: test projection=[....] preferred_ordering=[test..... 
ASC NULLS LAST] physical_plan 01)SortExec: expr=[....@0 ASC NULLS LAST], preserve_partitioning=[false] 02)--ProjectionExec: expr=[....@0 as ...., ....@0 as c3] diff --git a/datafusion/sqllogictest/test_files/repartition_scan.slt b/datafusion/sqllogictest/test_files/repartition_scan.slt index c536c8165c5a..5f02bed7218d 100644 --- a/datafusion/sqllogictest/test_files/repartition_scan.slt +++ b/datafusion/sqllogictest/test_files/repartition_scan.slt @@ -96,7 +96,7 @@ EXPLAIN SELECT column1 FROM parquet_table WHERE column1 <> 42 ORDER BY column1; logical_plan 01)Sort: parquet_table.column1 ASC NULLS LAST 02)--Filter: parquet_table.column1 != Int32(42) -03)----TableScan: parquet_table projection=[column1], partial_filters=[parquet_table.column1 != Int32(42)] +03)----TableScan: parquet_table projection=[column1], partial_filters=[parquet_table.column1 != Int32(42)] preferred_ordering=[parquet_table.column1 ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [column1@0 ASC NULLS LAST] 02)--SortExec: expr=[column1@0 ASC NULLS LAST], preserve_partitioning=[true] @@ -133,7 +133,7 @@ EXPLAIN SELECT column1 FROM parquet_table_with_order WHERE column1 <> 42 ORDER B logical_plan 01)Sort: parquet_table_with_order.column1 ASC NULLS LAST 02)--Filter: parquet_table_with_order.column1 != Int32(42) -03)----TableScan: parquet_table_with_order projection=[column1], partial_filters=[parquet_table_with_order.column1 != Int32(42)] +03)----TableScan: parquet_table_with_order projection=[column1], partial_filters=[parquet_table_with_order.column1 != Int32(42)] preferred_ordering=[parquet_table_with_order.column1 ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [column1@0 ASC NULLS LAST] 02)--CoalesceBatchesExec: target_batch_size=8192 diff --git a/datafusion/sqllogictest/test_files/select.slt b/datafusion/sqllogictest/test_files/select.slt index cd1f90c42efd..f7f80c9b03be 100644 --- a/datafusion/sqllogictest/test_files/select.slt +++ 
b/datafusion/sqllogictest/test_files/select.slt @@ -1195,7 +1195,7 @@ EXPLAIN SELECT a FROM annotated_data_finite2 ---- logical_plan 01)Sort: annotated_data_finite2.a ASC NULLS LAST, fetch=5 -02)--TableScan: annotated_data_finite2 projection=[a] +02)--TableScan: annotated_data_finite2 projection=[a] preferred_ordering=[annotated_data_finite2.a ASC NULLS LAST] physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a], limit=5, output_ordering=[a@0 ASC NULLS LAST], file_type=csv, has_header=true query I @@ -1376,7 +1376,7 @@ EXPLAIN select a as a FROM table1 order by a ---- logical_plan 01)Sort: table1.a ASC NULLS LAST -02)--TableScan: table1 projection=[a] +02)--TableScan: table1 projection=[a] preferred_ordering=[table1.a ASC NULLS LAST] physical_plan 01)SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] 02)--DataSourceExec: partitions=1, partition_sizes=[1] @@ -1400,7 +1400,7 @@ ORDER BY a ASC; logical_plan 01)Sort: annotated_data_finite2.a ASC NULLS LAST 02)--Projection: annotated_data_finite2.a, annotated_data_finite2.a + annotated_data_finite2.b -03)----TableScan: annotated_data_finite2 projection=[a, b] +03)----TableScan: annotated_data_finite2 projection=[a, b] preferred_ordering=[annotated_data_finite2.a ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [a@0 ASC NULLS LAST] 02)--ProjectionExec: expr=[a@0 as a, a@0 + b@1 as annotated_data_finite2.a + annotated_data_finite2.b] @@ -1419,7 +1419,7 @@ ORDER BY a ASC; logical_plan 01)Sort: annotated_data_finite2.a ASC NULLS LAST 02)--Projection: annotated_data_finite2.a, annotated_data_finite2.b, Int64(2) -03)----TableScan: annotated_data_finite2 projection=[a, b] +03)----TableScan: annotated_data_finite2 projection=[a, b] preferred_ordering=[annotated_data_finite2.a ASC NULLS LAST] physical_plan 01)ProjectionExec: expr=[a@0 as a, b@1 as b, 2 as Int64(2)] 02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST], file_type=csv, has_header=true @@ -1437,7 +1437,7 @@ ORDER BY b, c; logical_plan 01)Sort: annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST 02)--Filter: annotated_data_finite2.a = Int32(0) -03)----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], partial_filters=[annotated_data_finite2.a = Int32(0)] +03)----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], partial_filters=[annotated_data_finite2.a = Int32(0)] preferred_ordering=[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [b@2 ASC NULLS LAST, c@3 ASC NULLS LAST] 02)--CoalesceBatchesExec: target_batch_size=8192 @@ -1458,7 +1458,7 @@ ORDER BY c; logical_plan 01)Sort: annotated_data_finite2.c ASC NULLS LAST 02)--Filter: annotated_data_finite2.a = Int32(0) AND annotated_data_finite2.b = Int32(0) -03)----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], partial_filters=[annotated_data_finite2.a = Int32(0), annotated_data_finite2.b = Int32(0)] +03)----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], partial_filters=[annotated_data_finite2.a = Int32(0), annotated_data_finite2.b = Int32(0)] preferred_ordering=[annotated_data_finite2.c ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [c@3 ASC NULLS LAST] 02)--CoalesceBatchesExec: target_batch_size=8192 @@ -1479,7 +1479,7 @@ ORDER BY b, c; logical_plan 01)Sort: annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST 02)--Filter: annotated_data_finite2.a = Int32(0) AND annotated_data_finite2.b = Int32(0) -03)----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], partial_filters=[annotated_data_finite2.a = Int32(0), annotated_data_finite2.b = Int32(0)] +03)----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], 
partial_filters=[annotated_data_finite2.a = Int32(0), annotated_data_finite2.b = Int32(0)] preferred_ordering=[annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [b@2 ASC NULLS LAST, c@3 ASC NULLS LAST] 02)--CoalesceBatchesExec: target_batch_size=8192 @@ -1500,7 +1500,7 @@ ORDER BY a, b, c; logical_plan 01)Sort: annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST 02)--Filter: annotated_data_finite2.a = Int32(0) AND annotated_data_finite2.b = Int32(0) -03)----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], partial_filters=[annotated_data_finite2.a = Int32(0), annotated_data_finite2.b = Int32(0)] +03)----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], partial_filters=[annotated_data_finite2.a = Int32(0), annotated_data_finite2.b = Int32(0)] preferred_ordering=[annotated_data_finite2.a ASC NULLS LAST, annotated_data_finite2.b ASC NULLS LAST, annotated_data_finite2.c ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST] 02)--CoalesceBatchesExec: target_batch_size=8192 @@ -1521,7 +1521,7 @@ ORDER BY c; logical_plan 01)Sort: annotated_data_finite2.c ASC NULLS LAST 02)--Filter: annotated_data_finite2.a = Int32(0) OR annotated_data_finite2.b = Int32(0) -03)----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], partial_filters=[annotated_data_finite2.a = Int32(0) OR annotated_data_finite2.b = Int32(0)] +03)----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], partial_filters=[annotated_data_finite2.a = Int32(0) OR annotated_data_finite2.b = Int32(0)] preferred_ordering=[annotated_data_finite2.c ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [c@3 ASC NULLS LAST] 02)--SortExec: expr=[c@3 ASC NULLS LAST], preserve_partitioning=[true] @@ -1546,7 +1546,7 @@ logical_plan 03)----Projection: aggregate_test_100.c2 
04)------Sort: aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST, fetch=4 05)--------Projection: aggregate_test_100.c2, aggregate_test_100.c1 -06)----------TableScan: aggregate_test_100 projection=[c1, c2] +06)----------TableScan: aggregate_test_100 projection=[c1, c2] preferred_ordering=[aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] physical_plan 01)ProjectionExec: expr=[c2@0 as c2, count(Int64(1))@1 as count(*)] 02)--AggregateExec: mode=FinalPartitioned, gby=[c2@0 as c2], aggr=[count(Int64(1))] @@ -1570,7 +1570,7 @@ ORDER BY CAST(ROUND(b) as INT); logical_plan 01)Sort: CAST(round(CAST(annotated_data_finite2.b AS Float64)) AS Int32) ASC NULLS LAST 02)--Filter: CAST(round(CAST(annotated_data_finite2.b AS Float64)) AS Int32) = annotated_data_finite2.a -03)----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], partial_filters=[CAST(round(CAST(annotated_data_finite2.b AS Float64)) AS Int32) = annotated_data_finite2.a] +03)----TableScan: annotated_data_finite2 projection=[a0, a, b, c, d], partial_filters=[CAST(round(CAST(annotated_data_finite2.b AS Float64)) AS Int32) = annotated_data_finite2.a] preferred_ordering=[CAST(round(CAST(annotated_data_finite2.b AS Float64)) AS Int32) ASC NULLS LAST] physical_plan 01)SortPreservingMergeExec: [CAST(round(CAST(b@2 AS Float64)) AS Int32) ASC NULLS LAST] 02)--CoalesceBatchesExec: target_batch_size=8192 diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt index dec935749535..47bb020812d3 100644 --- a/datafusion/sqllogictest/test_files/subquery.slt +++ b/datafusion/sqllogictest/test_files/subquery.slt @@ -608,7 +608,7 @@ logical_plan 03)----Sort: outer_ref(t1.t1_id) ASC NULLS LAST 04)------Projection: t2.t2_id, t2.t2_name, t2.t2_int 05)--------Filter: t2.t2_id >= outer_ref(t1.t1_id) -06)----------TableScan: t2 +06)----------TableScan: t2 preferred_ordering=[outer_ref(t1.t1_id) ASC NULLS LAST] 07)--TableScan: 
t1 projection=[t1_id, t1_name] #exists_subquery_with_select_null diff --git a/datafusion/sqllogictest/test_files/subquery_sort.slt b/datafusion/sqllogictest/test_files/subquery_sort.slt index 1e5a3c8f526a..ce64cc17beb8 100644 --- a/datafusion/sqllogictest/test_files/subquery_sort.slt +++ b/datafusion/sqllogictest/test_files/subquery_sort.slt @@ -62,7 +62,7 @@ logical_plan 02)--Sort: t2.c1 ASC NULLS LAST, t2.c3 ASC NULLS LAST, t2.c9 ASC NULLS LAST 03)----SubqueryAlias: t2 04)------Sort: sink_table.c1 DESC NULLS FIRST, sink_table.c3 ASC NULLS LAST, fetch=2 -05)--------TableScan: sink_table projection=[c1, c2, c3, c9] +05)--------TableScan: sink_table projection=[c1, c2, c3, c9] preferred_ordering=[sink_table.c1 DESC NULLS FIRST, sink_table.c3 ASC NULLS LAST] physical_plan 01)ProjectionExec: expr=[c1@0 as c1, c2@1 as c2] 02)--SortExec: expr=[c1@0 ASC NULLS LAST, c3@2 ASC NULLS LAST, c9@3 ASC NULLS LAST], preserve_partitioning=[false] diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt index 8a08cc17d417..c07fbe8d8c32 100644 --- a/datafusion/sqllogictest/test_files/topk.slt +++ b/datafusion/sqllogictest/test_files/topk.slt @@ -82,7 +82,7 @@ explain select * from aggregate_test_100 ORDER BY c13 desc limit 5; ---- logical_plan 01)Sort: aggregate_test_100.c13 DESC NULLS FIRST, fetch=5 -02)--TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13] +02)--TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13] preferred_ordering=[aggregate_test_100.c13 DESC NULLS FIRST] physical_plan 01)SortExec: TopK(fetch=5), expr=[c13@12 DESC], preserve_partitioning=[false] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], file_type=csv, has_header=true diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt index f1a708d84dd3..3bc1dcf743c1 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -1382,7 +1382,7 @@ logical_plan 03)----WindowAggr: windowExpr=[[sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]] 04)------WindowAggr: windowExpr=[[sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]] 05)--------Sort: aggregate_test_100.c9 DESC NULLS FIRST, fetch=100 -06)----------TableScan: aggregate_test_100 projection=[c9] +06)----------TableScan: aggregate_test_100 projection=[c9] preferred_ordering=[aggregate_test_100.c9 DESC NULLS FIRST] physical_plan 01)ProjectionExec: expr=[c9@0 as c9, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum1, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2] 02)--GlobalLimitExec: skip=0, fetch=5 @@ -2144,7 +2144,7 @@ logical_plan 01)Projection: array_agg(aggregate_test_100.c13) AS array_agg1 02)--Aggregate: groupBy=[[]], aggr=[[array_agg(aggregate_test_100.c13)]] 03)----Sort: aggregate_test_100.c13 ASC NULLS LAST, fetch=1 -04)------TableScan: aggregate_test_100 projection=[c13] +04)------TableScan: aggregate_test_100 projection=[c13] preferred_ordering=[aggregate_test_100.c13 ASC NULLS LAST] physical_plan 01)ProjectionExec: expr=[array_agg(aggregate_test_100.c13)@0 as array_agg1] 02)--AggregateExec: mode=Final, gby=[], aggr=[array_agg(aggregate_test_100.c13)]