Bug 5540 - Remove ConvertorManager singleton
[openflowplugin.git] / openflowplugin / src / test / java / org / opendaylight / openflowplugin / openflow / md / core / ConnectionConductorImplTest.java
index fbbeb3c0848008f95bf9ca960995f5d180078a79..8eb350f358900cfd1cc51805db5fde840b0b2160 100644 (file)
@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 2013, 2014 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -8,6 +8,8 @@
 
 package org.opendaylight.openflowplugin.openflow.md.core;
 
+import static org.junit.Assert.assertNotNull;
+
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -15,30 +17,42 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Stack;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
+import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.api.openflow.md.core.IMDMessageTranslator;
+import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
+import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.PopListener;
+import org.opendaylight.openflowplugin.api.openflow.statistics.MessageSpy;
 import org.opendaylight.openflowplugin.openflow.md.core.plan.ConnectionAdapterStackImpl;
 import org.opendaylight.openflowplugin.openflow.md.core.plan.EventFactory;
 import org.opendaylight.openflowplugin.openflow.md.core.plan.SwitchTestEvent;
-import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
-import org.opendaylight.openflowplugin.openflow.md.queue.PopListener;
-import org.opendaylight.openflowplugin.openflow.md.queue.QueueKeeperLightImpl;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
+import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessorLightImpl;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.ErrorType;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.HelloElementType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortFeatures;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortFeaturesV10;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortReason;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessageBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessageBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessageBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessageBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutputBuilder;
@@ -46,39 +60,37 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessageBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.ElementsBuilder;
+import org.opendaylight.yangtools.yang.binding.DataContainer;
 import org.opendaylight.yangtools.yang.binding.DataObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-
