handled review comments (sending rpc message to library and version 02/2202/1
authorAnilGujele <angujele@in.ibm.com>
Mon, 28 Oct 2013 06:57:39 +0000 (12:27 +0530)
committerAnilGujele <angujele@in.ibm.com>
Mon, 28 Oct 2013 06:57:39 +0000 (12:27 +0530)
negotiation )

Change-Id: I7c6162aacb6bcf985ef7993575302e3608dcc380
Signed-off-by: AnilGujele <angujele@in.ibm.com>
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductor.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImpl.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/IMessageDispatchService.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImpl.java [new file with mode: 0644]
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionContext.java
openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/SessionContextOFImpl.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/ConnectionConductorImplTest.java
openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImplTest.java [new file with mode: 0644]

index fec9ea98e695e14eaed7899c4d9e596cdb25d60d..50dd784ad391f98c964031fb1fa4b2935052157c 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.openflowplugin.openflow.md.core;
 
 import java.util.concurrent.Future;
 
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
 
 
@@ -82,4 +83,10 @@ public interface ConnectionConductor {
      * @return the auxiliaryKey (null if this is a primary connection)
      */
     public SwitchConnectionDistinguisher getAuxiliaryKey();
+
+    /**
+     * @return the connectionAdapter
+     */
+    public ConnectionAdapter getConnectionAdapter();
+
 }
index af3238a4312645841f9bd746c72284ee49bd1707..ea9c2f6c7681a5fe1a7646861230748f8c470166 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.openflowplugin.openflow.md.core;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Future;
@@ -19,6 +20,7 @@ import org.opendaylight.openflowplugin.openflow.core.IMessageListener;
 import org.opendaylight.openflowplugin.openflow.md.core.session.OFSessionUtil;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionContext;
 import org.opendaylight.openflowplugin.openflow.md.core.session.SessionManager;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.HelloElementType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
@@ -35,6 +37,8 @@ import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketInMessage;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortStatusMessage;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.ElementsBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.DisconnectEvent;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SwitchIdleEvent;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
@@ -56,6 +60,11 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
     private static final Logger LOG = LoggerFactory
             .getLogger(ConnectionConductorImpl.class);
 
+    /* variable to make BitMap-based negotiation enabled / disabled.
+     * it will help while testing and isolating issues related to processing of
+     * BitMaps from switches.
+     */
+    private static final boolean isBitmapNegotiationEnable = true;
     private LinkedBlockingQueue<Exception> errorQueue = new LinkedBlockingQueue<>();
 
     private final ConnectionAdapter connectionAdapter;
@@ -69,6 +78,10 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
 
     private ImmutableMap<Class<? extends DataObject>, Collection<IMDMessageListener>> listenerMapping;
 
+    private boolean isFirstHelloNegotiation = true;
+
+
+
     /**
      * @param connectionAdapter
      */
@@ -76,6 +89,7 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
         this.connectionAdapter = connectionAdapter;
         conductorState = CONDUCTOR_STATE.HANDSHAKING;
         versionOrder = Lists.newArrayList((short) 0x04, (short) 0x01);
+        // TODO: add a thread pool to handle ErrorQueueHandler
         new Thread(new ErrorQueueHandler(errorQueue)).start();
     }
 
@@ -83,6 +97,48 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
     public void init() {
         connectionAdapter.setMessageListener(this);
         connectionAdapter.setSystemListener(this);
+        //TODO : Wait for library to provide interface from which we can send first hello message
+//        sendFirstHelloMessage();
+    }
+
+
+    /**
+     * send first hello message to switch
+     */
+    private void sendFirstHelloMessage() {
+        short highestVersion = versionOrder.get(0);
+        Long helloXid = 1L;
+        HelloInputBuilder helloInputbuilder = new HelloInputBuilder();
+        helloInputbuilder.setVersion(highestVersion);
+        helloInputbuilder.setXid(helloXid);
+        if (isBitmapNegotiationEnable) {
+            int elementsCount = highestVersion / Integer.SIZE;
+            ElementsBuilder elementsBuilder = new ElementsBuilder();
+
+            List<Elements> elementList = new ArrayList<Elements>();
+            int orderIndex = versionOrder.size();
+            int value = versionOrder.get(--orderIndex);
+            for (int index = 0; index <= elementsCount; index++) {
+                List<Boolean> booleanList = new ArrayList<Boolean>();
+                for (int i = 0; i < Integer.SIZE; i++) {
+                    if (value == ((index * Integer.SIZE) + i)) {
+                        booleanList.add(true);
+                        value = (orderIndex == 0) ? highestVersion : versionOrder.get(--orderIndex);
+                    } else {
+                        booleanList.add(false);
+                    }
+                }
+                elementsBuilder.setType(HelloElementType.forValue(1));
+                elementsBuilder.setVersionBitmap(booleanList);
+                elementList.add(elementsBuilder.build());
+            }
+            helloInputbuilder.setElements(elementList);
+            LOG.debug("sending first hello message: version header={} , version bitmap={}", highestVersion, elementList);
+        } else {
+            LOG.debug("sending first hello message: version header={} ", highestVersion);
+        }
+        connectionAdapter.hello(helloInputbuilder.build());
+
     }
 
     @Override
@@ -115,6 +171,18 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
         // TODO Auto-generated method stub
     }
 
