import java.io.IOException;
import java.net.SocketException;
import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
}
executor.shutdown();
- selector = null;
msgReadWriteService = null;
if (switchHandlerThread != null) {
* should be used for non-critical messages such as statistics request,
* discovery packets, etc. An unique XID is generated automatically and
* inserted into the message.
- *
+ *
* @param msg
* The OF message to be sent
* @return The XID used
}
private Object syncSend(OFMessage msg, int xid) {
- SynchronousMessage worker = new SynchronousMessage(this, xid, msg);
- messageWaitingDone.put(xid, worker);
- Object result = null;
- Boolean status = false;
- Future<Object> submit = executor.submit(worker);
- try {
- result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
- messageWaitingDone.remove(xid);
- if (result == null) {
- // if result is null, then it means the switch can handle this
- // message successfully
- // convert the result into a Boolean with value true
- status = true;
- // logger.debug("Successfully send " +
- // msg.getType().toString());
- result = status;
- } else {
- // if result is not null, this means the switch can't handle
- // this message
- // the result if OFError already
- logger.debug("Send {} failed --> {}", msg.getType().toString(),
- ((OFError) result).toString());
- }
- return result;
- } catch (Exception e) {
- logger.warn("Timeout while waiting for {} reply", msg.getType()
- .toString());
- // convert the result into a Boolean with value false
- status = false;
- result = status;
- return result;
- }
+ return syncMessageInternal(msg, xid, true);
}
/**
* priority. It will be served after high priority messages. The method
* should be used for non-critical messages such as statistics request,
* discovery packets, etc. The specified XID is inserted into the message.
- *
+ *
* @param msg
* The OF message to be Sent
* @param xid
* method should be used for critical messages such as hello, echo reply
* etc. An unique XID is generated automatically and inserted into the
* message.
- *
+ *
* @param msg
* The OF message to be sent
* @return The XID used
* priority. It will be served first before normal priority messages. The
* method should be used for critical messages such as hello, echo reply
* etc. The specified XID is inserted into the message.
- *
+ *
* @param msg
* The OF message to be sent
* @return The XID used
}
}
+ /**
+ * This method bypasses the transmit queue and sends the message over the
+ * socket directly. If the input xid is not null, the specified xid is
+ * inserted into the message. Otherwise, an unique xid is generated
+ * automatically and inserted into the message.
+ *
+ * @param msg
+ * Message to be sent
+ * @param xid
+ * Message xid
+ */
+ private void asyncSendNow(OFMessage msg, Integer xid) {
+ if (xid == null) {
+ xid = getNextXid();
+ }
+ msg.setXid(xid);
+
+ asyncSendNow(msg);
+ }
+
+ /**
+ * This method bypasses the transmit queue and sends the message over the
+ * socket directly.
+ *
+ * @param msg
+ * Message to be sent
+ */
+ private void asyncSendNow(OFMessage msg) {
+ if (msgReadWriteService == null) {
+ logger.warn(
+ "asyncSendNow: {} is not sent because Message ReadWrite Service is not available.",
+ msg);
+ return;
+ }
+
+ try {
+ msgReadWriteService.asyncSend(msg);
+ } catch (Exception e) {
+ reportError(e);
+ }
+ }
+
public void handleMessages() {
List<OFMessage> msgs = null;
try {
- msgs = msgReadWriteService.readMessages();
+ if (msgReadWriteService != null) {
+ msgs = msgReadWriteService.readMessages();
+ }
} catch (Exception e) {
reportError(e);
}
if (msgs == null) {
- logger.debug("{} is down", toString());
+ logger.debug("{} is down", this);
// the connection is down, inform core
reportSwitchStateChange(false);
return;
}
for (OFMessage msg : msgs) {
- logger.trace("Message received: {}", msg.toString());
+ logger.trace("Message received: {}", msg);
this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
OFType type = msg.getType();
switch (type) {
case ECHO_REQUEST:
OFEchoReply echoReply = (OFEchoReply) factory
.getMessage(OFType.ECHO_REPLY);
- asyncFastSend(echoReply);
+ // respond immediately
+ asyncSendNow(echoReply, msg.getXid());
break;
case ECHO_REPLY:
this.probeSent = false;
// send a probe to see if the switch is still alive
logger.debug(
"Send idle probe (Echo Request) to {}",
- toString());
+ this);
probeSent = true;
OFMessage echo = factory
.getMessage(OFType.ECHO_REQUEST);
private void reportError(Exception e) {
if (e instanceof AsynchronousCloseException
|| e instanceof InterruptedException
- || e instanceof SocketException || e instanceof IOException) {
- logger.debug("Caught exception {}", e.getMessage());
+ || e instanceof SocketException || e instanceof IOException
+ || e instanceof ClosedSelectorException) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Caught exception {}", e.getMessage());
+ }
} else {
logger.warn("Caught exception ", e);
}
if (!transmitQ.isEmpty()) {
PriorityMessage pmsg = transmitQ.poll();
msgReadWriteService.asyncSend(pmsg.msg);
- logger.trace("Message sent: {}", pmsg.toString());
+ logger.trace("Message sent: {}", pmsg);
+ /*
+ * If syncReply is set to true, wait for the response
+ * back.
+ */
+ if (pmsg.syncReply) {
+ syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
+ }
}
Thread.sleep(10);
} catch (InterruptedException ie) {
}
/**
- * Sends synchronous Barrier message
+ * Send Barrier message synchronously. The caller will be blocked until the
+ * Barrier reply is received.
*/
@Override
- public Object sendBarrierMessage() {
+ public Object syncSendBarrierMessage() {
OFBarrierRequest barrierMsg = new OFBarrierRequest();
return syncSend(barrierMsg);
}
+ /**
+ * Send Barrier message asynchronously. The caller is not blocked. The
+ * Barrier message will be sent in a transmit thread which will be blocked
+ * until the Barrier reply is received.
+ */
+ @Override
+ public Object asyncSendBarrierMessage() {
+ if (transmitQ == null) {
+ return Boolean.FALSE;
+ }
+
+ OFBarrierRequest barrierMsg = new OFBarrierRequest();
+ int xid = getNextXid();
+
+ barrierMsg.setXid(xid);
+ transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
+
+ return Boolean.TRUE;
+ }
+
/**
* This method returns the switch liveness timeout value. If controller did
* not receive any message from the switch for such a long period,
* controller will tear down the connection to the switch.
- *
+ *
* @return The timeout value
*/
private static int getSwitchLivenessTimeout() {
return rv;
}
+
+ /**
+ * This method performs synchronous operations for a given message. If
+ * syncRequest is set to true, the message will be sent out followed by a
+ * Barrier request message. Then it's blocked until the Barrier rely arrives
+ * or timeout. If syncRequest is false, it simply skips the message send and
+ * just waits for the response back.
+ *
+ * @param msg
+ * Message to be sent
+ * @param xid
+ * Message XID
+ * @param request
+ * If set to true, the message the message will be sent out
+ * followed by a Barrier request message. If set to false, it
+ * simply skips the sending and just waits for the Barrier reply.
+ * @return the result
+ */
+ private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
+ SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
+ messageWaitingDone.put(xid, worker);
+ Object result = null;
+ Boolean status = false;
+ Future<Object> submit = executor.submit(worker);
+ try {
+ result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
+ messageWaitingDone.remove(xid);
+ if (result == null) {
+ // if result is null, then it means the switch can handle this
+ // message successfully
+ // convert the result into a Boolean with value true
+ status = true;
+ // logger.debug("Successfully send " +
+ // msg.getType().toString());
+ result = status;
+ } else {
+ // if result is not null, this means the switch can't handle
+ // this message
+ // the result if OFError already
+ if (logger.isDebugEnabled()) {
+ logger.debug("Send {} failed --> {}", msg.getType(),
+ ((OFError) result));
+ }
+ }
+ return result;
+ } catch (Exception e) {
+ logger.warn("Timeout while waiting for {} reply", msg.getType()
+ .toString());
+ // convert the result into a Boolean with value false
+ status = false;
+ result = status;
+ return result;
+ }
+ }
}