diff --git a/src/test/resources/tpcds/spark-2.4/approved-plans-v1_4/q1/explain.txt b/src/test/resources/tpcds/spark-2.4/approved-plans-v1_4/q1/explain.txt index 8b11a662f..7899a44d0 100644 --- a/src/test/resources/tpcds/spark-2.4/approved-plans-v1_4/q1/explain.txt +++ b/src/test/resources/tpcds/spark-2.4/approved-plans-v1_4/q1/explain.txt @@ -1,255 +1,43 @@ == Physical Plan == -TakeOrderedAndProject (44) -+- * Project (43) - +- * BroadcastHashJoin Inner BuildRight (42) - :- * Project (37) - : +- * BroadcastHashJoin Inner BuildRight (36) - : :- * Project (30) - : : +- * BroadcastHashJoin Inner BuildRight (29) - : : :- * Filter (14) - : : : +- * HashAggregate (13) - : : : +- Exchange (12) - : : : +- * HashAggregate (11) - : : : +- * Project (10) - : : : +- * BroadcastHashJoin Inner BuildRight (9) - : : : :- * Filter (3) - : : : : +- * ColumnarToRow (2) - : : : : +- Scan parquet default.store_returns (1) - : : : +- BroadcastExchange (8) - : : : +- * Project (7) - : : : +- * Filter (6) - : : : +- * ColumnarToRow (5) - : : : +- Scan parquet default.date_dim (4) - : : +- BroadcastExchange (28) - : : +- * Filter (27) - : : +- * HashAggregate (26) - : : +- Exchange (25) - : : +- * HashAggregate (24) - : : +- * HashAggregate (23) - : : +- Exchange (22) - : : +- * HashAggregate (21) - : : +- * Project (20) - : : +- * BroadcastHashJoin Inner BuildRight (19) - : : :- * Filter (17) - : : : +- * ColumnarToRow (16) - : : : +- Scan parquet default.store_returns (15) - : : +- ReusedExchange (18) - : +- BroadcastExchange (35) - : +- * Project (34) - : +- * Filter (33) - : +- * ColumnarToRow (32) - : +- Scan parquet default.store (31) - +- BroadcastExchange (41) - +- * Filter (40) - +- * ColumnarToRow (39) - +- Scan parquet default.customer (38) - - -(1) Scan parquet default.store_returns -Output [4]: [sr_returned_date_sk#1, sr_customer_sk#2, sr_store_sk#3, sr_return_amt#4] -Batched: true -Location [not included in comparison]/{warehouse_dir}/store_returns] -PushedFilters: [IsNotNull(sr_returned_date_sk), IsNotNull(sr_store_sk), IsNotNull(sr_customer_sk)] -ReadSchema: struct - -(2) ColumnarToRow [codegen id : 2] -Input [4]: [sr_returned_date_sk#1, sr_customer_sk#2, sr_store_sk#3, sr_return_amt#4] - -(3) Filter [codegen id : 2] -Input [4]: [sr_returned_date_sk#1, sr_customer_sk#2, sr_store_sk#3, sr_return_amt#4] -Condition : ((isnotnull(sr_returned_date_sk#1) AND isnotnull(sr_store_sk#3)) AND isnotnull(sr_customer_sk#2)) - -(4) Scan parquet default.date_dim -Output [2]: [d_date_sk#5, d_year#6] -Batched: true -Location [not included in comparison]/{warehouse_dir}/date_dim] -PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2000), IsNotNull(d_date_sk)] -ReadSchema: struct - -(5) ColumnarToRow [codegen id : 1] -Input [2]: [d_date_sk#5, d_year#6] - -(6) Filter [codegen id : 1] -Input [2]: [d_date_sk#5, d_year#6] -Condition : ((isnotnull(d_year#6) AND (d_year#6 = 2000)) AND isnotnull(d_date_sk#5)) - -(7) Project [codegen id : 1] -Output [1]: [d_date_sk#5] -Input [2]: [d_date_sk#5, d_year#6] - -(8) BroadcastExchange -Input [1]: [d_date_sk#5] -Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#7] - -(9) BroadcastHashJoin [codegen id : 2] -Left keys [1]: [sr_returned_date_sk#1] -Right keys [1]: [cast(d_date_sk#5 as bigint)] -Join condition: None - -(10) Project [codegen id : 2] -Output [3]: [sr_customer_sk#2, sr_store_sk#3, sr_return_amt#4] -Input [5]: [sr_returned_date_sk#1, sr_customer_sk#2, sr_store_sk#3, sr_return_amt#4, d_date_sk#5] - -(11) HashAggregate [codegen id : 
2] -Input [3]: [sr_customer_sk#2, sr_store_sk#3, sr_return_amt#4] -Keys [2]: [sr_customer_sk#2, sr_store_sk#3] -Functions [1]: [partial_sum(UnscaledValue(sr_return_amt#4))] -Aggregate Attributes [1]: [sum#8] -Results [3]: [sr_customer_sk#2, sr_store_sk#3, sum#9] - -(12) Exchange -Input [3]: [sr_customer_sk#2, sr_store_sk#3, sum#9] -Arguments: hashpartitioning(sr_customer_sk#2, sr_store_sk#3, 5), true, [id=#10] - -(13) HashAggregate [codegen id : 9] -Input [3]: [sr_customer_sk#2, sr_store_sk#3, sum#9] -Keys [2]: [sr_customer_sk#2, sr_store_sk#3] -Functions [1]: [sum(UnscaledValue(sr_return_amt#4))] -Aggregate Attributes [1]: [sum(UnscaledValue(sr_return_amt#4))#11] -Results [3]: [sr_customer_sk#2 AS ctr_customer_sk#12, sr_store_sk#3 AS ctr_store_sk#13, MakeDecimal(sum(UnscaledValue(sr_return_amt#4))#11,17,2) AS ctr_total_return#14] - -(14) Filter [codegen id : 9] -Input [3]: [ctr_customer_sk#12, ctr_store_sk#13, ctr_total_return#14] -Condition : isnotnull(ctr_total_return#14) - -(15) Scan parquet default.store_returns -Output [4]: [sr_returned_date_sk#1, sr_customer_sk#2, sr_store_sk#3, sr_return_amt#4] -Batched: true -Location [not included in comparison]/{warehouse_dir}/store_returns] -PushedFilters: [IsNotNull(sr_returned_date_sk), IsNotNull(sr_store_sk)] -ReadSchema: struct - -(16) ColumnarToRow [codegen id : 4] -Input [4]: [sr_returned_date_sk#1, sr_customer_sk#2, sr_store_sk#3, sr_return_amt#4] - -(17) Filter [codegen id : 4] -Input [4]: [sr_returned_date_sk#1, sr_customer_sk#2, sr_store_sk#3, sr_return_amt#4] -Condition : (isnotnull(sr_returned_date_sk#1) AND isnotnull(sr_store_sk#3)) - -(18) ReusedExchange [Reuses operator id: 8] -Output [1]: [d_date_sk#5] - -(19) BroadcastHashJoin [codegen id : 4] -Left keys [1]: [sr_returned_date_sk#1] -Right keys [1]: [cast(d_date_sk#5 as bigint)] -Join condition: None - -(20) Project [codegen id : 4] -Output [3]: [sr_customer_sk#2, sr_store_sk#3, sr_return_amt#4] -Input [5]: [sr_returned_date_sk#1, sr_customer_sk#2, sr_store_sk#3, sr_return_amt#4, d_date_sk#5] - -(21) HashAggregate [codegen id : 4] -Input [3]: [sr_customer_sk#2, sr_store_sk#3, sr_return_amt#4] -Keys [2]: [sr_customer_sk#2, sr_store_sk#3] -Functions [1]: [partial_sum(UnscaledValue(sr_return_amt#4))] -Aggregate Attributes [1]: [sum#15] -Results [3]: [sr_customer_sk#2, sr_store_sk#3, sum#16] - -(22) Exchange -Input [3]: [sr_customer_sk#2, sr_store_sk#3, sum#16] -Arguments: hashpartitioning(sr_customer_sk#2, sr_store_sk#3, 5), true, [id=#17] - -(23) HashAggregate [codegen id : 5] -Input [3]: [sr_customer_sk#2, sr_store_sk#3, sum#16] -Keys [2]: [sr_customer_sk#2, sr_store_sk#3] -Functions [1]: [sum(UnscaledValue(sr_return_amt#4))] -Aggregate Attributes [1]: [sum(UnscaledValue(sr_return_amt#4))#18] -Results [2]: [sr_store_sk#3 AS ctr_store_sk#13, MakeDecimal(sum(UnscaledValue(sr_return_amt#4))#18,17,2) AS ctr_total_return#14] - -(24) HashAggregate [codegen id : 5] -Input [2]: [ctr_store_sk#13, ctr_total_return#14] -Keys [1]: [ctr_store_sk#13] -Functions [1]: [partial_avg(ctr_total_return#14)] -Aggregate Attributes [2]: [sum#19, count#20] -Results [3]: [ctr_store_sk#13, sum#21, count#22] - -(25) Exchange -Input [3]: [ctr_store_sk#13, sum#21, count#22] -Arguments: hashpartitioning(ctr_store_sk#13, 5), true, [id=#23] - -(26) HashAggregate [codegen id : 6] -Input [3]: [ctr_store_sk#13, sum#21, count#22] -Keys [1]: [ctr_store_sk#13] -Functions [1]: [avg(ctr_total_return#14)] -Aggregate Attributes [1]: [avg(ctr_total_return#14)#24] -Results [2]: 
[CheckOverflow((promote_precision(avg(ctr_total_return#14)#24) * 1.200000), DecimalType(24,7), true) AS (CAST(avg(ctr_total_return) AS DECIMAL(21,6)) * CAST(1.2 AS DECIMAL(21,6)))#25, ctr_store_sk#13 AS ctr_store_sk#13#26] - -(27) Filter [codegen id : 6] -Input [2]: [(CAST(avg(ctr_total_return) AS DECIMAL(21,6)) * CAST(1.2 AS DECIMAL(21,6)))#25, ctr_store_sk#13#26] -Condition : isnotnull((CAST(avg(ctr_total_return) AS DECIMAL(21,6)) * CAST(1.2 AS DECIMAL(21,6)))#25) - -(28) BroadcastExchange -Input [2]: [(CAST(avg(ctr_total_return) AS DECIMAL(21,6)) * CAST(1.2 AS DECIMAL(21,6)))#25, ctr_store_sk#13#26] -Arguments: HashedRelationBroadcastMode(List(input[1, bigint, true]),false), [id=#27] - -(29) BroadcastHashJoin [codegen id : 9] -Left keys [1]: [ctr_store_sk#13] -Right keys [1]: [ctr_store_sk#13#26] -Join condition: (cast(ctr_total_return#14 as decimal(24,7)) > (CAST(avg(ctr_total_return) AS DECIMAL(21,6)) * CAST(1.2 AS DECIMAL(21,6)))#25) - -(30) Project [codegen id : 9] -Output [2]: [ctr_customer_sk#12, ctr_store_sk#13] -Input [5]: [ctr_customer_sk#12, ctr_store_sk#13, ctr_total_return#14, (CAST(avg(ctr_total_return) AS DECIMAL(21,6)) * CAST(1.2 AS DECIMAL(21,6)))#25, ctr_store_sk#13#26] - -(31) Scan parquet default.store -Output [2]: [s_store_sk#28, s_state#29] -Batched: true -Location [not included in comparison]/{warehouse_dir}/store] -PushedFilters: [IsNotNull(s_state), EqualTo(s_state,TN), IsNotNull(s_store_sk)] -ReadSchema: struct - -(32) ColumnarToRow [codegen id : 7] -Input [2]: [s_store_sk#28, s_state#29] - -(33) Filter [codegen id : 7] -Input [2]: [s_store_sk#28, s_state#29] -Condition : ((isnotnull(s_state#29) AND (s_state#29 = TN)) AND isnotnull(s_store_sk#28)) - -(34) Project [codegen id : 7] -Output [1]: [s_store_sk#28] -Input [2]: [s_store_sk#28, s_state#29] - -(35) BroadcastExchange -Input [1]: [s_store_sk#28] -Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#30] - -(36) BroadcastHashJoin [codegen id : 9] -Left keys [1]: [ctr_store_sk#13] -Right keys [1]: [cast(s_store_sk#28 as bigint)] -Join condition: None - -(37) Project [codegen id : 9] -Output [1]: [ctr_customer_sk#12] -Input [3]: [ctr_customer_sk#12, ctr_store_sk#13, s_store_sk#28] - -(38) Scan parquet default.customer -Output [2]: [c_customer_sk#31, c_customer_id#32] -Batched: true -Location [not included in comparison]/{warehouse_dir}/customer] -PushedFilters: [IsNotNull(c_customer_sk)] -ReadSchema: struct - -(39) ColumnarToRow [codegen id : 8] -Input [2]: [c_customer_sk#31, c_customer_id#32] - -(40) Filter [codegen id : 8] -Input [2]: [c_customer_sk#31, c_customer_id#32] -Condition : isnotnull(c_customer_sk#31) - -(41) BroadcastExchange -Input [2]: [c_customer_sk#31, c_customer_id#32] -Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#33] - -(42) BroadcastHashJoin [codegen id : 9] -Left keys [1]: [ctr_customer_sk#12] -Right keys [1]: [cast(c_customer_sk#31 as bigint)] -Join condition: None - -(43) Project [codegen id : 9] -Output [1]: [c_customer_id#32] -Input [3]: [ctr_customer_sk#12, c_customer_sk#31, c_customer_id#32] - -(44) TakeOrderedAndProject -Input [1]: [c_customer_id#32] -Arguments: 100, [c_customer_id#32 ASC NULLS FIRST], [c_customer_id#32] - +TakeOrderedAndProject(limit=100, orderBy=[c_customer_id#1 ASC NULLS FIRST], output=[c_customer_id#1]) ++- *(9) Project [c_customer_id#1] + +- *(9) BroadcastHashJoin [ctr_customer_sk#2], [cast(c_customer_sk#3 as bigint)], Inner, BuildRight + :- *(9) Project 
[ctr_customer_sk#2] + : +- *(9) BroadcastHashJoin [ctr_store_sk#4], [cast(s_store_sk#5 as bigint)], Inner, BuildRight + : :- *(9) Project [ctr_customer_sk#2, ctr_store_sk#4] + : : +- *(9) BroadcastHashJoin [ctr_store_sk#4], [ctr_store_sk#4#6], Inner, BuildRight, (cast(ctr_total_return#7 as decimal(24,7)) > (CAST(avg(ctr_total_return) AS DECIMAL(21,6)) * CAST(1.2 AS DECIMAL(21,6)))#8) + : : :- *(9) Filter isnotnull(ctr_total_return#7) + : : : +- *(9) HashAggregate(keys=[sr_customer_sk#9, sr_store_sk#10], functions=[sum(UnscaledValue(sr_return_amt#11))]) + : : : +- Exchange hashpartitioning(sr_customer_sk#9, sr_store_sk#10, 5) + : : : +- *(2) HashAggregate(keys=[sr_customer_sk#9, sr_store_sk#10], functions=[partial_sum(UnscaledValue(sr_return_amt#11))]) + : : : +- *(2) Project [sr_customer_sk#9, sr_store_sk#10, sr_return_amt#11] + : : : +- *(2) BroadcastHashJoin [sr_returned_date_sk#12], [cast(d_date_sk#13 as bigint)], Inner, BuildRight + : : : :- *(2) Project [sr_returned_date_sk#12, sr_customer_sk#9, sr_store_sk#10, sr_return_amt#11] + : : : : +- *(2) Filter ((isnotnull(sr_returned_date_sk#12) && isnotnull(sr_store_sk#10)) && isnotnull(sr_customer_sk#9)) + : : : : +- *(2) FileScan parquet default.store_returns[sr_returned_date_sk#12,sr_customer_sk#9,sr_store_sk#10,sr_return_amt#11] Batched: true, Format: Parquet, Location [not included in comparison]/{warehouse_dir}/store_returns], PartitionFilters: [], PushedFilters: [IsNotNull(sr_returned_date_sk), IsNotNull(sr_store_sk), IsNotNull(sr_customer_sk)], ReadSchema: struct + : : +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, true])) + : : +- *(6) Filter isnotnull((CAST(avg(ctr_total_return) AS DECIMAL(21,6)) * CAST(1.2 AS DECIMAL(21,6)))#8) + : : +- *(6) HashAggregate(keys=[ctr_store_sk#4], functions=[avg(ctr_total_return#7)]) + : : +- Exchange hashpartitioning(ctr_store_sk#4, 5) + : : +- *(5) HashAggregate(keys=[ctr_store_sk#4], functions=[partial_avg(ctr_total_return#7)]) + : : +- *(5) HashAggregate(keys=[sr_customer_sk#9, sr_store_sk#10], functions=[sum(UnscaledValue(sr_return_amt#11))]) + : : +- Exchange hashpartitioning(sr_customer_sk#9, sr_store_sk#10, 5) + : : +- *(4) HashAggregate(keys=[sr_customer_sk#9, sr_store_sk#10], functions=[partial_sum(UnscaledValue(sr_return_amt#11))]) + : : +- *(4) Project [sr_customer_sk#9, sr_store_sk#10, sr_return_amt#11] + : : +- *(4) BroadcastHashJoin [sr_returned_date_sk#12], [cast(d_date_sk#13 as bigint)], Inner, BuildRight + : : :- *(4) Project [sr_returned_date_sk#12, sr_customer_sk#9, sr_store_sk#10, sr_return_amt#11] + : : : +- *(4) Filter (isnotnull(sr_returned_date_sk#12) && isnotnull(sr_store_sk#10)) + : : : +- *(4) FileScan parquet default.store_returns[sr_returned_date_sk#12,sr_customer_sk#9,sr_store_sk#10,sr_return_amt#11] Batched: true, Format: Parquet, Location [not included in comparison]/{warehouse_dir}/store_returns], PartitionFilters: [], PushedFilters: [IsNotNull(sr_returned_date_sk), IsNotNull(sr_store_sk)], ReadSchema: struct + +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))) + +- *(8) Project [c_customer_sk#3, c_customer_id#1] + +- *(8) Filter isnotnull(c_customer_sk#3) + +- *(8) FileScan parquet default.customer[c_customer_sk#3,c_customer_id#1] Batched: true, Format: Parquet, Location [not included in comparison]/{warehouse_dir}/customer], PartitionFilters: [], PushedFilters: [IsNotNull(c_customer_sk)], ReadSchema: struct \ No newline at end of file diff --git 
a/src/test/resources/tpcds/spark-2.4/approved-plans-v1_4/q1/simplified.txt b/src/test/resources/tpcds/spark-2.4/approved-plans-v1_4/q1/simplified.txt index 6868eb766..4f838850c 100644 --- a/src/test/resources/tpcds/spark-2.4/approved-plans-v1_4/q1/simplified.txt +++ b/src/test/resources/tpcds/spark-2.4/approved-plans-v1_4/q1/simplified.txt @@ -1,65 +1,58 @@ TakeOrderedAndProject [c_customer_id] - WholeStageCodegen (9) + WholeStageCodegen Project [c_customer_id] - BroadcastHashJoin [ctr_customer_sk,c_customer_sk] + BroadcastHashJoin [c_customer_sk,ctr_customer_sk] Project [ctr_customer_sk] BroadcastHashJoin [ctr_store_sk,s_store_sk] Project [ctr_customer_sk,ctr_store_sk] - BroadcastHashJoin [ctr_store_sk,ctr_store_skL,ctr_total_return,(CAST(avg(ctr_total_return) AS DECIMAL(21,6)) * CAST(1.2 AS DECIMAL(21,6)))] + BroadcastHashJoin [(CAST(avg(ctr_total_return) AS DECIMAL(21,6)) * CAST(1.2 AS DECIMAL(21,6))),ctr_store_sk,ctr_store_skL,ctr_total_return] Filter [ctr_total_return] - HashAggregate [sr_customer_sk,sr_store_sk,sum] [sum(UnscaledValue(sr_return_amt)),ctr_customer_sk,ctr_store_sk,ctr_total_return,sum] + HashAggregate [sr_customer_sk,sr_store_sk,sum,sum(UnscaledValue(sr_return_amt))] [ctr_customer_sk,ctr_store_sk,ctr_total_return,sum,sum(UnscaledValue(sr_return_amt))] InputAdapter Exchange [sr_customer_sk,sr_store_sk] #1 - WholeStageCodegen (2) - HashAggregate [sr_customer_sk,sr_store_sk,sr_return_amt] [sum,sum] - Project [sr_customer_sk,sr_store_sk,sr_return_amt] - BroadcastHashJoin [sr_returned_date_sk,d_date_sk] - Filter [sr_returned_date_sk,sr_store_sk,sr_customer_sk] - ColumnarToRow - InputAdapter - Scan parquet default.store_returns [sr_returned_date_sk,sr_customer_sk,sr_store_sk,sr_return_amt] + WholeStageCodegen + HashAggregate [sr_customer_sk,sr_return_amt,sr_store_sk,sum,sum] [sum,sum] + Project [sr_customer_sk,sr_return_amt,sr_store_sk] + BroadcastHashJoin [d_date_sk,sr_returned_date_sk] + Project [sr_customer_sk,sr_return_amt,sr_returned_date_sk,sr_store_sk] + Filter [sr_customer_sk,sr_returned_date_sk,sr_store_sk] + Scan parquet default.store_returns [sr_customer_sk,sr_return_amt,sr_returned_date_sk,sr_store_sk] [sr_customer_sk,sr_return_amt,sr_returned_date_sk,sr_store_sk] InputAdapter BroadcastExchange #2 - WholeStageCodegen (1) + WholeStageCodegen Project [d_date_sk] - Filter [d_year,d_date_sk] - ColumnarToRow - InputAdapter - Scan parquet default.date_dim [d_date_sk,d_year] + Filter [d_date_sk,d_year] + Scan parquet default.date_dim [d_date_sk,d_year] [d_date_sk,d_year] InputAdapter BroadcastExchange #3 - WholeStageCodegen (6) + WholeStageCodegen Filter [(CAST(avg(ctr_total_return) AS DECIMAL(21,6)) * CAST(1.2 AS DECIMAL(21,6)))] - HashAggregate [ctr_store_sk,sum,count] [avg(ctr_total_return),(CAST(avg(ctr_total_return) AS DECIMAL(21,6)) * CAST(1.2 AS DECIMAL(21,6))),ctr_store_skL,sum,count] + HashAggregate [avg(ctr_total_return),count,ctr_store_sk,sum] [(CAST(avg(ctr_total_return) AS DECIMAL(21,6)) * CAST(1.2 AS DECIMAL(21,6))),avg(ctr_total_return),count,ctr_store_skL,sum] InputAdapter Exchange [ctr_store_sk] #4 - WholeStageCodegen (5) - HashAggregate [ctr_store_sk,ctr_total_return] [sum,count,sum,count] - HashAggregate [sr_customer_sk,sr_store_sk,sum] [sum(UnscaledValue(sr_return_amt)),ctr_store_sk,ctr_total_return,sum] + WholeStageCodegen + HashAggregate [count,count,ctr_store_sk,ctr_total_return,sum,sum] [count,count,sum,sum] + HashAggregate [sr_customer_sk,sr_store_sk,sum,sum(UnscaledValue(sr_return_amt))] 
[ctr_store_sk,ctr_total_return,sum,sum(UnscaledValue(sr_return_amt))] InputAdapter Exchange [sr_customer_sk,sr_store_sk] #5 - WholeStageCodegen (4) - HashAggregate [sr_customer_sk,sr_store_sk,sr_return_amt] [sum,sum] - Project [sr_customer_sk,sr_store_sk,sr_return_amt] - BroadcastHashJoin [sr_returned_date_sk,d_date_sk] - Filter [sr_returned_date_sk,sr_store_sk] - ColumnarToRow - InputAdapter - Scan parquet default.store_returns [sr_returned_date_sk,sr_customer_sk,sr_store_sk,sr_return_amt] + WholeStageCodegen + HashAggregate [sr_customer_sk,sr_return_amt,sr_store_sk,sum,sum] [sum,sum] + Project [sr_customer_sk,sr_return_amt,sr_store_sk] + BroadcastHashJoin [d_date_sk,sr_returned_date_sk] + Project [sr_customer_sk,sr_return_amt,sr_returned_date_sk,sr_store_sk] + Filter [sr_returned_date_sk,sr_store_sk] + Scan parquet default.store_returns [sr_customer_sk,sr_return_amt,sr_returned_date_sk,sr_store_sk] [sr_customer_sk,sr_return_amt,sr_returned_date_sk,sr_store_sk] InputAdapter - ReusedExchange [d_date_sk] #2 + ReusedExchange [d_date_sk] [d_date_sk] #2 InputAdapter BroadcastExchange #6 - WholeStageCodegen (7) + WholeStageCodegen Project [s_store_sk] Filter [s_state,s_store_sk] - ColumnarToRow - InputAdapter - Scan parquet default.store [s_store_sk,s_state] + Scan parquet default.store [s_state,s_store_sk] [s_state,s_store_sk] InputAdapter BroadcastExchange #7 - WholeStageCodegen (8) - Filter [c_customer_sk] - ColumnarToRow - InputAdapter - Scan parquet default.customer [c_customer_sk,c_customer_id] + WholeStageCodegen + Project [c_customer_id,c_customer_sk] + Filter [c_customer_sk] + Scan parquet default.customer [c_customer_id,c_customer_sk] [c_customer_id,c_customer_sk] diff --git a/src/test/scala/com/microsoft/hyperspace/goldstandard/PlanStabilitySuite.scala b/src/test/scala/com/microsoft/hyperspace/goldstandard/PlanStabilitySuite.scala new file mode 100644 index 000000000..8136bf467 --- /dev/null +++ b/src/test/scala/com/microsoft/hyperspace/goldstandard/PlanStabilitySuite.scala @@ -0,0 +1,283 @@ +// scalastyle:off + +/** + * This trait was built upon: https://github.com/apache/spark/blob/v3.1.1/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala. + * + * The below license was copied from: https://github.com/FasterXML/jackson-module-scala/blob/2.10/src/main/resources/META-INF/LICENSE + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.microsoft.hyperspace.goldstandard
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.mutable
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.AttributeSet
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
+
+// scalastyle:off filelinelengthchecker
+/**
+ * Check that TPC-DS SparkPlans don't change.
+ * If there are plan differences, the error message looks like this:
+ *   Plans did not match:
+ *   last approved simplified plan: /path/to/tpcds-plan-stability/approved-plans-xxx/q1/simplified.txt
+ *   last approved explain plan: /path/to/tpcds-plan-stability/approved-plans-xxx/q1/explain.txt
+ *   [last approved simplified plan]
+ *
+ *   actual simplified plan: /path/to/tmp/q1.actual.simplified.txt
+ *   actual explain plan: /path/to/tmp/q1.actual.explain.txt
+ *   [actual simplified plan]
+ *
+ * The explain files are saved to help debug later; they are not checked. Only the simplified
+ * plans are checked (by string comparison).
+ *
+ * To run the entire test suite:
+ * {{{
+ *   sbt "test:testOnly *PlanStabilitySuite"
+ * }}}
+ *
+ * To run a single test:
+ * {{{
+ *   sbt "test:testOnly *PlanStabilitySuite -- -z (tpcds-v1.4/q49)"
+ * }}}
+ *
+ * To re-generate golden files for the entire suite, run:
+ * {{{
+ *   SPARK_GENERATE_GOLDEN_FILES=1 sbt "test:testOnly *PlanStabilitySuite"
+ * }}}
+ *
+ * To re-generate the golden file for a single test, run:
+ * {{{
+ *   SPARK_GENERATE_GOLDEN_FILES=1 sbt "test:testOnly *PlanStabilitySuite -- -z (tpcds-v1.4/q49)"
+ * }}}
+ */
+// scalastyle:on filelinelengthchecker
+
+trait PlanStabilitySuite extends TPCDSBase with SQLHelper with Logging {
+  private val regenerateGoldenFiles: Boolean = System.getenv("SPARK_GENERATE_GOLDEN_FILES") == "1"
+
+  protected val baseResourcePath = {
+    // Resolve the resource path the same way as `SQLQueryTestSuite`.
+    java.nio.file.Paths.get("src", "test", "resources", "tpcds").toFile
+  }
+
+  private val referenceRegex = "#\\d+".r
+  private val normalizeRegex = "#\\d+L?".r
+
+  private val clsName = this.getClass.getCanonicalName
+
+  def goldenFilePath: String
+
+  private def getDirForTest(name: String): File = {
+    new File(goldenFilePath, name)
+  }
+
+  private def isApproved(dir: File, actualSimplifiedPlan: String): Boolean = {
+    val file = new File(dir, "simplified.txt")
+    val expected = FileUtils.readFileToString(file, StandardCharsets.UTF_8)
+    expected == actualSimplifiedPlan
+  }
+
+  /**
+   * Serialize and save this SparkPlan.
+   * The resulting file is used by [[checkWithApproved]] to check stability.
+   *
+   * @param plan    the SparkPlan
+   * @param name    the name of the query
+   * @param explain the full explain output; this is saved to help debug later, as the simplified
+   *                plan is not very useful for debugging
+   */
+  private def generateGoldenFile(plan: SparkPlan, name: String, explain: String): Unit = {
+    val dir = getDirForTest(name)
+    val simplified = getSimplifiedPlan(plan)
+    val foundMatch = dir.exists() && isApproved(dir, simplified)
+
+    if (!foundMatch) {
+      FileUtils.deleteDirectory(dir)
+      assert(dir.mkdirs())
+
+      val file = new File(dir, "simplified.txt")
+      FileUtils.writeStringToFile(file, simplified, StandardCharsets.UTF_8)
+      val fileOriginalPlan = new File(dir, "explain.txt")
+      FileUtils.writeStringToFile(fileOriginalPlan, explain, StandardCharsets.UTF_8)
+      logDebug(s"APPROVED: $file $fileOriginalPlan")
+    }
+  }
+
+  private def checkWithApproved(plan: SparkPlan, name: String, explain: String): Unit = {
+    val dir = getDirForTest(name)
+    val tempDir = FileUtils.getTempDirectory
+    val actualSimplified = getSimplifiedPlan(plan)
+    val foundMatch = isApproved(dir, actualSimplified)
+
+    if (!foundMatch) {
+      // show diff with last approved
+      val approvedSimplifiedFile = new File(dir, "simplified.txt")
+      val approvedExplainFile = new File(dir, "explain.txt")
+
+      val actualSimplifiedFile = new File(tempDir, s"$name.actual.simplified.txt")
+      val actualExplainFile = new File(tempDir, s"$name.actual.explain.txt")
+
+      val approvedSimplified =
+        FileUtils.readFileToString(approvedSimplifiedFile, StandardCharsets.UTF_8)
+      // write out for debugging
+      FileUtils.writeStringToFile(actualSimplifiedFile, actualSimplified, StandardCharsets.UTF_8)
+      FileUtils.writeStringToFile(actualExplainFile, explain, StandardCharsets.UTF_8)
+
+      fail(s"""
+      |Plans did not match:
+      |last approved simplified plan: ${approvedSimplifiedFile.getAbsolutePath}
+      |last approved explain plan: ${approvedExplainFile.getAbsolutePath}
+      |
+      |$approvedSimplified
+      |
+      |actual simplified plan: ${actualSimplifiedFile.getAbsolutePath}
+      |actual explain plan: ${actualExplainFile.getAbsolutePath}
+      |
+      |$actualSimplified
+      """.stripMargin)
+    }
+  }
+
+  /**
+   * Get the simplified plan for a specific SparkPlan. In the simplified plan, a node has only
+   * its name, the sorted names of its referenced and produced attributes (without ExprIds), and
+   * its simplified children. Only the performance-sensitive nodes, e.g., Exchange and Subquery,
+   * are identified in the simplified plan. Given such an identical but simplified plan, we'd
+   * expect to avoid frequent plan changes while still catching meaningful regressions.
+   */
+  private def getSimplifiedPlan(plan: SparkPlan): String = {
+    val exchangeIdMap = new mutable.HashMap[SparkPlan, Int]()
+    val subqueriesMap = new mutable.HashMap[SparkPlan, Int]()
+
+    def getId(plan: SparkPlan): Int = plan match {
+      case exchange: Exchange => exchangeIdMap.getOrElseUpdate(exchange, exchangeIdMap.size + 1)
+      case ReusedExchangeExec(_, exchange) =>
+        exchangeIdMap.getOrElseUpdate(exchange, exchangeIdMap.size + 1)
+      case subquery: SubqueryExec =>
+        subqueriesMap.getOrElseUpdate(subquery, subqueriesMap.size + 1)
+      case _ => -1
+    }
+
+    /**
+     * Some expression names have an ExprId embedded in them because of expressions such as
+     * "sum(sr_return_amt#14)", so we remove all of these using a regex.
+     */
+    def cleanUpReferences(references: AttributeSet): String = {
+      referenceRegex.replaceAllIn(references.toSeq.map(_.name).sorted.mkString(","), "")
+    }
+
+    /**
+     * Generate a simplified plan as a string.
+     * Example output:
+     * TakeOrderedAndProject [c_customer_id]
+     *   WholeStageCodegen
+     *     Project [c_customer_id]
+     */
+    def simplifyNode(node: SparkPlan, depth: Int): String = {
+      val padding = "  " * depth
+      var thisNode = node.nodeName
+      if (node.references.nonEmpty) {
+        thisNode += s" [${cleanUpReferences(node.references)}]"
+      }
+      if (node.producedAttributes.nonEmpty) {
+        thisNode += s" [${cleanUpReferences(node.producedAttributes)}]"
+      }
+      val id = getId(node)
+      if (id > 0) {
+        thisNode += s" #$id"
+      }
+      val childrenSimplified = node.children.map(simplifyNode(_, depth + 1))
+      val subqueriesSimplified = node.subqueries.map(simplifyNode(_, depth + 1))
+      s"$padding$thisNode\n${subqueriesSimplified.mkString("")}${childrenSimplified.mkString("")}"
+    }
+
+    simplifyNode(plan, 0)
+  }
+
+  private def normalizeIds(plan: String): String = {
+    val map = new mutable.HashMap[String, String]()
+    normalizeRegex
+      .findAllMatchIn(plan)
+      .map(_.toString)
+      .foreach(map.getOrElseUpdate(_, (map.size + 1).toString))
+    normalizeRegex.replaceAllIn(plan, regexMatch => s"#${map(regexMatch.toString)}")
+  }
+
+  private def normalizeLocation(plan: String): String = {
+    plan.replaceAll(
+      "Location.*spark-warehouse/",
+      "Location [not included in comparison]/{warehouse_dir}/")
+  }
+
+  /**
+   * Test a TPC-DS query. Depending on the settings, this test either checks that the plan matches
+   * a golden file or creates a new golden file.
+   */
+  protected def testQuery(tpcdsGroup: String, query: String, suffix: String = ""): Unit = {
+    val queryString = resourceToString(
+      s"$tpcdsGroup/$query.sql",
+      classLoader = Thread.currentThread().getContextClassLoader)
+    val qe = spark.sql(queryString).queryExecution
+    val plan = qe.executedPlan
+    val explain = normalizeLocation(normalizeIds(explainString(qe)))
+
+    if (regenerateGoldenFiles) {
+      generateGoldenFile(plan, query + suffix, explain)
+    } else {
+      checkWithApproved(plan, query + suffix, explain)
+    }
+  }
+
+  def explainString(queryExecution: QueryExecution): String = {
+    val explain = ExplainCommand(queryExecution.logical, extended = false)
+    spark.sessionState
+      .executePlan(explain)
+      .executedPlan
+      .executeCollect()
+      .map(_.getString(0))
+      .mkString("\n")
+  }
+}
+
+/**
+ * Spark-only suite.
+ */
+class TPCDSV1_4_SparkPlanStabilitySuite extends PlanStabilitySuite {
+  override val goldenFilePath: String =
+    new File(baseResourcePath, "spark-2.4/approved-plans-v1_4").getAbsolutePath
+
+  // Enable cross joins because some queries fail during the query optimization phase.
+  withSQLConf("spark.sql.crossJoin.enabled" -> "true") {
+    tpcdsQueries.foreach { q =>
+      test(s"check simplified (tpcds-v1.4/$q)") {
+        testQuery("tpcds/queries", q)
+      }
+    }
+  }
+}
diff --git a/src/test/scala/com/microsoft/hyperspace/goldstandard/TPCDSBase.scala b/src/test/scala/com/microsoft/hyperspace/goldstandard/TPCDSBase.scala
new file mode 100644
index 000000000..8b5139191
--- /dev/null
+++ b/src/test/scala/com/microsoft/hyperspace/goldstandard/TPCDSBase.scala
@@ -0,0 +1,570 @@
+// scalastyle:off
+
+/**
+ * This trait was built upon: https://github.com/apache/spark/blob/v3.1.1/sql/core/src/test/scala/org/apache/spark/sql/TPCDSBase.scala.
+ *
+ * The below license was copied from: https://github.com/FasterXML/jackson-module-scala/blob/2.10/src/main/resources/META-INF/LICENSE
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.microsoft.hyperspace.goldstandard
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.internal.SQLConf
+
+import com.microsoft.hyperspace.SparkInvolvedSuite
+
+trait TPCDSBase extends SparkFunSuite with SparkInvolvedSuite {
+
+  val conf = SQLConf.get
+
+  // The TPC-DS queries below are based on v1.4.
+  // TODO: Fix build pipeline for q49 and re-enable q49.
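+  // Each query listed here is loaded from the tpcds/queries/<name>.sql classpath resource by testQuery.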
+ val tpcdsQueries = Seq("q1") + + private val tableColumns = Map( + "store_sales" -> + """ + |`ss_sold_date_sk` INT, + |`ss_sold_time_sk` INT, + |`ss_item_sk` INT, + |`ss_customer_sk` INT, + |`ss_cdemo_sk` INT, + |`ss_hdemo_sk` INT, + |`ss_addr_sk` INT, + |`ss_store_sk` INT, + |`ss_promo_sk` INT, + |`ss_ticket_number` INT, + |`ss_quantity` INT, + |`ss_wholesale_cost` DECIMAL(7,2), + |`ss_list_price` DECIMAL(7,2), + |`ss_sales_price` DECIMAL(7,2), + |`ss_ext_discount_amt` DECIMAL(7,2), + |`ss_ext_sales_price` DECIMAL(7,2), + |`ss_ext_wholesale_cost` DECIMAL(7,2), + |`ss_ext_list_price` DECIMAL(7,2), + |`ss_ext_tax` DECIMAL(7,2), + |`ss_coupon_amt` DECIMAL(7,2), + |`ss_net_paid` DECIMAL(7,2), + |`ss_net_paid_inc_tax` DECIMAL(7,2), + |`ss_net_profit` DECIMAL(7,2) + """.stripMargin, + "store_returns" -> + """ + |`sr_returned_date_sk` BIGINT, + |`sr_return_time_sk` BIGINT, + |`sr_item_sk` BIGINT, + |`sr_customer_sk` BIGINT, + |`sr_cdemo_sk` BIGINT, + |`sr_hdemo_sk` BIGINT, + |`sr_addr_sk` BIGINT, + |`sr_store_sk` BIGINT, + |`sr_reason_sk` BIGINT, + |`sr_ticket_number` BIGINT, + |`sr_return_quantity` INT, + |`sr_return_amt` DECIMAL(7,2), + |`sr_return_tax` DECIMAL(7,2), + |`sr_return_amt_inc_tax` DECIMAL(7,2), + |`sr_fee` DECIMAL(7,2), + |`sr_return_ship_cost` DECIMAL(7,2), + |`sr_refunded_cash` DECIMAL(7,2), + |`sr_reversed_charge` DECIMAL(7,2), + |`sr_store_credit` DECIMAL(7,2), + |`sr_net_loss` DECIMAL(7,2) + """.stripMargin, + "catalog_sales" -> + """ + |`cs_sold_date_sk` INT, + |`cs_sold_time_sk` INT, + |`cs_ship_date_sk` INT, + |`cs_bill_customer_sk` INT, + |`cs_bill_cdemo_sk` INT, + |`cs_bill_hdemo_sk` INT, + |`cs_bill_addr_sk` INT, + |`cs_ship_customer_sk` INT, + |`cs_ship_cdemo_sk` INT, + |`cs_ship_hdemo_sk` INT, + |`cs_ship_addr_sk` INT, + |`cs_call_center_sk` INT, + |`cs_catalog_page_sk` INT, + |`cs_ship_mode_sk` INT, + |`cs_warehouse_sk` INT, + |`cs_item_sk` INT, + |`cs_promo_sk` INT, + |`cs_order_number` INT, + |`cs_quantity` INT, + |`cs_wholesale_cost` DECIMAL(7,2), + |`cs_list_price` DECIMAL(7,2), + |`cs_sales_price` DECIMAL(7,2), + |`cs_ext_discount_amt` DECIMAL(7,2), + |`cs_ext_sales_price` DECIMAL(7,2), + |`cs_ext_wholesale_cost` DECIMAL(7,2), + |`cs_ext_list_price` DECIMAL(7,2), + |`cs_ext_tax` DECIMAL(7,2), + |`cs_coupon_amt` DECIMAL(7,2), + |`cs_ext_ship_cost` DECIMAL(7,2), + |`cs_net_paid` DECIMAL(7,2), + |`cs_net_paid_inc_tax` DECIMAL(7,2), + |`cs_net_paid_inc_ship` DECIMAL(7,2), + |`cs_net_paid_inc_ship_tax` DECIMAL(7,2), + |`cs_net_profit` DECIMAL(7,2) + """.stripMargin, + "catalog_returns" -> + """ + |`cr_returned_date_sk` INT, + |`cr_returned_time_sk` INT, + |`cr_item_sk` INT, + |`cr_refunded_customer_sk` INT, + |`cr_refunded_cdemo_sk` INT, + |`cr_refunded_hdemo_sk` INT, + |`cr_refunded_addr_sk` INT, + |`cr_returning_customer_sk` INT, + |`cr_returning_cdemo_sk` INT, + |`cr_returning_hdemo_sk` INT, + |`cr_returning_addr_sk` INT, + |`cr_call_center_sk` INT, + |`cr_catalog_page_sk` INT, + |`cr_ship_mode_sk` INT, + |`cr_warehouse_sk` INT, + |`cr_reason_sk` INT,`cr_order_number` INT, + |`cr_return_quantity` INT, + |`cr_return_amount` DECIMAL(7,2), + |`cr_return_tax` DECIMAL(7,2), + |`cr_return_amt_inc_tax` DECIMAL(7,2), + |`cr_fee` DECIMAL(7,2), + |`cr_return_ship_cost` DECIMAL(7,2), + |`cr_refunded_cash` DECIMAL(7,2), + |`cr_reversed_charge` DECIMAL(7,2), + |`cr_store_credit` DECIMAL(7,2), + |`cr_net_loss` DECIMAL(7,2) + """.stripMargin, + "web_sales" -> + """ + |`ws_sold_date_sk` INT, + |`ws_sold_time_sk` INT, + |`ws_ship_date_sk` INT, + |`ws_item_sk` INT, + 
|`ws_bill_customer_sk` INT, + |`ws_bill_cdemo_sk` INT, + |`ws_bill_hdemo_sk` INT, + |`ws_bill_addr_sk` INT, + |`ws_ship_customer_sk` INT, + |`ws_ship_cdemo_sk` INT, + |`ws_ship_hdemo_sk` INT, + |`ws_ship_addr_sk` INT, + |`ws_web_page_sk` INT, + |`ws_web_site_sk` INT, + |`ws_ship_mode_sk` INT, + |`ws_warehouse_sk` INT, + |`ws_promo_sk` INT, + |`ws_order_number` INT, + |`ws_quantity` INT, + |`ws_wholesale_cost` DECIMAL(7,2), + |`ws_list_price` DECIMAL(7,2), + |`ws_sales_price` DECIMAL(7,2), + |`ws_ext_discount_amt` DECIMAL(7,2), + |`ws_ext_sales_price` DECIMAL(7,2), + |`ws_ext_wholesale_cost` DECIMAL(7,2), + |`ws_ext_list_price` DECIMAL(7,2), + |`ws_ext_tax` DECIMAL(7,2), + |`ws_coupon_amt` DECIMAL(7,2), + |`ws_ext_ship_cost` DECIMAL(7,2), + |`ws_net_paid` DECIMAL(7,2), + |`ws_net_paid_inc_tax` DECIMAL(7,2), + |`ws_net_paid_inc_ship` DECIMAL(7,2), + |`ws_net_paid_inc_ship_tax` DECIMAL(7,2), + |`ws_net_profit` DECIMAL(7,2) + """.stripMargin, + "web_returns" -> + """ + |`wr_returned_date_sk` BIGINT, + |`wr_returned_time_sk` BIGINT, + |`wr_item_sk` BIGINT, + |`wr_refunded_customer_sk` BIGINT, + |`wr_refunded_cdemo_sk` BIGINT, + |`wr_refunded_hdemo_sk` BIGINT, + |`wr_refunded_addr_sk` BIGINT, + |`wr_returning_customer_sk` BIGINT, + |`wr_returning_cdemo_sk` BIGINT, + |`wr_returning_hdemo_sk` BIGINT, + |`wr_returning_addr_sk` BIGINT, + |`wr_web_page_sk` BIGINT, + |`wr_reason_sk` BIGINT, + |`wr_order_number` BIGINT, + |`wr_return_quantity` INT, + |`wr_return_amt` DECIMAL(7,2), + |`wr_return_tax` DECIMAL(7,2), + |`wr_return_amt_inc_tax` DECIMAL(7,2), + |`wr_fee` DECIMAL(7,2), + |`wr_return_ship_cost` DECIMAL(7,2), + |`wr_refunded_cash` DECIMAL(7,2), + |`wr_reversed_charge` DECIMAL(7,2), + |`wr_account_credit` DECIMAL(7,2), + |`wr_net_loss` DECIMAL(7,2) + """.stripMargin, + "inventory" -> + """ + |`inv_date_sk` INT, + |`inv_item_sk` INT, + |`inv_warehouse_sk` INT, + |`inv_quantity_on_hand` INT + """.stripMargin, + "store" -> + """ + |`s_store_sk` INT, + |`s_store_id` CHAR(16), + |`s_rec_start_date` DATE, + |`s_rec_end_date` DATE, + |`s_closed_date_sk` INT, + |`s_store_name` VARCHAR(50), + |`s_number_employees` INT, + |`s_floor_space` INT, + |`s_hours` CHAR(20), + |`s_manager` VARCHAR(40), + |`s_market_id` INT, + |`s_geography_class` VARCHAR(100), + |`s_market_desc` VARCHAR(100), + |`s_market_manager` VARCHAR(40), + |`s_division_id` INT, + |`s_division_name` VARCHAR(50), + |`s_company_id` INT, + |`s_company_name` VARCHAR(50), + |`s_street_number` VARCHAR(10), + |`s_street_name` VARCHAR(60), + |`s_street_type` CHAR(15), + |`s_suite_number` CHAR(10), + |`s_city` VARCHAR(60), + |`s_county` VARCHAR(30), + |`s_state` CHAR(2), + |`s_zip` CHAR(10), + |`s_country` VARCHAR(20), + |`s_gmt_offset` DECIMAL(5,2), + |`s_tax_percentage` DECIMAL(5,2) + """.stripMargin, + "call_center" -> + """ + |`cc_call_center_sk` INT, + |`cc_call_center_id` CHAR(16), + |`cc_rec_start_date` DATE, + |`cc_rec_end_date` DATE, + |`cc_closed_date_sk` INT, + |`cc_open_date_sk` INT, + |`cc_name` VARCHAR(50), + |`cc_class` VARCHAR(50), + |`cc_employees` INT, + |`cc_sq_ft` INT, + |`cc_hours` CHAR(20), + |`cc_manager` VARCHAR(40), + |`cc_mkt_id` INT, + |`cc_mkt_class` CHAR(50), + |`cc_mkt_desc` VARCHAR(100), + |`cc_market_manager` VARCHAR(40), + |`cc_division` INT, + |`cc_division_name` VARCHAR(50), + |`cc_company` INT, + |`cc_company_name` CHAR(50), + |`cc_street_number` CHAR(10), + |`cc_street_name` VARCHAR(60), + |`cc_street_type` CHAR(15), + |`cc_suite_number` CHAR(10), + |`cc_city` VARCHAR(60), + |`cc_county` VARCHAR(30), + |`cc_state` 
CHAR(2), + |`cc_zip` CHAR(10), + |`cc_country` VARCHAR(20), + |`cc_gmt_offset` DECIMAL(5,2), + |`cc_tax_percentage` DECIMAL(5,2) + """.stripMargin, + "catalog_page" -> + """ + |`cp_catalog_page_sk` INT, + |`cp_catalog_page_id` CHAR(16), + |`cp_start_date_sk` INT, + |`cp_end_date_sk` INT, + |`cp_department` VARCHAR(50), + |`cp_catalog_number` INT, + |`cp_catalog_page_number` INT, + |`cp_description` VARCHAR(100), + |`cp_type` VARCHAR(100) + """.stripMargin, + "web_site" -> + """ + |`web_site_sk` INT, + |`web_site_id` CHAR(16), + |`web_rec_start_date` DATE, + |`web_rec_end_date` DATE, + |`web_name` VARCHAR(50), + |`web_open_date_sk` INT, + |`web_close_date_sk` INT, + |`web_class` VARCHAR(50), + |`web_manager` VARCHAR(40), + |`web_mkt_id` INT, + |`web_mkt_class` VARCHAR(50), + |`web_mkt_desc` VARCHAR(100), + |`web_market_manager` VARCHAR(40), + |`web_company_id` INT, + |`web_company_name` CHAR(50), + |`web_street_number` CHAR(10), + |`web_street_name` VARCHAR(60), + |`web_street_type` CHAR(15), + |`web_suite_number` CHAR(10), + |`web_city` VARCHAR(60), + |`web_county` VARCHAR(30), + |`web_state` CHAR(2), + |`web_zip` CHAR(10), + |`web_country` VARCHAR(20), + |`web_gmt_offset` DECIMAL(5,2), + |`web_tax_percentage` DECIMAL(5,2) + """.stripMargin, + "web_page" -> + """ + |`wp_web_page_sk` INT, + |`wp_web_page_id` CHAR(16), + |`wp_rec_start_date` DATE, + |`wp_rec_end_date` DATE, + |`wp_creation_date_sk` INT, + |`wp_access_date_sk` INT, + |`wp_autogen_flag` CHAR(1), + |`wp_customer_sk` INT, + |`wp_url` VARCHAR(100), + |`wp_type` CHAR(50), + |`wp_char_count` INT, + |`wp_link_count` INT, + |`wp_image_count` INT, + |`wp_max_ad_count` INT + """.stripMargin, + "warehouse" -> + """ + |`w_warehouse_sk` INT, + |`w_warehouse_id` CHAR(16), + |`w_warehouse_name` VARCHAR(20), + |`w_warehouse_sq_ft` INT, + |`w_street_number` CHAR(10), + |`w_street_name` VARCHAR(20), + |`w_street_type` CHAR(15), + |`w_suite_number` CHAR(10), + |`w_city` VARCHAR(60), + |`w_county` VARCHAR(30), + |`w_state` CHAR(2), + |`w_zip` CHAR(10), + |`w_country` VARCHAR(20), + |`w_gmt_offset` DECIMAL(5,2) + """.stripMargin, + "customer" -> + """ + |`c_customer_sk` INT, + |`c_customer_id` CHAR(16), + |`c_current_cdemo_sk` INT, + |`c_current_hdemo_sk` INT, + |`c_current_addr_sk` INT, + |`c_first_shipto_date_sk` INT, + |`c_first_sales_date_sk` INT, + |`c_salutation` CHAR(10), + |`c_first_name` CHAR(20), + |`c_last_name` CHAR(30), + |`c_preferred_cust_flag` CHAR(1), + |`c_birth_day` INT, + |`c_birth_month` INT, + |`c_birth_year` INT, + |`c_birth_country` VARCHAR(20), + |`c_login` CHAR(13), + |`c_email_address` CHAR(50), + |`c_last_review_date` INT + """.stripMargin, + "customer_address" -> + """ + |`ca_address_sk` INT, + |`ca_address_id` CHAR(16), + |`ca_street_number` CHAR(10), + |`ca_street_name` VARCHAR(60), + |`ca_street_type` CHAR(15), + |`ca_suite_number` CHAR(10), + |`ca_city` VARCHAR(60), + |`ca_county` VARCHAR(30), + |`ca_state` CHAR(2), + |`ca_zip` CHAR(10), + |`ca_country` VARCHAR(20), + |`ca_gmt_offset` DECIMAL(5,2), + |`ca_location_type` CHAR(20) + """.stripMargin, + "customer_demographics" -> + """ + |`cd_demo_sk` INT, + |`cd_gender` CHAR(1), + |`cd_marital_status` CHAR(1), + |`cd_education_status` CHAR(20), + |`cd_purchase_estimate` INT, + |`cd_credit_rating` CHAR(10), + |`cd_dep_count` INT, + |`cd_dep_employed_count` INT, + |`cd_dep_college_count` INT + """.stripMargin, + "date_dim" -> + """ + |`d_date_sk` INT, + |`d_date_id` CHAR(16), + |`d_date` DATE, + |`d_month_seq` INT, + |`d_week_seq` INT, + |`d_quarter_seq` INT, + 
|`d_year` INT,
+        |`d_dow` INT,
+        |`d_moy` INT,
+        |`d_dom` INT,
+        |`d_qoy` INT,
+        |`d_fy_year` INT,
+        |`d_fy_quarter_seq` INT,
+        |`d_fy_week_seq` INT,
+        |`d_day_name` CHAR(9),
+        |`d_quarter_name` CHAR(6),
+        |`d_holiday` CHAR(1),
+        |`d_weekend` CHAR(1),
+        |`d_following_holiday` CHAR(1),
+        |`d_first_dom` INT,
+        |`d_last_dom` INT,
+        |`d_same_day_ly` INT,
+        |`d_same_day_lq` INT,
+        |`d_current_day` CHAR(1),
+        |`d_current_week` CHAR(1),
+        |`d_current_month` CHAR(1),
+        |`d_current_quarter` CHAR(1),
+        |`d_current_year` CHAR(1)
+      """.stripMargin,
+    "household_demographics" ->
+      """
+        |`hd_demo_sk` INT,
+        |`hd_income_band_sk` INT,
+        |`hd_buy_potential` CHAR(15),
+        |`hd_dep_count` INT,
+        |`hd_vehicle_count` INT
+      """.stripMargin,
+    "item" ->
+      """
+        |`i_item_sk` INT,
+        |`i_item_id` CHAR(16),
+        |`i_rec_start_date` DATE,
+        |`i_rec_end_date` DATE,
+        |`i_item_desc` VARCHAR(200),
+        |`i_current_price` DECIMAL(7,2),
+        |`i_wholesale_cost` DECIMAL(7,2),
+        |`i_brand_id` INT,
+        |`i_brand` CHAR(50),
+        |`i_class_id` INT,
+        |`i_class` CHAR(50),
+        |`i_category_id` INT,
+        |`i_category` CHAR(50),
+        |`i_manufact_id` INT,
+        |`i_manufact` CHAR(50),
+        |`i_size` CHAR(20),
+        |`i_formulation` CHAR(20),
+        |`i_color` CHAR(20),
+        |`i_units` CHAR(10),
+        |`i_container` CHAR(10),
+        |`i_manager_id` INT,
+        |`i_product_name` CHAR(50)
+      """.stripMargin,
+    "income_band" ->
+      """
+        |`ib_income_band_sk` INT,
+        |`ib_lower_bound` INT,
+        |`ib_upper_bound` INT
+      """.stripMargin,
+    "promotion" ->
+      """
+        |`p_promo_sk` INT,
+        |`p_promo_id` CHAR(16),
+        |`p_start_date_sk` INT,
+        |`p_end_date_sk` INT,
+        |`p_item_sk` INT,
+        |`p_cost` DECIMAL(15,2),
+        |`p_response_target` INT,
+        |`p_promo_name` CHAR(50),
+        |`p_channel_dmail` CHAR(1),
+        |`p_channel_email` CHAR(1),
+        |`p_channel_catalog` CHAR(1),
+        |`p_channel_tv` CHAR(1),
+        |`p_channel_radio` CHAR(1),
+        |`p_channel_press` CHAR(1),
+        |`p_channel_event` CHAR(1),
+        |`p_channel_demo` CHAR(1),
+        |`p_channel_details` VARCHAR(100),
+        |`p_purpose` CHAR(15),
+        |`p_discount_active` CHAR(1)
+      """.stripMargin,
+    "reason" ->
+      """
+        |`r_reason_sk` INT,
+        |`r_reason_id` CHAR(16),
+        |`r_reason_desc` CHAR(100)
+      """.stripMargin,
+    "ship_mode" ->
+      """
+        |`sm_ship_mode_sk` INT,
+        |`sm_ship_mode_id` CHAR(16),
+        |`sm_type` CHAR(30),
+        |`sm_code` CHAR(10),
+        |`sm_carrier` CHAR(20),
+        |`sm_contract` CHAR(20)
+      """.stripMargin,
+    "time_dim" ->
+      """
+        |`t_time_sk` INT,
+        |`t_time_id` CHAR(16),
+        |`t_time` INT,
+        |`t_hour` INT,
+        |`t_minute` INT,
+        |`t_second` INT,
+        |`t_am_pm` CHAR(2),
+        |`t_shift` CHAR(20),
+        |`t_sub_shift` CHAR(20),
+        |`t_meal_time` CHAR(20)
+      """.stripMargin
+  )
+
+  val tableNames: Iterable[String] = tableColumns.keys
+
+  def createTable(
+      spark: SparkSession,
+      tableName: String,
+      format: String = "parquet",
+      options: Seq[String] = Nil): Unit = {
+    spark.sql(
+      s"""
+         |CREATE TABLE `$tableName` (${tableColumns(tableName)})
+         |USING $format
+         |${options.mkString("\n")}
+       """.stripMargin)
+  }
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    tableNames.foreach { tableName =>
+      createTable(spark, tableName)
+    }
+  }
+
+  override def afterAll(): Unit = {
+    tableNames.foreach { tableName =>
+      spark.sessionState.catalog.dropTable(TableIdentifier(tableName), true, true)
+    }
+    super.afterAll()
+  }
+}
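
Plan strings embed ExprIds (e.g., "#273", "#91L") that change on every Spark session, so normalizeIds rewrites each distinct tag to a small sequential number before the explain output is written or compared. A minimal, self-contained sketch of that normalization; the object name and the sample plan fragment are illustrative only:

{{{
import scala.collection.mutable

object NormalizeIdsSketch {
  // Matches ExprId tags such as #273 or #91L in an explain string.
  private val normalizeRegex = "#\\d+L?".r

  def normalizeIds(plan: String): String = {
    // The first occurrence of each distinct tag claims the next number.
    val map = new mutable.HashMap[String, String]()
    normalizeRegex
      .findAllMatchIn(plan)
      .map(_.toString)
      .foreach(map.getOrElseUpdate(_, (map.size + 1).toString))
    // Rewrite every tag (any trailing 'L' is dropped) to its stable number.
    normalizeRegex.replaceAllIn(plan, m => s"#${map(m.toString)}")
  }

  def main(args: Array[String]): Unit = {
    val plan =
      """Project [c_customer_id#273]
        |+- Filter isnotnull(c_customer_sk#91L)""".stripMargin
    println(normalizeIds(plan))
    // Prints:
    // Project [c_customer_id#1]
    // +- Filter isnotnull(c_customer_sk#2)
  }
}
}}}

Because both the golden files and the actual output pass through this normalization before comparison, semantically identical plans compare byte-for-byte even though Spark assigns fresh ExprIds in every session.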