+
+    /**
+     * version negotiation happened as per following steps:
+     * 1. If HelloMessage version field has same version, continue connection processing.
+     *    If HelloMessage version is lower than supported versions, just disconnect.
+     * 2. If HelloMessage contains bitmap and common version found in bitmap
+     *    then continue connection processing. if no common version found, just disconnect.
+     * 3. If HelloMessage version is not supported, send HelloMessage with lower supported version.
+     *    If Hello message received again with not supported version, just disconnect.
+     *
+     *   TODO: Better to handle handshake into a maintainable innerclass which uses State-Pattern.
+     */
     @Override
     public void onHelloMessage(HelloMessage hello) {
         // do handshake
@@ -122,53 +190,101 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
         checkState(CONDUCTOR_STATE.HANDSHAKING);
 
         Short remoteVersion = hello.getVersion();
+        List<Elements> elements = hello.getElements();
         long xid = hello.getXid();
         short proposedVersion;
+        LOG.debug("Hello message version={} and bitmap={}", remoteVersion, elements);
         try {
+            // find the version from header version field
             proposedVersion = proposeVersion(remoteVersion);
-        } catch (Exception e) {
+
+        } catch (IllegalArgumentException e) {
             handleException(e);
+            connectionAdapter.disconnect();
             throw e;
         }
+
+        // sent version is equal to remote --> version is negotiated
+        if (proposedVersion == remoteVersion) {
+            LOG.debug("sending helloReply as version in header is supported: {}", proposedVersion);
+            sendHelloReply(proposedVersion, ++xid);
+            postHandshake(proposedVersion, ++xid);
+
+        } else if (isBitmapNegotiationEnable && null != elements && 0 != elements.size()) {
+            try {
+                // hello contains version bitmap, checking highest common
+                // version in bitmap
+                proposedVersion = proposeBitmapVersion(elements);
+            } catch (IllegalArgumentException ex) {
+                handleException(ex);
+                connectionAdapter.disconnect();
+                throw ex;
+            }
+            LOG.debug("sending helloReply for common bitmap version : {}", proposedVersion);
+            sendHelloReply(proposedVersion, ++xid);
+            postHandshake(proposedVersion, ++xid);
+        } else {
+            if (isFirstHelloNegotiation) {
+                isFirstHelloNegotiation = false;
+                LOG.debug("sending helloReply for lowest supported version : {}", proposedVersion);
+                // send hello reply with lower version number supported
+                sendHelloReply(proposedVersion, ++xid);
+            } else {
+                // terminate the connection.
+                LOG.debug("Version negotiation failed. unsupported version : {}", remoteVersion);
+                connectionAdapter.disconnect();
+            }
+        }
+    }
+
+    /**
+     * send hello reply
+     * @param proposedVersion
+     * @param hello
+     */
+    private void sendHelloReply(Short proposedVersion, Long xid)
+    {
         HelloInputBuilder helloBuilder = new HelloInputBuilder();
-        xid++;
         helloBuilder.setVersion(proposedVersion).setXid(xid);
-        LOG.debug("sending helloReply");
         connectionAdapter.hello(helloBuilder.build());
+    }
 
-        if (proposedVersion != remoteVersion) {
-            // need to wait for another hello
-        } else {
-            // sent version is equal to remote --> version is negotiated
-            version = proposedVersion;
-            LOG.debug("version set: " + proposedVersion);
 
-            // request features
-            GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
-            xid++;
+    /**
+     * after handshake set features, register to session
+     * @param proposedVersion
+     * @param xId
+     */
+    private void postHandshake(Short proposedVersion, Long xid) {
+        // set version
+        version = proposedVersion;
+        LOG.debug("version set: " + proposedVersion);
+        // request features
+        GetFeaturesInputBuilder featuresBuilder = new GetFeaturesInputBuilder();
             featuresBuilder.setVersion(version).setXid(xid);
-            Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
-                    .getFeatures(featuresBuilder.build());
-            LOG.debug("waiting for features");
-            RpcResult<GetFeaturesOutput> rpcFeatures;
-            try {
-                rpcFeatures = featuresFuture.get(getMaxTimeout(),
-                        TimeUnit.MILLISECONDS);
-                if (!rpcFeatures.isSuccessful()) {
-                    LOG.error("obtained features problem: "
-                            + rpcFeatures.getErrors());
-                } else {
-                    LOG.debug("obtained features: datapathId="
-                            + rpcFeatures.getResult().getDatapathId());
-                    conductorState = CONDUCTOR_STATE.WORKING;
-
-                    OFSessionUtil.registerSession(this,
-                            rpcFeatures.getResult(), version);
-                    LOG.info("handshake SETTLED");
-                }
-            } catch (Exception e) {
-                handleException(e);
+        LOG.debug("sending feature request for version={} and xid={}", version, xid);
+        Future<RpcResult<GetFeaturesOutput>> featuresFuture = connectionAdapter
+                .getFeatures(featuresBuilder.build());
+        LOG.debug("waiting for features");
+        RpcResult<GetFeaturesOutput> rpcFeatures;
+        try {
+            rpcFeatures = featuresFuture.get(getMaxTimeout(),
+                    TimeUnit.MILLISECONDS);
+            if (!rpcFeatures.isSuccessful()) {
+                LOG.error("obtained features problem: {}"
+                        , rpcFeatures.getErrors());
+            } else {
+                GetFeaturesOutput featureOutput =  rpcFeatures.getResult();
+                LOG.debug("obtained features: datapathId={}"
+                        , featureOutput.getDatapathId());
+                conductorState = CONDUCTOR_STATE.WORKING;
+
+                OFSessionUtil.registerSession(this,
+                        featureOutput, version);
+                LOG.info("handshake SETTLED: datapathId={}, auxiliaryId={}", featureOutput.getDatapathId(), featureOutput.getAuxiliaryId());
             }
+        } catch (Exception e) {
+            handleException(e);
         }
     }
 
@@ -278,6 +394,11 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
         sessionManager.invalidateOnDisconnect(this);
     }
 
+    /**
+     * find supported version based on remoteVersion
+     * @param remoteVersion
+     * @return
+     */
     protected short proposeVersion(short remoteVersion) {
         Short proposal = null;
         for (short offer : versionOrder) {
@@ -293,6 +414,40 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
         return proposal;
     }
 
+    /**
+     * find common highest supported bitmap version
+     * @param list
+     * @return
+     */
+    protected short proposeBitmapVersion(List<Elements> list)
+    {
+        Short supportedHighestVersion = null;
+        if((null != list) && (0 != list.size()))
+        {
+           for(Elements element : list)
+           {
+              List<Boolean> bitmap = element.getVersionBitmap();
+              // check for version bitmap
+              for(short bitPos : versionOrder)
+              {
+                  // with all the version it should work.
+                  if(bitmap.get(bitPos % Integer.SIZE))
+                  {
+                      supportedHighestVersion = bitPos;
+                      break;
+                  }
+              }
+           }
+           if(null == supportedHighestVersion)
+            {
+                throw new IllegalArgumentException("unsupported bitmap version.");
+            }
+
+        }
+
+        return supportedHighestVersion;
+    }
+
     @Override
     public Short getVersion() {
         return version;
@@ -345,4 +500,9 @@ public class ConnectionConductorImpl implements OpenflowProtocolListener,
             }
         }
     }
