BUG-1254: fix concurrent add/remove session test
[openflowplugin.git] / openflowplugin / src / test / java / org / opendaylight / openflowplugin / openflow / md / core / sal / ConcurrentSalRegistrationManagerTest.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.openflow.md.core.sal;
10
11 import java.math.BigInteger;
12 import java.net.InetSocketAddress;
13 import java.util.ArrayList;
14 import java.util.Collections;
15 import java.util.List;
16 import java.util.concurrent.ArrayBlockingQueue;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.Callable;
19 import java.util.concurrent.ExecutionException;
20 import java.util.concurrent.Future;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.TimeoutException;
23
24 import org.junit.After;
25 import org.junit.Assert;
26 import org.junit.Before;
27 import org.junit.Test;
28 import org.junit.runner.RunWith;
29 import org.mockito.Matchers;
30 import org.mockito.Mock;
31 import org.mockito.Mockito;
32 import org.mockito.invocation.InvocationOnMock;
33 import org.mockito.runners.MockitoJUnitRunner;
34 import org.mockito.stubbing.Answer;
35 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
36 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
37 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
38 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
39 import org.opendaylight.openflowplugin.api.OFConstants;
40 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
41 import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationEnqueuer;
42 import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
43 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
44 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
45 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SwitchSessionKeyOF;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutputBuilder;
49 import org.opendaylight.yangtools.yang.binding.RpcService;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 import com.google.common.util.concurrent.ListeningExecutorService;
54
55 /**
56  * Created by Martin Bobak mbobak@cisco.com on 9/22/14.
57  */
58 @RunWith(MockitoJUnitRunner.class)
59 public class ConcurrentSalRegistrationManagerTest {
60
61
62     /** registration related action must end within this amount of seconds */
63     private static final int REGISTRATION_ACTION_TIMEOUT = 5;
64     protected static final SalRegistrationManager registrationManager = new SalRegistrationManager();
65     protected static final Logger LOG = LoggerFactory.getLogger(ConcurrentSalRegistrationManagerTest.class);
66     protected static final SwitchSessionKeyOF SWITCH_SESSION_KEY_OF = new SwitchSessionKeyOF();
67
68     private static final long THREAD_SLEEP_MILLIS = 100;
69     private static final String DELAYED_THREAD = "DELAYED_THREAD";
70     private static final String NO_DELAY_THREAD = "NO_DELAY_THREAD";
71
72     private ThreadPoolCollectingExecutor taskExecutor;
73
74     @Mock
75     protected SessionContext context;
76     @Mock
77     private ConnectionConductor connectionConductor;
78     @Mock
79     private ListeningExecutorService rpcPool; 
80     @Mock
81     private NotificationProviderService notificationProviderService; 
82     @Mock
83     private ProviderContext providerContext;
84     @Mock
85     private NotificationEnqueuer notificationEnqueuer;
86     @Mock
87     private ConnectionAdapter connectionAdapter;
88
89     private GetFeaturesOutput features;
90
91     /**
92      * prepare surrounding objects
93      */
94     @Before
95     public void setUp() {
96         SWITCH_SESSION_KEY_OF.setDatapathId(BigInteger.ONE);
97         Mockito.when(providerContext.getSALService(NotificationProviderService.class)).thenReturn(notificationProviderService);
98         Mockito.when(context.getNotificationEnqueuer()).thenReturn(notificationEnqueuer);
99         
100         // features mockery
101         features = new GetFeaturesOutputBuilder()
102         .setVersion(OFConstants.OFP_VERSION_1_3)
103         .setDatapathId(BigInteger.valueOf(42))
104         .setCapabilities(new Capabilities(true, true, true, true, true, true, true))
105         .build();
106         Mockito.when(context.getFeatures()).thenReturn(features);
107         
108         Mockito.when(context.getPrimaryConductor()).thenReturn(connectionConductor);
109         Mockito.when(context.getSessionKey()).thenReturn(SWITCH_SESSION_KEY_OF);
110         Mockito.when(connectionConductor.getVersion()).thenReturn(OFConstants.OFP_VERSION_1_3);
111         
112         // provider context - registration responder
113         Mockito.when(providerContext.addRoutedRpcImplementation(Matchers.any(Class.class), Matchers.any(RpcService.class)))
114         .then(new Answer<RoutedRpcRegistration<?>>() {
115             @Override
116             public RoutedRpcRegistration<?> answer(InvocationOnMock invocation) {
117                 if (Thread.currentThread().getName().equals(DELAYED_THREAD)) {
118                     try {
119                         LOG.info(String.format("Will wait for %d millis", THREAD_SLEEP_MILLIS/10));
120                         Thread.sleep(THREAD_SLEEP_MILLIS);
121                     } catch (InterruptedException e) {
122                         LOG.error("delaying of worker thread [{}] failed.", Thread.currentThread().getName(), e);
123                     }
124                 }
125                 
126                 Object[] args = invocation.getArguments();
127                 RoutedRpcRegistration<RpcService> registration = Mockito.mock(RoutedRpcRegistration.class);
128                 Mockito.when(registration.getInstance()).thenReturn((RpcService) args[1]);
129                 
130                 return registration;
131             }
132         });
133         
134         Mockito.when(connectionConductor.getConnectionAdapter()).thenReturn(connectionAdapter);
135         Mockito.when(connectionAdapter.getRemoteAddress()).thenReturn(new InetSocketAddress("10.1.2.3", 4242));
136         
137         taskExecutor = new ThreadPoolCollectingExecutor(
138                 2, 2, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2), "junit");
139         
140         registrationManager.onSessionInitiated(providerContext);
141         OFSessionUtil.getSessionManager().setRpcPool(rpcPool);
142     }
143     
144     /**
145      * clean up
146      * @throws InterruptedException 
147      */
148     @After
149     public void tearDown() throws InterruptedException {
150         taskExecutor.shutdown();
151         taskExecutor.awaitTermination(1, TimeUnit.SECONDS);
152         if (!taskExecutor.isTerminated()) {
153            taskExecutor.shutdownNow(); 
154         }
155         LOG.info("All tasks have finished.");
156         
157         LOG.info("amount of scheduled threads: {}, exited threads: {}, failed threads: {}", 
158                 taskExecutor.getTaskCount(), taskExecutor.getThreadExitCounter(), taskExecutor.getFailLogbook().size());
159         for (String exitStatus : taskExecutor.getFailLogbook()) {
160             LOG.debug(exitStatus);
161         }
162         
163         OFSessionUtil.releaseSessionManager();
164         Assert.assertTrue("there should not be any failed threads in the pool", taskExecutor.getFailLogbook().isEmpty());
165         Assert.assertTrue("there should not be any living thread in the pool", taskExecutor.getActiveCount() == 0);
166     }
167
168     /**
169      * Test method which verifies that session could not be invalidated while in creation.
170      * @throws InterruptedException 
171      * @throws TimeoutException 
172      * @throws ExecutionException 
173      */
174     @Test
175     public void testConcurrentRemoveSessionContext() throws InterruptedException, ExecutionException, TimeoutException {
176         // run registrations
177         Callable<Void> delayedThread = new Callable<Void>() {
178             @Override
179             public Void call() {
180                 LOG.info("Delayed session adding thread started.");
181                 Thread.currentThread().setName(DELAYED_THREAD);
182                 OFSessionUtil.getSessionManager().addSessionContext(SWITCH_SESSION_KEY_OF, context);
183                 LOG.info("Delayed session adding thread finished.");
184                 return null;
185             }
186         };
187
188         Callable<Void> noDelayThread = new Callable<Void>() {
189             @Override
190             public Void call() {
191                 LOG.info("Session removing thread started.");
192                 Thread.currentThread().setName(NO_DELAY_THREAD);
193                 OFSessionUtil.getSessionManager().invalidateSessionContext(SWITCH_SESSION_KEY_OF);
194                 LOG.info("Session removing thread finished.");
195                 return null;
196             }
197         };
198         
199         Future<Void> addSessionResult = taskExecutor.submit(delayedThread);
200         Future<Void> removeSessionResult = taskExecutor.submit(noDelayThread);
201         
202         addSessionResult.get(REGISTRATION_ACTION_TIMEOUT, TimeUnit.SECONDS);
203         removeSessionResult.get(REGISTRATION_ACTION_TIMEOUT, TimeUnit.SECONDS);
204     }
205
206     private static class ThreadPoolCollectingExecutor extends ThreadPoolLoggingExecutor {
207         
208         private List<String> failLogbook;
209         private int threadExitCounter = 0;
210
211         /**
212          * @param corePoolSize
213          * @param maximumPoolSize
214          * @param keepAliveTime
215          * @param unit
216          * @param workQueue
217          * @param poolName
218          */
219         public ThreadPoolCollectingExecutor(int corePoolSize,
220                 int maximumPoolSize, long keepAliveTime, TimeUnit unit,
221                 BlockingQueue<Runnable> workQueue, String poolName) {
222             super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, poolName);
223             
224             failLogbook = Collections.synchronizedList(new ArrayList<String>());
225         }
226         
227         @Override
228         protected void afterExecute(Runnable r, Throwable t) {
229             super.afterExecute(r, t);
230             threadExitCounter ++;
231             
232             if (t != null) {
233                 failLogbook.add("job ["+r+"] exited with throwable:" + t.getMessage());
234             }
235         }
236         
237         /**
238          * @return the chronicles
239          */
240         public List<String> getFailLogbook() {
241             return failLogbook;
242         }
243         
244         /**
245          * @return the threadExitCounter
246          */
247         public int getThreadExitCounter() {
248             return threadExitCounter;
249         }
250     }
251 }