Commit graph

184 commits

Author SHA1 Message Date
Kazuaki Ishizaki 9215ee7a16 [SPARK-23976][CORE] Detect length overflow in UTF8String.concat()/ByteArray.concat()
## What changes were proposed in this pull request?

This PR detects length overflow if total elements in inputs are not acceptable.

For example, when the three inputs has `0x7FFF_FF00`, `0x7FFF_FF00`, and `0xE00`, we should detect length overflow since we cannot allocate such a large structure on `byte[]`.
On the other hand, the current algorithm can allocate the result structure with `0x1000`-byte length due to integer sum overflow.

## How was this patch tested?

Existing UTs.
If we would create UTs, we need large heap (6-8GB). It may make test environment unstable.
If it is necessary to create UTs, I will create them.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21064 from kiszk/SPARK-23976.
2018-05-02 10:41:34 +02:00
Marcelo Vanzin 428b903859 [SPARK-24029][CORE] Follow up: set SO_REUSEADDR on the server socket.
"childOption" is for the remote connections, not for the server socket
that actually listens for incoming connections.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #21132 from vanzin/SPARK-24029.2.
2018-04-24 09:10:29 +08:00
Kazuaki Ishizaki c3a86faa53 [SPARK-10399][SPARK-23879][FOLLOWUP][CORE] Free unused off-heap memory in MemoryBlockSuite
## What changes were proposed in this pull request?