-/**
- * @author mirehak
- */
+@RunWith(MockitoJUnitRunner.class)
 public class ConnectionConductorImplTest {
 
     protected static final Logger LOG = LoggerFactory
             .getLogger(ConnectionConductorImplTest.class);
 
-    /** in [ms] */
+    /**
+     * in [ms]
+     */
     private final int maxProcessingTimeout = 500;
-    
+
     protected ConnectionAdapterStackImpl adapter;
     private ConnectionConductorImpl connectionConductor;
     private MDController controller;
     private Stack<SwitchTestEvent> eventPlan;
 
     private Thread libSimulation;
-    private ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(
+    private final ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(
             8);
 
-    private QueueKeeperLightImpl queueKeeper;
+    protected QueueProcessorLightImpl queueProcessor;
 
-    private PopListener<DataObject> popListener;
+    private PopListenerCountingImpl<DataObject> popListener;
 
     private int experimenterMessageCounter;
     private int packetinMessageCounter;
@@ -88,8 +100,16 @@ public class ConnectionConductorImplTest {
     private int portstatusModifyMessageCounter;
     private int errorMessageCounter;
 
-    private ErrorHandlerQueueImpl errorHandler;
-    
+    @Mock
+    private ErrorHandlerSimpleImpl errorHandler;
+
+    private final int expectedErrors = 0;
+    @Mock
+    private MessageSpy<DataContainer> messageSpy;
+    @Mock
+    HandshakeContext handshakeContext;
+    private ErrorMessageService objErms;
+
     public void incrExperimenterMessageCounter() {
         this.experimenterMessageCounter++;
     }
@@ -118,6 +138,14 @@ public class ConnectionConductorImplTest {
         this.errorMessageCounter++;
     }
 
+    @Test
+    /**
+     * Test for ConnectionConductorFactory#createConductor
+     */
+    public void testCreateConductor() {
+        ConnectionConductor connectionConductor = ConnectionConductorFactory.createConductor(adapter, queueProcessor);
+        assertNotNull(connectionConductor);
+    }
 
     /**
      * @throws java.lang.Exception
@@ -125,28 +153,41 @@ public class ConnectionConductorImplTest {
     @Before
     public void setUp() throws Exception {
         adapter = new ConnectionAdapterStackImpl();
-        
+
         popListener = new PopListenerCountingImpl<>();
-        
-        queueKeeper = new QueueKeeperLightImpl();
-        queueKeeper.init();
-        queueKeeper.addPopListener(popListener);
-        
+
+        final ConvertorManager convertorManager = ConvertorManagerFactory.createDefaultManager();
+        controller = new MDController(convertorManager);
+        controller.init();
+        controller.getMessageTranslators().putAll(assembleTranslatorMapping());
+
+        queueProcessor = new QueueProcessorLightImpl();
+        queueProcessor.setMessageSpy(messageSpy);
+        queueProcessor.setPopListenersMapping(assemblePopListenerMapping());
+        queueProcessor.setTranslatorMapping(controller.getMessageTranslators());
+        queueProcessor.init();
+
         connectionConductor = new ConnectionConductorImpl(adapter);
-        connectionConductor.setQueueKeeper(queueKeeper);
-        connectionConductor.init();
-        errorHandler = new ErrorHandlerQueueImpl();
-        pool.execute(errorHandler);
+        connectionConductor.setQueueProcessor(queueProcessor);
         connectionConductor.setErrorHandler(errorHandler);
-        controller = new MDController();
-        controller.init();
-        queueKeeper.setTranslatorMapping(controller.getMessageTranslators());
+        connectionConductor.init();
+        connectionConductor.setHandshakeContext(handshakeContext);
         eventPlan = new Stack<>();
         adapter.setEventPlan(eventPlan);
         adapter.setProceedTimeout(5000L);
         adapter.checkListeners();
-        
-        controller.getMessageTranslators().putAll(assembleTranslatorMapping());
+    }
+
+    /**
+     * @return
+     */
+    private Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> assemblePopListenerMapping() {
+        Map<Class<? extends DataObject>, Collection<PopListener<DataObject>>> mapping = new HashMap<>();
+        Collection<PopListener<DataObject>> popListenerBag = new ArrayList<>();
+        popListenerBag.add(popListener);
+        //TODO: add testing registered types
+        mapping.put(DataObject.class, popListenerBag);
+        return mapping;
     }
 
     /**
@@ -157,9 +198,9 @@ public class ConnectionConductorImplTest {
         if (libSimulation != null) {
             libSimulation.join();
         }
-        queueKeeper.shutdown();
-        connectionConductor.shutdownPool();
-        
+        queueProcessor.shutdown();
+        connectionConductor.getHsPool().shutdown();
+
         for (Exception problem : adapter.getOccuredExceptions()) {
             LOG.error("during simulation on adapter side: "
                     + problem.getMessage());
@@ -170,24 +211,37 @@ public class ConnectionConductorImplTest {
             if (eventPlan.size() > 0) {
                 LOG.debug("eventPlan size: " + eventPlan.size());
                 for (SwitchTestEvent event : eventPlan) {
-                    LOG.debug(" # EVENT:: " + event.toString());
+                    LOG.debug(" # EVENT:: " + event);
                 }
             }
         }
         Assert.assertTrue("plan is not finished", eventPlan.isEmpty());
         eventPlan = null;
         controller = null;
-        errorHandler = null;
+
+        // logging errors if occurred
+        ArgumentCaptor<Throwable> errorCaptor = ArgumentCaptor.forClass(Throwable.class);
+        Mockito.verify(errorHandler, Mockito.atMost(1)).handleException(
+                errorCaptor.capture(), Matchers.any(SessionContext.class));
+        for (Throwable problem : errorCaptor.getAllValues()) {
+            LOG.warn(problem.getMessage(), problem);
+        }
+
+        Mockito.verify(errorHandler, Mockito.times(expectedErrors)).handleException(
+                Matchers.any(Throwable.class), Matchers.any(SessionContext.class));
     }
 
     /**
      * Test method for
      * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onEchoRequestMessage(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage)}
      * .
+     *
      * @throws Exception
      */
     @Test
     public void testOnEchoRequestMessage() throws Exception {
+        simulateV13PostHandshakeState(connectionConductor);
+
         eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L,
                 EventFactory.DEFAULT_VERSION, new EchoRequestMessageBuilder()));
         eventPlan.add(0,
@@ -196,22 +250,48 @@ public class ConnectionConductorImplTest {
     }
 
     /**
-     * Test of handshake, covering version negotiation and features .
+     * Test of handshake, covering version negotiation and features.
+     * Switch delivers first helloMessage with default version.
+     *
      * @throws Exception
      */
     @Test
     public void testHandshake1() throws Exception {
-        eventPlan.add(0, EventFactory.createConnectionReadyCallback(connectionConductor));
         eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L,
                 EventFactory.DEFAULT_VERSION, new HelloMessageBuilder()));
         eventPlan.add(0, EventFactory.createDefaultWaitForAllEvent(
-                EventFactory.createDefaultWaitForRpcEvent(21, "helloReply"),
-                EventFactory.createDefaultWaitForRpcEvent(43, "getFeatures")));
+                EventFactory.createDefaultWaitForRpcEvent(43, "helloReply"),
+                EventFactory.createDefaultWaitForRpcEvent(44, "getFeatures")));
+        eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(44,
+                EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
+
+        int i = 1;
+        executeNow();
+
+        Assert.assertEquals(ConnectionConductor.CONDUCTOR_STATE.WORKING,
+                connectionConductor.getConductorState());
+        Assert.assertEquals((short) 0x04, connectionConductor.getVersion()
+                .shortValue());
+    }
+
+    /**
+     * Test of handshake, covering version negotiation and features.
+     * Controller sends first helloMessage with default version
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testHandshake1SwitchStarts() throws Exception {
+        eventPlan.add(0, EventFactory.createConnectionReadyCallback(connectionConductor));
+        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(21, "helloReply"));
+        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L,
+                EventFactory.DEFAULT_VERSION, new HelloMessageBuilder()));
+        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(43, "getFeatures"));
         eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(43,
                 EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
 
         executeNow();
-        
+
         Assert.assertEquals(ConnectionConductor.CONDUCTOR_STATE.WORKING,
                 connectionConductor.getConductorState());
         Assert.assertEquals((short) 0x04, connectionConductor.getVersion()
@@ -219,17 +299,19 @@ public class ConnectionConductorImplTest {
     }
 
     /**
-     * Test of handshake, covering version negotiation and features .
+     * Test of handshake, covering version negotiation and features.
+     * Switch delivers first helloMessage with version 0x05
+     * and negotiates following versions: 0x03, 0x01
+     *
      * @throws Exception
      */
     @Test
     public void testHandshake2() throws Exception {
         connectionConductor.setBitmapNegotiationEnable(false);
-        eventPlan.add(0, EventFactory.createConnectionReadyCallback(connectionConductor));
         eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L,
                 (short) 0x05, new HelloMessageBuilder()));
         eventPlan.add(0,
-                EventFactory.createDefaultWaitForRpcEvent(21, "helloReply"));
+                EventFactory.createDefaultWaitForRpcEvent(43, "helloReply"));
         eventPlan.add(0, EventFactory.createDefaultNotificationEvent(43L,
                 (short) 0x03, new HelloMessageBuilder()));
         eventPlan.add(0,
@@ -240,10 +322,10 @@ public class ConnectionConductorImplTest {
                 EventFactory.createDefaultWaitForRpcEvent(45, "getFeatures"));
 
         eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(45,
-                EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
+                (short) 0x01, getFeatureResponseMsg()));
 
         executeNow();
