Bug 5540 - Remove ConvertorManager singleton
[openflowplugin.git] / openflowplugin / src / test / java / org / opendaylight / openflowplugin / openflow / md / core / ConnectionConductorImplTest.java
index 5d586ca12ac27e50eba47d574ab537dc0b54f5af..8eb350f358900cfd1cc51805db5fde840b0b2160 100644 (file)
@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2013-2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 20132014 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -9,7 +9,6 @@
 package org.opendaylight.openflowplugin.openflow.md.core;
 
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 
 import java.math.BigInteger;
 import java.util.ArrayList;
@@ -18,7 +17,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Stack;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -30,17 +31,20 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
 import org.opendaylight.openflowplugin.api.OFConstants;
+import org.opendaylight.openflowplugin.api.openflow.connection.HandshakeContext;
 import org.opendaylight.openflowplugin.api.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.api.openflow.md.core.IMDMessageTranslator;
 import org.opendaylight.openflowplugin.api.openflow.md.core.SwitchConnectionDistinguisher;
 import org.opendaylight.openflowplugin.api.openflow.md.core.TranslatorKey;
+import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
+import org.opendaylight.openflowplugin.api.openflow.md.queue.PopListener;
+import org.opendaylight.openflowplugin.api.openflow.statistics.MessageSpy;
 import org.opendaylight.openflowplugin.openflow.md.core.plan.ConnectionAdapterStackImpl;
 import org.opendaylight.openflowplugin.openflow.md.core.plan.EventFactory;
 import org.opendaylight.openflowplugin.openflow.md.core.plan.SwitchTestEvent;
-import org.opendaylight.openflowplugin.api.openflow.md.core.session.SessionContext;
-import org.opendaylight.openflowplugin.api.openflow.md.queue.PopListener;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
 import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessorLightImpl;
-import org.opendaylight.openflowplugin.api.statistics.MessageSpy;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortFeatures;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortFeaturesV10;
@@ -56,6 +60,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatus;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessageBuilder;
 import org.opendaylight.yangtools.yang.binding.DataContainer;
