sending hello upon connection established 57/2457/1
authorMichal Rehak <mirehak@cisco.com>
Thu, 31 Oct 2013 16:15:59 +0000 (17:15 +0100)
committerMichal Rehak <mirehak@cisco.com>
Wed, 6 Nov 2013 18:32:54 +0000 (19:32 +0100)
hello with versionBitmap
update tests regarding handshake

Change-Id: If75c944fbb3f7a180c4e2d292042e59db52b6273
Signed-off-by: Michal Rehak <mirehak@cisco.com>
openflowplugin-it/src/test/java/org/opendaylight/openflowplugin/openflow/md/it/OFPluginToLibraryTest.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductor.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MessageFactory.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImpl.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImplTest.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/MessageFactoryTest.java [new file with mode: 0644]
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/plan/ConnectionAdapterStackImpl.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImplTest.java

index ca8f76b2e233a3fcc7b219f89310b7806763392d..621a18f54ae721e3d71e85f65474f69d7e8f02d1 100644 (file)
@@ -21,6 +21,8 @@ import org.opendaylight.openflowjava.protocol.impl.clients.ScenarioHandler;
 import org.opendaylight.openflowjava.protocol.impl.clients.SendEvent;
 import org.opendaylight.openflowjava.protocol.impl.clients.SimpleClient;
 import org.opendaylight.openflowjava.protocol.impl.clients.SleepEvent;
+import org.opendaylight.openflowjava.protocol.impl.clients.WaitForMessageEvent;
+import org.opendaylight.openflowjava.protocol.impl.util.ByteBufUtils;
 import org.opendaylight.openflowjava.protocol.spi.connection.SwitchConnectionProvider;
 import org.ops4j.pax.exam.Configuration;
 import org.ops4j.pax.exam.Option;
