Merge "Drop Felix Gogo"
[openflowplugin.git] / openflowplugin / src / test / java / org / opendaylight / openflowplugin / openflow / md / core / sal / ConcurrentSalRegistrationManagerTest.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core.sal;
10
11 import com.google.common.util.concurrent.ListeningExecutorService;
12 import java.math.BigInteger;
13 import java.net.InetSocketAddress;
14 import java.util.ArrayList;
15 import java.util.Collections;
16 import java.util.List;
17 import java.util.concurrent.ArrayBlockingQueue;
18 import java.util.concurrent.BlockingQueue;
19 import java.util.concurrent.Callable;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.Future;
22 import java.util.concurrent.TimeUnit;
23 import java.util.concurrent.TimeoutException;
24 import org.junit.After;
25 import org.junit.Assert;
26 import org.junit.Before;
27 import org.junit.Test;
28 import org.junit.runner.RunWith;
29 import org.mockito.Matchers;
30 import org.mockito.Mock;
31 import org.mockito.Mockito;
32 import org.mockito.invocation.InvocationOnMock;
33 import org.mockito.runners.MockitoJUnitRunner;
34 import org.mockito.stubbing.Answer;
35 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
36 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
37 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
38 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
39 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
40 import org.opendaylight.openflowplugin.api.OFConstants;
41 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
42 import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationEnqueuer;
43 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
44 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SwitchSessionKeyOF;
45 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
46 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
47 import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
48 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutputBuilder;
52 import org.opendaylight.yangtools.yang.binding.RpcService;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 /**
57  * Created by Martin Bobak mbobak@cisco.com on 9/22/14.
58  */
59 @RunWith(MockitoJUnitRunner.class)
60 public class ConcurrentSalRegistrationManagerTest {
61
62
63     /** registration related action must end within this amount of seconds */
64     private static final int REGISTRATION_ACTION_TIMEOUT = 5;
65     protected SalRegistrationManager registrationManager;
66     protected static final Logger LOG = LoggerFactory.getLogger(ConcurrentSalRegistrationManagerTest.class);
67     protected static final SwitchSessionKeyOF SWITCH_SESSION_KEY_OF = new SwitchSessionKeyOF();
68
69     private static final long THREAD_SLEEP_MILLIS = 100;
70     private static final String DELAYED_THREAD = "DELAYED_THREAD";
71     private static final String NO_DELAY_THREAD = "NO_DELAY_THREAD";
72
73     private ThreadPoolCollectingExecutor taskExecutor;
74
75     @Mock
76     protected SessionContext context;
77     @Mock
78     private ConnectionConductor connectionConductor;
79     @Mock
80     private ListeningExecutorService rpcPool;
81     @Mock
82     private NotificationProviderService notificationProviderService;
83     @Mock
84     private RpcProviderRegistry rpcProviderRegistry;
85     @Mock
86     private DataBroker dataBroker;
87     @Mock
88     private NotificationEnqueuer notificationEnqueuer;
89     @Mock
90     private ConnectionAdapter connectionAdapter;
91
92     private GetFeaturesOutput features;
93
94     /**
95      * prepare surrounding objects
96      */
97     @Before
98     public void setUp() {
99         final ConvertorManager convertorManager = ConvertorManagerFactory.createDefaultManager();
100         registrationManager = new SalRegistrationManager(convertorManager);
101         SWITCH_SESSION_KEY_OF.setDatapathId(BigInteger.ONE);
102         Mockito.when(context.getNotificationEnqueuer()).thenReturn(notificationEnqueuer);
103
104         // features mockery
105         features = new GetFeaturesOutputBuilder()
106         .setVersion(OFConstants.OFP_VERSION_1_3)
107         .setDatapathId(BigInteger.valueOf(42))
108         .setCapabilities(new Capabilities(true, true, true, true, true, true, true))
109         .build();
110         Mockito.when(context.getFeatures()).thenReturn(features);
111
112         Mockito.when(context.getPrimaryConductor()).thenReturn(connectionConductor);
113         Mockito.when(context.getSessionKey()).thenReturn(SWITCH_SESSION_KEY_OF);
114         Mockito.when(connectionConductor.getVersion()).thenReturn(OFConstants.OFP_VERSION_1_3);
115
116         // provider context - registration responder
117         Mockito.when(rpcProviderRegistry.addRoutedRpcImplementation(Matchers.<Class<RpcService>>any(), Matchers.any(RpcService.class)))
118         .then(new Answer<RoutedRpcRegistration<?>>() {
119             @Override
120             public RoutedRpcRegistration<?> answer(InvocationOnMock invocation) {
121                 if (Thread.currentThread().getName().equals(DELAYED_THREAD)) {
122                     try {
123                         LOG.info(String.format("Will wait for %d millis", THREAD_SLEEP_MILLIS/10));
124                         Thread.sleep(THREAD_SLEEP_MILLIS);
125                     } catch (InterruptedException e) {
126                         LOG.error("delaying of worker thread [{}] failed.", Thread.currentThread().getName(), e);
127                     }
128                 }
129
130                 Object[] args = invocation.getArguments();
131                 RoutedRpcRegistration<RpcService> registration = Mockito.mock(RoutedRpcRegistration.class);
132                 Mockito.when(registration.getInstance()).thenReturn((RpcService) args[1]);
133
134                 return registration;
135             }
136         });
137
138         Mockito.when(connectionConductor.getConnectionAdapter()).thenReturn(connectionAdapter);
139         Mockito.when(connectionAdapter.getRemoteAddress()).thenReturn(new InetSocketAddress("10.1.2.3", 4242));
140
141         taskExecutor = new ThreadPoolCollectingExecutor(
142                 2, 2, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2), "junit");
143
144         registrationManager.setRpcProviderRegistry(rpcProviderRegistry);
145         registrationManager.setDataService(dataBroker);
146         registrationManager.setPublishService(notificationProviderService);
147         registrationManager.init();
148         OFSessionUtil.getSessionManager().setRpcPool(rpcPool);
149     }
150
151     /**
152      * clean up
153      * @throws InterruptedException
154      */
155     @After
156     public void tearDown() throws InterruptedException {
157         taskExecutor.shutdown();
158         taskExecutor.awaitTermination(1, TimeUnit.SECONDS);
159         if (!taskExecutor.isTerminated()) {
160             taskExecutor.shutdownNow();
161         }
162         LOG.info("All tasks have finished.");
163
164         LOG.info("amount of scheduled threads: {}, exited threads: {}, failed threads: {}",
165                 taskExecutor.getTaskCount(), taskExecutor.getThreadExitCounter(), taskExecutor.getFailLogbook().size());
166         for (String exitStatus : taskExecutor.getFailLogbook()) {
167             LOG.debug(exitStatus);
168         }
169
170         OFSessionUtil.releaseSessionManager();
171         Assert.assertTrue("there should not be any failed threads in the pool", taskExecutor.getFailLogbook().isEmpty());
172         Assert.assertTrue("there should not be any living thread in the pool", taskExecutor.getActiveCount() == 0);
173     }
174
175     /**
176      * Test method which verifies that session could not be invalidated while in creation.
177      * @throws InterruptedException
178      * @throws TimeoutException
179      * @throws ExecutionException
180      */
181     @Test
182     public void testConcurrentRemoveSessionContext() throws InterruptedException, ExecutionException, TimeoutException {
183         // run registrations
184         Callable<Void> delayedThread = new Callable<Void>() {
185             @Override
186             public Void call() {
187                 LOG.info("Delayed session adding thread started.");
188                 Thread.currentThread().setName(DELAYED_THREAD);
189                 OFSessionUtil.getSessionManager().addSessionContext(SWITCH_SESSION_KEY_OF, context);
190                 LOG.info("Delayed session adding thread finished.");
191                 return null;
192             }
193         };
194
195         Callable<Void> noDelayThread = new Callable<Void>() {
196             @Override
197             public Void call() {
198                 LOG.info("Session removing thread started.");
199                 Thread.currentThread().setName(NO_DELAY_THREAD);
200                 OFSessionUtil.getSessionManager().invalidateSessionContext(SWITCH_SESSION_KEY_OF);
201                 LOG.info("Session removing thread finished.");
202                 return null;
203             }
204         };
205
206         Future<Void> addSessionResult = taskExecutor.submit(delayedThread);
207         Future<Void> removeSessionResult = taskExecutor.submit(noDelayThread);
208
209         addSessionResult.get(REGISTRATION_ACTION_TIMEOUT, TimeUnit.SECONDS);
210         removeSessionResult.get(REGISTRATION_ACTION_TIMEOUT, TimeUnit.SECONDS);
211     }
212
213     private static class ThreadPoolCollectingExecutor extends ThreadPoolLoggingExecutor {
214
215         private List<String> failLogbook;
216         private int threadExitCounter = 0;
217
218         /**
219          * @param corePoolSize
220          * @param maximumPoolSize
221          * @param keepAliveTime
222          * @param unit
223          * @param workQueue
224          * @param poolName
225          */
226         public ThreadPoolCollectingExecutor(int corePoolSize,
227                 int maximumPoolSize, long keepAliveTime, TimeUnit unit,
228                 BlockingQueue<Runnable> workQueue, String poolName) {
229             super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, poolName);
230
231             failLogbook = Collections.synchronizedList(new ArrayList<String>());
232         }
233
234         @Override
235         protected void afterExecute(Runnable r, Throwable t) {
236             super.afterExecute(r, t);
237             threadExitCounter ++;
238
239             if (t != null) {
240                 failLogbook.add("job ["+r+"] exited with throwable:" + t.getMessage());
241             }
242         }
243
244         /**
245          * @return the chronicles
246          */
247         public List<String> getFailLogbook() {
248             return failLogbook;
249         }
250
251         /**
252          * @return the threadExitCounter
253          */
254         public int getThreadExitCounter() {
255             return threadExitCounter;
256         }
257     }
258 }