From 900908b9becd97e3e36c7089128e7ccb0c735332 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Mon, 9 Aug 2021 19:28:31 +0900
Subject: [PATCH] [SPARK-36410][CORE][SQL][STRUCTURED STREAMING][EXAMPLES]
 Replace anonymous classes with lambda expressions

### What changes were proposed in this pull request?

The main change of this PR is to replace anonymous classes with lambda expressions in Java code.

**Before**

```java
new Thread(new Runnable() {
  @Override
  public void run() {
    // run thread
  }
});
```

**After**

```java
new Thread(() -> {
  // run thread
});
```

### Why are the changes needed?

Code simplification.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

- Passed the Jenkins or GitHub Actions builds.
- Manually tested `JavaUserDefinedScalar` with `bin/spark-submit run-example org.apache.spark.examples.sql.JavaUserDefinedScalar`; it passed.

Closes #33635 from LuciferYang/lambda.

Authored-by: yangjie01
Signed-off-by: Hyukjin Kwon
---
 .../examples/sql/JavaUserDefinedScalar.java   | 16 ++---
 .../kafka010/JavaDirectKafkaStreamSuite.java  | 63 ++++++-----------
 .../streaming/kafka010/JavaKafkaRDDSuite.java |  8 +--
 .../JavaDataStreamReaderWriterSuite.java      |  5 +-
 .../auth/TSubjectAssumingTransport.java       | 20 +++---
 .../service/cli/operation/SQLOperation.java   | 68 +++++++++----------
 .../service/cli/session/HiveSessionProxy.java |  8 +--
 7 files changed, 66 insertions(+), 122 deletions(-)

diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedScalar.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedScalar.java
index e5e698809b..e420368023 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedScalar.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaUserDefinedScalar.java
@@ -49,12 +49,8 @@ public class JavaUserDefinedScalar {
     // +-------+
 
     // Define and register a one-argument UDF
-    spark.udf().register("plusOne", new UDF1<Integer, Integer>() {
-      @Override
-      public Integer call(Integer x) {
-        return x + 1;
-      }
-    }, DataTypes.IntegerType);
+    spark.udf().register("plusOne",
+        (UDF1<Integer, Integer>) x -> x + 1, DataTypes.IntegerType);
     spark.sql("SELECT plusOne(5)").show();
     // +----------+
     // |plusOne(5)|
@@ -75,12 +71,8 @@ public class JavaUserDefinedScalar {
     // +------------+
 
     // UDF in a WHERE clause
-    spark.udf().register("oneArgFilter", new UDF1<Long, Boolean>() {
-      @Override
-      public Boolean call(Long x) {
-        return x > 5;
-      }
-    }, DataTypes.BooleanType);
+    spark.udf().register("oneArgFilter",
+        (UDF1<Long, Boolean>) x -> x > 5, DataTypes.BooleanType);
     spark.range(1, 10).createOrReplaceTempView("test");
     spark.sql("SELECT * FROM test WHERE oneArgFilter(id)").show();
     // +---+
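The `(UDF1<Integer, Integer>)` cast in the hunk above is load-bearing: `spark.udf().register` is overloaded for `UDF0` through `UDF22` (plus `UserDefinedFunction`), so a bare lambda gives the compiler no single target type. A minimal standalone sketch of the same disambiguation pattern follows; the `register` overloads and the `IntOp`/`LongOp` interfaces here are hypothetical stand-ins, not Spark APIs:

```java
public class OverloadDemo {
  // Two single-abstract-method interfaces: a bare lambda could match either.
  interface IntOp { int apply(int x); }
  interface LongOp { long apply(long x); }

  // Overloads mirroring how spark.udf().register accepts many UDF shapes.
  static void register(String name, IntOp op) { System.out.println(name + " -> IntOp"); }
  static void register(String name, LongOp op) { System.out.println(name + " -> LongOp"); }

  public static void main(String[] args) {
    // register("plusOne", x -> x + 1);       // would not compile: ambiguous
    register("plusOne", (IntOp) x -> x + 1);  // the cast selects the overload, as in the hunk above
  }
}
```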
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
index 2b8b1852fe..d0030c3fab 100644
--- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
@@ -91,30 +91,20 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
     JavaInputDStream<ConsumerRecord<String, String>> istream1 = KafkaUtils.createDirectStream(
         ssc,
         LocationStrategies.PreferConsistent(),
-        ConsumerStrategies.<String, String>Subscribe(Arrays.asList(topic1), kafkaParams)
+        ConsumerStrategies.Subscribe(Arrays.asList(topic1), kafkaParams)
     );
 
     JavaDStream<String> stream1 = istream1.transform(
         // Make sure you can get offset ranges from the rdd
-        new Function<JavaRDD<ConsumerRecord<String, String>>,
-            JavaRDD<ConsumerRecord<String, String>>>() {
-          @Override
-          public JavaRDD<ConsumerRecord<String, String>> call(
-              JavaRDD<ConsumerRecord<String, String>> rdd
-          ) {
-            OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
-            offsetRanges.set(offsets);
-            Assert.assertEquals(topic1, offsets[0].topic());
-            return rdd;
-          }
-        }
+        (Function<JavaRDD<ConsumerRecord<String, String>>,
+            JavaRDD<ConsumerRecord<String, String>>>) rdd -> {
+          OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+          offsetRanges.set(offsets);
+          Assert.assertEquals(topic1, offsets[0].topic());
+          return rdd;
+        }
     ).map(
-        new Function<ConsumerRecord<String, String>, String>() {
-          @Override
-          public String call(ConsumerRecord<String, String> r) {
-            return r.value();
-          }
-        }
+        (Function<ConsumerRecord<String, String>, String>) ConsumerRecord::value
     );
 
     final Map<String, Object> kafkaParams2 = new HashMap<>(kafkaParams);
@@ -124,41 +114,26 @@
     JavaInputDStream<ConsumerRecord<String, String>> istream2 = KafkaUtils.createDirectStream(
         ssc,
         LocationStrategies.PreferConsistent(),
-        ConsumerStrategies.<String, String>Subscribe(Arrays.asList(topic2), kafkaParams2)
+        ConsumerStrategies.Subscribe(Arrays.asList(topic2), kafkaParams2)
     );
 
     JavaDStream<String> stream2 = istream2.transform(
         // Make sure you can get offset ranges from the rdd
-        new Function<JavaRDD<ConsumerRecord<String, String>>,
-            JavaRDD<ConsumerRecord<String, String>>>() {
-          @Override
-          public JavaRDD<ConsumerRecord<String, String>> call(
-              JavaRDD<ConsumerRecord<String, String>> rdd
-          ) {
-            OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
-            offsetRanges.set(offsets);
-            Assert.assertEquals(topic2, offsets[0].topic());
-            return rdd;
-          }
-        }
+        (Function<JavaRDD<ConsumerRecord<String, String>>,
+            JavaRDD<ConsumerRecord<String, String>>>) rdd -> {
+          OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+          offsetRanges.set(offsets);
+          Assert.assertEquals(topic2, offsets[0].topic());
+          return rdd;
+        }
     ).map(
-        new Function<ConsumerRecord<String, String>, String>() {
-          @Override
-          public String call(ConsumerRecord<String, String> r) {
-            return r.value();
-          }
-        }
+        (Function<ConsumerRecord<String, String>, String>) ConsumerRecord::value
     );
 
     JavaDStream<String> unifiedStream = stream1.union(stream2);
 
     final Set<String> result = Collections.synchronizedSet(new HashSet<>());
-    unifiedStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
-        @Override
-        public void call(JavaRDD<String> rdd) {
-          result.addAll(rdd.collect());
-        }
-      }
+    unifiedStream.foreachRDD((VoidFunction<JavaRDD<String>>) rdd -> result.addAll(rdd.collect())
     );
     ssc.start();
     long startTime = System.currentTimeMillis();
diff --git a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java
index b20fad2291..e726f3b33b 100644
--- a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java
+++ b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaKafkaRDDSuite.java
@@ -89,13 +89,7 @@ public class JavaKafkaRDDSuite implements Serializable {
     leaders.put(offsetRanges[0].topicPartition(), broker);
     leaders.put(offsetRanges[1].topicPartition(), broker);
 
-    Function<ConsumerRecord<String, String>, String> handler =
-      new Function<ConsumerRecord<String, String>, String>() {
-        @Override
-        public String call(ConsumerRecord<String, String> r) {
-          return r.value();
-        }
-      };
+    Function<ConsumerRecord<String, String>, String> handler = ConsumerRecord::value;
 
     JavaRDD<String> rdd1 = KafkaUtils.createRDD(
         sc,
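Both Kafka suites also collapse one-line anonymous `Function`s into the method reference `ConsumerRecord::value`. Roughly, the pattern looks like this outside Spark, using `java.util.function.Function` and a stand-in `Record` class rather than Spark's `Function` and Kafka's `ConsumerRecord`:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class MethodRefDemo {
  // Stand-in for Kafka's ConsumerRecord<K, V>
  static class Record {
    private final String value;
    Record(String value) { this.value = value; }
    String value() { return value; }
  }

  public static void main(String[] args) {
    List<Record> records = Arrays.asList(new Record("a"), new Record("b"));

    // Before: an anonymous class whose only job is to forward to value()
    Function<Record, String> oldHandler = new Function<Record, String>() {
      @Override
      public String apply(Record r) {
        return r.value();
      }
    };

    // After: a method reference expresses the same mapping directly
    Function<Record, String> handler = Record::value;

    System.out.println(records.stream().map(oldHandler).collect(Collectors.toList()));
    System.out.println(records.stream().map(handler).collect(Collectors.toList()));
  }
}
```

Note that explicit casts such as `(Function<ConsumerRecord<String, String>, String>)` survive in the patch to keep the target type unambiguous; `transform`, for instance, is also overloaded for a two-argument `Function2` variant.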
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
index 5903623847..cdb6ef17c5 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/streaming/JavaDataStreamReaderWriterSuite.java
@@ -58,10 +58,7 @@ public class JavaDataStreamReaderWriterSuite {
       .readStream()
       .textFile(input)
       .writeStream()
-      .foreachBatch(new VoidFunction2<Dataset<String>, Long>() {
-        @Override
-        public void call(Dataset<String> v1, Long v2) throws Exception {}
-      })
+      .foreachBatch((VoidFunction2<Dataset<String>, Long>) (v1, v2) -> {})
       .start();
     query.stop();
   }
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/TSubjectAssumingTransport.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/TSubjectAssumingTransport.java
index d901a5be29..c81df5f5dc 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/TSubjectAssumingTransport.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/auth/TSubjectAssumingTransport.java
@@ -42,18 +42,16 @@ public class TSubjectAssumingTransport extends TFilterTransport {
     try {
       AccessControlContext context = AccessController.getContext();
       Subject subject = Subject.getSubject(context);
-      Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
-        public Void run() {
-          try {
-            wrapped.open();
-          } catch (TTransportException tte) {
-            // Wrap the transport exception in an RTE, since Subject.doAs() then goes
-            // and unwraps this for us out of the doAs block. We then unwrap one
-            // more time in our catch clause to get back the TTE. (ugh)
-            throw new RuntimeException(tte);
-          }
-          return null;
+      Subject.doAs(subject, (PrivilegedExceptionAction<Void>) () -> {
+        try {
+          wrapped.open();
+        } catch (TTransportException tte) {
+          // Wrap the transport exception in an RTE, since Subject.doAs() then goes
+          // and unwraps this for us out of the doAs block. We then unwrap one
+          // more time in our catch clause to get back the TTE. (ugh)
+          throw new RuntimeException(tte);
         }
+        return null;
       });
     } catch (PrivilegedActionException ioe) {
       throw new RuntimeException("Received an ioe we never threw!", ioe);
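Two details of the `Subject.doAs` hunk are easy to miss: the cast is required because `doAs` is overloaded for both `PrivilegedAction` and `PrivilegedExceptionAction`, and the `return null;` stays because the action is typed `Void`. A minimal sketch of the same shape, independent of the Hive transport code (`Subject.doAs` is deprecated on recent JDKs, though it still runs):

```java
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import javax.security.auth.Subject;

public class DoAsDemo {
  public static void main(String[] args) throws PrivilegedActionException {
    Subject subject = new Subject();  // an empty Subject is enough for a demo

    // Without the cast the lambda is ambiguous between the PrivilegedAction
    // and PrivilegedExceptionAction overloads of doAs; Void forces "return null".
    Subject.doAs(subject, (PrivilegedExceptionAction<Void>) () -> {
      System.out.println("running inside doAs");
      return null;
    });
  }
}
```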
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/SQLOperation.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/SQLOperation.java
index 194b3f866d..ce704b281d 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -191,45 +191,39 @@ public class SQLOperation extends ExecuteStatementOperation {
     final UserGroupInformation currentUGI = getCurrentUGI(opConfig);
     // Runnable impl to call runInternal asynchronously,
     // from a different thread
-    Runnable backgroundOperation = new Runnable() {
-      @Override
-      public void run() {
-        PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
-          @Override
-          public Object run() throws HiveSQLException {
-            Hive.set(parentHive);
-            SessionState.setCurrentSessionState(parentSessionState);
-            // Set current OperationLog in this async thread for keeping on saving query log.
-            registerCurrentOperationLog();
-            try {
-              runQuery(opConfig);
-            } catch (HiveSQLException e) {
-              setOperationException(e);
-              LOG.error("Error running hive query: ", e);
-            } finally {
-              unregisterOperationLog();
-            }
-            return null;
-          }
-        };
-
+    Runnable backgroundOperation = () -> {
+      PrivilegedExceptionAction<Object> doAsAction = () -> {
+        Hive.set(parentHive);
+        SessionState.setCurrentSessionState(parentSessionState);
+        // Set current OperationLog in this async thread for keeping on saving query log.
+        registerCurrentOperationLog();
         try {
-          currentUGI.doAs(doAsAction);
-        } catch (Exception e) {
-          setOperationException(new HiveSQLException(e));
-          LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
+          runQuery(opConfig);
+        } catch (HiveSQLException e) {
+          setOperationException(e);
+          LOG.error("Error running hive query: ", e);
+        } finally {
+          unregisterOperationLog();
         }
-        finally {
-          /**
-           * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
-           * when this thread is garbage collected later.
-           * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
-           */
-          if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
-            ThreadWithGarbageCleanup currentThread =
-              (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
-            currentThread.cacheThreadLocalRawStore();
-          }
+        return null;
+      };
+
+      try {
+        currentUGI.doAs(doAsAction);
+      } catch (Exception e) {
+        setOperationException(new HiveSQLException(e));
+        LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
+      }
+      finally {
+        /**
+         * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
+         * when this thread is garbage collected later.
+         * @see ThreadWithGarbageCleanup#finalize()
+         */
+        if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
+          ThreadWithGarbageCleanup currentThread =
+            (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
+          currentThread.cacheThreadLocalRawStore();
         }
       }
     };
diff --git a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionProxy.java b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionProxy.java
index 16ee922422..ebc6282ed2 100644
--- a/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionProxy.java
+++ b/sql/hive-thriftserver/src/main/java/org/apache/hive/service/cli/session/HiveSessionProxy.java
@@ -55,13 +55,7 @@ public class HiveSessionProxy implements InvocationHandler {
       if (method.getDeclaringClass() == HiveSessionBase.class) {
         return invoke(method, args);
       }
-      return ugi.doAs(
-        new PrivilegedExceptionAction<Object>() {
-          @Override
-          public Object run() throws HiveSQLException {
-            return invoke(method, args);
-          }
-        });
+      return ugi.doAs((PrivilegedExceptionAction<Object>) () -> invoke(method, args));
     } catch (UndeclaredThrowableException e) {
       Throwable innerException = e.getCause();
       if (innerException instanceof PrivilegedActionException) {