-        
+
         Assert.assertEquals(ConnectionConductor.CONDUCTOR_STATE.WORKING,
                 connectionConductor.getConductorState());
         Assert.assertEquals((short) 0x01, connectionConductor.getVersion()
@@ -251,121 +333,66 @@ public class ConnectionConductorImplTest {
     }
 
     /**
-     * Test of handshake, covering version negotiation and features .
+     * Test of handshake, covering version negotiation and features.
+     * Controller sends first helloMessage with default version
+     * and switch negotiates following versions: 0x05, 0x03, 0x01
+     *
      * @throws Exception
      */
     @Test
-    public void testHandshake3() throws Exception {
-        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L,
-                (short) 0x00, new HelloMessageBuilder()));
-
-        executeNow();
-        Assert.assertEquals(ConnectionConductor.CONDUCTOR_STATE.HANDSHAKING,
-                connectionConductor.getConductorState());
-        Assert.assertNull(connectionConductor.getVersion());
-    }
-
-    /**
-     * Test method for
-     * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onExperimenterMessage(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage)}
-     * .
-     * @throws InterruptedException
-     */
-    @Test
-    public void testOnExperimenterMessage1() throws InterruptedException {
+    public void testHandshake2SwitchStarts() throws Exception {
+        connectionConductor.setBitmapNegotiationEnable(false);
+        eventPlan.add(0, EventFactory.createConnectionReadyCallback(connectionConductor));
         eventPlan.add(0,
-                EventFactory.createDefaultWaitForRpcEvent(42, "experimenter"));
-        ExperimenterMessageBuilder builder1 = new ExperimenterMessageBuilder();
-        builder1.setExperimenter(84L).setExpType(4L);
+                EventFactory.createDefaultWaitForRpcEvent(21, "helloReply"));
         eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L,
-                EventFactory.DEFAULT_VERSION, builder1));
-        
-        executeLater();
-
-        Runnable sendExperimenterCmd = new Runnable() {
-
-            @Override
-            public void run() {
-                ExperimenterInputBuilder builder2 = new ExperimenterInputBuilder();
-                builder2.setExperimenter(84L).setExpType(4L);
-                EventFactory.setupHeader(42L, builder2);
-                adapter.experimenter(builder2.build());
-            }
-        };
-        pool.schedule(sendExperimenterCmd,
-                ConnectionAdapterStackImpl.JOB_DELAY, TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Test method for
-     * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onExperimenterMessage(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage)}
-     * .
-     * @throws InterruptedException
-     */
-    @Test
-    public void testOnExperimenterMessage2() throws InterruptedException {
+                (short) 0x05, new HelloMessageBuilder()));
+        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(43L,
+                (short) 0x03, new HelloMessageBuilder()));
         eventPlan.add(0,
-                EventFactory.createDefaultWaitForRpcEvent(42, "experimenter"));
-        ErrorMessageBuilder builder1 = new ErrorMessageBuilder();
-        builder1.setType(ErrorType.BADREQUEST).setCode(3)
-                .setData(new byte[] { 1, 2, 3 });
-
-        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L,
-                EventFactory.DEFAULT_VERSION, builder1));
+                EventFactory.createDefaultWaitForRpcEvent(44, "helloReply"));
+        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(44L,
+                (short) 0x01, new HelloMessageBuilder()));
+        eventPlan.add(0,
+                EventFactory.createDefaultWaitForRpcEvent(45, "getFeatures"));
 
-        executeLater();
+        eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(45,
+                (short) 0x01, getFeatureResponseMsg()));
 
-        Runnable sendExperimenterCmd = new Runnable() {
+        executeNow();
 
-            @Override
-            public void run() {
-                ExperimenterInputBuilder builder2 = new ExperimenterInputBuilder();
-                builder2.setExperimenter(84L).setExpType(4L);
-                EventFactory.setupHeader(42L, builder2);
-                adapter.experimenter(builder2.build());
-            }
-        };
-        pool.schedule(sendExperimenterCmd,
-                ConnectionAdapterStackImpl.JOB_DELAY, TimeUnit.MILLISECONDS);
+        Assert.assertEquals(ConnectionConductor.CONDUCTOR_STATE.WORKING,
+                connectionConductor.getConductorState());
+        Assert.assertEquals((short) 0x01, connectionConductor.getVersion()
+                .shortValue());
     }
 
     /**
      * Test method for
      * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onFlowRemovedMessage(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage)}
      * .
-     * @throws InterruptedException 
+     *
+     * @throws InterruptedException
      */
     @Test
     public void testOnFlowRemovedMessage() throws InterruptedException {
-        IMDMessageTranslator<OfHeader, DataObject> objFms = new FlowRemovedMessageService() ;
+        FlowRemovedMessageService objFms = new FlowRemovedMessageService();
         controller.addMessageTranslator(FlowRemovedMessage.class, 4, objFms);
-        
-        // Complete HandShake
-        eventPlan.add(0, EventFactory.createConnectionReadyCallback(connectionConductor));
-        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L,
-                EventFactory.DEFAULT_VERSION, new HelloMessageBuilder()));
-        eventPlan.add(0, EventFactory.createDefaultWaitForAllEvent(
-                EventFactory.createDefaultWaitForRpcEvent(21, "helloReply"),
-                EventFactory.createDefaultWaitForRpcEvent(43, "getFeatures")));
-        eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(43,
-          EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
-        execute(true);
-        
+
+        simulateV13PostHandshakeState(connectionConductor);
+        objFms.resetLatch(2);
+
         // Now send Flow Removed messages
         FlowRemovedMessageBuilder builder1 = new FlowRemovedMessageBuilder();
         builder1.setVersion((short) 4);
         builder1.setXid(1L);
         connectionConductor.onFlowRemovedMessage(builder1.build());
-        synchronized (popListener) {
             LOG.debug("about to wait for popListener");
-            popListener.wait(maxProcessingTimeout);
-        }
-        Assert.assertEquals(1, flowremovedMessageCounter);
         builder1.setXid(2L);
         connectionConductor.onFlowRemovedMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
+
+        flushMessageProcessing();
+        Assert.assertTrue(objFms.await(maxProcessingTimeout, TimeUnit.MILLISECONDS));
         Assert.assertEquals(2, flowremovedMessageCounter);
     }
 
