- Application is no longer blocked when programming hundreds of flows. The Barrier... 66/266/1
authorJason Ye <yisye@cisco.com>
Mon, 29 Apr 2013 21:15:27 +0000 (14:15 -0700)
committerJason Ye <yisye@cisco.com>
Mon, 29 Apr 2013 21:15:27 +0000 (14:15 -0700)
- Waiting for Barrier reply is still blocked in low level plugin, but it's per switch basis.
Signed-off-by: Jason Ye <yisye@cisco.com>
opendaylight/forwardingrulesmanager/api/src/main/java/org/opendaylight/controller/forwardingrulesmanager/IForwardingRulesManager.java
opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManagerImpl.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/ISwitch.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/PriorityMessage.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SynchronousMessage.java
opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/FlowProgrammerService.java
opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/flowprogrammer/IFlowProgrammerService.java
opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/flowprogrammer/IPluginInFlowProgrammerService.java
opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/FlowProgrammerService.java

index 78917c9..087c0bc 100644 (file)
@@ -145,17 +145,24 @@ public interface IForwardingRulesManager {
     public Status modifyOrAddFlowEntryAsync(FlowEntry newone);
 
     /**
-     * Requests ForwardingRulesManager to solicit the network node to inform
-     * us about the status of his execution on the asynchronous requests that
-     * were sent to it so far. It is a way for an application to poke the
-     * network node in order to get a feedback asap on the asynchronous
-     * requests generated by the application. It is a non-blocking call
-     * and does not guarantee the node will respond in any given time. 
+     * Requests ForwardingRulesManager to solicit the network node to inform us
+     * about the status of its execution on the asynchronous requests that were
+     * sent to it so far. It is a way for an application to poke the network
+     * node in order to get a feedback asap on the asynchronous requests
+     * generated by the application. The caller may decide if this is a blocking
+     * or non-blocking operation. If blocking is set to true, the caller will be
+     * blocked until the solicitation response is received from the network node
+     * or receive timeout. Otherwise, it is a non-blocking call and does not
+     * guarantee the node will respond in any given time.
      * 
      * @param node
-     *          The network node to solicit a response
+     *            The network node to solicit a response
+     * @param blocking
+     *            The blocking mode
+     * @return the status of this request containing the request id associated
+     *         to this asynchronous request
      */
-    public void solicitStatusResponse(Node node);
+    public Status solicitStatusResponse(Node node, boolean blocking);
 
     /**
      * Check whether the passed flow entry conflicts with the Container flows
index fe2cdc4..110d338 100644 (file)
@@ -2577,10 +2577,17 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager,
     }
     
     @Override
-    public void solicitStatusResponse(Node node) {
+    public Status solicitStatusResponse(Node node, boolean blocking) {
+        Status rv = new Status(StatusCode.INTERNALERROR);
+        
         if (this.programmer != null) {
-            programmer.sendBarrierMessage(node);
-        }        
+            if (blocking) {
+                rv = programmer.syncSendBarrierMessage(node);
+            } else {
+                rv = programmer.asyncSendBarrierMessage(node);                
+            }
+        }
+        
+        return rv;
     }
-
 }
index ee46ce2..196fbbf 100644 (file)
@@ -217,7 +217,15 @@ public interface ISwitch {
     public boolean isOperational();
 
     /**
-     * Sends synchronous Barrier message 
+     * Send Barrier message synchronously. The caller will be blocked until the
+     * Barrier reply arrives.
      */
-    public Object sendBarrierMessage();
+    Object syncSendBarrierMessage();
+
+    /**
+     * Send Barrier message asynchronously. The caller is not blocked. The
+     * Barrier message will be sent in a transmit thread which will be blocked
+     * until the Barrier reply arrives.
+     */
+    Object asyncSendBarrierMessage();
 }
