BUG-1254: fix concurrent add/remove session test 24/12024/5
authorMichal Rehak <mirehak@cisco.com>
Thu, 16 Oct 2014 10:54:00 +0000 (12:54 +0200)
committerMichal Rehak <mirehak@cisco.com>
Wed, 22 Oct 2014 11:57:22 +0000 (13:57 +0200)
 - replaced executor service with logging impl
 - mock tidy up
 - ipaddress mockery added
 - feature replaced for real object
 - introduced action waiting mechanism based on future results
 - added cleanUp of sessionManager singletons (SalRegistrationManagerTest)
 - extended close functionality of SalRegistrationManagerOFImpl
 - rebased

Change-Id: I2e3c26eb37b8f80f85a79b8fed63933eb62ccb96
Signed-off-by: Michal Rehak <mirehak@cisco.com>
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/sal/SalRegistrationManager.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionManagerOFImpl.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/sal/ConcurrentSalRegistrationManagerTest.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/sal/SalRegistrationManagerTest.java

index 4050c368098a5ee698f6a8440a7f446c1af7558f..ee5e8e951c8014c1d64f751078ddbc4c4dccb6ae 100644 (file)
@@ -40,6 +40,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.N
 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.NodeKey;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
 import org.opendaylight.yangtools.concepts.CompositeObjectRegistration;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier.InstanceIdentifierBuilder;
 import org.slf4j.Logger;
