2 * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.openflowplugin.openflow.md.core.sal;
11 import com.google.common.util.concurrent.ListeningExecutorService;
12 import java.math.BigInteger;
13 import java.net.InetSocketAddress;
14 import java.util.ArrayList;
15 import java.util.Collections;
16 import java.util.List;
17 import java.util.concurrent.ArrayBlockingQueue;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.Callable;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.Future;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
24 import org.junit.After;
25 import org.junit.Assert;
26 import org.junit.Before;
27 import org.junit.Test;
28 import org.junit.runner.RunWith;
29 import org.mockito.Matchers;
30 import org.mockito.Mock;
31 import org.mockito.Mockito;
32 import org.mockito.invocation.InvocationOnMock;
33 import org.mockito.runners.MockitoJUnitRunner;
34 import org.mockito.stubbing.Answer;
35 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
36 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
37 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
38 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
39 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
40 import org.opendaylight.openflowplugin.api.OFConstants;
41 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
42 import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationEnqueuer;
43 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
44 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SwitchSessionKeyOF;
45 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
46 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutputBuilder;
50 import org.opendaylight.yangtools.yang.binding.RpcService;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
55 * Created by Martin Bobak mbobak@cisco.com on 9/22/14.
57 @RunWith(MockitoJUnitRunner.class)
58 public class ConcurrentSalRegistrationManagerTest {
61 /** registration related action must end within this amount of seconds */
62 private static final int REGISTRATION_ACTION_TIMEOUT = 5;
63 protected static final SalRegistrationManager registrationManager = new SalRegistrationManager();
64 protected static final Logger LOG = LoggerFactory.getLogger(ConcurrentSalRegistrationManagerTest.class);
65 protected static final SwitchSessionKeyOF SWITCH_SESSION_KEY_OF = new SwitchSessionKeyOF();
67 private static final long THREAD_SLEEP_MILLIS = 100;
68 private static final String DELAYED_THREAD = "DELAYED_THREAD";
69 private static final String NO_DELAY_THREAD = "NO_DELAY_THREAD";
71 private ThreadPoolCollectingExecutor taskExecutor;
74 protected SessionContext context;
76 private ConnectionConductor connectionConductor;
78 private ListeningExecutorService rpcPool;
80 private NotificationProviderService notificationProviderService;
82 private RpcProviderRegistry rpcProviderRegistry;
84 private DataBroker dataBroker;
86 private NotificationEnqueuer notificationEnqueuer;
88 private ConnectionAdapter connectionAdapter;
90 private GetFeaturesOutput features;
93 * prepare surrounding objects
97 SWITCH_SESSION_KEY_OF.setDatapathId(BigInteger.ONE);
98 Mockito.when(context.getNotificationEnqueuer()).thenReturn(notificationEnqueuer);
101 features = new GetFeaturesOutputBuilder()
102 .setVersion(OFConstants.OFP_VERSION_1_3)
103 .setDatapathId(BigInteger.valueOf(42))
104 .setCapabilities(new Capabilities(true, true, true, true, true, true, true))
106 Mockito.when(context.getFeatures()).thenReturn(features);
108 Mockito.when(context.getPrimaryConductor()).thenReturn(connectionConductor);
109 Mockito.when(context.getSessionKey()).thenReturn(SWITCH_SESSION_KEY_OF);
110 Mockito.when(connectionConductor.getVersion()).thenReturn(OFConstants.OFP_VERSION_1_3);
112 // provider context - registration responder
113 Mockito.when(rpcProviderRegistry.addRoutedRpcImplementation(Matchers.any(Class.class), Matchers.any(RpcService.class)))
114 .then(new Answer<RoutedRpcRegistration<?>>() {
116 public RoutedRpcRegistration<?> answer(InvocationOnMock invocation) {
117 if (Thread.currentThread().getName().equals(DELAYED_THREAD)) {
119 LOG.info(String.format("Will wait for %d millis", THREAD_SLEEP_MILLIS/10));
120 Thread.sleep(THREAD_SLEEP_MILLIS);
121 } catch (InterruptedException e) {
122 LOG.error("delaying of worker thread [{}] failed.", Thread.currentThread().getName(), e);
126 Object[] args = invocation.getArguments();
127 RoutedRpcRegistration<RpcService> registration = Mockito.mock(RoutedRpcRegistration.class);
128 Mockito.when(registration.getInstance()).thenReturn((RpcService) args[1]);
134 Mockito.when(connectionConductor.getConnectionAdapter()).thenReturn(connectionAdapter);
135 Mockito.when(connectionAdapter.getRemoteAddress()).thenReturn(new InetSocketAddress("10.1.2.3", 4242));
137 taskExecutor = new ThreadPoolCollectingExecutor(
138 2, 2, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2), "junit");
140 registrationManager.setRpcProviderRegistry(rpcProviderRegistry);
141 registrationManager.setDataService(dataBroker);
142 registrationManager.setPublishService(notificationProviderService);
143 registrationManager.init();
144 OFSessionUtil.getSessionManager().setRpcPool(rpcPool);
149 * @throws InterruptedException
152 public void tearDown() throws InterruptedException {
153 taskExecutor.shutdown();
154 taskExecutor.awaitTermination(1, TimeUnit.SECONDS);
155 if (!taskExecutor.isTerminated()) {
156 taskExecutor.shutdownNow();
158 LOG.info("All tasks have finished.");
160 LOG.info("amount of scheduled threads: {}, exited threads: {}, failed threads: {}",
161 taskExecutor.getTaskCount(), taskExecutor.getThreadExitCounter(), taskExecutor.getFailLogbook().size());
162 for (String exitStatus : taskExecutor.getFailLogbook()) {
163 LOG.debug(exitStatus);
166 OFSessionUtil.releaseSessionManager();
167 Assert.assertTrue("there should not be any failed threads in the pool", taskExecutor.getFailLogbook().isEmpty());
168 Assert.assertTrue("there should not be any living thread in the pool", taskExecutor.getActiveCount() == 0);
172 * Test method which verifies that session could not be invalidated while in creation.
173 * @throws InterruptedException
174 * @throws TimeoutException
175 * @throws ExecutionException
178 public void testConcurrentRemoveSessionContext() throws InterruptedException, ExecutionException, TimeoutException {
180 Callable<Void> delayedThread = new Callable<Void>() {
183 LOG.info("Delayed session adding thread started.");
184 Thread.currentThread().setName(DELAYED_THREAD);
185 OFSessionUtil.getSessionManager().addSessionContext(SWITCH_SESSION_KEY_OF, context);
186 LOG.info("Delayed session adding thread finished.");
191 Callable<Void> noDelayThread = new Callable<Void>() {
194 LOG.info("Session removing thread started.");
195 Thread.currentThread().setName(NO_DELAY_THREAD);
196 OFSessionUtil.getSessionManager().invalidateSessionContext(SWITCH_SESSION_KEY_OF);
197 LOG.info("Session removing thread finished.");
202 Future<Void> addSessionResult = taskExecutor.submit(delayedThread);
203 Future<Void> removeSessionResult = taskExecutor.submit(noDelayThread);
205 addSessionResult.get(REGISTRATION_ACTION_TIMEOUT, TimeUnit.SECONDS);
206 removeSessionResult.get(REGISTRATION_ACTION_TIMEOUT, TimeUnit.SECONDS);
209 private static class ThreadPoolCollectingExecutor extends ThreadPoolLoggingExecutor {
211 private List<String> failLogbook;
212 private int threadExitCounter = 0;
215 * @param corePoolSize
216 * @param maximumPoolSize
217 * @param keepAliveTime
222 public ThreadPoolCollectingExecutor(int corePoolSize,
223 int maximumPoolSize, long keepAliveTime, TimeUnit unit,
224 BlockingQueue<Runnable> workQueue, String poolName) {
225 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, poolName);
227 failLogbook = Collections.synchronizedList(new ArrayList<String>());
231 protected void afterExecute(Runnable r, Throwable t) {
232 super.afterExecute(r, t);
233 threadExitCounter ++;
236 failLogbook.add("job ["+r+"] exited with throwable:" + t.getMessage());
241 * @return the chronicles
243 public List<String> getFailLogbook() {
248 * @return the threadExitCounter
250 public int getThreadExitCounter() {
251 return threadExitCounter;