3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
12 import java.net.SocketException;
13 import java.nio.channels.AsynchronousCloseException;
14 import java.nio.channels.SelectionKey;
15 import java.nio.channels.Selector;
16 import java.nio.channels.SocketChannel;
17 import java.nio.channels.spi.SelectorProvider;
18 import java.util.ArrayList;
19 import java.util.Comparator;
20 import java.util.Date;
21 import java.util.HashMap;
22 import java.util.Iterator;
23 import java.util.List;
26 import java.util.Timer;
27 import java.util.TimerTask;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.PriorityBlockingQueue;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicInteger;
37 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
38 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
39 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
40 import org.openflow.protocol.OFBarrierReply;
41 import org.openflow.protocol.OFEchoReply;
42 import org.openflow.protocol.OFError;
43 import org.openflow.protocol.OFFeaturesReply;
44 import org.openflow.protocol.OFFlowMod;
45 import org.openflow.protocol.OFGetConfigReply;
46 import org.openflow.protocol.OFMatch;
47 import org.openflow.protocol.OFMessage;
48 import org.openflow.protocol.OFPhysicalPort;
49 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
50 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
51 import org.openflow.protocol.OFPhysicalPort.OFPortState;
52 import org.openflow.protocol.OFPort;
53 import org.openflow.protocol.OFPortStatus;
54 import org.openflow.protocol.OFPortStatus.OFPortReason;
55 import org.openflow.protocol.OFSetConfig;
56 import org.openflow.protocol.OFStatisticsReply;
57 import org.openflow.protocol.OFStatisticsRequest;
58 import org.openflow.protocol.OFType;
59 import org.openflow.protocol.factory.BasicFactory;
60 import org.openflow.util.HexString;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
64 public class SwitchHandler implements ISwitch {
65 private static final Logger logger = LoggerFactory
66 .getLogger(SwitchHandler.class);
67 private static final int SWITCH_LIVENESS_TIMER = 5000;
68 private static final int SWITCH_LIVENESS_TIMEOUT = 2 * SWITCH_LIVENESS_TIMER + 500;
69 private int MESSAGE_RESPONSE_TIMER = 2000;
71 private String instanceName;
72 private ISwitch thisISwitch;
73 private IController core;
75 private Integer buffers;
76 private Integer capabilities;
78 private Integer actions;
79 private Selector selector;
80 private SocketChannel socket;
81 private BasicFactory factory;
82 private AtomicInteger xid;
83 private SwitchState state;
84 private Timer periodicTimer;
85 private Map<Short, OFPhysicalPort> physicalPorts;
86 private Map<Short, Integer> portBandwidth;
87 private Date connectedDate;
88 private Long lastMsgReceivedTimeStamp;
89 private Boolean probeSent;
90 private ExecutorService executor;
91 private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
92 private boolean running;
93 private IMessageReadWrite msgReadWriteService;
94 private Thread switchHandlerThread;
95 private Integer responseTimerValue;
96 private PriorityBlockingQueue<PriorityMessage> transmitQ;
97 private Thread transmitThread;
99 private enum SwitchState {
100 NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
105 private SwitchState(int value) {
109 @SuppressWarnings("unused")
115 public SwitchHandler(Controller core, SocketChannel sc, String name) {
116 this.instanceName = name;
117 this.thisISwitch = this;
119 this.buffers = (int)0;
120 this.capabilities = (int)0;
121 this.tables = (byte)0;
122 this.actions = (int)0;
125 this.factory = new BasicFactory();
126 this.connectedDate = new Date();
127 this.lastMsgReceivedTimeStamp = connectedDate.getTime();
128 this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
129 this.portBandwidth = new HashMap<Short, Integer>();
130 this.state = SwitchState.NON_OPERATIONAL;
131 this.probeSent = false;
132 this.xid = new AtomicInteger(this.socket.hashCode());
133 this.periodicTimer = null;
134 this.executor = Executors.newFixedThreadPool(4);
135 this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
136 this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
137 String rTimer = System.getProperty("of.messageResponseTimer");
138 if (rTimer != null) {
140 responseTimerValue = Integer.decode(rTimer);
141 } catch (NumberFormatException e) {
142 logger.warn("Invalid of.messageResponseTimer: {} use default({})",
143 rTimer, MESSAGE_RESPONSE_TIMER);
148 public void start() {
150 startTransmitThread();
153 startHandlerThread();
154 } catch (Exception e) {
159 private void startHandlerThread() {
160 switchHandlerThread = new Thread(new Runnable() {
166 // wait for an incoming connection
168 Iterator<SelectionKey> selectedKeys = selector
169 .selectedKeys().iterator();
170 while (selectedKeys.hasNext()) {
171 SelectionKey skey = selectedKeys.next();
172 selectedKeys.remove();
173 if (skey.isValid() && skey.isWritable()) {
176 if (skey.isValid() && skey.isReadable()) {
180 } catch (Exception e) {
186 switchHandlerThread.start();
195 } catch (Exception e) {
199 } catch (Exception e) {
202 msgReadWriteService.stop();
203 } catch (Exception e) {
209 msgReadWriteService = null;
211 if (switchHandlerThread != null) {
212 switchHandlerThread.interrupt();
214 if (transmitThread != null) {
215 transmitThread.interrupt();
220 public int getNextXid() {
221 return this.xid.incrementAndGet();
225 * This method puts the message in an outgoing priority queue with normal
226 * priority. It will be served after high priority messages. The method
227 * should be used for non-critical messages such as statistics request,
228 * discovery packets, etc. An unique XID is generated automatically and
229 * inserted into the message.
231 * @param msg The OF message to be sent
232 * @return The XID used
235 public Integer asyncSend(OFMessage msg) {
236 return asyncSend(msg, getNextXid());
240 * This method puts the message in an outgoing priority queue with normal
241 * priority. It will be served after high priority messages. The method
242 * should be used for non-critical messages such as statistics request,
243 * discovery packets, etc. The specified XID is inserted into the message.
245 * @param msg The OF message to be Sent
246 * @param xid The XID to be used in the message
247 * @return The XID used
250 public Integer asyncSend(OFMessage msg, int xid) {
252 if (transmitQ != null) {
253 transmitQ.add(new PriorityMessage(msg, 0));
259 * This method puts the message in an outgoing priority queue with high
260 * priority. It will be served first before normal priority messages. The
261 * method should be used for critical messages such as hello, echo reply
262 * etc. An unique XID is generated automatically and inserted into the
265 * @param msg The OF message to be sent
266 * @return The XID used
269 public Integer asyncFastSend(OFMessage msg) {
270 return asyncFastSend(msg, getNextXid());
274 * This method puts the message in an outgoing priority queue with high
275 * priority. It will be served first before normal priority messages. The
276 * method should be used for critical messages such as hello, echo reply
277 * etc. The specified XID is inserted into the message.
279 * @param msg The OF message to be sent
280 * @return The XID used
283 public Integer asyncFastSend(OFMessage msg, int xid) {
285 if (transmitQ != null) {
286 transmitQ.add(new PriorityMessage(msg, 1));
291 public void resumeSend() {
293 if (msgReadWriteService != null) {
294 msgReadWriteService.resumeSend();
296 } catch (Exception e) {
301 public void handleMessages() {
302 List<OFMessage> msgs = null;
305 msgs = msgReadWriteService.readMessages();
306 } catch (Exception e) {
311 logger.debug("{} is down", toString());
312 // the connection is down, inform core
313 reportSwitchStateChange(false);
316 for (OFMessage msg : msgs) {
317 logger.trace("Message received: {}", msg.toString());
319 if ((msg.getType() != OFType.ECHO_REQUEST) &&
320 (msg.getType() != OFType.ECHO_REPLY)) {
321 logger.debug(msg.getType().toString() + " received from sw " + toString());
324 this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
325 OFType type = msg.getType();
328 // send feature request
329 OFMessage featureRequest = factory
330 .getMessage(OFType.FEATURES_REQUEST);
331 asyncFastSend(featureRequest);
332 // delete all pre-existing flows
333 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
334 OFFlowMod flowMod = (OFFlowMod) factory
335 .getMessage(OFType.FLOW_MOD);
336 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
337 .setOutPort(OFPort.OFPP_NONE).setLength(
338 (short) OFFlowMod.MINIMUM_LENGTH);
339 asyncFastSend(flowMod);
340 this.state = SwitchState.WAIT_FEATURES_REPLY;
344 OFEchoReply echoReply = (OFEchoReply) factory
345 .getMessage(OFType.ECHO_REPLY);
346 asyncFastSend(echoReply);
349 this.probeSent = false;
352 processFeaturesReply((OFFeaturesReply) msg);
354 case GET_CONFIG_REPLY:
355 // make sure that the switch can send the whole packet to the controller
356 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
357 this.state = SwitchState.OPERATIONAL;
361 processBarrierReply((OFBarrierReply) msg);
364 processErrorReply((OFError) msg);
367 processPortStatusMsg((OFPortStatus) msg);
370 processStatsReply((OFStatisticsReply) msg);
377 if (isOperational()) {
378 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
383 private void processPortStatusMsg(OFPortStatus msg) {
384 //short portNumber = msg.getDesc().getPortNumber();
385 OFPhysicalPort port = msg.getDesc();
386 if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
387 updatePhysicalPort(port);
388 //logger.debug("Port " + portNumber + " on " + toString() + " modified");
389 } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
390 updatePhysicalPort(port);
391 //logger.debug("Port " + portNumber + " on " + toString() + " added");
392 } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
394 deletePhysicalPort(port);
395 //logger.debug("Port " + portNumber + " on " + toString() + " deleted");
400 private void startSwitchTimer() {
401 this.periodicTimer = new Timer();
402 this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
406 Long now = System.currentTimeMillis();
407 if ((now - lastMsgReceivedTimeStamp) > SWITCH_LIVENESS_TIMEOUT) {
409 // switch failed to respond to our probe, consider it down
410 logger.warn("{} is idle for too long, disconnect", toString());
411 reportSwitchStateChange(false);
413 // send a probe to see if the switch is still alive
414 //logger.debug("Send idle probe (Echo Request) to " + switchName());
416 OFMessage echo = factory
417 .getMessage(OFType.ECHO_REQUEST);
421 if (state == SwitchState.WAIT_FEATURES_REPLY) {
422 // send another features request
423 OFMessage request = factory
424 .getMessage(OFType.FEATURES_REQUEST);
425 asyncFastSend(request);
427 if (state == SwitchState.WAIT_CONFIG_REPLY) {
428 // send another config request
429 OFSetConfig config = (OFSetConfig) factory
430 .getMessage(OFType.SET_CONFIG);
431 config.setMissSendLength((short) 0xffff)
432 .setLengthU(OFSetConfig.MINIMUM_LENGTH);
433 asyncFastSend(config);
434 OFMessage getConfig = factory
435 .getMessage(OFType.GET_CONFIG_REQUEST);
436 asyncFastSend(getConfig);
440 } catch (Exception e) {
444 }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
447 private void cancelSwitchTimer() {
448 if (this.periodicTimer != null) {
449 this.periodicTimer.cancel();
453 private void reportError(Exception e) {
454 if (e instanceof AsynchronousCloseException ||
455 e instanceof InterruptedException ||
456 e instanceof SocketException) {
457 logger.debug("Caught exception {}", e.getMessage());
459 logger.warn("Caught exception {}", e.getMessage());
461 // notify core of this error event and disconnect the switch
462 ((Controller) core).takeSwitchEventError(this);
465 private void reportSwitchStateChange(boolean added) {
467 ((Controller) core).takeSwtichEventAdd(this);
469 ((Controller) core).takeSwitchEventDelete(this);
474 public Long getId() {
478 private void processFeaturesReply(OFFeaturesReply reply) {
479 if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
480 this.sid = reply.getDatapathId();
481 this.buffers = reply.getBuffers();
482 this.capabilities = reply.getCapabilities();
483 this.tables = reply.getTables();
484 this.actions = reply.getActions();
485 // notify core of this error event
486 for (OFPhysicalPort port : reply.getPorts()) {
487 updatePhysicalPort(port);
489 // config the switch to send full data packet
490 OFSetConfig config = (OFSetConfig) factory
491 .getMessage(OFType.SET_CONFIG);
492 config.setMissSendLength((short) 0xffff).setLengthU(
493 OFSetConfig.MINIMUM_LENGTH);
494 asyncFastSend(config);
495 // send config request to make sure the switch can handle the set config
496 OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
497 asyncFastSend(getConfig);
498 this.state = SwitchState.WAIT_CONFIG_REPLY;
499 // inform core that a new switch is now operational
500 reportSwitchStateChange(true);
504 private void updatePhysicalPort(OFPhysicalPort port) {
505 Short portNumber = port.getPortNumber();
506 physicalPorts.put(portNumber, port);
510 port.getCurrentFeatures()
511 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
512 | OFPortFeatures.OFPPF_10MB_HD
514 | OFPortFeatures.OFPPF_100MB_FD
516 | OFPortFeatures.OFPPF_100MB_HD
518 | OFPortFeatures.OFPPF_1GB_FD
520 | OFPortFeatures.OFPPF_1GB_HD
521 .getValue() | OFPortFeatures.OFPPF_10GB_FD
525 private void deletePhysicalPort(OFPhysicalPort port) {
526 Short portNumber = port.getPortNumber();
527 physicalPorts.remove(portNumber);
528 portBandwidth.remove(portNumber);
532 public boolean isOperational() {
533 return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
537 public String toString() {
539 + this.socket.toString()
541 + (isOperational() ? HexString.toHexString(this.sid)
546 public Date getConnectedDate() {
547 return this.connectedDate;
550 public String getInstanceName() {
555 public Object getStatistics(OFStatisticsRequest req) {
556 int xid = getNextXid();
557 StatisticsCollector worker = new StatisticsCollector(this, xid, req);
558 messageWaitingDone.put(xid, worker);
559 Future<Object> submit = executor.submit(worker);
560 Object result = null;
563 .get(MESSAGE_RESPONSE_TIMER, TimeUnit.MILLISECONDS);
565 } catch (Exception e) {
566 logger.warn("Timeout while waiting for {} replies", req.getType());
567 result = null; // to indicate timeout has occurred
573 public Object syncSend(OFMessage msg) {
574 Integer xid = getNextXid();
575 SynchronousMessage worker = new SynchronousMessage(this, xid, msg);
576 messageWaitingDone.put(xid, worker);
577 Object result = null;
578 Boolean status = false;
579 Future<Object> submit = executor.submit(worker);
582 .get(responseTimerValue, TimeUnit.MILLISECONDS);
583 messageWaitingDone.remove(xid);
584 if (result == null) {
585 // if result is null, then it means the switch can handle this message successfully
586 // convert the result into a Boolean with value true
588 //logger.debug("Successfully send " + msg.getType().toString());
591 // if result is not null, this means the switch can't handle this message
592 // the result if OFError already
593 logger.debug("Send {} failed --> {}",
594 msg.getType().toString(), ((OFError) result).toString());
597 } catch (Exception e) {
598 logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
599 // convert the result into a Boolean with value false
607 * Either a BarrierReply or a OFError is received. If this is a reply for an outstanding sync message,
608 * wake up associated task so that it can continue
610 private void processBarrierReply(OFBarrierReply msg) {
611 Integer xid = msg.getXid();
612 SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
614 if (worker == null) {
620 private void processErrorReply(OFError errorMsg) {
621 OFMessage offendingMsg = errorMsg.getOffendingMsg();
623 if (offendingMsg != null) {
624 xid = offendingMsg.getXid();
626 xid = errorMsg.getXid();
629 * the error can be a reply to a synchronous message or to a statistic request message
631 Callable<?> worker = messageWaitingDone.remove(xid);
632 if (worker == null) {
635 if (worker instanceof SynchronousMessage) {
636 ((SynchronousMessage) worker).wakeup(errorMsg);
638 ((StatisticsCollector) worker).wakeup(errorMsg);
642 private void processStatsReply(OFStatisticsReply reply) {
643 Integer xid = reply.getXid();
644 StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
646 if (worker == null) {
649 if (worker.collect(reply)) {
650 // if all the stats records are received (collect() returns true)
652 messageWaitingDone.remove(xid);
658 public Map<Short, OFPhysicalPort> getPhysicalPorts() {
659 return this.physicalPorts;
663 public OFPhysicalPort getPhysicalPort(Short portNumber) {
664 return this.physicalPorts.get(portNumber);
668 public Integer getPortBandwidth(Short portNumber) {
669 return this.portBandwidth.get(portNumber);
673 public Set<Short> getPorts() {
674 return this.physicalPorts.keySet();
678 public Byte getTables() {
683 public Integer getActions() {
688 public Integer getCapabilities() {
689 return this.capabilities;
693 public Integer getBuffers() {
698 public boolean isPortEnabled(short portNumber) {
699 return isPortEnabled(physicalPorts.get(portNumber));
703 public boolean isPortEnabled(OFPhysicalPort port) {
707 int portConfig = port.getConfig();
708 int portState = port.getState();
709 if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
712 if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
715 if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
723 public List<OFPhysicalPort> getEnabledPorts() {
724 List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
725 synchronized (this.physicalPorts) {
726 for (OFPhysicalPort port : physicalPorts.values()) {
727 if (isPortEnabled(port)) {
736 * Transmit thread polls the message out of the priority queue and invokes
737 * messaging service to transmit it over the socket channel
739 class PriorityMessageTransmit implements Runnable {
744 if (!transmitQ.isEmpty()) {
745 PriorityMessage pmsg = transmitQ.poll();
746 msgReadWriteService.asyncSend(pmsg.msg);
747 logger.trace("Message sent: {}", pmsg.toString());
750 } catch (InterruptedException ie) {
751 reportError(new InterruptedException("PriorityMessageTransmit thread interrupted"));
752 } catch (Exception e) {
761 * Setup and start the transmit thread
763 private void startTransmitThread() {
764 this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
765 new Comparator<PriorityMessage>() {
766 public int compare(PriorityMessage p1, PriorityMessage p2) {
767 return p2.priority - p1.priority;
770 this.transmitThread = new Thread(new PriorityMessageTransmit());
771 this.transmitThread.start();
775 * Setup communication services
777 private void setupCommChannel() throws Exception {
778 this.selector = SelectorProvider.provider().openSelector();
779 this.socket.configureBlocking(false);
780 this.socket.socket().setTcpNoDelay(true);
781 this.msgReadWriteService = getMessageReadWriteService();
784 private void sendFirstHello() {
786 OFMessage msg = factory.getMessage(OFType.HELLO);
788 } catch (Exception e) {
793 private IMessageReadWrite getMessageReadWriteService() throws Exception {
794 String str = System.getProperty("secureChannelEnabled");
795 return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ?
796 new SecureMessageReadWriteService(socket, selector) :
797 new MessageReadWriteService(socket, selector);