@@ -61,6 +63,9 @@ public class OFPluginToLibraryTest {
 
     private SimpleClient switchSim;
 
+    /**
+     * test tear down
+     */
     @After
     public void tearDown() {
         try {
@@ -78,7 +83,7 @@ public class OFPluginToLibraryTest {
      * @throws Exception
      */
     @Test
-    public void handshakeOk() throws Exception {
+    public void handshakeOk1() throws Exception {
         LOG.debug("handshake integration test");
         LOG.debug("switchConnectionProvider: "+switchConnectionProvider);
 
@@ -99,6 +104,38 @@ public class OFPluginToLibraryTest {
         //TODO: dump errors of plugin
     }
     
+    /**
+     * test basic integration with OFLib running the handshake (with version bitmap)
+     * @throws Exception
+     */
+    @Test
+    public void handshakeOk2() throws Exception {
+        LOG.debug("handshake integration test");
+        LOG.debug("switchConnectionProvider: "+switchConnectionProvider);
+
+        switchSim = new SimpleClient("localhost", 6653);
+        switchSim.setSecuredClient(false);
+        Stack<ClientEvent> handshakeScenario = new Stack<>();
+        // handshake with versionbitmap
+        handshakeScenario.add(0, new SendEvent(ByteBufUtils.hexStringToBytes("04 00 00 10 00 00 00 01 00 01 00 08 00 00 00 10")));
+        handshakeScenario.add(0, new WaitForMessageEvent(ByteBufUtils.hexStringToBytes("04 00 00 10 00 00 00 15 00 01 00 08 00 00 00 12")));
+        handshakeScenario.add(0, new WaitForMessageEvent(ByteBufUtils.hexStringToBytes("04 05 00 08 00 00 00 03")));
+        handshakeScenario.add(0, new SendEvent(ByteBufUtils.hexStringToBytes("04 06 00 20 00 00 00 03 "
+                + "00 01 02 03 04 05 06 07 00 01 02 03 01 00 00 00 00 01 02 03 00 01 02 03")));
+        
+        ScenarioHandler scenario = new ScenarioHandler(handshakeScenario);
+        switchSim.setScenarioHandler(scenario);
+        switchSim.start();
+        try {
+            switchSim.getScenarioDone().get(getFailSafeTimeout(), TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            String msg = "waiting for scenario to finish failed: "+e.getMessage();
+            LOG.error(msg, e);
+            Assert.fail(msg);
+        }
+        //TODO: dump errors of plugin
+    }
+    
     /**
      * test basic integration with OFLib running the handshake
      * @throws Exception
index 50dd784ad391f98c964031fb1fa4b2935052157c..e96ae57d887b58ce5cb4ea10b728894d2154511d 100644 (file)
@@ -8,11 +8,14 @@
 
 package org.opendaylight.openflowplugin.openflow.md.core;
 
+import java.util.List;
 import java.util.concurrent.Future;
 
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
 
+import com.google.common.collect.Lists;
+
 
 /**
  * @author mirehak
@@ -35,6 +38,9 @@ public interface ConnectionConductor {
         RIP
     }
 
+    /** supported version ordered by height (highest version is at the beginning) */
+    public static final List<Short> versionOrder = Lists.newArrayList((short) 0x04, (short) 0x01);
+
     /**
      * initialize wiring around {@link #connectionAdapter}
      */
index d6098b1c726ccaa442f581ff4b4ed9a4577caff5..985e54a130234cf90f3fbaadc76235078e10a1f8 100644 (file)
@@ -8,20 +8,21 @@
 
 package org.opendaylight.openflowplugin.openflow.md.core;
 
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowjava.protocol.api.connection.ConnectionReadyListener;
 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.HelloElementType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
@@ -31,7 +32,7 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartReplyMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MultipartRequestMessage;
@@ -39,7 +40,6 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.ElementsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
@@ -49,7 +49,6 @@ import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Futures;
 
 /**
@@ -58,7 +57,7 @@ import com.google.common.util.concurrent.Futures;
 public class ConnectionConductorImpl implements OpenflowProtocolListener,
         SystemNotificationsListener, ConnectionConductor, ConnectionReadyListener {
 
-    private static final Logger LOG = LoggerFactory
+    protected static final Logger LOG = LoggerFactory
             .getLogger(ConnectionConductorImpl.class);
 
     /* variable to make BitMap-based negotiation enabled / disabled.
@@ -68,8 +67,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
     private static final boolean isBitmapNegotiationEnable = true;
     private LinkedBlockingQueue<Exception> errorQueue = new LinkedBlockingQueue<>();
 
-    private final ConnectionAdapter connectionAdapter;
-    private final List<Short> versionOrder;
+    protected final ConnectionAdapter connectionAdapter;
     private ConnectionConductor.CONDUCTOR_STATE conductorState;
     private Short version;
 
@@ -79,7 +77,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
 
     private Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping;
 
-    private boolean isFirstHelloNegotiation = true;
+    protected boolean isFirstHelloNegotiation = true;
 
 
 
@@ -89,8 +87,6 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
     public ConnectionConductorImpl(ConnectionAdapter connectionAdapter) {
         this.connectionAdapter = connectionAdapter;
         conductorState = CONDUCTOR_STATE.HANDSHAKING;
-        versionOrder = Lists.newArrayList((short) 0x04, (short) 0x01);
-        // TODO: add a thread pool to handle ErrorQueueHandler
         new Thread(new ErrorQueueHandler(errorQueue)).start();
     }
 
@@ -105,51 +101,44 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
     /**
      * send first hello message to switch
      */
-    private void sendFirstHelloMessage() {
-        short highestVersion = versionOrder.get(0);
-        Long helloXid = 1L;
-        HelloInputBuilder helloInputbuilder = new HelloInputBuilder();
-        helloInputbuilder.setVersion(highestVersion);
-        helloInputbuilder.setXid(helloXid);
+    protected void sendFirstHelloMessage() {
+        Short highestVersion = ConnectionConductor.versionOrder.get(0);
+        Long helloXid = 21L;
+        HelloInput helloInput = null;
+        
         if (isBitmapNegotiationEnable) {
-            int elementsCount = highestVersion / Integer.SIZE;
-            ElementsBuilder elementsBuilder = new ElementsBuilder();
-
-            List<Elements> elementList = new ArrayList<Elements>();
-            int orderIndex = versionOrder.size();
-            int value = versionOrder.get(--orderIndex);
-            for (int index = 0; index <= elementsCount; index++) {
-                List<Boolean> booleanList = new ArrayList<Boolean>();
-                for (int i = 0; i < Integer.SIZE; i++) {
-                    if (value == ((index * Integer.SIZE) + i)) {
-                        booleanList.add(true);
-                        value = (orderIndex == 0) ? highestVersion : versionOrder.get(--orderIndex);
-                    } else {
-                        booleanList.add(false);
-                    }
-                }
-                elementsBuilder.setType(HelloElementType.forValue(1));
-                elementsBuilder.setVersionBitmap(booleanList);
-                elementList.add(elementsBuilder.build());
-            }
-            helloInputbuilder.setElements(elementList);
-            LOG.debug("sending first hello message: version header={} , version bitmap={}", highestVersion, elementList);
+            helloInput = MessageFactory.createHelloInput(highestVersion, helloXid, ConnectionConductor.versionOrder);
+            LOG.debug("sending first hello message: vertsion header={} , version bitmap={}", 
+                    highestVersion, helloInput.getElements());
         } else {
+            helloInput = MessageFactory.createHelloInput(highestVersion, helloXid);
             LOG.debug("sending first hello message: version header={} ", highestVersion);
         }
-        connectionAdapter.hello(helloInputbuilder.build());
-
+        
+        try {
+            RpcResult<Void> helloResult = connectionAdapter.hello(helloInput).get(getMaxTimeout(), getMaxTimeoutUnit());
+            smokeRpc(helloResult);
+            LOG.debug("FIRST HELLO sent.");
+        } catch (Throwable e) {
+            LOG.debug("FIRST HELLO sending failed.");
+            handleException(e);
+        }
     }
 
     @Override
-    public void onEchoRequestMessage(EchoRequestMessage echoRequestMessage) {
-        LOG.debug("echo request received: " + echoRequestMessage.getXid());
-        EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
-        builder.setVersion(echoRequestMessage.getVersion());
-        builder.setXid(echoRequestMessage.getXid());
-        builder.setData(echoRequestMessage.getData());
-
-        connectionAdapter.echoReply(builder.build());
+    public void onEchoRequestMessage(final EchoRequestMessage echoRequestMessage) {
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                LOG.debug("echo request received: " + echoRequestMessage.getXid());
+                EchoReplyInputBuilder builder = new EchoReplyInputBuilder();
+                builder.setVersion(echoRequestMessage.getVersion());
+                builder.setXid(echoRequestMessage.getXid());
+                builder.setData(echoRequestMessage.getData());
+                
+                connectionAdapter.echoReply(builder.build());
+            }
+        }).start();            
     }
 
     @Override
@@ -186,17 +175,17 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
     @Override
     public void onHelloMessage(final HelloMessage hello) {
         // do handshake
-        LOG.info("handshake STARTED");
-        checkState(CONDUCTOR_STATE.HANDSHAKING);
 
         new Thread(new Runnable() {
-
             @Override
             public void run() {
+                LOG.info("handshake STARTED");
+                checkState(CONDUCTOR_STATE.HANDSHAKING);
+                
                 Short remoteVersion = hello.getVersion();
                 List<Elements> elements = hello.getElements();
-                long xid = hello.getXid();
-                short proposedVersion;
+                Long xid = hello.getXid();
+                Short proposedVersion;
                 LOG.debug("Hello message version={} and bitmap={}", remoteVersion, elements);
                 try {
                     // find the version from header version field
@@ -205,7 +194,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
                 } catch (IllegalArgumentException e) {
                     handleException(e);
                     connectionAdapter.disconnect();
-                    throw e;
+                    return;
                 }
                 
                 // sent version is equal to remote --> version is negotiated
@@ -222,7 +211,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
                     } catch (IllegalArgumentException ex) {
                         handleException(ex);
                         connectionAdapter.disconnect();
-                        throw ex;
+                        return;
                     }
                     LOG.debug("sending helloReply for common bitmap version : {}", proposedVersion);
                     sendHelloReply(proposedVersion, ++xid);
@@ -240,7 +229,6 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
                     }
                 }
             }
-            
         }).start();
     }
 
@@ -249,20 +237,44 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
      * @param proposedVersion
      * @param hello
      */
-    private void sendHelloReply(Short proposedVersion, Long xid)
+    protected void sendHelloReply(Short proposedVersion, Long xid)
     {
-        HelloInputBuilder helloBuilder = new HelloInputBuilder();
-        helloBuilder.setVersion(proposedVersion).setXid(xid);
-        connectionAdapter.hello(helloBuilder.build());
+        HelloInput helloMsg = MessageFactory.createHelloInput(proposedVersion, xid);
+        RpcResult<Void> result;
+        try {
+            result = connectionAdapter.hello(helloMsg).get(getMaxTimeout(), getMaxTimeoutUnit());
+            smokeRpc(result);
+        } catch (Throwable e) {
+            handleException(e);
+        }
     }
 
 
+    /**
+     * @param futureResult
+     * @throws Throwable 
+     */
+    private static void smokeRpc(RpcResult<?> result) throws Throwable {
+        if (!result.isSuccessful()) {
+            Throwable firstCause = null;
+            StringBuffer sb = new StringBuffer();
+            for (RpcError error : result.getErrors()) {
+                if (firstCause != null) {
+                    firstCause = error.getCause();
+                }
+                
+                sb.append("rpcError:").append(error.getCause().getMessage()).append(";");
+            }
+            throw new Exception(sb.toString(), firstCause);
+        }
+    }
+
     /**
      * after handshake set features, register to session
      * @param proposedVersion
      * @param xId
      */
-    private void postHandshake(Short proposedVersion, Long xid) {
+    protected void postHandshake(Short proposedVersion, Long xid) {
         // set version
         version = proposedVersion;
         LOG.debug("version set: " + proposedVersion);
@@ -273,27 +285,23 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
         Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
                 .getFeatures(featuresBuilder.build());
         LOG.debug("waiting for features");
-        RpcResult<GetFeaturesOutput> rpcFeatures;
         try {
-            rpcFeatures = featuresFuture.get(getMaxTimeout(),
-                    TimeUnit.MILLISECONDS);
-            if (!rpcFeatures.isSuccessful()) {
-                LOG.error("obtained features problem: {}"
-                        , rpcFeatures.getErrors());
-            } else {
-                GetFeaturesOutput featureOutput =  rpcFeatures.getResult();
-                LOG.debug("obtained features: datapathId={}",
-                        featureOutput.getDatapathId());
-                LOG.debug("obtained features: auxiliaryId={}",
-                        featureOutput.getAuxiliaryId());
-                conductorState = CONDUCTOR_STATE.WORKING;
-
-                OFSessionUtil.registerSession(this,
-                        featureOutput, version);
-                this.setListenerMapping(OFSessionUtil.getListenersMap());
-                LOG.info("handshake SETTLED: datapathId={}, auxiliaryId={}", featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
-            }
-        } catch (Exception e) {
+            RpcResult<GetFeaturesOutput> rpcFeatures = 
+                    featuresFuture.get(getMaxTimeout(), getMaxTimeoutUnit());
+            smokeRpc(rpcFeatures);
+            
+            GetFeaturesOutput featureOutput =  rpcFeatures.getResult();
+            LOG.debug("obtained features: datapathId={}",
+                    featureOutput.getDatapathId());
+            LOG.debug("obtained features: auxiliaryId={}",
+                    featureOutput.getAuxiliaryId());
+            conductorState = CONDUCTOR_STATE.WORKING;
+
+            OFSessionUtil.registerSession(this,
+                    featureOutput, version);
+            this.setListenerMapping(OFSessionUtil.getListenersMap());
+            LOG.info("handshake SETTLED: datapathId={}, auxiliaryId={}", featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
+        } catch (Throwable e) {
             //handshake failed
             LOG.error("issuing disconnect during handshake, reason: "+e.getMessage());
             handleException(e);
@@ -308,12 +316,28 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
         // TODO:: get from configuration
         return 2000;
     }
+    
+    /**
+     * @return milliseconds
+     */
+    private TimeUnit getMaxTimeoutUnit() {
+        // TODO:: get from configuration
+        return TimeUnit.MILLISECONDS;
+    }
+
 
     /**
      * @param e
      */
-    private void handleException(Exception e) {
-        Exception causeAndThread = new Exception("IN THREAD: "+Thread.currentThread().getName(), e);
+    protected void handleException(Throwable e) {
+        String sessionKeyId = null;
+        if (getSessionContext() != null) {
+            sessionKeyId = Arrays.toString(getSessionContext().getSessionKey().getId());
+        }
+        
+        Exception causeAndThread = new Exception(
+                "IN THREAD: "+Thread.currentThread().getName() +
+                "; session:"+sessionKeyId, e);
         try {
             errorQueue.put(causeAndThread);
         } catch (InterruptedException e1) {
@@ -362,7 +386,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
             try {
                 // TODO: read timeout from config
                 RpcResult<EchoOutput> echoReplyValue = echoReplyFuture.get(getMaxTimeout(),
-                        TimeUnit.SECONDS);
+                        getMaxTimeoutUnit());
                 if (echoReplyValue.isSuccessful()) {
                     conductorState = CONDUCTOR_STATE.WORKING;
                 } else {
@@ -402,7 +426,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
     /**
      * @param handshaking
      */
-    private void checkState(CONDUCTOR_STATE expectedState) {
+    protected void checkState(CONDUCTOR_STATE expectedState) {
         if (!conductorState.equals(expectedState)) {
             throw new IllegalStateException("Expected state: " + expectedState
                     + ", actual state:" + conductorState);
@@ -422,7 +446,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
      */
     protected short proposeVersion(short remoteVersion) {
         Short proposal = null;
-        for (short offer : versionOrder) {
+        for (short offer : ConnectionConductor.versionOrder) {
             if (offer <= remoteVersion) {
                 proposal = offer;
                 break;
@@ -440,7 +464,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
      * @param list
      * @return
      */
-    protected short proposeBitmapVersion(List<Elements> list)
+    protected Short proposeBitmapVersion(List<Elements> list)
     {
         Short supportedHighestVersion = null;
         if((null != list) && (0 != list.size()))
@@ -449,7 +473,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
            {
               List<Boolean> bitmap = element.getVersionBitmap();
               // check for version bitmap
-              for(short bitPos : versionOrder)
+              for(short bitPos : ConnectionConductor.versionOrder)
               {
                   // with all the version it should work.
                   if(bitmap.get(bitPos % Integer.SIZE))
@@ -514,7 +538,6 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
      */
     public void setListenerMapping(
             Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping) {
-        //TODO: adjust the listener interface
         this.listenerMapping = listenerMapping;
     }
 
@@ -545,6 +568,14 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
 
     @Override
     public void onConnectionReady() {
-        // TODO Auto-generated method stub
+        LOG.debug("connection is ready-to-use");
+        //TODO: fire first helloMessage
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                sendFirstHelloMessage();
+            }
+        }).start();
     }
+    
 }
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MessageFactory.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/MessageFactory.java
new file mode 100644 (file)
index 0000000..2fc0db0
--- /dev/null
@@ -0,0 +1,80 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.HelloElementType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.ElementsBuilder;
+
+/**
+ * @author mirehak
+ *
+ */
+public abstract class MessageFactory {
+
+    /**
+     * @param helloVersion
+     * @param helloXid
+     * @return HelloInput without elements
+     */
+    public static HelloInput createHelloInput(short helloVersion, long helloXid) {
+        return createHelloInput(helloVersion, helloXid, null);
+    }
+
+    /**
+     * @param highestVersion
+     * @param xid
+     * @return builder with prepared header
+     */
+    private static HelloInputBuilder prepareHelloInputBuilder(
+            short highestVersion, long xid) {
+        HelloInputBuilder helloInputbuilder = new HelloInputBuilder();
+        helloInputbuilder.setVersion(highestVersion);
+        helloInputbuilder.setXid(xid);
+        return helloInputbuilder;
+    }
+    
+    /**
+     * @param helloVersion
+     * @param helloXid
+     * @param versionOrder
+     * @return HelloInput with elements (version bitmap)
+     */
+    public static HelloInput createHelloInput(short helloVersion, long helloXid, List<Short> versionOrder) {
+        HelloInputBuilder helloInputbuilder = prepareHelloInputBuilder(helloVersion, helloXid);
+        if (versionOrder != null) {
+            List<Elements> elementList = new ArrayList<>();
+            
+            ElementsBuilder elementsBuilder = new ElementsBuilder();
+            elementsBuilder.setType(HelloElementType.VERSIONBITMAP);
+            List<Boolean> booleanList = new ArrayList<>();
+            
+            int versionOrderIndex = versionOrder.size() - 1;
+            
+            while (versionOrderIndex >= 0) {
+                short version = versionOrder.get(versionOrderIndex);
+                if (version == booleanList.size()) {
+                    booleanList.add(true);
+                    versionOrderIndex--;
+                } else {
+                    booleanList.add(false);
+                }
+            }
+            
+            elementsBuilder.setVersionBitmap(booleanList);
+            elementList.add(elementsBuilder.build());
+            helloInputbuilder.setElements(elementList);
+        }
+        return helloInputbuilder.build();
+    }
+}
index bc11fac212d34d2a18ed8f578e2fe0bb6c989899..7d8f43cdbbf2968e9bb3f343f4de99ba4170177e 100644 (file)
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
  */
 public class MessageDispatchServiceImpl implements IMessageDispatchService {
 
-    private static final Logger LOG = LoggerFactory.getLogger(OFSessionUtil.class);
+    private static final Logger LOG = LoggerFactory.getLogger(MessageDispatchServiceImpl.class);
 
     private SessionContext session;
 
@@ -64,10 +64,10 @@ public class MessageDispatchServiceImpl implements IMessageDispatchService {
     private ConnectionAdapter getConnectionAdapter(SwitchConnectionDistinguisher cookie) {
 
         if (!session.isValid()) {
-            LOG.warn("Session for the cookie {} is invalid." + cookie);
+            LOG.warn("Session for the cookie {} is invalid.", cookie);
             throw new IllegalArgumentException("Session for the cookie is invalid.");
         }
-        LOG.debug("finding connecton for cookie value {}. " + cookie);
+        LOG.debug("finding connecton for cookie value {}. ", cookie);
         // set main connection as default
         ConnectionAdapter connectionAdapter = session.getPrimaryConductor().getConnectionAdapter();
         if (null != cookie) {
index 27165e245153422a95f8160790e66065362504b0..7f4aa92c4fca8b5510dae6e681c62ece24cf6217 100644 (file)
@@ -10,13 +10,11 @@ package org.opendaylight.openflowplugin.openflow.md.core;
 
 import java.math.BigInteger;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Stack;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -27,7 +25,6 @@ import org.junit.Test;
 import org.opendaylight.openflowplugin.openflow.md.core.plan.ConnectionAdapterStackImpl;
 import org.opendaylight.openflowplugin.openflow.md.core.plan.EventFactory;
 import org.opendaylight.openflowplugin.openflow.md.core.plan.SwitchTestEvent;
-import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.ErrorType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.HelloElementType;
@@ -40,17 +37,15 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessageBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowRemovedMessageBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessageBuilder;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
-import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.ElementsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessageBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.ElementsBuilder;
 import org.opendaylight.yangtools.yang.binding.DataObject;
-import org.opendaylight.yangtools.yang.binding.Notification;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,7 +56,7 @@ import com.google.common.collect.Lists;
  */
 public class ConnectionConductorImplTest {
 
-    private static final Logger LOG = LoggerFactory
+    protected static final Logger LOG = LoggerFactory
             .getLogger(ConnectionConductorImplTest.class);
 
     protected ConnectionAdapterStackImpl adapter;
@@ -72,12 +67,12 @@ public class ConnectionConductorImplTest {
     private Thread libSimulation;
     private ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(
             8);
-    private int experimenterMessageCounter;
-    private int packetinMessageCounter;
-    private int flowremovedMessageCounter;
-    private int portstatusAddMessageCounter;
-    private int portstatusDeleteMessageCounter;
-    private int portstatusModifyMessageCounter;
+    protected int experimenterMessageCounter;
+    protected int packetinMessageCounter;
+    protected int flowremovedMessageCounter;
+    protected int portstatusAddMessageCounter;
+    protected int portstatusDeleteMessageCounter;
+    protected int portstatusModifyMessageCounter;
 
     /**
      * @throws java.lang.Exception
@@ -236,6 +231,9 @@ public class ConnectionConductorImplTest {
         builder1.setExperimenter(84L).setExpType(4L);
         eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L,
                 EventFactory.DEFAULT_VERSION, builder1));
+        
+        connectionConductor.setListenerMapping(assembleListenerMapping());
+        
         executeLater();
 
         Runnable sendExperimenterCmd = new Runnable() {
@@ -642,7 +640,7 @@ public class ConnectionConductorImplTest {
         eventPlan.add(
                 0,
                 EventFactory.createDefaultNotificationEvent(42L, (short) 0x05,
-                        getHelloBitmapMessage(Lists.newArrayList((short) 0x05, (short) 0x04))));
+                        getHelloBitmapMessage(Lists.newArrayList((short) 0x05, (short) 0x02))));
         executeNow();
         Assert.assertNull(connectionConductor.getVersion());
     }
@@ -652,11 +650,11 @@ public class ConnectionConductorImplTest {
         int elementsCount = highestVersion / Integer.SIZE;
         ElementsBuilder elementsBuilder = new ElementsBuilder();
 
-        List<Elements> elementList = new ArrayList<Elements>();
+        List<Elements> elementList = new ArrayList<>();
         int orderIndex = versionOrder.size();
         int value = versionOrder.get(--orderIndex);
         for (int index = 0; index <= elementsCount; index++) {
-            List<Boolean> booleanList = new ArrayList<Boolean>();
+            List<Boolean> booleanList = new ArrayList<>();
             for (int i = 0; i < Integer.SIZE; i++) {
                 if (value == ((index * Integer.SIZE) + i)) {
                     booleanList.add(true);
@@ -736,12 +734,7 @@ public class ConnectionConductorImplTest {
      */
     @Test
     public void testOnExperimenterMessage() throws InterruptedException {
-        IMDMessageListener objEms = new ExperimenterMessageService() ;
-        Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping = new HashMap<Class<? extends DataObject>, Collection<IMDMessageListener>>();
-        Collection<IMDMessageListener> existingValues = new ArrayList<IMDMessageListener>();
-        existingValues.add(objEms);
-        listenerMapping.put(ExperimenterMessage.class, existingValues);
-        connectionConductor.setListenerMapping(listenerMapping);
+        connectionConductor.setListenerMapping(assembleListenerMapping());
         ExperimenterMessageBuilder builder1 = new ExperimenterMessageBuilder();
         builder1.setExperimenter(84L).setExpType(4L);
         connectionConductor.onExperimenterMessage(builder1.build());
@@ -751,4 +744,16 @@ public class ConnectionConductorImplTest {
         Assert.assertEquals(2, experimenterMessageCounter);
     }
 
+    /**
+     * @return listener mapping
+     */
+    private Map<Class<? extends DataObject>, Collection<IMDMessageListener>> assembleListenerMapping() {
+        IMDMessageListener objEms = new ExperimenterMessageService() ;
+        Map<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping = new HashMap<>();
+        Collection<IMDMessageListener> existingValues = new ArrayList<>();
+        existingValues.add(objEms);
+        listenerMapping.put(ExperimenterMessage.class, existingValues);
+        return listenerMapping;
+    }
+
 }
diff --git a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/MessageFactoryTest.java b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/MessageFactoryTest.java
new file mode 100644 (file)
index 0000000..26d370f
--- /dev/null
@@ -0,0 +1,57 @@
+/**
+ * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.openflowplugin.openflow.md.core;
+
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.HelloElementType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
+
+/**
+ * @author mirehak
+ *
+ */
+public class MessageFactoryTest {
+
+    /**
+     * Test method for {@link org.opendaylight.openflowplugin.openflow.md.core.MessageFactory#createHelloInputWoElements(java.lang.Short, java.lang.Long)}.
+     */
+    @Test
+    public void testCreateHelloInputWoElements() {
+        short highestVersion = (short) 0x04;
+        long xid = 42L;
+        
+        HelloInput helloMsg = MessageFactory.createHelloInput(highestVersion, xid);
+        Assert.assertEquals(highestVersion, helloMsg.getVersion().shortValue());
+        Assert.assertEquals(xid, helloMsg.getXid().longValue());
+        Assert.assertNull(helloMsg.getElements());
+    }
+
+    /**
+     * Test method for {@link org.opendaylight.openflowplugin.openflow.md.core.MessageFactory#createHelloInputWithElements(java.lang.Short, java.lang.Long, java.util.List)}.
+     */
+    @Test
+    public void testCreateHelloInputWithElements() {
+        short highestVersion = (short) 0x04;
+        long xid = 42L;
+        Boolean[] expectedVersionBitmap = new Boolean[]{
+                false, true, false, false, true};
+        
+        HelloInput helloMsg = MessageFactory.createHelloInput(highestVersion, xid, 
+                ConnectionConductor.versionOrder);
+        Assert.assertEquals(highestVersion, helloMsg.getVersion().shortValue());
+        Assert.assertEquals(xid, helloMsg.getXid().longValue());
+        Assert.assertEquals(1, helloMsg.getElements().size());
+        Elements actualElement = helloMsg.getElements().get(0);
+        Assert.assertEquals(HelloElementType.VERSIONBITMAP, actualElement.getType());
+        Assert.assertArrayEquals(expectedVersionBitmap, actualElement.getVersionBitmap().toArray(new Boolean[0]));
+    }
+
+}
index eb9d411a944ffa5b0b98b2d11d86ef51fd4de01e..fe5c29ead3fa68e773ef774b6059b3c66399dcc8 100644 (file)
@@ -15,10 +15,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Stack;
-import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 
 import org.opendaylight.controller.sal.common.util.RpcErrors;
 import org.opendaylight.controller.sal.common.util.Rpcs;
@@ -89,8 +86,6 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     protected SystemNotificationsListener systemListener;
 
     protected Map<Long, SettableFuture<?>> rpcResults = new HashMap<>();
-    private ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(
-            8);
     protected boolean planTouched = false;
 
     private long proceedTimeout;
@@ -98,6 +93,8 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     protected List<Exception> occuredExceptions = new ArrayList<>();
 
     private ConnectionReadyListener connectionReadyListener;
+    
+    private int planItemCounter;
 
     /**
      * default ctor
@@ -293,8 +290,15 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
         String msg = null;
         LOG.debug("checking rpc: " + rpcName);
         if (!(eventPlan.peek() instanceof SwitchTestWaitForRpcEvent)) {
-            msg = "expected [rpc], got [" + rpcInput.getClass().getSimpleName()
-                    + "]";
+            if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
+                SwitchTestNotificationEvent notifEvent = (SwitchTestNotificationEvent) (eventPlan.peek());
+                msg = "expected [notification: " +notifEvent.getPlannedNotification()+ "], got [" + rpcInput.getClass().getSimpleName()
+                        + "]";
+            } else if (eventPlan.peek() instanceof SwitchTestRcpResponseEvent) {
+                SwitchTestRcpResponseEvent rpcEvent = (SwitchTestRcpResponseEvent) (eventPlan.peek());
+                msg = "expected [rpc: " +rpcEvent.getPlannedRpcResponse()+ "], got [" + rpcInput.getClass().getSimpleName()
+                        + "]";
+            }
         } else {
             SwitchTestWaitForRpcEvent switchTestRpcEvent = (SwitchTestWaitForRpcEvent) eventPlan
                     .peek();
@@ -308,18 +312,19 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
         }
 
         if (msg != null) {
-            LOG.debug("check .. FAILED: " + msg);
+            LOG.debug("rpc check .. FAILED: " + msg);
             occuredExceptions.add(new IllegalArgumentException(msg));
         }
-        LOG.debug("check .. OK");
+        LOG.debug("rpc check .. OK");
     }
 
     /**
      * discard current event, execute next, if possible
      */
     private synchronized void next() {
-        LOG.debug("STEPPING TO NEXT event in plan");
+        LOG.debug("<---> STEPPING TO NEXT event in plan (leaving [{}] {})", planItemCounter, eventPlan.peek());
         eventPlan.pop();
+        planItemCounter ++;
         planTouched = true;
         notify();
     }
@@ -329,7 +334,7 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
      */
     private synchronized void proceed() {
         boolean processed = false;
-        LOG.debug("proceeding plan item: " + eventPlan.peek());
+        LOG.debug("proceeding plan item[{}]: {}", planItemCounter, eventPlan.peek());
         if (eventPlan.peek() instanceof SwitchTestNotificationEvent) {
             SwitchTestNotificationEvent notification = (SwitchTestNotificationEvent) eventPlan
                     .peek();
@@ -346,7 +351,7 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
             next();
         } else {
             try {
-                LOG.debug("now waiting for HANDLER to act");
+                LOG.debug("now WAITING for OF_LISTENER to act ..");
                 wait(proceedTimeout);
             } catch (InterruptedException e) {
                 LOG.error(e.getMessage(), e);
@@ -356,7 +361,8 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
 
     @Override
     public void run() {
-        LOG.debug("evenPlan STARTING ..");
+        LOG.debug("|---> evenPlan STARTING ..");
+        planItemCounter = 0;
         while (!eventPlan.isEmpty()) {
             planTouched = false;
             proceed();
@@ -367,11 +373,11 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
         }
 
         try {
-            pool.awaitTermination(10 * JOB_DELAY, TimeUnit.MILLISECONDS);
+            Thread.sleep(JOB_DELAY);
         } catch (InterruptedException e) {
             LOG.error(e.getMessage(), e);
         }
-        LOG.debug("eventPlan done");
+        LOG.debug("<---| eventPlan DONE");
     }
 
     /**
@@ -380,116 +386,98 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
     private void processNotification(
             final SwitchTestNotificationEvent notificationEvent) {
 
-        Callable<Void> notifyCmd = new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                Notification notification = notificationEvent
-                        .getPlannedNotification();
-                LOG.debug("notificating HANDLER: "
-                        + notification.getClass().getSimpleName());
-
-                // system events
-                if (notification instanceof DisconnectEvent) {
-                    systemListener
-                            .onDisconnectEvent((DisconnectEvent) notification);
-                }
-                // of notifications
-                else if (notification instanceof EchoRequestMessage) {
-                    ofListener
-                            .onEchoRequestMessage((EchoRequestMessage) notification);
-                } else if (notification instanceof ErrorMessage) {
-                    ofListener.onErrorMessage((ErrorMessage) notification);
-                } else if (notification instanceof ExperimenterMessage) {
-                    ofListener
-                            .onExperimenterMessage((ExperimenterMessage) notification);
-                } else if (notification instanceof FlowRemovedMessage) {
-                    ofListener
-                            .onFlowRemovedMessage((FlowRemovedMessage) notification);
-                } else if (notification instanceof HelloMessage) {
-                    ofListener.onHelloMessage((HelloMessage) notification);
-                } else if (notification instanceof MultipartReplyMessage) {
-                    ofListener
-                            .onMultipartReplyMessage((MultipartReplyMessage) notification);
-                } else if (notification instanceof MultipartRequestMessage) {
-                    ofListener
-                            .onMultipartRequestMessage((MultipartRequestMessage) notification);
-                } else if (notification instanceof PacketInMessage) {
-                    ofListener
-                            .onPacketInMessage((PacketInMessage) notification);
-                } else if (notification instanceof PortStatusMessage) {
-                    ofListener
-                            .onPortStatusMessage((PortStatusMessage) notification);
-                }
-                // default
-                else {
-                    occuredExceptions.add(new IllegalStateException(
-                            "message listening not supported for type: "
-                                    + notification.getClass()));
-                }
-
-                LOG.debug("thread finished");
-                return null;
-            }
+        Notification notification = notificationEvent
+                .getPlannedNotification();
+        LOG.debug("notificating OF_LISTENER: "
+                + notification.getClass().getSimpleName());
 
-        };
+        // system events
+        if (notification instanceof DisconnectEvent) {
+            systemListener
+            .onDisconnectEvent((DisconnectEvent) notification);
+        }
+        // of notifications
+        else if (notification instanceof EchoRequestMessage) {
+            ofListener
+            .onEchoRequestMessage((EchoRequestMessage) notification);
+        } else if (notification instanceof ErrorMessage) {
+            ofListener.onErrorMessage((ErrorMessage) notification);
+        } else if (notification instanceof ExperimenterMessage) {
+            ofListener
+            .onExperimenterMessage((ExperimenterMessage) notification);
+        } else if (notification instanceof FlowRemovedMessage) {
+            ofListener
+            .onFlowRemovedMessage((FlowRemovedMessage) notification);
+        } else if (notification instanceof HelloMessage) {
+            ofListener.onHelloMessage((HelloMessage) notification);
+        } else if (notification instanceof MultipartReplyMessage) {
+            ofListener
+            .onMultipartReplyMessage((MultipartReplyMessage) notification);
+        } else if (notification instanceof MultipartRequestMessage) {
+            ofListener
+            .onMultipartRequestMessage((MultipartRequestMessage) notification);
+        } else if (notification instanceof PacketInMessage) {
+            ofListener
+            .onPacketInMessage((PacketInMessage) notification);
+        } else if (notification instanceof PortStatusMessage) {
+            ofListener
+            .onPortStatusMessage((PortStatusMessage) notification);
+        }
+        // default
+        else {
+            occuredExceptions.add(new IllegalStateException(
+                    "message listening not supported for type: "
+                            + notification.getClass()));
+        }
 
-        pool.schedule(notifyCmd, JOB_DELAY, TimeUnit.MILLISECONDS);
+        LOG.debug("notification ["+notification.getClass().getSimpleName()+"] .. done");
     }
 
     /**
      * @param rpcResponse
      */
     private void processRpcResponse(final SwitchTestRcpResponseEvent rpcResponse) {
-        Callable<Void> notifyCmd = new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-
-                OfHeader plannedRpcResponseValue = rpcResponse
-                        .getPlannedRpcResponse();
-                LOG.debug("rpc-responding to HANDLER: " + rpcResponse.getXid());
-
-                @SuppressWarnings("unchecked")
-                SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
-                        .get(rpcResponse.getXid());
-
-                if (response != null) {
-                    boolean successful = plannedRpcResponseValue != null;
-                    Collection<RpcError> errors;
-                    if (successful) {
-                        errors = Collections.emptyList();
-                    } else {
-                        errors = Lists
-                                .newArrayList(RpcErrors
-                                        .getRpcError(
-                                                "unit",
-                                                "unit",
-                                                "not requested",
-                                                ErrorSeverity.ERROR,
-                                                "planned response to RPC.id = "
-                                                        + rpcResponse.getXid(),
+        OfHeader plannedRpcResponseValue = rpcResponse
+                .getPlannedRpcResponse();
+        LOG.debug("rpc-responding to OF_LISTENER: " + rpcResponse.getXid());
+
+        @SuppressWarnings("unchecked")
+        SettableFuture<RpcResult<?>> response = (SettableFuture<RpcResult<?>>) rpcResults
+        .get(rpcResponse.getXid());
+
+        if (response != null) {
+            boolean successful = plannedRpcResponseValue != null;
+            Collection<RpcError> errors;
+            if (successful) {
+                errors = Collections.emptyList();
+            } else {
+                errors = Lists
+                        .newArrayList(RpcErrors
+                                .getRpcError(
+                                        "unit",
+                                        "unit",
+                                        "not requested",
+                                        ErrorSeverity.ERROR,
+                                        "planned response to RPC.id = "
+                                                + rpcResponse.getXid(),
                                                 ErrorType.RPC,
                                                 new Exception(
                                                         "rpc response failed (planned behavior)")));
-                    }
-                    RpcResult<?> result = Rpcs.getRpcResult(successful,
-                            plannedRpcResponseValue, errors);
-                    response.set(result);
-                } else {
-                    String msg = "RpcResponse not expected: xid="
-                            + rpcResponse.getXid()
-                            + ", "
-                            + plannedRpcResponseValue.getClass()
-                                    .getSimpleName();
-                    LOG.error(msg);
-                    occuredExceptions.add(new IllegalStateException(msg));
-                }
-
-                LOG.debug("thread finished");
-                return null;
             }
-        };
+            RpcResult<?> result = Rpcs.getRpcResult(successful,
+                    plannedRpcResponseValue, errors);
+            response.set(result);
+        } else {
+            String msg = "RpcResponse not expected: xid="
+                    + rpcResponse.getXid()
+                    + ", "
+                    + plannedRpcResponseValue.getClass()
+                    .getSimpleName();
+            LOG.error(msg);
+            occuredExceptions.add(new IllegalStateException(msg));
+        }
 
-        pool.schedule(notifyCmd, JOB_DELAY, TimeUnit.MILLISECONDS);
+        LOG.debug("rpc ["+rpcResponse.getXid()+"] .. done");
     }
 
     /**
@@ -509,7 +497,8 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
      */
     private static SettableFuture<RpcResult<Void>> createOneWayRpcResult() {
         SettableFuture<RpcResult<Void>> result = SettableFuture.create();
-        result.set(null);
+        List<RpcError> errors = Collections.emptyList();
+        result.set(Rpcs.getRpcResult(true, (Void) null, errors));
         return result;
     }
 
@@ -538,9 +527,7 @@ public class ConnectionAdapterStackImpl implements ConnectionAdapter, Runnable {
 
     @Override
     public void fireConnectionReadyNotification() {
-        if (connectionReadyListener != null) {
             connectionReadyListener.onConnectionReady();
-        }
     }
 
     @Override
index 8195950fb5b8f0877d81b0ae46aeced372768f66..d55ed8826e8388a99feddfd2217856e0cf879a98 100644 (file)
@@ -363,7 +363,6 @@ class MockConnectionConductor implements ConnectionConductor {
     public MessageType getMessageType() {
         return adapter.getMessageType();
     }
-
 }
 
 enum MessageType {
@@ -534,9 +533,7 @@ class MockConnectionAdapter implements ConnectionAdapter {
 
     @Override
     public void fireConnectionReadyNotification() {
-        if (connectionReadyListener != null) {
             connectionReadyListener.onConnectionReady();
-        }
     }
 
     @Override