@@ -60,6 +61,8 @@ public class SalRegistrationManager implements SessionListener, AutoCloseable {
 
     private SwitchFeaturesUtil swFeaturesUtil;
 
+    private ListenerRegistration<SessionListener> sessionListenerRegistration;
+
     public SalRegistrationManager() {
         swFeaturesUtil = SwitchFeaturesUtil.getInstance();
     }
@@ -82,7 +85,7 @@ public class SalRegistrationManager implements SessionListener, AutoCloseable {
         this.publishService = session.getSALService(NotificationProviderService.class);
         this.dataService = session.getSALService(DataBroker.class);
         // We register as listener for Session Manager
-        getSessionManager().registerSessionListener(this);
+        sessionListenerRegistration = getSessionManager().registerSessionListener(this);
         getSessionManager().setNotificationProviderService(publishService);
         getSessionManager().setDataBroker(dataService);
         LOG.debug("SalRegistrationManager initialized");
@@ -199,5 +202,8 @@ public class SalRegistrationManager implements SessionListener, AutoCloseable {
         dataService = null;
         providerContext = null;
         publishService = null;
+        if (sessionListenerRegistration != null) {
+            sessionListenerRegistration.close();
+        }
     }
 }
index d6b3579a6b1824fa80639611bb7a951102e25999..d83df7bc236c04f7f982f72e472139826b20ecfc 100644 (file)
@@ -9,12 +9,14 @@
 package org.opendaylight.openflowplugin.openflow.md.core.session;
 
 import com.google.common.util.concurrent.ListeningExecutorService;
+
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+
 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
@@ -29,7 +31,7 @@ import org.opendaylight.openflowplugin.extension.api.core.extension.ExtensionCon
 import org.opendaylight.openflowplugin.api.openflow.md.queue.PopListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.concepts.util.ListenerRegistry;
+import org.opendaylight.yangtools.util.ListenerRegistry;
 import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.slf4j.Logger;
@@ -263,7 +265,6 @@ public class SessionManagerOFImpl implements ConjunctSessionManager {
     @Override
     public void close() {
         LOG.debug("close");
-        sessionListeners = null;
         synchronized (sessionLot) {
             for (SessionContext sessionContext : sessionLot.values()) {
                 sessionContext.getPrimaryConductor().disconnect();
@@ -271,6 +272,17 @@ public class SessionManagerOFImpl implements ConjunctSessionManager {
             // TODO: handle timeouted shutdown
             rpcPool.shutdown();
         }
+        
+        for (ListenerRegistration<SessionListener> listenerRegistration : sessionListeners) {
+            SessionListener listener = listenerRegistration.getInstance();
+            if (listener instanceof AutoCloseable) {
+                try {
+                    ((AutoCloseable) listener).close();
+                } catch (Exception e) {
+                    LOG.warn("closing of sessionListenerRegistration failed", e);
+                }
+            }
+        }
     }
 
     @Override
index 15daf02109f9cb8a7d474d28af2f18c8adb65b59..d59872c56270f09511ea1adc454df97bf2526937 100644 (file)
@@ -8,38 +8,50 @@
 
 package org.opendaylight.openflowplugin.openflow.md.core.sal;
 
-import static java.lang.Thread.sleep;
-import static org.mockito.Mockito.when;
-
 import java.math.BigInteger;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.MockitoAnnotations.Mock;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
 import org.mockito.runners.MockitoJUnitRunner;
-import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
-import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
-import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
-import org.opendaylight.controller.sal.binding.api.BindingAwareService;
-import org.opendaylight.controller.sal.binding.api.NotificationListener;
+import org.mockito.stubbing.Answer;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.ProviderContext;
+import org.opendaylight.controller.sal.binding.api.BindingAwareBroker.RoutedRpcRegistration;
 import org.opendaylight.controller.sal.binding.api.NotificationProviderService;
-import org.opendaylight.controller.sal.binding.api.rpc.RpcContextIdentifier;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowplugin.api.OFConstants;
 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationEnqueuer;
+import org.opendaylight.openflowplugin.openflow.md.core.ThreadPoolLoggingExecutor;
+import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SwitchSessionKeyOF;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.concepts.Path;
-import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.binding.Notification;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutputBuilder;
 import org.opendaylight.yangtools.yang.binding.RpcService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
+
 /**
  * Created by Martin Bobak mbobak@cisco.com on 9/22/14.
  */
@@ -47,193 +59,193 @@ import org.slf4j.LoggerFactory;
 public class ConcurrentSalRegistrationManagerTest {
 
 
-    private static final ExecutorService taskExecutor = Executors.newFixedThreadPool(3);
-    private static final SalRegistrationManager registrationManager = new SalRegistrationManager();
-
-    private static final SwitchSessionKeyOF SWITCH_SESSION_KEY_OF = new SwitchSessionKeyOF();
-    private static final MockProviderContext MOCK_PROVIDER_CONTEXT = new MockProviderContext();
-    private final MockNotificationProviderService MOCK_NOTIFICATION_PROVIDER_SERVICE = new MockNotificationProviderService();
+    /** registration related action must end within this amount of seconds */
+    private static final int REGISTRATION_ACTION_TIMEOUT = 5;
+    protected static final SalRegistrationManager registrationManager = new SalRegistrationManager();
+    protected static final Logger LOG = LoggerFactory.getLogger(ConcurrentSalRegistrationManagerTest.class);
+    protected static final SwitchSessionKeyOF SWITCH_SESSION_KEY_OF = new SwitchSessionKeyOF();
 
-    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentSalRegistrationManagerTest.class);
-    private static final long THREAD_SLEEP_MILLIS = 1000;
+    private static final long THREAD_SLEEP_MILLIS = 100;
     private static final String DELAYED_THREAD = "DELAYED_THREAD";
     private static final String NO_DELAY_THREAD = "NO_DELAY_THREAD";
 
-    @Mock
-    private SessionContext context;
+    private ThreadPoolCollectingExecutor taskExecutor;
 
     @Mock
-    private GetFeaturesOutput features;
-
+    protected SessionContext context;
     @Mock
     private ConnectionConductor connectionConductor;
+    @Mock
+    private ListeningExecutorService rpcPool; 
+    @Mock
+    private NotificationProviderService notificationProviderService; 
+    @Mock
+    private ProviderContext providerContext;
+    @Mock
+    private NotificationEnqueuer notificationEnqueuer;
+    @Mock
+    private ConnectionAdapter connectionAdapter;
+
+    private GetFeaturesOutput features;
 
+    /**
+     * prepare surrounding objects
+     */
     @Before
-    public void setupRegistrationManager() {
-        registrationManager.onSessionInitiated(MOCK_PROVIDER_CONTEXT);
+    public void setUp() {
         SWITCH_SESSION_KEY_OF.setDatapathId(BigInteger.ONE);
-
-        when(context.getFeatures()).thenReturn(features);
-        when(features.getDatapathId()).thenReturn(BigInteger.valueOf(42));
-        when(context.getPrimaryConductor()).thenReturn(connectionConductor);
-        when(connectionConductor.getVersion()).thenReturn(OFConstants.OFP_VERSION_1_3);
+        Mockito.when(providerContext.getSALService(NotificationProviderService.class)).thenReturn(notificationProviderService);
+        Mockito.when(context.getNotificationEnqueuer()).thenReturn(notificationEnqueuer);
+        
+        // features mockery
+        features = new GetFeaturesOutputBuilder()
+        .setVersion(OFConstants.OFP_VERSION_1_3)
+        .setDatapathId(BigInteger.valueOf(42))
+        .setCapabilities(new Capabilities(true, true, true, true, true, true, true))
+        .build();
+        Mockito.when(context.getFeatures()).thenReturn(features);
+        
+        Mockito.when(context.getPrimaryConductor()).thenReturn(connectionConductor);
+        Mockito.when(context.getSessionKey()).thenReturn(SWITCH_SESSION_KEY_OF);
+        Mockito.when(connectionConductor.getVersion()).thenReturn(OFConstants.OFP_VERSION_1_3);
+        
+        // provider context - registration responder
+        Mockito.when(providerContext.addRoutedRpcImplementation(Matchers.any(Class.class), Matchers.any(RpcService.class)))
+        .then(new Answer<RoutedRpcRegistration<?>>() {
+            @Override
+            public RoutedRpcRegistration<?> answer(InvocationOnMock invocation) {
+                if (Thread.currentThread().getName().equals(DELAYED_THREAD)) {
+                    try {
+                        LOG.info(String.format("Will wait for %d millis", THREAD_SLEEP_MILLIS/10));
+                        Thread.sleep(THREAD_SLEEP_MILLIS);
+                    } catch (InterruptedException e) {
+                        LOG.error("delaying of worker thread [{}] failed.", Thread.currentThread().getName(), e);
+                    }
+                }
+                
+                Object[] args = invocation.getArguments();
+                RoutedRpcRegistration<RpcService> registration = Mockito.mock(RoutedRpcRegistration.class);
+                Mockito.when(registration.getInstance()).thenReturn((RpcService) args[1]);
+                
+                return registration;
+            }
+        });
+        
+        Mockito.when(connectionConductor.getConnectionAdapter()).thenReturn(connectionAdapter);
+        Mockito.when(connectionAdapter.getRemoteAddress()).thenReturn(new InetSocketAddress("10.1.2.3", 4242));
+        
+        taskExecutor = new ThreadPoolCollectingExecutor(
+                2, 2, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(2), "junit");
+        
+        registrationManager.onSessionInitiated(providerContext);
+        OFSessionUtil.getSessionManager().setRpcPool(rpcPool);
+    }
+    
+    /**
+     * clean up
+     * @throws InterruptedException 
+     */
+    @After
+    public void tearDown() throws InterruptedException {
+        taskExecutor.shutdown();
+        taskExecutor.awaitTermination(1, TimeUnit.SECONDS);
+        if (!taskExecutor.isTerminated()) {
+           taskExecutor.shutdownNow(); 
+        }
+        LOG.info("All tasks have finished.");
+        
+        LOG.info("amount of scheduled threads: {}, exited threads: {}, failed threads: {}", 
+                taskExecutor.getTaskCount(), taskExecutor.getThreadExitCounter(), taskExecutor.getFailLogbook().size());
+        for (String exitStatus : taskExecutor.getFailLogbook()) {
+            LOG.debug(exitStatus);
+        }
+        
+        OFSessionUtil.releaseSessionManager();
+        Assert.assertTrue("there should not be any failed threads in the pool", taskExecutor.getFailLogbook().isEmpty());
+        Assert.assertTrue("there should not be any living thread in the pool", taskExecutor.getActiveCount() == 0);
     }
 
-    @Test
     /**
      * Test method which verifies that session could not be invalidated while in creation.
+     * @throws InterruptedException 
+     * @throws TimeoutException 
+     * @throws ExecutionException 
      */
-    public void testConcurrentRemoveSessionContext() throws InterruptedException, ExecutionException {
-
-
-        Thread delayedThread = new Thread(new Runnable() {
+    @Test
+    public void testConcurrentRemoveSessionContext() throws InterruptedException, ExecutionException, TimeoutException {
+        // run registrations
+        Callable<Void> delayedThread = new Callable<Void>() {
             @Override
-            public void run() {
+            public Void call() {
                 LOG.info("Delayed session adding thread started.");
                 Thread.currentThread().setName(DELAYED_THREAD);
-                registrationManager.setPublishService(MOCK_NOTIFICATION_PROVIDER_SERVICE);
-                registrationManager.onSessionAdded(SWITCH_SESSION_KEY_OF, context);
+                OFSessionUtil.getSessionManager().addSessionContext(SWITCH_SESSION_KEY_OF, context);
                 LOG.info("Delayed session adding thread finished.");
+                return null;
             }
-        }
-        );
-        taskExecutor.execute(delayedThread);
+        };
 
-        Thread noDelayThread = new Thread(new Runnable() {
+        Callable<Void> noDelayThread = new Callable<Void>() {
             @Override
-            public void run() {
+            public Void call() {
                 LOG.info("Session removing thread started.");
                 Thread.currentThread().setName(NO_DELAY_THREAD);
-                registrationManager.setPublishService(MOCK_NOTIFICATION_PROVIDER_SERVICE);
-                registrationManager.onSessionRemoved(context);
+                OFSessionUtil.getSessionManager().invalidateSessionContext(SWITCH_SESSION_KEY_OF);
                 LOG.info("Session removing thread finished.");
+                return null;
             }
-        }
-        );
-        taskExecutor.execute(noDelayThread);
-        taskExecutor.shutdown();
-        while (!taskExecutor.isTerminated()) {
-        }
-        LOG.info("All tasks have finished.");
+        };
+        
+        Future<Void> addSessionResult = taskExecutor.submit(delayedThread);
+        Future<Void> removeSessionResult = taskExecutor.submit(noDelayThread);
+        
+        addSessionResult.get(REGISTRATION_ACTION_TIMEOUT, TimeUnit.SECONDS);
+        removeSessionResult.get(REGISTRATION_ACTION_TIMEOUT, TimeUnit.SECONDS);
     }
 
-    private final class MockNotificationProviderService implements NotificationProviderService {
-
-        @Override
-        public void publish(Notification notification) {
-            if (Thread.currentThread().getName().equals(DELAYED_THREAD)) {
-                try {
-                    LOG.info(String.format("Will wait for %d millis", THREAD_SLEEP_MILLIS));
-                    sleep(THREAD_SLEEP_MILLIS);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
+    private static class ThreadPoolCollectingExecutor extends ThreadPoolLoggingExecutor {
+        
+        private List<String> failLogbook;
+        private int threadExitCounter = 0;
+
+        /**
+         * @param corePoolSize
+         * @param maximumPoolSize
+         * @param keepAliveTime
+         * @param unit
+         * @param workQueue
+         * @param poolName
+         */
+        public ThreadPoolCollectingExecutor(int corePoolSize,
+                int maximumPoolSize, long keepAliveTime, TimeUnit unit,
+                BlockingQueue<Runnable> workQueue, String poolName) {
+            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, poolName);
+            
+            failLogbook = Collections.synchronizedList(new ArrayList<String>());
+        }
+        
+        @Override
+        protected void afterExecute(Runnable r, Throwable t) {
+            super.afterExecute(r, t);
+            threadExitCounter ++;
+            
+            if (t != null) {
+                failLogbook.add("job ["+r+"] exited with throwable:" + t.getMessage());
             }
         }
-
-        @Override
-        public void publish(Notification notification, ExecutorService executorService) {
-
-        }
-
-        @Override
-        public ListenerRegistration<NotificationInterestListener> registerInterestListener(NotificationInterestListener notificationInterestListener) {
-            return null;
-        }
-
-        @Override
-        public <T extends Notification> ListenerRegistration<NotificationListener<T>> registerNotificationListener(Class<T> tClass, NotificationListener<T> tNotificationListener) {
-            return null;
-        }
-
-        @Override
-        public ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener notificationListener) {
-            return null;
-        }
-    }
-
-    private static final class MockProviderContext implements BindingAwareBroker.ProviderContext {
-
-
-        @Override
-        public void registerFunctionality(BindingAwareProvider.ProviderFunctionality functionality) {
-
-        }
-
-        @Override
-        public void unregisterFunctionality(BindingAwareProvider.ProviderFunctionality functionality) {
-
-        }
-
-        @Override
-        public <T extends BindingAwareService> T getSALService(Class<T> service) {
-            return null;
-        }
-
-        @Override
-        public <T extends RpcService> BindingAwareBroker.RpcRegistration<T> addRpcImplementation(Class<T> serviceInterface, T implementation) throws IllegalStateException {
-            return null;
-        }
-
-        @Override
-        public <T extends RpcService> BindingAwareBroker.RoutedRpcRegistration<T> addRoutedRpcImplementation(Class<T> serviceInterface, T implementation) throws IllegalStateException {
-            return new MockRpcRegistration(implementation);
-        }
-
-        @Override
-        public <L extends RouteChangeListener<RpcContextIdentifier, InstanceIdentifier<?>>> ListenerRegistration<L> registerRouteChangeListener(L listener) {
-            return null;
-        }
-
-        @Override
-        public <T extends RpcService> T getRpcService(Class<T> serviceInterface) {
-            return null;
-        }
-    }
-
-    private static final class MockRpcRegistration implements BindingAwareBroker.RoutedRpcRegistration {
-
-        private Object implementation;
-
-        public MockRpcRegistration(Object instance) {
-            this.implementation = instance;
-
-        }
-
-        @Override
-        public void registerInstance(Class context, InstanceIdentifier instance) {
-
-        }
-
-        @Override
-        public void unregisterInstance(Class context, InstanceIdentifier instance) {
-
+        
+        /**
+         * @return the chronicles
+         */
+        public List<String> getFailLogbook() {
+            return failLogbook;
         }
-
-        @Override
-        public Object getInstance() {
-            return this.implementation;
-        }
-
-        @Override
-        public void registerPath(Object context, Path path) {
-
-        }
-
-        @Override
-        public void unregisterPath(Object context, Path path) {
-
-        }
-
-        @Override
-        public Class getServiceType() {
-            return null;
-        }
-
-        @Override
-        public void close() {
-
+        
+        /**
+         * @return the threadExitCounter
+         */
+        public int getThreadExitCounter() {
+            return threadExitCounter;
         }
     }
-
 }
index cbd180342952d532d4483ba4384f8130e53916b9..9bc8fd9bac60b9ffa653ba1f3f59a8f931734922 100644 (file)
@@ -12,12 +12,14 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
 
 import java.math.BigInteger;
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -35,6 +37,7 @@ import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.api.openflow.md.core.NotificationEnqueuer;
 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.api.openflow.md.core.session.IMessageDispatchService;
+import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
 import org.opendaylight.openflowplugin.api.openflow.md.core.session.SwitchSessionKeyOF;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.UpdateFlowOutput;
@@ -74,13 +77,19 @@ public class SalRegistrationManagerTest {
     private BindingAwareBroker.ProviderContext providerContext;
     @Mock
     private NotificationEnqueuer notificationEnqueuer;
+    @Mock
+    private ListeningExecutorService rpcPool;
+    @Mock
+    private NotificationProviderService notificationProviderService;
 
     private ModelDrivenSwitch mdSwitchOF13;
 
     CompositeObjectRegistration<ModelDrivenSwitch> registration;
 
+
     @Before
     public void setUp() {
+        OFSessionUtil.getSessionManager().setRpcPool(rpcPool);
 
         Mockito.when(context.getPrimaryConductor()).thenReturn(conductor);
         Mockito.when(context.getMessageDispatchService()).thenReturn(messageDispatchService);
@@ -106,7 +115,15 @@ public class SalRegistrationManagerTest {
 
         salRegistrationManager = new SalRegistrationManager();
         salRegistrationManager.onSessionInitiated(providerContext);
-        salRegistrationManager.setPublishService(new MockNotificationProviderService());
+        salRegistrationManager.setPublishService(notificationProviderService);
+    }
+    
+    /**
+     * free sesion manager
+     */
+    @After
+    public void tearDown() {
+        OFSessionUtil.releaseSessionManager();
     }
 
     /**
@@ -163,35 +180,5 @@ public class SalRegistrationManagerTest {
         SwitchSessionKeyOF switchSessionKeyOF = new SwitchSessionKeyOF();
         salRegistrationManager.onSessionAdded(switchSessionKeyOF, context);
     }
-
-
-    private class MockNotificationProviderService implements NotificationProviderService {
-
-        @Override
-        public void publish(Notification notification) {
-
-        }
-
-        @Override
-        public void publish(Notification notification, ExecutorService executorService) {
-
-        }
-
-        @Override
-        public ListenerRegistration<NotificationInterestListener> registerInterestListener(NotificationInterestListener notificationInterestListener) {
-            return null;
-        }
-
-        @Override
-        public <T extends Notification> ListenerRegistration<NotificationListener<T>> registerNotificationListener(Class<T> tClass, NotificationListener<T> tNotificationListener) {
-            return null;
-        }
-
-        @Override
-        public ListenerRegistration<org.opendaylight.yangtools.yang.binding.NotificationListener> registerNotificationListener(org.opendaylight.yangtools.yang.binding.NotificationListener notificationListener) {
-            return null;
-        }
-    }
-
 }