[SPARK-36410][CORE][SQL][STRUCTURED STREAMING][EXAMPLES] Replace anonymous classes with lambda expressions

### What changes were proposed in this pull request?
The main change of this PR is to replace anonymous classes with lambda expressions in Java code.

**Before**
```java
new Thread(new Runnable() {
  @Override
  public void run() {
    // run thread
  }
});
```

**After**

```java
new Thread(() -> {
  // run thread
});
```
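
One wrinkle that shows up in several of the hunks below: when the receiving method is overloaded for more than one functional interface (for example `spark.udf().register(...)`, `DataStreamWriter.foreachBatch(...)`, or `ugi.doAs(...)`), the lambda keeps an explicit cast such as `(UDF1<Integer, Integer>)` so the compiler can resolve the overload. A minimal, self-contained sketch of that situation — the `register` overloads and interfaces here are invented for illustration, not Spark's API:

```java
// Not Spark code: a made-up pair of overloads showing why some call sites in
// this PR keep an explicit interface cast on the lambda.
public class LambdaCastDemo {

  @FunctionalInterface
  interface IntMapper { int apply(int x); }

  @FunctionalInterface
  interface LongMapper { long apply(long x); }

  // Two overloads that both accept a one-argument lambda.
  static void register(String name, IntMapper f) {
    System.out.println(name + " -> int mapper: " + f.apply(5));
  }

  static void register(String name, LongMapper f) {
    System.out.println(name + " -> long mapper: " + f.apply(5L));
  }

  public static void main(String[] args) {
    // register("plusOne", x -> x + 1);          // does not compile: ambiguous
    register("plusOne", (IntMapper) x -> x + 1); // the cast picks the overload
  }
}
```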

### Why are the changes needed?
Code simplification.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

- Pass the existing Jenkins or GitHub Actions checks.
- Manually tested `JavaUserDefinedScalar` with `bin/spark-submit run-example org.apache.spark.examples.sql.JavaUserDefinedScalar`; the example ran successfully.

Closes #33635 from LuciferYang/lambda.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
Authored by yangjie01 on 2021-08-09 19:28:31 +09:00; committed by Hyukjin Kwon
parent 9a539d5846
commit 900908b9be
7 changed files with 66 additions and 122 deletions


```diff
@@ -49,12 +49,8 @@ public class JavaUserDefinedScalar {
     // +-------+
 
     // Define and register a one-argument UDF
-    spark.udf().register("plusOne", new UDF1<Integer, Integer>() {
-      @Override
-      public Integer call(Integer x) {
-        return x + 1;
-      }
-    }, DataTypes.IntegerType);
+    spark.udf().register("plusOne",
+      (UDF1<Integer, Integer>) x -> x + 1, DataTypes.IntegerType);
     spark.sql("SELECT plusOne(5)").show();
     // +----------+
     // |plusOne(5)|
@@ -75,12 +71,8 @@ public class JavaUserDefinedScalar {
     // +------------+
 
     // UDF in a WHERE clause
-    spark.udf().register("oneArgFilter", new UDF1<Long, Boolean>() {
-      @Override
-      public Boolean call(Long x) {
-        return x > 5;
-      }
-    }, DataTypes.BooleanType);
+    spark.udf().register("oneArgFilter",
+      (UDF1<Long, Boolean>) x -> x > 5, DataTypes.BooleanType);
     spark.range(1, 10).createOrReplaceTempView("test");
     spark.sql("SELECT * FROM test WHERE oneArgFilter(id)").show();
     // +---+
```


```diff
@@ -91,30 +91,20 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
     JavaInputDStream<ConsumerRecord<String, String>> istream1 = KafkaUtils.createDirectStream(
         ssc,
         LocationStrategies.PreferConsistent(),
-        ConsumerStrategies.<String, String>Subscribe(Arrays.asList(topic1), kafkaParams)
+        ConsumerStrategies.Subscribe(Arrays.asList(topic1), kafkaParams)
     );
 
     JavaDStream<String> stream1 = istream1.transform(
       // Make sure you can get offset ranges from the rdd
-      new Function<JavaRDD<ConsumerRecord<String, String>>,
-        JavaRDD<ConsumerRecord<String, String>>>() {
-          @Override
-          public JavaRDD<ConsumerRecord<String, String>> call(
-            JavaRDD<ConsumerRecord<String, String>> rdd
-          ) {
-            OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
-            offsetRanges.set(offsets);
-            Assert.assertEquals(topic1, offsets[0].topic());
-            return rdd;
-          }
-        }
+      (Function<JavaRDD<ConsumerRecord<String, String>>,
+        JavaRDD<ConsumerRecord<String, String>>>) rdd -> {
+          OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+          offsetRanges.set(offsets);
+          Assert.assertEquals(topic1, offsets[0].topic());
+          return rdd;
+        }
     ).map(
-      new Function<ConsumerRecord<String, String>, String>() {
-        @Override
-        public String call(ConsumerRecord<String, String> r) {
-          return r.value();
-        }
-      }
+      (Function<ConsumerRecord<String, String>, String>) ConsumerRecord::value
     );
 
     final Map<String, Object> kafkaParams2 = new HashMap<>(kafkaParams);
@@ -124,41 +114,26 @@ public class JavaDirectKafkaStreamSuite implements Serializable {
     JavaInputDStream<ConsumerRecord<String, String>> istream2 = KafkaUtils.createDirectStream(
         ssc,
         LocationStrategies.PreferConsistent(),
-        ConsumerStrategies.<String, String>Subscribe(Arrays.asList(topic2), kafkaParams2)
+        ConsumerStrategies.Subscribe(Arrays.asList(topic2), kafkaParams2)
     );
 
     JavaDStream<String> stream2 = istream2.transform(
       // Make sure you can get offset ranges from the rdd
-      new Function<JavaRDD<ConsumerRecord<String, String>>,
-        JavaRDD<ConsumerRecord<String, String>>>() {
-          @Override
-          public JavaRDD<ConsumerRecord<String, String>> call(
-            JavaRDD<ConsumerRecord<String, String>> rdd
-          ) {
-            OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
-            offsetRanges.set(offsets);
-            Assert.assertEquals(topic2, offsets[0].topic());
-            return rdd;
-          }
-        }
+      (Function<JavaRDD<ConsumerRecord<String, String>>,
+        JavaRDD<ConsumerRecord<String, String>>>) rdd -> {
+          OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
+          offsetRanges.set(offsets);
+          Assert.assertEquals(topic2, offsets[0].topic());
+          return rdd;
+        }
     ).map(
-      new Function<ConsumerRecord<String, String>, String>() {
-        @Override
-        public String call(ConsumerRecord<String, String> r) {
-          return r.value();
-        }
-      }
+      (Function<ConsumerRecord<String, String>, String>) ConsumerRecord::value
     );
 
     JavaDStream<String> unifiedStream = stream1.union(stream2);
 
     final Set<String> result = Collections.synchronizedSet(new HashSet<>());
-    unifiedStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
-        @Override
-        public void call(JavaRDD<String> rdd) {
-          result.addAll(rdd.collect());
-        }
-      }
+    unifiedStream.foreachRDD((VoidFunction<JavaRDD<String>>) rdd -> result.addAll(rdd.collect())
     );
     ssc.start();
     long startTime = System.currentTimeMillis();
```
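
Where the anonymous class did nothing but forward to an existing method, the change above goes one step further and uses a method reference (`ConsumerRecord::value`). A small stand-alone sketch of the same transformation, using `java.util.function.Function` and a made-up `Message` class rather than the Spark/Kafka types:

```java
import java.util.function.Function;

public class MethodReferenceDemo {

  // Stand-in for a record-like type with a single accessor.
  static class Message {
    private final String value;
    Message(String value) { this.value = value; }
    String value() { return value; }
  }

  public static void main(String[] args) {
    // Before: a one-method anonymous class that only forwards to value()
    Function<Message, String> handlerOld = new Function<Message, String>() {
      @Override
      public String apply(Message m) {
        return m.value();
      }
    };

    // After: the equivalent method reference, mirroring ConsumerRecord::value above
    Function<Message, String> handlerNew = Message::value;

    Message m = new Message("hello");
    System.out.println(handlerOld.apply(m)); // hello
    System.out.println(handlerNew.apply(m)); // hello
  }
}
```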


```diff
@@ -89,13 +89,7 @@ public class JavaKafkaRDDSuite implements Serializable {
     leaders.put(offsetRanges[0].topicPartition(), broker);
     leaders.put(offsetRanges[1].topicPartition(), broker);
 
-    Function<ConsumerRecord<String, String>, String> handler =
-      new Function<ConsumerRecord<String, String>, String>() {
-        @Override
-        public String call(ConsumerRecord<String, String> r) {
-          return r.value();
-        }
-      };
+    Function<ConsumerRecord<String, String>, String> handler = ConsumerRecord::value;
 
     JavaRDD<String> rdd1 = KafkaUtils.<String, String>createRDD(
       sc,
```


```diff
@@ -58,10 +58,7 @@ public class JavaDataStreamReaderWriterSuite {
       .readStream()
       .textFile(input)
       .writeStream()
-      .foreachBatch(new VoidFunction2<Dataset<String>, Long>() {
-        @Override
-        public void call(Dataset<String> v1, Long v2) throws Exception {}
-      })
+      .foreachBatch((VoidFunction2<Dataset<String>, Long>) (v1, v2) -> {})
       .start();
     query.stop();
   }
```


```diff
@@ -42,18 +42,16 @@ public class TSubjectAssumingTransport extends TFilterTransport {
     try {
       AccessControlContext context = AccessController.getContext();
       Subject subject = Subject.getSubject(context);
-      Subject.doAs(subject, new PrivilegedExceptionAction<Void>() {
-        public Void run() {
-          try {
-            wrapped.open();
-          } catch (TTransportException tte) {
-            // Wrap the transport exception in an RTE, since Subject.doAs() then goes
-            // and unwraps this for us out of the doAs block. We then unwrap one
-            // more time in our catch clause to get back the TTE. (ugh)
-            throw new RuntimeException(tte);
-          }
-          return null;
+      Subject.doAs(subject, (PrivilegedExceptionAction<Void>) () -> {
+        try {
+          wrapped.open();
+        } catch (TTransportException tte) {
+          // Wrap the transport exception in an RTE, since Subject.doAs() then goes
+          // and unwraps this for us out of the doAs block. We then unwrap one
+          // more time in our catch clause to get back the TTE. (ugh)
+          throw new RuntimeException(tte);
         }
+        return null;
       });
     } catch (PrivilegedActionException ioe) {
       throw new RuntimeException("Received an ioe we never threw!", ioe);
```


```diff
@@ -191,45 +191,39 @@ public class SQLOperation extends ExecuteStatementOperation {
       final UserGroupInformation currentUGI = getCurrentUGI(opConfig);
       // Runnable impl to call runInternal asynchronously,
       // from a different thread
-      Runnable backgroundOperation = new Runnable() {
-        @Override
-        public void run() {
-          PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
-            @Override
-            public Object run() throws HiveSQLException {
-              Hive.set(parentHive);
-              SessionState.setCurrentSessionState(parentSessionState);
-              // Set current OperationLog in this async thread for keeping on saving query log.
-              registerCurrentOperationLog();
-              try {
-                runQuery(opConfig);
-              } catch (HiveSQLException e) {
-                setOperationException(e);
-                LOG.error("Error running hive query: ", e);
-              } finally {
-                unregisterOperationLog();
-              }
-              return null;
-            }
-          };
-
-          try {
-            currentUGI.doAs(doAsAction);
-          } catch (Exception e) {
-            setOperationException(new HiveSQLException(e));
-            LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
-          }
-          finally {
-            /**
-             * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
-             * when this thread is garbage collected later.
-             * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
-             */
-            if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
-              ThreadWithGarbageCleanup currentThread =
-                  (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
-              currentThread.cacheThreadLocalRawStore();
-            }
-          }
-        }
-      };
+      Runnable backgroundOperation = () -> {
+        PrivilegedExceptionAction<Object> doAsAction = () -> {
+          Hive.set(parentHive);
+          SessionState.setCurrentSessionState(parentSessionState);
+          // Set current OperationLog in this async thread for keeping on saving query log.
+          registerCurrentOperationLog();
+          try {
+            runQuery(opConfig);
+          } catch (HiveSQLException e) {
+            setOperationException(e);
+            LOG.error("Error running hive query: ", e);
+          } finally {
+            unregisterOperationLog();
+          }
+          return null;
+        };
+
+        try {
+          currentUGI.doAs(doAsAction);
+        } catch (Exception e) {
+          setOperationException(new HiveSQLException(e));
+          LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
+        }
+        finally {
+          /**
+           * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
+           * when this thread is garbage collected later.
+           * @see ThreadWithGarbageCleanup#finalize()
+           */
+          if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
+            ThreadWithGarbageCleanup currentThread =
+                (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
+            currentThread.cacheThreadLocalRawStore();
+          }
+        }
+      };
```


```diff
@@ -55,13 +55,7 @@ public class HiveSessionProxy implements InvocationHandler {
       if (method.getDeclaringClass() == HiveSessionBase.class) {
         return invoke(method, args);
       }
-      return ugi.doAs(
-        new PrivilegedExceptionAction<Object>() {
-          @Override
-          public Object run() throws HiveSQLException {
-            return invoke(method, args);
-          }
-        });
+      return ugi.doAs((PrivilegedExceptionAction<Object>) () -> invoke(method, args));
     } catch (UndeclaredThrowableException e) {
       Throwable innerException = e.getCause();
       if (innerException instanceof PrivilegedActionException) {
```