3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
10 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
12 import java.nio.ByteBuffer;
13 import java.nio.channels.SelectionKey;
14 import java.nio.channels.Selector;
15 import java.nio.channels.SocketChannel;
16 import java.nio.channels.spi.SelectorProvider;
17 import java.util.ArrayList;
18 import java.util.Date;
19 import java.util.HashMap;
20 import java.util.Iterator;
21 import java.util.List;
24 import java.util.Timer;
25 import java.util.TimerTask;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.ConcurrentHashMap;
28 import java.util.concurrent.ExecutorService;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.Future;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicInteger;
34 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
35 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
36 import org.openflow.protocol.OFBarrierReply;
37 import org.openflow.protocol.OFEchoReply;
38 import org.openflow.protocol.OFError;
39 import org.openflow.protocol.OFFeaturesReply;
40 import org.openflow.protocol.OFFlowMod;
41 import org.openflow.protocol.OFGetConfigReply;
42 import org.openflow.protocol.OFMatch;
43 import org.openflow.protocol.OFMessage;
44 import org.openflow.protocol.OFPhysicalPort;
45 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
46 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
47 import org.openflow.protocol.OFPhysicalPort.OFPortState;
48 import org.openflow.protocol.OFPort;
49 import org.openflow.protocol.OFPortStatus;
50 import org.openflow.protocol.OFPortStatus.OFPortReason;
51 import org.openflow.protocol.OFSetConfig;
52 import org.openflow.protocol.OFStatisticsReply;
53 import org.openflow.protocol.OFStatisticsRequest;
54 import org.openflow.protocol.OFType;
55 import org.openflow.protocol.factory.BasicFactory;
56 import org.openflow.util.HexString;
57 import org.slf4j.Logger;
58 import org.slf4j.LoggerFactory;
60 public class SwitchHandler implements ISwitch {
61 private static final Logger logger = LoggerFactory
62 .getLogger(SwitchHandler.class);
63 private static final int SWITCH_LIVENESS_TIMER = 5000;
64 private static final int SWITCH_LIVENESS_TIMEOUT = 2 * SWITCH_LIVENESS_TIMER + 500;
65 private int MESSAGE_RESPONSE_TIMER = 2000;
66 private static final int bufferSize = 1024 * 1024;
68 private String instanceName;
69 private ISwitch thisISwitch;
70 private IController core;
72 private Integer buffers;
73 private Integer capabilities;
75 private Integer actions;
76 private Selector selector;
77 private SelectionKey clientSelectionKey;
78 private SocketChannel socket;
79 private ByteBuffer inBuffer;
80 private ByteBuffer outBuffer;
81 private BasicFactory factory;
82 private AtomicInteger xid;
83 private SwitchState state;
84 private Timer periodicTimer;
85 private Map<Short, OFPhysicalPort> physicalPorts;
86 private Map<Short, Integer> portBandwidth;
87 private Date connectedDate;
88 private Long lastMsgReceivedTimeStamp;
89 private Boolean probeSent;
90 private ExecutorService executor;
91 private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
92 private boolean running;
93 private Thread switchHandlerThread;
94 private Integer responseTimerValue;
96 private enum SwitchState {
97 NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
102 private SwitchState(int value) {
106 @SuppressWarnings("unused")
112 public SwitchHandler(Controller core, SocketChannel sc, String name) {
113 this.instanceName = name;
114 this.thisISwitch = this;
116 this.buffers = (int)0;
117 this.capabilities = (int)0;
118 this.tables = (byte)0;
119 this.actions = (int)0;
122 this.factory = new BasicFactory();
123 this.connectedDate = new Date();
124 this.lastMsgReceivedTimeStamp = connectedDate.getTime();
125 this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
126 this.portBandwidth = new HashMap<Short, Integer>();
127 this.state = SwitchState.NON_OPERATIONAL;
128 this.probeSent = false;
129 this.xid = new AtomicInteger(this.socket.hashCode());
130 this.periodicTimer = null;
131 this.executor = Executors.newFixedThreadPool(4);
132 this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
133 this.inBuffer = ByteBuffer.allocateDirect(bufferSize);
134 this.outBuffer = ByteBuffer.allocateDirect(bufferSize);
135 this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
136 String rTimer = System.getProperty("of.messageResponseTimer");
137 if (rTimer != null) {
139 responseTimerValue = Integer.decode(rTimer);
140 } catch (NumberFormatException e) {
141 logger.warn("Invalid of.messageResponseTimer:" + rTimer + ", use default("
142 + MESSAGE_RESPONSE_TIMER+ ")");
147 public void start() {
149 this.selector = SelectorProvider.provider().openSelector();
150 this.socket.configureBlocking(false);
151 this.socket.socket().setTcpNoDelay(true);
152 this.clientSelectionKey = this.socket.register(this.selector,
153 SelectionKey.OP_READ);
154 startHandlerThread();
155 } catch (Exception e) {
161 private void startHandlerThread() {
162 OFMessage msg = factory.getMessage(OFType.HELLO);
164 switchHandlerThread = new Thread(new Runnable() {
170 // wait for an incoming connection
172 Iterator<SelectionKey> selectedKeys = selector
173 .selectedKeys().iterator();
174 while (selectedKeys.hasNext()) {
175 SelectionKey skey = selectedKeys.next();
176 selectedKeys.remove();
177 if (skey.isValid() && skey.isWritable()) {
180 if (skey.isValid() && skey.isReadable()) {
184 } catch (Exception e) {
190 switchHandlerThread.start();
198 this.clientSelectionKey.cancel();
201 } catch (Exception e) {
202 // do nothing since we are shutting down.
208 public int getNextXid() {
209 return this.xid.incrementAndGet();
213 public Integer asyncSend(OFMessage msg) {
214 return asyncSend(msg, getNextXid());
218 public Integer asyncSend(OFMessage msg, int xid) {
219 synchronized (outBuffer) {
221 if ((msg.getType() != OFType.ECHO_REQUEST) &&
222 (msg.getType() != OFType.ECHO_REPLY)) {
223 logger.debug("sending " + msg.getType().toString() + " to " + toString());
227 int msgLen = msg.getLengthU();
228 if (outBuffer.remaining() < msgLen) {
229 // increase the buffer size so that it can contain this message
230 ByteBuffer newBuffer = ByteBuffer.allocateDirect(outBuffer
234 newBuffer.put(outBuffer);
235 outBuffer = newBuffer;
237 msg.writeTo(outBuffer);
240 socket.write(outBuffer);
242 if (outBuffer.position() > 0) {
243 this.clientSelectionKey = this.socket.register(
244 this.selector, SelectionKey.OP_WRITE, this);
246 logger.trace("Message sent: " + msg.toString());
247 } catch (Exception e) {
254 public void resumeSend() {
255 synchronized (outBuffer) {
258 socket.write(outBuffer);
260 if (outBuffer.position() > 0) {
261 this.clientSelectionKey = this.socket.register(
262 this.selector, SelectionKey.OP_WRITE, this);
264 this.clientSelectionKey = this.socket.register(
265 this.selector, SelectionKey.OP_READ, this);
267 } catch (Exception e) {
273 public void handleMessages() {
274 List<OFMessage> msgs = readMessages();
276 logger.debug(toString() + " is down");
277 // the connection is down, inform core
278 reportSwitchStateChange(false);
281 for (OFMessage msg : msgs) {
282 logger.trace("Message received: " + msg.toString());
284 if ((msg.getType() != OFType.ECHO_REQUEST) &&
285 (msg.getType() != OFType.ECHO_REPLY)) {
286 logger.debug(msg.getType().toString() + " received from sw " + toString());
289 this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
290 OFType type = msg.getType();
293 // send feature request
294 OFMessage featureRequest = factory
295 .getMessage(OFType.FEATURES_REQUEST);
296 asyncSend(featureRequest);
297 // delete all pre-existing flows
298 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
299 OFFlowMod flowMod = (OFFlowMod) factory
300 .getMessage(OFType.FLOW_MOD);
301 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
302 .setOutPort(OFPort.OFPP_NONE).setLength(
303 (short) OFFlowMod.MINIMUM_LENGTH);
305 this.state = SwitchState.WAIT_FEATURES_REPLY;
309 OFEchoReply echoReply = (OFEchoReply) factory
310 .getMessage(OFType.ECHO_REPLY);
311 asyncSend(echoReply);
314 this.probeSent = false;
317 processFeaturesReply((OFFeaturesReply) msg);
319 case GET_CONFIG_REPLY:
320 // make sure that the switch can send the whole packet to the controller
321 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
322 this.state = SwitchState.OPERATIONAL;
326 processBarrierReply((OFBarrierReply) msg);
329 processErrorReply((OFError) msg);
332 processPortStatusMsg((OFPortStatus) msg);
335 processStatsReply((OFStatisticsReply) msg);
342 if (isOperational()) {
343 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
348 private void processPortStatusMsg(OFPortStatus msg) {
349 //short portNumber = msg.getDesc().getPortNumber();
350 OFPhysicalPort port = msg.getDesc();
351 if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
352 updatePhysicalPort(port);
353 //logger.debug("Port " + portNumber + " on " + toString() + " modified");
354 } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
355 updatePhysicalPort(port);
356 //logger.debug("Port " + portNumber + " on " + toString() + " added");
357 } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
359 deletePhysicalPort(port);
360 //logger.debug("Port " + portNumber + " on " + toString() + " deleted");
365 private List<OFMessage> readMessages() {
366 List<OFMessage> msgs = null;
369 bytesRead = socket.read(inBuffer);
370 } catch (Exception e) {
374 if (bytesRead == -1) {
378 msgs = factory.parseMessages(inBuffer);
379 if (inBuffer.hasRemaining()) {
387 private void startSwitchTimer() {
388 this.periodicTimer = new Timer();
389 this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
393 Long now = System.currentTimeMillis();
394 if ((now - lastMsgReceivedTimeStamp) > SWITCH_LIVENESS_TIMEOUT) {
396 // switch failed to respond to our probe, consider it down
397 logger.warn(toString()
398 + " is idle for too long, disconnect");
399 reportSwitchStateChange(false);
401 // send a probe to see if the switch is still alive
402 //logger.debug("Send idle probe (Echo Request) to " + switchName());
404 OFMessage echo = factory
405 .getMessage(OFType.ECHO_REQUEST);
409 if (state == SwitchState.WAIT_FEATURES_REPLY) {
410 // send another features request
411 OFMessage request = factory
412 .getMessage(OFType.FEATURES_REQUEST);
415 if (state == SwitchState.WAIT_CONFIG_REPLY) {
416 // send another config request
417 OFSetConfig config = (OFSetConfig) factory
418 .getMessage(OFType.SET_CONFIG);
419 config.setMissSendLength((short) 0xffff)
420 .setLengthU(OFSetConfig.MINIMUM_LENGTH);
422 OFMessage getConfig = factory
423 .getMessage(OFType.GET_CONFIG_REQUEST);
424 asyncSend(getConfig);
428 } catch (Exception e) {
432 }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
435 private void cancelSwitchTimer() {
436 if (this.periodicTimer != null) {
437 this.periodicTimer.cancel();
441 private void reportError(Exception e) {
442 //logger.error(toString() + " caught Error " + e.toString());
443 // notify core of this error event
444 ((Controller) core).takeSwitchEventError(this);
447 private void reportSwitchStateChange(boolean added) {
449 ((Controller) core).takeSwtichEventAdd(this);
451 ((Controller) core).takeSwitchEventDelete(this);
456 public Long getId() {
460 private void processFeaturesReply(OFFeaturesReply reply) {
461 if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
462 this.sid = reply.getDatapathId();
463 this.buffers = reply.getBuffers();
464 this.capabilities = reply.getCapabilities();
465 this.tables = reply.getTables();
466 this.actions = reply.getActions();
467 // notify core of this error event
468 for (OFPhysicalPort port : reply.getPorts()) {
469 updatePhysicalPort(port);
471 // config the switch to send full data packet
472 OFSetConfig config = (OFSetConfig) factory
473 .getMessage(OFType.SET_CONFIG);
474 config.setMissSendLength((short) 0xffff).setLengthU(
475 OFSetConfig.MINIMUM_LENGTH);
477 // send config request to make sure the switch can handle the set config
478 OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
479 asyncSend(getConfig);
480 this.state = SwitchState.WAIT_CONFIG_REPLY;
481 // inform core that a new switch is now operational
482 reportSwitchStateChange(true);
486 private void updatePhysicalPort(OFPhysicalPort port) {
487 Short portNumber = port.getPortNumber();
488 physicalPorts.put(portNumber, port);
492 port.getCurrentFeatures()
493 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
494 | OFPortFeatures.OFPPF_10MB_HD
496 | OFPortFeatures.OFPPF_100MB_FD
498 | OFPortFeatures.OFPPF_100MB_HD
500 | OFPortFeatures.OFPPF_1GB_FD
502 | OFPortFeatures.OFPPF_1GB_HD
503 .getValue() | OFPortFeatures.OFPPF_10GB_FD
507 private void deletePhysicalPort(OFPhysicalPort port) {
508 Short portNumber = port.getPortNumber();
509 physicalPorts.remove(portNumber);
510 portBandwidth.remove(portNumber);
514 public boolean isOperational() {
515 return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
519 public String toString() {
521 + this.socket.toString()
523 + (isOperational() ? HexString.toHexString(this.sid)
528 public Date getConnectedDate() {
529 return this.connectedDate;
532 public String getInstanceName() {
537 public Object getStatistics(OFStatisticsRequest req) {
538 int xid = getNextXid();
539 StatisticsCollector worker = new StatisticsCollector(this, xid, req);
540 messageWaitingDone.put(xid, worker);
541 Future<Object> submit = executor.submit(worker);
542 Object result = null;
545 .get(MESSAGE_RESPONSE_TIMER, TimeUnit.MILLISECONDS);
547 } catch (Exception e) {
548 logger.warn("Timeout while waiting for " + req.getType()
550 result = null; // to indicate timeout has occurred
556 public Object syncSend(OFMessage msg) {
557 Integer xid = getNextXid();
558 SynchronousMessage worker = new SynchronousMessage(this, xid, msg);
559 messageWaitingDone.put(xid, worker);
560 Object result = null;
561 Boolean status = false;
562 Future<Object> submit = executor.submit(worker);
565 .get(responseTimerValue, TimeUnit.MILLISECONDS);
566 messageWaitingDone.remove(xid);
567 if (result == null) {
568 // if result is null, then it means the switch can handle this message successfully
569 // convert the result into a Boolean with value true
571 //logger.debug("Successfully send " + msg.getType().toString());
574 // if result is not null, this means the switch can't handle this message
575 // the result if OFError already
576 logger.debug("Send " + msg.getType().toString()
577 + " failed --> " + ((OFError) result).toString());
580 } catch (Exception e) {
581 logger.warn("Timeout while waiting for " + msg.getType().toString()
583 // convert the result into a Boolean with value false
591 * Either a BarrierReply or a OFError is received. If this is a reply for an outstanding sync message,
592 * wake up associated task so that it can continue
594 private void processBarrierReply(OFBarrierReply msg) {
595 Integer xid = msg.getXid();
596 SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
598 if (worker == null) {
604 private void processErrorReply(OFError errorMsg) {
605 OFMessage offendingMsg = errorMsg.getOffendingMsg();
607 if (offendingMsg != null) {
608 xid = offendingMsg.getXid();
610 xid = errorMsg.getXid();
613 * the error can be a reply to a synchronous message or to a statistic request message
615 Callable<?> worker = messageWaitingDone.remove(xid);
616 if (worker == null) {
619 if (worker instanceof SynchronousMessage) {
620 ((SynchronousMessage) worker).wakeup(errorMsg);
622 ((StatisticsCollector) worker).wakeup(errorMsg);
626 private void processStatsReply(OFStatisticsReply reply) {
627 Integer xid = reply.getXid();
628 StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
630 if (worker == null) {
633 if (worker.collect(reply)) {
634 // if all the stats records are received (collect() returns true)
636 messageWaitingDone.remove(xid);
642 public Map<Short, OFPhysicalPort> getPhysicalPorts() {
643 return this.physicalPorts;
647 public OFPhysicalPort getPhysicalPort(Short portNumber) {
648 return this.physicalPorts.get(portNumber);
652 public Integer getPortBandwidth(Short portNumber) {
653 return this.portBandwidth.get(portNumber);
657 public Set<Short> getPorts() {
658 return this.physicalPorts.keySet();
662 public Byte getTables() {
667 public Integer getActions() {
672 public Integer getCapabilities() {
673 return this.capabilities;
677 public Integer getBuffers() {
682 public boolean isPortEnabled(short portNumber) {
683 return isPortEnabled(physicalPorts.get(portNumber));
687 public boolean isPortEnabled(OFPhysicalPort port) {
691 int portConfig = port.getConfig();
692 int portState = port.getState();
693 if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
696 if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
699 if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
707 public List<OFPhysicalPort> getEnabledPorts() {
708 List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
709 synchronized (this.physicalPorts) {
710 for (OFPhysicalPort port : physicalPorts.values()) {
711 if (isPortEnabled(port)) {