2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
11 import java.io.IOException;
12 import java.net.SocketException;
13 import java.nio.channels.AsynchronousCloseException;
14 import java.nio.channels.ClosedSelectorException;
15 import java.nio.channels.SelectionKey;
16 import java.nio.channels.Selector;
17 import java.nio.channels.SocketChannel;
18 import java.nio.channels.spi.SelectorProvider;
19 import java.util.ArrayList;
20 import java.util.Comparator;
21 import java.util.Date;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.List;
27 import java.util.Timer;
28 import java.util.TimerTask;
29 import java.util.concurrent.Callable;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.PriorityBlockingQueue;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicInteger;
38 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
39 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
40 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
41 import org.openflow.protocol.OFBarrierReply;
42 import org.openflow.protocol.OFBarrierRequest;
43 import org.openflow.protocol.OFEchoReply;
44 import org.openflow.protocol.OFError;
45 import org.openflow.protocol.OFFeaturesReply;
46 import org.openflow.protocol.OFFlowMod;
47 import org.openflow.protocol.OFGetConfigReply;
48 import org.openflow.protocol.OFMatch;
49 import org.openflow.protocol.OFMessage;
50 import org.openflow.protocol.OFPhysicalPort;
51 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
52 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
53 import org.openflow.protocol.OFPhysicalPort.OFPortState;
54 import org.openflow.protocol.OFPort;
55 import org.openflow.protocol.OFPortStatus;
56 import org.openflow.protocol.OFPortStatus.OFPortReason;
57 import org.openflow.protocol.OFSetConfig;
58 import org.openflow.protocol.OFStatisticsReply;
59 import org.openflow.protocol.OFStatisticsRequest;
60 import org.openflow.protocol.OFType;
61 import org.openflow.protocol.factory.BasicFactory;
62 import org.openflow.util.HexString;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
66 public class SwitchHandler implements ISwitch {
67 private static final Logger logger = LoggerFactory
68 .getLogger(SwitchHandler.class);
69 private static final int SWITCH_LIVENESS_TIMER = 5000;
70 private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
71 private final int MESSAGE_RESPONSE_TIMER = 2000;
73 private final String instanceName;
74 private final ISwitch thisISwitch;
75 private final IController core;
77 private Integer buffers;
78 private Integer capabilities;
80 private Integer actions;
81 private Selector selector;
82 private final SocketChannel socket;
83 private final BasicFactory factory;
84 private final AtomicInteger xid;
85 private SwitchState state;
86 private Timer periodicTimer;
87 private final Map<Short, OFPhysicalPort> physicalPorts;
88 private final Map<Short, Integer> portBandwidth;
89 private final Date connectedDate;
90 private Long lastMsgReceivedTimeStamp;
91 private Boolean probeSent;
92 private final ExecutorService executor;
93 private final ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
94 private boolean running;
95 private IMessageReadWrite msgReadWriteService;
96 private Thread switchHandlerThread;
97 private Integer responseTimerValue;
98 private PriorityBlockingQueue<PriorityMessage> transmitQ;
99 private Thread transmitThread;
101 private enum SwitchState {
102 NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
107 private SwitchState(int value) {
111 @SuppressWarnings("unused")
/**
 * Creates the handler for one switch connection and initialises its
 * bookkeeping fields; the handler starts in the NON_OPERATIONAL state.
 *
 * @param core the controller instance
 * @param sc   the socket channel of the switch connection
 * @param name the instance name kept for this handler
 */
117 public SwitchHandler(Controller core, SocketChannel sc, String name) {
118 this.instanceName = name;
119 this.thisISwitch = this;
121 this.buffers = (int) 0;
122 this.capabilities = (int) 0;
123 this.tables = (byte) 0;
124 this.actions = (int) 0;
127 this.factory = new BasicFactory();
128 this.connectedDate = new Date();
// the connection time doubles as the initial "last message received" stamp
129 this.lastMsgReceivedTimeStamp = connectedDate.getTime();
130 this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
131 this.portBandwidth = new HashMap<Short, Integer>();
132 this.state = SwitchState.NON_OPERATIONAL;
133 this.probeSent = false;
// the XID counter is seeded with the socket's hash code rather than zero
134 this.xid = new AtomicInteger(this.socket.hashCode());
135 this.periodicTimer = null;
// fixed pool of 4 threads; the statistics and synchronous-message workers
// of this class are submitted to it
136 this.executor = Executors.newFixedThreadPool(4);
137 this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
// reply timeout in milliseconds: the built-in default applies unless the
// of.messageResponseTimer system property holds a value Integer.decode
// can parse (decimal, 0x/# hexadecimal or leading-0 octal)
138 this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
139 String rTimer = System.getProperty("of.messageResponseTimer");
140 if (rTimer != null) {
142 responseTimerValue = Integer.decode(rTimer);
// on a malformed value the default assigned above stays in effect
143 } catch (NumberFormatException e) {
145 "Invalid of.messageResponseTimer: {} use default({})",
146 rTimer, MESSAGE_RESPONSE_TIMER);
151 public void start() {
153 startTransmitThread();
156 startHandlerThread();
157 } catch (Exception e) {
162 private void startHandlerThread() {
163 switchHandlerThread = new Thread(new Runnable() {
169 // wait for an incoming connection
171 Iterator<SelectionKey> selectedKeys = selector
172 .selectedKeys().iterator();
173 while (selectedKeys.hasNext()) {
174 SelectionKey skey = selectedKeys.next();
175 selectedKeys.remove();
176 if (skey.isValid() && skey.isWritable()) {
179 if (skey.isValid() && skey.isReadable()) {
183 } catch (Exception e) {
189 switchHandlerThread.start();
198 } catch (Exception e) {
202 } catch (Exception e) {
205 msgReadWriteService.stop();
206 } catch (Exception e) {
210 msgReadWriteService = null;
212 if (switchHandlerThread != null) {
213 switchHandlerThread.interrupt();
215 if (transmitThread != null) {
216 transmitThread.interrupt();
/**
 * Returns the next transaction id (XID) by atomically incrementing this
 * handler's XID counter.
 *
 * @return the counter value after the increment
 */
221 public int getNextXid() {
222 return this.xid.incrementAndGet();
226 * This method puts the message in an outgoing priority queue with normal
227 * priority. It will be served after high priority messages. The method
228 * should be used for non-critical messages such as statistics request,
229 * discovery packets, etc. A unique XID is generated automatically and
230 * inserted into the message.
233 * The OF message to be sent
234 * @return The XID used
237 public Integer asyncSend(OFMessage msg) {
238 return asyncSend(msg, getNextXid());
241 private Object syncSend(OFMessage msg, int xid) {
242 return syncMessageInternal(msg, xid, true);
246 * This method puts the message in an outgoing priority queue with normal
247 * priority. It will be served after high priority messages. The method
248 * should be used for non-critical messages such as statistics request,
249 * discovery packets, etc. The specified XID is inserted into the message.
252 * The OF message to be sent
254 * The XID to be used in the message
255 * @return The XID used
258 public Integer asyncSend(OFMessage msg, int xid) {
260 if (transmitQ != null) {
261 transmitQ.add(new PriorityMessage(msg, 0));
267 * This method puts the message in an outgoing priority queue with high
268 * priority. It will be served first before normal priority messages. The
269 * method should be used for critical messages such as hello, echo reply
270 * etc. A unique XID is generated automatically and inserted into the
274 * The OF message to be sent
275 * @return The XID used
278 public Integer asyncFastSend(OFMessage msg) {
279 return asyncFastSend(msg, getNextXid());
283 * This method puts the message in an outgoing priority queue with high
284 * priority. It will be served first before normal priority messages. The
285 * method should be used for critical messages such as hello, echo reply
286 * etc. The specified XID is inserted into the message.
289 * The OF message to be sent
290 * @return The XID used
293 public Integer asyncFastSend(OFMessage msg, int xid) {
295 if (transmitQ != null) {
296 transmitQ.add(new PriorityMessage(msg, 1));
301 public void resumeSend() {
303 if (msgReadWriteService != null) {
304 msgReadWriteService.resumeSend();
306 } catch (Exception e) {
312 * This method bypasses the transmit queue and sends the message over the
313 * socket directly. If the input xid is not null, the specified xid is
314 * inserted into the message. Otherwise, a unique xid is generated
315 * automatically and inserted into the message.
322 private void asyncSendNow(OFMessage msg, Integer xid) {
332 * This method bypasses the transmit queue and sends the message over the
338 private void asyncSendNow(OFMessage msg) {
339 if (msgReadWriteService == null) {
341 "asyncSendNow: {} is not sent because Message ReadWrite Service is not available.",
347 msgReadWriteService.asyncSend(msg);
348 } catch (Exception e) {
353 public void handleMessages() {
354 List<OFMessage> msgs = null;
357 if (msgReadWriteService != null) {
358 msgs = msgReadWriteService.readMessages();
360 } catch (Exception e) {
365 logger.debug("{} is down", this);
366 // the connection is down, inform core
367 reportSwitchStateChange(false);
370 for (OFMessage msg : msgs) {
371 logger.trace("Message received: {}", msg);
372 this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
373 OFType type = msg.getType();
376 // send feature request
377 OFMessage featureRequest = factory
378 .getMessage(OFType.FEATURES_REQUEST);
379 asyncFastSend(featureRequest);
380 // delete all pre-existing flows
381 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
382 OFFlowMod flowMod = (OFFlowMod) factory
383 .getMessage(OFType.FLOW_MOD);
384 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
385 .setOutPort(OFPort.OFPP_NONE)
386 .setLength((short) OFFlowMod.MINIMUM_LENGTH);
387 asyncFastSend(flowMod);
388 this.state = SwitchState.WAIT_FEATURES_REPLY;
392 OFEchoReply echoReply = (OFEchoReply) factory
393 .getMessage(OFType.ECHO_REPLY);
394 // respond immediately
395 asyncSendNow(echoReply, msg.getXid());
398 this.probeSent = false;
401 processFeaturesReply((OFFeaturesReply) msg);
403 case GET_CONFIG_REPLY:
404 // make sure that the switch can send the whole packet to the
406 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
407 this.state = SwitchState.OPERATIONAL;
411 processBarrierReply((OFBarrierReply) msg);
414 processErrorReply((OFError) msg);
417 processPortStatusMsg((OFPortStatus) msg);
420 processStatsReply((OFStatisticsReply) msg);
427 if (isOperational()) {
428 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
/**
 * Applies a port status notification to the locally cached port tables.
 *
 * @param msg the port status message; its description supplies the port
 *            and its reason selects the update or delete performed below
 */
433 private void processPortStatusMsg(OFPortStatus msg) {
434 OFPhysicalPort port = msg.getDesc();
// The reason byte is compared with the enum constant's ordinal().
// NOTE(review): this presumably relies on OFPortReason being declared in
// protocol reason-code order - confirm against the enum definition.
435 if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
436 updatePhysicalPort(port);
437 } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
// ADD is handled exactly like MODIFY: the cached entry is (re)written
438 updatePhysicalPort(port);
439 } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
441 deletePhysicalPort(port);
/**
 * Creates the periodic timer and schedules the liveness task at a fixed
 * rate. SWITCH_LIVENESS_TIMER (milliseconds) is used both as the initial
 * delay and as the period. Besides the idle check, each run re-sends the
 * outstanding handshake request while the switch is still in one of the
 * WAIT_* states.
 */
446 private void startSwitchTimer() {
447 this.periodicTimer = new Timer();
448 this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
// idle time is measured from the last received message and compared
// with the configured liveness timeout
452 Long now = System.currentTimeMillis();
453 if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
455 // switch failed to respond to our probe, consider
457 logger.warn("{} is idle for too long, disconnect",
459 reportSwitchStateChange(false);
461 // send a probe to see if the switch is still alive
463 "Send idle probe (Echo Request) to {}",
466 OFMessage echo = factory
467 .getMessage(OFType.ECHO_REQUEST);
// handshake retry: no features reply has been processed yet
471 if (state == SwitchState.WAIT_FEATURES_REPLY) {
472 // send another features request
473 OFMessage request = factory
474 .getMessage(OFType.FEATURES_REQUEST);
475 asyncFastSend(request);
// handshake retry: the config reply is still outstanding
477 if (state == SwitchState.WAIT_CONFIG_REPLY) {
478 // re-send the set-config (miss-send length 0xffff), then another get-config request
479 OFSetConfig config = (OFSetConfig) factory
480 .getMessage(OFType.SET_CONFIG);
481 config.setMissSendLength((short) 0xffff)
482 .setLengthU(OFSetConfig.MINIMUM_LENGTH);
483 asyncFastSend(config);
484 OFMessage getConfig = factory
485 .getMessage(OFType.GET_CONFIG_REQUEST);
486 asyncFastSend(getConfig);
490 } catch (Exception e) {
494 }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
497 private void cancelSwitchTimer() {
498 if (this.periodicTimer != null) {
499 this.periodicTimer.cancel();
/**
 * Logs the given exception and notifies the core of an error on this
 * switch.
 *
 * @param e the exception to report
 */
503 private void reportError(Exception e) {
// Exceptions of these kinds are logged at debug level with their message
// only. AsynchronousCloseException and SocketException are subclasses of
// IOException, so the IOException test alone already covers them.
504 if (e instanceof AsynchronousCloseException
505 || e instanceof InterruptedException
506 || e instanceof SocketException || e instanceof IOException
507 || e instanceof ClosedSelectorException) {
508 if (logger.isDebugEnabled()) {
509 logger.debug("Caught exception {}", e.getMessage());
// the warn call passes the exception object itself to the logger
// (presumably the branch for all other exception types)
512 logger.warn("Caught exception ", e);
514 // notify core of this error event and disconnect the switch
515 ((Controller) core).takeSwitchEventError(this);
518 private void reportSwitchStateChange(boolean added) {
520 ((Controller) core).takeSwitchEventAdd(this);
522 ((Controller) core).takeSwitchEventDelete(this);
527 public Long getId() {
/**
 * Handles the features reply of the handshake. The reply is acted on only
 * while the state is WAIT_FEATURES_REPLY: the switch attributes and ports
 * are recorded, a set-config followed by a get-config request is queued
 * with high priority, and the state moves to WAIT_CONFIG_REPLY.
 *
 * @param reply the features reply received from the switch
 */
531 private void processFeaturesReply(OFFeaturesReply reply) {
532 if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
533 this.sid = reply.getDatapathId();
534 this.buffers = reply.getBuffers();
535 this.capabilities = reply.getCapabilities();
536 this.tables = reply.getTables();
537 this.actions = reply.getActions();
538 // record every port advertised in the features reply
539 for (OFPhysicalPort port : reply.getPorts()) {
540 updatePhysicalPort(port);
542 // config the switch to send full data packet
543 OFSetConfig config = (OFSetConfig) factory
544 .getMessage(OFType.SET_CONFIG);
545 config.setMissSendLength((short) 0xffff).setLengthU(
546 OFSetConfig.MINIMUM_LENGTH);
547 asyncFastSend(config);
548 // send config request to make sure the switch can handle the set
550 OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
551 asyncFastSend(getConfig);
// isOperational() already returns true in WAIT_CONFIG_REPLY, so the switch
// is reported to the core before the config reply has been received
552 this.state = SwitchState.WAIT_CONFIG_REPLY;
553 // inform core that a new switch is now operational
554 reportSwitchStateChange(true);
558 private void updatePhysicalPort(OFPhysicalPort port) {
559 Short portNumber = port.getPortNumber();
560 physicalPorts.put(portNumber, port);
563 port.getCurrentFeatures()
564 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
565 | OFPortFeatures.OFPPF_10MB_HD
567 | OFPortFeatures.OFPPF_100MB_FD
569 | OFPortFeatures.OFPPF_100MB_HD
571 | OFPortFeatures.OFPPF_1GB_FD
573 | OFPortFeatures.OFPPF_1GB_HD
574 .getValue() | OFPortFeatures.OFPPF_10GB_FD
578 private void deletePhysicalPort(OFPhysicalPort port) {
579 Short portNumber = port.getPortNumber();
580 physicalPorts.remove(portNumber);
581 portBandwidth.remove(portNumber);
/**
 * Tells whether this switch is treated as operational. Both the
 * WAIT_CONFIG_REPLY and the OPERATIONAL states count.
 *
 * @return true in either of those two states, false otherwise
 */
585 public boolean isOperational() {
586 return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
590 public String toString() {
593 + socket.socket().getRemoteSocketAddress().toString().split("/")[1]
594 + " SWID:" + (isOperational() ? HexString
595 .toHexString(this.sid) : "unknown"));
596 } catch (Exception e) {
597 return (isOperational() ? HexString.toHexString(this.sid)
604 public Date getConnectedDate() {
605 return this.connectedDate;
608 public String getInstanceName() {
/**
 * Runs a StatisticsCollector for the request on the executor and waits up
 * to responseTimerValue milliseconds for its result.
 *
 * @param req the statistics request handed to the collector
 * @return presumably the collector's result, or null when waiting failed -
 *         the return statement is not among the lines shown here
 */
613 public Object getStatistics(OFStatisticsRequest req) {
614 int xid = getNextXid();
615 StatisticsCollector worker = new StatisticsCollector(this, xid, req);
// registered under its XID so that incoming stats and error replies can
// be matched to this collector
616 messageWaitingDone.put(xid, worker);
617 Future<Object> submit = executor.submit(worker);
618 Object result = null;
620 result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
// NOTE(review): every exception type lands here, so an interruption or a
// failure inside the worker is logged as a timeout as well
622 } catch (Exception e) {
623 logger.warn("Timeout while waiting for {} replies", req.getType());
624 result = null; // to indicate timeout has occurred
631 public Object syncSend(OFMessage msg) {
632 int xid = getNextXid();
633 return syncSend(msg, xid);
637 * Either a BarrierReply or an OFError is received. If this is a reply for an
638 * outstanding sync message, wake up associated task so that it can continue
640 private void processBarrierReply(OFBarrierReply msg) {
641 Integer xid = msg.getXid();
642 SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
644 if (worker == null) {
/**
 * Routes an error message to the worker waiting on the matching XID. The
 * worker is removed from messageWaitingDone and then woken up with the
 * error.
 *
 * @param errorMsg the error message received from the switch
 */
650 private void processErrorReply(OFError errorMsg) {
651 OFMessage offendingMsg = errorMsg.getOffendingMsg();
// the XID of the offending message is used when the error carries one;
// the error's own XID is presumably the fallback
653 if (offendingMsg != null) {
654 xid = offendingMsg.getXid();
656 xid = errorMsg.getXid();
659 * the error can be a reply to a synchronous message or to a statistic
662 Callable<?> worker = messageWaitingDone.remove(xid);
663 if (worker == null) {
// only two worker types are handled: SynchronousMessage and
// StatisticsCollector
666 if (worker instanceof SynchronousMessage) {
667 ((SynchronousMessage) worker).wakeup(errorMsg);
669 ((StatisticsCollector) worker).wakeup(errorMsg);
673 private void processStatsReply(OFStatisticsReply reply) {
674 Integer xid = reply.getXid();
675 StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
677 if (worker == null) {
680 if (worker.collect(reply)) {
681 // if all the stats records are received (collect() returns true)
683 messageWaitingDone.remove(xid);
689 public Map<Short, OFPhysicalPort> getPhysicalPorts() {
690 return this.physicalPorts;
694 public OFPhysicalPort getPhysicalPort(Short portNumber) {
695 return this.physicalPorts.get(portNumber);
699 public Integer getPortBandwidth(Short portNumber) {
700 return this.portBandwidth.get(portNumber);
704 public Set<Short> getPorts() {
705 return this.physicalPorts.keySet();
709 public Byte getTables() {
714 public Integer getActions() {
719 public Integer getCapabilities() {
720 return this.capabilities;
724 public Integer getBuffers() {
729 public boolean isPortEnabled(short portNumber) {
730 return isPortEnabled(physicalPorts.get(portNumber));
734 public boolean isPortEnabled(OFPhysicalPort port) {
738 int portConfig = port.getConfig();
739 int portState = port.getState();
740 if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
743 if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
746 if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
754 public List<OFPhysicalPort> getEnabledPorts() {
755 List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
756 synchronized (this.physicalPorts) {
757 for (OFPhysicalPort port : physicalPorts.values()) {
758 if (isPortEnabled(port)) {
767 * Transmit thread polls the message out of the priority queue and invokes
768 * messaging service to transmit it over the socket channel
// Runnable behind the transmit thread: takes messages off transmitQ and
// hands each one to the message read/write service.
770 class PriorityMessageTransmit implements Runnable {
// take() blocks until a message is available in the queue
776 PriorityMessage pmsg = transmitQ.take();
777 msgReadWriteService.asyncSend(pmsg.msg);
779 * If syncReply is set to true, wait for the response back.
781 if (pmsg.syncReply) {
// syncRequest=false: the message has just been sent above, so only the
// wait for the reply is performed
782 syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
// NOTE(review): a new InterruptedException is reported instead of `ie`,
// and the lines shown here do not restore the thread's interrupt flag
784 } catch (InterruptedException ie) {
785 reportError(new InterruptedException(
786 "PriorityMessageTransmit thread interrupted"));
787 } catch (Exception e) {
796 * Setup and start the transmit thread
// Creates the transmit queue (initial capacity 11) with its ordering and
// starts the thread that drains it.
798 private void startTransmitThread() {
799 this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
800 new Comparator<PriorityMessage>() {
802 public int compare(PriorityMessage p1, PriorityMessage p2) {
// a larger priority value sorts first, i.e. is taken from the queue earlier
803 if (p2.priority != p1.priority) {
804 return p2.priority - p1.priority;
// equal priority: the smaller sequence number sorts first.
// NOTE(review): this never returns 0, so compare(x, x) yields -1, which
// departs from the Comparator contract
806 return (p2.seqNum < p1.seqNum) ? 1 : -1;
810 this.transmitThread = new Thread(new PriorityMessageTransmit());
811 this.transmitThread.start();
815 * Setup communication services
817 private void setupCommChannel() throws Exception {
818 this.selector = SelectorProvider.provider().openSelector();
819 this.socket.configureBlocking(false);
820 this.socket.socket().setTcpNoDelay(true);
821 this.msgReadWriteService = getMessageReadWriteService();
824 private void sendFirstHello() {
826 OFMessage msg = factory.getMessage(OFType.HELLO);
828 } catch (Exception e) {
833 private IMessageReadWrite getMessageReadWriteService() throws Exception {
834 String str = System.getProperty("secureChannelEnabled");
835 return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(
836 socket, selector) : new MessageReadWriteService(socket,
841 * Send Barrier message synchronously. The caller will be blocked until the
842 * Barrier reply is received.
845 public Object syncSendBarrierMessage() {
846 OFBarrierRequest barrierMsg = new OFBarrierRequest();
847 return syncSend(barrierMsg);
851 * Send Barrier message asynchronously. The caller is not blocked. The
852 * Barrier message will be sent in a transmit thread which will be blocked
853 * until the Barrier reply is received.
856 public Object asyncSendBarrierMessage() {
857 if (transmitQ == null) {
858 return Boolean.FALSE;
861 OFBarrierRequest barrierMsg = new OFBarrierRequest();
862 int xid = getNextXid();
864 barrierMsg.setXid(xid);
865 transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
871 * This method returns the switch liveness timeout value. If the controller
872 * does not receive any message from the switch for that long, the
873 * controller tears down the connection to the switch.
875 * @return The timeout value
877 private static int getSwitchLivenessTimeout() {
878 String timeout = System.getProperty("of.switchLivenessTimeout");
882 if (timeout != null) {
883 rv = Integer.parseInt(timeout);
885 } catch (Exception e) {
892 * This method performs synchronous operations for a given message. If
893 * syncRequest is set to true, the message will be sent out followed by a
894 * Barrier request message. Then it's blocked until the Barrier reply arrives
895 * or timeout. If syncRequest is false, it simply skips the message send and
896 * just waits for the response back.
903 * If set to true, the message will be sent out
904 * followed by a Barrier request message. If set to false, it
905 * simply skips the sending and just waits for the Barrier reply.
908 private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
909 SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
910 messageWaitingDone.put(xid, worker);
911 Object result = null;
912 Boolean status = false;
913 Future<Object> submit = executor.submit(worker);
915 result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
916 messageWaitingDone.remove(xid);
917 if (result == null) {
918 // if result is null, then it means the switch can handle this
919 // message successfully
920 // convert the result into a Boolean with value true
922 // logger.debug("Successfully send " +
923 // msg.getType().toString());
926 // if result is not null, this means the switch can't handle
928 // the result is an OFError already
929 if (logger.isDebugEnabled()) {
930 logger.debug("Send {} failed --> {}", msg.getType(),
935 } catch (Exception e) {
936 logger.warn("Timeout while waiting for {} reply", msg.getType()
938 // convert the result into a Boolean with value false