2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
11 import java.io.IOException;
12 import java.net.SocketException;
13 import java.nio.channels.AsynchronousCloseException;
14 import java.nio.channels.SelectionKey;
15 import java.nio.channels.Selector;
16 import java.nio.channels.SocketChannel;
17 import java.nio.channels.spi.SelectorProvider;
18 import java.util.ArrayList;
19 import java.util.Comparator;
20 import java.util.Date;
21 import java.util.HashMap;
22 import java.util.Iterator;
23 import java.util.List;
26 import java.util.Timer;
27 import java.util.TimerTask;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.PriorityBlockingQueue;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicInteger;
37 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
38 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
39 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
40 import org.openflow.protocol.OFBarrierReply;
41 import org.openflow.protocol.OFBarrierRequest;
42 import org.openflow.protocol.OFEchoReply;
43 import org.openflow.protocol.OFError;
44 import org.openflow.protocol.OFFeaturesReply;
45 import org.openflow.protocol.OFFlowMod;
46 import org.openflow.protocol.OFGetConfigReply;
47 import org.openflow.protocol.OFMatch;
48 import org.openflow.protocol.OFMessage;
49 import org.openflow.protocol.OFPhysicalPort;
50 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
51 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
52 import org.openflow.protocol.OFPhysicalPort.OFPortState;
53 import org.openflow.protocol.OFPort;
54 import org.openflow.protocol.OFPortStatus;
55 import org.openflow.protocol.OFPortStatus.OFPortReason;
56 import org.openflow.protocol.OFSetConfig;
57 import org.openflow.protocol.OFStatisticsReply;
58 import org.openflow.protocol.OFStatisticsRequest;
59 import org.openflow.protocol.OFType;
60 import org.openflow.protocol.factory.BasicFactory;
61 import org.openflow.util.HexString;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
65 public class SwitchHandler implements ISwitch {
// Class-wide logger.
66 private static final Logger logger = LoggerFactory
67 .getLogger(SwitchHandler.class);
// Initial delay and period (ms) of the periodic task scheduled in startSwitchTimer().
68 private static final int SWITCH_LIVENESS_TIMER = 5000;
// Idle threshold compared against (now - lastMsgReceivedTimeStamp) by the periodic task.
69 private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
// Default reply wait; the constructor copies it into responseTimerValue, which the
// "of.messageResponseTimer" system property may override.
70 private int MESSAGE_RESPONSE_TIMER = 2000;
72 private String instanceName;
73 private ISwitch thisISwitch;
74 private IController core;
76 private Integer buffers;
77 private Integer capabilities;
79 private Integer actions;
80 private Selector selector;
81 private SocketChannel socket;
82 private BasicFactory factory;
// Source of the transaction ids handed out by getNextXid().
83 private AtomicInteger xid;
84 private SwitchState state;
85 private Timer periodicTimer;
86 private Map<Short, OFPhysicalPort> physicalPorts;
87 private Map<Short, Integer> portBandwidth;
88 private Date connectedDate;
// Refreshed with System.currentTimeMillis() for each message processed in handleMessages().
89 private Long lastMsgReceivedTimeStamp;
90 private Boolean probeSent;
91 private ExecutorService executor;
// Workers waiting for a switch reply, keyed by the XID they were registered under.
92 private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
93 private boolean running;
94 private IMessageReadWrite msgReadWriteService;
95 private Thread switchHandlerThread;
// Timeout handed to Future.get(..., TimeUnit.MILLISECONDS) when waiting for replies.
96 private Integer responseTimerValue;
// Outgoing message queue; instantiated in startTransmitThread().
97 private PriorityBlockingQueue<PriorityMessage> transmitQ;
98 private Thread transmitThread;
100 private enum SwitchState {
101 NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
106 private SwitchState(int value) {
110 @SuppressWarnings("unused")
// Creates the handler for one switch connection and resets its state:
// counters to zero, empty port tables, NON_OPERATIONAL state, a 4-thread
// executor for reply-waiting workers and an empty pending-reply map.
116 public SwitchHandler(Controller core, SocketChannel sc, String name) {
117 this.instanceName = name;
118 this.thisISwitch = this;
120 this.buffers = (int) 0;
121 this.capabilities = (int) 0;
122 this.tables = (byte) 0;
123 this.actions = (int) 0;
126 this.factory = new BasicFactory();
127 this.connectedDate = new Date();
// the connection time doubles as the initial "last message received" time
128 this.lastMsgReceivedTimeStamp = connectedDate.getTime();
129 this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
130 this.portBandwidth = new HashMap<Short, Integer>();
131 this.state = SwitchState.NON_OPERATIONAL;
132 this.probeSent = false;
// the XID counter is seeded from the socket's hash code
133 this.xid = new AtomicInteger(this.socket.hashCode());
134 this.periodicTimer = null;
135 this.executor = Executors.newFixedThreadPool(4);
136 this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
// start from the built-in default, then let the system property override it
137 this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
138 String rTimer = System.getProperty("of.messageResponseTimer");
139 if (rTimer != null) {
// Integer.decode also accepts hex/octal notation; a malformed value is reported below
141 responseTimerValue = Integer.decode(rTimer);
142 } catch (NumberFormatException e) {
144 "Invalid of.messageResponseTimer: {} use default({})",
145 rTimer, MESSAGE_RESPONSE_TIMER);
// Brings the handler up: the transmit thread is started before the
// selector-driven handler thread; failures are caught in this method.
150 public void start() {
152 startTransmitThread();
155 startHandlerThread();
156 } catch (Exception e) {
// Creates and starts the thread that walks the selector's selected keys and
// reacts to keys that are valid and writable and/or readable.
161 private void startHandlerThread() {
162 switchHandlerThread = new Thread(new Runnable() {
168 // wait for an incoming connection
170 Iterator<SelectionKey> selectedKeys = selector
171 .selectedKeys().iterator();
172 while (selectedKeys.hasNext()) {
173 SelectionKey skey = selectedKeys.next();
// take the key out of the selected set so it is not handled again on the next pass
174 selectedKeys.remove();
// the same key is tested for writability first, then for readability
175 if (skey.isValid() && skey.isWritable()) {
178 if (skey.isValid() && skey.isReadable()) {
182 } catch (Exception e) {
188 switchHandlerThread.start();
197 } catch (Exception e) {
201 } catch (Exception e) {
204 msgReadWriteService.stop();
205 } catch (Exception e) {
210 msgReadWriteService = null;
212 if (switchHandlerThread != null) {
213 switchHandlerThread.interrupt();
215 if (transmitThread != null) {
216 transmitThread.interrupt();
// Returns the next transaction id by atomically incrementing the XID counter.
221 public int getNextXid() {
222 return this.xid.incrementAndGet();
226 * This method puts the message in an outgoing priority queue with normal
227 * priority. It will be served after high priority messages. The method
228 * should be used for non-critical messages such as statistics request,
229 * discovery packets, etc. A unique XID is generated automatically and
230 * inserted into the message.
233 * The OF message to be sent
234 * @return The XID used
// Convenience overload: draws a fresh XID from getNextXid() and delegates to
// asyncSend(OFMessage, int).
237 public Integer asyncSend(OFMessage msg) {
238 return asyncSend(msg, getNextXid());
// Sends a message and waits for its outcome: a SynchronousMessage worker is
// registered under the given XID, run on the executor, and awaited for at
// most responseTimerValue milliseconds. A null worker result is treated as
// success; a non-null result is logged as an OFError.
// NOTE(review): the pending entry is removed only after get() returns; confirm
// whether the exception path also clears messageWaitingDone for this XID.
241 private Object syncSend(OFMessage msg, int xid) {
242 SynchronousMessage worker = new SynchronousMessage(this, xid, msg);
// register before submitting so reply handlers can find the worker by XID
243 messageWaitingDone.put(xid, worker);
244 Object result = null;
245 Boolean status = false;
246 Future<Object> submit = executor.submit(worker);
248 result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
249 messageWaitingDone.remove(xid);
250 if (result == null) {
251 // if result is null, then it means the switch can handle this
252 // message successfully
253 // convert the result into a Boolean with value true
255 // logger.debug("Successfully send " +
256 // msg.getType().toString());
259 // if result is not null, this means the switch can't handle
261 // the result is an OFError already
262 logger.debug("Send {} failed --> {}", msg.getType().toString(),
263 ((OFError) result).toString());
// every exception from the wait is logged as a timeout
266 } catch (Exception e) {
267 logger.warn("Timeout while waiting for {} reply", msg.getType()
269 // convert the result into a Boolean with value false
277 * This method puts the message in an outgoing priority queue with normal
278 * priority. It will be served after high priority messages. The method
279 * should be used for non-critical messages such as statistics request,
280 * discovery packets, etc. The specified XID is inserted into the message.
283 * The OF message to be sent
285 * The XID to be used in the message
286 * @return The XID used
// Queues the message for transmission with priority 0 (normal). Nothing is
// queued while the transmit queue has not been created yet.
289 public Integer asyncSend(OFMessage msg, int xid) {
291 if (transmitQ != null) {
292 transmitQ.add(new PriorityMessage(msg, 0));
298 * This method puts the message in an outgoing priority queue with high
299 * priority. It will be served first before normal priority messages. The
300 * method should be used for critical messages such as hello, echo reply
301 * etc. A unique XID is generated automatically and inserted into the
305 * The OF message to be sent
306 * @return The XID used
// Convenience overload: draws a fresh XID from getNextXid() and delegates to
// asyncFastSend(OFMessage, int).
309 public Integer asyncFastSend(OFMessage msg) {
310 return asyncFastSend(msg, getNextXid());
314 * This method puts the message in an outgoing priority queue with high
315 * priority. It will be served first before normal priority messages. The
316 * method should be used for critical messages such as hello, echo reply
317 * etc. The specified XID is inserted into the message.
320 * The OF message to be sent
321 * @return The XID used
// Queues the message for transmission with priority 1 (high). Nothing is
// queued while the transmit queue has not been created yet.
324 public Integer asyncFastSend(OFMessage msg, int xid) {
326 if (transmitQ != null) {
327 transmitQ.add(new PriorityMessage(msg, 1));
// Asks the message read/write service to resume sending. The null check
// covers the case where the service reference has been cleared.
332 public void resumeSend() {
334 if (msgReadWriteService != null) {
335 msgReadWriteService.resumeSend();
337 } catch (Exception e) {
// Reads the pending OpenFlow messages from the read/write service and handles
// each by type: handshake traffic (features request, flow wipe, echo reply),
// replies routed to their process*Reply() helper, and port status updates.
// While the switch is operational, messages are also passed on to the controller.
342 public void handleMessages() {
343 List<OFMessage> msgs = null;
346 msgs = msgReadWriteService.readMessages();
347 } catch (Exception e) {
352 logger.debug("{} is down", toString());
353 // the connection is down, inform core
354 reportSwitchStateChange(false);
357 for (OFMessage msg : msgs) {
358 logger.trace("Message received: {}", msg.toString());
360 * if ((msg.getType() != OFType.ECHO_REQUEST) && (msg.getType() !=
361 * OFType.ECHO_REPLY)) { logger.debug(msg.getType().toString() +
362 * " received from sw " + toString()); }
// any received message counts as proof of life for the liveness timer
364 this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
365 OFType type = msg.getType();
368 // send feature request
369 OFMessage featureRequest = factory
370 .getMessage(OFType.FEATURES_REQUEST);
371 asyncFastSend(featureRequest);
372 // delete all pre-existing flows
373 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
374 OFFlowMod flowMod = (OFFlowMod) factory
375 .getMessage(OFType.FLOW_MOD);
376 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
377 .setOutPort(OFPort.OFPP_NONE)
378 .setLength((short) OFFlowMod.MINIMUM_LENGTH);
379 asyncFastSend(flowMod);
380 this.state = SwitchState.WAIT_FEATURES_REPLY;
384 OFEchoReply echoReply = (OFEchoReply) factory
385 .getMessage(OFType.ECHO_REPLY);
386 asyncFastSend(echoReply);
// clear the outstanding liveness probe flag
389 this.probeSent = false;
392 processFeaturesReply((OFFeaturesReply) msg);
394 case GET_CONFIG_REPLY:
395 // make sure that the switch can send the whole packet to the
// 0xffff is the miss-send length requested via SET_CONFIG; seeing it echoed completes the handshake
397 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
398 this.state = SwitchState.OPERATIONAL;
402 processBarrierReply((OFBarrierReply) msg);
405 processErrorReply((OFError) msg);
408 processPortStatusMsg((OFPortStatus) msg);
411 processStatsReply((OFStatisticsReply) msg);
// forward to the controller only once the switch counts as operational
418 if (isOperational()) {
419 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
// Applies a port status notification to the local port table: MODIFY and ADD
// both store the reported port description, DELETE removes the port.
// The reason byte is matched against the ordinal of each OFPortReason constant.
424 private void processPortStatusMsg(OFPortStatus msg) {
425 // short portNumber = msg.getDesc().getPortNumber();
426 OFPhysicalPort port = msg.getDesc();
427 if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
428 updatePhysicalPort(port);
429 // logger.debug("Port " + portNumber + " on " + toString() +
431 } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
432 updatePhysicalPort(port);
433 // logger.debug("Port " + portNumber + " on " + toString() +
435 } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
437 deletePhysicalPort(port);
438 // logger.debug("Port " + portNumber + " on " + toString() +
// Starts the periodic housekeeping task (first run after SWITCH_LIVENESS_TIMER
// ms, then every SWITCH_LIVENESS_TIMER ms). Each run checks how long the
// switch has been silent and re-sends handshake requests while the switch is
// still waiting in WAIT_FEATURES_REPLY or WAIT_CONFIG_REPLY.
444 private void startSwitchTimer() {
445 this.periodicTimer = new Timer();
446 this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
450 Long now = System.currentTimeMillis();
// silent for longer than the configured liveness timeout?
451 if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
453 // switch failed to respond to our probe, consider
455 logger.warn("{} is idle for too long, disconnect",
457 reportSwitchStateChange(false);
459 // send a probe to see if the switch is still alive
460 // logger.debug("Send idle probe (Echo Request) to "
463 OFMessage echo = factory
464 .getMessage(OFType.ECHO_REQUEST);
468 if (state == SwitchState.WAIT_FEATURES_REPLY) {
469 // send another features request
470 OFMessage request = factory
471 .getMessage(OFType.FEATURES_REQUEST);
472 asyncFastSend(request);
474 if (state == SwitchState.WAIT_CONFIG_REPLY) {
475 // send another config request
476 OFSetConfig config = (OFSetConfig) factory
477 .getMessage(OFType.SET_CONFIG);
478 config.setMissSendLength((short) 0xffff)
479 .setLengthU(OFSetConfig.MINIMUM_LENGTH);
480 asyncFastSend(config);
// follow the SET_CONFIG with a GET_CONFIG_REQUEST, as processFeaturesReply() does
481 OFMessage getConfig = factory
482 .getMessage(OFType.GET_CONFIG_REQUEST);
483 asyncFastSend(getConfig);
487 } catch (Exception e) {
491 }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
// Cancels the periodic housekeeping timer if one has been created.
494 private void cancelSwitchTimer() {
495 if (this.periodicTimer != null) {
496 this.periodicTimer.cancel();
// Logs the exception and hands the error to the controller. Channel-close,
// interrupt, socket and I/O exceptions are logged at debug level with the
// message only; anything else is logged as a warning with the full exception.
500 private void reportError(Exception e) {
501 if (e instanceof AsynchronousCloseException
502 || e instanceof InterruptedException
503 || e instanceof SocketException
504 || e instanceof IOException) {
505 logger.debug("Caught exception {}", e.getMessage());
507 logger.warn("Caught exception ", e);
509 // notify core of this error event and disconnect the switch
510 ((Controller) core).takeSwitchEventError(this);
// Tells the controller that this switch was added or deleted.
// presumably 'added' selects between the two calls below — confirm against the full source.
// "takeSwtichEventAdd" is spelled exactly as the Controller method is declared.
513 private void reportSwitchStateChange(boolean added) {
515 ((Controller) core).takeSwtichEventAdd(this);
517 ((Controller) core).takeSwitchEventDelete(this);
522 public Long getId() {
// Handles the switch's FEATURES_REPLY, but only while waiting for it: stores
// the advertised datapath id, buffers, capabilities, tables, actions and
// ports, requests full-packet delivery via SET_CONFIG, asks for the config
// back, moves to WAIT_CONFIG_REPLY and announces the switch to the controller.
526 private void processFeaturesReply(OFFeaturesReply reply) {
527 if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
528 this.sid = reply.getDatapathId();
529 this.buffers = reply.getBuffers();
530 this.capabilities = reply.getCapabilities();
531 this.tables = reply.getTables();
532 this.actions = reply.getActions();
533 // record every port advertised in the features reply
534 for (OFPhysicalPort port : reply.getPorts()) {
535 updatePhysicalPort(port);
537 // config the switch to send full data packet
538 OFSetConfig config = (OFSetConfig) factory
539 .getMessage(OFType.SET_CONFIG);
540 config.setMissSendLength((short) 0xffff).setLengthU(
541 OFSetConfig.MINIMUM_LENGTH);
542 asyncFastSend(config);
543 // send config request to make sure the switch can handle the set
545 OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
546 asyncFastSend(getConfig);
// isOperational() already returns true in WAIT_CONFIG_REPLY, hence the report below
547 this.state = SwitchState.WAIT_CONFIG_REPLY;
548 // inform core that a new switch is now operational
549 reportSwitchStateChange(true);
// Stores (or replaces) the port description under its port number. The
// expression that follows masks the port's current features with the
// 10MB/100MB/1GB/10GB speed-and-duplex feature bits.
// NOTE(review): presumably the masked value is recorded in portBandwidth — confirm.
553 private void updatePhysicalPort(OFPhysicalPort port) {
554 Short portNumber = port.getPortNumber();
555 physicalPorts.put(portNumber, port);
558 port.getCurrentFeatures()
559 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
560 | OFPortFeatures.OFPPF_10MB_HD
562 | OFPortFeatures.OFPPF_100MB_FD
564 | OFPortFeatures.OFPPF_100MB_HD
566 | OFPortFeatures.OFPPF_1GB_FD
568 | OFPortFeatures.OFPPF_1GB_HD
569 .getValue() | OFPortFeatures.OFPPF_10GB_FD
// Forgets a port: removes its entry from both the port table and the
// bandwidth table, keyed by the port number.
573 private void deletePhysicalPort(OFPhysicalPort port) {
574 Short portNumber = port.getPortNumber();
575 physicalPorts.remove(portNumber);
576 portBandwidth.remove(portNumber);
// The switch counts as operational in WAIT_CONFIG_REPLY as well as in
// OPERATIONAL, i.e. from the moment its features reply has been processed.
580 public boolean isOperational() {
581 return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
// Builds a description of this switch that includes the socket and, once the
// switch is operational, its datapath id rendered as a hex string.
585 public String toString() {
587 + this.socket.toString()
589 + (isOperational() ? HexString.toHexString(this.sid)
// Returns the Date created in the constructor. The internal (mutable) Date
// instance itself is returned, not a copy.
594 public Date getConnectedDate() {
595 return this.connectedDate;
598 public String getInstanceName() {
603 public Object getStatistics(OFStatisticsRequest req) {
604 int xid = getNextXid();
605 StatisticsCollector worker = new StatisticsCollector(this, xid, req);
606 messageWaitingDone.put(xid, worker);
607 Future<Object> submit = executor.submit(worker);
608 Object result = null;
610 result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
612 } catch (Exception e) {
613 logger.warn("Timeout while waiting for {} replies", req.getType());
614 result = null; // to indicate timeout has occurred
// Convenience overload: draws a fresh XID from getNextXid() and delegates to
// syncSend(OFMessage, int).
620 public Object syncSend(OFMessage msg) {
621 int xid = getNextXid();
622 return syncSend(msg, xid);
626 * Either a BarrierReply or an OFError is received. If this is a reply for an
627 * outstanding sync message, wake up associated task so that it can continue
// Looks up the synchronous-send worker registered under the reply's XID.
// A null lookup means no worker is waiting for this XID.
629 private void processBarrierReply(OFBarrierReply msg) {
630 Integer xid = msg.getXid();
631 SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
633 if (worker == null) {
// Routes an OFError to the worker waiting on the failed request. The XID is
// taken from the offending message when the error carries one, otherwise
// from the error message itself. The worker is removed from the pending map
// and woken with the error.
639 private void processErrorReply(OFError errorMsg) {
640 OFMessage offendingMsg = errorMsg.getOffendingMsg();
642 if (offendingMsg != null) {
643 xid = offendingMsg.getXid();
645 xid = errorMsg.getXid();
648 * the error can be a reply to a synchronous message or to a statistic
// remove() both fetches and unregisters the worker in a single step
651 Callable<?> worker = messageWaitingDone.remove(xid);
652 if (worker == null) {
// the pending map holds SynchronousMessage and StatisticsCollector workers
655 if (worker instanceof SynchronousMessage) {
656 ((SynchronousMessage) worker).wakeup(errorMsg);
658 ((StatisticsCollector) worker).wakeup(errorMsg);
// Feeds a statistics reply to the collector registered under the reply's XID.
// The collector stays registered until collect() returns true.
662 private void processStatsReply(OFStatisticsReply reply) {
663 Integer xid = reply.getXid();
664 StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
666 if (worker == null) {
669 if (worker.collect(reply)) {
670 // if all the stats records are received (collect() returns true)
672 messageWaitingDone.remove(xid);
// Returns the live internal port table (port number -> port description),
// not a copy.
678 public Map<Short, OFPhysicalPort> getPhysicalPorts() {
679 return this.physicalPorts;
// Looks up one port description by port number; null when the port is unknown.
683 public OFPhysicalPort getPhysicalPort(Short portNumber) {
684 return this.physicalPorts.get(portNumber);
// Looks up the bandwidth entry recorded for a port number; null when there is none.
688 public Integer getPortBandwidth(Short portNumber) {
689 return this.portBandwidth.get(portNumber);
// Returns the key-set view of the port table, which is backed by the map
// rather than being a snapshot.
693 public Set<Short> getPorts() {
694 return this.physicalPorts.keySet();
698 public Byte getTables() {
703 public Integer getActions() {
// Returns the capabilities value stored from the switch's features reply
// (0 until a features reply has been processed).
708 public Integer getCapabilities() {
709 return this.capabilities;
713 public Integer getBuffers() {
// Resolves the port number in the port table and delegates to
// isPortEnabled(OFPhysicalPort). An unknown port number is passed on as null.
718 public boolean isPortEnabled(short portNumber) {
719 return isPortEnabled(physicalPorts.get(portNumber));
// Inspects the port's config and state bit fields: the PORT_DOWN config flag,
// the LINK_DOWN state flag, and the STP state bits compared with STP_BLOCK.
// NOTE(review): presumably each matching check reports the port as disabled — confirm.
723 public boolean isPortEnabled(OFPhysicalPort port) {
727 int portConfig = port.getConfig();
728 int portState = port.getState();
729 if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
732 if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
735 if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
// Walks the port table and tests each port with isPortEnabled(). The walk
// holds the monitor of the physicalPorts map.
743 public List<OFPhysicalPort> getEnabledPorts() {
744 List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
745 synchronized (this.physicalPorts) {
746 for (OFPhysicalPort port : physicalPorts.values()) {
747 if (isPortEnabled(port)) {
756 * Transmit thread polls the message out of the priority queue and invokes
757 * messaging service to transmit it over the socket channel
// Runnable behind the transmit thread: when the queue is non-empty it takes
// the head PriorityMessage and hands its OpenFlow message to the read/write
// service. An interrupt is reported through reportError() as a fresh
// InterruptedException.
759 class PriorityMessageTransmit implements Runnable {
// isEmpty() followed by poll() never blocks on an empty queue
764 if (!transmitQ.isEmpty()) {
765 PriorityMessage pmsg = transmitQ.poll();
766 msgReadWriteService.asyncSend(pmsg.msg);
767 logger.trace("Message sent: {}", pmsg.toString());
770 } catch (InterruptedException ie) {
771 reportError(new InterruptedException(
772 "PriorityMessageTransmit thread interrupted"));
773 } catch (Exception e) {
782 * Setup and start the transmit thread
784 private void startTransmitThread() {
785 this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
786 new Comparator<PriorityMessage>() {
787 public int compare(PriorityMessage p1, PriorityMessage p2) {
788 if (p2.priority != p1.priority) {
789 return p2.priority - p1.priority;
791 return (p2.seqNum < p1.seqNum) ? 1 : -1;
795 this.transmitThread = new Thread(new PriorityMessageTransmit());
796 this.transmitThread.start();
800 * Setup communication services
// Prepares the channel for selector-driven I/O: opens a selector, switches the
// socket to non-blocking mode, enables TCP_NODELAY, and creates the
// read/write service chosen by getMessageReadWriteService().
802 private void setupCommChannel() throws Exception {
803 this.selector = SelectorProvider.provider().openSelector();
804 this.socket.configureBlocking(false);
805 this.socket.socket().setTcpNoDelay(true);
806 this.msgReadWriteService = getMessageReadWriteService();
// Builds the initial HELLO message from the message factory; failures are
// caught inside this method.
809 private void sendFirstHello() {
811 OFMessage msg = factory.getMessage(OFType.HELLO);
813 } catch (Exception e) {
// Chooses the read/write service implementation: the secure variant when the
// "secureChannelEnabled" system property is "true" (surrounding whitespace
// and case ignored), the plain variant otherwise.
818 private IMessageReadWrite getMessageReadWriteService() throws Exception {
819 String str = System.getProperty("secureChannelEnabled");
820 return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(
821 socket, selector) : new MessageReadWriteService(socket,
826 * Sends synchronous Barrier message
// Creates a barrier request and sends it through syncSend(OFMessage); the
// return value is whatever syncSend() returns.
829 public Object sendBarrierMessage() {
830 OFBarrierRequest barrierMsg = new OFBarrierRequest();
831 return syncSend(barrierMsg);
835 * This method returns the switch liveness timeout value. If the controller
836 * does not receive any message from the switch for that long, the
837 * controller will tear down the connection to the switch.
839 * @return The timeout value
841 private static int getSwitchLivenessTimeout() {
842 String timeout = System.getProperty("of.switchLivenessTimeout");
846 if (timeout != null) {
847 rv = Integer.parseInt(timeout);
849 } catch (Exception e) {