3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
12 import java.io.IOException;
13 import java.nio.ByteBuffer;
14 import java.nio.channels.SelectionKey;
15 import java.nio.channels.Selector;
16 import java.nio.channels.SocketChannel;
17 import java.nio.channels.spi.SelectorProvider;
18 import java.util.ArrayList;
19 import java.util.Date;
20 import java.util.HashMap;
21 import java.util.Iterator;
22 import java.util.List;
25 import java.util.Timer;
26 import java.util.TimerTask;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
35 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
36 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
37 import org.openflow.protocol.OFBarrierReply;
38 import org.openflow.protocol.OFEchoReply;
39 import org.openflow.protocol.OFError;
40 import org.openflow.protocol.OFFeaturesReply;
41 import org.openflow.protocol.OFFlowMod;
42 import org.openflow.protocol.OFGetConfigReply;
43 import org.openflow.protocol.OFMatch;
44 import org.openflow.protocol.OFMessage;
45 import org.openflow.protocol.OFPhysicalPort;
46 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
47 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
48 import org.openflow.protocol.OFPhysicalPort.OFPortState;
49 import org.openflow.protocol.OFPort;
50 import org.openflow.protocol.OFPortStatus;
51 import org.openflow.protocol.OFPortStatus.OFPortReason;
52 import org.openflow.protocol.OFSetConfig;
53 import org.openflow.protocol.OFStatisticsReply;
54 import org.openflow.protocol.OFStatisticsRequest;
55 import org.openflow.protocol.OFType;
56 import org.openflow.protocol.factory.BasicFactory;
57 import org.openflow.util.HexString;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
// Handler for a single OpenFlow switch connection; implements ISwitch.
61 public class SwitchHandler implements ISwitch {
62 private static final Logger logger = LoggerFactory
63 .getLogger(SwitchHandler.class);
// Initial delay and period (ms) of the liveness timer task scheduled in startSwitchTimer().
64 private static final int SWITCH_LIVENESS_TIMER = 5000;
// Idle threshold (ms) checked by the liveness timer task: two timer periods plus 500 ms.
65 private static final int SWITCH_LIVENESS_TIMEOUT = 2 * SWITCH_LIVENESS_TIMER + 500;
// Maximum time (ms) syncSend() waits for its worker's result.
66 private static final int SYNCHRONOUS_FLOW_TIMEOUT = 2000;
// Maximum time (ms) getStatistics() waits for its worker's result.
67 private static final int STATS_COLLECTION_TIMEOUT = 2000;
// Capacity in bytes (1 MiB) used when the constructor allocates the direct in/out buffers.
68 private static final int bufferSize = 1024 * 1024;
// Name given to this handler at construction time.
70 private String instanceName;
// Self reference passed to the controller core together with received messages.
71 private ISwitch thisISwitch;
// Controller core; the visible call sites cast it to Controller before notifying it.
72 private IController core;
// Switch attributes copied from the FEATURES_REPLY in processFeaturesReply().
74 private Integer buffers;
75 private Integer capabilities;
77 private Integer actions;
// NIO plumbing for the switch connection.
78 private Selector selector;
79 private SelectionKey clientSelectionKey;
80 private SocketChannel socket;
81 private ByteBuffer inBuffer;
// Also used as the monitor object in asyncSend() and resumeSend().
82 private ByteBuffer outBuffer;
// Creates and parses OpenFlow messages.
83 private BasicFactory factory;
// Transaction-id counter; see getNextXid().
84 private AtomicInteger xid;
85 private SwitchState state;
// Liveness timer; null until startSwitchTimer() creates it.
86 private Timer periodicTimer;
// Both maps are keyed by port number.
87 private Map<Short, OFPhysicalPort> physicalPorts;
88 private Map<Short, Integer> portBandwidth;
89 private Date connectedDate;
// System.currentTimeMillis() of the last handled message; initialised to the connect time.
90 private Long lastMsgReceivedTimeStamp;
91 private Boolean probeSent;
// Runs the workers submitted by getStatistics() and syncSend().
92 private ExecutorService executor;
// Workers waiting for a reply from the switch, keyed by transaction id.
93 private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
94 private boolean running;
95 private Thread switchHandlerThread;
// Connection states of a switch, each carrying an int value. The handler starts in
// NON_OPERATIONAL, moves to WAIT_FEATURES_REPLY after sending the features request, to
// WAIT_CONFIG_REPLY after processing the features reply, and to OPERATIONAL once the
// config reply reports a miss-send length of 0xffff.
// NOTE(review): parts of this enum are not visible in this excerpt.
97 private enum SwitchState {
98 NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
103 private SwitchState(int value) {
107 @SuppressWarnings("unused")
// Creates the handler for one switch connection: stores the instance name, zeroes the
// feature fields, creates the message factory and port maps, records the connect time
// (also used as the initial last-message timestamp), and starts in NON_OPERATIONAL with
// no probe outstanding and no timer.
// NOTE(review): the lines assigning `core` and `socket` are not visible in this excerpt;
// `socket` must already be set when the xid counter is seeded below — confirm.
113 public SwitchHandler(Controller core, SocketChannel sc, String name) {
114 this.instanceName = name;
115 this.thisISwitch = this;
117 this.buffers = (int)0;
118 this.capabilities = (int)0;
119 this.tables = (byte)0;
120 this.actions = (int)0;
123 this.factory = new BasicFactory();
124 this.connectedDate = new Date();
125 this.lastMsgReceivedTimeStamp = connectedDate.getTime();
126 this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
127 this.portBandwidth = new HashMap<Short, Integer>();
128 this.state = SwitchState.NON_OPERATIONAL;
129 this.probeSent = false;
// seed the transaction-id counter from the socket's hash code
130 this.xid = new AtomicInteger(this.socket.hashCode());
131 this.periodicTimer = null;
// fixed pool of 4 threads for the workers submitted by getStatistics() and syncSend()
132 this.executor = Executors.newFixedThreadPool(4);
133 this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
134 this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
135 this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
// Prepares the connection for selector-driven I/O and starts the handler thread: opens a
// selector, puts the socket in non-blocking mode, enables TCP_NODELAY, and registers the
// socket for OP_READ. Any exception is caught; its handling is not visible in this excerpt.
138 public void start() {
140 this.selector = SelectorProvider.provider().openSelector();
141 this.socket.configureBlocking(false);
142 this.socket.socket().setTcpNoDelay(true);
143 this.clientSelectionKey = this.socket.register(this.selector,
144 SelectionKey.OP_READ);
145 startHandlerThread();
146 } catch (Exception e) {
// Builds a HELLO message and starts the thread that services this switch's selector.
// Each selected key is removed from the selected set and then tested, in order, for
// valid+writable and valid+readable. The actions taken in each case are on lines not
// visible in this excerpt. IOExceptions are logged at error level.
152 private void startHandlerThread() {
153 OFMessage msg = factory.getMessage(OFType.HELLO);
155 switchHandlerThread = new Thread(new Runnable() {
// NOTE(review): this selector only carries the switch's own socket (registered for
// OP_READ / OP_WRITE), so this presumably waits for socket readiness rather than for a
// new connection — confirm and reword.
161 // wait for an incoming connection
163 Iterator<SelectionKey> selectedKeys = selector
164 .selectedKeys().iterator();
165 while (selectedKeys.hasNext()) {
166 SelectionKey skey = selectedKeys.next();
// the selected set is not cleared automatically; drop the key before handling it
167 selectedKeys.remove();
168 if (skey.isValid() && skey.isWritable()) {
171 if (skey.isValid() && skey.isReadable()) {
175 } catch (IOException e) {
176 logger.error("Caught I/O Exception: " + e.toString());
182 switchHandlerThread.start();
// Fragment of the shutdown path (the log message below names it stop()); the method
// header and the remaining statements are not visible in this excerpt.
// Cancels this switch's registration with the selector.
190 this.clientSelectionKey.cancel();
193 } catch (IOException e) {
194 logger.error("Caught IOException in stop()");
// Returns the next transaction id by atomically incrementing the xid counter.
199 public int getNextXid() {
200 return this.xid.incrementAndGet();
// Sends msg with a freshly generated transaction id; delegates to asyncSend(msg, xid)
// and returns whatever that overload returns.
204 public Integer asyncSend(OFMessage msg) {
205 return asyncSend(msg, getNextXid());
// Queues msg in the output buffer and attempts to write the buffer to the socket, all
// while holding the outBuffer monitor. Messages other than ECHO_REQUEST/ECHO_REPLY are
// logged at debug level. If the buffer has too little room for the message, a new direct
// buffer is allocated and the old contents are copied into it. When
// outBuffer.position() is still > 0 after the write, the socket is registered for
// OP_WRITE.
// NOTE(review): the lines that apply `xid`, prepare the buffer around the write, and
// produce the return value are not visible in this excerpt.
// NOTE(review): outBuffer is reassigned below while it is also the lock object, so a
// thread entering later synchronizes on a different monitor — confirm this is intended.
209 public Integer asyncSend(OFMessage msg, int xid) {
210 synchronized (outBuffer) {
212 if ((msg.getType() != OFType.ECHO_REQUEST) &&
213 (msg.getType() != OFType.ECHO_REPLY)) {
214 logger.debug("sending " + msg.getType().toString() + " to " + toString());
218 int msgLen = msg.getLengthU();
219 if (outBuffer.remaining() < msgLen) {
220 // increase the buffer size so that it can contain this message
221 ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
225 newBuffer.put(outBuffer);
226 outBuffer = newBuffer;
228 msg.writeTo(outBuffer);
231 socket.write(outBuffer);
// NOTE(review): register() on an already-registered channel replaces the key's interest
// set, so read interest appears to be dropped until resumeSend() re-registers OP_READ.
233 if (outBuffer.position() > 0) {
234 this.clientSelectionKey = this.socket.register(
235 this.selector, SelectionKey.OP_WRITE, this);
237 logger.trace("Message sent: " + msg.toString());
238 } catch (IOException e) {
// Writes the output buffer to the socket again while holding the outBuffer monitor.
// The socket is registered for OP_WRITE when outBuffer.position() is > 0 after the write.
// NOTE(review): the branch structure around the OP_READ registration is not visible in
// this excerpt; presumably it is the else case (nothing left to send) — confirm.
245 public void resumeSend() {
246 synchronized (outBuffer) {
249 socket.write(outBuffer);
251 if (outBuffer.position() > 0) {
252 this.clientSelectionKey = this.socket.register(
253 this.selector, SelectionKey.OP_WRITE, this);
255 this.clientSelectionKey = this.socket.register(
256 this.selector, SelectionKey.OP_READ, this);
258 } catch (IOException e) {
// Reads the pending messages from the switch and processes each one: logs it, refreshes
// the last-message timestamp, and dispatches on the message type to the matching
// process*() helper or inline handling. Messages are passed on to the controller core
// while isOperational() is true.
// NOTE(review): the switch statement, most case labels, and the condition guarding the
// "is down" branch are on lines not visible in this excerpt.
264 public void handleMessages() {
265 List<OFMessage> msgs = readMessages();
267 logger.debug(toString() + " is down");
268 // the connection is down, inform core
269 reportSwitchStateChange(false);
272 for (OFMessage msg : msgs) {
273 logger.trace("Message received: " + msg.toString());
// echo traffic is excluded from the debug log
275 if ((msg.getType() != OFType.ECHO_REQUEST) &&
276 (msg.getType() != OFType.ECHO_REPLY)) {
277 logger.debug(msg.getType().toString() + " received from sw " + toString());
// the liveness timer task compares against this timestamp
280 this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
281 OFType type = msg.getType();
284 // send feature request
285 OFMessage featureRequest = factory
286 .getMessage(OFType.FEATURES_REQUEST);
287 asyncSend(featureRequest);
288 // delete all pre-existing flows
289 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
290 OFFlowMod flowMod = (OFFlowMod) factory
291 .getMessage(OFType.FLOW_MOD);
292 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
293 .setOutPort(OFPort.OFPP_NONE).setLength(
294 (short) OFFlowMod.MINIMUM_LENGTH);
296 this.state = SwitchState.WAIT_FEATURES_REPLY;
300 OFEchoReply echoReply = (OFEchoReply) factory
301 .getMessage(OFType.ECHO_REPLY);
302 asyncSend(echoReply);
// NOTE(review): presumably the ECHO_REPLY case, clearing the outstanding probe — confirm
305 this.probeSent = false;
308 processFeaturesReply((OFFeaturesReply) msg);
310 case GET_CONFIG_REPLY:
311 // make sure that the switch can send the whole packet to the controller
312 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
313 this.state = SwitchState.OPERATIONAL;
317 processBarrierReply((OFBarrierReply) msg);
320 processErrorReply((OFError) msg);
323 processPortStatusMsg((OFPortStatus) msg);
326 processStatsReply((OFStatisticsReply) msg);
// hand the message to the controller core only while the switch counts as operational
333 if (isOperational()) {
334 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
// Applies a PORT_STATUS message to the local port tables: a MODIFY or ADD reason updates
// the described port, a DELETE reason removes it.
// NOTE(review): the reason byte is compared against the enum constants' ordinal(), which
// only works while declaration order matches the wire values — confirm against
// OFPortReason.
339 private void processPortStatusMsg(OFPortStatus msg) {
340 //short portNumber = msg.getDesc().getPortNumber();
341 OFPhysicalPort port = msg.getDesc();
342 if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
343 updatePhysicalPort(port);
344 //logger.debug("Port " + portNumber + " on " + toString() + " modified");
345 } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
346 updatePhysicalPort(port);
347 //logger.debug("Port " + portNumber + " on " + toString() + " added");
348 } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
350 deletePhysicalPort(port);
351 //logger.debug("Port " + portNumber + " on " + toString() + " deleted");
// Reads bytes from the socket into inBuffer and parses them into OpenFlow messages with
// the factory. The result starts out as null.
// NOTE(review): the handling of an IOException, of end-of-stream (read returning -1), of
// leftover bytes in inBuffer, and the return statement are not visible in this excerpt.
356 private List<OFMessage> readMessages() {
357 List<OFMessage> msgs = null;
360 bytesRead = socket.read(inBuffer);
361 } catch (IOException e) {
// -1 from SocketChannel.read means the channel has reached end-of-stream
365 if (bytesRead == -1) {
369 msgs = factory.parseMessages(inBuffer);
370 if (inBuffer.hasRemaining()) {
// Creates the periodic timer and schedules a task at a fixed rate, first run after
// SWITCH_LIVENESS_TIMER ms and then every SWITCH_LIVENESS_TIMER ms. The task checks how
// long the switch has been silent and, depending on the handshake state, builds a repeat
// FEATURES_REQUEST or SET_CONFIG / GET_CONFIG_REQUEST pair.
// NOTE(review): several lines of the task (including the sends of the echo, features
// request and set-config messages, and the probeSent handling) are not visible in this
// excerpt.
378 private void startSwitchTimer() {
379 this.periodicTimer = new Timer();
380 this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
384 Long now = System.currentTimeMillis();
385 if ((now - lastMsgReceivedTimeStamp) > SWITCH_LIVENESS_TIMEOUT) {
387 // switch failed to respond to our probe, consider it down
388 logger.warn(toString()
389 + " is idle for too long, disconnect");
390 reportSwitchStateChange(false);
392 // send a probe to see if the switch is still alive
393 //logger.debug("Send idle probe (Echo Request) to " + switchName());
395 OFMessage echo = factory
396 .getMessage(OFType.ECHO_REQUEST);
400 if (state == SwitchState.WAIT_FEATURES_REPLY) {
401 // send another features request
402 OFMessage request = factory
403 .getMessage(OFType.FEATURES_REQUEST);
406 if (state == SwitchState.WAIT_CONFIG_REPLY) {
407 // send another config request
408 OFSetConfig config = (OFSetConfig) factory
409 .getMessage(OFType.SET_CONFIG);
410 config.setMissSendLength((short) 0xffff)
411 .setLengthU(OFSetConfig.MINIMUM_LENGTH);
413 OFMessage getConfig = factory
414 .getMessage(OFType.GET_CONFIG_REQUEST);
415 asyncSend(getConfig);
419 } catch (Exception e) {
423 }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
// Cancels the liveness timer if one has been created; does nothing while it is null.
426 private void cancelSwitchTimer() {
427 if (this.periodicTimer != null) {
428 this.periodicTimer.cancel();
// Notifies the controller core that this switch hit an error. In the visible lines the
// exception argument itself is not used (the logging statement is commented out).
432 private void reportError(Exception e) {
433 //logger.error(toString() + " caught Error " + e.toString());
434 // notify core of this error event
435 ((Controller) core).takeSwitchEventError(this);
// Notifies the controller core that this switch was added or deleted.
// NOTE(review): the branch on `added` is not visible in this excerpt; callers pass true
// for a newly connected switch and false for a lost one, so presumably true -> add and
// false -> delete — confirm.
438 private void reportSwitchStateChange(boolean added) {
440 ((Controller) core).takeSwtichEventAdd(this);
442 ((Controller) core).takeSwitchEventDelete(this);
// NOTE(review): body not visible in this excerpt; presumably returns the datapath id
// stored in `sid` by processFeaturesReply() — confirm.
447 public Long getId() {
// Handles a FEATURES_REPLY, but only while in WAIT_FEATURES_REPLY: copies the datapath
// id, buffers, capabilities, tables and actions from the reply, records each reported
// port, builds a SET_CONFIG with miss-send length 0xffff, sends a GET_CONFIG_REQUEST,
// moves to WAIT_CONFIG_REPLY, and reports the switch to the controller core as added.
// NOTE(review): the line that sends the SET_CONFIG message is not visible in this excerpt.
451 private void processFeaturesReply(OFFeaturesReply reply) {
452 if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
453 this.sid = reply.getDatapathId();
454 this.buffers = reply.getBuffers();
455 this.capabilities = reply.getCapabilities();
456 this.tables = reply.getTables();
457 this.actions = reply.getActions();
458 // record every physical port listed in the reply
459 for (OFPhysicalPort port : reply.getPorts()) {
460 updatePhysicalPort(port);
462 // configure the switch to send full data packets
463 OFSetConfig config = (OFSetConfig) factory
464 .getMessage(OFType.SET_CONFIG);
465 config.setMissSendLength((short) 0xffff).setLengthU(
466 OFSetConfig.MINIMUM_LENGTH);
468 // send config request to make sure the switch can handle the set config
469 OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
470 asyncSend(getConfig);
471 this.state = SwitchState.WAIT_CONFIG_REPLY;
472 // inform core that a new switch has been added; isOperational() already accepts WAIT_CONFIG_REPLY
473 reportSwitchStateChange(true);
// Stores (or replaces) the port under its port number in physicalPorts, then masks the
// port's current features with the 10MB/100MB/1GB/10GB speed bits.
// NOTE(review): the statement consuming the masked value is not visible in this excerpt;
// presumably it updates portBandwidth — confirm.
477 private void updatePhysicalPort(OFPhysicalPort port) {
478 Short portNumber = port.getPortNumber();
479 physicalPorts.put(portNumber, port);
483 port.getCurrentFeatures()
484 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
485 | OFPortFeatures.OFPPF_10MB_HD
487 | OFPortFeatures.OFPPF_100MB_FD
489 | OFPortFeatures.OFPPF_100MB_HD
491 | OFPortFeatures.OFPPF_1GB_FD
493 | OFPortFeatures.OFPPF_1GB_HD
494 .getValue() | OFPortFeatures.OFPPF_10GB_FD
// Removes the port's entries from both physicalPorts and portBandwidth, keyed by its
// port number.
498 private void deletePhysicalPort(OFPhysicalPort port) {
499 Short portNumber = port.getPortNumber();
500 physicalPorts.remove(portNumber);
501 portBandwidth.remove(portNumber);
// Returns true when the state is WAIT_CONFIG_REPLY or OPERATIONAL, i.e. from the moment
// the features reply has been processed.
505 public boolean isOperational() {
506 return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
// Builds a description of this switch that includes the socket's string form and, when
// the switch is operational, the datapath id rendered as a hex string.
// NOTE(review): the literal parts and the non-operational alternative are on lines not
// visible in this excerpt.
510 public String toString() {
512 + this.socket.toString()
514 + (isOperational() ? HexString.toHexString(this.sid)
// Returns the Date created when this handler was constructed. The internal (mutable)
// Date instance is returned directly, not a copy.
519 public Date getConnectedDate() {
520 return this.connectedDate;
// NOTE(review): body not visible in this excerpt; presumably returns the instanceName
// set in the constructor — confirm.
523 public String getInstanceName() {
// Issues a statistics request and waits for the result: allocates a transaction id,
// registers a StatisticsCollector worker under it in messageWaitingDone, submits the
// worker to the executor, and waits up to STATS_COLLECTION_TIMEOUT ms on its Future.
// On any exception a warning is logged and the result is set to null.
// NOTE(review): every Exception is logged as a timeout, not only TimeoutException.
// NOTE(review): the return statement is not visible in this excerpt.
528 public Object getStatistics(OFStatisticsRequest req) {
529 int xid = getNextXid();
530 StatisticsCollector worker = new StatisticsCollector(this, xid, req);
// registered under the xid so that reply handlers can look the worker up
531 messageWaitingDone.put(xid, worker);
532 Future<Object> submit = executor.submit(worker);
533 Object result = null;
536 .get(STATS_COLLECTION_TIMEOUT, TimeUnit.MILLISECONDS);
538 } catch (Exception e) {
539 logger.warn("Timeout while waiting for " + req.getType()
541 result = null; // to indicate timeout has occurred
// Sends msg and waits for the outcome: allocates a transaction id, registers a
// SynchronousMessage worker under it in messageWaitingDone, submits the worker to the
// executor, and waits up to SYNCHRONOUS_FLOW_TIMEOUT ms on its Future. A null worker
// result means the switch accepted the message; a non-null result is the OFError it
// answered with. Any exception is logged as a timeout.
// NOTE(review): the statements that set `status` and the return statement are not
// visible in this excerpt.
547 public Object syncSend(OFMessage msg) {
548 Integer xid = getNextXid();
549 SynchronousMessage worker = new SynchronousMessage(this, xid, msg);
550 messageWaitingDone.put(xid, worker);
551 Object result = null;
552 Boolean status = false;
553 Future<Object> submit = executor.submit(worker);
556 .get(SYNCHRONOUS_FLOW_TIMEOUT, TimeUnit.MILLISECONDS);
// the wait is over, so the worker no longer needs to be reachable by xid
557 messageWaitingDone.remove(xid);
558 if (result == null) {
559 // if result is null, then it means the switch can handle this message successfully
560 // convert the result into a Boolean with value true
562 //logger.debug("Successfully send " + msg.getType().toString());
565 // if result is not null, this means the switch can't handle this message
566 // the result is already an OFError
567 logger.debug("Send " + msg.getType().toString()
568 + " failed --> " + ((OFError) result).toString());
571 } catch (Exception e) {
572 logger.warn("Timeout while waiting for " + msg.getType().toString()
574 // convert the result into a Boolean with value false
// The two lines below are the body of the original doc comment for this method; its
// opening and closing delimiters are on lines not visible in this excerpt.
582 * Either a BarrierReply or a OFError is received. If this is a reply for an outstanding sync message,
583 * wake up associated task so that it can continue
// Looks up the SynchronousMessage worker registered under the reply's transaction id.
// NOTE(review): the lookup call and the handling of a missing or found worker are not
// visible in this excerpt.
585 private void processBarrierReply(OFBarrierReply msg) {
586 Integer xid = msg.getXid();
587 SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
589 if (worker == null) {
// Routes an OFError to the worker waiting on it. The transaction id is taken from the
// offending message when the error carries one, otherwise from the error itself. The
// worker is removed from messageWaitingDone and woken with the error; it is either a
// SynchronousMessage or a StatisticsCollector.
// NOTE(review): the handling when no worker is registered is not visible in this excerpt.
595 private void processErrorReply(OFError errorMsg) {
596 OFMessage offendingMsg = errorMsg.getOffendingMsg();
598 if (offendingMsg != null) {
599 xid = offendingMsg.getXid();
601 xid = errorMsg.getXid();
604 * the error can be a reply to a synchronous message or to a statistic request message
606 Callable<?> worker = messageWaitingDone.remove(xid);
607 if (worker == null) {
610 if (worker instanceof SynchronousMessage) {
611 ((SynchronousMessage) worker).wakeup(errorMsg);
613 ((StatisticsCollector) worker).wakeup(errorMsg);
// Feeds a statistics reply to the StatisticsCollector registered under the reply's
// transaction id. Once collect() returns true the worker is removed from
// messageWaitingDone.
// NOTE(review): the lookup call and the handling of a missing worker are not visible in
// this excerpt.
617 private void processStatsReply(OFStatisticsReply reply) {
618 Integer xid = reply.getXid();
619 StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
621 if (worker == null) {
624 if (worker.collect(reply)) {
625 // if all the stats records are received (collect() returns true)
627 messageWaitingDone.remove(xid);
// Returns the internal port-number -> physical-port map itself (not a copy).
633 public Map<Short, OFPhysicalPort> getPhysicalPorts() {
634 return this.physicalPorts;
// Returns the physical port stored under portNumber, or null when the map has no entry.
638 public OFPhysicalPort getPhysicalPort(Short portNumber) {
639 return this.physicalPorts.get(portNumber);
// Returns the value stored for portNumber in portBandwidth, or null when the map has no
// entry.
643 public Integer getPortBandwidth(Short portNumber) {
644 return this.portBandwidth.get(portNumber);
// Returns the key set of physicalPorts, i.e. the known port numbers. This is a live view
// backed by the internal map, not a snapshot.
648 public Set<Short> getPorts() {
649 return this.physicalPorts.keySet();
// NOTE(review): body not visible in this excerpt; presumably returns the `tables` value
// recorded by processFeaturesReply() — confirm.
653 public Byte getTables() {
// NOTE(review): body not visible in this excerpt; presumably returns the `actions` value
// recorded by processFeaturesReply() — confirm.
658 public Integer getActions() {
// Returns the capabilities value: 0 until a features reply has been processed, then the
// value taken from that reply.
663 public Integer getCapabilities() {
664 return this.capabilities;
// NOTE(review): body not visible in this excerpt; presumably returns the `buffers` value
// recorded by processFeaturesReply() — confirm.
668 public Integer getBuffers() {
// Looks up the port stored under portNumber and delegates to
// isPortEnabled(OFPhysicalPort). The lookup yields null for an unknown port number.
673 public boolean isPortEnabled(short portNumber) {
674 return isPortEnabled(physicalPorts.get(portNumber));
// Decides whether a port is enabled from its config and state bit fields. Three
// conditions are tested: the PORT_DOWN config bit, the LINK_DOWN state bit, and whether
// the STP bits of the state equal the STP_BLOCK value.
// NOTE(review): the value returned in each branch and any null check on `port` are not
// visible in this excerpt; presumably each matching condition yields false — confirm.
678 public boolean isPortEnabled(OFPhysicalPort port) {
682 int portConfig = port.getConfig();
683 int portState = port.getState();
684 if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
687 if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
690 if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
698 public List<OFPhysicalPort> getEnabledPorts() {
699 List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
700 synchronized (this.physicalPorts) {
701 for (OFPhysicalPort port : physicalPorts.values()) {
702 if (isPortEnabled(port)) {