### What changes were proposed in this pull request?

`str_to_map` is not implemented with codegen support, which prevents a query that contains this expression from being whole-stage codegen-ed. This PR removes `CodegenFallback` from `StringToMap` and adds codegen support for it.

### Why are the changes needed?

Improve codegen coverage and gain better performance.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

1. Pass `ComplexTypeSuite`.
2. Manually review the generated code:

```java
-- !query 12
explain codegen select v, str_to_map(v) from values ('abc:a:a,:'), (null), (''), ('1:2') t(v)
-- !query 12 schema
struct<plan:string>
-- !query 12 output
Found 1 WholeStageCodegen subtrees.
== Subtree 1 / 1 (maxMethodCodeSize:511; maxConstantPoolSize:188(0.29% used); numInnerClasses:0) ==
*Project [v#x, str_to_map(v#x, ,, :) AS str_to_map(v, ,, :)#x]
+- *LocalTableScan [v#x]

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private scala.collection.Iterator localtablescan_input_0;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] project_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter[] project_mutableStateArray_1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter[2];
/* 012 */
/* 013 */   public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 014 */     this.references = references;
/* 015 */   }
/* 016 */
/* 017 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 018 */     partitionIndex = index;
/* 019 */     this.inputs = inputs;
/* 020 */     localtablescan_input_0 = inputs[0];
/* 021 */     project_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 64);
/* 022 */     project_mutableStateArray_1[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(project_mutableStateArray_0[0], 8);
/* 023 */     project_mutableStateArray_1[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(project_mutableStateArray_0[0], 8);
/* 024 */
/* 025 */   }
/* 026 */
/* 027 */   private void project_doConsume_0(InternalRow localtablescan_row_0, UTF8String project_expr_0_0, boolean project_exprIsNull_0_0) throws java.io.IOException {
/* 028 */     boolean project_isNull_1 = true;
/* 029 */     MapData project_value_1 = null;
/* 030 */
/* 031 */     if (!project_exprIsNull_0_0) {
/* 032 */       project_isNull_1 = false; // resultCode could change nullability.
/* 033 */
/* 034 */       int project_i_0 = 0;
/* 035 */       UTF8String[] project_kvs_0 = project_expr_0_0.split(((UTF8String) references[2] /* literal */), -1);
/* 036 */       while (project_i_0 < project_kvs_0.length) {
/* 037 */         UTF8String[] kv = project_kvs_0[project_i_0].split(((UTF8String) references[3] /* literal */), 2);
/* 038 */         UTF8String key = kv[0];
/* 039 */         UTF8String value = null;
/* 040 */         if (kv.length == 2) {
/* 041 */           value = kv[1];
/* 042 */         }
/* 043 */         ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[1] /* mapBuilder */).put(key, value);
/* 044 */         project_i_0++;
/* 045 */       }
/* 046 */       project_value_1 = ((org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder) references[1] /* mapBuilder */).build();
/* 047 */
/* 048 */     }
/* 049 */     project_mutableStateArray_0[0].reset();
/* 050 */
/* 051 */     project_mutableStateArray_0[0].zeroOutNullBytes();
/* 052 */
/* 053 */     if (project_exprIsNull_0_0) {
/* 054 */       project_mutableStateArray_0[0].setNullAt(0);
/* 055 */     } else {
/* 056 */       project_mutableStateArray_0[0].write(0, project_expr_0_0);
/* 057 */     }
/* 058 */
/* 059 */     if (project_isNull_1) {
/* 060 */       project_mutableStateArray_0[0].setNullAt(1);
/* 061 */     } else {
/* 062 */       final MapData project_tmpInput_0 = project_value_1;
/* 063 */       if (project_tmpInput_0 instanceof UnsafeMapData) {
/* 064 */         project_mutableStateArray_0[0].write(1, (UnsafeMapData) project_tmpInput_0);
/* 065 */       } else {
/* 066 */         // Remember the current cursor so that we can calculate how many bytes are
/* 067 */         // written later.
/* 068 */         final int project_previousCursor_0 = project_mutableStateArray_0[0].cursor();
/* 069 */
/* 070 */         // preserve 8 bytes to write the key array numBytes later.
/* 071 */         project_mutableStateArray_0[0].grow(8);
/* 072 */         project_mutableStateArray_0[0].increaseCursor(8);
/* 073 */
/* 074 */         // Remember the current cursor so that we can write numBytes of key array later.
/* 075 */         final int project_tmpCursor_0 = project_mutableStateArray_0[0].cursor();
/* 076 */
/* 077 */         final ArrayData project_tmpInput_1 = project_tmpInput_0.keyArray();
/* 078 */         if (project_tmpInput_1 instanceof UnsafeArrayData) {
/* 079 */           project_mutableStateArray_0[0].write((UnsafeArrayData) project_tmpInput_1);
/* 080 */         } else {
/* 081 */           final int project_numElements_0 = project_tmpInput_1.numElements();
/* 082 */           project_mutableStateArray_1[0].initialize(project_numElements_0);
/* 083 */
/* 084 */           for (int project_index_0 = 0; project_index_0 < project_numElements_0; project_index_0++) {
/* 085 */             project_mutableStateArray_1[0].write(project_index_0, project_tmpInput_1.getUTF8String(project_index_0));
/* 086 */           }
/* 087 */         }
/* 088 */
/* 089 */         // Write the numBytes of key array into the first 8 bytes.
/* 090 */         Platform.putLong(
/* 091 */           project_mutableStateArray_0[0].getBuffer(),
/* 092 */           project_tmpCursor_0 - 8,
/* 093 */           project_mutableStateArray_0[0].cursor() - project_tmpCursor_0);
/* 094 */
/* 095 */         final ArrayData project_tmpInput_2 = project_tmpInput_0.valueArray();
/* 096 */         if (project_tmpInput_2 instanceof UnsafeArrayData) {
/* 097 */           project_mutableStateArray_0[0].write((UnsafeArrayData) project_tmpInput_2);
/* 098 */         } else {
/* 099 */           final int project_numElements_1 = project_tmpInput_2.numElements();
/* 100 */           project_mutableStateArray_1[1].initialize(project_numElements_1);
/* 101 */
/* 102 */           for (int project_index_1 = 0; project_index_1 < project_numElements_1; project_index_1++) {
/* 103 */             if (project_tmpInput_2.isNullAt(project_index_1)) {
/* 104 */               project_mutableStateArray_1[1].setNull8Bytes(project_index_1);
/* 105 */             } else {
/* 106 */               project_mutableStateArray_1[1].write(project_index_1, project_tmpInput_2.getUTF8String(project_index_1));
/* 107 */             }
/* 108 */
/* 109 */           }
/* 110 */         }
/* 111 */
/* 112 */         project_mutableStateArray_0[0].setOffsetAndSizeFromPreviousCursor(1, project_previousCursor_0);
/* 113 */       }
/* 114 */     }
/* 115 */     append((project_mutableStateArray_0[0].getRow()));
/* 116 */
/* 117 */   }
/* 118 */
/* 119 */   protected void processNext() throws java.io.IOException {
/* 120 */     while ( localtablescan_input_0.hasNext()) {
/* 121 */       InternalRow localtablescan_row_0 = (InternalRow) localtablescan_input_0.next();
/* 122 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 123 */       boolean localtablescan_isNull_0 = localtablescan_row_0.isNullAt(0);
/* 124 */       UTF8String localtablescan_value_0 = localtablescan_isNull_0 ?
/* 125 */       null : (localtablescan_row_0.getUTF8String(0));
/* 126 */
/* 127 */       project_doConsume_0(localtablescan_row_0, localtablescan_value_0, localtablescan_isNull_0);
/* 128 */       if (shouldStop()) return;
/* 129 */     }
/* 130 */   }
/* 131 */
/* 132 */ }

-- !query 13
select v, str_to_map(v) from values ('abc:a:a,:'), (null), (''), ('1:2') t(v)
-- !query 13 schema
struct<v:string,str_to_map(v, ,, :):map<string,string>>
-- !query 13 output
	{"":null}
1:2	{"1":"2"}
NULL	NULL
abc:a:a,:	{"":"","abc":"a:a"}
```

Closes #27013 from yaooqinn/SPARK-30356.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
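For context, the change boils down to replacing the `CodegenFallback` trait on `StringToMap` with a `doGenCode` override that emits the split/put/build loop visible in the generated code above. Below is a minimal sketch of such an override, reconstructed from that generated code rather than copied from the patch; it assumes `StringToMap` is a `TernaryExpression` over (text, pair delimiter, key-value delimiter) with an `ArrayBasedMapBuilder` member named `mapBuilder`:

```scala
// Sketch only, not the verbatim patch. Assumed context: this method lives inside
// StringToMap, which extends TernaryExpression and has a mapBuilder field.
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
  // Expose the shared map builder to the generated code via the references array.
  val builderTerm = ctx.addReferenceObj("mapBuilder", mapBuilder)
  val keyValues = ctx.freshName("kvs")
  val idx = ctx.freshName("i")
  nullSafeCodeGen(ctx, ev, (text, pairDelim, keyValueDelim) =>
    s"""
       |int $idx = 0;
       |UTF8String[] $keyValues = $text.split($pairDelim, -1);
       |while ($idx < $keyValues.length) {
       |  // Split each entry into at most two parts: key and optional value.
       |  UTF8String[] kv = $keyValues[$idx].split($keyValueDelim, 2);
       |  UTF8String key = kv[0];
       |  UTF8String value = null;
       |  if (kv.length == 2) {
       |    value = kv[1];
       |  }
       |  $builderTerm.put(key, value);
       |  $idx++;
       |}
       |${ev.value} = $builderTerm.build();
     """.stripMargin)
}
```

`nullSafeCodeGen` supplies the null checks seen at the top of `project_doConsume_0`, which is why the emitted template only has to handle the non-null path.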
# Spark SQL
This module provides support for executing relational queries expressed in either SQL or the DataFrame/Dataset API.
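For example, the same query can be expressed either way. This is a minimal sketch with a local session and toy data; the table name `t` and column names are illustrative only:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("sql-demo").getOrCreate()
import spark.implicits._

val df = Seq(("a", 1), ("b", 2), ("c", 3)).toDF("k", "v")
df.createOrReplaceTempView("t")

// The same query expressed in SQL ...
spark.sql("SELECT k, v * 2 AS doubled FROM t WHERE v > 1").show()
// ... and with the DataFrame API.
df.where($"v" > 1).select($"k", ($"v" * 2).as("doubled")).show()
```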
Spark SQL is broken up into four subprojects:
- Catalyst (sql/catalyst) - An implementation-agnostic framework for manipulating trees of relational operators and expressions.
- Execution (sql/core) - A query planner / execution engine for translating Catalyst's logical query plans into Spark RDDs (see the sketch after this list). This component also includes a new public interface, SQLContext, that allows users to execute SQL or LINQ statements against existing RDDs and Parquet files.
- Hive Support (sql/hive) - Includes extensions that allow users to write queries using a subset of HiveQL and access data from a Hive Metastore using Hive SerDes. There are also wrappers that allow users to run queries that include Hive UDFs, UDAFs, and UDTFs.
- HiveServer and CLI support (sql/hive-thriftserver) - Includes support for the SQL CLI (bin/spark-sql) and a HiveServer2 (for JDBC/ODBC) compatible server.
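A quick way to watch a query move through these layers is to inspect its `QueryExecution`. This sketch reuses the `spark` session and temp view `t` from the example above:

```scala
val q = spark.sql("SELECT k, count(*) AS n FROM t GROUP BY k")

// Catalyst (sql/catalyst): the logical plan and the optimizer's output.
println(q.queryExecution.logical)
println(q.queryExecution.optimizedPlan)

// Execution (sql/core): the physical plan that actually runs as Spark RDDs.
println(q.queryExecution.executedPlan)

// Or print all of the above stages at once.
q.explain(true)
```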
Running `./sql/create-docs.sh` generates SQL documentation for built-in functions under `sql/site`.