@@ -80,12 +85,12 @@ public class ConnectionConductorImplTest {
     private Stack<SwitchTestEvent> eventPlan;
 
     private Thread libSimulation;
-    private ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(
+    private final ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(
             8);
 
     protected QueueProcessorLightImpl queueProcessor;
 
-    private PopListener<DataObject> popListener;
+    private PopListenerCountingImpl<DataObject> popListener;
 
     private int experimenterMessageCounter;
     private int packetinMessageCounter;
@@ -98,9 +103,12 @@ public class ConnectionConductorImplTest {
     @Mock
     private ErrorHandlerSimpleImpl errorHandler;
 
-    private int expectedErrors = 0;
+    private final int expectedErrors = 0;
     @Mock
     private MessageSpy<DataContainer> messageSpy;
+    @Mock
+    HandshakeContext handshakeContext;
+    private ErrorMessageService objErms;
 
     public void incrExperimenterMessageCounter() {
         this.experimenterMessageCounter++;
@@ -148,7 +156,8 @@ public class ConnectionConductorImplTest {
 
         popListener = new PopListenerCountingImpl<>();
 
-        controller = new MDController();
+        final ConvertorManager convertorManager = ConvertorManagerFactory.createDefaultManager();
+        controller = new MDController(convertorManager);
         controller.init();
         controller.getMessageTranslators().putAll(assembleTranslatorMapping());
 
@@ -162,6 +171,7 @@ public class ConnectionConductorImplTest {
         connectionConductor.setQueueProcessor(queueProcessor);
         connectionConductor.setErrorHandler(errorHandler);
         connectionConductor.init();
+        connectionConductor.setHandshakeContext(handshakeContext);
         eventPlan = new Stack<>();
         adapter.setEventPlan(eventPlan);
         adapter.setProceedTimeout(5000L);
@@ -189,7 +199,7 @@ public class ConnectionConductorImplTest {
             libSimulation.join();
         }
         queueProcessor.shutdown();
-        connectionConductor.shutdownPool();
+        connectionConductor.getHsPool().shutdown();
 
         for (Exception problem : adapter.getOccuredExceptions()) {
             LOG.error("during simulation on adapter side: "
@@ -201,7 +211,7 @@ public class ConnectionConductorImplTest {
             if (eventPlan.size() > 0) {
                 LOG.debug("eventPlan size: " + eventPlan.size());
                 for (SwitchTestEvent event : eventPlan) {
-                    LOG.debug(" # EVENT:: " + event.toString());
+                    LOG.debug(" # EVENT:: " + event);
                 }
             }
         }
@@ -256,10 +266,6 @@ public class ConnectionConductorImplTest {
                 EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
 
         int i = 1;
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
         executeNow();
 
         Assert.assertEquals(ConnectionConductor.CONDUCTOR_STATE.WORKING,
@@ -284,12 +290,6 @@ public class ConnectionConductorImplTest {
         eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(43,
                 EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
 
-        int i = 1;
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
-
         executeNow();
 
         Assert.assertEquals(ConnectionConductor.CONDUCTOR_STATE.WORKING,
@@ -322,10 +322,7 @@ public class ConnectionConductorImplTest {
                 EventFactory.createDefaultWaitForRpcEvent(45, "getFeatures"));
 
         eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(45,
-                EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
-
-        int i = 1;
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
+                (short) 0x01, getFeatureResponseMsg()));
 
         executeNow();
 
@@ -360,10 +357,7 @@ public class ConnectionConductorImplTest {
                 EventFactory.createDefaultWaitForRpcEvent(45, "getFeatures"));
 
         eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(45,
-                EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
-
-        int i = 1;
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
+                (short) 0x01, getFeatureResponseMsg()));
 
         executeNow();
 
@@ -382,26 +376,23 @@ public class ConnectionConductorImplTest {
      */
     @Test
     public void testOnFlowRemovedMessage() throws InterruptedException {
-        IMDMessageTranslator<OfHeader, List<DataObject>> objFms = new FlowRemovedMessageService();
+        FlowRemovedMessageService objFms = new FlowRemovedMessageService();
         controller.addMessageTranslator(FlowRemovedMessage.class, 4, objFms);
 
         simulateV13PostHandshakeState(connectionConductor);
+        objFms.resetLatch(2);
 
         // Now send Flow Removed messages
         FlowRemovedMessageBuilder builder1 = new FlowRemovedMessageBuilder();
         builder1.setVersion((short) 4);
         builder1.setXid(1L);
         connectionConductor.onFlowRemovedMessage(builder1.build());
-        synchronized (popListener) {
             LOG.debug("about to wait for popListener");
-            popListener.wait(maxProcessingTimeout);
-        }
-        Assert.assertEquals(1, flowremovedMessageCounter);
         builder1.setXid(2L);
         connectionConductor.onFlowRemovedMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
+
+        flushMessageProcessing();
+        Assert.assertTrue(objFms.await(maxProcessingTimeout, TimeUnit.MILLISECONDS));
         Assert.assertEquals(2, flowremovedMessageCounter);
     }
 
@@ -425,7 +416,7 @@ public class ConnectionConductorImplTest {
      */
     @Test
     public void testOnPacketInMessage() throws InterruptedException {
-        IMDMessageTranslator<OfHeader, List<DataObject>> objPms = new PacketInMessageService();
+        PacketInMessageService objPms = new PacketInMessageService();
         controller.addMessageTranslator(PacketInMessage.class, 4, objPms);
 
         simulateV13PostHandshakeState(connectionConductor);
@@ -435,15 +426,11 @@ public class ConnectionConductorImplTest {
         builder1.setVersion((short) 4);
         builder1.setBufferId((long) 1);
         connectionConductor.onPacketInMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
-        Assert.assertEquals(1, packetinMessageCounter);
         builder1.setBufferId((long) 2);
         connectionConductor.onPacketInMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
+
+        flushMessageProcessing();
+        Assert.assertTrue(objPms.await(maxProcessingTimeout, TimeUnit.MILLISECONDS));
         Assert.assertEquals(2, packetinMessageCounter);
     }
 
@@ -456,7 +443,7 @@ public class ConnectionConductorImplTest {
      */
     @Test
     public void testOnPortStatusMessage() throws InterruptedException {
-        IMDMessageTranslator<OfHeader, List<DataObject>> objPSms = new PortStatusMessageService();
+        PortStatusMessageService objPSms = new PortStatusMessageService();
         controller.addMessageTranslator(PortStatusMessage.class, 4, objPSms);
 
         simulateV13PostHandshakeState(connectionConductor);
@@ -466,25 +453,28 @@ public class ConnectionConductorImplTest {
         builder1.setVersion((short) 4);
         PortFeatures features = new PortFeatures(true, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false);
         builder1.setPortNo(90L).setReason(PortReason.OFPPRADD).setCurrentFeatures(features);
+        objPSms.resetLatch(3);
+        LOG.debug("sending port message");
         connectionConductor.onPortStatusMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
-        Assert.assertEquals(1, portstatusAddMessageCounter);
         builder1.setPortNo(90L).setReason(PortReason.OFPPRMODIFY).setCurrentFeatures(features);
         connectionConductor.onPortStatusMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
-        Assert.assertEquals(1, portstatusModifyMessageCounter);
         builder1.setPortNo(90L).setReason(PortReason.OFPPRDELETE).setCurrentFeatures(features);
         connectionConductor.onPortStatusMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
+
+        flushMessageProcessing();
+        Assert.assertTrue(objPSms.await(maxProcessingTimeout, TimeUnit.MILLISECONDS));
+        Assert.assertEquals(1, portstatusModifyMessageCounter);
+        Assert.assertEquals(1, portstatusAddMessageCounter);
         Assert.assertEquals(1, portstatusDeleteMessageCounter);
     }
 
+    private void flushMessageProcessing() throws InterruptedException {
+        // make sure that harvester sleeps deeply
+        Thread.sleep(maxProcessingTimeout);
+        // flushing messages
+        queueProcessor.getHarvesterHandle().ping();
+    }
+
     /**
      * @throws InterruptedException
      */
@@ -497,13 +487,13 @@ public class ConnectionConductorImplTest {
      */
     private void executeNow() throws InterruptedException {
         execute(true);
-        connectionConductor.shutdownPool();
+        connectionConductor.getHsPool().shutdown();
     }
 
     /**
      * @throws InterruptedException
      */
-    private void execute(boolean join) throws InterruptedException {
+    private void execute(final boolean join) throws InterruptedException {
         libSimulation = new Thread(adapter, "junit-adapter");
         libSimulation.start();
         if (join) {
@@ -526,7 +516,7 @@ public class ConnectionConductorImplTest {
     /**
      * @return
      */
-    private static Capabilities createCapabilities(long input) {
+    private static Capabilities createCapabilities(final long input) {
         final Boolean FLOW_STATS = (input & (1 << 0)) != 0;
         final Boolean TABLE_STATS = (input & (1 << 1)) != 0;
         final Boolean PORT_STATS = (input & (1 << 2)) != 0;
@@ -539,36 +529,54 @@ public class ConnectionConductorImplTest {
         return capabilities;
     }
 
+    public abstract class ProcessingLatchService {
+        private CountDownLatch processingLatch = new CountDownLatch(0);
+
+        public void resetLatch(int passAmount) {
+            processingLatch = new CountDownLatch(passAmount);
+        }
+
+        protected void countDown() {
+            processingLatch.countDown();
+        }
+
+        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
+            return processingLatch.await(timeout, unit);
+        }
+    }
+
     public class ExperimenterMessageService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
         @Override
-        public List<DataObject> translate(SwitchConnectionDistinguisher cookie, SessionContext sw, OfHeader msg) {
+        public List<DataObject> translate(final SwitchConnectionDistinguisher cookie, final SessionContext sw, final OfHeader msg) {
             LOG.debug("Received a packet in Experimenter Service");
             ConnectionConductorImplTest.this.incrExperimenterMessageCounter();
             return null;
         }
     }
 
-    public class PacketInMessageService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
+    public class PacketInMessageService extends ProcessingLatchService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
         @Override
-        public List<DataObject> translate(SwitchConnectionDistinguisher cookie, SessionContext sw, OfHeader msg) {
+        public List<DataObject> translate(final SwitchConnectionDistinguisher cookie, final SessionContext sw, final OfHeader msg) {
             LOG.debug("Received a packet in PacketIn Service");
             ConnectionConductorImplTest.this.incrPacketinMessageCounter();
+            countDown();
             return null;
         }
     }
 
-    public class FlowRemovedMessageService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
+    public class FlowRemovedMessageService extends ProcessingLatchService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
         @Override
-        public List<DataObject> translate(SwitchConnectionDistinguisher cookie, SessionContext sw, OfHeader msg) {
+        public List<DataObject> translate(final SwitchConnectionDistinguisher cookie, final SessionContext sw, final OfHeader msg) {
             LOG.debug("Received a packet in FlowRemoved Service");
             ConnectionConductorImplTest.this.incrFlowremovedMessageCounter();
+            countDown();
             return null;
         }
     }
 
-    public class PortStatusMessageService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
+    public class PortStatusMessageService extends ProcessingLatchService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
         @Override
-        public List<DataObject> translate(SwitchConnectionDistinguisher cookie, SessionContext sw, OfHeader msg) {
+        public List<DataObject> translate(final SwitchConnectionDistinguisher cookie, final SessionContext sw, final OfHeader msg) {
             LOG.debug("Received a packet in PortStatus Service");
             if ((((PortStatusMessage) msg).getReason().equals(PortReason.OFPPRADD))) {
                 ConnectionConductorImplTest.this.incrPortstatusAddMessageCounter();
@@ -577,22 +585,24 @@ public class ConnectionConductorImplTest {
             } else if (((PortStatusMessage) msg).getReason().equals(PortReason.OFPPRMODIFY)) {
                 ConnectionConductorImplTest.this.incrPortstatusModifyMessageCounter();
             }
+            countDown();
             return null;
         }
     }
 
-    public class ErrorMessageService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
+    public class ErrorMessageService extends ProcessingLatchService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
         @Override
-        public List<DataObject> translate(SwitchConnectionDistinguisher cookie, SessionContext sw, OfHeader msg) {
+        public List<DataObject> translate(final SwitchConnectionDistinguisher cookie, final SessionContext sw, final OfHeader msg) {
             LOG.debug("Received a packet in Experimenter Service");
             ConnectionConductorImplTest.this.incrErrorMessageCounter();
+            countDown();
             return null;
         }
     }
 
     /**
      * Test method for
-     * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onExperimenterMessage(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessage)}
+     * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#onExperimenterMessage(ExperimenterMessage)}
      * .
      *
      * @throws InterruptedException
@@ -600,21 +610,19 @@ public class ConnectionConductorImplTest {
     @Test
     public void testOnErrorMessage() throws InterruptedException {
         simulateV13PostHandshakeState(connectionConductor);
+        final int messageAmount = 2;
 
+        objErms.resetLatch(messageAmount);
         ErrorMessageBuilder builder1 = new ErrorMessageBuilder();
         builder1.setVersion((short) 4);
         builder1.setCode(100);
         connectionConductor.onErrorMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
-        Assert.assertEquals(1, errorMessageCounter);
         builder1.setCode(200);
         connectionConductor.onErrorMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
-        Assert.assertEquals(2, errorMessageCounter);
+
+        flushMessageProcessing();
+        Assert.assertTrue(objErms.await(maxProcessingTimeout, TimeUnit.MILLISECONDS));
+        Assert.assertEquals(messageAmount, errorMessageCounter);
     }
 
     /**
@@ -633,7 +641,7 @@ public class ConnectionConductorImplTest {
         existingValues.add(objEms);
         tKey = new TranslatorKey(4, ExperimenterMessage.class.getName());
         translatorMapping.put(tKey, existingValues);
-        IMDMessageTranslator<OfHeader, List<DataObject>> objErms = new ErrorMessageService();
+        objErms = new ErrorMessageService();
         existingValues.add(objErms);
         tKey = new TranslatorKey(4, ErrorMessage.class.getName());
         translatorMapping.put(tKey, existingValues);
@@ -642,7 +650,7 @@ public class ConnectionConductorImplTest {
 
     /**
      * Test method for
-     * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#processPortStatusMsg(org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage)}
+     * {@link org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductorImpl#processPortStatusMsg(PortStatus)}
      * <br><br>
      * Tests for getting features from port status message by port version
      * <ul>
@@ -667,13 +675,13 @@ public class ConnectionConductorImplTest {
         PortFeatures featuresMal = new PortFeatures(true, false, false, false, null, false, false, false, false, false, false, false, false, false, false, false);
         PortFeaturesV10 featuresV10 = new PortFeaturesV10(true, false, false, false, false, false, false, false, false, false, false, false);
 
-        //Malformed features           
+        //Malformed features
         builder.setVersion((short) 1).setPortNo(portNumber).setReason(PortReason.OFPPRADD).setCurrentFeatures(featuresMal);
         connectionConductor.processPortStatusMsg(builder.build());
         Assert.assertTrue(connectionConductor.getSessionContext().getPortsBandwidth().isEmpty());
         Assert.assertTrue(connectionConductor.getSessionContext().getPhysicalPorts().isEmpty());
 
-        //Version-features mismatch            
+        //Version-features mismatch
         builder.setCurrentFeatures(features);
         connectionConductor.processPortStatusMsg(builder.build());
         Assert.assertTrue(connectionConductor.getSessionContext().getPortsBandwidth().isEmpty());
@@ -706,8 +714,9 @@ public class ConnectionConductorImplTest {
         connectionConductor.onHandshakeFailure();
         connectionConductor.checkState(ConnectionConductor.CONDUCTOR_STATE.RIP);
     }
-    private void simulateV13PostHandshakeState(ConnectionConductorImpl conductor) {
+    private static void simulateV13PostHandshakeState(final ConnectionConductorImpl conductor) {
         GetFeaturesOutputBuilder featureOutput = getFeatureResponseMsg();
         conductor.postHandshakeBasic(featureOutput.build(), OFConstants.OFP_VERSION_1_3);
+        LOG.debug("simulating post handshake event done");
     }
 }