@@ -380,53 +407,30 @@ public class ConnectionConductorImplTest {
         // TODO:: add test
     }
 
-    /**
-     * Test method for
-     * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onMultipartRequestMessage(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage)}
-     * .
-     */
-    @Test
-    public void testOnMultipartRequestMessage() {
-        // fail("Not yet implemented");
-        // TODO:: add test
-    }
-
     /**
      * Test method for
      * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onPacketInMessage(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage)}
      * .
-     * @throws InterruptedException 
+     *
+     * @throws InterruptedException
      */
     @Test
     public void testOnPacketInMessage() throws InterruptedException {
-        IMDMessageTranslator<OfHeader, DataObject> objPms = new PacketInMessageService() ;
+        PacketInMessageService objPms = new PacketInMessageService();
         controller.addMessageTranslator(PacketInMessage.class, 4, objPms);
-        
-        // Complete HandShake
-        eventPlan.add(0, EventFactory.createConnectionReadyCallback(connectionConductor));
-        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L,
-        EventFactory.DEFAULT_VERSION, new HelloMessageBuilder()));
-        eventPlan.add(0, EventFactory.createDefaultWaitForAllEvent(
-                EventFactory.createDefaultWaitForRpcEvent(21, "helloReply"),
-                EventFactory.createDefaultWaitForRpcEvent(43, "getFeatures")));
-        eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(43,
-          EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
-        execute(true);
-        
+
+        simulateV13PostHandshakeState(connectionConductor);
+
         // Now send PacketIn
         PacketInMessageBuilder builder1 = new PacketInMessageBuilder();
         builder1.setVersion((short) 4);
-        builder1.setBufferId((long)1);
+        builder1.setBufferId((long) 1);
         connectionConductor.onPacketInMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
-        Assert.assertEquals(1, packetinMessageCounter);
-        builder1.setBufferId((long)2);
+        builder1.setBufferId((long) 2);
         connectionConductor.onPacketInMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
+
+        flushMessageProcessing();
+        Assert.assertTrue(objPms.await(maxProcessingTimeout, TimeUnit.MILLISECONDS));
         Assert.assertEquals(2, packetinMessageCounter);
     }
 
@@ -434,49 +438,43 @@ public class ConnectionConductorImplTest {
      * Test method for
      * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onPortStatusMessage(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage)}
      * .
-     * @throws InterruptedException 
+     *
+     * @throws InterruptedException
      */
     @Test
     public void testOnPortStatusMessage() throws InterruptedException {
-        
-        IMDMessageTranslator<OfHeader, DataObject> objPSms = new PortStatusMessageService() ;
+        PortStatusMessageService objPSms = new PortStatusMessageService();
         controller.addMessageTranslator(PortStatusMessage.class, 4, objPSms);
-        
-        // Complete HandShake
-        eventPlan.add(0, EventFactory.createConnectionReadyCallback(connectionConductor));
-        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L,
-        EventFactory.DEFAULT_VERSION, new HelloMessageBuilder()));
-        eventPlan.add(0, EventFactory.createDefaultWaitForAllEvent(
-                EventFactory.createDefaultWaitForRpcEvent(21, "helloReply"),
-                EventFactory.createDefaultWaitForRpcEvent(43, "getFeatures")));
-        eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(43,
-          EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
-        execute(true);
-        
+
+        simulateV13PostHandshakeState(connectionConductor);
+
         // Send Port Status messages
         PortStatusMessageBuilder builder1 = new PortStatusMessageBuilder();
         builder1.setVersion((short) 4);
-        PortFeatures features = new PortFeatures(true,false,false,false,false,false,false,false,false,false,false,false,false,false,false,false);
+        PortFeatures features = new PortFeatures(true, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false);
         builder1.setPortNo(90L).setReason(PortReason.OFPPRADD).setCurrentFeatures(features);
+        objPSms.resetLatch(3);
+        LOG.debug("sending port message");
         connectionConductor.onPortStatusMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
-        Assert.assertEquals(1, portstatusAddMessageCounter);
         builder1.setPortNo(90L).setReason(PortReason.OFPPRMODIFY).setCurrentFeatures(features);
         connectionConductor.onPortStatusMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
-        Assert.assertEquals(1, portstatusModifyMessageCounter);
         builder1.setPortNo(90L).setReason(PortReason.OFPPRDELETE).setCurrentFeatures(features);
         connectionConductor.onPortStatusMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
+
+        flushMessageProcessing();
+        Assert.assertTrue(objPSms.await(maxProcessingTimeout, TimeUnit.MILLISECONDS));
+        Assert.assertEquals(1, portstatusModifyMessageCounter);
+        Assert.assertEquals(1, portstatusAddMessageCounter);
         Assert.assertEquals(1, portstatusDeleteMessageCounter);
     }
 
