Bug 5540 - Remove ConvertorManager singleton
[openflowplugin.git] / openflowplugin / src / test / java / org / opendaylight / openflowplugin / openflow / md / core / ConnectionConductorImplTest.java
index 9c2ee4d41ce1b783cf06034d57662fa682bcf3b8..8eb350f358900cfd1cc51805db5fde840b0b2160 100644 (file)
@@ -1,5 +1,5 @@
 /**
- * Copyright (c) 2013-2014 Cisco Systems, Inc. and others.  All rights reserved.
+ * Copyright (c) 20132014 Cisco Systems, Inc. and others.  All rights reserved.
  *
  * This program and the accompanying materials are made available under the
  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
@@ -17,7 +17,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Stack;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -40,6 +42,8 @@ import org.opendaylight.openflowplugin.api.openflow.statistics.MessageSpy;
 import org.opendaylight.openflowplugin.openflow.md.core.plan.ConnectionAdapterStackImpl;
 import org.opendaylight.openflowplugin.openflow.md.core.plan.EventFactory;
 import org.opendaylight.openflowplugin.openflow.md.core.plan.SwitchTestEvent;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManager;
+import org.opendaylight.openflowplugin.openflow.md.core.sal.convertor.ConvertorManagerFactory;
 import org.opendaylight.openflowplugin.openflow.md.queue.QueueProcessorLightImpl;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.Capabilities;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.PortFeatures;
@@ -81,12 +85,12 @@ public class ConnectionConductorImplTest {
     private Stack<SwitchTestEvent> eventPlan;
 
     private Thread libSimulation;
-    private ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(
+    private final ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(
             8);
 
     protected QueueProcessorLightImpl queueProcessor;
 
-    private PopListener<DataObject> popListener;
+    private PopListenerCountingImpl<DataObject> popListener;
 
     private int experimenterMessageCounter;
     private int packetinMessageCounter;
@@ -99,11 +103,12 @@ public class ConnectionConductorImplTest {
     @Mock
     private ErrorHandlerSimpleImpl errorHandler;
 
-    private int expectedErrors = 0;
+    private final int expectedErrors = 0;
     @Mock
     private MessageSpy<DataContainer> messageSpy;
     @Mock
     HandshakeContext handshakeContext;
+    private ErrorMessageService objErms;
 
     public void incrExperimenterMessageCounter() {
         this.experimenterMessageCounter++;
@@ -151,7 +156,8 @@ public class ConnectionConductorImplTest {
 
         popListener = new PopListenerCountingImpl<>();
 
-        controller = new MDController();
+        final ConvertorManager convertorManager = ConvertorManagerFactory.createDefaultManager();
+        controller = new MDController(convertorManager);
         controller.init();
         controller.getMessageTranslators().putAll(assembleTranslatorMapping());
 
@@ -205,7 +211,7 @@ public class ConnectionConductorImplTest {
             if (eventPlan.size() > 0) {
                 LOG.debug("eventPlan size: " + eventPlan.size());
                 for (SwitchTestEvent event : eventPlan) {
-                    LOG.debug(" # EVENT:: " + event.toString());
+                    LOG.debug(" # EVENT:: " + event);
                 }
             }
         }
@@ -260,10 +266,6 @@ public class ConnectionConductorImplTest {
                 EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
 
         int i = 1;
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
         executeNow();
 
         Assert.assertEquals(ConnectionConductor.CONDUCTOR_STATE.WORKING,
@@ -288,12 +290,6 @@ public class ConnectionConductorImplTest {
         eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(43,
                 EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
 
-        int i = 1;
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
-
         executeNow();
 
         Assert.assertEquals(ConnectionConductor.CONDUCTOR_STATE.WORKING,
@@ -328,9 +324,6 @@ public class ConnectionConductorImplTest {
         eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(45,
                 (short) 0x01, getFeatureResponseMsg()));
 
-        int i = 1;
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
-
         executeNow();
 
         Assert.assertEquals(ConnectionConductor.CONDUCTOR_STATE.WORKING,
@@ -366,9 +359,6 @@ public class ConnectionConductorImplTest {
         eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(45,
                 (short) 0x01, getFeatureResponseMsg()));
 
-        int i = 1;
-        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(i++, "multipartRequestInput"));
-
         executeNow();
 
         Assert.assertEquals(ConnectionConductor.CONDUCTOR_STATE.WORKING,
@@ -386,26 +376,23 @@ public class ConnectionConductorImplTest {
      */
     @Test
     public void testOnFlowRemovedMessage() throws InterruptedException {
-        IMDMessageTranslator<OfHeader, List<DataObject>> objFms = new FlowRemovedMessageService();
+        FlowRemovedMessageService objFms = new FlowRemovedMessageService();
         controller.addMessageTranslator(FlowRemovedMessage.class, 4, objFms);
 
         simulateV13PostHandshakeState(connectionConductor);
+        objFms.resetLatch(2);
 
         // Now send Flow Removed messages
         FlowRemovedMessageBuilder builder1 = new FlowRemovedMessageBuilder();
         builder1.setVersion((short) 4);
         builder1.setXid(1L);
         connectionConductor.onFlowRemovedMessage(builder1.build());
-        synchronized (popListener) {
             LOG.debug("about to wait for popListener");
-            popListener.wait(maxProcessingTimeout);
-        }
-        Assert.assertEquals(1, flowremovedMessageCounter);
         builder1.setXid(2L);
         connectionConductor.onFlowRemovedMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
+
+        flushMessageProcessing();
+        Assert.assertTrue(objFms.await(maxProcessingTimeout, TimeUnit.MILLISECONDS));
         Assert.assertEquals(2, flowremovedMessageCounter);
     }
 