As viirya pointed out [here](https://github.com/apache/spark/pull/19222#discussion_r179910484), this PR explicitly frees unused off-heap memory in `MemoryBlockSuite`

## How was this patch tested?

Existing UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21117 from kiszk/SPARK-10399-free-offheap.
2018-04-23 10:45:25 +08:00
Marcelo Vanzin 32b4bcd6d3 [SPARK-24029][CORE] Set SO_REUSEADDR on listen sockets.
This allows sockets to be bound even if there are sockets
from a previous application that are still pending closure. It
avoids bind issues when, for example, re-starting the SHS.

Don't enable the option on Windows though. The following page
explains some odd behavior that this option can have there:
https://msdn.microsoft.com/en-us/library/windows/desktop/ms740621%28v=vs.85%29.aspx

I intentionally ignored server sockets that always bind to
ephemeral ports, since those don't benefit from this option.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #21110 from vanzin/SPARK-24029.
2018-04-21 23:14:58 +08:00
mn-mikke e6b466084c [SPARK-23736][SQL] Extending the concat function to support array columns
## What changes were proposed in this pull request?
The PR adds a logic for easy concatenation of multiple array columns and covers:
- Concat expression has been extended to support array columns
- A Python wrapper

## How was this patch tested?
New tests added into:
- CollectionExpressionsSuite
- DataFrameFunctionsSuite
- typeCoercion/native/concat.sql

## Codegen examples
### Primitive-type elements
```
val df = Seq(
  (Seq(1 ,2), Seq(3, 4)),
  (Seq(1, 2, 3), null)
).toDF("a", "b")
df.filter('a.isNotNull).select(concat('a, 'b)).debugCodegen()
```
Result:
```
/* 033 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 034 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 035 */         null : (inputadapter_row.getArray(0));
/* 036 */
/* 037 */         if (!(!inputadapter_isNull)) continue;
/* 038 */
/* 039 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 040 */
/* 041 */         ArrayData[] project_args = new ArrayData[2];
/* 042 */
/* 043 */         if (!false) {
/* 044 */           project_args[0] = inputadapter_value;
/* 045 */         }
/* 046 */
/* 047 */         boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 048 */         ArrayData inputadapter_value1 = inputadapter_isNull1 ?
/* 049 */         null : (inputadapter_row.getArray(1));
/* 050 */         if (!inputadapter_isNull1) {
/* 051 */           project_args[1] = inputadapter_value1;
/* 052 */         }
/* 053 */
/* 054 */         ArrayData project_value = new Object() {
/* 055 */           public ArrayData concat(ArrayData[] args) {
/* 056 */             for (int z = 0; z < 2; z++) {
/* 057 */               if (args[z] == null) return null;
/* 058 */             }
/* 059 */
/* 060 */             long project_numElements = 0L;
/* 061 */             for (int z = 0; z < 2; z++) {
/* 062 */               project_numElements += args[z].numElements();
/* 063 */             }
/* 064 */             if (project_numElements > 2147483632) {
/* 065 */               throw new RuntimeException("Unsuccessful try to concat arrays with " + project_numElements +
/* 066 */                 " elements due to exceeding the array size limit 2147483632.");
/* 067 */             }
/* 068 */
/* 069 */             long project_size = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(
/* 070 */               project_numElements,
/* 071 */               4);
/* 072 */             if (project_size > 2147483632) {
/* 073 */               throw new RuntimeException("Unsuccessful try to concat arrays with " + project_size +
/* 074 */                 " bytes of data due to exceeding the limit 2147483632 bytes" +
/* 075 */                 " for UnsafeArrayData.");
/* 076 */             }
/* 077 */
/* 078 */             byte[] project_array = new byte[(int)project_size];
/* 079 */             UnsafeArrayData project_arrayData = new UnsafeArrayData();
/* 080 */             Platform.putLong(project_array, 16, project_numElements);
/* 081 */             project_arrayData.pointTo(project_array, 16, (int)project_size);
/* 082 */             int project_counter = 0;
/* 083 */             for (int y = 0; y < 2; y++) {
/* 084 */               for (int z = 0; z < args[y].numElements(); z++) {
/* 085 */                 if (args[y].isNullAt(z)) {
/* 086 */                   project_arrayData.setNullAt(project_counter);
/* 087 */                 } else {
/* 088 */                   project_arrayData.setInt(
/* 089 */                     project_counter,
/* 090 */                     args[y].getInt(z)
/* 091 */                   );
/* 092 */                 }
/* 093 */                 project_counter++;
/* 094 */               }
/* 095 */             }
/* 096 */             return project_arrayData;
/* 097 */           }
/* 098 */         }.concat(project_args);
/* 099 */         boolean project_isNull = project_value == null;
```

### Non-primitive-type elements
```
val df = Seq(
  (Seq("aa" ,"bb"), Seq("ccc", "ddd")),
  (Seq("x", "y"), null)
).toDF("a", "b")
df.filter('a.isNotNull).select(concat('a, 'b)).debugCodegen()
```
Result:
```
/* 033 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 034 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 035 */         null : (inputadapter_row.getArray(0));
/* 036 */
/* 037 */         if (!(!inputadapter_isNull)) continue;
/* 038 */
/* 039 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 040 */
/* 041 */         ArrayData[] project_args = new ArrayData[2];
/* 042 */
/* 043 */         if (!false) {
/* 044 */           project_args[0] = inputadapter_value;
/* 045 */         }
/* 046 */
/* 047 */         boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 048 */         ArrayData inputadapter_value1 = inputadapter_isNull1 ?
/* 049 */         null : (inputadapter_row.getArray(1));
/* 050 */         if (!inputadapter_isNull1) {
/* 051 */           project_args[1] = inputadapter_value1;
/* 052 */         }
/* 053 */
/* 054 */         ArrayData project_value = new Object() {
/* 055 */           public ArrayData concat(ArrayData[] args) {
/* 056 */             for (int z = 0; z < 2; z++) {
/* 057 */               if (args[z] == null) return null;
/* 058 */             }
/* 059 */
/* 060 */             long project_numElements = 0L;
/* 061 */             for (int z = 0; z < 2; z++) {
/* 062 */               project_numElements += args[z].numElements();
/* 063 */             }
/* 064 */             if (project_numElements > 2147483632) {
/* 065 */               throw new RuntimeException("Unsuccessful try to concat arrays with " + project_numElements +
/* 066 */                 " elements due to exceeding the array size limit 2147483632.");
/* 067 */             }
/* 068 */
/* 069 */             Object[] project_arrayObjects = new Object[(int)project_numElements];
/* 070 */             int project_counter = 0;
/* 071 */             for (int y = 0; y < 2; y++) {
/* 072 */               for (int z = 0; z < args[y].numElements(); z++) {
/* 073 */                 project_arrayObjects[project_counter] = args[y].getUTF8String(z);
/* 074 */                 project_counter++;
/* 075 */               }
/* 076 */             }
/* 077 */             return new org.apache.spark.sql.catalyst.util.GenericArrayData(project_arrayObjects);
/* 078 */           }
/* 079 */         }.concat(project_args);
/* 080 */         boolean project_isNull = project_value == null;
```

Author: mn-mikke <mrkAha12346github>

Closes #20858 from mn-mikke/feature/array-api-concat_arrays-to-master.
2018-04-20 14:58:11 +09:00
Kris Mok f94f3624ea [SPARK-23947][SQL] Add hashUTF8String convenience method to hasher classes
## What changes were proposed in this pull request?

Add `hashUTF8String()` to the hasher classes to allow Spark SQL codegen to generate cleaner code for hashing `UTF8String`s. No change in behavior otherwise.

Although with the introduction of SPARK-10399, the code size for hashing `UTF8String` is already smaller, it's still good to extract a separate function in the hasher classes so that the generated code can stay clean.

## How was this patch tested?

Existing tests.

Author: Kris Mok <kris.mok@databricks.com>

Closes #21016 from rednaxelafx/hashutf8.
2018-04-09 21:07:28 -07:00
Kazuaki Ishizaki 710a68cec2 [SPARK-23892][TEST] Improve converge and fix lint error in UTF8String-related tests
## What changes were proposed in this pull request?

This PR improves test coverage in `UTF8StringSuite` and code efficiency in `UTF8StringPropertyCheckSuite`.

This PR also fixes lint-java issue in `UTF8StringSuite` reported at [here](https://github.com/apache/spark/pull/20995#issuecomment-379325527)

```[ERROR] src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java:[28,8] (imports) UnusedImports: Unused import - org.apache.spark.unsafe.Platform.```

## How was this patch tested?

Existing UT

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21000 from kiszk/SPARK-23892.
2018-04-08 20:26:31 +02:00
Kazuaki Ishizaki b6935ffb4d [SPARK-10399][SPARK-23879][HOTFIX] Fix Java lint errors
## What changes were proposed in this pull request?

This PR fixes the following errors in [Java lint](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-lint/7717/console) after #19222 has been merged. These errors were pointed by ueshin .

```
[ERROR] src/main/java/org/apache/spark/unsafe/array/ByteArrayMethods.java:[57] (sizes) LineLength: Line is longer than 100 characters (found 106).
[ERROR] src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java:[26,8] (imports) UnusedImports: Unused import - org.apache.spark.unsafe.Platform.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OffHeapMemoryBlock.java:[23,10] (modifier) ModifierOrder: 'public' modifier out of order with the JLS suggestions.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[64,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[69,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[74,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[79,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[84,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[89,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[94,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[99,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[104,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[109,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[114,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[119,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[124,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/OnHeapMemoryBlock.java:[129,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[60,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[65,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[70,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[75,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[80,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[85,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[90,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[95,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[100,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[105,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[110,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[115,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[120,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java:[125,10] (modifier) RedundantModifier: Redundant 'final' modifier.
[ERROR] src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java:[114,16] (modifier) ModifierOrder: 'static' modifier out of order with the JLS suggestions.
[ERROR] src/main/java/org/apache/spark/sql/catalyst/expressions/HiveHasher.java:[20,8] (imports) UnusedImports: Unused import - org.apache.spark.unsafe.Platform.
[ERROR] src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java:[30,8] (imports) UnusedImports: Unused import - org.apache.spark.unsafe.memory.MemoryBlock.
[ERROR] src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java:[126,15] (naming) MethodName: Method name 'ByteArrayMemoryBlockTest' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
[ERROR] src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java:[143,15] (naming) MethodName: Method name 'OnHeapMemoryBlockTest' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
[ERROR] src/test/java/org/apache/spark/unsafe/memory/MemoryBlockSuite.java:[160,15] (naming) MethodName: Method name 'OffHeapArrayMemoryBlockTest' must match pattern '^[a-z][a-z0-9][a-zA-Z0-9_]*$'.
[ERROR] src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java:[19,8] (imports) UnusedImports: Unused import - com.google.common.primitives.Ints.
[ERROR] src/main/java/org/apache/spark/sql/catalyst/expressions/XXH64.java:[21,8] (imports) UnusedImports: Unused import - org.apache.spark.unsafe.Platform.
[ERROR] src/test/java/org/apache/spark/sql/catalyst/expressions/HiveHasherSuite.java:[20,8] (imports) UnusedImports: Unused import - org.apache.spark.unsafe.Platform.
```

## How was this patch tested?

Existing UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #20991 from kiszk/SPARK-10399-jlint.
2018-04-06 10:23:26 -07:00
Kazuaki Ishizaki c926acf719 [SPARK-23882][CORE] UTF8StringSuite.writeToOutputStreamUnderflow() is not expected to be supported
## What changes were proposed in this pull request?

This PR excludes an existing UT [`writeToOutputStreamUnderflow()`](https://github.com/apache/spark/blob/master/common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java#L519-L532) in `UTF8StringSuite`.

As discussed [here](https://github.com/apache/spark/pull/19222#discussion_r177692142), the behavior of this test looks surprising. This test seems to access metadata area of the JVM object where is reserved by `Platform.BYTE_ARRAY_OFFSET`.

This test is introduced thru #16089 by NathanHowell. More specifically, [the commit](27c102deb1) `Improve test coverage of UTFString.write` introduced this UT. However, I cannot find any discussion about this UT.

I think that it would be good to exclude this UT.

```java
  public void writeToOutputStreamUnderflow() throws IOException {
    // offset underflow is apparently supported?
    final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    final byte[] test = "01234567".getBytes(StandardCharsets.UTF_8);

    for (int i = 1; i <= Platform.BYTE_ARRAY_OFFSET; ++i) {
      new UTF8String(
        new ByteArrayMemoryBlock(test, Platform.BYTE_ARRAY_OFFSET - i, test.length + i))
          .writeTo(outputStream);
      final ByteBuffer buffer = ByteBuffer.wrap(outputStream.toByteArray(), i, test.length);
      assertEquals("01234567", StandardCharsets.UTF_8.decode(buffer).toString());
      outputStream.reset();
    }
  }
```

## How was this patch tested?

Existing UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #20995 from kiszk/SPARK-23882.
2018-04-06 18:42:14 +02:00
Kazuaki Ishizaki 4807d381bb [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks to choose several types of memory block
## What changes were proposed in this pull request?

This PR allows us to use one of several types of `MemoryBlock`, such as byte array, int array, long array, or `java.nio.DirectByteBuffer`. To use `java.nio.DirectByteBuffer` allows to have off heap memory which is automatically deallocated by JVM. `MemoryBlock`  class has primitive accessors like `Platform.getInt()`, `Platform.putint()`, or `Platform.copyMemory()`.

This PR uses `MemoryBlock` for `OffHeapColumnVector`, `UTF8String`, and other places. This PR can improve performance of operations involving memory accesses (e.g. `UTF8String.trim`) by 1.8x.

For now, this PR does not use `MemoryBlock` for `BufferHolder` based on cloud-fan's [suggestion](https://github.com/apache/spark/pull/11494#issuecomment-309694290).

Since this PR is a successor of #11494, close #11494. Many codes were ported from #11494. Many efforts were put here. **I think this PR should credit to yzotov.**

This PR can achieve **1.1-1.4x performance improvements** for  operations in `UTF8String` or `Murmur3_x86_32`. Other operations are almost comparable performances.

Without this PR
```
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-22-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-22-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
Hash byte arrays with length 268435487:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Murmur3_x86_32                                 526 /  536          0.0   131399881.5       1.0X

UTF8String benchmark:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
hashCode                                       525 /  552       1022.6           1.0       1.0X
substring                                      414 /  423       1298.0           0.8       1.3X
```

With this PR
```
OpenJDK 64-Bit Server VM 1.8.0_121-8u121-b13-0ubuntu1.16.04.2-b13 on Linux 4.4.0-22-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
Hash byte arrays with length 268435487:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Murmur3_x86_32                                 474 /  488          0.0   118552232.0       1.0X

UTF8String benchmark:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
hashCode                                       476 /  480       1127.3           0.9       1.0X
substring                                      287 /  291       1869.9           0.5       1.7X
```

Benchmark program
```
test("benchmark Murmur3_x86_32") {
  val length = 8192 * 32768 + 31
  val seed = 42L
  val iters = 1 << 2
  val random = new Random(seed)
  val arrays = Array.fill[MemoryBlock](numArrays) {
    val bytes = new Array[Byte](length)
    random.nextBytes(bytes)
    new ByteArrayMemoryBlock(bytes, Platform.BYTE_ARRAY_OFFSET, length)
  }

  val benchmark = new Benchmark("Hash byte arrays with length " + length,
    iters * numArrays, minNumIters = 20)
  benchmark.addCase("HiveHasher") { _: Int =>
    var sum = 0L
    for (_ <- 0L until iters) {
      sum += HiveHasher.hashUnsafeBytesBlock(
        arrays(i), Platform.BYTE_ARRAY_OFFSET, length)
    }
  }
  benchmark.run()
}

test("benchmark UTF8String") {
  val N = 512 * 1024 * 1024
  val iters = 2
  val benchmark = new Benchmark("UTF8String benchmark", N, minNumIters = 20)
  val str0 = new java.io.StringWriter() { { for (i <- 0 until N) { write(" ") } } }.toString
  val s0 = UTF8String.fromString(str0)
  benchmark.addCase("hashCode") { _: Int =>
    var h: Int = 0
    for (_ <- 0L until iters) { h += s0.hashCode }
  }
  benchmark.addCase("substring") { _: Int =>
    var s: UTF8String = null
    for (_ <- 0L until iters) { s = s0.substring(N / 2 - 5, N / 2 + 5) }
  }
  benchmark.run()
}
```

I run [this benchmark program](https://gist.github.com/kiszk/94f75b506c93a663bbbc372ffe8f05de) using [the commit](ee5a79861c). I got the following results:

```
OpenJDK 64-Bit Server VM 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
Memory access benchmarks:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
ByteArrayMemoryBlock get/putInt()              220 /  221        609.3           1.6       1.0X
Platform get/putInt(byte[])                    220 /  236        610.9           1.6       1.0X
Platform get/putInt(Object)                    492 /  494        272.8           3.7       0.4X
OnHeapMemoryBlock get/putLong()                322 /  323        416.5           2.4       0.7X
long[]                                         221 /  221        608.0           1.6       1.0X
Platform get/putLong(long[])                   321 /  321        418.7           2.4       0.7X
Platform get/putLong(Object)                   561 /  563        239.2           4.2       0.4X
```

I also run [this benchmark program](https://gist.github.com/kiszk/5fdb4e03733a5d110421177e289d1fb5) for comparing performance of `Platform.copyMemory()`.
```
OpenJDK 64-Bit Server VM 1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12 on Linux 4.4.0-66-generic
Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
Platform copyMemory:                     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Object to Object                              1961 / 1967          8.6         116.9       1.0X
System.arraycopy Object to Object             1917 / 1921          8.8         114.3       1.0X
byte array to byte array                      1961 / 1968          8.6         116.9       1.0X
System.arraycopy byte array to byte array      1909 / 1937          8.8         113.8       1.0X
int array to int array                        1921 / 1990          8.7         114.5       1.0X
double array to double array                  1918 / 1923          8.7         114.3       1.0X
Object to byte array                          1961 / 1967          8.6         116.9       1.0X
Object to short array                         1965 / 1972          8.5         117.1       1.0X
Object to int array                           1910 / 1915          8.8         113.9       1.0X
Object to float array                         1971 / 1978          8.5         117.5       1.0X
Object to double array                        1919 / 1944          8.7         114.4       1.0X
byte array to Object                          1959 / 1967          8.6         116.8       1.0X
int array to Object                           1961 / 1970          8.6         116.9       1.0X
double array to Object                        1917 / 1924          8.8         114.3       1.0X
```

These results show three facts:
1. According to the second/third or sixth/seventh results in the first experiment, if we use `Platform.get/putInt(Object)`, we achieve more than 2x worse performance than `Platform.get/putInt(byte[])` with concrete type (i.e. `byte[]`).
2. According to the second/third or fourth/fifth/sixth results in the first experiment, the fastest way to access an array element on Java heap is `array[]`. **Cons of `array[]` is that it is not possible to support unaligned-8byte access.**
3. According to the first/second/third or fourth/sixth/seventh results in the first experiment, `getInt()/putInt() or getLong()/putLong()` in subclasses of `MemoryBlock` can achieve comparable performance to `Platform.get/putInt()` or `Platform.get/putLong()` with concrete type (second or sixth result). There is no overhead regarding virtual call.
4. According to results in the second experiment, for `Platform.copy()`, to pass `Object` can achieve the same performance as to pass any type of primitive array as source or destination.
5. According to second/fourth results in the second experiment, `Platform.copy()` can achieve the same performance as `System.arrayCopy`. **It would be good to use `Platform.copy()` since `Platform.copy()` can take any types for src and dst.**

We are incrementally replace `Platform.get/putXXX` with `MemoryBlock.get/putXXX`. This is because we have two advantages.
1) Achieve better performance due to having a concrete type for an array.
2) Use simple OO design instead of passing `Object`
It is easy to use `MemoryBlock` in `InternalRow`, `BufferHolder`, `TaskMemoryManager`, and others that are already abstracted. It is not easy to use `MemoryBlock` in utility classes related to hashing or others.

Other candidates are
- UnsafeRow, UnsafeArrayData, UnsafeMapData, SpecificUnsafeRowJoiner
- UTF8StringBuffer
- BufferHolder
- TaskMemoryManager
- OnHeapColumnVector
- BytesToBytesMap
- CachedBatch
- classes for hash
- others.

## How was this patch tested?

Added `UnsafeMemoryAllocator`

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #19222 from kiszk/SPARK-10399.
2018-04-06 10:13:59 +08:00
Maxim Gekk 5e7bc2acef [SPARK-23649][SQL] Skipping chars disallowed in UTF-8
## What changes were proposed in this pull request?

The mapping of UTF-8 char's first byte to char's size doesn't cover whole range 0-255. It is defined only for 0-253:
https://github.com/apache/spark/blob/master/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java#L60-L65
https://github.com/apache/spark/blob/master/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java#L190

If the first byte of a char is 253-255, IndexOutOfBoundsException is thrown. Besides of that values for 244-252 are not correct according to recent unicode standard for UTF-8: http://www.unicode.org/versions/Unicode10.0.0/UnicodeStandard-10.0.pdf

As a consequence of the exception above, the length of input string in UTF-8 encoding cannot be calculated if the string contains chars started from 253 code. It is visible on user's side as for example crashing of schema inferring of csv file which contains such chars but the file can be read if the schema is specified explicitly or if the mode set to multiline.

The proposed changes build correct mapping of first byte of UTF-8 char to its size (now it covers all cases) and skip disallowed chars (counts it as one octet).

## How was this patch tested?

Added a test and a file with a char which is disallowed in UTF-8 - 0xFF.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #20796 from MaxGekk/skip-wrong-utf8-chars.
2018-03-20 10:34:56 -07:00
Shintaro Murakami d5ed2108d3 [SPARK-23381][CORE] Murmur3 hash generates a different value from other implementations
## What changes were proposed in this pull request?
Murmur3 hash generates a different value from the original and other implementations (like Scala standard library and Guava or so) when the length of a bytes array is not multiple of 4.

## How was this patch tested?
Added a unit test.

**Note: When we merge this PR, please give all the credits to Shintaro Murakami.**

Author: Shintaro Murakami <mrkm4ntrgmail.com>

Author: gatorsmile <gatorsmile@gmail.com>
Author: Shintaro Murakami <mrkm4ntr@gmail.com>

Closes #20630 from gatorsmile/pr-20568.
2018-02-16 17:17:55 -08:00
Marco Gaido 0fc26313f8 [SPARK-21860][CORE][FOLLOWUP] fix java style error
## What changes were proposed in this pull request?

#19077 introduced a Java style error (too long line). Quick fix.

## How was this patch tested?

running `./dev/lint-java`

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #20558 from mgaido91/SPARK-21860.
2018-02-09 08:46:27 -06:00
liuxian 76e019d9bd [SPARK-21860][CORE] Improve memory reuse for heap memory in HeapMemoryAllocator
## What changes were proposed in this pull request?
In `HeapMemoryAllocator`, when allocating memory from pool, and the key of pool is memory size.
Actually some size of memory ,such as 1025bytes,1026bytes,......1032bytes, we can think they are the same,because we allocate memory in multiples of 8 bytes.
In this case, we can improve memory reuse.

## How was this patch tested?
Existing tests and added unit tests

Author: liuxian <liu.xian3@zte.com.cn>

Closes #19077 from 10110346/headmemoptimize.
2018-02-08 23:41:30 +08:00
Shixiong Zhu ec63e2d074 [SPARK-23289][CORE] OneForOneBlockFetcher.DownloadCallback.onData should write the buffer fully
## What changes were proposed in this pull request?

`channel.write(buf)` may not write the whole buffer since the underlying channel is a FileChannel, we should retry until the whole buffer is written.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #20461 from zsxwing/SPARK-23289.
2018-02-01 21:00:47 +08:00
Marcelo Vanzin aa3a1276f9 [SPARK-23103][CORE] Ensure correct sort order for negative values in LevelDB.
The code was sorting "0" as "less than" negative values, which is a little
wrong. Fix is simple, most of the changes are the added test and related
cleanup.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #20284 from vanzin/SPARK-23103.
2018-01-19 13:32:20 -06:00
Dongjoon Hyun 7bd14cfd40 [MINOR][BUILD] Fix Java linter errors
## What changes were proposed in this pull request?

This PR cleans up the java-lint errors (for v2.3.0-rc1 tag). Hopefully, this will be the final one.

```
$ dev/lint-java
Using `mvn` from path: /usr/local/bin/mvn
Checkstyle checks failed at following occurrences:
[ERROR] src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java:[85] (sizes) LineLength: Line is longer than 100 characters (found 101).
[ERROR] src/main/java/org/apache/spark/launcher/InProcessAppHandle.java:[20,8] (imports) UnusedImports: Unused import - java.io.IOException.
[ERROR] src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java:[41,9] (modifier) ModifierOrder: 'private' modifier out of order with the JLS suggestions.
[ERROR] src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java:[464] (sizes) LineLength: Line is longer than 100 characters (found 102).
```

## How was this patch tested?

Manual.

```
$ dev/lint-java
Using `mvn` from path: /usr/local/bin/mvn
Checkstyle checks passed.
```

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #20242 from dongjoon-hyun/fix_lint_java_2.3_rc1.
2018-01-12 10:18:42 -08:00
gatorsmile 651f76153f [SPARK-23028] Bump master branch version to 2.4.0-SNAPSHOT
## What changes were proposed in this pull request?
This patch bumps the master branch version to `2.4.0-SNAPSHOT`.

## How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #20222 from gatorsmile/bump24.
2018-01-13 00:37:59 +08:00
Marcelo Vanzin 1c70da3bfb [SPARK-20657][CORE] Speed up rendering of the stages page.
There are two main changes to speed up rendering of the tasks list
when rendering the stage page.

The first one makes the code only load the tasks being shown in the
current page of the tasks table, and information related to only
those tasks. One side-effect of this change is that the graph that
shows task-related events now only shows events for the tasks in
the current page, instead of the previously hardcoded limit of "events
for the first 1000 tasks". That ends up helping with readability,
though.

To make sorting efficient when using a disk store, the task wrapper
was extended to include many new indices, one for each of the sortable
columns in the UI, and metrics for which quantiles are calculated.

The second changes the way metric quantiles are calculated for stages.
Instead of using the "Distribution" class to process data for all task
metrics, which requires scanning all tasks of a stage, the code now
uses the KVStore "skip()" functionality to only read tasks that contain
interesting information for the quantiles that are desired.

This is still not cheap; because there are many metrics that the UI
and API track, the code needs to scan the index for each metric to
gather the information. Savings come mainly from skipping deserialization
when using the disk store, but the in-memory code also seems to be
faster than before (most probably because of other changes in this
patch).

To make subsequent calls faster, some quantiles are cached in the
status store. This makes UIs much faster after the first time a stage
has been loaded.

With the above changes, a lot of code in the UI layer could be simplified.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #20013 from vanzin/SPARK-20657.
2018-01-11 19:41:48 +08:00
Josh Rosen f340b6b306 [SPARK-22997] Add additional defenses against use of freed MemoryBlocks
## What changes were proposed in this pull request?

This patch modifies Spark's `MemoryAllocator` implementations so that `free(MemoryBlock)` mutates the passed block to clear pointers (in the off-heap case) or null out references to backing `long[]` arrays (in the on-heap case). The goal of this change is to add an extra layer of defense against use-after-free bugs because currently it's hard to detect corruption caused by blind writes to freed memory blocks.

## How was this patch tested?

New unit tests in `PlatformSuite`, including new tests for existing functionality because we did not have sufficient mutation coverage of the on-heap memory allocator's pooling logic.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #20191 from JoshRosen/SPARK-22997-add-defenses-against-use-after-free-bugs-in-memory-allocator.
2018-01-10 00:45:47 -08:00
jerryshao 93f92c0ed7 [SPARK-21475][CORE][2ND ATTEMPT] Change to use NIO's Files API for external shuffle service
## What changes were proposed in this pull request?

This PR is the second attempt of #18684 , NIO's Files API doesn't override `skip` method for `InputStream`, so it will bring in performance issue (mentioned in #20119). But using `FileInputStream`/`FileOutputStream` will also bring in memory issue (https://dzone.com/articles/fileinputstream-fileoutputstream-considered-harmful), which is severe for long running external shuffle service. So here in this proposal, only fixing the external shuffle service related code.

## How was this patch tested?

Existing tests.

Author: jerryshao <sshao@hortonworks.com>

Closes #20144 from jerryshao/SPARK-21475-v2.
2018-01-04 11:39:42 -08:00
Sean Owen c284c4e1f6 [MINOR] Fix a bunch of typos 2018-01-02 07:10:19 +09:00
Shixiong Zhu 14c4a62c12 [SPARK-21475][Core]Revert "[SPARK-21475][CORE] Use NIO's Files API to replace FileInputStream/FileOutputStream in some critical paths"
## What changes were proposed in this pull request?

This reverts commit 5fd0294ff8 because of a huge performance regression.
I manually fixed a minor conflict in `OneForOneBlockFetcher.java`.

`Files.newInputStream` returns `sun.nio.ch.ChannelInputStream`. `ChannelInputStream` doesn't override `InputStream.skip`, so it's using the default `InputStream.skip` which just consumes and discards data. This causes a huge performance regression when reading shuffle files.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #20119 from zsxwing/revert-SPARK-21475.
2017-12-29 22:33:29 -08:00
Takeshi Yamamuro f2b3525c17 [SPARK-22771][SQL] Concatenate binary inputs into a binary output
## What changes were proposed in this pull request?
This pr modified `concat` to concat binary inputs into a single binary output.
`concat` in the current master always output data as a string. But, in some databases (e.g., PostgreSQL), if all inputs are binary, `concat` also outputs binary.

## How was this patch tested?
Added tests in `SQLQueryTestSuite` and `TypeCoercionSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #19977 from maropu/SPARK-22771.
2017-12-30 14:09:56 +08:00
Bryan Cutler 59d52631eb [SPARK-22324][SQL][PYTHON] Upgrade Arrow to 0.8.0
## What changes were proposed in this pull request?

Upgrade Spark to Arrow 0.8.0 for Java and Python.  Also includes an upgrade of Netty to 4.1.17 to resolve dependency requirements.

The highlights that pertain to Spark for the update from Arrow versoin 0.4.1 to 0.8.0 include:

* Java refactoring for more simple API
* Java reduced heap usage and streamlined hot code paths
* Type support for DecimalType, ArrayType
* Improved type casting support in Python
* Simplified type checking in Python

## How was this patch tested?

Existing tests

Author: Bryan Cutler <cutlerb@gmail.com>
Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #19884 from BryanCutler/arrow-upgrade-080-SPARK-22324.
2017-12-21 20:43:56 +09:00
Yuming Wang 9df08e218c [SPARK-22454][CORE] ExternalShuffleClient.close() should check clientFactory null
## What changes were proposed in this pull request?

`ExternalShuffleClient.close()` should check `clientFactory` null. otherwise it will throw NPE sometimes:
```
17/11/06 20:08:05 ERROR Utils: Uncaught exception in thread main
java.lang.NullPointerException
	at org.apache.spark.network.shuffle.ExternalShuffleClient.close(ExternalShuffleClient.java:152)
	at org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1407)
	at org.apache.spark.SparkEnv.stop(SparkEnv.scala:89)
	at org.apache.spark.SparkContext$$anonfun$stop$11.apply$mcV$sp(SparkContext.scala:1849)
```

## How was this patch tested?
manual tests

Author: Yuming Wang <wgyumg@gmail.com>

Closes #19670 from wangyum/SPARK-22454.
2017-11-07 08:30:58 +00:00
Marcelo Vanzin 0e9a750a8d [SPARK-20643][CORE] Add listener implementation to collect app state.
The initial listener code is based on the existing JobProgressListener (and others),
and tries to mimic their behavior as much as possible. The change also includes
some minor code movement so that some types and methods from the initial history
server code code can be reused.

The code introduces a few mutable versions of public API types, used internally,
to make it easier to update information without ugly copy methods, and also to
make certain updates cheaper.

Note the code here is not 100% correct. This is meant as a building ground for
the UI integration in the next milestones. As different parts of the UI are
ported, fixes will be made to the different parts of this code to account
for the needed behavior.

I also added annotations to API types so that Jackson is able to correctly
deserialize options, sequences and maps that store primitive types.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #19383 from vanzin/SPARK-20643.
2017-10-26 11:05:16 -05:00
liuxian 3d43a9f939 [SPARK-22349] In on-heap mode, when allocating memory from pool,we should fill memory with MEMORY_DEBUG_FILL_CLEAN_VALUE
## What changes were proposed in this pull request?
In on-heap mode, when allocating memory from pool,we should fill memory with `MEMORY_DEBUG_FILL_CLEAN_VALUE`

## How was this patch tested?
added unit tests

Author: liuxian <liu.xian3@zte.com.cn>

Closes #19572 from 10110346/MEMORY_DEBUG.
2017-10-25 21:34:00 +05:30
jerryshao e1960c3d6f [SPARK-22062][CORE] Spill large block to disk in BlockManager's remote fetch to avoid OOM
## What changes were proposed in this pull request?

In the current BlockManager's `getRemoteBytes`, it will call `BlockTransferService#fetchBlockSync` to get remote block. In the `fetchBlockSync`, Spark will allocate a temporary `ByteBuffer` to store the whole fetched block. This will potentially lead to OOM if block size is too big or several blocks are fetched simultaneously in this executor.

So here leveraging the idea of shuffle fetch, to spill the large block to local disk before consumed by upstream code. The behavior is controlled by newly added configuration, if block size is smaller than the threshold, then this block will be persisted in memory; otherwise it will first spill to disk, and then read from disk file.

To achieve this feature, what I did is:

1. Rename `TempShuffleFileManager` to `TempFileManager`, since now it is not only used by shuffle.
2. Add a new `TempFileManager` to manage the files of fetched remote blocks, the files are tracked by weak reference, will be deleted when no use at all.

## How was this patch tested?

This was tested by adding UT, also manual verification in local test to perform GC to clean the files.

Author: jerryshao <sshao@hortonworks.com>

Closes #19476 from jerryshao/SPARK-22062.
2017-10-17 22:54:38 +08:00
Feng Liu bebd2e1ce1 [SPARK-22222][CORE] Fix the ARRAY_MAX in BufferHolder and add a test
## What changes were proposed in this pull request?

We should not break the assumption that the length of the allocated byte array is word rounded:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L170
So we want to use `Integer.MAX_VALUE - 15` instead of `Integer.MAX_VALUE - 8` as the upper bound of an allocated byte array.

cc: srowen gatorsmile
## How was this patch tested?

Since the Spark unit test JVM has less than 1GB heap, here we run the test code as a submit job, so it can run on a JVM has 4GB memory.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Feng Liu <fengliu@databricks.com>

Closes #19460 from liufengdb/fix_array_max.
2017-10-09 21:34:37 -07:00
Thomas Graves a74ec6d7bb [SPARK-22218] spark shuffle services fails to update secret on app re-attempts
This patch fixes application re-attempts when running spark on yarn using the external shuffle service with security on.  Currently executors will fail to launch on any application re-attempt when launched on a nodemanager that had an executor from the first attempt.  The reason for this is because we aren't updating the secret key after the first application attempt.  The fix here is to just remove the containskey check to see if it already exists. In this way, we always add it and make sure its the most recent secret.  Similarly remove the check for containsKey on the remove since its just adding extra check that isn't really needed.

Note this worked before spark 2.2 because the check used to be contains (which was looking for the value) rather then containsKey, so that never matched and it was just always adding the new secret.

Patch was tested on a 10 node cluster as well as added the unit test.
The test ran was a wordcount where the output directory already existed.  With the bug present the application attempt failed with max number of executor Failures which were all saslExceptions.  With the fix present the application re-attempts fail with directory already exists or when you remove the directory between attempts the re-attemps succeed.

Author: Thomas Graves <tgraves@unharmedunarmed.corp.ne1.yahoo.com>

Closes #19450 from tgravescs/SPARK-22218.
2017-10-09 12:56:37 -07:00
Kazuaki Ishizaki 12e740bba1 [SPARK-22130][CORE] UTF8String.trim() scans " " twice
## What changes were proposed in this pull request?

This PR allows us to scan a string including only white space (e.g. `"     "`) once while the current implementation scans twice (right to left, and then left to right).

## How was this patch tested?

Existing test suites

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #19355 from kiszk/SPARK-22130.
2017-09-27 23:19:10 +09:00
Marcelo Vanzin 74daf622de [SPARK-20642][CORE] Store FsHistoryProvider listing data in a KVStore.
The application listing is still generated from event logs, but is now stored
in a KVStore instance. By default an in-memory store is used, but a new config
allows setting a local disk path to store the data, in which case a LevelDB
store will be created.

The provider stores things internally using the public REST API types; I believe
this is better going forward since it will make it easier to get rid of the
internal history server API which is mostly redundant at this point.

I also added a finalizer to LevelDBIterator, to make sure that resources are
eventually released. This helps when code iterates but does not exhaust the
iterator, thus not triggering the auto-close code.

HistoryServerSuite was modified to not re-start the history server unnecessarily;
this makes the json validation tests run more quickly.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #18887 from vanzin/SPARK-20642.
2017-09-27 20:33:41 +08:00
Sean Owen 50ada2a4d3 [SPARK-22033][CORE] BufferHolder, other size checks should account for the specific VM array size limitations
## What changes were proposed in this pull request?

Try to avoid allocating an array bigger than Integer.MAX_VALUE - 8, which is the actual max size on some JVMs, in several places

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #19266 from srowen/SPARK-22033.
2017-09-23 15:40:59 +01:00
jerryshao 1da5822e6a [SPARK-21934][CORE] Expose Shuffle Netty memory usage to MetricsSystem
## What changes were proposed in this pull request?

This is a followup work of SPARK-9104 to expose the Netty memory usage to MetricsSystem. Current the shuffle Netty memory usage of `NettyBlockTransferService` will be exposed, if using external shuffle, then the Netty memory usage of `ExternalShuffleClient` and `ExternalShuffleService` will be exposed instead. Currently I don't expose Netty memory usage of `YarnShuffleService`, because `YarnShuffleService` doesn't have `MetricsSystem` itself, and is better to connect to Hadoop's MetricsSystem.

## How was this patch tested?

Manually verified in local cluster.

Author: jerryshao <sshao@hortonworks.com>

Closes #19160 from jerryshao/SPARK-21934.
2017-09-21 13:54:30 +08:00
Sean Owen 3d4dd14cd5 [SPARK-22066][BUILD] Update checkstyle to 8.2, enable it, fix violations
## What changes were proposed in this pull request?

Update plugins, including scala-maven-plugin, to latest versions. Update checkstyle to 8.2. Remove bogus checkstyle config and enable it. Fix existing and new Java checkstyle errors.

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #19282 from srowen/SPARK-22066.
2017-09-20 10:01:46 +01:00
Kevin Yu c66d64b3df [SPARK-14878][SQL] Trim characters string function support
#### What changes were proposed in this pull request?

This PR enhances the TRIM function support in Spark SQL by allowing the specification
of trim characters set. Below is the SQL syntax :

``` SQL
<trim function> ::= TRIM <left paren> <trim operands> <right paren>
<trim operands> ::= [ [ <trim specification> ] [ <trim character set> ] FROM ] <trim source>
<trim source> ::= <character value expression>
<trim specification> ::=
  LEADING
| TRAILING
| BOTH
<trim character set> ::= <characters value expression>
```
or
``` SQL
LTRIM (source-exp [, trim-exp])
RTRIM (source-exp [, trim-exp])
```

Here are the documentation link of support of this feature by other mainstream databases.
- **Oracle:** [TRIM function](http://docs.oracle.com/cd/B28359_01/olap.111/b28126/dml_functions_2126.htm#OLADM704)
- **DB2:** [TRIM scalar function](https://www.ibm.com/support/knowledgecenter/en/SSMKHH_10.0.0/com.ibm.etools.mft.doc/ak05270_.htm)
- **MySQL:** [Trim function](http://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_trim)
- **Oracle:** [ltrim](https://docs.oracle.com/cd/B28359_01/olap.111/b28126/dml_functions_2018.htm#OLADM594)
- **DB2:** [ltrim](https://www.ibm.com/support/knowledgecenter/en/SSEPEK_11.0.0/sqlref/src/tpc/db2z_bif_ltrim.html)

This PR is to implement the above enhancement. In the implementation, the design principle is to keep the changes to the minimum. Also, the exiting trim functions (which handles a special case, i.e., trimming space characters) are kept unchanged for performane reasons.
#### How was this patch tested?

The unit test cases are added in the following files:
- UTF8StringSuite.java
- StringExpressionsSuite.scala
- sql/SQLQuerySuite.scala
- StringFunctionsSuite.scala

Author: Kevin Yu <qyu@us.ibm.com>

Closes #12646 from kevinyu98/spark-14878.
2017-09-18 12:12:35 -07:00
Armin 73d9067226 [SPARK-21967][CORE] org.apache.spark.unsafe.types.UTF8String#compareTo Should Compare 8 Bytes at a Time for Better Performance
## What changes were proposed in this pull request?

* Using 64 bit unsigned long comparison instead of unsigned int comparison in `org.apache.spark.unsafe.types.UTF8String#compareTo` for better performance.
* Making `IS_LITTLE_ENDIAN` a constant for correctness reasons (shouldn't use a non-constant in `compareTo` implementations and it def. is a constant per JVM)

## How was this patch tested?

Build passes and the functionality is widely covered by existing tests as far as I can see.

Author: Armin <me@obrown.io>

Closes #19180 from original-brownbear/SPARK-21967.
2017-09-16 09:18:13 +01:00
Armin b6ef1f57bc [SPARK-21970][CORE] Fix Redundant Throws Declarations in Java Codebase
## What changes were proposed in this pull request?

1. Removing all redundant throws declarations from Java codebase.
2. Removing dead code made visible by this from `ShuffleExternalSorter#closeAndGetSpills`

## How was this patch tested?

Build still passes.

Author: Armin <me@obrown.io>

Closes #19182 from original-brownbear/SPARK-21970.
2017-09-13 14:04:26 +01:00
jerryshao 445f1790ad [SPARK-9104][CORE] Expose Netty memory metrics in Spark
## What changes were proposed in this pull request?

This PR exposes Netty memory usage for Spark's `TransportClientFactory` and `TransportServer`, including the details of each direct arena and heap arena metrics, as well as aggregated metrics. The purpose of adding the Netty metrics is to better know the memory usage of Netty in Spark shuffle, rpc and others network communications, and guide us to better configure the memory size of executors.

This PR doesn't expose these metrics to any sink, to leverage this feature, still requires to connect to either MetricsSystem or collect them back to Driver to display.

## How was this patch tested?

Add Unit test to verify it, also manually verified in real cluster.

Author: jerryshao <sshao@hortonworks.com>

Closes #18935 from jerryshao/SPARK-9104.
2017-09-05 21:28:54 -07:00
jerryshao 4482ff23ad [SPARK-17321][YARN] Avoid writing shuffle metadata to disk if NM recovery is disabled
In the current code, if NM recovery is not enabled then `YarnShuffleService` will write shuffle metadata to NM local dir-1, if this local dir-1 is on bad disk, then `YarnShuffleService` will be failed to start. So to solve this issue, in Spark side if NM recovery is not enabled, then Spark will not persist data into leveldb, in that case yarn shuffle service can still be served but lose the ability for recovery, (it is fine because the failure of NM will kill the containers as well as applications).

Tested in the local cluster with NM recovery off and on to see if folder is created or not. MiniCluster UT isn't added because in MiniCluster NM will always set port to 0, but NM recovery requires non-ephemeral port.

Author: jerryshao <sshao@hortonworks.com>

Closes #19032 from jerryshao/SPARK-17321.

Change-Id: I8f2fe73d175e2ad2c4e380caede3873e0192d027
2017-08-31 09:26:20 +08:00
liuxian d4895c9de6 [MINOR][TEST] Off -heap memory leaks for unit tests
## What changes were proposed in this pull request?
Free off -heap memory .
I have checked all the unit tests.

## How was this patch tested?
N/A

Author: liuxian <liu.xian3@zte.com.cn>

Closes #19075 from 10110346/memleak.
2017-08-30 10:16:11 +01:00
Sean Owen de7af295c2 [MINOR][BUILD] Fix build warnings and Java lint errors
## What changes were proposed in this pull request?

Fix build warnings and Java lint errors. This just helps a bit in evaluating (new) warnings in another PR I have open.

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #19051 from srowen/JavaWarnings.
2017-08-25 16:07:13 +01:00
xu.zhang 763b83ee84 [SPARK-21701][CORE] Enable RPC client to use SO_RCVBUF and SO_SNDBUF in SparkConf.
## What changes were proposed in this pull request?

TCP parameters like SO_RCVBUF and SO_SNDBUF can be set in SparkConf, and `org.apache.spark.network.server.TransportServe`r can use those parameters to build server by leveraging netty. But for TransportClientFactory, there is no such way to set those parameters from SparkConf. This could be inconsistent in server and client side when people set parameters in SparkConf. So this PR make RPC client to be enable to use those TCP parameters as well.

## How was this patch tested?

Existing tests.

Author: xu.zhang <xu.zhang@hulu.com>

Closes #18964 from neoremind/add_client_param.
2017-08-24 14:27:52 -07:00
Sanket Chintapalli 1662e93119 [SPARK-21501] Change CacheLoader to limit entries based on memory footprint
Right now the spark shuffle service has a cache for index files. It is based on a # of files cached (spark.shuffle.service.index.cache.entries). This can cause issues if people have a lot of reducers because the size of each entry can fluctuate based on the # of reducers.
We saw an issues with a job that had 170000 reducers and it caused NM with spark shuffle service to use 700-800MB or memory in NM by itself.
We should change this cache to be memory based and only allow a certain memory size used. When I say memory based I mean the cache should have a limit of say 100MB.

https://issues.apache.org/jira/browse/SPARK-21501

Manual Testing with 170000 reducers has been performed with cache loaded up to max 100MB default limit, with each shuffle index file of size 1.3MB. Eviction takes place as soon as the total cache size reaches the 100MB limit and the objects will be ready for garbage collection there by avoiding NM to crash. No notable difference in runtime has been observed.

Author: Sanket Chintapalli <schintap@yahoo-inc.com>

Closes #18940 from redsanket/SPARK-21501.
2017-08-23 11:51:11 -05:00
Marcelo Vanzin 2c1bfb497f [SPARK-21671][CORE] Move kvstore to "util" sub-package, add private annotation.
Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #18886 from vanzin/SPARK-21671.
2017-08-08 14:33:27 -07:00
Marcelo Vanzin 979bf946d5 [SPARK-20655][CORE] In-memory KVStore implementation.
This change adds an in-memory implementation of KVStore that can be
used by the live UI.

The implementation is not fully optimized, neither for speed nor
space, but should be fast enough for using in the listener bus.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #18395 from vanzin/SPARK-20655.
2017-08-08 11:02:54 -07:00
zhoukang 8b69b17f3f [SPARK-21544][DEPLOY][TEST-MAVEN] Tests jar of some module should not upload twice
## What changes were proposed in this pull request?

**For moudle below:**
common/network-common
streaming
sql/core
sql/catalyst
**tests.jar will install or deploy twice.Like:**
`[DEBUG] Installing org.apache.spark:spark-streaming_2.11/maven-metadata.xml to /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/maven-metadata-local.xml
[INFO] Installing /home/mi/Work/Spark/scala2.11/spark/streaming/target/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar to /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar
[DEBUG] Skipped re-installing /home/mi/Work/Spark/scala2.11/spark/streaming/target/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar to /home/mi/.m2/repository/org/apache/spark/spark-streaming_2.11/2.1.0-mdh2.1.0.1-SNAPSHOT/spark-streaming_2.11-2.1.0-mdh2.1.0.1-SNAPSHOT-tests.jar, seems unchanged`
**The reason is below:**
`[DEBUG]   (f) artifact = org.apache.spark:spark-streaming_2.11🫙2.1.0-mdh2.1.0.1-SNAPSHOT
[DEBUG]   (f) attachedArtifacts = [org.apache.spark:spark-streaming_2.11:test-jar:tests:2.1.0-mdh2.1.0.1-SNAPSHOT, org.apache.spark:spark-streaming_2.11🫙tests:2.1.0-mdh2.1.0.1-SNAPSHOT, org.apache.spark:spark
-streaming_2.11:java-source:sources:2.1.0-mdh2.1.0.1-SNAPSHOT, org.apache.spark:spark-streaming_2.11:java-source:test-sources:2.1.0-mdh2.1.0.1-SNAPSHOT, org.apache.spark:spark-streaming_2.11:javadoc:javadoc:2.1.0
-mdh2.1.0.1-SNAPSHOT]`

when executing 'mvn deploy' to nexus during release.I will fail since release nexus can not be overrided.

## How was this patch tested?
Execute 'mvn clean install -Pyarn -Phadoop-2.6 -Phadoop-provided -DskipTests'

Author: zhoukang <zhoukang199191@gmail.com>

Closes #18745 from caneGuy/zhoukang/fix-installtwice.
2017-08-07 12:51:39 +01:00
Grzegorz Slowikowski 74cda94c5e [SPARK-21592][BUILD] Skip maven-compiler-plugin main and test compilations in Maven build
`scala-maven-plugin` in `incremental` mode compiles `Scala` and `Java` classes. There is no need to execute `maven-compiler-plugin` goals to compile (in fact recompile) `Java`.

This change reduces compilation time (over 10% on my machine).

Author: Grzegorz Slowikowski <gslowikowski@gmail.com>

Closes #18750 from gslowikowski/remove-redundant-compilation-from-maven.
2017-08-01 19:03:34 +01:00
jerryshao 5fd0294ff8 [SPARK-21475][CORE] Use NIO's Files API to replace FileInputStream/FileOutputStream in some critical paths
## What changes were proposed in this pull request?

Java's `FileInputStream` and `FileOutputStream` overrides finalize(), even this file input/output stream is closed correctly and promptly, it will still leave some memory footprints which will only get cleaned in Full GC. This will introduce two side effects:

1. Lots of memory footprints regarding to Finalizer will be kept in memory and this will increase the memory overhead. In our use case of external shuffle service, a busy shuffle service will have bunch of this object and potentially lead to OOM.
2. The Finalizer will only be called in Full GC, and this will increase the overhead of Full GC and lead to long GC pause.

https://bugs.openjdk.java.net/browse/JDK-8080225

https://www.cloudbees.com/blog/fileinputstream-fileoutputstream-considered-harmful

So to fix this potential issue, here propose to use NIO's Files#newInput/OutputStream instead in some critical paths like shuffle.

Left unchanged FileInputStream in core which I think is not so critical:

```
./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:467:    val file = new DataInputStream(new FileInputStream(filename))
./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:942:    val in = new FileInputStream(new File(path))
./core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala:76:    val fileIn = new FileInputStream(file)
./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:248:        val fis = new FileInputStream(file)
./core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:910:                input = new FileInputStream(new File(t))
./core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala:20:import java.io.{FileInputStream, InputStream}
./core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala:132:        case Some(f) => new FileInputStream(f)
./core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala:20:import java.io.{FileInputStream, InputStream}
./core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala:77:        val fis = new FileInputStream(f)
./core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala:27:import org.apache.spark.io.NioBufferedFileInputStream
./core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala:94:      new DataInputStream(new NioBufferedFileInputStream(index))
./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:111:        val channel = new FileInputStream(file).getChannel()
./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:219:    val channel = new FileInputStream(file).getChannel()
./core/src/main/scala/org/apache/spark/TestUtils.scala:20:import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
./core/src/main/scala/org/apache/spark/TestUtils.scala:106:      val in = new FileInputStream(file)
./core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala:89:        inputStream = new FileInputStream(activeFile)
./core/src/main/scala/org/apache/spark/util/Utils.scala:329:      if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
./core/src/main/scala/org/apache/spark/util/Utils.scala:332:        val inChannel = in.asInstanceOf[FileInputStream].getChannel()
./core/src/main/scala/org/apache/spark/util/Utils.scala:1533:      gzInputStream = new GZIPInputStream(new FileInputStream(file))
./core/src/main/scala/org/apache/spark/util/Utils.scala:1560:      new GZIPInputStream(new FileInputStream(file))
./core/src/main/scala/org/apache/spark/util/Utils.scala:1562:      new FileInputStream(file)
./core/src/main/scala/org/apache/spark/util/Utils.scala:2090:    val inReader = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)
```

Left unchanged FileOutputStream in core:

```
./core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala:957:    val out = new FileOutputStream(file)
./core/src/main/scala/org/apache/spark/api/r/RBackend.scala:20:import java.io.{DataOutputStream, File, FileOutputStream, IOException}
./core/src/main/scala/org/apache/spark/api/r/RBackend.scala:131:      val dos = new DataOutputStream(new FileOutputStream(f))
./core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala:62:    val fileOut = new FileOutputStream(file)
./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:160:          val outStream = new FileOutputStream(outPath)
./core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala:239:    val zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFile, false))
./core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:949:        val out = new FileOutputStream(tempFile)
./core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala:20:import java.io.{File, FileOutputStream, InputStream, IOException}
./core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala:106:    val out = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala:109:     * Therefore, for local files, use FileOutputStream instead. */
./core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala:112:        new FileOutputStream(uri.getPath)
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:20:import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStream}
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:71:  private var fos: FileOutputStream = null
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:102:    fos = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:213:      var truncateStream: FileOutputStream = null
./core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala:215:        truncateStream = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/storage/DiskStore.scala:153:    val out = new FileOutputStream(file).getChannel()
./core/src/main/scala/org/apache/spark/TestUtils.scala:20:import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
./core/src/main/scala/org/apache/spark/TestUtils.scala:81:    val jarStream = new JarOutputStream(new FileOutputStream(jarFile))
./core/src/main/scala/org/apache/spark/TestUtils.scala:96:    val jarFileStream = new FileOutputStream(jarFile)
./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:20:import java.io.{File, FileOutputStream, InputStream, IOException}
./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:31:  volatile private var outputStream: FileOutputStream = null
./core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala:97:    outputStream = new FileOutputStream(file, true)
./core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala:90:        gzOutputStream = new GZIPOutputStream(new FileOutputStream(gzFile))
./core/src/main/scala/org/apache/spark/util/Utils.scala:329:      if (in.isInstanceOf[FileInputStream] && out.isInstanceOf[FileOutputStream]
./core/src/main/scala/org/apache/spark/util/Utils.scala:333:        val outChannel = out.asInstanceOf[FileOutputStream].getChannel()
./core/src/main/scala/org/apache/spark/util/Utils.scala:527:      val out = new FileOutputStream(tempFile)
```

Here in `DiskBlockObjectWriter`, it uses `FileDescriptor` so it is not easy to change to NIO Files API.

For the `FileInputStream` and `FileOutputStream` in common/shuffle* I changed them all.

## How was this patch tested?

Existing tests and manual verification.

Author: jerryshao <sshao@hortonworks.com>

Closes #18684 from jerryshao/SPARK-21475.
2017-08-01 10:23:45 +01:00