3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
12 import java.nio.ByteBuffer;
13 import java.nio.channels.SelectionKey;
14 import java.nio.channels.Selector;
15 import java.nio.channels.SocketChannel;
16 import java.nio.channels.spi.SelectorProvider;
17 import java.util.ArrayList;
18 import java.util.Date;
19 import java.util.HashMap;
20 import java.util.Iterator;
21 import java.util.List;
24 import java.util.Timer;
25 import java.util.TimerTask;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicInteger;
34 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
35 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
36 import org.openflow.protocol.OFBarrierReply;
37 import org.openflow.protocol.OFEchoReply;
38 import org.openflow.protocol.OFError;
39 import org.openflow.protocol.OFFeaturesReply;
40 import org.openflow.protocol.OFFlowMod;
41 import org.openflow.protocol.OFGetConfigReply;
42 import org.openflow.protocol.OFMatch;
43 import org.openflow.protocol.OFMessage;
44 import org.openflow.protocol.OFPhysicalPort;
45 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
46 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
47 import org.openflow.protocol.OFPhysicalPort.OFPortState;
48 import org.openflow.protocol.OFPort;
49 import org.openflow.protocol.OFPortStatus;
50 import org.openflow.protocol.OFPortStatus.OFPortReason;
51 import org.openflow.protocol.OFSetConfig;
52 import org.openflow.protocol.OFStatisticsReply;
53 import org.openflow.protocol.OFStatisticsRequest;
54 import org.openflow.protocol.OFType;
55 import org.openflow.protocol.factory.BasicFactory;
56 import org.openflow.util.HexString;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
60 public class SwitchHandler implements ISwitch {
61 private static final Logger logger = LoggerFactory
62 .getLogger(SwitchHandler.class);
63 private static final int SWITCH_LIVENESS_TIMER = 5000;
64 private static final int SWITCH_LIVENESS_TIMEOUT = 2 * SWITCH_LIVENESS_TIMER + 500;
65 private static final int SYNCHRONOUS_FLOW_TIMEOUT = 2000;
66 private static final int STATS_COLLECTION_TIMEOUT = 2000;
67 private static final int bufferSize = 1024 * 1024;
69 private String instanceName;
70 private ISwitch thisISwitch;
71 private IController core;
73 private Integer buffers;
74 private Integer capabilities;
76 private Integer actions;
77 private Selector selector;
78 private SelectionKey clientSelectionKey;
79 private SocketChannel socket;
80 private ByteBuffer inBuffer;
81 private ByteBuffer outBuffer;
82 private BasicFactory factory;
83 private AtomicInteger xid;
84 private SwitchState state;
85 private Timer periodicTimer;
86 private Map<Short, OFPhysicalPort> physicalPorts;
87 private Map<Short, Integer> portBandwidth;
88 private Date connectedDate;
89 private Long lastMsgReceivedTimeStamp;
90 private Boolean probeSent;
91 private ExecutorService executor;
92 private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
93 private boolean running;
94 private Thread switchHandlerThread;
96 private enum SwitchState {
97 NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
102 private SwitchState(int value) {
106 @SuppressWarnings("unused")
112 public SwitchHandler(Controller core, SocketChannel sc, String name) {
113 this.instanceName = name;
114 this.thisISwitch = this;
116 this.buffers = (int)0;
117 this.capabilities = (int)0;
118 this.tables = (byte)0;
119 this.actions = (int)0;
122 this.factory = new BasicFactory();
123 this.connectedDate = new Date();
124 this.lastMsgReceivedTimeStamp = connectedDate.getTime();
125 this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
126 this.portBandwidth = new HashMap<Short, Integer>();
127 this.state = SwitchState.NON_OPERATIONAL;
128 this.probeSent = false;
129 this.xid = new AtomicInteger(this.socket.hashCode());
130 this.periodicTimer = null;
131 this.executor = Executors.newFixedThreadPool(4);
132 this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
133 this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
134 this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
137 public void start() {
139 this.selector = SelectorProvider.provider().openSelector();
140 this.socket.configureBlocking(false);
141 this.socket.socket().setTcpNoDelay(true);
142 this.clientSelectionKey = this.socket.register(this.selector,
143 SelectionKey.OP_READ);
144 startHandlerThread();
145 } catch (Exception e) {
151 private void startHandlerThread() {
152 OFMessage msg = factory.getMessage(OFType.HELLO);
154 switchHandlerThread = new Thread(new Runnable() {
160 // wait for an incoming connection
162 Iterator<SelectionKey> selectedKeys = selector
163 .selectedKeys().iterator();
164 while (selectedKeys.hasNext()) {
165 SelectionKey skey = selectedKeys.next();
166 selectedKeys.remove();
167 if (skey.isValid() && skey.isWritable()) {
170 if (skey.isValid() && skey.isReadable()) {
174 } catch (Exception e) {
180 switchHandlerThread.start();
188 this.clientSelectionKey.cancel();
191 } catch (Exception e) {
192 // do nothing since we are shutting down.
198 public int getNextXid() {
199 return this.xid.incrementAndGet();
203 public Integer asyncSend(OFMessage msg) {
204 return asyncSend(msg, getNextXid());
208 public Integer asyncSend(OFMessage msg, int xid) {
209 synchronized (outBuffer) {
211 if ((msg.getType() != OFType.ECHO_REQUEST) &&
212 (msg.getType() != OFType.ECHO_REPLY)) {
213 logger.debug("sending " + msg.getType().toString() + " to " + toString());
217 int msgLen = msg.getLengthU();
218 if (outBuffer.remaining() < msgLen) {
219 // increase the buffer size so that it can contain this message
220 ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
224 newBuffer.put(outBuffer);
225 outBuffer = newBuffer;
227 msg.writeTo(outBuffer);
230 socket.write(outBuffer);
232 if (outBuffer.position() > 0) {
233 this.clientSelectionKey = this.socket.register(
234 this.selector, SelectionKey.OP_WRITE, this);
236 logger.trace("Message sent: " + msg.toString());
237 } catch (Exception e) {
244 public void resumeSend() {
245 synchronized (outBuffer) {
248 socket.write(outBuffer);
250 if (outBuffer.position() > 0) {
251 this.clientSelectionKey = this.socket.register(
252 this.selector, SelectionKey.OP_WRITE, this);
254 this.clientSelectionKey = this.socket.register(
255 this.selector, SelectionKey.OP_READ, this);
257 } catch (Exception e) {
263 public void handleMessages() {
264 List<OFMessage> msgs = readMessages();
266 logger.debug(toString() + " is down");
267 // the connection is down, inform core
268 reportSwitchStateChange(false);
271 for (OFMessage msg : msgs) {
272 logger.trace("Message received: " + msg.toString());
274 if ((msg.getType() != OFType.ECHO_REQUEST) &&
275 (msg.getType() != OFType.ECHO_REPLY)) {
276 logger.debug(msg.getType().toString() + " received from sw " + toString());
279 this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
280 OFType type = msg.getType();
283 // send feature request
284 OFMessage featureRequest = factory
285 .getMessage(OFType.FEATURES_REQUEST);
286 asyncSend(featureRequest);
287 // delete all pre-existing flows
288 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
289 OFFlowMod flowMod = (OFFlowMod) factory
290 .getMessage(OFType.FLOW_MOD);
291 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
292 .setOutPort(OFPort.OFPP_NONE).setLength(
293 (short) OFFlowMod.MINIMUM_LENGTH);
295 this.state = SwitchState.WAIT_FEATURES_REPLY;
299 OFEchoReply echoReply = (OFEchoReply) factory
300 .getMessage(OFType.ECHO_REPLY);
301 asyncSend(echoReply);
304 this.probeSent = false;
307 processFeaturesReply((OFFeaturesReply) msg);
309 case GET_CONFIG_REPLY:
310 // make sure that the switch can send the whole packet to the controller
311 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
312 this.state = SwitchState.OPERATIONAL;
316 processBarrierReply((OFBarrierReply) msg);
319 processErrorReply((OFError) msg);
322 processPortStatusMsg((OFPortStatus) msg);
325 processStatsReply((OFStatisticsReply) msg);
332 if (isOperational()) {
333 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
338 private void processPortStatusMsg(OFPortStatus msg) {
339 //short portNumber = msg.getDesc().getPortNumber();
340 OFPhysicalPort port = msg.getDesc();
341 if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
342 updatePhysicalPort(port);
343 //logger.debug("Port " + portNumber + " on " + toString() + " modified");
344 } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
345 updatePhysicalPort(port);
346 //logger.debug("Port " + portNumber + " on " + toString() + " added");
347 } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
349 deletePhysicalPort(port);
350 //logger.debug("Port " + portNumber + " on " + toString() + " deleted");
355 private List<OFMessage> readMessages() {
356 List<OFMessage> msgs = null;
359 bytesRead = socket.read(inBuffer);
360 } catch (Exception e) {
364 if (bytesRead == -1) {
368 msgs = factory.parseMessages(inBuffer);
369 if (inBuffer.hasRemaining()) {
377 private void startSwitchTimer() {
378 this.periodicTimer = new Timer();
379 this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
383 Long now = System.currentTimeMillis();
384 if ((now - lastMsgReceivedTimeStamp) > SWITCH_LIVENESS_TIMEOUT) {
386 // switch failed to respond to our probe, consider it down
387 logger.warn(toString()
388 + " is idle for too long, disconnect");
389 reportSwitchStateChange(false);
391 // send a probe to see if the switch is still alive
392 //logger.debug("Send idle probe (Echo Request) to " + switchName());
394 OFMessage echo = factory
395 .getMessage(OFType.ECHO_REQUEST);
399 if (state == SwitchState.WAIT_FEATURES_REPLY) {
400 // send another features request
401 OFMessage request = factory
402 .getMessage(OFType.FEATURES_REQUEST);
405 if (state == SwitchState.WAIT_CONFIG_REPLY) {
406 // send another config request
407 OFSetConfig config = (OFSetConfig) factory
408 .getMessage(OFType.SET_CONFIG);
409 config.setMissSendLength((short) 0xffff)
410 .setLengthU(OFSetConfig.MINIMUM_LENGTH);
412 OFMessage getConfig = factory
413 .getMessage(OFType.GET_CONFIG_REQUEST);
414 asyncSend(getConfig);
418 } catch (Exception e) {
422 }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
425 private void cancelSwitchTimer() {
426 if (this.periodicTimer != null) {
427 this.periodicTimer.cancel();
431 private void reportError(Exception e) {
432 //logger.error(toString() + " caught Error " + e.toString());
433 // notify core of this error event
434 ((Controller) core).takeSwitchEventError(this);
437 private void reportSwitchStateChange(boolean added) {
439 ((Controller) core).takeSwtichEventAdd(this);
441 ((Controller) core).takeSwitchEventDelete(this);
446 public Long getId() {
450 private void processFeaturesReply(OFFeaturesReply reply) {
451 if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
452 this.sid = reply.getDatapathId();
453 this.buffers = reply.getBuffers();
454 this.capabilities = reply.getCapabilities();
455 this.tables = reply.getTables();
456 this.actions = reply.getActions();
457 // notify core of this error event
458 for (OFPhysicalPort port : reply.getPorts()) {
459 updatePhysicalPort(port);
461 // config the switch to send full data packet
462 OFSetConfig config = (OFSetConfig) factory
463 .getMessage(OFType.SET_CONFIG);
464 config.setMissSendLength((short) 0xffff).setLengthU(
465 OFSetConfig.MINIMUM_LENGTH);
467 // send config request to make sure the switch can handle the set config
468 OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
469 asyncSend(getConfig);
470 this.state = SwitchState.WAIT_CONFIG_REPLY;
471 // inform core that a new switch is now operational
472 reportSwitchStateChange(true);
476 private void updatePhysicalPort(OFPhysicalPort port) {
477 Short portNumber = port.getPortNumber();
478 physicalPorts.put(portNumber, port);
482 port.getCurrentFeatures()
483 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
484 | OFPortFeatures.OFPPF_10MB_HD
486 | OFPortFeatures.OFPPF_100MB_FD
488 | OFPortFeatures.OFPPF_100MB_HD
490 | OFPortFeatures.OFPPF_1GB_FD
492 | OFPortFeatures.OFPPF_1GB_HD
493 .getValue() | OFPortFeatures.OFPPF_10GB_FD
497 private void deletePhysicalPort(OFPhysicalPort port) {
498 Short portNumber = port.getPortNumber();
499 physicalPorts.remove(portNumber);
500 portBandwidth.remove(portNumber);
504 public boolean isOperational() {
505 return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
509 public String toString() {
511 + this.socket.toString()
513 + (isOperational() ? HexString.toHexString(this.sid)
518 public Date getConnectedDate() {
519 return this.connectedDate;
522 public String getInstanceName() {
527 public Object getStatistics(OFStatisticsRequest req) {
528 int xid = getNextXid();
529 StatisticsCollector worker = new StatisticsCollector(this, xid, req);
530 messageWaitingDone.put(xid, worker);
531 Future<Object> submit = executor.submit(worker);
532 Object result = null;
535 .get(STATS_COLLECTION_TIMEOUT, TimeUnit.MILLISECONDS);
537 } catch (Exception e) {
538 logger.warn("Timeout while waiting for " + req.getType()
540 result = null; // to indicate timeout has occurred
546 public Object syncSend(OFMessage msg) {
547 Integer xid = getNextXid();
548 SynchronousMessage worker = new SynchronousMessage(this, xid, msg);
549 messageWaitingDone.put(xid, worker);
550 Object result = null;
551 Boolean status = false;
552 Future<Object> submit = executor.submit(worker);
555 .get(SYNCHRONOUS_FLOW_TIMEOUT, TimeUnit.MILLISECONDS);
556 messageWaitingDone.remove(xid);
557 if (result == null) {
558 // if result is null, then it means the switch can handle this message successfully
559 // convert the result into a Boolean with value true
561 //logger.debug("Successfully send " + msg.getType().toString());
564 // if result is not null, this means the switch can't handle this message
565 // the result if OFError already
566 logger.debug("Send " + msg.getType().toString()
567 + " failed --> " + ((OFError) result).toString());
570 } catch (Exception e) {
571 logger.warn("Timeout while waiting for " + msg.getType().toString()
573 // convert the result into a Boolean with value false
581 * Either a BarrierReply or a OFError is received. If this is a reply for an outstanding sync message,
582 * wake up associated task so that it can continue
584 private void processBarrierReply(OFBarrierReply msg) {
585 Integer xid = msg.getXid();
586 SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
588 if (worker == null) {
594 private void processErrorReply(OFError errorMsg) {
595 OFMessage offendingMsg = errorMsg.getOffendingMsg();
597 if (offendingMsg != null) {
598 xid = offendingMsg.getXid();
600 xid = errorMsg.getXid();
603 * the error can be a reply to a synchronous message or to a statistic request message
605 Callable<?> worker = messageWaitingDone.remove(xid);
606 if (worker == null) {
609 if (worker instanceof SynchronousMessage) {
610 ((SynchronousMessage) worker).wakeup(errorMsg);
612 ((StatisticsCollector) worker).wakeup(errorMsg);
616 private void processStatsReply(OFStatisticsReply reply) {
617 Integer xid = reply.getXid();
618 StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
620 if (worker == null) {
623 if (worker.collect(reply)) {
624 // if all the stats records are received (collect() returns true)
626 messageWaitingDone.remove(xid);
632 public Map<Short, OFPhysicalPort> getPhysicalPorts() {
633 return this.physicalPorts;
637 public OFPhysicalPort getPhysicalPort(Short portNumber) {
638 return this.physicalPorts.get(portNumber);
642 public Integer getPortBandwidth(Short portNumber) {
643 return this.portBandwidth.get(portNumber);
647 public Set<Short> getPorts() {
648 return this.physicalPorts.keySet();
652 public Byte getTables() {
657 public Integer getActions() {
662 public Integer getCapabilities() {
663 return this.capabilities;
667 public Integer getBuffers() {
672 public boolean isPortEnabled(short portNumber) {
673 return isPortEnabled(physicalPorts.get(portNumber));
677 public boolean isPortEnabled(OFPhysicalPort port) {
681 int portConfig = port.getConfig();
682 int portState = port.getState();
683 if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
686 if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
689 if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
697 public List<OFPhysicalPort> getEnabledPorts() {
698 List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
699 synchronized (this.physicalPorts) {
700 for (OFPhysicalPort port : physicalPorts.values()) {
701 if (isPortEnabled(port)) {