X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fprotocol_plugins%2Fopenflow%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fprotocol_plugin%2Fopenflow%2Fcore%2Finternal%2FSwitchHandler.java;h=52ea7fd575a76ae4f2053c5b1c96c7a10e622c0e;hb=cba872bacd7963de38208fde9f37747820340d00;hp=cba8b1d4f17c38a9d232e1b54a3463c96cfe8dbf;hpb=a8a3c93da9702012a3e265e0680cc4663d77ca1e;p=controller.git diff --git a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java index cba8b1d4f1..52ea7fd575 100644 --- a/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java +++ b/opendaylight/protocol_plugins/openflow/src/main/java/org/opendaylight/controller/protocol_plugin/openflow/core/internal/SwitchHandler.java @@ -1,4 +1,3 @@ - /* * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved. * @@ -10,7 +9,9 @@ package org.opendaylight.controller.protocol_plugin.openflow.core.internal; import java.io.IOException; +import java.net.SocketException; import java.nio.channels.AsynchronousCloseException; +import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; @@ -35,9 +36,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.opendaylight.controller.protocol_plugin.openflow.core.IController; -import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch; import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite; +import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch; import org.openflow.protocol.OFBarrierReply; +import org.openflow.protocol.OFBarrierRequest; import org.openflow.protocol.OFEchoReply; import org.openflow.protocol.OFError; import org.openflow.protocol.OFFeaturesReply; @@ -65,37 +67,37 @@ public class SwitchHandler implements ISwitch { private static final Logger logger = LoggerFactory .getLogger(SwitchHandler.class); private static final int SWITCH_LIVENESS_TIMER = 5000; - private static final int SWITCH_LIVENESS_TIMEOUT = 2 * SWITCH_LIVENESS_TIMER + 500; - private int MESSAGE_RESPONSE_TIMER = 2000; + private static final int switchLivenessTimeout = getSwitchLivenessTimeout(); + private final int MESSAGE_RESPONSE_TIMER = 2000; - private String instanceName; - private ISwitch thisISwitch; - private IController core; + private final String instanceName; + private final ISwitch thisISwitch; + private final IController core; private Long sid; private Integer buffers; private Integer capabilities; private Byte tables; private Integer actions; private Selector selector; - private SocketChannel socket; - private BasicFactory factory; - private AtomicInteger xid; + private final SocketChannel socket; + private final BasicFactory factory; + private final AtomicInteger xid; private SwitchState state; private Timer periodicTimer; - private Map physicalPorts; - private Map portBandwidth; - private Date connectedDate; + private final Map physicalPorts; + private final Map portBandwidth; + private final Date connectedDate; private Long lastMsgReceivedTimeStamp; private Boolean probeSent; - private ExecutorService executor; - private ConcurrentHashMap> messageWaitingDone; + private final ExecutorService executor; + private final ConcurrentHashMap> messageWaitingDone; private boolean running; private IMessageReadWrite msgReadWriteService; private Thread switchHandlerThread; private Integer responseTimerValue; - private PriorityBlockingQueue transmitQ; + private PriorityBlockingQueue transmitQ; private Thread transmitThread; - + private enum SwitchState { NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL( 3); @@ -116,10 +118,10 @@ public class SwitchHandler implements ISwitch { this.instanceName = name; this.thisISwitch = this; this.sid = (long) 0; - this.buffers = (int)0; - this.capabilities = (int)0; - this.tables = (byte)0; - this.actions = (int)0; + this.buffers = (int) 0; + this.capabilities = (int) 0; + this.tables = (byte) 0; + this.actions = (int) 0; this.core = core; this.socket = sc; this.factory = new BasicFactory(); @@ -136,20 +138,21 @@ public class SwitchHandler implements ISwitch { this.responseTimerValue = MESSAGE_RESPONSE_TIMER; String rTimer = System.getProperty("of.messageResponseTimer"); if (rTimer != null) { - try { - responseTimerValue = Integer.decode(rTimer); - } catch (NumberFormatException e) { - logger.warn("Invalid of.messageResponseTimer: {} use default({})", - rTimer, MESSAGE_RESPONSE_TIMER); - } + try { + responseTimerValue = Integer.decode(rTimer); + } catch (NumberFormatException e) { + logger.warn( + "Invalid of.messageResponseTimer: {} use default({})", + rTimer, MESSAGE_RESPONSE_TIMER); + } } - } + } public void start() { try { - startTransmitThread(); - setupCommChannel(); - sendFirstHello(); + startTransmitThread(); + setupCommChannel(); + sendFirstHello(); startHandlerThread(); } catch (Exception e) { reportError(e); @@ -163,7 +166,7 @@ public class SwitchHandler implements ISwitch { running = true; while (running) { try { - // wait for an incoming connection + // wait for an incoming connection selector.select(0); Iterator selectedKeys = selector .selectedKeys().iterator(); @@ -178,7 +181,7 @@ public class SwitchHandler implements ISwitch { } } } catch (Exception e) { - reportError(e); + reportError(e); } } } @@ -187,33 +190,31 @@ public class SwitchHandler implements ISwitch { } public void stop() { - running = false; - cancelSwitchTimer(); - try { - selector.wakeup(); - selector.close(); - } catch (Exception e) { - } - try { - socket.close(); - } catch (Exception e) { - } - try { - msgReadWriteService.stop(); - } catch (Exception e) { - } - executor.shutdown(); - - selector = null; - socket = null; - msgReadWriteService = null; - - if (switchHandlerThread != null) { - switchHandlerThread.interrupt(); - } - if (transmitThread != null) { - transmitThread.interrupt(); - } + running = false; + cancelSwitchTimer(); + try { + selector.wakeup(); + selector.close(); + } catch (Exception e) { + } + try { + socket.close(); + } catch (Exception e) { + } + try { + msgReadWriteService.stop(); + } catch (Exception e) { + } + executor.shutdown(); + + msgReadWriteService = null; + + if (switchHandlerThread != null) { + switchHandlerThread.interrupt(); + } + if (transmitThread != null) { + transmitThread.interrupt(); + } } @Override @@ -221,100 +222,153 @@ public class SwitchHandler implements ISwitch { return this.xid.incrementAndGet(); } - /** - * This method puts the message in an outgoing priority queue with normal - * priority. It will be served after high priority messages. The method - * should be used for non-critical messages such as statistics request, - * discovery packets, etc. An unique XID is generated automatically and - * inserted into the message. - * - * @param msg The OF message to be sent - * @return The XID used - */ + /** + * This method puts the message in an outgoing priority queue with normal + * priority. It will be served after high priority messages. The method + * should be used for non-critical messages such as statistics request, + * discovery packets, etc. An unique XID is generated automatically and + * inserted into the message. + * + * @param msg + * The OF message to be sent + * @return The XID used + */ @Override public Integer asyncSend(OFMessage msg) { - return asyncSend(msg, getNextXid()); - } - - /** - * This method puts the message in an outgoing priority queue with normal - * priority. It will be served after high priority messages. The method - * should be used for non-critical messages such as statistics request, - * discovery packets, etc. The specified XID is inserted into the message. - * - * @param msg The OF message to be Sent - * @param xid The XID to be used in the message - * @return The XID used - */ + return asyncSend(msg, getNextXid()); + } + + private Object syncSend(OFMessage msg, int xid) { + return syncMessageInternal(msg, xid, true); + } + + /** + * This method puts the message in an outgoing priority queue with normal + * priority. It will be served after high priority messages. The method + * should be used for non-critical messages such as statistics request, + * discovery packets, etc. The specified XID is inserted into the message. + * + * @param msg + * The OF message to be Sent + * @param xid + * The XID to be used in the message + * @return The XID used + */ @Override public Integer asyncSend(OFMessage msg, int xid) { - msg.setXid(xid); - transmitQ.add(new PriorityMessage(msg, 0)); + msg.setXid(xid); + if (transmitQ != null) { + transmitQ.add(new PriorityMessage(msg, 0)); + } return xid; } - /** - * This method puts the message in an outgoing priority queue with high - * priority. It will be served first before normal priority messages. The - * method should be used for critical messages such as hello, echo reply - * etc. An unique XID is generated automatically and inserted into the - * message. - * - * @param msg The OF message to be sent - * @return The XID used - */ + /** + * This method puts the message in an outgoing priority queue with high + * priority. It will be served first before normal priority messages. The + * method should be used for critical messages such as hello, echo reply + * etc. An unique XID is generated automatically and inserted into the + * message. + * + * @param msg + * The OF message to be sent + * @return The XID used + */ @Override public Integer asyncFastSend(OFMessage msg) { - return asyncFastSend(msg, getNextXid()); - } - - /** - * This method puts the message in an outgoing priority queue with high - * priority. It will be served first before normal priority messages. The - * method should be used for critical messages such as hello, echo reply - * etc. The specified XID is inserted into the message. - * - * @param msg The OF message to be sent - * @return The XID used - */ + return asyncFastSend(msg, getNextXid()); + } + + /** + * This method puts the message in an outgoing priority queue with high + * priority. It will be served first before normal priority messages. The + * method should be used for critical messages such as hello, echo reply + * etc. The specified XID is inserted into the message. + * + * @param msg + * The OF message to be sent + * @return The XID used + */ @Override public Integer asyncFastSend(OFMessage msg, int xid) { - msg.setXid(xid); - transmitQ.add(new PriorityMessage(msg, 1)); + msg.setXid(xid); + if (transmitQ != null) { + transmitQ.add(new PriorityMessage(msg, 1)); + } return xid; } - public void resumeSend() { + public void resumeSend() { try { - msgReadWriteService.resumeSend(); - } catch (Exception e) { - reportError(e); - } + if (msgReadWriteService != null) { + msgReadWriteService.resumeSend(); + } + } catch (Exception e) { + reportError(e); + } + } + + /** + * This method bypasses the transmit queue and sends the message over the + * socket directly. If the input xid is not null, the specified xid is + * inserted into the message. Otherwise, an unique xid is generated + * automatically and inserted into the message. + * + * @param msg + * Message to be sent + * @param xid + * Message xid + */ + private void asyncSendNow(OFMessage msg, Integer xid) { + if (xid == null) { + xid = getNextXid(); + } + msg.setXid(xid); + + asyncSendNow(msg); + } + + /** + * This method bypasses the transmit queue and sends the message over the + * socket directly. + * + * @param msg + * Message to be sent + */ + private void asyncSendNow(OFMessage msg) { + if (msgReadWriteService == null) { + logger.warn( + "asyncSendNow: {} is not sent because Message ReadWrite Service is not available.", + msg); + return; + } + + try { + msgReadWriteService.asyncSend(msg); + } catch (Exception e) { + reportError(e); + } } public void handleMessages() { List msgs = null; - + try { - msgs = msgReadWriteService.readMessages(); - } catch (Exception e) { - reportError(e); - } - + if (msgReadWriteService != null) { + msgs = msgReadWriteService.readMessages(); + } + } catch (Exception e) { + reportError(e); + } + if (msgs == null) { - logger.debug("{} is down", toString()); + logger.debug("{} is down", this); // the connection is down, inform core reportSwitchStateChange(false); return; } for (OFMessage msg : msgs) { - logger.trace("Message received: {}", msg.toString()); - /* - if ((msg.getType() != OFType.ECHO_REQUEST) && - (msg.getType() != OFType.ECHO_REPLY)) { - logger.debug(msg.getType().toString() + " received from sw " + toString()); - } - */ + logger.trace("Message received: {}", msg); this.lastMsgReceivedTimeStamp = System.currentTimeMillis(); OFType type = msg.getType(); switch (type) { @@ -328,8 +382,8 @@ public class SwitchHandler implements ISwitch { OFFlowMod flowMod = (OFFlowMod) factory .getMessage(OFType.FLOW_MOD); flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE) - .setOutPort(OFPort.OFPP_NONE).setLength( - (short) OFFlowMod.MINIMUM_LENGTH); + .setOutPort(OFPort.OFPP_NONE) + .setLength((short) OFFlowMod.MINIMUM_LENGTH); asyncFastSend(flowMod); this.state = SwitchState.WAIT_FEATURES_REPLY; startSwitchTimer(); @@ -337,7 +391,8 @@ public class SwitchHandler implements ISwitch { case ECHO_REQUEST: OFEchoReply echoReply = (OFEchoReply) factory .getMessage(OFType.ECHO_REPLY); - asyncFastSend(echoReply); + // respond immediately + asyncSendNow(echoReply, msg.getXid()); break; case ECHO_REPLY: this.probeSent = false; @@ -346,7 +401,8 @@ public class SwitchHandler implements ISwitch { processFeaturesReply((OFFeaturesReply) msg); break; case GET_CONFIG_REPLY: - // make sure that the switch can send the whole packet to the controller + // make sure that the switch can send the whole packet to the + // controller if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) { this.state = SwitchState.OPERATIONAL; } @@ -375,18 +431,14 @@ public class SwitchHandler implements ISwitch { } private void processPortStatusMsg(OFPortStatus msg) { - //short portNumber = msg.getDesc().getPortNumber(); OFPhysicalPort port = msg.getDesc(); if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) { updatePhysicalPort(port); - //logger.debug("Port " + portNumber + " on " + toString() + " modified"); } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) { updatePhysicalPort(port); - //logger.debug("Port " + portNumber + " on " + toString() + " added"); } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE .ordinal()) { deletePhysicalPort(port); - //logger.debug("Port " + portNumber + " on " + toString() + " deleted"); } } @@ -398,14 +450,18 @@ public class SwitchHandler implements ISwitch { public void run() { try { Long now = System.currentTimeMillis(); - if ((now - lastMsgReceivedTimeStamp) > SWITCH_LIVENESS_TIMEOUT) { + if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) { if (probeSent) { - // switch failed to respond to our probe, consider it down - logger.warn("{} is idle for too long, disconnect", toString()); + // switch failed to respond to our probe, consider + // it down + logger.warn("{} is idle for too long, disconnect", + toString()); reportSwitchStateChange(false); } else { // send a probe to see if the switch is still alive - //logger.debug("Send idle probe (Echo Request) to " + switchName()); + logger.debug( + "Send idle probe (Echo Request) to {}", + this); probeSent = true; OFMessage echo = factory .getMessage(OFType.ECHO_REQUEST); @@ -419,7 +475,7 @@ public class SwitchHandler implements ISwitch { asyncFastSend(request); } else { if (state == SwitchState.WAIT_CONFIG_REPLY) { - // send another config request + // send another config request OFSetConfig config = (OFSetConfig) factory .getMessage(OFType.SET_CONFIG); config.setMissSendLength((short) 0xffff) @@ -445,18 +501,23 @@ public class SwitchHandler implements ISwitch { } private void reportError(Exception e) { - if (e instanceof AsynchronousCloseException) { - logger.debug("Caught exception {}", e.getMessage()); - } else { - logger.warn("Caught exception {}", e.getMessage()); - } + if (e instanceof AsynchronousCloseException + || e instanceof InterruptedException + || e instanceof SocketException || e instanceof IOException + || e instanceof ClosedSelectorException) { + if (logger.isDebugEnabled()) { + logger.debug("Caught exception {}", e.getMessage()); + } + } else { + logger.warn("Caught exception ", e); + } // notify core of this error event and disconnect the switch ((Controller) core).takeSwitchEventError(this); } private void reportSwitchStateChange(boolean added) { if (added) { - ((Controller) core).takeSwtichEventAdd(this); + ((Controller) core).takeSwitchEventAdd(this); } else { ((Controller) core).takeSwitchEventDelete(this); } @@ -484,7 +545,8 @@ public class SwitchHandler implements ISwitch { config.setMissSendLength((short) 0xffff).setLengthU( OFSetConfig.MINIMUM_LENGTH); asyncFastSend(config); - // send config request to make sure the switch can handle the set config + // send config request to make sure the switch can handle the set + // config OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST); asyncFastSend(getConfig); this.state = SwitchState.WAIT_CONFIG_REPLY; @@ -497,8 +559,7 @@ public class SwitchHandler implements ISwitch { Short portNumber = port.getPortNumber(); physicalPorts.put(portNumber, port); portBandwidth - .put( - portNumber, + .put(portNumber, port.getCurrentFeatures() & (OFPortFeatures.OFPPF_10MB_FD.getValue() | OFPortFeatures.OFPPF_10MB_HD @@ -511,7 +572,7 @@ public class SwitchHandler implements ISwitch { .getValue() | OFPortFeatures.OFPPF_1GB_HD .getValue() | OFPortFeatures.OFPPF_10GB_FD - .getValue())); + .getValue())); } private void deletePhysicalPort(OFPhysicalPort port) { @@ -527,11 +588,16 @@ public class SwitchHandler implements ISwitch { @Override public String toString() { - return ("[" - + this.socket.toString() - + " SWID " - + (isOperational() ? HexString.toHexString(this.sid) - : "unkbown") + "]"); + try { + return ("Switch:" + + socket.socket().getRemoteSocketAddress().toString().split("/")[1] + + " SWID:" + (isOperational() ? HexString + .toHexString(this.sid) : "unknown")); + } catch (Exception e) { + return (isOperational() ? HexString.toHexString(this.sid) + : "unknown"); + } + } @Override @@ -551,53 +617,25 @@ public class SwitchHandler implements ISwitch { Future submit = executor.submit(worker); Object result = null; try { - result = submit - .get(MESSAGE_RESPONSE_TIMER, TimeUnit.MILLISECONDS); + result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS); return result; } catch (Exception e) { logger.warn("Timeout while waiting for {} replies", req.getType()); result = null; // to indicate timeout has occurred + worker.wakeup(); return result; } } @Override public Object syncSend(OFMessage msg) { - Integer xid = getNextXid(); - SynchronousMessage worker = new SynchronousMessage(this, xid, msg); - messageWaitingDone.put(xid, worker); - Object result = null; - Boolean status = false; - Future submit = executor.submit(worker); - try { - result = submit - .get(responseTimerValue, TimeUnit.MILLISECONDS); - messageWaitingDone.remove(xid); - if (result == null) { - // if result is null, then it means the switch can handle this message successfully - // convert the result into a Boolean with value true - status = true; - //logger.debug("Successfully send " + msg.getType().toString()); - result = status; - } else { - // if result is not null, this means the switch can't handle this message - // the result if OFError already - logger.debug("Send {} failed --> {}", - msg.getType().toString(), ((OFError) result).toString()); - } - return result; - } catch (Exception e) { - logger.warn("Timeout while waiting for {} reply", msg.getType().toString()); - // convert the result into a Boolean with value false - status = false; - result = status; - return result; - } + int xid = getNextXid(); + return syncSend(msg, xid); } /* - * Either a BarrierReply or a OFError is received. If this is a reply for an outstanding sync message, - * wake up associated task so that it can continue + * Either a BarrierReply or a OFError is received. If this is a reply for an + * outstanding sync message, wake up associated task so that it can continue */ private void processBarrierReply(OFBarrierReply msg) { Integer xid = msg.getXid(); @@ -618,7 +656,8 @@ public class SwitchHandler implements ISwitch { xid = errorMsg.getXid(); } /* - * the error can be a reply to a synchronous message or to a statistic request message + * the error can be a reply to a synchronous message or to a statistic + * request message */ Callable worker = messageWaitingDone.remove(xid); if (worker == null) { @@ -645,7 +684,7 @@ public class SwitchHandler implements ISwitch { worker.wakeup(); } } - + @Override public Map getPhysicalPorts() { return this.physicalPorts; @@ -670,12 +709,12 @@ public class SwitchHandler implements ISwitch { public Byte getTables() { return this.tables; } - + @Override public Integer getActions() { return this.actions; } - + @Override public Integer getCapabilities() { return this.capabilities; @@ -724,66 +763,183 @@ public class SwitchHandler implements ISwitch { return result; } - /* - * Transmit thread polls the message out of the priority queue and invokes - * messaging service to transmit it over the socket channel - */ + /* + * Transmit thread polls the message out of the priority queue and invokes + * messaging service to transmit it over the socket channel + */ class PriorityMessageTransmit implements Runnable { + @Override public void run() { running = true; while (running) { - try { - if (!transmitQ.isEmpty()) { - PriorityMessage pmsg = transmitQ.poll(); - msgReadWriteService.asyncSend(pmsg.msg); - logger.trace("Message sent: {}", pmsg.toString()); - } - Thread.sleep(10); - } catch (Exception e) { - reportError(e); - } + try { + PriorityMessage pmsg = transmitQ.take(); + msgReadWriteService.asyncSend(pmsg.msg); + /* + * If syncReply is set to true, wait for the response back. + */ + if (pmsg.syncReply) { + syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false); + } + } catch (InterruptedException ie) { + reportError(new InterruptedException( + "PriorityMessageTransmit thread interrupted")); + } catch (Exception e) { + reportError(e); + } } - transmitQ = null; + transmitQ = null; } } /* * Setup and start the transmit thread */ - private void startTransmitThread() { - this.transmitQ = new PriorityBlockingQueue(11, - new Comparator() { - public int compare(PriorityMessage p1, PriorityMessage p2) { - return p2.priority - p1.priority; - } - }); + private void startTransmitThread() { + this.transmitQ = new PriorityBlockingQueue(11, + new Comparator() { + @Override + public int compare(PriorityMessage p1, PriorityMessage p2) { + if (p2.priority != p1.priority) { + return p2.priority - p1.priority; + } else { + return (p2.seqNum < p1.seqNum) ? 1 : -1; + } + } + }); this.transmitThread = new Thread(new PriorityMessageTransmit()); this.transmitThread.start(); } - + /* * Setup communication services */ private void setupCommChannel() throws Exception { this.selector = SelectorProvider.provider().openSelector(); this.socket.configureBlocking(false); - this.socket.socket().setTcpNoDelay(true); + this.socket.socket().setTcpNoDelay(true); this.msgReadWriteService = getMessageReadWriteService(); } private void sendFirstHello() { - try { - OFMessage msg = factory.getMessage(OFType.HELLO); - asyncFastSend(msg); - } catch (Exception e) { - reportError(e); - } - } - + try { + OFMessage msg = factory.getMessage(OFType.HELLO); + asyncFastSend(msg); + } catch (Exception e) { + reportError(e); + } + } + private IMessageReadWrite getMessageReadWriteService() throws Exception { - String str = System.getProperty("secureChannelEnabled"); - return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? - new SecureMessageReadWriteService(socket, selector) : - new MessageReadWriteService(socket, selector); + String str = System.getProperty("secureChannelEnabled"); + return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService( + socket, selector) : new MessageReadWriteService(socket, + selector); + } + + /** + * Send Barrier message synchronously. The caller will be blocked until the + * Barrier reply is received. + */ + @Override + public Object syncSendBarrierMessage() { + OFBarrierRequest barrierMsg = new OFBarrierRequest(); + return syncSend(barrierMsg); + } + + /** + * Send Barrier message asynchronously. The caller is not blocked. The + * Barrier message will be sent in a transmit thread which will be blocked + * until the Barrier reply is received. + */ + @Override + public Object asyncSendBarrierMessage() { + if (transmitQ == null) { + return Boolean.FALSE; + } + + OFBarrierRequest barrierMsg = new OFBarrierRequest(); + int xid = getNextXid(); + + barrierMsg.setXid(xid); + transmitQ.add(new PriorityMessage(barrierMsg, 0, true)); + + return Boolean.TRUE; + } + + /** + * This method returns the switch liveness timeout value. If controller did + * not receive any message from the switch for such a long period, + * controller will tear down the connection to the switch. + * + * @return The timeout value + */ + private static int getSwitchLivenessTimeout() { + String timeout = System.getProperty("of.switchLivenessTimeout"); + int rv = 60500; + + try { + if (timeout != null) { + rv = Integer.parseInt(timeout); + } + } catch (Exception e) { + } + + return rv; + } + + /** + * This method performs synchronous operations for a given message. If + * syncRequest is set to true, the message will be sent out followed by a + * Barrier request message. Then it's blocked until the Barrier rely arrives + * or timeout. If syncRequest is false, it simply skips the message send and + * just waits for the response back. + * + * @param msg + * Message to be sent + * @param xid + * Message XID + * @param request + * If set to true, the message the message will be sent out + * followed by a Barrier request message. If set to false, it + * simply skips the sending and just waits for the Barrier reply. + * @return the result + */ + private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) { + SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest); + messageWaitingDone.put(xid, worker); + Object result = null; + Boolean status = false; + Future submit = executor.submit(worker); + try { + result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS); + messageWaitingDone.remove(xid); + if (result == null) { + // if result is null, then it means the switch can handle this + // message successfully + // convert the result into a Boolean with value true + status = true; + // logger.debug("Successfully send " + + // msg.getType().toString()); + result = status; + } else { + // if result is not null, this means the switch can't handle + // this message + // the result if OFError already + if (logger.isDebugEnabled()) { + logger.debug("Send {} failed --> {}", msg.getType(), + (result)); + } + } + return result; + } catch (Exception e) { + logger.warn("Timeout while waiting for {} reply", msg.getType() + .toString()); + // convert the result into a Boolean with value false + status = false; + result = status; + worker.wakeup(); + return result; + } } }