+
+    @Override
+    public ConnectionAdapter getConnectionAdapter() {
+        return connectionAdapter;
+    }
 }
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/IMessageDispatchService.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/IMessageDispatchService.java
new file mode 100644 (file)
index 0000000..504596a
--- /dev/null
@@ -0,0 +1,218 @@
+package org.opendaylight.openflowplugin.openflow.md.core.session;
+
+import java.util.concurrent.Future;
+
+import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+/**
+ * Message Dispatch Service to send the message to switch.
+ *
+ * @author AnilGujele
+ *
+ */
+public interface IMessageDispatchService {
+
+    /**
+     * send barrier message to switch
+     *
+     * @param input
+     *            - message
+     * @param cookie
+     *            - to identify connection if null then feel free to send via
+     *            any connection
+     * @return - the future
+     */
+    Future<RpcResult<BarrierOutput>> barrier(BarrierInput input, SwitchConnectionDistinguisher cookie);
+
+    /**
+     * send experimenter message to switch
+     *
+     * @param input
+     *            - message
+     * @param cookie
+     *            - to identify connection if null then feel free to send via
+     *            any connection
+     * @return - the future
+     */
+    Future<RpcResult<java.lang.Void>> experimenter(ExperimenterInput input, SwitchConnectionDistinguisher cookie);
+
+    /**
+     * send flow modification message to switch
+     *
+     * @param input
+     *            - message
+     * @param cookie
+     *            - to identify connection if null then feel free to send via
+     *            any connection
+     * @return - the future
+     */
+    Future<RpcResult<java.lang.Void>> flowMod(FlowModInput input, SwitchConnectionDistinguisher cookie);
+
+    /**
+     * send get async message to switch
+     *
+     * @param input
+     *            - message
+     * @param cookie
+     *            - to identify connection if null then feel free to send via
+     *            any connection
+     * @return - the future
+     */
+    Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput input, SwitchConnectionDistinguisher cookie);
+
+    /**
+     * send get config message to switch
+     *
+     * @param input
+     *            - message
+     * @param cookie
+     *            - to identify connection if null then feel free to send via
+     *            any connection
+     * @return - the future
+     */
+    Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput input, SwitchConnectionDistinguisher cookie);
+
+    /**
+     * send get features message to switch
+     *
+     * @param input
+     *            - message
+     * @param cookie
+     *            - to identify connection if null then feel free to send via
+     *            any connection
+     * @return - the future
+     */
+    Future<RpcResult<GetFeaturesOutput>> getFeatures(GetFeaturesInput input, SwitchConnectionDistinguisher cookie);
+
+    /**
+     * send get queue config message to switch
+     *
+     * @param input
+     *            - message
+     * @param cookie
+     *            - to identify connection if null then feel free to send via
+     *            any connection
+     * @return - the future
+     */
+    Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(GetQueueConfigInput input,
+            SwitchConnectionDistinguisher cookie);
+
+    /**
+     * send group modification message to switch
+     *
+     * @param input
+     *            - message
+     * @param cookie
+     *            - to identify connection if null then feel free to send via
+     *            any connection
+     * @return - the future
+     */
+    Future<RpcResult<java.lang.Void>> groupMod(GroupModInput input, SwitchConnectionDistinguisher cookie);
+
+    /**
+     * send meter modification message to switch
+     *
+     * @param input
+     *            - message
+     * @param cookie
+     *            - to identify connection if null then feel free to send via
+     *            any connection
+     * @return - the future
+     */
+    Future<RpcResult<java.lang.Void>> meterMod(MeterModInput input, SwitchConnectionDistinguisher cookie);
+
+    /**
+     * send packet out message to switch
+     *
+     * @param input
+     *            - message
+     * @param cookie
+     *            - to identify connection if null then feel free to send via
+     *            any connection
+     * @return - the future
+     */
+    Future<RpcResult<java.lang.Void>> packetOut(PacketOutInput input, SwitchConnectionDistinguisher cookie);
+
+    /**
+     * send port modification message to switch
+     *
+     * @param input
+     *            - message
+     * @param cookie
+     *            - to identify connection if null then feel free to send via
+     *            any connection
+     * @return - the future
+     */
+    Future<RpcResult<java.lang.Void>> portMod(PortModInput input, SwitchConnectionDistinguisher cookie);
+
+    /**
+     * send role request message to switch
+     *
+     * @param input
+     *            - message
+     * @param cookie
+     *            - to identify connection if null then feel free to send via
+     *            any connection
+     * @return - the future
+     */
+    Future<RpcResult<RoleRequestOutput>> roleRequest(RoleRequestInput input, SwitchConnectionDistinguisher cookie);
+
+    /**
+     * send set async message to switch
+     *
+     * @param input
+     *            - message
+     * @param cookie
+     *            - to identify connection if null then feel free to send via
+     *            any connection
+     * @return - the future
+     */
+    Future<RpcResult<java.lang.Void>> setAsync(SetAsyncInput input, SwitchConnectionDistinguisher cookie);
+
+    /**
+     * send set config message to switch
+     *
+     * @param input
+     *            - message
+     * @param cookie
+     *            - to identify connection if null then feel free to send via
+     *            any connection
+     * @return - the future
+     */
+    Future<RpcResult<java.lang.Void>> setConfig(SetConfigInput input, SwitchConnectionDistinguisher cookie);
+
+    /**
+     * send table modification message to switch
+     *
+     * @param input
+     *            - message
+     * @param cookie
+     *            - to identify connection if null then feel free to send via
+     *            any connection
+     * @return - the future
+     */
+    Future<RpcResult<java.lang.Void>> tableMod(TableModInput input, SwitchConnectionDistinguisher cookie);
+
+}
diff --git a/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImpl.java b/openflowplugin/src/main/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImpl.java
new file mode 100644 (file)
index 0000000..bc11fac
--- /dev/null
@@ -0,0 +1,162 @@
+package org.opendaylight.openflowplugin.openflow.md.core.session;
+
+import java.util.concurrent.Future;
+
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
+import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * message dispatch service to send the message to switch.
+ *
+ * @author AnilGujele
+ *
+ */
+public class MessageDispatchServiceImpl implements IMessageDispatchService {
+
+    private static final Logger LOG = LoggerFactory.getLogger(OFSessionUtil.class);
+
+    private SessionContext session;
+
+    /**
+     * constructor
+     *
+     * @param session
+     *            - MessageDispatchService for this session
+     */
+    public MessageDispatchServiceImpl(SessionContext session) {
+        this.session = session;
+    }
+
+    /**
+     * get proper connection adapter to send the message to switch.
+     *
+     * @param - cookie to identify the right connection, it can be null also.
+     * @return connectionAdapter associated with cookie, otherwise return best
+     *         suitable connection.
+     *
+     */
+
+    private ConnectionAdapter getConnectionAdapter(SwitchConnectionDistinguisher cookie) {
+
+        if (!session.isValid()) {
+            LOG.warn("Session for the cookie {} is invalid." + cookie);
+            throw new IllegalArgumentException("Session for the cookie is invalid.");
+        }
+        LOG.debug("finding connecton for cookie value {}. " + cookie);
+        // set main connection as default
+        ConnectionAdapter connectionAdapter = session.getPrimaryConductor().getConnectionAdapter();
+        if (null != cookie) {
+            ConnectionConductor conductor = session.getAuxiliaryConductor(cookie);
+            // check if auxiliary connection exist
+            if (null != conductor) {
+                LOG.debug("found auxiliary connection for the cookie.");
+                connectionAdapter = conductor.getConnectionAdapter();
+            }
+        } else {
+            // TODO: pick connection to utilize all the available connection.
+        }
+        return connectionAdapter;
+    }
+
+    @Override
+    public Future<RpcResult<BarrierOutput>> barrier(BarrierInput input, SwitchConnectionDistinguisher cookie) {
+        return getConnectionAdapter(cookie).barrier(input);
+    }
+
+    @Override
+    public Future<RpcResult<Void>> experimenter(ExperimenterInput input, SwitchConnectionDistinguisher cookie) {
+        return getConnectionAdapter(cookie).experimenter(input);
+    }
+
+    @Override
+    public Future<RpcResult<Void>> flowMod(FlowModInput input, SwitchConnectionDistinguisher cookie) {
+        return getConnectionAdapter(cookie).flowMod(input);
+    }
+
+    @Override
+    public Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput input, SwitchConnectionDistinguisher cookie) {
+        return getConnectionAdapter(cookie).getAsync(input);
+    }
+
+    @Override
+    public Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput input, SwitchConnectionDistinguisher cookie) {
+        return getConnectionAdapter(cookie).getConfig(input);
+    }
+
+    @Override
+    public Future<RpcResult<GetFeaturesOutput>> getFeatures(GetFeaturesInput input, SwitchConnectionDistinguisher cookie) {
+        return getConnectionAdapter(cookie).getFeatures(input);
+    }
+
+    @Override
+    public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(GetQueueConfigInput input,
+            SwitchConnectionDistinguisher cookie) {
+        return getConnectionAdapter(cookie).getQueueConfig(input);
+    }
+
+    @Override
+    public Future<RpcResult<Void>> groupMod(GroupModInput input, SwitchConnectionDistinguisher cookie) {
+        return getConnectionAdapter(cookie).groupMod(input);
+    }
+
+    @Override
+    public Future<RpcResult<Void>> meterMod(MeterModInput input, SwitchConnectionDistinguisher cookie) {
+        return getConnectionAdapter(cookie).meterMod(input);
+    }
+
+    @Override
+    public Future<RpcResult<Void>> packetOut(PacketOutInput input, SwitchConnectionDistinguisher cookie) {
+        return getConnectionAdapter(cookie).packetOut(input);
+    }
+
+    @Override
+    public Future<RpcResult<Void>> portMod(PortModInput input, SwitchConnectionDistinguisher cookie) {
+        return getConnectionAdapter(cookie).portMod(input);
+    }
+
+    @Override
+    public Future<RpcResult<RoleRequestOutput>> roleRequest(RoleRequestInput input, SwitchConnectionDistinguisher cookie) {
+        return getConnectionAdapter(cookie).roleRequest(input);
+    }
+
+    @Override
+    public Future<RpcResult<Void>> setAsync(SetAsyncInput input, SwitchConnectionDistinguisher cookie) {
+        return getConnectionAdapter(cookie).setAsync(input);
+    }
+
+    @Override
+    public Future<RpcResult<Void>> setConfig(SetConfigInput input, SwitchConnectionDistinguisher cookie) {
+        return getConnectionAdapter(cookie).setConfig(input);
+    }
+
+    @Override
+    public Future<RpcResult<Void>> tableMod(TableModInput input, SwitchConnectionDistinguisher cookie) {
+        return getConnectionAdapter(cookie).tableMod(input);
+    }
+
+}
index e42e13ac4182613a27fb4ae28fb676afab2e774c..5b607ea5b4820838c6a4960f88ded94834321df6 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.openflowplugin.openflow.md.core.session;
 