index 2f23d36..0c66dcc 100644 (file)
@@ -23,11 +23,18 @@ class PriorityMessage {
     int priority;
     final static AtomicLong seq = new AtomicLong();
     final long seqNum;
-
+    boolean syncReply; // set to true if we want to be blocked until the response arrives
+    
     public PriorityMessage(OFMessage msg, int priority) {
         this.msg = msg;
         this.priority = priority;
         this.seqNum = seq.getAndIncrement();
+        this.syncReply = false;
+    }
+
+    public PriorityMessage(OFMessage msg, int priority, boolean syncReply) {
+        this(msg, priority);
+        this.syncReply = syncReply;
     }
 
     public OFMessage getMsg() {
index 57098ae..5c51cc5 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
 import java.io.IOException;
 import java.net.SocketException;
 import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
@@ -239,38 +240,7 @@ public class SwitchHandler implements ISwitch {
     }
 
     private Object syncSend(OFMessage msg, int xid) {
-        SynchronousMessage worker = new SynchronousMessage(this, xid, msg);
-        messageWaitingDone.put(xid, worker);
-        Object result = null;
-        Boolean status = false;
-        Future<Object> submit = executor.submit(worker);
-        try {
-            result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
-            messageWaitingDone.remove(xid);
-            if (result == null) {
-                // if result is null, then it means the switch can handle this
-                // message successfully
-                // convert the result into a Boolean with value true
-                status = true;
-                // logger.debug("Successfully send " +
-                // msg.getType().toString());
-                result = status;
-            } else {
-                // if result is not null, this means the switch can't handle
-                // this message
-                // the result if OFError already
-                logger.debug("Send {} failed --> {}", msg.getType().toString(),
-                        ((OFError) result).toString());
-            }
-            return result;
-        } catch (Exception e) {
-            logger.warn("Timeout while waiting for {} reply", msg.getType()
-                    .toString());
-            // convert the result into a Boolean with value false
-            status = false;
-            result = status;
-            return result;
-        }
+        return syncMessageInternal(msg, xid, true);
     }
 
     /**
@@ -489,7 +459,8 @@ public class SwitchHandler implements ISwitch {
     private void reportError(Exception e) {
         if (e instanceof AsynchronousCloseException
                 || e instanceof InterruptedException
-                || e instanceof SocketException || e instanceof IOException) {
+                || e instanceof SocketException || e instanceof IOException
+                || e instanceof ClosedSelectorException) {
             logger.debug("Caught exception {}", e.getMessage());
         } else {
             logger.warn("Caught exception ", e);
@@ -758,6 +729,13 @@ public class SwitchHandler implements ISwitch {
                         PriorityMessage pmsg = transmitQ.poll();
                         msgReadWriteService.asyncSend(pmsg.msg);
                         logger.trace("Message sent: {}", pmsg.toString());
+                        /*
+                         * If syncReply is set to true, wait for the response
+                         * back.
+                         */
+                        if (pmsg.syncReply) {
+                            syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
+                        }
                     }
                     Thread.sleep(10);
                 } catch (InterruptedException ie) {
@@ -816,14 +794,35 @@ public class SwitchHandler implements ISwitch {
     }
 
     /**
-     * Sends synchronous Barrier message
+     * Send Barrier message synchronously. The caller will be blocked until the
+     * Barrier reply is received.
      */
     @Override
-    public Object sendBarrierMessage() {
+    public Object syncSendBarrierMessage() {
         OFBarrierRequest barrierMsg = new OFBarrierRequest();
         return syncSend(barrierMsg);
     }
 
+    /**
+     * Send Barrier message asynchronously. The caller is not blocked. The
+     * Barrier message will be sent in a transmit thread which will be blocked
+     * until the Barrier reply is received.
+     */
+    @Override
+    public Object asyncSendBarrierMessage() {
+        if (transmitQ == null) {
+            return Boolean.FALSE;
+        }
+
+        OFBarrierRequest barrierMsg = new OFBarrierRequest();
+        int xid = getNextXid();
+
+        barrierMsg.setXid(xid);
+        transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
+        
+        return Boolean.TRUE;
+    }
+
     /**
      * This method returns the switch liveness timeout value. If controller did
      * not receive any message from the switch for such a long period,
@@ -844,4 +843,56 @@ public class SwitchHandler implements ISwitch {
 
         return rv;
     }
+
+    /**
+     * This method performs synchronous operations for a given message. If
+     * syncRequest is set to true, the message will be sent out followed by a
+     * Barrier request message. Then it's blocked until the Barrier rely arrives
+     * or timeout. If syncRequest is false, it simply skips the message send and
+     * just waits for the response back.
+     * 
+     * @param msg
+     *            Message to be sent
+     * @param xid
+     *            Message XID
+     * @param request
+     *            If set to true, the message the message will be sent out
+     *            followed by a Barrier request message. If set to false, it
+     *            simply skips the sending and just waits for the Barrier reply.
+     * @return the result
+     */
+    private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
+        SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
+        messageWaitingDone.put(xid, worker);
+        Object result = null;
+        Boolean status = false;
+        Future<Object> submit = executor.submit(worker);
+        try {
+            result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
+            messageWaitingDone.remove(xid);
+            if (result == null) {
+                // if result is null, then it means the switch can handle this
+                // message successfully
+                // convert the result into a Boolean with value true
+                status = true;
+                // logger.debug("Successfully send " +
+                // msg.getType().toString());
+                result = status;
+            } else {
+                // if result is not null, this means the switch can't handle
+                // this message
+                // the result if OFError already
+                logger.debug("Send {} failed --> {}", msg.getType().toString(),
+                        ((OFError) result).toString());
+            }
+            return result;
+        } catch (Exception e) {
+            logger.warn("Timeout while waiting for {} reply", msg.getType()
+                    .toString());
+            // convert the result into a Boolean with value false
+            status = false;
+            result = status;
+            return result;
+        }
+    }
 }
