- Waiting for Barrier reply is still blocked in low level plugin, but it's per switch basis.
Signed-off-by: Jason Ye <yisye@cisco.com>
public Status modifyOrAddFlowEntryAsync(FlowEntry newone);
/**
- * Requests ForwardingRulesManager to solicit the network node to inform
- * us about the status of his execution on the asynchronous requests that
- * were sent to it so far. It is a way for an application to poke the
- * network node in order to get a feedback asap on the asynchronous
- * requests generated by the application. It is a non-blocking call
- * and does not guarantee the node will respond in any given time.
+ * Requests ForwardingRulesManager to solicit the network node to inform us
+ * about the status of its execution on the asynchronous requests that were
+ * sent to it so far. It is a way for an application to poke the network
+ * node in order to get a feedback asap on the asynchronous requests
+ * generated by the application. The caller may decide if this is a blocking
+ * or non-blocking operation. If blocking is set to true, the caller will be
+ * blocked until the solicitation response is received from the network node
+ * or receive timeout. Otherwise, it is a non-blocking call and does not
+ * guarantee the node will respond in any given time.
*
* @param node
- * The network node to solicit a response
+ * The network node to solicit a response
+ * @param blocking
+ * The blocking mode
+ * @return the status of this request containing the request id associated
+ * to this asynchronous request
*/
- public void solicitStatusResponse(Node node);
+ public Status solicitStatusResponse(Node node, boolean blocking);
/**
* Check whether the passed flow entry conflicts with the Container flows
}
@Override
- public void solicitStatusResponse(Node node) {
+ public Status solicitStatusResponse(Node node, boolean blocking) {
+ Status rv = new Status(StatusCode.INTERNALERROR);
+
if (this.programmer != null) {
- programmer.sendBarrierMessage(node);
- }
+ if (blocking) {
+ rv = programmer.syncSendBarrierMessage(node);
+ } else {
+ rv = programmer.asyncSendBarrierMessage(node);
+ }
+ }
+
+ return rv;
}
-
}
public boolean isOperational();
/**
- * Sends synchronous Barrier message
+ * Send Barrier message synchronously. The caller will be blocked until the
+ * Barrier reply arrives.
*/
- public Object sendBarrierMessage();
+ Object syncSendBarrierMessage();
+
+ /**
+ * Send Barrier message asynchronously. The caller is not blocked. The
+ * Barrier message will be sent in a transmit thread which will be blocked
+ * until the Barrier reply arrives.
+ */
+ Object asyncSendBarrierMessage();
}
int priority;
final static AtomicLong seq = new AtomicLong();
final long seqNum;
-
+ boolean syncReply; // set to true if we want to be blocked until the response arrives
+
public PriorityMessage(OFMessage msg, int priority) {
this.msg = msg;
this.priority = priority;
this.seqNum = seq.getAndIncrement();
+ this.syncReply = false;
+ }
+
+ public PriorityMessage(OFMessage msg, int priority, boolean syncReply) {
+ this(msg, priority);
+ this.syncReply = syncReply;
}
public OFMessage getMsg() {
import java.io.IOException;
import java.net.SocketException;
import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
}
private Object syncSend(OFMessage msg, int xid) {
- SynchronousMessage worker = new SynchronousMessage(this, xid, msg);
- messageWaitingDone.put(xid, worker);
- Object result = null;
- Boolean status = false;
- Future<Object> submit = executor.submit(worker);
- try {
- result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
- messageWaitingDone.remove(xid);
- if (result == null) {
- // if result is null, then it means the switch can handle this
- // message successfully
- // convert the result into a Boolean with value true
- status = true;
- // logger.debug("Successfully send " +
- // msg.getType().toString());
- result = status;
- } else {
- // if result is not null, this means the switch can't handle
- // this message
- // the result if OFError already
- logger.debug("Send {} failed --> {}", msg.getType().toString(),
- ((OFError) result).toString());
- }
- return result;
- } catch (Exception e) {
- logger.warn("Timeout while waiting for {} reply", msg.getType()
- .toString());
- // convert the result into a Boolean with value false
- status = false;
- result = status;
- return result;
- }
+ return syncMessageInternal(msg, xid, true);
}
/**
private void reportError(Exception e) {
if (e instanceof AsynchronousCloseException
|| e instanceof InterruptedException
- || e instanceof SocketException || e instanceof IOException) {
+ || e instanceof SocketException || e instanceof IOException
+ || e instanceof ClosedSelectorException) {
logger.debug("Caught exception {}", e.getMessage());
} else {
logger.warn("Caught exception ", e);
PriorityMessage pmsg = transmitQ.poll();
msgReadWriteService.asyncSend(pmsg.msg);
logger.trace("Message sent: {}", pmsg.toString());
+ /*
+ * If syncReply is set to true, wait for the response
+ * back.
+ */
+ if (pmsg.syncReply) {
+ syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
+ }
}
Thread.sleep(10);
} catch (InterruptedException ie) {
}
/**
- * Sends synchronous Barrier message
+ * Send Barrier message synchronously. The caller will be blocked until the
+ * Barrier reply is received.
*/
@Override
- public Object sendBarrierMessage() {
+ public Object syncSendBarrierMessage() {
OFBarrierRequest barrierMsg = new OFBarrierRequest();
return syncSend(barrierMsg);
}
+ /**
+ * Send Barrier message asynchronously. The caller is not blocked. The
+ * Barrier message will be sent in a transmit thread which will be blocked
+ * until the Barrier reply is received.
+ */
+ @Override
+ public Object asyncSendBarrierMessage() {
+ if (transmitQ == null) {
+ return Boolean.FALSE;
+ }
+
+ OFBarrierRequest barrierMsg = new OFBarrierRequest();
+ int xid = getNextXid();
+
+ barrierMsg.setXid(xid);
+ transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
+
+ return Boolean.TRUE;
+ }
+
/**
* This method returns the switch liveness timeout value. If controller did
* not receive any message from the switch for such a long period,
return rv;
}
+
+ /**
+ * This method performs synchronous operations for a given message. If
+ * syncRequest is set to true, the message will be sent out followed by a
+ * Barrier request message. Then it's blocked until the Barrier rely arrives
+ * or timeout. If syncRequest is false, it simply skips the message send and
+ * just waits for the response back.
+ *
+ * @param msg
+ * Message to be sent
+ * @param xid
+ * Message XID
+ * @param request
+ * If set to true, the message the message will be sent out
+ * followed by a Barrier request message. If set to false, it
+ * simply skips the sending and just waits for the Barrier reply.
+ * @return the result
+ */
+ private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
+ SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
+ messageWaitingDone.put(xid, worker);
+ Object result = null;
+ Boolean status = false;
+ Future<Object> submit = executor.submit(worker);
+ try {
+ result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
+ messageWaitingDone.remove(xid);
+ if (result == null) {
+ // if result is null, then it means the switch can handle this
+ // message successfully
+ // convert the result into a Boolean with value true
+ status = true;
+ // logger.debug("Successfully send " +
+ // msg.getType().toString());
+ result = status;
+ } else {
+ // if result is not null, this means the switch can't handle
+ // this message
+ // the result if OFError already
+ logger.debug("Send {} failed --> {}", msg.getType().toString(),
+ ((OFError) result).toString());
+ }
+ return result;
+ } catch (Exception e) {
+ logger.warn("Timeout while waiting for {} reply", msg.getType()
+ .toString());
+ // convert the result into a Boolean with value false
+ status = false;
+ result = status;
+ return result;
+ }
+ }
}
-
/*
* Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
*
import org.openflow.protocol.OFMessage;
/**
- * Implements the synchronous message send to a switch
- * It sends the requested message to the switch followed by a barrier request message
- * It returns the result once it gets the reply from the switch or after a timeout
- * If the protocol does not dictate the switch to reply the processing status for a particular message
- * the barrier request forces the switch to reply saying whether or not the message processing was
- * successful for messages sent to the switch up to this point
- *
- *
- *
+ * This class implements synchronous operations on message send to a switch. If
+ * syncRequest is set to true, it sends the requested message to the switch
+ * followed by a Barrier request message. It returns the result once it gets the
+ * reply from the switch or after a timeout. If the protocol does not dictate
+ * the switch to reply the processing status for a particular message, the
+ * Barrier request forces the switch to reply saying whether or not the message
+ * processing was successful for messages sent to the switch up to this point.
+ * If syncRequest is false, it simply skips the message send and just waits for
+ * the response back.
*/
public class SynchronousMessage implements Callable<Object> {
private ISwitch sw;
private OFMessage syncMsg;
protected CountDownLatch latch;
private Object result;
+ private boolean syncRequest;
- public SynchronousMessage(ISwitch sw, Integer xid, OFMessage msg) {
+ public SynchronousMessage(ISwitch sw, Integer xid, OFMessage msg,
+ boolean syncRequest) {
this.sw = sw;
this.xid = xid;
syncMsg = msg;
latch = new CountDownLatch(1);
result = null;
+ this.syncRequest = syncRequest;
}
@Override
public Object call() throws Exception {
- sw.asyncSend(syncMsg, xid);
- if (!(syncMsg instanceof OFBarrierRequest)) {
- OFBarrierRequest barrierMsg = new OFBarrierRequest();
- sw.asyncSend(barrierMsg, xid);
+ /*
+ * Send out message only if syncRequest is set to true. Otherwise, just
+ * wait for the Barrier response back.
+ */
+ if (syncRequest) {
+ sw.asyncSend(syncMsg, xid);
+ if (!(syncMsg instanceof OFBarrierRequest)) {
+ OFBarrierRequest barrierMsg = new OFBarrierRequest();
+ sw.asyncSend(barrierMsg, xid);
+ }
}
latch.await();
return result;
@Override
public Status removeAllFlows(Node node) {
- return new Status(StatusCode.SUCCESS, null);
+ return new Status(StatusCode.SUCCESS);
}
private String errorString(String phase, String action, String cause) {
}
@Override
- public Status sendBarrierMessage(Node node) {
+ public Status syncSendBarrierMessage(Node node) {
if (!node.getType().equals(NodeIDType.OPENFLOW)) {
return new Status(StatusCode.NOTACCEPTABLE,
"The node does not support Barrier message.");
long swid = (Long) node.getID();
ISwitch sw = controller.getSwitch(swid);
if (sw != null) {
- sw.sendBarrierMessage();
+ sw.syncSendBarrierMessage();
clearXid2Rid(swid);
- return (new Status(StatusCode.SUCCESS, null));
+ return (new Status(StatusCode.SUCCESS));
+ } else {
+ return new Status(StatusCode.GONE,
+ "The node does not have a valid Switch reference.");
+ }
+ }
+ return new Status(StatusCode.INTERNALERROR,
+ "Failed to send Barrier message.");
+ }
+
+ @Override
+ public Status asyncSendBarrierMessage(Node node) {
+ if (!node.getType().equals(NodeIDType.OPENFLOW)) {
+ return new Status(StatusCode.NOTACCEPTABLE,
+ "The node does not support Barrier message.");
+ }
+
+ if (controller != null) {
+ long swid = (Long) node.getID();
+ ISwitch sw = controller.getSwitch(swid);
+ if (sw != null) {
+ sw.asyncSendBarrierMessage();
+ clearXid2Rid(swid);
+ return (new Status(StatusCode.SUCCESS));
} else {
return new Status(StatusCode.GONE,
"The node does not have a valid Switch reference.");
int size = swxid2rid.size();
if (size % barrierMessagePriorCount == 0) {
- result = sendBarrierMessage(node);
+ result = asyncSendBarrierMessage(node);
}
return result;
Status removeAllFlows(Node node);
/**
- * Send synchronous Barrier message
+ * Send Barrier message synchronously. The caller will be blocked until the
+ * solicitation response arrives.
*
* Solicit the network node to report whether all the requests sent so far
* are completed. When this call is done, caller knows that all past flow
* The network node to solicit
* @return The status of this request containing the unique request id
*/
- Status sendBarrierMessage(Node node);
+ Status syncSendBarrierMessage(Node node);
+
+ /**
+ * Send Barrier message asynchronously. The caller is not blocked.
+ *
+ * Solicit the network node to report whether all the requests sent so far
+ * are completed. When this call is done, caller knows that all past flow
+ * operations requested to the node in asynchronous fashion were satisfied
+ * by the network node and that in case of any failure, a message was sent
+ * to the controller.
+ *
+ * @param node
+ * The network node to solicit
+ * @return The status of this request containing the unique request id
+ */
+ Status asyncSendBarrierMessage(Node node);
}
Status removeAllFlows(Node node);
/**
- * Send synchronous Barrier message
+ * Send Barrier message synchronously. The caller will be blocked until the
+ * Barrier reply arrives.
*
* @param node
*/
- Status sendBarrierMessage(Node node);
+ Status syncSendBarrierMessage(Node node);
+
+ /**
+ * Send Barrier message asynchronously. The caller is not blocked.
+ *
+ * @param node
+ */
+ Status asyncSendBarrierMessage(Node node);
}
}
@Override
- public Status sendBarrierMessage(Node node) {
+ public Status syncSendBarrierMessage(Node node) {
if (this.pluginFlowProgrammer != null) {
if (this.pluginFlowProgrammer.get(node.getType()) != null) {
return this.pluginFlowProgrammer.get(node.getType())
- .sendBarrierMessage(node);
+ .syncSendBarrierMessage(node);
+ }
+ }
+ return new Status(StatusCode.NOSERVICE, "Plugin unuvailable");
+ }
+
+ @Override
+ public Status asyncSendBarrierMessage(Node node) {
+ if (this.pluginFlowProgrammer != null) {
+ if (this.pluginFlowProgrammer.get(node.getType()) != null) {
+ return this.pluginFlowProgrammer.get(node.getType())
+ .asyncSendBarrierMessage(node);
}
}
return new Status(StatusCode.NOSERVICE, "Plugin unuvailable");