+
 import java.util.Map.Entry;
 import java.util.Set;
 
@@ -76,4 +77,18 @@ public interface SessionContext {
 
     // TODO:: add listeners here, manager will set them and conductor use them
 
+    /**
+     *  get message dispatch service to send the message to switch
+     *
+     * @return the message service
+     */
+    public IMessageDispatchService getMessageDispatchService();
+
+   /**
+    * @return the unique xid for this session
+    */
+    public Long getNextXid();
+
+
+
 }
index 1349dd425362d9eac8ade47f019f4e9ffbff01db..96dee119422fa90b69546f92468d06941e83254c 100644 (file)
@@ -12,6 +12,7 @@ import java.util.Collections;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
 import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
@@ -27,12 +28,16 @@ public class SessionContextOFImpl implements SessionContext {
     private ConcurrentHashMap<SwitchConnectionDistinguisher, ConnectionConductor> auxiliaryConductors;
     private boolean valid;
     private SwitchConnectionDistinguisher sessionKey;
+    private IMessageDispatchService mdService;
+    private final AtomicLong xid;
 
     /**
      * default ctor
      */
     public SessionContextOFImpl() {
         auxiliaryConductors = new ConcurrentHashMap<>();
+        mdService = new MessageDispatchServiceImpl(this);
+        xid = new AtomicLong();
     }
 
     @Override
@@ -106,4 +111,14 @@ public class SessionContextOFImpl implements SessionContext {
     public SwitchConnectionDistinguisher getSessionKey() {
         return sessionKey;
     }
+
+    @Override
+    public IMessageDispatchService getMessageDispatchService() {
+        return mdService;
+    }
+
+    @Override
+    public Long getNextXid() {
+        return xid.incrementAndGet();
+    }
 }
index fa8395bae3907ef383e928c6a7ec36eacb150032..9c8488a8b213a2bb27dcec5ee8973e9459e731e2 100644 (file)
@@ -9,6 +9,8 @@
 package org.opendaylight.openflowplugin.openflow.md.core;
 
 import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Stack;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -21,15 +23,20 @@ import org.opendaylight.openflowplugin.openflow.md.core.plan.ConnectionAdapterSt
 import org.opendaylight.openflowplugin.openflow.md.core.plan.EventFactory;
 import org.opendaylight.openflowplugin.openflow.md.core.plan.SwitchTestEvent;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.ErrorType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.common.types.rev130731.HelloElementType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessageBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ErrorMessageBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterMessageBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutputBuilder;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloMessageBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.Elements;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.hello.ElementsBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 /**
  * @author mirehak
  */
@@ -142,15 +149,17 @@ public class ConnectionConductorImplTest {
         eventPlan.add(0,
                 EventFactory.createDefaultWaitForRpcEvent(43, "helloReply"));
         eventPlan.add(0, EventFactory.createDefaultNotificationEvent(43L,
-                (short) 0x03, new HelloMessageBuilder()));
-        eventPlan.add(0,
-                EventFactory.createDefaultWaitForRpcEvent(44, "helloReply"));
-        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(44L,
                 (short) 0x01, new HelloMessageBuilder()));
         eventPlan.add(0,
-                EventFactory.createDefaultWaitForRpcEvent(45, "helloReply"));
+                EventFactory.createDefaultWaitForRpcEvent(44, "helloReply"));
+        // Commented : connection will terminate if hello message is sent again
+        // with not supported version
+//        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(44L,
+//                (short) 0x01, new HelloMessageBuilder()));
+//        eventPlan.add(0,
+//                EventFactory.createDefaultWaitForRpcEvent(45, "helloReply"));
         eventPlan.add(0,
-                EventFactory.createDefaultWaitForRpcEvent(46, "getFeatures"));
+                EventFactory.createDefaultWaitForRpcEvent(45, "getFeatures"));
         GetFeaturesOutputBuilder getFeaturesOutputBuilder = new GetFeaturesOutputBuilder();
         getFeaturesOutputBuilder.setDatapathId(new BigInteger("102030405060"));
         getFeaturesOutputBuilder.setAuxiliaryId((short) 0);
