[SPARK-29947][SQL] Improve ResolveRelations performance
### What changes were proposed in this pull request? It is very common for a SQL query to query a table more than once. For example: ``` == Physical Plan == *(12) HashAggregate(keys=[cmn_mtrc_summ_dt#21, rev_rollup#1279, CASE WHEN (rev_rollup#1319 = rev_rollup#1279) THEN 0 ELSE 1 END#1366, CASE WHEN cast(sap_category_id#24 as decimal(10,0)) IN (5,7,23,41) THEN 0 ELSE 1 END#1367], functions=[sum(coalesce(bid_count#34, 0)), sum(coalesce(ck_trans_count#35, 0)), sum(coalesce(ended_bid_count#36, 0)), sum(coalesce(ended_lstg_count#37, 0)), sum(coalesce(ended_success_lstg_count#38, 0)), sum(coalesce(item_sold_count#39, 0)), sum(coalesce(new_lstg_count#40, 0)), sum(coalesce(gmv_us_amt#41, 0.00)), sum(coalesce(gmv_slr_lc_amt#42, 0.00)), sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_insrtn_fee_us_amt#46, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_insrtn_crd_us_amt#50, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_fetr_fee_us_amt#54, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_fetr_crd_us_amt#58, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_fv_fee_us_amt#62, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_fv_crd_us_amt#67, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_othr_l_fee_us_amt#72, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_othr_l_crd_us_amt#76, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_othr_nl_fee_us_amt#80, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_othr_nl_crd_us_amt#84, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_slr_tools_fee_us_amt#88, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_slr_tools_crd_us_amt#92, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), sum(coalesce(rvnu_unasgnd_us_amt#96, 0.000000)), sum((coalesce(rvnu_transaction_us_amt#112, 0.0) + coalesce(rvnu_transaction_crd_us_amt#115, 0.0))), sum((coalesce(rvnu_total_us_amt#118, 0.0) + coalesce(rvnu_total_crd_us_amt#121, 0.0)))]) +- Exchange hashpartitioning(cmn_mtrc_summ_dt#21, rev_rollup#1279, CASE WHEN (rev_rollup#1319 = rev_rollup#1279) THEN 0 ELSE 1 END#1366, CASE WHEN cast(sap_category_id#24 as decimal(10,0)) IN (5,7,23,41) THEN 0 ELSE 1 END#1367, 200), true, [id=#403] +- *(11) HashAggregate(keys=[cmn_mtrc_summ_dt#21, rev_rollup#1279, CASE WHEN (rev_rollup#1319 = rev_rollup#1279) THEN 0 ELSE 1 END AS CASE WHEN (rev_rollup#1319 = rev_rollup#1279) THEN 0 ELSE 1 END#1366, CASE WHEN cast(sap_category_id#24 as decimal(10,0)) IN (5,7,23,41) THEN 0 ELSE 1 END AS CASE WHEN cast(sap_category_id#24 as decimal(10,0)) IN (5,7,23,41) THEN 0 ELSE 1 END#1367], functions=[partial_sum(coalesce(bid_count#34, 0)), partial_sum(coalesce(ck_trans_count#35, 0)), partial_sum(coalesce(ended_bid_count#36, 0)), partial_sum(coalesce(ended_lstg_count#37, 0)), partial_sum(coalesce(ended_success_lstg_count#38, 0)), partial_sum(coalesce(item_sold_count#39, 0)), partial_sum(coalesce(new_lstg_count#40, 0)), partial_sum(coalesce(gmv_us_amt#41, 0.00)), partial_sum(coalesce(gmv_slr_lc_amt#42, 0.00)), partial_sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_insrtn_fee_us_amt#46, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_insrtn_crd_us_amt#50, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), partial_sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_fetr_fee_us_amt#54, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_fetr_crd_us_amt#58, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), partial_sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_fv_fee_us_amt#62, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_fv_crd_us_amt#67, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), partial_sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_othr_l_fee_us_amt#72, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_othr_l_crd_us_amt#76, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), partial_sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_othr_nl_fee_us_amt#80, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_othr_nl_crd_us_amt#84, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), partial_sum(CheckOverflow((promote_precision(cast(coalesce(rvnu_slr_tools_fee_us_amt#88, 0.000000) as decimal(19,6))) + promote_precision(cast(coalesce(rvnu_slr_tools_crd_us_amt#92, 0.000000) as decimal(19,6)))), DecimalType(19,6), true)), partial_sum(coalesce(rvnu_unasgnd_us_amt#96, 0.000000)), partial_sum((coalesce(rvnu_transaction_us_amt#112, 0.0) + coalesce(rvnu_transaction_crd_us_amt#115, 0.0))), partial_sum((coalesce(rvnu_total_us_amt#118, 0.0) + coalesce(rvnu_total_crd_us_amt#121, 0.0)))]) +- *(11) Project [cmn_mtrc_summ_dt#21, sap_category_id#24, bid_count#34, ck_trans_count#35, ended_bid_count#36, ended_lstg_count#37, ended_success_lstg_count#38, item_sold_count#39, new_lstg_count#40, gmv_us_amt#41, gmv_slr_lc_amt#42, rvnu_insrtn_fee_us_amt#46, rvnu_insrtn_crd_us_amt#50, rvnu_fetr_fee_us_amt#54, rvnu_fetr_crd_us_amt#58, rvnu_fv_fee_us_amt#62, rvnu_fv_crd_us_amt#67, rvnu_othr_l_fee_us_amt#72, rvnu_othr_l_crd_us_amt#76, rvnu_othr_nl_fee_us_amt#80, rvnu_othr_nl_crd_us_amt#84, rvnu_slr_tools_fee_us_amt#88, rvnu_slr_tools_crd_us_amt#92, rvnu_unasgnd_us_amt#96, ... 6 more fields] +- *(11) BroadcastHashJoin [byr_cntry_id#23], [cntry_id#1309], LeftOuter, BuildRight :- *(11) Project [cmn_mtrc_summ_dt#21, byr_cntry_id#23, sap_category_id#24, bid_count#34, ck_trans_count#35, ended_bid_count#36, ended_lstg_count#37, ended_success_lstg_count#38, item_sold_count#39, new_lstg_count#40, gmv_us_amt#41, gmv_slr_lc_amt#42, rvnu_insrtn_fee_us_amt#46, rvnu_insrtn_crd_us_amt#50, rvnu_fetr_fee_us_amt#54, rvnu_fetr_crd_us_amt#58, rvnu_fv_fee_us_amt#62, rvnu_fv_crd_us_amt#67, rvnu_othr_l_fee_us_amt#72, rvnu_othr_l_crd_us_amt#76, rvnu_othr_nl_fee_us_amt#80, rvnu_othr_nl_crd_us_amt#84, rvnu_slr_tools_fee_us_amt#88, rvnu_slr_tools_crd_us_amt#92, ... 6 more fields] : +- *(11) BroadcastHashJoin [slr_cntry_id#28], [cntry_id#1269], LeftOuter, BuildRight : :- *(11) Project [gen_attr_1#360 AS cmn_mtrc_summ_dt#21, gen_attr_5#267 AS byr_cntry_id#23, gen_attr_7#268 AS sap_category_id#24, gen_attr_15#272 AS slr_cntry_id#28, gen_attr_27#278 AS bid_count#34, gen_attr_29#279 AS ck_trans_count#35, gen_attr_31#280 AS ended_bid_count#36, gen_attr_33#282 AS ended_lstg_count#37, gen_attr_35#283 AS ended_success_lstg_count#38, gen_attr_37#284 AS item_sold_count#39, gen_attr_39#281 AS new_lstg_count#40, gen_attr_41#285 AS gmv_us_amt#41, gen_attr_43#287 AS gmv_slr_lc_amt#42, gen_attr_51#290 AS rvnu_insrtn_fee_us_amt#46, gen_attr_59#294 AS rvnu_insrtn_crd_us_amt#50, gen_attr_67#298 AS rvnu_fetr_fee_us_amt#54, gen_attr_75#302 AS rvnu_fetr_crd_us_amt#58, gen_attr_83#306 AS rvnu_fv_fee_us_amt#62, gen_attr_93#311 AS rvnu_fv_crd_us_amt#67, gen_attr_103#316 AS rvnu_othr_l_fee_us_amt#72, gen_attr_111#320 AS rvnu_othr_l_crd_us_amt#76, gen_attr_119#324 AS rvnu_othr_nl_fee_us_amt#80, gen_attr_127#328 AS rvnu_othr_nl_crd_us_amt#84, gen_attr_135#332 AS rvnu_slr_tools_fee_us_amt#88, ... 6 more fields] : : +- *(11) BroadcastHashJoin [cast(gen_attr_308#777 as decimal(20,0))], [cast(gen_attr_309#803 as decimal(20,0))], LeftOuter, BuildRight : : :- *(11) Project [gen_attr_5#267, gen_attr_7#268, gen_attr_15#272, gen_attr_27#278, gen_attr_29#279, gen_attr_31#280, gen_attr_39#281, gen_attr_33#282, gen_attr_35#283, gen_attr_37#284, gen_attr_41#285, gen_attr_43#287, gen_attr_51#290, gen_attr_59#294, gen_attr_67#298, gen_attr_75#302, gen_attr_83#306, gen_attr_93#311, gen_attr_103#316, gen_attr_111#320, gen_attr_119#324, gen_attr_127#328, gen_attr_135#332, gen_attr_143#336, ... 6 more fields] : : : +- *(11) BroadcastHashJoin [cast(gen_attr_310#674 as int)], [cast(gen_attr_311#774 as int)], LeftOuter, BuildRight : : : :- *(11) Project [gen_attr_5#267, gen_attr_7#268, gen_attr_15#272, gen_attr_27#278, gen_attr_29#279, gen_attr_31#280, gen_attr_39#281, gen_attr_33#282, gen_attr_35#283, gen_attr_37#284, gen_attr_41#285, gen_attr_43#287, gen_attr_51#290, gen_attr_59#294, gen_attr_67#298, gen_attr_75#302, gen_attr_83#306, gen_attr_93#311, gen_attr_103#316, gen_attr_111#320, gen_attr_119#324, gen_attr_127#328, gen_attr_135#332, gen_attr_143#336, ... 6 more fields] : : : : +- *(11) BroadcastHashJoin [cast(gen_attr_5#267 as decimal(20,0))], [cast(gen_attr_312#665 as decimal(20,0))], LeftOuter, BuildRight : : : : :- *(11) Project [gen_attr_5#267, gen_attr_7#268, gen_attr_15#272, gen_attr_27#278, gen_attr_29#279, gen_attr_31#280, gen_attr_39#281, gen_attr_33#282, gen_attr_35#283, gen_attr_37#284, gen_attr_41#285, gen_attr_43#287, gen_attr_51#290, gen_attr_59#294, gen_attr_67#298, gen_attr_75#302, gen_attr_83#306, gen_attr_93#311, gen_attr_103#316, gen_attr_111#320, gen_attr_119#324, gen_attr_127#328, gen_attr_135#332, gen_attr_143#336, ... 5 more fields] : : : : : +- *(11) BroadcastHashJoin [cast(gen_attr_313#565 as decimal(20,0))], [cast(gen_attr_314#591 as decimal(20,0))], LeftOuter, BuildRight : : : : : :- *(11) Project [gen_attr_5#267, gen_attr_7#268, gen_attr_15#272, gen_attr_27#278, gen_attr_29#279, gen_attr_31#280, gen_attr_39#281, gen_attr_33#282, gen_attr_35#283, gen_attr_37#284, gen_attr_41#285, gen_attr_43#287, gen_attr_51#290, gen_attr_59#294, gen_attr_67#298, gen_attr_75#302, gen_attr_83#306, gen_attr_93#311, gen_attr_103#316, gen_attr_111#320, gen_attr_119#324, gen_attr_127#328, gen_attr_135#332, gen_attr_143#336, ... 6 more fields] : : : : : : +- *(11) BroadcastHashJoin [cast(gen_attr_315#462 as int)], [cast(gen_attr_316#562 as int)], LeftOuter, BuildRight : : : : : : :- *(11) Project [gen_attr_5#267, gen_attr_7#268, gen_attr_15#272, gen_attr_27#278, gen_attr_29#279, gen_attr_31#280, gen_attr_39#281, gen_attr_33#282, gen_attr_35#283, gen_attr_37#284, gen_attr_41#285, gen_attr_43#287, gen_attr_51#290, gen_attr_59#294, gen_attr_67#298, gen_attr_75#302, gen_attr_83#306, gen_attr_93#311, gen_attr_103#316, gen_attr_111#320, gen_attr_119#324, gen_attr_127#328, gen_attr_135#332, gen_attr_143#336, ... 6 more fields] : : : : : : : +- *(11) BroadcastHashJoin [cast(gen_attr_15#272 as decimal(20,0))], [cast(gen_attr_317#453 as decimal(20,0))], LeftOuter, BuildRight : : : : : : : :- *(11) Project [gen_attr_5#267, gen_attr_7#268, gen_attr_15#272, gen_attr_27#278, gen_attr_29#279, gen_attr_31#280, gen_attr_39#281, gen_attr_33#282, gen_attr_35#283, gen_attr_37#284, gen_attr_41#285, gen_attr_43#287, gen_attr_51#290, gen_attr_59#294, gen_attr_67#298, gen_attr_75#302, gen_attr_83#306, gen_attr_93#311, gen_attr_103#316, gen_attr_111#320, gen_attr_119#324, gen_attr_127#328, gen_attr_135#332, gen_attr_143#336, ... 5 more fields] : : : : : : : : +- *(11) BroadcastHashJoin [cast(gen_attr_25#277 as decimal(20,0))], [cast(gen_attr_318#379 as decimal(20,0))], LeftOuter, BuildRight : : : : : : : : :- *(11) Project [gen_attr_5#267, gen_attr_7#268, gen_attr_15#272, gen_attr_25#277, gen_attr_27#278, gen_attr_29#279, gen_attr_31#280, gen_attr_39#281, gen_attr_33#282, gen_attr_35#283, gen_attr_37#284, gen_attr_41#285, gen_attr_43#287, gen_attr_51#290, gen_attr_59#294, gen_attr_67#298, gen_attr_75#302, gen_attr_83#306, gen_attr_93#311, gen_attr_103#316, gen_attr_111#320, gen_attr_119#324, gen_attr_127#328, gen_attr_135#332, ... 6 more fields] : : : : : : : : : +- *(11) BroadcastHashJoin [cast(gen_attr_23#276 as decimal(20,0))], [cast(gen_attr_319#367 as decimal(20,0))], LeftOuter, BuildRight : : : : : : : : : :- *(11) Project [byr_cntry_id#1169 AS gen_attr_5#267, sap_category_id#1170 AS gen_attr_7#268, slr_cntry_id#1174 AS gen_attr_15#272, lstg_curncy_id#1178 AS gen_attr_23#276, blng_curncy_id#1179 AS gen_attr_25#277, bid_count#1180 AS gen_attr_27#278, ck_trans_count#1181 AS gen_attr_29#279, ended_bid_count#1182 AS gen_attr_31#280, new_lstg_count#1183 AS gen_attr_39#281, ended_lstg_count#1184 AS gen_attr_33#282, ended_success_lstg_count#1185 AS gen_attr_35#283, item_sold_count#1186 AS gen_attr_37#284, gmv_us_amt#1187 AS gen_attr_41#285, gmv_slr_lc_amt#1189 AS gen_attr_43#287, rvnu_insrtn_fee_us_amt#1192 AS gen_attr_51#290, rvnu_insrtn_crd_us_amt#1196 AS gen_attr_59#294, rvnu_fetr_fee_us_amt#1200 AS gen_attr_67#298, rvnu_fetr_crd_us_amt#1204 AS gen_attr_75#302, rvnu_fv_fee_us_amt#1208 AS gen_attr_83#306, rvnu_fv_crd_us_amt#1213 AS gen_attr_93#311, rvnu_othr_l_fee_us_amt#1218 AS gen_attr_103#316, rvnu_othr_l_crd_us_amt#1222 AS gen_attr_111#320, rvnu_othr_nl_fee_us_amt#1226 AS gen_attr_119#324, rvnu_othr_nl_crd_us_amt#1230 AS gen_attr_127#328, ... 7 more fields] : : : : : : : : : : +- *(11) ColumnarToRow : : : : : : : : : : +- FileScan parquet default.big_table1[byr_cntry_id#1169,sap_category_id#1170,slr_cntry_id#1174,lstg_curncy_id#1178,blng_curncy_id#1179,bid_count#1180,ck_trans_count#1181,ended_bid_count#1182,new_lstg_count#1183,ended_lstg_count#1184,ended_success_lstg_count#1185,item_sold_count#1186,gmv_us_amt#1187,gmv_slr_lc_amt#1189,rvnu_insrtn_fee_us_amt#1192,rvnu_insrtn_crd_us_amt#1196,rvnu_fetr_fee_us_amt#1200,rvnu_fetr_crd_us_amt#1204,rvnu_fv_fee_us_amt#1208,rvnu_fv_crd_us_amt#1213,rvnu_othr_l_fee_us_amt#1218,rvnu_othr_l_crd_us_amt#1222,rvnu_othr_nl_fee_us_amt#1226,rvnu_othr_nl_crd_us_amt#1230,... 7 more fields] Batched: true, DataFilters: [], Format: Parquet, Location: PrunedInMemoryFileIndex[], PartitionFilters: [isnotnull(cmn_mtrc_summ_dt#1262), (cmn_mtrc_summ_dt#1262 >= 18078), (cmn_mtrc_summ_dt#1262 <= 18..., PushedFilters: [], ReadSchema: struct<byr_cntry_id:decimal(4,0),sap_category_id:decimal(9,0),slr_cntry_id:decimal(4,0),lstg_curn... : : : : : : : : : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, decimal(9,0), true] as decimal(20,0)))), [id=#288] : : : : : : : : : +- *(1) Project [CURNCY_ID#1263 AS gen_attr_319#367] : : : : : : : : : +- *(1) Filter isnotnull(CURNCY_ID#1263) : : : : : : : : : +- *(1) ColumnarToRow : : : : : : : : : +- FileScan parquet default.small_table1[CURNCY_ID#1263] Batched: true, DataFilters: [isnotnull(CURNCY_ID#1263)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/small_table1], PartitionFilters: [], PushedFilters: [IsNotNull(CURNCY_ID)], ReadSchema: struct<CURNCY_ID:decimal(9,0)>, SelectedBucketsCount: 1 out of 1 : : : : : : : : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, decimal(9,0), true] as decimal(20,0)))), [id=#297] : : : : : : : : +- *(2) Project [CURNCY_ID#1263 AS gen_attr_318#379] : : : : : : : : +- *(2) Filter isnotnull(CURNCY_ID#1263) : : : : : : : : +- *(2) ColumnarToRow : : : : : : : : +- FileScan parquet default.small_table1[CURNCY_ID#1263] Batched: true, DataFilters: [isnotnull(CURNCY_ID#1263)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/small_table1], PartitionFilters: [], PushedFilters: [IsNotNull(CURNCY_ID)], ReadSchema: struct<CURNCY_ID:decimal(9,0)>, SelectedBucketsCount: 1 out of 1 : : : : : : : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, decimal(4,0), true] as decimal(20,0)))), [id=#306] : : : : : : : +- *(3) Project [cntry_id#1269 AS gen_attr_317#453, rev_rollup_id#1278 AS gen_attr_315#462] : : : : : : : +- *(3) Filter isnotnull(cntry_id#1269) : : : : : : : +- *(3) ColumnarToRow : : : : : : : +- FileScan parquet default.small_table2[cntry_id#1269,rev_rollup_id#1278] Batched: true, DataFilters: [isnotnull(cntry_id#1269)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/small_table2], PartitionFilters: [], PushedFilters: [IsNotNull(cntry_id)], ReadSchema: struct<cntry_id:decimal(4,0),rev_rollup_id:smallint> : : : : : : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(cast(input[0, smallint, true] as int) as bigint))), [id=#315] : : : : : : +- *(4) Project [rev_rollup_id#1286 AS gen_attr_316#562, curncy_id#1289 AS gen_attr_313#565] : : : : : : +- *(4) Filter isnotnull(rev_rollup_id#1286) : : : : : : +- *(4) ColumnarToRow : : : : : : +- FileScan parquet default.small_table3[rev_rollup_id#1286,curncy_id#1289] Batched: true, DataFilters: [isnotnull(rev_rollup_id#1286)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/small_table3], PartitionFilters: [], PushedFilters: [IsNotNull(rev_rollup_id)], ReadSchema: struct<rev_rollup_id:smallint,curncy_id:decimal(4,0)> : : : : : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, decimal(9,0), true] as decimal(20,0)))), [id=#324] : : : : : +- *(5) Project [CURNCY_ID#1263 AS gen_attr_314#591] : : : : : +- *(5) Filter isnotnull(CURNCY_ID#1263) : : : : : +- *(5) ColumnarToRow : : : : : +- FileScan parquet default.small_table1[CURNCY_ID#1263] Batched: true, DataFilters: [isnotnull(CURNCY_ID#1263)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/small_table1], PartitionFilters: [], PushedFilters: [IsNotNull(CURNCY_ID)], ReadSchema: struct<CURNCY_ID:decimal(9,0)>, SelectedBucketsCount: 1 out of 1 : : : : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, decimal(4,0), true] as decimal(20,0)))), [id=#333] : : : : +- *(6) Project [cntry_id#1269 AS gen_attr_312#665, rev_rollup_id#1278 AS gen_attr_310#674] : : : : +- *(6) Filter isnotnull(cntry_id#1269) : : : : +- *(6) ColumnarToRow : : : : +- FileScan parquet default.small_table2[cntry_id#1269,rev_rollup_id#1278] Batched: true, DataFilters: [isnotnull(cntry_id#1269)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/small_table2], PartitionFilters: [], PushedFilters: [IsNotNull(cntry_id)], ReadSchema: struct<cntry_id:decimal(4,0),rev_rollup_id:smallint> : : : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(cast(input[0, smallint, true] as int) as bigint))), [id=#342] : : : +- *(7) Project [rev_rollup_id#1286 AS gen_attr_311#774, curncy_id#1289 AS gen_attr_308#777] : : : +- *(7) Filter isnotnull(rev_rollup_id#1286) : : : +- *(7) ColumnarToRow : : : +- FileScan parquet default.small_table3[rev_rollup_id#1286,curncy_id#1289] Batched: true, DataFilters: [isnotnull(rev_rollup_id#1286)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/small_table3], PartitionFilters: [], PushedFilters: [IsNotNull(rev_rollup_id)], ReadSchema: struct<rev_rollup_id:smallint,curncy_id:decimal(4,0)> : : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, decimal(9,0), true] as decimal(20,0)))), [id=#351] : : +- *(8) Project [CURNCY_ID#1263 AS gen_attr_309#803] : : +- *(8) Filter isnotnull(CURNCY_ID#1263) : : +- *(8) ColumnarToRow : : +- FileScan parquet default.small_table1[CURNCY_ID#1263] Batched: true, DataFilters: [isnotnull(CURNCY_ID#1263)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/small_table1], PartitionFilters: [], PushedFilters: [IsNotNull(CURNCY_ID)], ReadSchema: struct<CURNCY_ID:decimal(9,0)>, SelectedBucketsCount: 1 out of 1 : +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, decimal(4,0), true])), [id=#360] : +- *(9) Project [cntry_id#1269, rev_rollup#1279] : +- *(9) Filter isnotnull(cntry_id#1269) : +- *(9) ColumnarToRow : +- FileScan parquet default.small_table2[cntry_id#1269,rev_rollup#1279] Batched: true, DataFilters: [isnotnull(cntry_id#1269)], Format: Parquet, Location: InMemoryFileIndex[file:/user/hive/warehouse/small_table2], PartitionFilters: [], PushedFilters: [IsNotNull(cntry_id)], ReadSchema: struct<cntry_id:decimal(4,0),rev_rollup:string> +- ReusedExchange [cntry_id#1309, rev_rollup#1319], BroadcastExchange HashedRelationBroadcastMode(List(input[0, decimal(4,0), true])), [id=#360] ``` This PR try to improve `ResolveTables` and `ResolveRelations` performance by reducing the connection times to Hive Metastore Server in such case. ### Why are the changes needed? 1. Reduce the connection times to Hive Metastore Server. 2. Improve `ResolveTables` and `ResolveRelations` performance. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? manual test. After [SPARK-29606](https://issues.apache.org/jira/browse/SPARK-29606) and before this PR: ``` === Metrics of Analyzer/Optimizer Rules === Total number of runs: 9323 Total time: 2.687441263 seconds Rule Effective Time / Total Time Effective Runs / Total Runs org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations 929173767 / 930133504 2 / 18 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTables 0 / 383363402 0 / 18 org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin 0 / 99433540 0 / 4 org.apache.spark.sql.catalyst.analysis.DecimalPrecision 41809394 / 83727901 2 / 18 org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions 71372977 / 71372977 1 / 1 org.apache.spark.sql.catalyst.analysis.TypeCoercion$ImplicitTypeCasts 0 / 59071933 0 / 18 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences 37858325 / 58471776 5 / 18 org.apache.spark.sql.catalyst.analysis.TypeCoercion$PromoteStrings 20889892 / 53229016 1 / 18 org.apache.spark.sql.catalyst.analysis.TypeCoercion$FunctionArgumentConversion 23428968 / 50890815 1 / 18 org.apache.spark.sql.catalyst.analysis.TypeCoercion$InConversion 23230666 / 49182607 1 / 18 org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator 0 / 43638350 0 / 18 org.apache.spark.sql.catalyst.optimizer.ColumnPruning 17194844 / 42530885 1 / 6 ``` After [SPARK-29606](https://issues.apache.org/jira/browse/SPARK-29606) and after this PR: ``` === Metrics of Analyzer/Optimizer Rules === Total number of runs: 9323 Total time: 2.163765869 seconds Rule Effective Time / Total Time Effective Runs / Total Runs org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations 658905353 / 659829383 2 / 18 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveTables 0 / 220708715 0 / 18 org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin 0 / 99606816 0 / 4 org.apache.spark.sql.catalyst.analysis.DecimalPrecision 39616060 / 78215752 2 / 18 org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences 36706549 / 54917789 5 / 18 org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions 53561921 / 53561921 1 / 1 org.apache.spark.sql.catalyst.analysis.TypeCoercion$ImplicitTypeCasts 0 / 52329678 0 / 18 org.apache.spark.sql.catalyst.analysis.TypeCoercion$PromoteStrings 20945755 / 49695998 1 / 18 org.apache.spark.sql.catalyst.analysis.TypeCoercion$FunctionArgumentConversion 20872241 / 46740145 1 / 18 org.apache.spark.sql.catalyst.analysis.TypeCoercion$InConversion 19780298 / 44327227 1 / 18 org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractGenerator 0 / 42312023 0 / 18 org.apache.spark.sql.catalyst.optimizer.ColumnPruning 17197393 / 39501424 1 / 6 ``` Closes #26589 from wangyum/SPARK-29947. Authored-by: Yuming Wang <yumwang@ebay.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
parent
e38964c442
commit
568ad4e77a
|
@ -93,10 +93,14 @@ object FakeV2SessionCatalog extends TableCatalog {
|
|||
* views.
|
||||
* @param nestedViewDepth The nested depth in the view resolution, this enables us to limit the
|
||||
* depth of nested views.
|
||||
* @param relationCache A mapping from qualified table names to resolved relations. This can ensure
|
||||
* that the table is resolved only once if a table is used multiple times
|
||||
* in a query.
|
||||
*/
|
||||
case class AnalysisContext(
|
||||
catalogAndNamespace: Seq[String] = Nil,
|
||||
nestedViewDepth: Int = 0)
|
||||
nestedViewDepth: Int = 0,
|
||||
relationCache: mutable.Map[Seq[String], LogicalPlan] = mutable.Map.empty)
|
||||
|
||||
object AnalysisContext {
|
||||
private val value = new ThreadLocal[AnalysisContext]() {
|
||||
|
@ -111,7 +115,7 @@ object AnalysisContext {
|
|||
def withAnalysisContext[A](catalogAndNamespace: Seq[String])(f: => A): A = {
|
||||
val originContext = value.get()
|
||||
val context = AnalysisContext(
|
||||
catalogAndNamespace, nestedViewDepth = originContext.nestedViewDepth + 1)
|
||||
catalogAndNamespace, originContext.nestedViewDepth + 1, originContext.relationCache)
|
||||
set(context)
|
||||
try f finally { set(originContext) }
|
||||
}
|
||||
|
@ -902,7 +906,9 @@ class Analyzer(
|
|||
case SessionCatalogAndIdentifier(catalog, ident) =>
|
||||
CatalogV2Util.loadTable(catalog, ident).map {
|
||||
case v1Table: V1Table =>
|
||||
v1SessionCatalog.getRelation(v1Table.v1Table)
|
||||
val key = catalog.name +: ident.namespace :+ ident.name
|
||||
AnalysisContext.get.relationCache.getOrElseUpdate(
|
||||
key, v1SessionCatalog.getRelation(v1Table.v1Table))
|
||||
case table =>
|
||||
DataSourceV2Relation.create(table)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue