3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
12 import java.nio.channels.AsynchronousCloseException;
13 import java.nio.channels.SelectionKey;
14 import java.nio.channels.Selector;
15 import java.nio.channels.SocketChannel;
16 import java.nio.channels.spi.SelectorProvider;
17 import java.util.ArrayList;
18 import java.util.Comparator;
19 import java.util.Date;
20 import java.util.HashMap;
21 import java.util.Iterator;
22 import java.util.List;
25 import java.util.Timer;
26 import java.util.TimerTask;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.PriorityBlockingQueue;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicInteger;
36 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
37 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
38 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
39 import org.openflow.protocol.OFBarrierReply;
40 import org.openflow.protocol.OFEchoReply;
41 import org.openflow.protocol.OFError;
42 import org.openflow.protocol.OFFeaturesReply;
43 import org.openflow.protocol.OFFlowMod;
44 import org.openflow.protocol.OFGetConfigReply;
45 import org.openflow.protocol.OFMatch;
46 import org.openflow.protocol.OFMessage;
47 import org.openflow.protocol.OFPhysicalPort;
48 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
49 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
50 import org.openflow.protocol.OFPhysicalPort.OFPortState;
51 import org.openflow.protocol.OFPort;
52 import org.openflow.protocol.OFPortStatus;
53 import org.openflow.protocol.OFPortStatus.OFPortReason;
54 import org.openflow.protocol.OFSetConfig;
55 import org.openflow.protocol.OFStatisticsReply;
56 import org.openflow.protocol.OFStatisticsRequest;
57 import org.openflow.protocol.OFType;
58 import org.openflow.protocol.factory.BasicFactory;
59 import org.openflow.util.HexString;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
63 public class SwitchHandler implements ISwitch {
// Class-wide SLF4J logger.
64 private static final Logger logger = LoggerFactory
65 .getLogger(SwitchHandler.class);
// Delay and period, in milliseconds, of the periodic liveness task (see startSwitchTimer).
66 private static final int SWITCH_LIVENESS_TIMER = 5000;
// Idle threshold in milliseconds: the periodic task compares it with the time
// elapsed since the last message was received from the switch.
67 private static final int SWITCH_LIVENESS_TIMEOUT = 2 * SWITCH_LIVENESS_TIMER + 500;
// Default reply timeout in milliseconds; the constructor copies it into responseTimerValue.
// NOTE(review): named like a constant but declared as a non-final instance field.
68 private int MESSAGE_RESPONSE_TIMER = 2000;
// Name handed to the constructor.
70 private String instanceName;
// Self reference that handleMessages() passes to the controller.
71 private ISwitch thisISwitch;
// Controller that receives switch events; the code casts it to Controller before use.
72 private IController core;
// buffers, capabilities and actions are copied from the features reply in processFeaturesReply().
74 private Integer buffers;
75 private Integer capabilities;
77 private Integer actions;
// Selector and channel that are handed to the message read/write service.
78 private Selector selector;
79 private SocketChannel socket;
// Factory used to create outgoing OpenFlow messages.
80 private BasicFactory factory;
// Transaction id generator, see getNextXid().
81 private AtomicInteger xid;
// Current handshake state of the switch.
82 private SwitchState state;
// Timer created by startSwitchTimer() and cancelled by cancelSwitchTimer().
83 private Timer periodicTimer;
// Port number -> port description, maintained by updatePhysicalPort()/deletePhysicalPort().
84 private Map<Short, OFPhysicalPort> physicalPorts;
// Port number -> bandwidth value; the line that stores entries is not visible in this excerpt.
85 private Map<Short, Integer> portBandwidth;
// Creation time of this handler.
86 private Date connectedDate;
// Millisecond timestamp refreshed for every message processed by handleMessages().
87 private Long lastMsgReceivedTimeStamp;
// Cleared by the constructor and by handleMessages(); presumably marks an
// outstanding echo probe -- TODO confirm against the missing lines.
88 private Boolean probeSent;
// Pool that runs the StatisticsCollector and SynchronousMessage workers.
89 private ExecutorService executor;
// XID -> worker that is still waiting for the switch's reply.
90 private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
// Usage is not visible in this excerpt.
91 private boolean running;
// Service that reads from and writes to the socket channel.
92 private IMessageReadWrite msgReadWriteService;
// Thread created by startHandlerThread().
93 private Thread switchHandlerThread;
// Effective reply timeout in milliseconds (default or the of.messageResponseTimer property).
94 private Integer responseTimerValue;
// Outgoing message queue and the thread that drains it, see startTransmitThread().
95 private PriorityBlockingQueue<PriorityMessage> transmitQ;
96 private Thread transmitThread;
// Handshake states of the switch connection. Only part of the declaration
// is present in this excerpt (the end of the constant list and the body of
// the constructor/accessor are missing).
98 private enum SwitchState {
99 NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
// Every constant is declared with an int argument for this constructor.
104 private SwitchState(int value) {
// The member this annotation applies to is not visible here.
108 @SuppressWarnings("unused")
/**
 * Creates the handler for one switch connection and initialises its state.
 * No thread is started here; see start().
 *
 * @param core controller that will receive this switch's events
 * @param sc   socket channel of the switch connection
 * @param name instance name reported by getInstanceName()
 */
114 public SwitchHandler(Controller core, SocketChannel sc, String name) {
115 this.instanceName = name;
116 this.thisISwitch = this;
// Feature fields start at zero until a features reply overwrites them.
118 this.buffers = (int)0;
119 this.capabilities = (int)0;
120 this.tables = (byte)0;
121 this.actions = (int)0;
124 this.factory = new BasicFactory();
125 this.connectedDate = new Date();
// Treat the connection time as the time of the last received message.
126 this.lastMsgReceivedTimeStamp = connectedDate.getTime();
127 this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
128 this.portBandwidth = new HashMap<Short, Integer>();
129 this.state = SwitchState.NON_OPERATIONAL;
130 this.probeSent = false;
// Seeds the XID counter from the socket's hash code.
// NOTE(review): requires this.socket to be assigned before this line; that
// assignment is not among the visible lines -- confirm.
131 this.xid = new AtomicInteger(this.socket.hashCode());
132 this.periodicTimer = null;
133 this.executor = Executors.newFixedThreadPool(4);
134 this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
// Start with the default timeout, then let the system property override it.
135 this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
136 String rTimer = System.getProperty("of.messageResponseTimer");
137 if (rTimer != null) {
139 responseTimerValue = Integer.decode(rTimer);
140 } catch (NumberFormatException e) {
// An unparsable property value only produces a warning.
141 logger.warn("Invalid of.messageResponseTimer: {} use default({})",
142 rTimer, MESSAGE_RESPONSE_TIMER);
/**
 * Starts the transmit thread and the selector handler thread for this switch.
 * Other start-up steps and the body of the catch clause are not visible in
 * this excerpt.
 */
147 public void start() {
149 startTransmitThread();
152 startHandlerThread();
153 } catch (Exception e) {
/**
 * Creates and starts the thread that walks the selector's selected keys and
 * reacts to writable and readable keys. The select call and the per-key
 * actions are not visible in this excerpt.
 */
158 private void startHandlerThread() {
159 switchHandlerThread = new Thread(new Runnable() {
165 // wait for an incoming connection
167 Iterator<SelectionKey> selectedKeys = selector
168 .selectedKeys().iterator();
169 while (selectedKeys.hasNext()) {
170 SelectionKey skey = selectedKeys.next();
// Remove the key from the selected set so it is not handled again on the next pass.
171 selectedKeys.remove();
// Presumably resumes pending writes -- body not visible, TODO confirm.
172 if (skey.isValid() && skey.isWritable()) {
// Presumably reads and handles incoming messages -- body not visible, TODO confirm.
175 if (skey.isValid() && skey.isReadable()) {
179 } catch (Exception e) {
185 switchHandlerThread.start();
193 this.selector.close();
196 } catch (Exception e) {
197 // do nothing since we are shutting down.
/**
 * Returns the next transaction id by atomically incrementing the XID counter.
 *
 * @return the incremented XID
 */
203 public int getNextXid() {
204 return this.xid.incrementAndGet();
208 * This method puts the message in an outgoing priority queue with normal
209 * priority. It will be served after high priority messages. The method
210 * should be used for non-critical messages such as statistics request,
211 * discovery packets, etc. A unique XID is generated automatically and
212 * inserted into the message.
214 * @param msg The OF message to be sent
215 * @return The XID used
// Convenience overload: allocates a fresh XID and delegates to asyncSend(msg, xid).
218 public Integer asyncSend(OFMessage msg) {
219 return asyncSend(msg, getNextXid());
223 * This method puts the message in an outgoing priority queue with normal
224 * priority. It will be served after high priority messages. The method
225 * should be used for non-critical messages such as statistics request,
226 * discovery packets, etc. The specified XID is inserted into the message.
228 * @param msg The OF message to be sent
229 * @param xid The XID to be used in the message
230 * @return The XID used
// Queues the message on the transmit queue with priority 0 (normal).
// The statements that set the XID on the message and return it are not
// visible in this excerpt.
233 public Integer asyncSend(OFMessage msg, int xid) {
235 transmitQ.add(new PriorityMessage(msg, 0));
240 * This method puts the message in an outgoing priority queue with high
241 * priority. It will be served first before normal priority messages. The
242 * method should be used for critical messages such as hello, echo reply
243 * etc. A unique XID is generated automatically and inserted into the
246 * @param msg The OF message to be sent
247 * @return The XID used
// Convenience overload: allocates a fresh XID and delegates to asyncFastSend(msg, xid).
250 public Integer asyncFastSend(OFMessage msg) {
251 return asyncFastSend(msg, getNextXid());
255 * This method puts the message in an outgoing priority queue with high
256 * priority. It will be served first before normal priority messages. The
257 * method should be used for critical messages such as hello, echo reply
258 * etc. The specified XID is inserted into the message.
260 * @param msg The OF message to be sent
261 * @return The XID used
// Queues the message on the transmit queue with priority 1 (high); the
// queue's comparator serves higher priority values first.
// The statements that set the XID on the message and return it are not
// visible in this excerpt.
264 public Integer asyncFastSend(OFMessage msg, int xid) {
266 transmitQ.add(new PriorityMessage(msg, 1));
/**
 * Asks the message read/write service to resume sending. Exceptions are
 * caught; the handling code is not visible in this excerpt.
 */
270 public void resumeSend() {
272 msgReadWriteService.resumeSend();
273 } catch (Exception e) {
/**
 * Reads the pending messages from the switch and processes them one by one:
 * handshake messages advance the connection state, replies are routed to the
 * matching process*() helper, and, while the switch is operational, messages
 * are handed to the controller. Several control-flow lines (the switch
 * statement and most case labels) are missing from this excerpt, so the
 * conditions guarding individual statements below cannot all be confirmed.
 */
278 public void handleMessages() {
279 List<OFMessage> msgs = null;
282 msgs = msgReadWriteService.readMessages();
283 } catch (Exception e) {
// The condition guarding the next three lines is not visible.
288 logger.debug("{} is down", toString());
289 // the connection is down, inform core
290 reportSwitchStateChange(false);
293 for (OFMessage msg : msgs) {
294 logger.trace("Message received: {}", msg.toString());
// Echo traffic is excluded from the debug log.
296 if ((msg.getType() != OFType.ECHO_REQUEST) &&
297 (msg.getType() != OFType.ECHO_REPLY)) {
// NOTE(review): the message is concatenated even when debug logging is
// disabled; the other log calls here use {} placeholders.
298 logger.debug(msg.getType().toString() + " received from sw " + toString());
// Any received message refreshes the liveness timestamp.
301 this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
302 OFType type = msg.getType();
// Presumably the HELLO case -- label not visible, TODO confirm.
305 // send feature request
306 OFMessage featureRequest = factory
307 .getMessage(OFType.FEATURES_REQUEST);
308 asyncFastSend(featureRequest);
309 // delete all pre-existing flows
310 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
311 OFFlowMod flowMod = (OFFlowMod) factory
312 .getMessage(OFType.FLOW_MOD);
313 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
314 .setOutPort(OFPort.OFPP_NONE).setLength(
315 (short) OFFlowMod.MINIMUM_LENGTH);
316 asyncFastSend(flowMod);
317 this.state = SwitchState.WAIT_FEATURES_REPLY;
// Presumably the ECHO_REQUEST case: answer with an echo reply -- TODO confirm.
321 OFEchoReply echoReply = (OFEchoReply) factory
322 .getMessage(OFType.ECHO_REPLY);
323 asyncFastSend(echoReply);
// Presumably the ECHO_REPLY case -- TODO confirm.
326 this.probeSent = false;
329 processFeaturesReply((OFFeaturesReply) msg);
331 case GET_CONFIG_REPLY:
332 // make sure that the switch can send the whole packet to the controller
333 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
334 this.state = SwitchState.OPERATIONAL;
338 processBarrierReply((OFBarrierReply) msg);
341 processErrorReply((OFError) msg);
344 processPortStatusMsg((OFPortStatus) msg);
347 processStatsReply((OFStatisticsReply) msg);
// Messages are forwarded to the controller only while isOperational() is true.
354 if (isOperational()) {
355 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
/**
 * Applies a port status message to the local port table: a modified or added
 * port is stored through updatePhysicalPort(), a deleted port is removed
 * through deletePhysicalPort().
 *
 * @param msg the port status message received from the switch
 */
360 private void processPortStatusMsg(OFPortStatus msg) {
361 //short portNumber = msg.getDesc().getPortNumber();
362 OFPhysicalPort port = msg.getDesc();
// The reason byte is compared with the ordinal of the OFPortReason constant.
363 if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
364 updatePhysicalPort(port);
365 //logger.debug("Port " + portNumber + " on " + toString() + " modified");
366 } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
367 updatePhysicalPort(port);
368 //logger.debug("Port " + portNumber + " on " + toString() + " added");
369 } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
371 deletePhysicalPort(port);
372 //logger.debug("Port " + portNumber + " on " + toString() + " deleted");
/**
 * Creates the periodic timer and schedules a task that runs every
 * SWITCH_LIVENESS_TIMER milliseconds. The task checks how long the switch
 * has been silent and re-sends handshake requests while the handshake is
 * still pending. Some branch lines are missing from this excerpt.
 */
377 private void startSwitchTimer() {
378 this.periodicTimer = new Timer();
379 this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
383 Long now = System.currentTimeMillis();
// Idle for longer than the liveness timeout.
384 if ((now - lastMsgReceivedTimeStamp) > SWITCH_LIVENESS_TIMEOUT) {
// The condition choosing between "disconnect" and "probe" is not visible
// here; presumably it tests probeSent -- TODO confirm.
386 // switch failed to respond to our probe, consider it down
387 logger.warn("{} is idle for too long, disconnect", toString());
388 reportSwitchStateChange(false);
390 // send a probe to see if the switch is still alive
391 //logger.debug("Send idle probe (Echo Request) to " + switchName());
393 OFMessage echo = factory
394 .getMessage(OFType.ECHO_REQUEST);
398 if (state == SwitchState.WAIT_FEATURES_REPLY) {
399 // send another features request
400 OFMessage request = factory
401 .getMessage(OFType.FEATURES_REQUEST);
402 asyncFastSend(request);
404 if (state == SwitchState.WAIT_CONFIG_REPLY) {
405 // send another config request
406 OFSetConfig config = (OFSetConfig) factory
407 .getMessage(OFType.SET_CONFIG);
// 0xffff asks the switch to send complete packets to the controller.
408 config.setMissSendLength((short) 0xffff)
409 .setLengthU(OFSetConfig.MINIMUM_LENGTH);
410 asyncFastSend(config);
411 OFMessage getConfig = factory
412 .getMessage(OFType.GET_CONFIG_REQUEST);
413 asyncFastSend(getConfig);
417 } catch (Exception e) {
// Same value for the initial delay and for the period.
421 }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
// Cancels the periodic liveness timer if one has been created.
424 private void cancelSwitchTimer() {
425 if (this.periodicTimer != null) {
426 this.periodicTimer.cancel();
/**
 * Logs the exception and reports the error event to the controller.
 * An AsynchronousCloseException is logged at debug level, anything else at
 * warn level. Only the exception message is logged, not the stack trace.
 *
 * @param e the exception that was caught
 */
430 private void reportError(Exception e) {
431 if (e instanceof AsynchronousCloseException) {
432 logger.debug("Caught exception {}", e.getMessage());
434 logger.warn("Caught exception {}", e.getMessage());
436 // notify core of this error event and disconnect the switch
437 ((Controller) core).takeSwitchEventError(this);
/**
 * Tells the controller that this switch was added or deleted.
 *
 * @param added presumably selects the add event when true and the delete
 *              event when false; the branching lines are not visible --
 *              TODO confirm
 */
440 private void reportSwitchStateChange(boolean added) {
// "takeSwtichEventAdd" is the spelling used at this call site.
442 ((Controller) core).takeSwtichEventAdd(this);
444 ((Controller) core).takeSwitchEventDelete(this);
// Returns the switch identifier. The body is not visible in this excerpt;
// presumably it returns sid, which processFeaturesReply() sets from the
// datapath id -- TODO confirm.
449 public Long getId() {
/**
 * Handles the features reply while the handler is waiting for it: stores the
 * advertised switch properties and ports, sends a set-config followed by a
 * get-config request, moves to WAIT_CONFIG_REPLY and reports the switch to
 * the controller as added.
 *
 * @param reply the features reply received from the switch
 */
453 private void processFeaturesReply(OFFeaturesReply reply) {
// The reply is processed only in the WAIT_FEATURES_REPLY state.
454 if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
455 this.sid = reply.getDatapathId();
456 this.buffers = reply.getBuffers();
457 this.capabilities = reply.getCapabilities();
458 this.tables = reply.getTables();
459 this.actions = reply.getActions();
460 // record every port listed in the features reply
461 for (OFPhysicalPort port : reply.getPorts()) {
462 updatePhysicalPort(port);
464 // config the switch to send full data packet
465 OFSetConfig config = (OFSetConfig) factory
466 .getMessage(OFType.SET_CONFIG);
467 config.setMissSendLength((short) 0xffff).setLengthU(
468 OFSetConfig.MINIMUM_LENGTH);
469 asyncFastSend(config);
470 // send config request to make sure the switch can handle the set config
471 OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
472 asyncFastSend(getConfig);
473 this.state = SwitchState.WAIT_CONFIG_REPLY;
// isOperational() already returns true in WAIT_CONFIG_REPLY.
474 // inform core that a new switch is now operational
475 reportSwitchStateChange(true);
/**
 * Stores or replaces the port in the port table and masks the port's current
 * features with the 10MB/100MB/1GB/10GB speed bits. Where the masked value
 * is stored is not visible in this excerpt (presumably portBandwidth --
 * TODO confirm).
 *
 * @param port the port description to record
 */
479 private void updatePhysicalPort(OFPhysicalPort port) {
480 Short portNumber = port.getPortNumber();
481 physicalPorts.put(portNumber, port);
485 port.getCurrentFeatures()
486 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
487 | OFPortFeatures.OFPPF_10MB_HD
489 | OFPortFeatures.OFPPF_100MB_FD
491 | OFPortFeatures.OFPPF_100MB_HD
493 | OFPortFeatures.OFPPF_1GB_FD
495 | OFPortFeatures.OFPPF_1GB_HD
496 .getValue() | OFPortFeatures.OFPPF_10GB_FD
/**
 * Removes the port from both the port table and the bandwidth table.
 *
 * @param port the port description whose number identifies the entries
 */
500 private void deletePhysicalPort(OFPhysicalPort port) {
501 Short portNumber = port.getPortNumber();
502 physicalPorts.remove(portNumber);
503 portBandwidth.remove(portNumber);
/**
 * Tells whether the switch counts as operational. This is already the case
 * while the config reply is still pending.
 *
 * @return true in the WAIT_CONFIG_REPLY and OPERATIONAL states
 */
507 public boolean isOperational() {
508 return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
// Builds a description from the socket and, once operational, the
// hex-formatted switch id. The remaining pieces of the expression are not
// visible in this excerpt.
512 public String toString() {
514 + this.socket.toString()
516 + (isOperational() ? HexString.toHexString(this.sid)
/**
 * @return the date recorded when this handler was constructed
 */
521 public Date getConnectedDate() {
522 return this.connectedDate;
// Body not visible in this excerpt; presumably returns instanceName -- TODO confirm.
525 public String getInstanceName() {
530 public Object getStatistics(OFStatisticsRequest req) {
531 int xid = getNextXid();
532 StatisticsCollector worker = new StatisticsCollector(this, xid, req);
533 messageWaitingDone.put(xid, worker);
534 Future<Object> submit = executor.submit(worker);
535 Object result = null;
538 .get(MESSAGE_RESPONSE_TIMER, TimeUnit.MILLISECONDS);
540 } catch (Exception e) {
541 logger.warn("Timeout while waiting for {} replies", req.getType());
542 result = null; // to indicate timeout has occurred
/**
 * Runs a SynchronousMessage worker for the message and waits up to
 * responseTimerValue milliseconds for its outcome. A null outcome means the
 * switch accepted the message; a non-null outcome is the OFError returned by
 * the switch. The return statements and the body of the catch clause are not
 * visible in this excerpt.
 *
 * @param msg the OF message to send
 */
548 public Object syncSend(OFMessage msg) {
549 Integer xid = getNextXid();
550 SynchronousMessage worker = new SynchronousMessage(this, xid, msg);
// Register the worker so the reply handlers can find it by XID.
551 messageWaitingDone.put(xid, worker);
552 Object result = null;
553 Boolean status = false;
554 Future<Object> submit = executor.submit(worker);
557 .get(responseTimerValue, TimeUnit.MILLISECONDS);
558 messageWaitingDone.remove(xid);
559 if (result == null) {
560 // if result is null, then it means the switch can handle this message successfully
561 // convert the result into a Boolean with value true
563 //logger.debug("Successfully send " + msg.getType().toString());
566 // if result is not null, this means the switch can't handle this message
567 // the result is an OFError already
568 logger.debug("Send {} failed --> {}",
569 msg.getType().toString(), ((OFError) result).toString());
572 } catch (Exception e) {
573 logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
574 // convert the result into a Boolean with value false
582 * Either a BarrierReply or an OFError is received. If this is a reply for an outstanding sync message,
583 * wake up the associated task so that it can continue
// Looks up the pending SynchronousMessage worker by the reply's XID.
// The lookup call and what happens to the worker are not visible in this
// excerpt; only the null check survives.
585 private void processBarrierReply(OFBarrierReply msg) {
586 Integer xid = msg.getXid();
587 SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
589 if (worker == null) {
/**
 * Routes an error message to the worker waiting on the matching XID. The
 * XID comes from the offending message when the error carries one, otherwise
 * from the error itself. The worker is removed from the pending map and
 * woken with the error.
 *
 * @param errorMsg the error received from the switch
 */
595 private void processErrorReply(OFError errorMsg) {
596 OFMessage offendingMsg = errorMsg.getOffendingMsg();
598 if (offendingMsg != null) {
599 xid = offendingMsg.getXid();
601 xid = errorMsg.getXid();
604 * the error can be a reply to a synchronous message or to a statistic request message
606 Callable<?> worker = messageWaitingDone.remove(xid);
// No worker registered for this XID.
607 if (worker == null) {
610 if (worker instanceof SynchronousMessage) {
611 ((SynchronousMessage) worker).wakeup(errorMsg);
// Any worker that is not a SynchronousMessage is cast to StatisticsCollector.
613 ((StatisticsCollector) worker).wakeup(errorMsg);
/**
 * Feeds a statistics reply to the collector registered under the reply's
 * XID and unregisters the collector once collect() reports completion.
 * The lookup call and some follow-up lines are not visible in this excerpt.
 *
 * @param reply the statistics reply received from the switch
 */
617 private void processStatsReply(OFStatisticsReply reply) {
618 Integer xid = reply.getXid();
619 StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
621 if (worker == null) {
624 if (worker.collect(reply)) {
625 // if all the stats records are received (collect() returns true)
627 messageWaitingDone.remove(xid);
/**
 * @return the port table itself (port number to port), not a copy
 */
633 public Map<Short, OFPhysicalPort> getPhysicalPorts() {
634 return this.physicalPorts;
/**
 * @param portNumber the port number to look up
 * @return the stored port, or null when the table has no entry for it
 */
638 public OFPhysicalPort getPhysicalPort(Short portNumber) {
639 return this.physicalPorts.get(portNumber);
/**
 * @param portNumber the port number to look up
 * @return the stored bandwidth value, or null when there is no entry
 */
643 public Integer getPortBandwidth(Short portNumber) {
644 return this.portBandwidth.get(portNumber);
/**
 * @return the key set view of the port table (all known port numbers)
 */
648 public Set<Short> getPorts() {
649 return this.physicalPorts.keySet();
// Body not visible in this excerpt; presumably returns tables -- TODO confirm.
653 public Byte getTables() {
// Body not visible in this excerpt; presumably returns actions -- TODO confirm.
658 public Integer getActions() {
/**
 * @return the capabilities value (zero until a features reply is processed)
 */
663 public Integer getCapabilities() {
664 return this.capabilities;
// Body not visible in this excerpt; presumably returns buffers -- TODO confirm.
668 public Integer getBuffers() {
// Looks the port up by number and delegates to isPortEnabled(OFPhysicalPort);
// an unknown port number passes null to that overload.
673 public boolean isPortEnabled(short portNumber) {
674 return isPortEnabled(physicalPorts.get(portNumber));
/**
 * Examines the port's config and state bits: administratively down, link
 * down, and the STP state compared with "blocked". The value returned by
 * each branch is not visible in this excerpt (presumably false for each of
 * these conditions -- TODO confirm), nor is any null handling.
 *
 * @param port the port to examine
 */
678 public boolean isPortEnabled(OFPhysicalPort port) {
682 int portConfig = port.getConfig();
683 int portState = port.getState();
684 if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
687 if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
690 if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
/**
 * Walks the port table while holding its monitor and checks each port with
 * isPortEnabled(). The statements that fill and return the result list are
 * not visible in this excerpt.
 */
698 public List<OFPhysicalPort> getEnabledPorts() {
699 List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
// NOTE(review): the visible writers of physicalPorts (updatePhysicalPort,
// deletePhysicalPort) do not take this lock -- confirm whether that is intended.
700 synchronized (this.physicalPorts) {
701 for (OFPhysicalPort port : physicalPorts.values()) {
702 if (isPortEnabled(port)) {
711 * Transmit thread polls the message out of the priority queue and invokes
712 * messaging service to transmit it over the socket channel
// Runnable executed by the transmit thread. The run() header, the loop and
// the code between iterations are not visible in this excerpt.
714 class PriorityMessageTransmit implements Runnable {
// Non-blocking check followed by poll(); the queue is not waited on here.
718 if (!transmitQ.isEmpty()) {
719 PriorityMessage pmsg = transmitQ.poll();
720 msgReadWriteService.asyncSend(pmsg.msg);
721 logger.trace("Message sent: {}", pmsg.toString());
724 } catch (Exception e) {
732 * Setup and start the transmit thread
734 private void startTransmitThread() {
735 this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
736 new Comparator<PriorityMessage>() {
737 public int compare(PriorityMessage p1, PriorityMessage p2) {
738 return p2.priority - p1.priority;
741 this.transmitThread = new Thread(new PriorityMessageTransmit());
742 this.transmitThread.start();
746 * Setup communication services
/**
 * Opens the selector, switches the socket channel to non-blocking mode with
 * TCP_NODELAY enabled, and creates the message read/write service. Any
 * further set-up lines are not visible in this excerpt.
 *
 * @throws Exception if the selector, the socket options or the service
 *                   cannot be set up
 */
748 private void setupCommChannel() throws Exception {
749 this.selector = SelectorProvider.provider().openSelector();
750 this.socket.configureBlocking(false);
751 this.socket.socket().setTcpNoDelay(true);
752 this.msgReadWriteService = getMessageReadWriteService();
// Creates the initial HELLO message. The statement that sends it and the
// body of the catch clause are not visible in this excerpt.
755 private void sendFirstHello() {
757 OFMessage msg = factory.getMessage(OFType.HELLO);
759 } catch (Exception e) {
764 private IMessageReadWrite getMessageReadWriteService() throws Exception {
765 String str = System.getProperty("secureChannelEnabled");
766 return ((str != null) && (str.equalsIgnoreCase("true"))) ?
767 new SecureMessageReadWriteService(socket, selector) :
768 new MessageReadWriteService(socket, selector);