package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
-import java.io.IOException;
-import java.net.SocketException;
-import java.nio.channels.AsynchronousCloseException;
-import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
-import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
+import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
import org.openflow.protocol.OFBarrierReply;
import org.openflow.protocol.OFBarrierRequest;
import org.openflow.protocol.OFEchoReply;
+import org.openflow.protocol.OFEchoRequest;
import org.openflow.protocol.OFError;
import org.openflow.protocol.OFFeaturesReply;
import org.openflow.protocol.OFFlowMod;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class SwitchHandler implements ISwitch {
- private static final Logger logger = LoggerFactory
- .getLogger(SwitchHandler.class);
+ private static final Logger logger = LoggerFactory.getLogger(SwitchHandler.class);
private static final int SWITCH_LIVENESS_TIMER = 5000;
private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
- private int MESSAGE_RESPONSE_TIMER = 2000;
+ private final int MESSAGE_RESPONSE_TIMER = 2000;
- private String instanceName;
- private ISwitch thisISwitch;
- private IController core;
+ private final String instanceName;
+ private final ISwitch thisISwitch;
+ private final IController core;
private Long sid;
private Integer buffers;
private Integer capabilities;
private Byte tables;
private Integer actions;
private Selector selector;
- private SocketChannel socket;
- private BasicFactory factory;
- private AtomicInteger xid;
+ private final SocketChannel socket;
+ private final BasicFactory factory;
+ private final AtomicInteger xid;
private SwitchState state;
private Timer periodicTimer;
- private Map<Short, OFPhysicalPort> physicalPorts;
- private Map<Short, Integer> portBandwidth;
- private Date connectedDate;
+ private final Map<Short, OFPhysicalPort> physicalPorts;
+ private final Map<Short, Integer> portBandwidth;
+ private final Date connectedDate;
private Long lastMsgReceivedTimeStamp;
private Boolean probeSent;
- private ExecutorService executor;
- private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
+ private final ExecutorService executor;
+ private final ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
private boolean running;
private IMessageReadWrite msgReadWriteService;
private Thread switchHandlerThread;
private Thread transmitThread;
private enum SwitchState {
- NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
- 3);
+ NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(3);
private int value;
try {
responseTimerValue = Integer.decode(rTimer);
} catch (NumberFormatException e) {
- logger.warn(
- "Invalid of.messageResponseTimer: {} use default({})",
- rTimer, MESSAGE_RESPONSE_TIMER);
+ logger.warn("Invalid of.messageResponseTimer: {} use default({})", rTimer, MESSAGE_RESPONSE_TIMER);
}
}
}
try {
// wait for an incoming connection
selector.select(0);
- Iterator<SelectionKey> selectedKeys = selector
- .selectedKeys().iterator();
+ Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey skey = selectedKeys.next();
selectedKeys.remove();
switchHandlerThread.start();
}
- public void stop() {
+ private void stopInternal() {
+ logger.debug("{} receives stop signal",
+ (isOperational() ? HexString.toHexString(sid) : "unknown"));
running = false;
cancelSwitchTimer();
try {
msgReadWriteService.stop();
} catch (Exception e) {
}
- executor.shutdown();
+ logger.debug("executor shutdown now");
+ executor.shutdownNow();
msgReadWriteService = null;
+ }
+
+ public void stop() {
+ stopInternal();
if (switchHandlerThread != null) {
switchHandlerThread.interrupt();
* should be used for non-critical messages such as statistics request,
* discovery packets, etc. An unique XID is generated automatically and
* inserted into the message.
- *
+ *
* @param msg
* The OF message to be sent
* @return The XID used
* priority. It will be served after high priority messages. The method
* should be used for non-critical messages such as statistics request,
* discovery packets, etc. The specified XID is inserted into the message.
- *
+ *
* @param msg
* The OF message to be Sent
* @param xid
* method should be used for critical messages such as hello, echo reply
* etc. An unique XID is generated automatically and inserted into the
* message.
- *
+ *
* @param msg
* The OF message to be sent
* @return The XID used
* priority. It will be served first before normal priority messages. The
* method should be used for critical messages such as hello, echo reply
* etc. The specified XID is inserted into the message.
- *
+ *
* @param msg
* The OF message to be sent
* @return The XID used
}
}
+ /**
+ * This method bypasses the transmit queue and sends the message over the
+ * socket directly. If the input xid is not null, the specified xid is
+ * inserted into the message. Otherwise, an unique xid is generated
+ * automatically and inserted into the message.
+ *
+ * @param msg
+ * Message to be sent
+ * @param xid
+ * Message xid
+ */
+ private void asyncSendNow(OFMessage msg, Integer xid) {
+ if (xid == null) {
+ xid = getNextXid();
+ }
+ msg.setXid(xid);
+
+ asyncSendNow(msg);
+ }
+
+ /**
+ * This method bypasses the transmit queue and sends the message over the
+ * socket directly.
+ *
+ * @param msg
+ * Message to be sent
+ */
+ private void asyncSendNow(OFMessage msg) {
+ if (msgReadWriteService == null) {
+ logger.warn("asyncSendNow: {} is not sent because Message ReadWrite Service is not available.", msg);
+ return;
+ }
+
+ try {
+ msgReadWriteService.asyncSend(msg);
+ } catch (Exception e) {
+ reportError(e);
+ }
+ }
+
public void handleMessages() {
List<OFMessage> msgs = null;
}
if (msgs == null) {
- logger.debug("{} is down", toString());
- // the connection is down, inform core
- reportSwitchStateChange(false);
return;
}
for (OFMessage msg : msgs) {
- logger.trace("Message received: {}", msg.toString());
+ logger.trace("Message received: {}", msg);
this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
OFType type = msg.getType();
switch (type) {
case HELLO:
- // send feature request
- OFMessage featureRequest = factory
- .getMessage(OFType.FEATURES_REQUEST);
- asyncFastSend(featureRequest);
- // delete all pre-existing flows
- OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
- OFFlowMod flowMod = (OFFlowMod) factory
- .getMessage(OFType.FLOW_MOD);
- flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
- .setOutPort(OFPort.OFPP_NONE)
- .setLength((short) OFFlowMod.MINIMUM_LENGTH);
- asyncFastSend(flowMod);
- this.state = SwitchState.WAIT_FEATURES_REPLY;
- startSwitchTimer();
+ sendFeaturesRequest();
break;
case ECHO_REQUEST:
- OFEchoReply echoReply = (OFEchoReply) factory
- .getMessage(OFType.ECHO_REPLY);
- asyncFastSend(echoReply);
+ OFEchoReply echoReply = (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
+
+ byte []payload = ((OFEchoRequest)msg).getPayload();
+ if (payload != null && payload.length != 0 ) {
+ // the response must have the same payload as the request
+ echoReply.setPayload(payload);
+ echoReply.setLength( (short)(echoReply.getLength() + payload.length) );
+ }
+
+ // respond immediately
+ asyncSendNow(echoReply, msg.getXid());
+
+ // send features request if not sent yet
+ sendFeaturesRequest();
break;
case ECHO_REPLY:
this.probeSent = false;
updatePhysicalPort(port);
} else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
updatePhysicalPort(port);
- } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
- .ordinal()) {
+ } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE.ordinal()) {
deletePhysicalPort(port);
}
if (probeSent) {
// switch failed to respond to our probe, consider
// it down
- logger.warn("{} is idle for too long, disconnect",
- toString());
+ logger.warn("{} sid {} is idle for too long, disconnect", socket.socket()
+ .getRemoteSocketAddress().toString().split("/")[1], (sid == 0) ? "unknown"
+ : HexString.toHexString(sid));
reportSwitchStateChange(false);
} else {
// send a probe to see if the switch is still alive
- logger.debug(
- "Send idle probe (Echo Request) to {}",
- toString());
+ logger.debug("Send idle probe (Echo Request) to {}", this);
probeSent = true;
- OFMessage echo = factory
- .getMessage(OFType.ECHO_REQUEST);
+ OFMessage echo = factory.getMessage(OFType.ECHO_REQUEST);
asyncFastSend(echo);
}
} else {
if (state == SwitchState.WAIT_FEATURES_REPLY) {
// send another features request
- OFMessage request = factory
- .getMessage(OFType.FEATURES_REQUEST);
+ OFMessage request = factory.getMessage(OFType.FEATURES_REQUEST);
asyncFastSend(request);
} else {
if (state == SwitchState.WAIT_CONFIG_REPLY) {
// send another config request
- OFSetConfig config = (OFSetConfig) factory
- .getMessage(OFType.SET_CONFIG);
- config.setMissSendLength((short) 0xffff)
- .setLengthU(OFSetConfig.MINIMUM_LENGTH);
+ OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
+ config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
asyncFastSend(config);
- OFMessage getConfig = factory
- .getMessage(OFType.GET_CONFIG_REQUEST);
+ OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
asyncFastSend(getConfig);
}
}
}
private void reportError(Exception e) {
- if (e instanceof AsynchronousCloseException
- || e instanceof InterruptedException
- || e instanceof SocketException || e instanceof IOException
- || e instanceof ClosedSelectorException) {
- logger.debug("Caught exception {}", e.getMessage());
- } else {
- logger.warn("Caught exception ", e);
+ if (!running) {
+ logger.debug("Caught exception {} while switch {} is shutting down. Skip", e.getMessage(),
+ (isOperational() ? HexString.toHexString(sid) : "unknown"));
+ return;
}
+ logger.debug("Caught exception: ", e);
+
// notify core of this error event and disconnect the switch
((Controller) core).takeSwitchEventError(this);
+
+ // clean up some internal states immediately
+ stopInternal();
}
private void reportSwitchStateChange(boolean added) {
return this.sid;
}
+ private void sendFeaturesRequest() {
+ if (!isOperational() && (this.state != SwitchState.WAIT_FEATURES_REPLY)) {
+ // send feature request
+ OFMessage featureRequest = factory.getMessage(OFType.FEATURES_REQUEST);
+ asyncFastSend(featureRequest);
+ this.state = SwitchState.WAIT_FEATURES_REPLY;
+ startSwitchTimer();
+ }
+ }
+
private void processFeaturesReply(OFFeaturesReply reply) {
if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
this.sid = reply.getDatapathId();
updatePhysicalPort(port);
}
// config the switch to send full data packet
- OFSetConfig config = (OFSetConfig) factory
- .getMessage(OFType.SET_CONFIG);
- config.setMissSendLength((short) 0xffff).setLengthU(
- OFSetConfig.MINIMUM_LENGTH);
+ OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
+ config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
asyncFastSend(config);
// send config request to make sure the switch can handle the set
// config
portBandwidth
.put(portNumber,
port.getCurrentFeatures()
- & (OFPortFeatures.OFPPF_10MB_FD.getValue()
- | OFPortFeatures.OFPPF_10MB_HD
- .getValue()
- | OFPortFeatures.OFPPF_100MB_FD
- .getValue()
- | OFPortFeatures.OFPPF_100MB_HD
- .getValue()
- | OFPortFeatures.OFPPF_1GB_FD
- .getValue()
- | OFPortFeatures.OFPPF_1GB_HD
- .getValue() | OFPortFeatures.OFPPF_10GB_FD
+ & (OFPortFeatures.OFPPF_10MB_FD.getValue() | OFPortFeatures.OFPPF_10MB_HD.getValue()
+ | OFPortFeatures.OFPPF_100MB_FD.getValue()
+ | OFPortFeatures.OFPPF_100MB_HD.getValue()
+ | OFPortFeatures.OFPPF_1GB_FD.getValue()
+ | OFPortFeatures.OFPPF_1GB_HD.getValue() | OFPortFeatures.OFPPF_10GB_FD
.getValue()));
}
@Override
public String toString() {
try {
- return ("Switch:"
- + socket.socket().getRemoteSocketAddress().toString().split("/")[1]
- + " SWID:" + (isOperational() ? HexString
+ return ("Switch:" + socket.socket().getRemoteSocketAddress().toString().split("/")[1] + " SWID:" + (isOperational() ? HexString
.toHexString(this.sid) : "unknown"));
} catch (Exception e) {
- return (isOperational() ? HexString.toHexString(this.sid)
- : "unknown");
+ return (isOperational() ? HexString.toHexString(this.sid) : "unknown");
}
}
int xid = getNextXid();
StatisticsCollector worker = new StatisticsCollector(this, xid, req);
messageWaitingDone.put(xid, worker);
- Future<Object> submit = executor.submit(worker);
+ Future<Object> submit;
Object result = null;
+ try {
+ submit = executor.submit(worker);
+ } catch (RejectedExecutionException re) {
+ messageWaitingDone.remove(xid);
+ return result;
+ }
try {
result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
return result;
} catch (Exception e) {
- logger.warn("Timeout while waiting for {} replies", req.getType());
+ logger.warn("Timeout while waiting for {} replies from {}",
+ req.getType(), (isOperational() ? HexString.toHexString(sid) : "unknown"));
result = null; // to indicate timeout has occurred
+ worker.wakeup();
return result;
}
}
@Override
public Object syncSend(OFMessage msg) {
+ if (!running) {
+ logger.debug("Switch is going down, ignore syncSend");
+ return null;
+ }
int xid = getNextXid();
return syncSend(msg, xid);
}
*/
private void processBarrierReply(OFBarrierReply msg) {
Integer xid = msg.getXid();
- SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
- .remove(xid);
+ SynchronousMessage worker = (SynchronousMessage) messageWaitingDone.remove(xid);
if (worker == null) {
return;
}
private void processStatsReply(OFStatisticsReply reply) {
Integer xid = reply.getXid();
- StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
- .get(xid);
+ StatisticsCollector worker = (StatisticsCollector) messageWaitingDone.get(xid);
if (worker == null) {
return;
}
if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
return false;
}
- if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
- .getValue()) {
+ if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK.getValue()) {
return false;
}
return true;
* messaging service to transmit it over the socket channel
*/
class PriorityMessageTransmit implements Runnable {
+ @Override
public void run() {
running = true;
while (running) {
try {
- if (!transmitQ.isEmpty()) {
- PriorityMessage pmsg = transmitQ.poll();
- msgReadWriteService.asyncSend(pmsg.msg);
- logger.trace("Message sent: {}", pmsg.toString());
- /*
- * If syncReply is set to true, wait for the response
- * back.
- */
- if (pmsg.syncReply) {
- syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
- }
+ PriorityMessage pmsg = transmitQ.take();
+ msgReadWriteService.asyncSend(pmsg.msg);
+ /*
+ * If syncReply is set to true, wait for the response back.
+ */
+ if (pmsg.syncReply) {
+ syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
}
- Thread.sleep(10);
} catch (InterruptedException ie) {
- reportError(new InterruptedException(
- "PriorityMessageTransmit thread interrupted"));
+ reportError(new InterruptedException("PriorityMessageTransmit thread interrupted"));
} catch (Exception e) {
reportError(e);
}
* Setup and start the transmit thread
*/
private void startTransmitThread() {
- this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
- new Comparator<PriorityMessage>() {
- public int compare(PriorityMessage p1, PriorityMessage p2) {
- if (p2.priority != p1.priority) {
- return p2.priority - p1.priority;
- } else {
- return (p2.seqNum < p1.seqNum) ? 1 : -1;
- }
- }
- });
+ this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11, new Comparator<PriorityMessage>() {
+ @Override
+ public int compare(PriorityMessage p1, PriorityMessage p2) {
+ if (p2.priority != p1.priority) {
+ return p2.priority - p1.priority;
+ } else {
+ return (p2.seqNum < p1.seqNum) ? 1 : -1;
+ }
+ }
+ });
this.transmitThread = new Thread(new PriorityMessageTransmit());
this.transmitThread.start();
}
private IMessageReadWrite getMessageReadWriteService() throws Exception {
String str = System.getProperty("secureChannelEnabled");
- return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(
- socket, selector) : new MessageReadWriteService(socket,
- selector);
+ return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(socket,
+ selector) : new MessageReadWriteService(socket, selector);
}
/**
barrierMsg.setXid(xid);
transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
-
+
return Boolean.TRUE;
}
* This method returns the switch liveness timeout value. If controller did
* not receive any message from the switch for such a long period,
* controller will tear down the connection to the switch.
- *
+ *
* @return The timeout value
*/
private static int getSwitchLivenessTimeout() {
* Barrier request message. Then it's blocked until the Barrier rely arrives
* or timeout. If syncRequest is false, it simply skips the message send and
* just waits for the response back.
- *
+ *
* @param msg
* Message to be sent
* @param xid
messageWaitingDone.put(xid, worker);
Object result = null;
Boolean status = false;
- Future<Object> submit = executor.submit(worker);
+ Future<Object> submit;
+ try {
+ submit = executor.submit(worker);
+ } catch (RejectedExecutionException re) {
+ messageWaitingDone.remove(xid);
+ return result;
+ }
try {
result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
messageWaitingDone.remove(xid);
// if result is not null, this means the switch can't handle
// this message
// the result if OFError already
- logger.debug("Send {} failed --> {}", msg.getType().toString(),
- ((OFError) result).toString());
+ if (logger.isDebugEnabled()) {
+ logger.debug("Send {} failed --> {}", msg.getType(), (result));
+ }
}
return result;
} catch (Exception e) {
- logger.warn("Timeout while waiting for {} reply", msg.getType()
- .toString());
+ logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
// convert the result into a Boolean with value false
status = false;
result = status;
+ worker.wakeup();
return result;
}
}
+
+ @Override
+ public void deleteAllFlows() {
+ logger.trace("deleteAllFlows on switch {}", HexString.toHexString(this.sid));
+ OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
+ OFFlowMod flowMod = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
+ flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE).setOutPort(OFPort.OFPP_NONE)
+ .setLength((short) OFFlowMod.MINIMUM_LENGTH);
+ asyncFastSend(flowMod);
+ }
}