index 4737d4c..bb94d4b 100644 (file)
@@ -1,4 +1,3 @@
-
 /*
  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
  *
@@ -18,15 +17,15 @@ import org.openflow.protocol.OFError;
 import org.openflow.protocol.OFMessage;
 
 /**
- * Implements the synchronous message send to a switch
- * It sends the requested message to the switch followed by a barrier request message
- * It returns the result once it gets the reply from the switch or after a timeout
- * If the protocol does not dictate the switch to reply the processing status for a particular message
- * the barrier request forces the switch to reply saying whether or not the message processing was
- * successful for messages sent to the switch up to this point
- *
- *
- *
+ * This class implements synchronous operations on message send to a switch. If
+ * syncRequest is set to true, it sends the requested message to the switch
+ * followed by a Barrier request message. It returns the result once it gets the
+ * reply from the switch or after a timeout. If the protocol does not dictate
+ * the switch to reply the processing status for a particular message, the
+ * Barrier request forces the switch to reply saying whether or not the message
+ * processing was successful for messages sent to the switch up to this point.
+ * If syncRequest is false, it simply skips the message send and just waits for
+ * the response back.
  */
 public class SynchronousMessage implements Callable<Object> {
     private ISwitch sw;
@@ -34,21 +33,30 @@ public class SynchronousMessage implements Callable<Object> {
     private OFMessage syncMsg;
     protected CountDownLatch latch;
     private Object result;
+    private boolean syncRequest;
 
-    public SynchronousMessage(ISwitch sw, Integer xid, OFMessage msg) {
+    public SynchronousMessage(ISwitch sw, Integer xid, OFMessage msg,
+            boolean syncRequest) {
         this.sw = sw;
         this.xid = xid;
         syncMsg = msg;
         latch = new CountDownLatch(1);
         result = null;
+        this.syncRequest = syncRequest;
     }
 
     @Override
     public Object call() throws Exception {
-        sw.asyncSend(syncMsg, xid);
-        if (!(syncMsg instanceof OFBarrierRequest)) {
-            OFBarrierRequest barrierMsg = new OFBarrierRequest();
-            sw.asyncSend(barrierMsg, xid);
+        /*
+         * Send out message only if syncRequest is set to true. Otherwise, just
+         * wait for the Barrier response back.
+         */
+        if (syncRequest) {
+            sw.asyncSend(syncMsg, xid);
+            if (!(syncMsg instanceof OFBarrierRequest)) {
+                OFBarrierRequest barrierMsg = new OFBarrierRequest();
+                sw.asyncSend(barrierMsg, xid);
+            }
         }
         latch.await();
         return result;
index 0dd0ca7..96ea6cd 100644 (file)
@@ -324,7 +324,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
 
     @Override
     public Status removeAllFlows(Node node) {
-        return new Status(StatusCode.SUCCESS, null);
+        return new Status(StatusCode.SUCCESS);
     }
 
     private String errorString(String phase, String action, String cause) {
@@ -445,7 +445,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
     }
 
     @Override
-    public Status sendBarrierMessage(Node node) {
+    public Status syncSendBarrierMessage(Node node) {
         if (!node.getType().equals(NodeIDType.OPENFLOW)) {
             return new Status(StatusCode.NOTACCEPTABLE,
                     "The node does not support Barrier message.");
@@ -455,9 +455,32 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
             long swid = (Long) node.getID();
             ISwitch sw = controller.getSwitch(swid);
             if (sw != null) {
-                sw.sendBarrierMessage();
+                sw.syncSendBarrierMessage();
                 clearXid2Rid(swid);
-                return (new Status(StatusCode.SUCCESS, null));
+                return (new Status(StatusCode.SUCCESS));
+            } else {
+                return new Status(StatusCode.GONE,
+                        "The node does not have a valid Switch reference.");
+            }
+        }
+        return new Status(StatusCode.INTERNALERROR,
+                "Failed to send Barrier message.");
+    }
+    
+    @Override
+    public Status asyncSendBarrierMessage(Node node) {
+        if (!node.getType().equals(NodeIDType.OPENFLOW)) {
+            return new Status(StatusCode.NOTACCEPTABLE,
+                    "The node does not support Barrier message.");
+        }
+
+        if (controller != null) {
+            long swid = (Long) node.getID();
+            ISwitch sw = controller.getSwitch(swid);
+            if (sw != null) {
+                sw.asyncSendBarrierMessage();
+                clearXid2Rid(swid);
+                return (new Status(StatusCode.SUCCESS));
             } else {
                 return new Status(StatusCode.GONE,
                         "The node does not have a valid Switch reference.");
@@ -499,7 +522,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService,
         
         int size = swxid2rid.size();
         if (size % barrierMessagePriorCount == 0) {
-            result = sendBarrierMessage(node);
+            result = asyncSendBarrierMessage(node);
         }
         
         return result;
index dbd7eaf..d2af1a8 100644 (file)
@@ -96,7 +96,8 @@ public interface IFlowProgrammerService {
     Status removeAllFlows(Node node);
 
     /**
-     * Send synchronous Barrier message
+     * Send Barrier message synchronously. The caller will be blocked until the
+     * solicitation response arrives.
      * 
      * Solicit the network node to report whether all the requests sent so far
      * are completed. When this call is done, caller knows that all past flow
@@ -108,5 +109,20 @@ public interface IFlowProgrammerService {
      *            The network node to solicit
      * @return The status of this request containing the unique request id
      */
-    Status sendBarrierMessage(Node node);
+    Status syncSendBarrierMessage(Node node);
+
+    /**
+     * Send Barrier message asynchronously. The caller is not blocked.
+     * 
+     * Solicit the network node to report whether all the requests sent so far
+     * are completed. When this call is done, caller knows that all past flow
+     * operations requested to the node in asynchronous fashion were satisfied
+     * by the network node and that in case of any failure, a message was sent
+     * to the controller.
+     * 
+     * @param node
+     *            The network node to solicit
+     * @return The status of this request containing the unique request id
+     */
+    Status asyncSendBarrierMessage(Node node);
 }
index c723530..3c40b96 100644 (file)
@@ -76,9 +76,17 @@ public interface IPluginInFlowProgrammerService {
     Status removeAllFlows(Node node);
 
     /**
-     * Send synchronous Barrier message 
+     * Send Barrier message synchronously. The caller will be blocked until the
+     * Barrier reply arrives.
      * 
      * @param node
      */
-    Status sendBarrierMessage(Node node);
+    Status syncSendBarrierMessage(Node node);
+
+    /**
+     * Send Barrier message asynchronously. The caller is not blocked.
+     * 
+     * @param node
+     */
+    Status asyncSendBarrierMessage(Node node);
 }
index 0abebf8..6bea306 100644 (file)
@@ -502,11 +502,22 @@ public class FlowProgrammerService implements IFlowProgrammerService,
     }
 
     @Override
-    public Status sendBarrierMessage(Node node) {
+    public Status syncSendBarrierMessage(Node node) {
         if (this.pluginFlowProgrammer != null) {
             if (this.pluginFlowProgrammer.get(node.getType()) != null) {
                 return this.pluginFlowProgrammer.get(node.getType())
-                        .sendBarrierMessage(node);
+                        .syncSendBarrierMessage(node);
+            }
+        }
+        return new Status(StatusCode.NOSERVICE, "Plugin unuvailable");
+    }
+
+    @Override
+    public Status asyncSendBarrierMessage(Node node) {
+        if (this.pluginFlowProgrammer != null) {
+            if (this.pluginFlowProgrammer.get(node.getType()) != null) {
+                return this.pluginFlowProgrammer.get(node.getType())
+                        .asyncSendBarrierMessage(node);
             }
         }
         return new Status(StatusCode.NOSERVICE, "Plugin unuvailable");