+ Mockito.when(providerContext.getSALService(NotificationProviderService.class)).thenReturn(notificationProviderService);
+ Mockito.when(context.getNotificationEnqueuer()).thenReturn(notificationEnqueuer);
+
+ // features mockery
+ features = new GetFeaturesOutputBuilder()
+ .setVersion(OFConstants.OFP_VERSION_1_3)
+ .setDatapathId(BigInteger.valueOf(42))
+ .setCapabilities(new Capabilities(true, true, true, true, true, true, true))
+ .build();
+ Mockito.when(context.getFeatures()).thenReturn(features);
+
+ Mockito.when(context.getPrimaryConductor()).thenReturn(connectionConductor);
+ Mockito.when(context.getSessionKey()).thenReturn(SWITCH_SESSION_KEY_OF);
+ Mockito.when(connectionConductor.getVersion()).thenReturn(OFConstants.OFP_VERSION_1_3);
+
+ // provider context - registration responder
+ Mockito.when(providerContext.addRoutedRpcImplementation(Matchers.any(Class.class), Matchers.any(RpcService.class)))
+ .then(new Answer<RoutedRpcRegistration<?>>() {
+ @Override
+ public RoutedRpcRegistration<?> answer(InvocationOnMock invocation) {
+ if (Thread.currentThread().getName().equals(DELAYED_THREAD)) {
+ try {
+ LOG.info(String.format("Will wait for %d millis", THREAD_SLEEP_MILLIS/10));
+ Thread.sleep(THREAD_SLEEP_MILLIS);
+ } catch (InterruptedException e) {
+ LOG.error("delaying of worker thread [{}] failed.", Thread.currentThread().getName(), e);
+ }
+ }
+
+ Object[] args = invocation.getArguments();
+ RoutedRpcRegistration<RpcService> registration = Mockito.mock(RoutedRpcRegistration.class);
+ Mockito.when(registration.getInstance()).thenReturn((RpcService) args[1]);
+
+ return registration;
+ }
+ });
+
+ Mockito.when(connectionConductor.getConnectionAdapter()).thenReturn(connectionAdapter);
+ Mockito.when(connectionAdapter.getRemoteAddress()).thenReturn(new InetSocketAddress("10.1.2.3", 4242));
+
+ taskExecutor = new ThreadPoolCollectingExecutor(
+ 2, 2, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2), "junit");
+
+ registrationManager.onSessionInitiated(providerContext);
+ OFSessionUtil.getSessionManager().setRpcPool(rpcPool);
+ }
+
+ /**
+ * clean up
+ * @throws InterruptedException
+ */
+ @After
+ public void tearDown() throws InterruptedException {
+ taskExecutor.shutdown();
+ taskExecutor.awaitTermination(1, TimeUnit.SECONDS);
+ if (!taskExecutor.isTerminated()) {
+ taskExecutor.shutdownNow();
+ }
+ LOG.info("All tasks have finished.");
+
+ LOG.info("amount of scheduled threads: {}, exited threads: {}, failed threads: {}",
+ taskExecutor.getTaskCount(), taskExecutor.getThreadExitCounter(), taskExecutor.getFailLogbook().size());
+ for (String exitStatus : taskExecutor.getFailLogbook()) {
+ LOG.debug(exitStatus);
+ }
+
+ OFSessionUtil.releaseSessionManager();
+ Assert.assertTrue("there should not be any failed threads in the pool", taskExecutor.getFailLogbook().isEmpty());
+ Assert.assertTrue("there should not be any living thread in the pool", taskExecutor.getActiveCount() == 0);