@@ -429,7 +416,7 @@ public class ConnectionConductorImplTest {
      */
     @Test
     public void testOnPacketInMessage() throws InterruptedException {
-        IMDMessageTranslator<OfHeader, List<DataObject>> objPms = new PacketInMessageService();
+        PacketInMessageService objPms = new PacketInMessageService();
         controller.addMessageTranslator(PacketInMessage.class, 4, objPms);
 
         simulateV13PostHandshakeState(connectionConductor);
@@ -439,15 +426,11 @@ public class ConnectionConductorImplTest {
         builder1.setVersion((short) 4);
         builder1.setBufferId((long) 1);
         connectionConductor.onPacketInMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
-        Assert.assertEquals(1, packetinMessageCounter);
         builder1.setBufferId((long) 2);
         connectionConductor.onPacketInMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
+
+        flushMessageProcessing();
+        Assert.assertTrue(objPms.await(maxProcessingTimeout, TimeUnit.MILLISECONDS));
         Assert.assertEquals(2, packetinMessageCounter);
     }
 
@@ -460,7 +443,7 @@ public class ConnectionConductorImplTest {
      */
     @Test
     public void testOnPortStatusMessage() throws InterruptedException {
-        IMDMessageTranslator<OfHeader, List<DataObject>> objPSms = new PortStatusMessageService();
+        PortStatusMessageService objPSms = new PortStatusMessageService();
         controller.addMessageTranslator(PortStatusMessage.class, 4, objPSms);
 
         simulateV13PostHandshakeState(connectionConductor);
@@ -470,25 +453,28 @@ public class ConnectionConductorImplTest {
         builder1.setVersion((short) 4);
         PortFeatures features = new PortFeatures(true, false, false, false, false, false, false, false, false, false, false, false, false, false, false, false);
         builder1.setPortNo(90L).setReason(PortReason.OFPPRADD).setCurrentFeatures(features);
+        objPSms.resetLatch(3);
+        LOG.debug("sending port message");
         connectionConductor.onPortStatusMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
-        Assert.assertEquals(1, portstatusAddMessageCounter);
         builder1.setPortNo(90L).setReason(PortReason.OFPPRMODIFY).setCurrentFeatures(features);
         connectionConductor.onPortStatusMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
-        Assert.assertEquals(1, portstatusModifyMessageCounter);
         builder1.setPortNo(90L).setReason(PortReason.OFPPRDELETE).setCurrentFeatures(features);
         connectionConductor.onPortStatusMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
+
+        flushMessageProcessing();
+        Assert.assertTrue(objPSms.await(maxProcessingTimeout, TimeUnit.MILLISECONDS));
+        Assert.assertEquals(1, portstatusModifyMessageCounter);
+        Assert.assertEquals(1, portstatusAddMessageCounter);
         Assert.assertEquals(1, portstatusDeleteMessageCounter);
     }
 
+    private void flushMessageProcessing() throws InterruptedException {
+        // make sure that harvester sleeps deeply
+        Thread.sleep(maxProcessingTimeout);
+        // flushing messages
+        queueProcessor.getHarvesterHandle().ping();
+    }
+
     /**
      * @throws InterruptedException
      */
@@ -507,7 +493,7 @@ public class ConnectionConductorImplTest {
     /**
      * @throws InterruptedException
      */
-    private void execute(boolean join) throws InterruptedException {
+    private void execute(final boolean join) throws InterruptedException {
         libSimulation = new Thread(adapter, "junit-adapter");
         libSimulation.start();
         if (join) {
@@ -530,7 +516,7 @@ public class ConnectionConductorImplTest {
     /**
      * @return
      */
-    private static Capabilities createCapabilities(long input) {
+    private static Capabilities createCapabilities(final long input) {
         final Boolean FLOW_STATS = (input & (1 << 0)) != 0;
         final Boolean TABLE_STATS = (input & (1 << 1)) != 0;
         final Boolean PORT_STATS = (input & (1 << 2)) != 0;
@@ -543,36 +529,54 @@ public class ConnectionConductorImplTest {
         return capabilities;
     }
 
+    public abstract class ProcessingLatchService {
+        private CountDownLatch processingLatch = new CountDownLatch(0);
+
+        public void resetLatch(int passAmount) {
+            processingLatch = new CountDownLatch(passAmount);
+        }
+
+        protected void countDown() {
+            processingLatch.countDown();
+        }
+
+        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
+            return processingLatch.await(timeout, unit);
+        }
+    }
+
     public class ExperimenterMessageService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
         @Override
-        public List<DataObject> translate(SwitchConnectionDistinguisher cookie, SessionContext sw, OfHeader msg) {
+        public List<DataObject> translate(final SwitchConnectionDistinguisher cookie, final SessionContext sw, final OfHeader msg) {
             LOG.debug("Received a packet in Experimenter Service");
             ConnectionConductorImplTest.this.incrExperimenterMessageCounter();
             return null;
         }
     }
 
-    public class PacketInMessageService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
+    public class PacketInMessageService extends ProcessingLatchService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
         @Override
-        public List<DataObject> translate(SwitchConnectionDistinguisher cookie, SessionContext sw, OfHeader msg) {
+        public List<DataObject> translate(final SwitchConnectionDistinguisher cookie, final SessionContext sw, final OfHeader msg) {
             LOG.debug("Received a packet in PacketIn Service");
             ConnectionConductorImplTest.this.incrPacketinMessageCounter();
+            countDown();
             return null;
         }
     }
 
-    public class FlowRemovedMessageService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
+    public class FlowRemovedMessageService extends ProcessingLatchService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
         @Override
-        public List<DataObject> translate(SwitchConnectionDistinguisher cookie, SessionContext sw, OfHeader msg) {
+        public List<DataObject> translate(final SwitchConnectionDistinguisher cookie, final SessionContext sw, final OfHeader msg) {
             LOG.debug("Received a packet in FlowRemoved Service");
             ConnectionConductorImplTest.this.incrFlowremovedMessageCounter();
+            countDown();
             return null;
         }
     }
 
-    public class PortStatusMessageService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
+    public class PortStatusMessageService extends ProcessingLatchService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
         @Override
-        public List<DataObject> translate(SwitchConnectionDistinguisher cookie, SessionContext sw, OfHeader msg) {
+        public List<DataObject> translate(final SwitchConnectionDistinguisher cookie, final SessionContext sw, final OfHeader msg) {
             LOG.debug("Received a packet in PortStatus Service");
             if ((((PortStatusMessage) msg).getReason().equals(PortReason.OFPPRADD))) {
                 ConnectionConductorImplTest.this.incrPortstatusAddMessageCounter();
@@ -581,15 +585,17 @@ public class ConnectionConductorImplTest {
             } else if (((PortStatusMessage) msg).getReason().equals(PortReason.OFPPRMODIFY)) {
                 ConnectionConductorImplTest.this.incrPortstatusModifyMessageCounter();
             }
+            countDown();
             return null;
         }
     }
 
-    public class ErrorMessageService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
+    public class ErrorMessageService extends ProcessingLatchService implements IMDMessageTranslator<OfHeader, List<DataObject>> {
         @Override
-        public List<DataObject> translate(SwitchConnectionDistinguisher cookie, SessionContext sw, OfHeader msg) {
+        public List<DataObject> translate(final SwitchConnectionDistinguisher cookie, final SessionContext sw, final OfHeader msg) {
             LOG.debug("Received a packet in Experimenter Service");
             ConnectionConductorImplTest.this.incrErrorMessageCounter();
+            countDown();
             return null;
         }
     }
@@ -604,21 +610,19 @@ public class ConnectionConductorImplTest {
     @Test
     public void testOnErrorMessage() throws InterruptedException {
         simulateV13PostHandshakeState(connectionConductor);
+        final int messageAmount = 2;
 
+        objErms.resetLatch(messageAmount);
         ErrorMessageBuilder builder1 = new ErrorMessageBuilder();
         builder1.setVersion((short) 4);
         builder1.setCode(100);
         connectionConductor.onErrorMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
-        Assert.assertEquals(1, errorMessageCounter);
         builder1.setCode(200);
         connectionConductor.onErrorMessage(builder1.build());
-        synchronized (popListener) {
-            popListener.wait(maxProcessingTimeout);
-        }
-        Assert.assertEquals(2, errorMessageCounter);
+
+        flushMessageProcessing();
+        Assert.assertTrue(objErms.await(maxProcessingTimeout, TimeUnit.MILLISECONDS));
+        Assert.assertEquals(messageAmount, errorMessageCounter);
     }
 
     /**
@@ -637,7 +641,7 @@ public class ConnectionConductorImplTest {
         existingValues.add(objEms);
         tKey = new TranslatorKey(4, ExperimenterMessage.class.getName());
         translatorMapping.put(tKey, existingValues);
-        IMDMessageTranslator<OfHeader, List<DataObject>> objErms = new ErrorMessageService();
+        objErms = new ErrorMessageService();
         existingValues.add(objErms);
         tKey = new TranslatorKey(4, ErrorMessage.class.getName());
         translatorMapping.put(tKey, existingValues);
@@ -710,8 +714,9 @@ public class ConnectionConductorImplTest {
         connectionConductor.onHandshakeFailure();
         connectionConductor.checkState(ConnectionConductor.CONDUCTOR_STATE.RIP);
     }
-    private void simulateV13PostHandshakeState(ConnectionConductorImpl conductor) {
+    private static void simulateV13PostHandshakeState(final ConnectionConductorImpl conductor) {
         GetFeaturesOutputBuilder featureOutput = getFeatureResponseMsg();
         conductor.postHandshakeBasic(featureOutput.build(), OFConstants.OFP_VERSION_1_3);
+        LOG.debug("simulating post handshake event done");
     }
 }