From: Jason Ye Date: Mon, 29 Apr 2013 21:15:27 +0000 (-0700) Subject: - Application is no longer blocked when programming hundreds of flows. The Barrier... X-Git-Tag: releasepom-0.1.0~517 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=1bad1b7ca9a87f9e1d32cfcf5a181354fc378ad4 - Application is no longer blocked when programming hundreds of flows. The Barrier message is now sent asynchronously. - Waiting for Barrier reply is still blocked in low level plugin, but it's per switch basis. Signed-off-by: Jason Ye --- diff --git a/opendaylight/forwardingrulesmanager/api/src/main/java/org/opendaylight/controller/forwardingrulesmanager/IForwardingRulesManager.java b/opendaylight/forwardingrulesmanager/api/src/main/java/org/opendaylight/controller/forwardingrulesmanager/IForwardingRulesManager.java index 78917c9c80..087c0bc878 100644 --- a/opendaylight/forwardingrulesmanager/api/src/main/java/org/opendaylight/controller/forwardingrulesmanager/IForwardingRulesManager.java +++ b/opendaylight/forwardingrulesmanager/api/src/main/java/org/opendaylight/controller/forwardingrulesmanager/IForwardingRulesManager.java @@ -145,17 +145,24 @@ public interface IForwardingRulesManager { public Status modifyOrAddFlowEntryAsync(FlowEntry newone); /** - * Requests ForwardingRulesManager to solicit the network node to inform - * us about the status of his execution on the asynchronous requests that - * were sent to it so far. It is a way for an application to poke the - * network node in order to get a feedback asap on the asynchronous - * requests generated by the application. It is a non-blocking call - * and does not guarantee the node will respond in any given time. + * Requests ForwardingRulesManager to solicit the network node to inform us + * about the status of its execution on the asynchronous requests that were + * sent to it so far. It is a way for an application to poke the network + * node in order to get a feedback asap on the asynchronous requests + * generated by the application. The caller may decide if this is a blocking + * or non-blocking operation. If blocking is set to true, the caller will be + * blocked until the solicitation response is received from the network node + * or receive timeout. Otherwise, it is a non-blocking call and does not + * guarantee the node will respond in any given time. * * @param node - * The network node to solicit a response + * The network node to solicit a response + * @param blocking + * The blocking mode + * @return the status of this request containing the request id associated + * to this asynchronous request */ - public void solicitStatusResponse(Node node); + public Status solicitStatusResponse(Node node, boolean blocking); /** * Check whether the passed flow entry conflicts with the Container flows diff --git a/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManagerImpl.java b/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManagerImpl.java index fe2cdc4738..110d3381df 100644 --- a/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManagerImpl.java +++ b/opendaylight/forwardingrulesmanager/implementation/src/main/java/org/opendaylight/controller/forwardingrulesmanager/internal/ForwardingRulesManagerImpl.java @@ -2577,10 +2577,17 @@ public class ForwardingRulesManagerImpl implements IForwardingRulesManager, } @Override - public void solicitStatusResponse(Node node) { + public Status solicitStatusResponse(Node node, boolean blocking) { + Status rv = new Status(StatusCode.INTERNALERROR); + if (this.programmer != null) { - programmer.sendBarrierMessage(node); - } + if (blocking) { + rv = programmer.syncSendBarrierMessage(node); + } else { + rv = programmer.asyncSendBarrierMessage(node); + } + } + + return rv; } - } diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/ISwitch.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/ISwitch.java index ee46ce2302..196fbbfb4c 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/ISwitch.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/ISwitch.java @@ -217,7 +217,15 @@ public interface ISwitch { public boolean isOperational(); /** - * Sends synchronous Barrier message + * Send Barrier message synchronously. The caller will be blocked until the + * Barrier reply arrives. */ - public Object sendBarrierMessage(); + Object syncSendBarrierMessage(); + + /** + * Send Barrier message asynchronously. The caller is not blocked. The + * Barrier message will be sent in a transmit thread which will be blocked + * until the Barrier reply arrives. + */ + Object asyncSendBarrierMessage(); } diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/PriorityMessage.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/PriorityMessage.java index 2f23d36e07..0c66dccd01 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/PriorityMessage.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/PriorityMessage.java @@ -23,11 +23,18 @@ class PriorityMessage { int priority; final static AtomicLong seq = new AtomicLong(); final long seqNum; - + boolean syncReply; // set to true if we want to be blocked until the response arrives + public PriorityMessage(OFMessage msg, int priority) { this.msg = msg; this.priority = priority; this.seqNum = seq.getAndIncrement(); + this.syncReply = false; + } + + public PriorityMessage(OFMessage msg, int priority, boolean syncReply) { + this(msg, priority); + this.syncReply = syncReply; } public OFMessage getMsg() { diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java index 57098ae3c6..5c51cc5862 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java @@ -11,6 +11,7 @@ package org.opendaylight.controller.protocol_plugin.openflow.core.internal; import java.io.IOException; import java.net.SocketException; import java.nio.channels.AsynchronousCloseException; +import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; @@ -239,38 +240,7 @@ public class SwitchHandler implements ISwitch { } private Object syncSend(OFMessage msg, int xid) { - SynchronousMessage worker = new SynchronousMessage(this, xid, msg); - messageWaitingDone.put(xid, worker); - Object result = null; - Boolean status = false; - Future submit = executor.submit(worker); - try { - result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS); - messageWaitingDone.remove(xid); - if (result == null) { - // if result is null, then it means the switch can handle this - // message successfully - // convert the result into a Boolean with value true - status = true; - // logger.debug("Successfully send " + - // msg.getType().toString()); - result = status; - } else { - // if result is not null, this means the switch can't handle - // this message - // the result if OFError already - logger.debug("Send {} failed --> {}", msg.getType().toString(), - ((OFError) result).toString()); - } - return result; - } catch (Exception e) { - logger.warn("Timeout while waiting for {} reply", msg.getType() - .toString()); - // convert the result into a Boolean with value false - status = false; - result = status; - return result; - } + return syncMessageInternal(msg, xid, true); } /** @@ -489,7 +459,8 @@ public class SwitchHandler implements ISwitch { private void reportError(Exception e) { if (e instanceof AsynchronousCloseException || e instanceof InterruptedException - || e instanceof SocketException || e instanceof IOException) { + || e instanceof SocketException || e instanceof IOException + || e instanceof ClosedSelectorException) { logger.debug("Caught exception {}", e.getMessage()); } else { logger.warn("Caught exception ", e); @@ -758,6 +729,13 @@ public class SwitchHandler implements ISwitch { PriorityMessage pmsg = transmitQ.poll(); msgReadWriteService.asyncSend(pmsg.msg); logger.trace("Message sent: {}", pmsg.toString()); + /* + * If syncReply is set to true, wait for the response + * back. + */ + if (pmsg.syncReply) { + syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false); + } } Thread.sleep(10); } catch (InterruptedException ie) { @@ -816,14 +794,35 @@ public class SwitchHandler implements ISwitch { } /** - * Sends synchronous Barrier message + * Send Barrier message synchronously. The caller will be blocked until the + * Barrier reply is received. */ @Override - public Object sendBarrierMessage() { + public Object syncSendBarrierMessage() { OFBarrierRequest barrierMsg = new OFBarrierRequest(); return syncSend(barrierMsg); } + /** + * Send Barrier message asynchronously. The caller is not blocked. The + * Barrier message will be sent in a transmit thread which will be blocked + * until the Barrier reply is received. + */ + @Override + public Object asyncSendBarrierMessage() { + if (transmitQ == null) { + return Boolean.FALSE; + } + + OFBarrierRequest barrierMsg = new OFBarrierRequest(); + int xid = getNextXid(); + + barrierMsg.setXid(xid); + transmitQ.add(new PriorityMessage(barrierMsg, 0, true)); + + return Boolean.TRUE; + } + /** * This method returns the switch liveness timeout value. If controller did * not receive any message from the switch for such a long period, @@ -844,4 +843,56 @@ public class SwitchHandler implements ISwitch { return rv; } + + /** + * This method performs synchronous operations for a given message. If + * syncRequest is set to true, the message will be sent out followed by a + * Barrier request message. Then it's blocked until the Barrier rely arrives + * or timeout. If syncRequest is false, it simply skips the message send and + * just waits for the response back. + * + * @param msg + * Message to be sent + * @param xid + * Message XID + * @param request + * If set to true, the message the message will be sent out + * followed by a Barrier request message. If set to false, it + * simply skips the sending and just waits for the Barrier reply. + * @return the result + */ + private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) { + SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest); + messageWaitingDone.put(xid, worker); + Object result = null; + Boolean status = false; + Future submit = executor.submit(worker); + try { + result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS); + messageWaitingDone.remove(xid); + if (result == null) { + // if result is null, then it means the switch can handle this + // message successfully + // convert the result into a Boolean with value true + status = true; + // logger.debug("Successfully send " + + // msg.getType().toString()); + result = status; + } else { + // if result is not null, this means the switch can't handle + // this message + // the result if OFError already + logger.debug("Send {} failed --> {}", msg.getType().toString(), + ((OFError) result).toString()); + } + return result; + } catch (Exception e) { + logger.warn("Timeout while waiting for {} reply", msg.getType() + .toString()); + // convert the result into a Boolean with value false + status = false; + result = status; + return result; + } + } } diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SynchronousMessage.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SynchronousMessage.java index 4737d4cea9..bb94d4b318 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SynchronousMessage.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SynchronousMessage.java @@ -1,4 +1,3 @@ - /* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * @@ -18,15 +17,15 @@ import org.openflow.protocol.OFError; import org.openflow.protocol.OFMessage; /** - * Implements the synchronous message send to a switch - * It sends the requested message to the switch followed by a barrier request message - * It returns the result once it gets the reply from the switch or after a timeout - * If the protocol does not dictate the switch to reply the processing status for a particular message - * the barrier request forces the switch to reply saying whether or not the message processing was - * successful for messages sent to the switch up to this point - * - * - * + * This class implements synchronous operations on message send to a switch. If + * syncRequest is set to true, it sends the requested message to the switch + * followed by a Barrier request message. It returns the result once it gets the + * reply from the switch or after a timeout. If the protocol does not dictate + * the switch to reply the processing status for a particular message, the + * Barrier request forces the switch to reply saying whether or not the message + * processing was successful for messages sent to the switch up to this point. + * If syncRequest is false, it simply skips the message send and just waits for + * the response back. */ public class SynchronousMessage implements Callable { private ISwitch sw; @@ -34,21 +33,30 @@ public class SynchronousMessage implements Callable { private OFMessage syncMsg; protected CountDownLatch latch; private Object result; + private boolean syncRequest; - public SynchronousMessage(ISwitch sw, Integer xid, OFMessage msg) { + public SynchronousMessage(ISwitch sw, Integer xid, OFMessage msg, + boolean syncRequest) { this.sw = sw; this.xid = xid; syncMsg = msg; latch = new CountDownLatch(1); result = null; + this.syncRequest = syncRequest; } @Override public Object call() throws Exception { - sw.asyncSend(syncMsg, xid); - if (!(syncMsg instanceof OFBarrierRequest)) { - OFBarrierRequest barrierMsg = new OFBarrierRequest(); - sw.asyncSend(barrierMsg, xid); + /* + * Send out message only if syncRequest is set to true. Otherwise, just + * wait for the Barrier response back. + */ + if (syncRequest) { + sw.asyncSend(syncMsg, xid); + if (!(syncMsg instanceof OFBarrierRequest)) { + OFBarrierRequest barrierMsg = new OFBarrierRequest(); + sw.asyncSend(barrierMsg, xid); + } } latch.await(); return result; diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/FlowProgrammerService.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/FlowProgrammerService.java index 0dd0ca736b..96ea6cd14f 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/FlowProgrammerService.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/internal/FlowProgrammerService.java @@ -324,7 +324,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService, @Override public Status removeAllFlows(Node node) { - return new Status(StatusCode.SUCCESS, null); + return new Status(StatusCode.SUCCESS); } private String errorString(String phase, String action, String cause) { @@ -445,7 +445,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService, } @Override - public Status sendBarrierMessage(Node node) { + public Status syncSendBarrierMessage(Node node) { if (!node.getType().equals(NodeIDType.OPENFLOW)) { return new Status(StatusCode.NOTACCEPTABLE, "The node does not support Barrier message."); @@ -455,9 +455,32 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService, long swid = (Long) node.getID(); ISwitch sw = controller.getSwitch(swid); if (sw != null) { - sw.sendBarrierMessage(); + sw.syncSendBarrierMessage(); clearXid2Rid(swid); - return (new Status(StatusCode.SUCCESS, null)); + return (new Status(StatusCode.SUCCESS)); + } else { + return new Status(StatusCode.GONE, + "The node does not have a valid Switch reference."); + } + } + return new Status(StatusCode.INTERNALERROR, + "Failed to send Barrier message."); + } + + @Override + public Status asyncSendBarrierMessage(Node node) { + if (!node.getType().equals(NodeIDType.OPENFLOW)) { + return new Status(StatusCode.NOTACCEPTABLE, + "The node does not support Barrier message."); + } + + if (controller != null) { + long swid = (Long) node.getID(); + ISwitch sw = controller.getSwitch(swid); + if (sw != null) { + sw.asyncSendBarrierMessage(); + clearXid2Rid(swid); + return (new Status(StatusCode.SUCCESS)); } else { return new Status(StatusCode.GONE, "The node does not have a valid Switch reference."); @@ -499,7 +522,7 @@ public class FlowProgrammerService implements IPluginInFlowProgrammerService, int size = swxid2rid.size(); if (size % barrierMessagePriorCount == 0) { - result = sendBarrierMessage(node); + result = asyncSendBarrierMessage(node); } return result; diff --git a/opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/flowprogrammer/IFlowProgrammerService.java b/opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/flowprogrammer/IFlowProgrammerService.java index dbd7eafa36..d2af1a8c7c 100644 --- a/opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/flowprogrammer/IFlowProgrammerService.java +++ b/opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/flowprogrammer/IFlowProgrammerService.java @@ -96,7 +96,8 @@ public interface IFlowProgrammerService { Status removeAllFlows(Node node); /** - * Send synchronous Barrier message + * Send Barrier message synchronously. The caller will be blocked until the + * solicitation response arrives. * * Solicit the network node to report whether all the requests sent so far * are completed. When this call is done, caller knows that all past flow @@ -108,5 +109,20 @@ public interface IFlowProgrammerService { * The network node to solicit * @return The status of this request containing the unique request id */ - Status sendBarrierMessage(Node node); + Status syncSendBarrierMessage(Node node); + + /** + * Send Barrier message asynchronously. The caller is not blocked. + * + * Solicit the network node to report whether all the requests sent so far + * are completed. When this call is done, caller knows that all past flow + * operations requested to the node in asynchronous fashion were satisfied + * by the network node and that in case of any failure, a message was sent + * to the controller. + * + * @param node + * The network node to solicit + * @return The status of this request containing the unique request id + */ + Status asyncSendBarrierMessage(Node node); } diff --git a/opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/flowprogrammer/IPluginInFlowProgrammerService.java b/opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/flowprogrammer/IPluginInFlowProgrammerService.java index c72353055d..3c40b96cc7 100644 --- a/opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/flowprogrammer/IPluginInFlowProgrammerService.java +++ b/opendaylight/sal/api/src/main/java/org/opendaylight/controller/sal/flowprogrammer/IPluginInFlowProgrammerService.java @@ -76,9 +76,17 @@ public interface IPluginInFlowProgrammerService { Status removeAllFlows(Node node); /** - * Send synchronous Barrier message + * Send Barrier message synchronously. The caller will be blocked until the + * Barrier reply arrives. * * @param node */ - Status sendBarrierMessage(Node node); + Status syncSendBarrierMessage(Node node); + + /** + * Send Barrier message asynchronously. The caller is not blocked. + * + * @param node + */ + Status asyncSendBarrierMessage(Node node); } diff --git a/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/FlowProgrammerService.java b/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/FlowProgrammerService.java index 0abebf8352..6bea306669 100644 --- a/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/FlowProgrammerService.java +++ b/opendaylight/sal/implementation/src/main/java/org/opendaylight/controller/sal/implementation/internal/FlowProgrammerService.java @@ -502,11 +502,22 @@ public class FlowProgrammerService implements IFlowProgrammerService, } @Override - public Status sendBarrierMessage(Node node) { + public Status syncSendBarrierMessage(Node node) { if (this.pluginFlowProgrammer != null) { if (this.pluginFlowProgrammer.get(node.getType()) != null) { return this.pluginFlowProgrammer.get(node.getType()) - .sendBarrierMessage(node); + .syncSendBarrierMessage(node); + } + } + return new Status(StatusCode.NOSERVICE, "Plugin unuvailable"); + } + + @Override + public Status asyncSendBarrierMessage(Node node) { + if (this.pluginFlowProgrammer != null) { + if (this.pluginFlowProgrammer.get(node.getType()) != null) { + return this.pluginFlowProgrammer.get(node.getType()) + .asyncSendBarrierMessage(node); } } return new Status(StatusCode.NOSERVICE, "Plugin unuvailable");