package org.opendaylight.openflowplugin.openflow.md.core.sal;
-import static java.lang.Thread.sleep;
-import static org.mockito.Mockito.when;
-
import java.math.BigInteger;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.MockitoAnnotations.Mock;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
-import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
-import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
-import org.opendaylight.controller.sal.binding.api.BindingAwareService;
-import org.opendaylight.controller.sal.binding.api.NotificationListener;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
-import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
import org.opendaylight.openflowplugin.api.OFConstants;
import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationEnqueuer;
+import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
+import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
import org.opendaylight.openflowplugin.api.openflow.md.core.session.SwitchSessionKeyOF;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.concepts.Path;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.Notification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutputBuilder;
import org.opendaylight.yangtools.yang.binding.RpcService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
/**
* Created by Martin Bobak mbobak@cisco.com on 9/22/14.
*/
public class ConcurrentSalRegistrationManagerTest {
- private static final ExecutorService taskExecutor = Executors.newFixedThreadPool(3);
- private static final SalRegistrationManager registrationManager = new SalRegistrationManager();
-
- private static final SwitchSessionKeyOF SWITCH_SESSION_KEY_OF = new SwitchSessionKeyOF();
- private static final MockProviderContext MOCK_PROVIDER_CONTEXT = new MockProviderContext();
- private final MockNotificationProviderService MOCK_NOTIFICATION_PROVIDER_SERVICE = new MockNotificationProviderService();
+ /** registration related action must end within this amount of seconds */
+ private static final int REGISTRATION_ACTION_TIMEOUT = 5;
+ protected static final SalRegistrationManager registrationManager = new SalRegistrationManager();
+ protected static final Logger LOG = LoggerFactory.getLogger(ConcurrentSalRegistrationManagerTest.class);
+ protected static final SwitchSessionKeyOF SWITCH_SESSION_KEY_OF = new SwitchSessionKeyOF();
- private static final Logger LOG = LoggerFactory.getLogger(ConcurrentSalRegistrationManagerTest.class);
- private static final long THREAD_SLEEP_MILLIS = 1000;
+ private static final long THREAD_SLEEP_MILLIS = 100;
private static final String DELAYED_THREAD = "DELAYED_THREAD";
private static final String NO_DELAY_THREAD = "NO_DELAY_THREAD";
- @Mock
- private SessionContext context;
+ private ThreadPoolCollectingExecutor taskExecutor;
@Mock
- private GetFeaturesOutput features;
-
+ protected SessionContext context;
@Mock
private ConnectionConductor connectionConductor;
+ @Mock
+ private ListeningExecutorService rpcPool;
+ @Mock
+ private NotificationProviderService notificationProviderService;
+ @Mock
+ private ProviderContext providerContext;
+ @Mock
+ private NotificationEnqueuer notificationEnqueuer;
+ @Mock
+ private ConnectionAdapter connectionAdapter;
+
+ private GetFeaturesOutput features;
+ /**
+ * prepare surrounding objects
+ */
@Before
- public void setupRegistrationManager() {
- registrationManager.onSessionInitiated(MOCK_PROVIDER_CONTEXT);
+ public void setUp() {
SWITCH_SESSION_KEY_OF.setDatapathId(BigInteger.ONE);
-
- when(context.getFeatures()).thenReturn(features);
- when(features.getDatapathId()).thenReturn(BigInteger.valueOf(42));
- when(context.getPrimaryConductor()).thenReturn(connectionConductor);
- when(connectionConductor.getVersion()).thenReturn(OFConstants.OFP_VERSION_1_3);
+ Mockito.when(providerContext.getSALService(NotificationProviderService.class)).thenReturn(notificationProviderService);
+ Mockito.when(context.getNotificationEnqueuer()).thenReturn(notificationEnqueuer);
+
+ // features mockery
+ features = new GetFeaturesOutputBuilder()
+ .setVersion(OFConstants.OFP_VERSION_1_3)
+ .setDatapathId(BigInteger.valueOf(42))
+ .setCapabilities(new Capabilities(true, true, true, true, true, true, true))
+ .build();
+ Mockito.when(context.getFeatures()).thenReturn(features);
+
+ Mockito.when(context.getPrimaryConductor()).thenReturn(connectionConductor);
+ Mockito.when(context.getSessionKey()).thenReturn(SWITCH_SESSION_KEY_OF);
+ Mockito.when(connectionConductor.getVersion()).thenReturn(OFConstants.OFP_VERSION_1_3);
+
+ // provider context - registration responder
+ Mockito.when(providerContext.addRoutedRpcImplementation(Matchers.any(Class.class), Matchers.any(RpcService.class)))
+ .then(new Answer<RoutedRpcRegistration<?>>() {
+ @Override
+ public RoutedRpcRegistration<?> answer(InvocationOnMock invocation) {
+ if (Thread.currentThread().getName().equals(DELAYED_THREAD)) {
+ try {
+ LOG.info(String.format("Will wait for %d millis", THREAD_SLEEP_MILLIS/10));
+ Thread.sleep(THREAD_SLEEP_MILLIS);
+ } catch (InterruptedException e) {
+ LOG.error("delaying of worker thread [{}] failed.", Thread.currentThread().getName(), e);
+ }
+ }
+
+ Object[] args = invocation.getArguments();
+ RoutedRpcRegistration<RpcService> registration = Mockito.mock(RoutedRpcRegistration.class);
+ Mockito.when(registration.getInstance()).thenReturn((RpcService) args[1]);
+
+ return registration;
+ }
+ });
+
+ Mockito.when(connectionConductor.getConnectionAdapter()).thenReturn(connectionAdapter);
+ Mockito.when(connectionAdapter.getRemoteAddress()).thenReturn(new InetSocketAddress("10.1.2.3", 4242));
+
+ taskExecutor = new ThreadPoolCollectingExecutor(
+ 2, 2, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2), "junit");
+
+ registrationManager.onSessionInitiated(providerContext);
+ OFSessionUtil.getSessionManager().setRpcPool(rpcPool);
+ }
+
+ /**
+ * clean up
+ * @throws InterruptedException
+ */
+ @After
+ public void tearDown() throws InterruptedException {
+ taskExecutor.shutdown();
+ taskExecutor.awaitTermination(1, TimeUnit.SECONDS);
+ if (!taskExecutor.isTerminated()) {
+ taskExecutor.shutdownNow();
+ }
+ LOG.info("All tasks have finished.");
+
+ LOG.info("amount of scheduled threads: {}, exited threads: {}, failed threads: {}",
+ taskExecutor.getTaskCount(), taskExecutor.getThreadExitCounter(), taskExecutor.getFailLogbook().size());
+ for (String exitStatus : taskExecutor.getFailLogbook()) {
+ LOG.debug(exitStatus);
+ }
+
+ OFSessionUtil.releaseSessionManager();
+ Assert.assertTrue("there should not be any failed threads in the pool", taskExecutor.getFailLogbook().isEmpty());
+ Assert.assertTrue("there should not be any living thread in the pool", taskExecutor.getActiveCount() == 0);
}
- @Test
/**
* Test method which verifies that session could not be invalidated while in creation.
+ * @throws InterruptedException
+ * @throws TimeoutException
+ * @throws ExecutionException
*/
- public void testConcurrentRemoveSessionContext() throws InterruptedException, ExecutionException {
-
-
- Thread delayedThread = new Thread(new Runnable() {
+ @Test
+ public void testConcurrentRemoveSessionContext() throws InterruptedException, ExecutionException, TimeoutException {
+ // run registrations
+ Callable<Void> delayedThread = new Callable<Void>() {
@Override
- public void run() {
+ public Void call() {
LOG.info("Delayed session adding thread started.");
Thread.currentThread().setName(DELAYED_THREAD);
- registrationManager.setPublishService(MOCK_NOTIFICATION_PROVIDER_SERVICE);
- registrationManager.onSessionAdded(SWITCH_SESSION_KEY_OF, context);
+ OFSessionUtil.getSessionManager().addSessionContext(SWITCH_SESSION_KEY_OF, context);
LOG.info("Delayed session adding thread finished.");
+ return null;
}
- }
- );
- taskExecutor.execute(delayedThread);
+ };
- Thread noDelayThread = new Thread(new Runnable() {
+ Callable<Void> noDelayThread = new Callable<Void>() {
@Override
- public void run() {
+ public Void call() {
LOG.info("Session removing thread started.");
Thread.currentThread().setName(NO_DELAY_THREAD);
- registrationManager.setPublishService(MOCK_NOTIFICATION_PROVIDER_SERVICE);
- registrationManager.onSessionRemoved(context);
+ OFSessionUtil.getSessionManager().invalidateSessionContext(SWITCH_SESSION_KEY_OF);
LOG.info("Session removing thread finished.");
+ return null;
}
- }
- );
- taskExecutor.execute(noDelayThread);
- taskExecutor.shutdown();
- while (!taskExecutor.isTerminated()) {
- }
- LOG.info("All tasks have finished.");
+ };
+
+ Future<Void> addSessionResult = taskExecutor.submit(delayedThread);
+ Future<Void> removeSessionResult = taskExecutor.submit(noDelayThread);
+
+ addSessionResult.get(REGISTRATION_ACTION_TIMEOUT, TimeUnit.SECONDS);
+ removeSessionResult.get(REGISTRATION_ACTION_TIMEOUT, TimeUnit.SECONDS);
}
- private final class MockNotificationProviderService implements NotificationProviderService {
-
- @Override
- public void publish(Notification notification) {
- if (Thread.currentThread().getName().equals(DELAYED_THREAD)) {
- try {
- LOG.info(String.format("Will wait for %d millis", THREAD_SLEEP_MILLIS));
- sleep(THREAD_SLEEP_MILLIS);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+ private static class ThreadPoolCollectingExecutor extends ThreadPoolLoggingExecutor {
+
+ private List<String> failLogbook;
+ private int threadExitCounter = 0;
+
+ /**
+ * @param corePoolSize
+ * @param maximumPoolSize
+ * @param keepAliveTime
+ * @param unit
+ * @param workQueue
+ * @param poolName
+ */
+ public ThreadPoolCollectingExecutor(int corePoolSize,
+ int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+ BlockingQueue<Runnable> workQueue, String poolName) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, poolName);
+
+ failLogbook = Collections.synchronizedList(new ArrayList<String>());
+ }
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ super.afterExecute(r, t);
+ threadExitCounter ++;
+
+ if (t != null) {
+ failLogbook.add("job ["+r+"] exited with throwable:" + t.getMessage());
}
}
-
- @Override
- public void publish(Notification notification, ExecutorService executorService) {
-
- }
-
- @Override
- public ListenerRegistration<NotificationInterestListener> registerInterestListener(NotificationInterestListener notificationInterestListener) {
- return null;
- }
-
- @Override
- public <T extends Notification> ListenerRegistration<NotificationListener<T>> registerNotificationListener(Class<T> tClass, NotificationListener<T> tNotificationListener) {
- return null;
- }
-
- @Override
- public ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener notificationListener) {
- return null;
- }
- }
-
- private static final class MockProviderContext implements BindingAwareBroker.ProviderContext {
-
-
- @Override
- public void registerFunctionality(BindingAwareProvider.ProviderFunctionality functionality) {
-
- }
-
- @Override
- public void unregisterFunctionality(BindingAwareProvider.ProviderFunctionality functionality) {
-
- }
-
- @Override
- public <T extends BindingAwareService> T getSALService(Class<T> service) {
- return null;
- }
-
- @Override
- public <T extends RpcService> BindingAwareBroker.RpcRegistration<T> addRpcImplementation(Class<T> serviceInterface, T implementation) throws IllegalStateException {
- return null;
- }
-
- @Override
- public <T extends RpcService> BindingAwareBroker.RoutedRpcRegistration<T> addRoutedRpcImplementation(Class<T> serviceInterface, T implementation) throws IllegalStateException {
- return new MockRpcRegistration(implementation);
- }
-
- @Override
- public <L extends RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>> ListenerRegistration<L> registerRouteChangeListener(L listener) {
- return null;
- }
-
- @Override
- public <T extends RpcService> T getRpcService(Class<T> serviceInterface) {
- return null;
- }
- }
-
- private static final class MockRpcRegistration implements BindingAwareBroker.RoutedRpcRegistration {
-
- private Object implementation;
-
- public MockRpcRegistration(Object instance) {
- this.implementation = instance;
-
- }
-
- @Override
- public void registerInstance(Class context, InstanceIdentifier instance) {
-
- }
-
- @Override
- public void unregisterInstance(Class context, InstanceIdentifier instance) {
-
+
+ /**
+ * @return the chronicles
+ */
+ public List<String> getFailLogbook() {
+ return failLogbook;
}
-
- @Override
- public Object getInstance() {
- return this.implementation;
- }
-
- @Override
- public void registerPath(Object context, Path path) {
-
- }
-
- @Override
- public void unregisterPath(Object context, Path path) {
-
- }
-
- @Override
- public Class getServiceType() {
- return null;
- }
-
- @Override
- public void close() {
-
+
+ /**
+ * @return the threadExitCounter
+ */
+ public int getThreadExitCounter() {
+ return threadExitCounter;
}
}
-
}