-
/*
* Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
*
package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
import java.io.IOException;
+import java.net.SocketException;
import java.nio.channels.AsynchronousCloseException;
+import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicInteger;
import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
-import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
+import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
import org.openflow.protocol.OFBarrierReply;
+import org.openflow.protocol.OFBarrierRequest;
import org.openflow.protocol.OFEchoReply;
import org.openflow.protocol.OFError;
import org.openflow.protocol.OFFeaturesReply;
private static final Logger logger = LoggerFactory
.getLogger(SwitchHandler.class);
private static final int SWITCH_LIVENESS_TIMER = 5000;
- private static final int SWITCH_LIVENESS_TIMEOUT = 2 * SWITCH_LIVENESS_TIMER + 500;
- private int MESSAGE_RESPONSE_TIMER = 2000;
+ private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
+ private final int MESSAGE_RESPONSE_TIMER = 2000;
- private String instanceName;
- private ISwitch thisISwitch;
- private IController core;
+ private final String instanceName;
+ private final ISwitch thisISwitch;
+ private final IController core;
private Long sid;
private Integer buffers;
private Integer capabilities;
private Byte tables;
private Integer actions;
private Selector selector;
- private SocketChannel socket;
- private BasicFactory factory;
- private AtomicInteger xid;
+ private final SocketChannel socket;
+ private final BasicFactory factory;
+ private final AtomicInteger xid;
private SwitchState state;
private Timer periodicTimer;
- private Map<Short, OFPhysicalPort> physicalPorts;
- private Map<Short, Integer> portBandwidth;
- private Date connectedDate;
+ private final Map<Short, OFPhysicalPort> physicalPorts;
+ private final Map<Short, Integer> portBandwidth;
+ private final Date connectedDate;
private Long lastMsgReceivedTimeStamp;
private Boolean probeSent;
- private ExecutorService executor;
- private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
+ private final ExecutorService executor;
+ private final ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
private boolean running;
private IMessageReadWrite msgReadWriteService;
private Thread switchHandlerThread;
private Integer responseTimerValue;
- private PriorityBlockingQueue<PriorityMessage> transmitQ;
+ private PriorityBlockingQueue<PriorityMessage> transmitQ;
private Thread transmitThread;
-
+
private enum SwitchState {
NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
3);
this.instanceName = name;
this.thisISwitch = this;
this.sid = (long) 0;
- this.buffers = (int)0;
- this.capabilities = (int)0;
- this.tables = (byte)0;
- this.actions = (int)0;
+ this.buffers = (int) 0;
+ this.capabilities = (int) 0;
+ this.tables = (byte) 0;
+ this.actions = (int) 0;
this.core = core;
this.socket = sc;
this.factory = new BasicFactory();
this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
String rTimer = System.getProperty("of.messageResponseTimer");
if (rTimer != null) {
- try {
- responseTimerValue = Integer.decode(rTimer);
- } catch (NumberFormatException e) {
- logger.warn("Invalid of.messageResponseTimer: {} use default({})",
- rTimer, MESSAGE_RESPONSE_TIMER);
- }
+ try {
+ responseTimerValue = Integer.decode(rTimer);
+ } catch (NumberFormatException e) {
+ logger.warn(
+ "Invalid of.messageResponseTimer: {} use default({})",
+ rTimer, MESSAGE_RESPONSE_TIMER);
+ }
}
- }
+ }
public void start() {
try {
- startTransmitThread();
- setupCommChannel();
- sendFirstHello();
+ startTransmitThread();
+ setupCommChannel();
+ sendFirstHello();
startHandlerThread();
} catch (Exception e) {
reportError(e);
running = true;
while (running) {
try {
- // wait for an incoming connection
+ // wait for an incoming connection
selector.select(0);
Iterator<SelectionKey> selectedKeys = selector
.selectedKeys().iterator();
}
}
} catch (Exception e) {
- reportError(e);
+ reportError(e);
}
}
}
}
public void stop() {
- running = false;
- cancelSwitchTimer();
- try {
- selector.wakeup();
- selector.close();
- } catch (Exception e) {
- }
- try {
- socket.close();
- } catch (Exception e) {
- }
- try {
- msgReadWriteService.stop();
- } catch (Exception e) {
- }
- executor.shutdown();
-
- selector = null;
- socket = null;
- msgReadWriteService = null;
-
- if (switchHandlerThread != null) {
- switchHandlerThread.interrupt();
- }
- if (transmitThread != null) {
- transmitThread.interrupt();
- }
+ running = false;
+ cancelSwitchTimer();
+ try {
+ selector.wakeup();
+ selector.close();
+ } catch (Exception e) {
+ }
+ try {
+ socket.close();
+ } catch (Exception e) {
+ }
+ try {
+ msgReadWriteService.stop();
+ } catch (Exception e) {
+ }
+ executor.shutdown();
+
+ msgReadWriteService = null;
+
+ if (switchHandlerThread != null) {
+ switchHandlerThread.interrupt();
+ }
+ if (transmitThread != null) {
+ transmitThread.interrupt();
+ }
}
@Override
return this.xid.incrementAndGet();
}
- /**
- * This method puts the message in an outgoing priority queue with normal
- * priority. It will be served after high priority messages. The method
- * should be used for non-critical messages such as statistics request,
- * discovery packets, etc. An unique XID is generated automatically and
- * inserted into the message.
- *
- * @param msg The OF message to be sent
- * @return The XID used
- */
+ /**
+ * This method puts the message in an outgoing priority queue with normal
+ * priority. It will be served after high priority messages. The method
+ * should be used for non-critical messages such as statistics request,
+ * discovery packets, etc. An unique XID is generated automatically and
+ * inserted into the message.
+ *
+ * @param msg
+ * The OF message to be sent
+ * @return The XID used
+ */
@Override
public Integer asyncSend(OFMessage msg) {
- return asyncSend(msg, getNextXid());
- }
-
- /**
- * This method puts the message in an outgoing priority queue with normal
- * priority. It will be served after high priority messages. The method
- * should be used for non-critical messages such as statistics request,
- * discovery packets, etc. The specified XID is inserted into the message.
- *
- * @param msg The OF message to be Sent
- * @param xid The XID to be used in the message
- * @return The XID used
- */
+ return asyncSend(msg, getNextXid());
+ }
+
+ private Object syncSend(OFMessage msg, int xid) {
+ return syncMessageInternal(msg, xid, true);
+ }
+
+ /**
+ * This method puts the message in an outgoing priority queue with normal
+ * priority. It will be served after high priority messages. The method
+ * should be used for non-critical messages such as statistics request,
+ * discovery packets, etc. The specified XID is inserted into the message.
+ *
+ * @param msg
+ * The OF message to be Sent
+ * @param xid
+ * The XID to be used in the message
+ * @return The XID used
+ */
@Override
public Integer asyncSend(OFMessage msg, int xid) {
- msg.setXid(xid);
- transmitQ.add(new PriorityMessage(msg, 0));
+ msg.setXid(xid);
+ if (transmitQ != null) {
+ transmitQ.add(new PriorityMessage(msg, 0));
+ }
return xid;
}
- /**
- * This method puts the message in an outgoing priority queue with high
- * priority. It will be served first before normal priority messages. The
- * method should be used for critical messages such as hello, echo reply
- * etc. An unique XID is generated automatically and inserted into the
- * message.
- *
- * @param msg The OF message to be sent
- * @return The XID used
- */
+ /**
+ * This method puts the message in an outgoing priority queue with high
+ * priority. It will be served first before normal priority messages. The
+ * method should be used for critical messages such as hello, echo reply
+ * etc. An unique XID is generated automatically and inserted into the
+ * message.
+ *
+ * @param msg
+ * The OF message to be sent
+ * @return The XID used
+ */
@Override
public Integer asyncFastSend(OFMessage msg) {
- return asyncFastSend(msg, getNextXid());
- }
-
- /**
- * This method puts the message in an outgoing priority queue with high
- * priority. It will be served first before normal priority messages. The
- * method should be used for critical messages such as hello, echo reply
- * etc. The specified XID is inserted into the message.
- *
- * @param msg The OF message to be sent
- * @return The XID used
- */
+ return asyncFastSend(msg, getNextXid());
+ }
+
+ /**
+ * This method puts the message in an outgoing priority queue with high
+ * priority. It will be served first before normal priority messages. The
+ * method should be used for critical messages such as hello, echo reply
+ * etc. The specified XID is inserted into the message.
+ *
+ * @param msg
+ * The OF message to be sent
+ * @return The XID used
+ */
@Override
public Integer asyncFastSend(OFMessage msg, int xid) {
- msg.setXid(xid);
- transmitQ.add(new PriorityMessage(msg, 1));
+ msg.setXid(xid);
+ if (transmitQ != null) {
+ transmitQ.add(new PriorityMessage(msg, 1));
+ }
return xid;
}
- public void resumeSend() {
+ public void resumeSend() {
try {
- msgReadWriteService.resumeSend();
- } catch (Exception e) {
- reportError(e);
- }
+ if (msgReadWriteService != null) {
+ msgReadWriteService.resumeSend();
+ }
+ } catch (Exception e) {
+ reportError(e);
+ }
+ }
+
+ /**
+ * This method bypasses the transmit queue and sends the message over the
+ * socket directly. If the input xid is not null, the specified xid is
+ * inserted into the message. Otherwise, an unique xid is generated
+ * automatically and inserted into the message.
+ *
+ * @param msg
+ * Message to be sent
+ * @param xid
+ * Message xid
+ */
+ private void asyncSendNow(OFMessage msg, Integer xid) {
+ if (xid == null) {
+ xid = getNextXid();
+ }
+ msg.setXid(xid);
+
+ asyncSendNow(msg);
+ }
+
+ /**
+ * This method bypasses the transmit queue and sends the message over the
+ * socket directly.
+ *
+ * @param msg
+ * Message to be sent
+ */
+ private void asyncSendNow(OFMessage msg) {
+ if (msgReadWriteService == null) {
+ logger.warn(
+ "asyncSendNow: {} is not sent because Message ReadWrite Service is not available.",
+ msg);
+ return;
+ }
+
+ try {
+ msgReadWriteService.asyncSend(msg);
+ } catch (Exception e) {
+ reportError(e);
+ }
}
public void handleMessages() {
List<OFMessage> msgs = null;
-
+
try {
- msgs = msgReadWriteService.readMessages();
- } catch (Exception e) {
- reportError(e);
- }
-
+ if (msgReadWriteService != null) {
+ msgs = msgReadWriteService.readMessages();
+ }
+ } catch (Exception e) {
+ reportError(e);
+ }
+
if (msgs == null) {
- logger.debug("{} is down", toString());
+ logger.debug("{} is down", this);
// the connection is down, inform core
reportSwitchStateChange(false);
return;
}
for (OFMessage msg : msgs) {
- logger.trace("Message received: {}", msg.toString());
- /*
- if ((msg.getType() != OFType.ECHO_REQUEST) &&
- (msg.getType() != OFType.ECHO_REPLY)) {
- logger.debug(msg.getType().toString() + " received from sw " + toString());
- }
- */
+ logger.trace("Message received: {}", msg);
this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
OFType type = msg.getType();
switch (type) {
OFFlowMod flowMod = (OFFlowMod) factory
.getMessage(OFType.FLOW_MOD);
flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
- .setOutPort(OFPort.OFPP_NONE).setLength(
- (short) OFFlowMod.MINIMUM_LENGTH);
+ .setOutPort(OFPort.OFPP_NONE)
+ .setLength((short) OFFlowMod.MINIMUM_LENGTH);
asyncFastSend(flowMod);
this.state = SwitchState.WAIT_FEATURES_REPLY;
startSwitchTimer();
case ECHO_REQUEST:
OFEchoReply echoReply = (OFEchoReply) factory
.getMessage(OFType.ECHO_REPLY);
- asyncFastSend(echoReply);
+ // respond immediately
+ asyncSendNow(echoReply, msg.getXid());
break;
case ECHO_REPLY:
this.probeSent = false;
processFeaturesReply((OFFeaturesReply) msg);
break;
case GET_CONFIG_REPLY:
- // make sure that the switch can send the whole packet to the controller
+ // make sure that the switch can send the whole packet to the
+ // controller
if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
this.state = SwitchState.OPERATIONAL;
}
}
private void processPortStatusMsg(OFPortStatus msg) {
- //short portNumber = msg.getDesc().getPortNumber();
OFPhysicalPort port = msg.getDesc();
if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
updatePhysicalPort(port);
- //logger.debug("Port " + portNumber + " on " + toString() + " modified");
} else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
updatePhysicalPort(port);
- //logger.debug("Port " + portNumber + " on " + toString() + " added");
} else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
.ordinal()) {
deletePhysicalPort(port);
- //logger.debug("Port " + portNumber + " on " + toString() + " deleted");
}
}
public void run() {
try {
Long now = System.currentTimeMillis();
- if ((now - lastMsgReceivedTimeStamp) > SWITCH_LIVENESS_TIMEOUT) {
+ if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
if (probeSent) {
- // switch failed to respond to our probe, consider it down
- logger.warn("{} is idle for too long, disconnect", toString());
+ // switch failed to respond to our probe, consider
+ // it down
+ logger.warn("{} is idle for too long, disconnect",
+ toString());
reportSwitchStateChange(false);
} else {
// send a probe to see if the switch is still alive
- //logger.debug("Send idle probe (Echo Request) to " + switchName());
+ logger.debug(
+ "Send idle probe (Echo Request) to {}",
+ this);
probeSent = true;
OFMessage echo = factory
.getMessage(OFType.ECHO_REQUEST);
asyncFastSend(request);
} else {
if (state == SwitchState.WAIT_CONFIG_REPLY) {
- // send another config request
+ // send another config request
OFSetConfig config = (OFSetConfig) factory
.getMessage(OFType.SET_CONFIG);
config.setMissSendLength((short) 0xffff)
}
private void reportError(Exception e) {
- if (e instanceof AsynchronousCloseException) {
- logger.debug("Caught exception {}", e.getMessage());
- } else {
- logger.warn("Caught exception {}", e.getMessage());
- }
+ if (e instanceof AsynchronousCloseException
+ || e instanceof InterruptedException
+ || e instanceof SocketException || e instanceof IOException
+ || e instanceof ClosedSelectorException) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Caught exception {}", e.getMessage());
+ }
+ } else {
+ logger.warn("Caught exception ", e);
+ }
// notify core of this error event and disconnect the switch
((Controller) core).takeSwitchEventError(this);
}
private void reportSwitchStateChange(boolean added) {
if (added) {
- ((Controller) core).takeSwtichEventAdd(this);
+ ((Controller) core).takeSwitchEventAdd(this);
} else {
((Controller) core).takeSwitchEventDelete(this);
}
config.setMissSendLength((short) 0xffff).setLengthU(
OFSetConfig.MINIMUM_LENGTH);
asyncFastSend(config);
- // send config request to make sure the switch can handle the set config
+ // send config request to make sure the switch can handle the set
+ // config
OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
asyncFastSend(getConfig);
this.state = SwitchState.WAIT_CONFIG_REPLY;
Short portNumber = port.getPortNumber();
physicalPorts.put(portNumber, port);
portBandwidth
- .put(
- portNumber,
+ .put(portNumber,
port.getCurrentFeatures()
& (OFPortFeatures.OFPPF_10MB_FD.getValue()
| OFPortFeatures.OFPPF_10MB_HD
.getValue()
| OFPortFeatures.OFPPF_1GB_HD
.getValue() | OFPortFeatures.OFPPF_10GB_FD
- .getValue()));
+ .getValue()));
}
private void deletePhysicalPort(OFPhysicalPort port) {
@Override
public String toString() {
- return ("["
- + this.socket.toString()
- + " SWID "
- + (isOperational() ? HexString.toHexString(this.sid)
- : "unkbown") + "]");
+ try {
+ return ("Switch:"
+ + socket.socket().getRemoteSocketAddress().toString().split("/")[1]
+ + " SWID:" + (isOperational() ? HexString
+ .toHexString(this.sid) : "unknown"));
+ } catch (Exception e) {
+ return (isOperational() ? HexString.toHexString(this.sid)
+ : "unknown");
+ }
+
}
@Override
Future<Object> submit = executor.submit(worker);
Object result = null;
try {
- result = submit
- .get(MESSAGE_RESPONSE_TIMER, TimeUnit.MILLISECONDS);
+ result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
return result;
} catch (Exception e) {
logger.warn("Timeout while waiting for {} replies", req.getType());
result = null; // to indicate timeout has occurred
+ worker.wakeup();
return result;
}
}
@Override
public Object syncSend(OFMessage msg) {
- Integer xid = getNextXid();
- SynchronousMessage worker = new SynchronousMessage(this, xid, msg);
- messageWaitingDone.put(xid, worker);
- Object result = null;
- Boolean status = false;
- Future<Object> submit = executor.submit(worker);
- try {
- result = submit
- .get(responseTimerValue, TimeUnit.MILLISECONDS);
- messageWaitingDone.remove(xid);
- if (result == null) {
- // if result is null, then it means the switch can handle this message successfully
- // convert the result into a Boolean with value true
- status = true;
- //logger.debug("Successfully send " + msg.getType().toString());
- result = status;
- } else {
- // if result is not null, this means the switch can't handle this message
- // the result if OFError already
- logger.debug("Send {} failed --> {}",
- msg.getType().toString(), ((OFError) result).toString());
- }
- return result;
- } catch (Exception e) {
- logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
- // convert the result into a Boolean with value false
- status = false;
- result = status;
- return result;
- }
+ int xid = getNextXid();
+ return syncSend(msg, xid);
}
/*
- * Either a BarrierReply or a OFError is received. If this is a reply for an outstanding sync message,
- * wake up associated task so that it can continue
+ * Either a BarrierReply or a OFError is received. If this is a reply for an
+ * outstanding sync message, wake up associated task so that it can continue
*/
private void processBarrierReply(OFBarrierReply msg) {
Integer xid = msg.getXid();
xid = errorMsg.getXid();
}
/*
- * the error can be a reply to a synchronous message or to a statistic request message
+ * the error can be a reply to a synchronous message or to a statistic
+ * request message
*/
Callable<?> worker = messageWaitingDone.remove(xid);
if (worker == null) {
worker.wakeup();
}
}
-
+
@Override
public Map<Short, OFPhysicalPort> getPhysicalPorts() {
return this.physicalPorts;
public Byte getTables() {
return this.tables;
}
-
+
@Override
public Integer getActions() {
return this.actions;
}
-
+
@Override
public Integer getCapabilities() {
return this.capabilities;
return result;
}
- /*
- * Transmit thread polls the message out of the priority queue and invokes
- * messaging service to transmit it over the socket channel
- */
+ /*
+ * Transmit thread polls the message out of the priority queue and invokes
+ * messaging service to transmit it over the socket channel
+ */
class PriorityMessageTransmit implements Runnable {
+ @Override
public void run() {
running = true;
while (running) {
- try {
- if (!transmitQ.isEmpty()) {
- PriorityMessage pmsg = transmitQ.poll();
- msgReadWriteService.asyncSend(pmsg.msg);
- logger.trace("Message sent: {}", pmsg.toString());
- }
- Thread.sleep(10);
- } catch (Exception e) {
- reportError(e);
- }
+ try {
+ PriorityMessage pmsg = transmitQ.take();
+ msgReadWriteService.asyncSend(pmsg.msg);
+ /*
+ * If syncReply is set to true, wait for the response back.
+ */
+ if (pmsg.syncReply) {
+ syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
+ }
+ } catch (InterruptedException ie) {
+ reportError(new InterruptedException(
+ "PriorityMessageTransmit thread interrupted"));
+ } catch (Exception e) {
+ reportError(e);
+ }
}
- transmitQ = null;
+ transmitQ = null;
}
}
/*
* Setup and start the transmit thread
*/
- private void startTransmitThread() {
- this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
- new Comparator<PriorityMessage>() {
- public int compare(PriorityMessage p1, PriorityMessage p2) {
- return p2.priority - p1.priority;
- }
- });
+ private void startTransmitThread() {
+ this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
+ new Comparator<PriorityMessage>() {
+ @Override
+ public int compare(PriorityMessage p1, PriorityMessage p2) {
+ if (p2.priority != p1.priority) {
+ return p2.priority - p1.priority;
+ } else {
+ return (p2.seqNum < p1.seqNum) ? 1 : -1;
+ }
+ }
+ });
this.transmitThread = new Thread(new PriorityMessageTransmit());
this.transmitThread.start();
}
-
+
/*
* Setup communication services
*/
private void setupCommChannel() throws Exception {
this.selector = SelectorProvider.provider().openSelector();
this.socket.configureBlocking(false);
- this.socket.socket().setTcpNoDelay(true);
+ this.socket.socket().setTcpNoDelay(true);
this.msgReadWriteService = getMessageReadWriteService();
}
private void sendFirstHello() {
- try {
- OFMessage msg = factory.getMessage(OFType.HELLO);
- asyncFastSend(msg);
- } catch (Exception e) {
- reportError(e);
- }
- }
-
+ try {
+ OFMessage msg = factory.getMessage(OFType.HELLO);
+ asyncFastSend(msg);
+ } catch (Exception e) {
+ reportError(e);
+ }
+ }
+
private IMessageReadWrite getMessageReadWriteService() throws Exception {
- String str = System.getProperty("secureChannelEnabled");
- return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ?
- new SecureMessageReadWriteService(socket, selector) :
- new MessageReadWriteService(socket, selector);
+ String str = System.getProperty("secureChannelEnabled");
+ return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(
+ socket, selector) : new MessageReadWriteService(socket,
+ selector);
+ }
+
+ /**
+ * Send Barrier message synchronously. The caller will be blocked until the
+ * Barrier reply is received.
+ */
+ @Override
+ public Object syncSendBarrierMessage() {
+ OFBarrierRequest barrierMsg = new OFBarrierRequest();
+ return syncSend(barrierMsg);
+ }
+
+ /**
+ * Send Barrier message asynchronously. The caller is not blocked. The
+ * Barrier message will be sent in a transmit thread which will be blocked
+ * until the Barrier reply is received.
+ */
+ @Override
+ public Object asyncSendBarrierMessage() {
+ if (transmitQ == null) {
+ return Boolean.FALSE;
+ }
+
+ OFBarrierRequest barrierMsg = new OFBarrierRequest();
+ int xid = getNextXid();
+
+ barrierMsg.setXid(xid);
+ transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
+
+ return Boolean.TRUE;
+ }
+
+ /**
+ * This method returns the switch liveness timeout value. If controller did
+ * not receive any message from the switch for such a long period,
+ * controller will tear down the connection to the switch.
+ *
+ * @return The timeout value
+ */
+ private static int getSwitchLivenessTimeout() {
+ String timeout = System.getProperty("of.switchLivenessTimeout");
+ int rv = 60500;
+
+ try {
+ if (timeout != null) {
+ rv = Integer.parseInt(timeout);
+ }
+ } catch (Exception e) {
+ }
+
+ return rv;
+ }
+
+ /**
+ * This method performs synchronous operations for a given message. If
+ * syncRequest is set to true, the message will be sent out followed by a
+ * Barrier request message. Then it's blocked until the Barrier rely arrives
+ * or timeout. If syncRequest is false, it simply skips the message send and
+ * just waits for the response back.
+ *
+ * @param msg
+ * Message to be sent
+ * @param xid
+ * Message XID
+ * @param request
+ * If set to true, the message the message will be sent out
+ * followed by a Barrier request message. If set to false, it
+ * simply skips the sending and just waits for the Barrier reply.
+ * @return the result
+ */
+ private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
+ SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
+ messageWaitingDone.put(xid, worker);
+ Object result = null;
+ Boolean status = false;
+ Future<Object> submit = executor.submit(worker);
+ try {
+ result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
+ messageWaitingDone.remove(xid);
+ if (result == null) {
+ // if result is null, then it means the switch can handle this
+ // message successfully
+ // convert the result into a Boolean with value true
+ status = true;
+ // logger.debug("Successfully send " +
+ // msg.getType().toString());
+ result = status;
+ } else {
+ // if result is not null, this means the switch can't handle
+ // this message
+ // the result if OFError already
+ if (logger.isDebugEnabled()) {
+ logger.debug("Send {} failed --> {}", msg.getType(),
+ (result));
+ }
+ }
+ return result;
+ } catch (Exception e) {
+ logger.warn("Timeout while waiting for {} reply", msg.getType()
+ .toString());
+ // convert the result into a Boolean with value false
+ status = false;
+ result = status;
+ worker.wakeup();
+ return result;
+ }
}
}