From c1801fd6da4a2dd5f37dc366b92bede669e8fda0 Mon Sep 17 00:00:00 2001 From: wang-zhun Date: Fri, 8 May 2020 15:41:23 -0500 Subject: [PATCH] =?UTF-8?q?[SPARK-31235][FOLLOWUP][TESTS][TEST-HADOOP3.2]?= =?UTF-8?q?=20Fix=20test=20"specify=20a=20more=20specific=20type=20for=20t?= =?UTF-8?q?he=20ap=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? Update the input parameters for instantiating `RMAppManager` and `ClientRMService` ### Why are the changes needed? For hadoop3.2, if `RMAppManager` is not created correctly, the following exception will occur: ``` java.lang.RuntimeException: java.lang.NullPointerException at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:135) at org.apache.hadoop.yarn.security.YarnAuthorizationProvider.getInstance(YarnAuthorizationProvider.java:55) at org.apache.hadoop.yarn.server.resourcemanager.RMAppManager.(RMAppManager.java:117) ``` ### How was this patch tested? UTs Closes #28456 from wang-zhun/Fix-SPARK-31235. Authored-by: wang-zhun Signed-off-by: Thomas Graves --- .../spark/deploy/yarn/ClientSuite.scala | 86 +++++++++---------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index b335e7fc04..7611ccd4da 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.{ClientRMService, RMAppMana import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler import org.apache.hadoop.yarn.server.security.ApplicationACLsManager import org.apache.hadoop.yarn.util.Records import org.mockito.ArgumentMatchers.{any, anyBoolean, anyShort, eq => meq} @@ -222,11 +223,50 @@ class ClientSuite extends SparkFunSuite with Matchers { 3 -> ("SPARK-SQL", "SPARK-SQL"), 4 -> ("012345678901234567890123", "01234567890123456789")) + // Mock yarn submit application + val yarnClient = mock(classOf[YarnClient]) + val rmApps = new ConcurrentHashMap[ApplicationId, RMApp]() + val rmContext = mock(classOf[RMContext]) + when(rmContext.getRMApps).thenReturn(rmApps) + val dispatcher = mock(classOf[Dispatcher]) + when(rmContext.getDispatcher).thenReturn(dispatcher) + when[EventHandler[_]](dispatcher.getEventHandler).thenReturn( + new EventHandler[Event[_]] { + override def handle(event: Event[_]): Unit = {} + } + ) + val writer = mock(classOf[RMApplicationHistoryWriter]) + when(rmContext.getRMApplicationHistoryWriter).thenReturn(writer) + val publisher = mock(classOf[SystemMetricsPublisher]) + when(rmContext.getSystemMetricsPublisher).thenReturn(publisher) + val yarnScheduler = mock(classOf[YarnScheduler]) + val rmAppManager = new RMAppManager(rmContext, + yarnScheduler, + null, + mock(classOf[ApplicationACLsManager]), + new Configuration()) + val clientRMService = new ClientRMService(rmContext, + yarnScheduler, + rmAppManager, + null, + null, + null) + clientRMService.init(new Configuration()) + when(yarnClient.submitApplication(any())).thenAnswer((invocationOnMock: InvocationOnMock) => { + val subContext = invocationOnMock.getArguments()(0) + .asInstanceOf[ApplicationSubmissionContext] + val request = Records.newRecord(classOf[SubmitApplicationRequest]) + request.setApplicationSubmissionContext(subContext) + clientRMService.submitApplication(request) + null + }) + + // Spark submit application + val appContext = spy(Records.newRecord(classOf[ApplicationSubmissionContext])) + when(appContext.getUnmanagedAM).thenReturn(true) for ((id, (sourceType, targetType)) <- appTypes) { val sparkConf = new SparkConf().set("spark.yarn.applicationType", sourceType) val args = new ClientArguments(Array()) - - val appContext = spy(Records.newRecord(classOf[ApplicationSubmissionContext])) val appId = ApplicationId.newInstance(123456, id) appContext.setApplicationId(appId) val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse]) @@ -237,48 +277,8 @@ class ClientSuite extends SparkFunSuite with Matchers { new YarnClientApplication(getNewApplicationResponse, appContext), containerLaunchContext) - val yarnClient = mock(classOf[YarnClient]) - when(yarnClient.submitApplication(any())).thenAnswer((invocationOnMock: InvocationOnMock) => { - val subContext = invocationOnMock.getArguments()(0) - .asInstanceOf[ApplicationSubmissionContext] - val request = Records.newRecord(classOf[SubmitApplicationRequest]) - request.setApplicationSubmissionContext(subContext) - - val rmContext = mock(classOf[RMContext]) - val conf = mock(classOf[Configuration]) - val map = new ConcurrentHashMap[ApplicationId, RMApp]() - when(rmContext.getRMApps).thenReturn(map) - val dispatcher = mock(classOf[Dispatcher]) - when(rmContext.getDispatcher).thenReturn(dispatcher) - when[EventHandler[_]](dispatcher.getEventHandler).thenReturn( - new EventHandler[Event[_]] { - override def handle(event: Event[_]): Unit = {} - } - ) - val writer = mock(classOf[RMApplicationHistoryWriter]) - when(rmContext.getRMApplicationHistoryWriter).thenReturn(writer) - val publisher = mock(classOf[SystemMetricsPublisher]) - when(rmContext.getSystemMetricsPublisher).thenReturn(publisher) - when(appContext.getUnmanagedAM).thenReturn(true) - - val rmAppManager = new RMAppManager(rmContext, - null, - null, - mock(classOf[ApplicationACLsManager]), - conf) - val clientRMService = new ClientRMService(rmContext, - null, - rmAppManager, - null, - null, - null) - clientRMService.submitApplication(request) - - assert(map.get(subContext.getApplicationId).getApplicationType === targetType) - null - }) - yarnClient.submitApplication(context) + assert(rmApps.get(appId).getApplicationType === targetType) } }