import java.io.IOException;
import java.net.SocketException;
import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
}
executor.shutdown();
- selector = null;
msgReadWriteService = null;
if (switchHandlerThread != null) {
}
private Object syncSend(OFMessage msg, int xid) {
- SynchronousMessage worker = new SynchronousMessage(this, xid, msg);
- messageWaitingDone.put(xid, worker);
- Object result = null;
- Boolean status = false;
- Future<Object> submit = executor.submit(worker);
- try {
- result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
- messageWaitingDone.remove(xid);
- if (result == null) {
- // if result is null, then it means the switch can handle this
- // message successfully
- // convert the result into a Boolean with value true
- status = true;
- // logger.debug("Successfully send " +
- // msg.getType().toString());
- result = status;
- } else {
- // if result is not null, this means the switch can't handle
- // this message
- // the result if OFError already
- logger.debug("Send {} failed --> {}", msg.getType().toString(),
- ((OFError) result).toString());
- }
- return result;
- } catch (Exception e) {
- logger.warn("Timeout while waiting for {} reply", msg.getType()
- .toString());
- // convert the result into a Boolean with value false
- status = false;
- result = status;
- return result;
- }
+ return syncMessageInternal(msg, xid, true);
}
/**
List<OFMessage> msgs = null;
try {
- msgs = msgReadWriteService.readMessages();
+ if (msgReadWriteService != null) {
+ msgs = msgReadWriteService.readMessages();
+ }
} catch (Exception e) {
reportError(e);
}
}
for (OFMessage msg : msgs) {
logger.trace("Message received: {}", msg.toString());
- /*
- * if ((msg.getType() != OFType.ECHO_REQUEST) && (msg.getType() !=
- * OFType.ECHO_REPLY)) { logger.debug(msg.getType().toString() +
- * " received from sw " + toString()); }
- */
this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
OFType type = msg.getType();
switch (type) {
}
private void processPortStatusMsg(OFPortStatus msg) {
- // short portNumber = msg.getDesc().getPortNumber();
OFPhysicalPort port = msg.getDesc();
if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
updatePhysicalPort(port);
- // logger.debug("Port " + portNumber + " on " + toString() +
- // " modified");
} else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
updatePhysicalPort(port);
- // logger.debug("Port " + portNumber + " on " + toString() +
- // " added");
} else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
.ordinal()) {
deletePhysicalPort(port);
- // logger.debug("Port " + portNumber + " on " + toString() +
- // " deleted");
}
}
reportSwitchStateChange(false);
} else {
// send a probe to see if the switch is still alive
- // logger.debug("Send idle probe (Echo Request) to "
- // + switchName());
+ logger.debug(
+ "Send idle probe (Echo Request) to {}",
+ toString());
probeSent = true;
OFMessage echo = factory
.getMessage(OFType.ECHO_REQUEST);
private void reportError(Exception e) {
if (e instanceof AsynchronousCloseException
|| e instanceof InterruptedException
- || e instanceof SocketException
- || e instanceof IOException) {
+ || e instanceof SocketException || e instanceof IOException
+ || e instanceof ClosedSelectorException) {
logger.debug("Caught exception {}", e.getMessage());
} else {
logger.warn("Caught exception ", e);
private void reportSwitchStateChange(boolean added) {
if (added) {
- ((Controller) core).takeSwtichEventAdd(this);
+ ((Controller) core).takeSwitchEventAdd(this);
} else {
((Controller) core).takeSwitchEventDelete(this);
}
.getValue()
| OFPortFeatures.OFPPF_1GB_HD
.getValue() | OFPortFeatures.OFPPF_10GB_FD
- .getValue()));
+ .getValue()));
}
private void deletePhysicalPort(OFPhysicalPort port) {
@Override
public String toString() {
- return ("["
- + this.socket.toString()
- + " SWID "
- + (isOperational() ? HexString.toHexString(this.sid)
- : "unkbown") + "]");
+ try {
+ return ("Switch:"
+ + socket.socket().getRemoteSocketAddress().toString().split("/")[1]
+ + " SWID:" + (isOperational() ? HexString
+ .toHexString(this.sid) : "unknown"));
+ } catch (Exception e) {
+ return (isOperational() ? HexString.toHexString(this.sid)
+ : "unknown");
+ }
+
}
@Override
PriorityMessage pmsg = transmitQ.poll();
msgReadWriteService.asyncSend(pmsg.msg);
logger.trace("Message sent: {}", pmsg.toString());
+ /*
+ * If syncReply is set to true, wait for the response
+ * back.
+ */
+ if (pmsg.syncReply) {
+ syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
+ }
}
Thread.sleep(10);
} catch (InterruptedException ie) {
}
/**
- * Sends synchronous Barrier message
+ * Send Barrier message synchronously. The caller will be blocked until the
+ * Barrier reply is received.
+ */
+ @Override
+ public Object syncSendBarrierMessage() {
+ OFBarrierRequest barrierMsg = new OFBarrierRequest();
+ return syncSend(barrierMsg);
+ }
+
+ /**
+ * Send Barrier message asynchronously. The caller is not blocked. The
+ * Barrier message will be sent in a transmit thread which will be blocked
+ * until the Barrier reply is received.
*/
@Override
- public Object sendBarrierMessage() {
+ public Object asyncSendBarrierMessage() {
+ if (transmitQ == null) {
+ return Boolean.FALSE;
+ }
+
OFBarrierRequest barrierMsg = new OFBarrierRequest();
- return syncSend(barrierMsg);
+ int xid = getNextXid();
+
+ barrierMsg.setXid(xid);
+ transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
+
+ return Boolean.TRUE;
}
-
+
/**
* This method returns the switch liveness timeout value. If controller did
* not receive any message from the switch for such a long period,
return rv;
}
+
+ /**
+ * This method performs synchronous operations for a given message. If
+ * syncRequest is set to true, the message will be sent out followed by a
+ * Barrier request message. Then it's blocked until the Barrier rely arrives
+ * or timeout. If syncRequest is false, it simply skips the message send and
+ * just waits for the response back.
+ *
+ * @param msg
+ * Message to be sent
+ * @param xid
+ * Message XID
+ * @param request
+ * If set to true, the message the message will be sent out
+ * followed by a Barrier request message. If set to false, it
+ * simply skips the sending and just waits for the Barrier reply.
+ * @return the result
+ */
+ private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
+ SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
+ messageWaitingDone.put(xid, worker);
+ Object result = null;
+ Boolean status = false;
+ Future<Object> submit = executor.submit(worker);
+ try {
+ result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
+ messageWaitingDone.remove(xid);
+ if (result == null) {
+ // if result is null, then it means the switch can handle this
+ // message successfully
+ // convert the result into a Boolean with value true
+ status = true;
+ // logger.debug("Successfully send " +
+ // msg.getType().toString());
+ result = status;
+ } else {
+ // if result is not null, this means the switch can't handle
+ // this message
+ // the result if OFError already
+ logger.debug("Send {} failed --> {}", msg.getType().toString(),
+ ((OFError) result).toString());
+ }
+ return result;
+ } catch (Exception e) {
+ logger.warn("Timeout while waiting for {} reply", msg.getType()
+ .toString());
+ // convert the result into a Boolean with value false
+ status = false;
+ result = status;
+ return result;
+ }
+ }
}