@@ -159,7 +168,7 @@ public class ConnectionConductorImplTest {
         getFeaturesOutputBuilder.setTables((short) 2);
         getFeaturesOutputBuilder.setCapabilities(84L);
 
-        eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(46,
+        eventPlan.add(0, EventFactory.createDefaultRpcResponseEvent(45,
                 EventFactory.DEFAULT_VERSION, getFeaturesOutputBuilder));
 
         executeNow();
@@ -355,4 +364,214 @@ public class ConnectionConductorImplTest {
         }
     }
 
+    //////// Start - Version Negotiation Test //////////////
+
+    /**
+     * Test of version negotiation Where switch version = 1.0
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testVersionNegotiation10() throws Exception {
+        Short version = (short) 0x01;
+        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42, version, new HelloMessageBuilder()));
+        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(43, "helloReply"));
+        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(44, "getFeatures"));
+        eventPlan.add(0,
+                EventFactory.createDefaultRpcResponseEvent(44, EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
+
+        executeNow();
+        Assert.assertEquals(version, connectionConductor.getVersion());
+    }
+
+    /**
+     * Test of version negotiation Where switch version < 1.0
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testVersionNegotiation00() throws Exception {
+        Short version = (short) 0x00;
+        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L, version, new HelloMessageBuilder()));
+        executeNow();
+        Assert.assertNull(connectionConductor.getVersion());
+    }
+
+    /**
+     * Test of version negotiation Where 1.0 < switch version < 1.3
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testVersionNegotiation11() throws Exception {
+        Short version = (short) 0x02;
+        Short expVersion = (short) 0x01;
+        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L, version, new HelloMessageBuilder()));
+        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(43, "helloReply"));
+        Assert.assertNull(connectionConductor.getVersion());
+        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(44, expVersion, new HelloMessageBuilder()));
+        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(45, "helloReply"));
+        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(46, "getFeatures"));
+        eventPlan.add(0,
+                EventFactory.createDefaultRpcResponseEvent(46, EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
+        executeNow();
+        Assert.assertEquals(expVersion, connectionConductor.getVersion());
+
+    }
+
+    /**
+     * Test of version negotiation Where switch version = 1.3
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testVersionNegotiation13() throws Exception {
+        Short version = (short) 0x04;
+        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L, version, new HelloMessageBuilder()));
+        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(43, "helloReply"));
+        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(44, "getFeatures"));
+        eventPlan.add(0,
+                EventFactory.createDefaultRpcResponseEvent(44, EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
+
+        executeNow();
+        Assert.assertEquals(version, connectionConductor.getVersion());
+    }
+
+    /**
+     * Test of version negotiation Where switch version >= 1.3
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testVersionNegotiation15() throws Exception {
+        Short version = (short) 0x06;
+        Short expVersion = (short) 0x04;
+        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L, version, new HelloMessageBuilder()));
+        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(43, "helloReply"));
+        Assert.assertNull(connectionConductor.getVersion());
+        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(44, expVersion, new HelloMessageBuilder()));
+        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(45, "helloReply"));
+        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(46, "getFeatures"));
+        eventPlan.add(0,
+                EventFactory.createDefaultRpcResponseEvent(46, EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
+        executeNow();
+        Assert.assertEquals(expVersion, connectionConductor.getVersion());
+    }
+
+    /**
+     * Test of version negotiation Where switch version > 1.3
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testVersionNegotiation15_MultipleCall() throws Exception {
+        Short version = (short) 0x06;
+        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(42L, version, new HelloMessageBuilder()));
+        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(43, "helloReply"));
+        Assert.assertNull(connectionConductor.getVersion());
+        eventPlan.add(0, EventFactory.createDefaultNotificationEvent(44, version, new HelloMessageBuilder()));
+        executeNow();
+        // TODO : check for connection termination
+        Assert.assertNull(connectionConductor.getVersion());
+    }
+
+    /**
+     * Test of version negotiation Where bitmap version {0x05,0x01}
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testVersionNegotiation10InBitmap() throws Exception {
+        Short version = (short) 0x01;
+        eventPlan.add(
+                0,
+                EventFactory.createDefaultNotificationEvent(42L, (short) 0x05,
+                        getHelloBitmapMessage(Lists.newArrayList((short) 0x05, (short) 0x01))));
+        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(43, "helloReply"));
+        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(44, "getFeatures"));
+        eventPlan.add(0,
+                EventFactory.createDefaultRpcResponseEvent(44, EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
+
+        executeNow();
+        Assert.assertEquals(version, connectionConductor.getVersion());
+    }
+
+    /**
+     * Test of version negotiation Where bitmap version {0x05,0x04}
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testVersionNegotiation13InBitmap() throws Exception {
+        Short version = (short) 0x04;
+        eventPlan.add(
+                0,
+                EventFactory.createDefaultNotificationEvent(42L, (short) 0x05,
+                        getHelloBitmapMessage(Lists.newArrayList((short) 0x05, (short) 0x04))));
+        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(43, "helloReply"));
+        eventPlan.add(0, EventFactory.createDefaultWaitForRpcEvent(44, "getFeatures"));
+        eventPlan.add(0,
+                EventFactory.createDefaultRpcResponseEvent(44, EventFactory.DEFAULT_VERSION, getFeatureResponseMsg()));
+
+        executeNow();
+        Assert.assertEquals(version, connectionConductor.getVersion());
+    }
+
+    /**
+     * Test of version negotiation Where bitmap version {0x05,0x02}
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testVersionNegotiationNoCommonVersionInBitmap() throws Exception {
+        eventPlan.add(
+                0,
+                EventFactory.createDefaultNotificationEvent(42L, (short) 0x05,
+                        getHelloBitmapMessage(Lists.newArrayList((short) 0x05, (short) 0x04))));
+        executeNow();
+        Assert.assertNull(connectionConductor.getVersion());
+    }
+
+    private HelloMessageBuilder getHelloBitmapMessage(List<Short> versionOrder) {
+        short highestVersion = versionOrder.get(0);
+        int elementsCount = highestVersion / Integer.SIZE;
+        ElementsBuilder elementsBuilder = new ElementsBuilder();
+
+        List<Elements> elementList = new ArrayList<Elements>();
+        int orderIndex = versionOrder.size();
+        int value = versionOrder.get(--orderIndex);
+        for (int index = 0; index <= elementsCount; index++) {
+            List<Boolean> booleanList = new ArrayList<Boolean>();
+            for (int i = 0; i < Integer.SIZE; i++) {
+                if (value == ((index * Integer.SIZE) + i)) {
+                    booleanList.add(true);
+                    value = (orderIndex == 0) ? highestVersion : versionOrder.get(--orderIndex);
+                } else {
+                    booleanList.add(false);
+                }
+            }
+            elementsBuilder.setType(HelloElementType.forValue(1));
+            elementsBuilder.setVersionBitmap(booleanList);
+            elementList.add(elementsBuilder.build());
+        }
+
+        HelloMessageBuilder builder = new HelloMessageBuilder();
+        builder.setXid(10L);
+        builder.setVersion(highestVersion);
+        builder.setElements(elementList);
+        return builder;
+
+    }
+
+    private GetFeaturesOutputBuilder getFeatureResponseMsg() {
+        GetFeaturesOutputBuilder getFeaturesOutputBuilder = new GetFeaturesOutputBuilder();
+        getFeaturesOutputBuilder.setDatapathId(new BigInteger("102030405060"));
+        getFeaturesOutputBuilder.setAuxiliaryId((short) 0);
+        getFeaturesOutputBuilder.setBuffers(4L);
+        getFeaturesOutputBuilder.setReserved(0L);
+        getFeaturesOutputBuilder.setTables((short) 2);
+        getFeaturesOutputBuilder.setCapabilities(84L);
+
+        return getFeaturesOutputBuilder;
+    }
 }
diff --git a/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImplTest.java b/openflowplugin/src/test/java/org/opendaylight/openflowplugin/openflow/md/core/session/MessageDispatchServiceImplTest.java
new file mode 100644 (file)
index 0000000..d4074d1
--- /dev/null
@@ -0,0 +1,482 @@
+package org.opendaylight.openflowplugin.openflow.md.core.session;
+
+import java.math.BigInteger;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+import junit.framework.Assert;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.openflowjava.protocol.api.connection.ConnectionAdapter;
+import org.opendaylight.openflowplugin.openflow.md.core.ConnectionConductor;
+import org.opendaylight.openflowplugin.openflow.md.core.SwitchConnectionDistinguisher;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.BarrierOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.ExperimenterInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.FlowModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetAsyncOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetConfigOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetFeaturesOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GetQueueConfigOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.GroupModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.HelloInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.MeterModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OpenflowProtocolListener;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PacketOutInputBuilder;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.PortModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.RoleRequestOutput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetAsyncInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.SetConfigInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.TableModInput;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.system.rev130927.SystemNotificationsListener;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+
+public class MessageDispatchServiceImplTest {
+
+    MockSessionContext session;
+
+    /**
+     * @throws java.lang.Exception
+     */
+    @Before
+    public void setUp() throws Exception {
+        session = new MockSessionContext(0);
+
+    }
+
+    /**
+     * @throws java.lang.Exception
+     */
+    @After
+    public void tearDown() throws Exception {
+
+    }
+
+    /**
+     * Test barrier message for null cookie
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testBarrierMessageForPrimary() throws Exception {
+        session.getMessageDispatchService().barrier(null, null);
+        Assert.assertEquals(MessageType.BARRIER, session.getPrimaryConductor().getMessageType());
+    }
+
+    /**
+     * Test packet out message for primary connection
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testPacketOutMessageForPrimary() throws Exception {
+        session.getMessageDispatchService().packetOut(null, null);
+        Assert.assertEquals(MessageType.PACKETOUT, session.getPrimaryConductor().getMessageType());
+    }
+
+    /**
+     * Test packet out message for auxiliary connection
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testPacketOutMessageForAuxiliary() throws Exception {
+        MockConnectionConductor conductor = new MockConnectionConductor(1);
+        SwitchConnectionDistinguisher cookie = conductor.getAuxiliaryKey();
+        session.addAuxiliaryConductor(cookie, conductor);
+        session.getMessageDispatchService().packetOut(null, cookie);
+        Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType());
+        conductor = (MockConnectionConductor) session.getAuxiliaryConductor(cookie);
+        Assert.assertEquals(MessageType.PACKETOUT, conductor.getMessageType());
+    }
+
+    /**
+     * Test packet out message when multiple auxiliary connection exist
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testPacketOutMessageForMultipleAuxiliary() throws Exception {
+        MockConnectionConductor conductor1 = new MockConnectionConductor(1);
+        SwitchConnectionDistinguisher cookie1 = conductor1.getAuxiliaryKey();
+        session.addAuxiliaryConductor(cookie1, conductor1);
+        MockConnectionConductor conductor2 = new MockConnectionConductor(2);
+        SwitchConnectionDistinguisher cookie2 = conductor2.getAuxiliaryKey();
+        session.addAuxiliaryConductor(cookie2, conductor2);
+        MockConnectionConductor conductor3 = new MockConnectionConductor(3);
+        SwitchConnectionDistinguisher cookie3 = conductor3.getAuxiliaryKey();
+        session.addAuxiliaryConductor(cookie3, conductor3);
+        PacketOutInputBuilder builder = new PacketOutInputBuilder();
+        // send message
+        session.getMessageDispatchService().packetOut(builder.build(), cookie2);
+
+        Assert.assertEquals(MessageType.NONE, session.getPrimaryConductor().getMessageType());
+
+        conductor3 = (MockConnectionConductor) session.getAuxiliaryConductor(cookie3);
+        Assert.assertEquals(MessageType.NONE, conductor3.getMessageType());
+
+        conductor2 = (MockConnectionConductor) session.getAuxiliaryConductor(cookie2);
+        Assert.assertEquals(MessageType.PACKETOUT, conductor2.getMessageType());
+
+        conductor1 = (MockConnectionConductor) session.getAuxiliaryConductor(cookie1);
+        Assert.assertEquals(MessageType.NONE, conductor1.getMessageType());
+
+    }
+
+    /**
+     * Test for invalid session
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testInvalidSession() throws Exception {
+        session.setValid(false);
+        try {
+            session.getMessageDispatchService().packetOut(null, null);
+            Assert.assertTrue(false);
+        } catch (IllegalArgumentException ex) {
+            Assert.assertTrue(true);
+        }
+    }
+
+}
+
+class MockSessionContext implements SessionContext {
+    private MockConnectionConductor conductor;
+    private Map<SwitchConnectionDistinguisher, ConnectionConductor> map;
+    private IMessageDispatchService messageService;
+    private boolean isValid = true;
+
+    MockSessionContext(int conductorNum) {
+        conductor = new MockConnectionConductor(conductorNum);
+        map = new HashMap<SwitchConnectionDistinguisher, ConnectionConductor>();
+        messageService = new MessageDispatchServiceImpl(this);
+    }
+
+    @Override
+    public MockConnectionConductor getPrimaryConductor() {
+        // TODO Auto-generated method stub
+        return conductor;
+    }
+
+    @Override
+    public GetFeaturesOutput getFeatures() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public ConnectionConductor getAuxiliaryConductor(SwitchConnectionDistinguisher auxiliaryKey) {
+
+        return map.get(auxiliaryKey);
+    }
+
+    @Override
+    public Set<Entry<SwitchConnectionDistinguisher, ConnectionConductor>> getAuxiliaryConductors() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void addAuxiliaryConductor(SwitchConnectionDistinguisher auxiliaryKey, ConnectionConductor conductor) {
+        map.put(auxiliaryKey, conductor);
+    }
+
+    @Override
+    public ConnectionConductor removeAuxiliaryConductor(SwitchConnectionDistinguisher connectionCookie) {
+        return map.remove(connectionCookie);
+    }
+
+    @Override
+    public boolean isValid() {
+        // TODO Auto-generated method stub
+        return isValid;
+    }
+
+    @Override
+    public void setValid(boolean valid) {
+        isValid = valid;
+    }
+
+    @Override
+    public SwitchConnectionDistinguisher getSessionKey() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public IMessageDispatchService getMessageDispatchService() {
+        // TODO Auto-generated method stub
+        return messageService;
+    }
+
+    @Override
+    public Long getNextXid() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+}
+
+class MockConnectionConductor implements ConnectionConductor {
+
+    private int conductorNum;
+    private MockConnectionAdapter adapter;
+
+    public MockConnectionConductor(int conductorNumber) {
+        conductorNum = conductorNumber;
+        adapter = new MockConnectionAdapter();
+    }
+
+    @Override
+    public void init() {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public Short getVersion() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public CONDUCTOR_STATE getConductorState() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void setConductorState(CONDUCTOR_STATE conductorState) {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public Future<Boolean> disconnect() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void setSessionContext(SessionContext context) {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void setConnectionCookie(SwitchConnectionDistinguisher auxiliaryKey) {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public SessionContext getSessionContext() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public SwitchConnectionDistinguisher getAuxiliaryKey() {
+        if (0 != conductorNum) {
+            SwitchConnectionCookieOFImpl key = new SwitchConnectionCookieOFImpl();
+            key.setDatapathId(BigInteger.valueOf(10L));
+            key.setAuxiliaryId((short) conductorNum);
+            key.initId();
+            return key;
+        }
+        return null;
+    }
+
+    @Override
+    public ConnectionAdapter getConnectionAdapter() {
+        // TODO Auto-generated method stub
+        return adapter;
+    }
+
+    public MessageType getMessageType() {
+        return adapter.getMessageType();
+    }
+
+}
+
+enum MessageType {
+    NONE, BARRIER, FLOWMOD, TABLEMOD, PACKETOUT;
+}
+
+class MockConnectionAdapter implements ConnectionAdapter {
+
+    private MessageType messageType;
+
+    public MockConnectionAdapter() {
+        setMessageType(MessageType.NONE);
+    }
+
+    @Override
+    public Future<RpcResult<BarrierOutput>> barrier(BarrierInput input) {
+        setMessageType(MessageType.BARRIER);
+        return null;
+    }
+
+    @Override
+    public Future<RpcResult<EchoOutput>> echo(EchoInput input) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Future<RpcResult<Void>> echoReply(EchoReplyInput input) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Future<RpcResult<Void>> experimenter(ExperimenterInput input) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Future<RpcResult<Void>> flowMod(FlowModInput input) {
+        setMessageType(MessageType.FLOWMOD);
+        return null;
+    }
+
+    @Override
+    public Future<RpcResult<GetAsyncOutput>> getAsync(GetAsyncInput input) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Future<RpcResult<GetConfigOutput>> getConfig(GetConfigInput input) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Future<RpcResult<GetFeaturesOutput>> getFeatures(GetFeaturesInput input) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Future<RpcResult<GetQueueConfigOutput>> getQueueConfig(GetQueueConfigInput input) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Future<RpcResult<Void>> groupMod(GroupModInput input) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Future<RpcResult<Void>> hello(HelloInput input) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Future<RpcResult<Void>> meterMod(MeterModInput input) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Future<RpcResult<Void>> packetOut(PacketOutInput input) {
+        setMessageType(MessageType.PACKETOUT);
+        return null;
+    }
+
+    @Override
+    public Future<RpcResult<Void>> portMod(PortModInput input) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Future<RpcResult<RoleRequestOutput>> roleRequest(RoleRequestInput input) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Future<RpcResult<Void>> setAsync(SetAsyncInput input) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Future<RpcResult<Void>> setConfig(SetConfigInput input) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Future<RpcResult<Void>> tableMod(TableModInput input) {
+        setMessageType(MessageType.TABLEMOD);
+        return null;
+    }
+
+    @Override
+    public Future<Boolean> disconnect() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public boolean isAlive() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+    @Override
+    public void setMessageListener(OpenflowProtocolListener messageListener) {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void setSystemListener(SystemNotificationsListener systemListener) {
+        // TODO Auto-generated method stub
+
+    }
+
+    @Override
+    public void checkListeners() {
+        // TODO Auto-generated method stub
+
+    }
+
+    /**
+     * @return the messageType
+     */
+    public MessageType getMessageType() {
+        return messageType;
+    }
+
+    /**
+     * @param messageType
+     *            the messageType to set
+     */
+    public void setMessageType(MessageType messageType) {
+        this.messageType = messageType;
+    }
+
+}