+    private void flushMessageProcessing() throws InterruptedException {
+        // make sure that harvester sleeps deeply
+        Thread.sleep(maxProcessingTimeout);
+        // flushing messages
+        queueProcessor.getHarvesterHandle().ping();
+    }
+
     /**
      * @throws InterruptedException
      */
@@ -489,13 +487,13 @@ public class ConnectionConductorImplTest {
      */
     private void executeNow() throws InterruptedException {
         execute(true);
-        connectionConductor.shutdownPool();
+        connectionConductor.getHsPool().shutdown();
     }
 
     /**
      * @throws InterruptedException
      */
-    private void execute(boolean join) throws InterruptedException {
+    private void execute(final boolean join) throws InterruptedException {
         libSimulation = new Thread(adapter, "junit-adapter");
         libSimulation.start();
         if (join) {
@@ -503,233 +501,7 @@ public class ConnectionConductorImplTest {
         }
     }
 
-    //////// Start - Version Negotiation Test //////////////
-
-    /**
-     * Test of version negotiation Where switch version = 1.0
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testVersionNegotiation10() throws Exception {
-        LOG.debug("testVersionNegotiation10");
-        Short version = (short) 0x01;
-        eventPlan.add(0, EventFactory.createConnectionReadyCallback(connectionConductor));
-        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42, version, new HelloMessageBuilder()));
-        eventPlan.add(0, EventFactory.createDefaultWaitForAllEvent(
-                EventFactory.createDefaultWaitForRpcEvent(21, "helloReply"),
-                EventFactory.createDefaultWaitForRpcEvent(43, "helloReply")));
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(44, "getFeatures"));
-        eventPlan.add(0,
-                EventFactory.createDefaultRpcResponseEvent(44, EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
-
-        executeNow();
-        Assert.assertEquals(version, connectionConductor.getVersion());
-    }
-
-    /**
-     * Test of version negotiation Where switch version < 1.0
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testVersionNegotiation00() throws Exception {
-        Short version = (short) 0x00;
-        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L, version, new HelloMessageBuilder()));
-        executeNow();
-        Assert.assertNull(connectionConductor.getVersion());
-    }
-
-    /**
-     * Test of version negotiation Where 1.0 < switch version < 1.3
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testVersionNegotiation11() throws Exception {
-        LOG.debug("testVersionNegotiation11");
-        connectionConductor.setBitmapNegotiationEnable(false);
-        Short version = (short) 0x02;
-        Short expVersion = (short) 0x01;
-        Assert.assertNull(connectionConductor.getVersion());
-        
-        eventPlan.add(0, EventFactory.createConnectionReadyCallback(connectionConductor));
-        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L, version, new HelloMessageBuilder()));
-        eventPlan.add(0, EventFactory.createDefaultWaitForAllEvent(
-                EventFactory.createDefaultWaitForRpcEvent(21, "helloReply"),
-                EventFactory.createDefaultWaitForRpcEvent(43, "helloReply")));
-        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(44, expVersion, new HelloMessageBuilder()));
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(45, "getFeatures"));
-        eventPlan.add(0,
-                EventFactory.createDefaultRpcResponseEvent(45, EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
-        executeNow();
-        Assert.assertEquals(expVersion, connectionConductor.getVersion());
-
-    }
-
-    /**
-     * Test of version negotiation Where switch version = 1.3
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testVersionNegotiation13() throws Exception {
-        LOG.debug("testVersionNegotiation13");
-        Short version = (short) 0x04;
-        eventPlan.add(0, EventFactory.createConnectionReadyCallback(connectionConductor));
-        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L, version, new HelloMessageBuilder()));
-        eventPlan.add(0, EventFactory.createDefaultWaitForAllEvent(
-                EventFactory.createDefaultWaitForRpcEvent(21, "helloReply"),
-                EventFactory.createDefaultWaitForRpcEvent(43, "getFeatures")));
-        eventPlan.add(0,
-                EventFactory.createDefaultRpcResponseEvent(43, EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
-
-        executeNow();
-        Assert.assertEquals(version, connectionConductor.getVersion());
-    }
-
-    /**
-     * Test of version negotiation Where switch version >= 1.3
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testVersionNegotiation15() throws Exception {
-        LOG.debug("testVersionNegotiation15");
-        connectionConductor.setBitmapNegotiationEnable(false);
-        Short version = (short) 0x06;
-        Short expVersion = (short) 0x04;
-        Assert.assertNull(connectionConductor.getVersion());
-        
-        eventPlan.add(0, EventFactory.createConnectionReadyCallback(connectionConductor));
-        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L, version, new HelloMessageBuilder()));
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(21, "helloReply"));
-        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(44, expVersion, new HelloMessageBuilder()));
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(45, "getFeatures"));
-        eventPlan.add(0,
-                EventFactory.createDefaultRpcResponseEvent(45, EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
-        executeNow();
-        Assert.assertEquals(expVersion, connectionConductor.getVersion());
-    }
-
-    /**
-     * Test of version negotiation Where switch version > 1.3
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testVersionNegotiation15_MultipleCall() throws Exception {
-        LOG.debug("testVersionNegotiation15_MultipleCall");
-        connectionConductor.setBitmapNegotiationEnable(false);
-        Short version = (short) 0x06;
-        Assert.assertNull(connectionConductor.getVersion());
-
-        eventPlan.add(0, EventFactory.createConnectionReadyCallback(connectionConductor));
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(21, "helloReply"));
-        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42, version, new HelloMessageBuilder()));
-        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(44, version, new HelloMessageBuilder()));
-        executeNow();
-        // TODO : check for connection termination
-        Assert.assertNull(connectionConductor.getVersion());
-    }
-
-    /**
-     * Test of version negotiation Where bitmap version {0x05,0x01}
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testVersionNegotiation10InBitmap() throws Exception {
-        LOG.debug("testVersionNegotiation10InBitmap");
-        Short version = (short) 0x01;
-        
-        eventPlan.add(0, EventFactory.createConnectionReadyCallback(connectionConductor));
-        eventPlan.add(
-                0,
-                EventFactory.createDefaultNotificationEvent(42L, (short) 0x05,
-                        getHelloBitmapMessage(Lists.newArrayList((short) 0x05, (short) 0x01))));
-        eventPlan.add(0, EventFactory.createDefaultWaitForAllEvent(
-                EventFactory.createDefaultWaitForRpcEvent(21, "helloReply"),
-                EventFactory.createDefaultWaitForRpcEvent(43, "getFeatures")));
-        eventPlan.add(0,
-                EventFactory.createDefaultRpcResponseEvent(43, EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
-
-        executeNow();
-        Assert.assertEquals(version, connectionConductor.getVersion());
-    }
-
-    /**
-     * Test of version negotiation Where bitmap version {0x05,0x04}
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testVersionNegotiation13InBitmap() throws Exception {
-        LOG.debug("testVersionNegotiation13InBitmap");
-        Short version = (short) 0x04;
-        
-        eventPlan.add(0, EventFactory.createConnectionReadyCallback(connectionConductor));
-        eventPlan.add(
-                0,
-                EventFactory.createDefaultNotificationEvent(42L, (short) 0x05,
-                        getHelloBitmapMessage(Lists.newArrayList((short) 0x05, (short) 0x04))));
-        eventPlan.add(0, EventFactory.createDefaultWaitForAllEvent(
-                EventFactory.createDefaultWaitForRpcEvent(21, "helloReply"),
-                EventFactory.createDefaultWaitForRpcEvent(43, "getFeatures")));
-        eventPlan.add(0,
-                EventFactory.createDefaultRpcResponseEvent(43, EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
-
-        executeNow();
-        Assert.assertEquals(version, connectionConductor.getVersion());
-    }
-
-    /**
-     * Test of version negotiation Where bitmap version {0x05,0x02}
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testVersionNegotiationNoCommonVersionInBitmap() throws Exception {
-        eventPlan.add(
-                0,
-                EventFactory.createDefaultNotificationEvent(42L, (short) 0x05,
-                        getHelloBitmapMessage(Lists.newArrayList((short) 0x05, (short) 0x02))));
-        executeNow();
-        Assert.assertNull(connectionConductor.getVersion());
-    }
-
-    private HelloMessageBuilder getHelloBitmapMessage(List<Short> versionOrder) {
-        short highestVersion = versionOrder.get(0);
-        int elementsCount = highestVersion / Integer.SIZE;
-        ElementsBuilder elementsBuilder = new ElementsBuilder();
-
-        List<Elements> elementList = new ArrayList<>();
-        int orderIndex = versionOrder.size();
-        int value = versionOrder.get(--orderIndex);
-        for (int index = 0; index <= elementsCount; index++) {
-            List<Boolean> booleanList = new ArrayList<>();
-            for (int i = 0; i < Integer.SIZE; i++) {
-                if (value == ((index * Integer.SIZE) + i)) {
-                    booleanList.add(true);
-                    value = (orderIndex == 0) ? highestVersion : versionOrder.get(--orderIndex);
-                } else {
-                    booleanList.add(false);
-                }
-            }
-            elementsBuilder.setType(HelloElementType.forValue(1));
-            elementsBuilder.setVersionBitmap(booleanList);
-            elementList.add(elementsBuilder.build());
-        }
-
-        HelloMessageBuilder builder = new HelloMessageBuilder();
-        builder.setXid(10L);
-        builder.setVersion(highestVersion);
-        builder.setElements(elementList);
-        return builder;
-
-    }
-
-    private GetFeaturesOutputBuilder getFeatureResponseMsg() {
+    private static GetFeaturesOutputBuilder getFeatureResponseMsg() {
         GetFeaturesOutputBuilder getFeaturesOutputBuilder = new GetFeaturesOutputBuilder();
         getFeaturesOutputBuilder.setDatapathId(new BigInteger("102030405060"));
         getFeaturesOutputBuilder.setAuxiliaryId((short) 0);
@@ -744,7 +516,7 @@ public class ConnectionConductorImplTest {
     /**
      * @return
      */
-    private static Capabilities createCapabilities(long input) {
+    private static Capabilities createCapabilities(final long input) {
         final Boolean FLOW_STATS = (input & (1 << 0)) != 0;
         final Boolean TABLE_STATS = (input & (1 << 1)) != 0;
         final Boolean PORT_STATS = (input & (1 << 2)) != 0;
@@ -756,128 +528,195 @@ public class ConnectionConductorImplTest {
                 PORT_BLOCKED, PORT_STATS, QUEUE_STATS, TABLE_STATS);
         return capabilities;
     }
-    
-    public class ExperimenterMessageService implements IMDMessageTranslator<OfHeader, DataObject> {
+
+    public abstract class ProcessingLatchService {
+        private CountDownLatch processingLatch = new CountDownLatch(0);
+
+        public void resetLatch(int passAmount) {
+            processingLatch = new CountDownLatch(passAmount);
+        }
+
+        protected void countDown() {
+            processingLatch.countDown();
+        }
+
+        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
+            return processingLatch.await(timeout, unit);
+        }
+    }
+
+    public class ExperimenterMessageService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
         @Override
-        public DataObject translate(SwitchConnectionDistinguisher cookie, SessionContext sw, OfHeader msg) {
+        public List<DataObject> translate(final SwitchConnectionDistinguisher cookie, final SessionContext sw, final OfHeader msg) {
             LOG.debug("Received a packet in Experimenter Service");
             ConnectionConductorImplTest.this.incrExperimenterMessageCounter();
             return null;
         }
     }
 
-    public class PacketInMessageService implements IMDMessageTranslator<OfHeader, DataObject> {
+    public class PacketInMessageService extends ProcessingLatchService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
         @Override
-        public DataObject translate(SwitchConnectionDistinguisher cookie, SessionContext sw, OfHeader msg) {
+        public List<DataObject> translate(final SwitchConnectionDistinguisher cookie, final SessionContext sw, final OfHeader msg) {
             LOG.debug("Received a packet in PacketIn Service");
             ConnectionConductorImplTest.this.incrPacketinMessageCounter();
+            countDown();
             return null;
         }
     }
 
-    public class FlowRemovedMessageService implements IMDMessageTranslator<OfHeader, DataObject> {
+    public class FlowRemovedMessageService extends ProcessingLatchService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
         @Override
-        public DataObject translate(SwitchConnectionDistinguisher cookie, SessionContext sw, OfHeader msg) {
+        public List<DataObject> translate(final SwitchConnectionDistinguisher cookie, final SessionContext sw, final OfHeader msg) {
             LOG.debug("Received a packet in FlowRemoved Service");
             ConnectionConductorImplTest.this.incrFlowremovedMessageCounter();
+            countDown();
             return null;
         }
     }
 
-    public class PortStatusMessageService implements IMDMessageTranslator<OfHeader, DataObject> {
+    public class PortStatusMessageService extends ProcessingLatchService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
         @Override
-        public DataObject translate(SwitchConnectionDistinguisher cookie, SessionContext sw, OfHeader msg) {
+        public List<DataObject> translate(final SwitchConnectionDistinguisher cookie, final SessionContext sw, final OfHeader msg) {
             LOG.debug("Received a packet in PortStatus Service");
-            if ( (((PortStatusMessage)msg).getReason().equals(PortReason.OFPPRADD))  ) {
+            if ((((PortStatusMessage) msg).getReason().equals(PortReason.OFPPRADD))) {
                 ConnectionConductorImplTest.this.incrPortstatusAddMessageCounter();
-            } else if (((PortStatusMessage)msg).getReason().equals(PortReason.OFPPRDELETE)){
+            } else if (((PortStatusMessage) msg).getReason().equals(PortReason.OFPPRDELETE)) {
                 ConnectionConductorImplTest.this.incrPortstatusDeleteMessageCounter();
-            } else if (((PortStatusMessage)msg).getReason().equals(PortReason.OFPPRMODIFY)) {
+            } else if (((PortStatusMessage) msg).getReason().equals(PortReason.OFPPRMODIFY)) {
                 ConnectionConductorImplTest.this.incrPortstatusModifyMessageCounter();
             }
+            countDown();
             return null;
         }
     }
-    
-    public class ErrorMessageService implements IMDMessageTranslator<OfHeader, DataObject> {
+
+    public class ErrorMessageService extends ProcessingLatchService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
         @Override
-        public DataObject translate(SwitchConnectionDistinguisher cookie, SessionContext sw, OfHeader msg) {
+        public List<DataObject> translate(final SwitchConnectionDistinguisher cookie, final SessionContext sw, final OfHeader msg) {
             LOG.debug("Received a packet in Experimenter Service");
             ConnectionConductorImplTest.this.incrErrorMessageCounter();
+            countDown();
             return null;
         }
     }
 
     /**
      * Test method for
-     * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onExperimenterMessage(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessage)}
-     * .
-     * @throws InterruptedException
-     */
-    @Test
-    public void testOnExperimenterMessage() throws InterruptedException {
-        ExperimenterMessageBuilder builder1 = new ExperimenterMessageBuilder();
-        builder1.setVersion((short) 4);
-        builder1.setExperimenter(84L).setExpType(4L);
-        connectionConductor.onExperimenterMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
-        Assert.assertEquals(1, experimenterMessageCounter);
-        
-        builder1.setExperimenter(85L).setExpType(4L);
-        connectionConductor.onExperimenterMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
-        Assert.assertEquals(2, experimenterMessageCounter);
-    }
-    
-    /**
-     * Test method for
-     * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onExperimenterMessage(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage)}
+     * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onExperimenterMessage(ExperimenterMessage)}
      * .
+     *
      * @throws InterruptedException
      */
     @Test
     public void testOnErrorMessage() throws InterruptedException {
+        simulateV13PostHandshakeState(connectionConductor);
+        final int messageAmount = 2;
+
+        objErms.resetLatch(messageAmount);
         ErrorMessageBuilder builder1 = new ErrorMessageBuilder();
         builder1.setVersion((short) 4);
         builder1.setCode(100);
         connectionConductor.onErrorMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
-        Assert.assertEquals(1, errorMessageCounter);
         builder1.setCode(200);
         connectionConductor.onErrorMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
-        Assert.assertEquals(2, errorMessageCounter);
+
+        flushMessageProcessing();
+        Assert.assertTrue(objErms.await(maxProcessingTimeout, TimeUnit.MILLISECONDS));
+        Assert.assertEquals(messageAmount, errorMessageCounter);
     }
 
     /**
-     * @return listener mapping for :
+     * @return listener mapping for:
      * <ul>
      * <li>experimenter</li>
      * <li>error</li>
      * </ul>
      */
-    private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, DataObject>>> assembleTranslatorMapping() {
-        Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, DataObject>>> translatorMapping = new HashMap<>();
+    private Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> assembleTranslatorMapping() {
+        Map<TranslatorKey, Collection<IMDMessageTranslator<OfHeader, List<DataObject>>>> translatorMapping = new HashMap<>();
         TranslatorKey tKey;
-        
-        IMDMessageTranslator<OfHeader, DataObject> objEms = new ExperimenterMessageService() ;
-        Collection<IMDMessageTranslator<OfHeader, DataObject>> existingValues = new ArrayList<>();
+
+        IMDMessageTranslator<OfHeader, List<DataObject>> objEms = new ExperimenterMessageService();
+        Collection<IMDMessageTranslator<OfHeader, List<DataObject>>> existingValues = new ArrayList<>();
         existingValues.add(objEms);
         tKey = new TranslatorKey(4, ExperimenterMessage.class.getName());
         translatorMapping.put(tKey, existingValues);
-        IMDMessageTranslator<OfHeader, DataObject> objErms = new ErrorMessageService() ;
+        objErms = new ErrorMessageService();
         existingValues.add(objErms);
         tKey = new TranslatorKey(4, ErrorMessage.class.getName());
         translatorMapping.put(tKey, existingValues);
         return translatorMapping;
     }
 
+    /**
+     * Test method for
+     * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#processPortStatusMsg(PortStatus)}
+     * <br><br>
+     * Tests for getting features from port status message by port version
+     * <ul>
+     * <li>features are malformed - one of them is null</li>
+     * <li>mismatch between port version and port features</li>
+     * <li>mismatch between port version and port features</li>
+     * <li>non-existing port version</li>
+     * <li>port version OF 1.0</li>
+     * <li>port version OF 1.3</li>
+     * </ul>
+     */
+    @Test
+    public void testProcessPortStatusMsg() {
+        simulateV13PostHandshakeState(connectionConductor);
+
+        long portNumber = 90L;
+        long portNumberV10 = 91L;
+        PortStatusMessage msg;
+
+        PortStatusMessageBuilder builder = new PortStatusMessageBuilder();
+        PortFeatures features = new PortFeatures(true, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false);
+        PortFeatures featuresMal = new PortFeatures(true, false, false, false, null, false, false, false, false, false, false, false, false, false, false, false);
+        PortFeaturesV10 featuresV10 = new PortFeaturesV10(true, false, false, false, false, false, false, false, false, false, false, false);
+
+        //Malformed features
+        builder.setVersion((short) 1).setPortNo(portNumber).setReason(PortReason.OFPPRADD).setCurrentFeatures(featuresMal);
+        connectionConductor.processPortStatusMsg(builder.build());
+        Assert.assertTrue(connectionConductor.getSessionContext().getPortsBandwidth().isEmpty());
+        Assert.assertTrue(connectionConductor.getSessionContext().getPhysicalPorts().isEmpty());
+
+        //Version-features mismatch
+        builder.setCurrentFeatures(features);
+        connectionConductor.processPortStatusMsg(builder.build());
+        Assert.assertTrue(connectionConductor.getSessionContext().getPortsBandwidth().isEmpty());
+        Assert.assertTrue(connectionConductor.getSessionContext().getPhysicalPorts().isEmpty());
+
+        //Non existing version
+        builder.setVersion((short) 0);
+        connectionConductor.processPortStatusMsg(builder.build());
+        Assert.assertTrue(connectionConductor.getSessionContext().getPortsBandwidth().isEmpty());
+        Assert.assertTrue(connectionConductor.getSessionContext().getPhysicalPorts().isEmpty());
+
+        //Version OF 1.3
+        builder.setVersion((short) 4);
+        msg = builder.build();
+        connectionConductor.processPortStatusMsg(builder.build());
+        Assert.assertTrue(connectionConductor.getSessionContext().getPortBandwidth(portNumber));
+        Assert.assertEquals(connectionConductor.getSessionContext().getPhysicalPort(portNumber), msg);
+
+        //Version OF 1.0
+        builder.setVersion((short) 1).setPortNo(portNumberV10).setCurrentFeatures(null).setCurrentFeaturesV10(featuresV10);
+        msg = builder.build();
+        connectionConductor.processPortStatusMsg(builder.build());
+        Assert.assertTrue(connectionConductor.getSessionContext().getPortBandwidth(portNumberV10));
+        Assert.assertEquals(connectionConductor.getSessionContext().getPhysicalPort(portNumberV10), msg);
+    }
+
+
+    @Test
+    public void testHandshakeFailOperations(){
+        connectionConductor.onHandshakeFailure();
+        connectionConductor.checkState(ConnectionConductor.CONDUCTOR_STATE.RIP);
+    }
+    private static void simulateV13PostHandshakeState(final ConnectionConductorImpl conductor) {
+        GetFeaturesOutputBuilder featureOutput = getFeatureResponseMsg();
+        conductor.postHandshakeBasic(featureOutput.build(), OFConstants.OFP_VERSION_1_3);
+        LOG.debug("simulating post handshake event done");
+    }
 }