1 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
3 import java.io.IOException;
4 import java.net.SocketAddress;
5 import java.net.SocketException;
6 import java.nio.channels.AsynchronousCloseException;
7 import java.nio.channels.ClosedSelectorException;
8 import java.util.ArrayList;
10 import java.util.HashMap;
11 import java.util.List;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Future;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.atomic.AtomicInteger;
21 import org.jboss.netty.channel.Channel;
22 import org.jboss.netty.util.HashedWheelTimer;
23 import org.jboss.netty.util.Timeout;
24 import org.jboss.netty.util.TimerTask;
25 import org.opendaylight.controller.protocol_plugin.openflow.core.IEnhancedSwitch;
26 import org.opendaylight.openflowplugin.openflow.core.internal.StatisticsCollector;
27 import org.opendaylight.openflowplugin.openflow.core.internal.SwitchEvent;
28 import org.opendaylight.openflowplugin.openflow.core.internal.SwitchEvent.SwitchEventType;
29 import org.opendaylight.openflowplugin.openflow.core.internal.SynchronousMessage;
30 import org.openflow.protocol.OFBarrierReply;
31 import org.openflow.protocol.OFBarrierRequest;
32 import org.openflow.protocol.OFEchoReply;
33 import org.openflow.protocol.OFError;
34 import org.openflow.protocol.OFFeaturesReply;
35 import org.openflow.protocol.OFFlowMod;
36 import org.openflow.protocol.OFGetConfigReply;
37 import org.openflow.protocol.OFMatch;
38 import org.openflow.protocol.OFMessage;
39 import org.openflow.protocol.OFPhysicalPort;
40 import org.openflow.protocol.OFPort;
41 import org.openflow.protocol.OFPortStatus;
42 import org.openflow.protocol.OFSetConfig;
43 import org.openflow.protocol.OFStatisticsReply;
44 import org.openflow.protocol.OFStatisticsRequest;
45 import org.openflow.protocol.OFType;
46 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
47 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
48 import org.openflow.protocol.OFPhysicalPort.OFPortState;
49 import org.openflow.protocol.OFPortStatus.OFPortReason;
50 import org.openflow.protocol.factory.BasicFactory;
51 import org.openflow.protocol.factory.MessageParseException;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
55 public class EnhancedSwitchHandler implements IEnhancedSwitch {
58 private static final Logger logger = LoggerFactory
59 .getLogger(EnhancedSwitchHandler.class);
60 private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
61 private int MESSAGE_RESPONSE_TIMER = 2000;
63 private EnhancedController controller = null;
64 private Integer switchChannelID = null;
65 private Channel channel;
66 private long lastMsgReceivedTimeStamp = 0;
67 private SwitchState state = null;
68 private BasicFactory factory = null;
69 private HashedWheelTimer timer = null;
70 private SwitchLivelinessTimerTask switchLivelinessTask = null;
71 private Timeout switchLivelinessTaskHandle = null;
73 private AtomicInteger xid;
75 private int capabilities;
78 private Map<Short, OFPhysicalPort> physicalPorts;
79 private Map<Short, Integer> portBandwidth;
80 private Date connectedDate;
81 private ExecutorService executor = null;
82 private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
83 private Integer responseTimerValue;
84 private TrafficStatisticsHandler trafficStatsHandler = null;
85 private static final boolean START_LIVELINESS_TIMER = false;
87 private static final int BATCH_COUNT_FOR_FLUSHING = 3;
88 private int flushBatchTrack = 0;
91 private List<OFMessage> msgBuffer = new ArrayList<OFMessage>();
92 private int bufferTrack = 0;
93 private static final int BATCH_BUFFER_THRESHOLD = 100;
97 // PLEASE .. IF THERE IS SOMETHING CALLED GOD, HELP ME GET THE THROUGHPUT WITH THIS !!
98 private List<OFMessage> flushableMsgBuffer = new ArrayList<OFMessage>();
101 public enum SwitchState {
103 WAIT_FEATURES_REPLY(1),
104 WAIT_CONFIG_REPLY(2),
109 private SwitchState(int value) {
113 @SuppressWarnings("unused")
120 public EnhancedSwitchHandler(EnhancedController controller,
121 Integer switchConnectionChannelID,
123 HashedWheelTimer timer,
124 ExecutorService executor,
125 TrafficStatisticsHandler tHandler){
127 this.controller = controller;
128 this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
129 this.portBandwidth = new HashMap<Short, Integer>();
130 this.switchChannelID = switchConnectionChannelID;
133 this.tables = (byte) 0;
134 this.actions = (int) 0;
135 this.capabilities = (int) 0;
136 this.buffers = (int) 0;
137 this.connectedDate = new Date();
138 this.state = SwitchState.NON_OPERATIONAL;
139 this.executor = executor;
140 this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
141 this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
142 this.channel = channel;
143 this.xid = new AtomicInteger(this.channel.hashCode());
144 this.trafficStatsHandler = tHandler;
148 Integer getSwitchChannelID() {
149 return this.switchChannelID;
152 public void startHandler(){
153 this.factory = new BasicFactory();
159 public void shutDownHandler(){
165 public void handleChannelIdle(){
166 // TODO: this is already handled by OFChannelHandler
173 public void start() {
179 SwitchEvent ev = new SwitchEvent(SwitchEventType.SWITCH_DELETE, this, null);
180 controller.switchDeleted(ev, switchChannelID);
183 private void cancelSwitchTimer() {
184 if (switchLivelinessTaskHandle != null){
185 this.switchLivelinessTaskHandle.cancel();
190 public void handleCaughtException(){
200 public int getNextXid() {
201 return this.xid.incrementAndGet();
205 public Long getId() {
210 public Byte getTables() {
215 public Integer getActions() {
220 public Integer getCapabilities() {
221 return this.capabilities;
225 public Integer getBuffers() {
230 public Date getConnectedDate() {
231 return this.connectedDate;
235 public Integer asyncSend(OFMessage msg) {
236 return asyncSend(msg, getNextXid());
241 public Integer asyncSend(OFMessage msg, int xid) {
243 // BATCHING IMPLEMENTATION. Please think hard before enablng this !!
244 // Some messages could be latency-sensitive and some could be batched
245 // for better throughput. So, below decision may not bring better
246 // throughput for latency-sensitive cases like FLOW-MODs or
250 if (bufferTrack == BUFFER_THRESHOLD){
251 this.channel.write(msgBuffer);
263 //List<OFMessage> msglist = new ArrayList<OFMessage>(1);
265 synchronized( flushableMsgBuffer ) {
266 flushableMsgBuffer.add(msg);
269 trafficStatsHandler.countForEntitySimpleMeasurement(switchChannelID,
270 TrafficStatisticsHandler.ENTITY_COUNTER_SND_MSG);
272 //this.channel.write(msglist);
275 if (msg.getType() == OFType.FLOW_MOD){
276 this.trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.FLOW_MOD_SENT);
277 this.trafficStatsHandler.countForRateMeasurement(TrafficStatisticsHandler.FLOW_MOD_SENT);
287 public Integer asyncFastSend(OFMessage msg) {
288 return asyncFastSend(msg, getNextXid());
292 public Integer asyncFastSend(OFMessage msg, int xid) {
294 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
296 this.channel.write(msglist);
297 trafficStatsHandler.countForEntitySimpleMeasurement(switchChannelID,
298 TrafficStatisticsHandler.ENTITY_COUNTER_SND_MSG);
303 public Object syncSend(OFMessage msg) {
304 int xid = getNextXid();
305 return syncSend(msg, xid);
308 private Object syncSend(OFMessage msg, int xid) {
309 return syncMessageInternal(msg, xid, true);
313 public Map<Short, OFPhysicalPort> getPhysicalPorts() {
314 return this.physicalPorts;
318 public Set<Short> getPorts() {
319 return this.physicalPorts.keySet();
323 public OFPhysicalPort getPhysicalPort(Short portNumber) {
324 return this.physicalPorts.get(portNumber);
328 public Integer getPortBandwidth(Short portNumber) {
329 return this.portBandwidth.get(portNumber);
333 public boolean isPortEnabled(short portNumber) {
334 return isPortEnabled(physicalPorts.get(portNumber));
338 public boolean isPortEnabled(OFPhysicalPort port) {
342 int portConfig = port.getConfig();
343 int portState = port.getState();
344 if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
347 if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
350 if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
359 public List<OFPhysicalPort> getEnabledPorts() {
360 List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
361 synchronized (this.physicalPorts) {
362 for (OFPhysicalPort port : physicalPorts.values()) {
363 if (isPortEnabled(port)) {
373 * WARNING: CALLER WOULD BE BLOCKED
377 public Object getStatistics(OFStatisticsRequest req) {
378 int xid = getNextXid();
379 StatisticsCollector worker = new StatisticsCollector(this, xid, req);
380 messageWaitingDone.put(xid, worker);
381 Future<Object> submit = executor.submit(worker);
382 Object result = null;
384 result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
386 } catch (Exception e) {
387 logger.warn("Timeout while waiting for {} replies", req.getType());
388 result = null; // to indicate timeout has occurred
394 public boolean isOperational() {
395 return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
399 public Object syncSendBarrierMessage() {
400 OFBarrierRequest barrierMsg = new OFBarrierRequest();
401 return syncSend(barrierMsg);
405 public Object asyncSendBarrierMessage() {
406 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
407 OFBarrierRequest barrierMsg = new OFBarrierRequest();
408 int xid = getNextXid();
410 barrierMsg.setXid(xid);
411 msglist.add(barrierMsg);
413 this.channel.write(msglist);
414 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.BARRIER_REQUEST_SENT);
420 public void handleMessage(OFMessage ofMessage) {
423 logger.debug("Message received: {}", ofMessage.toString());
424 this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
425 OFType type = ofMessage.getType();
428 logger.debug("<<<< HELLO");
429 // send feature request
430 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.HELLO_RECEIVED);
431 OFMessage featureRequest = factory
432 .getMessage(OFType.FEATURES_REQUEST);
433 asyncFastSend(featureRequest);
434 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.FEATURES_REQUEST_SENT);
435 // delete all pre-existing flows
436 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
437 OFFlowMod flowMod = (OFFlowMod) factory
438 .getMessage(OFType.FLOW_MOD);
439 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
440 .setOutPort(OFPort.OFPP_NONE)
441 .setLength((short) OFFlowMod.MINIMUM_LENGTH);
442 asyncFastSend(flowMod);
443 this.state = SwitchState.WAIT_FEATURES_REPLY;
444 if (START_LIVELINESS_TIMER){
449 logger.debug("<<<< ECHO REQUEST");
450 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ECHO_REQUEST_RECEIVED);
451 OFEchoReply echoReply = (OFEchoReply) factory
452 .getMessage(OFType.ECHO_REPLY);
453 asyncFastSend(echoReply);
454 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ECHO_REPLY_SENT);
458 logger.debug("<<<< ECHO REPLY");
459 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ECHO_REPLY_RECEIVED);
460 //this.probeSent = false;
463 logger.debug("<<<< FEATURES REPLY");
464 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.FEATURES_REPLY_RECEIVED);
465 processFeaturesReply((OFFeaturesReply) ofMessage);
467 case GET_CONFIG_REPLY:
468 logger.debug("<<<< CONFIG REPLY");
469 // make sure that the switch can send the whole packet to the
471 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.CONFIG_REPLY_RECEIVED);
472 if (((OFGetConfigReply) ofMessage).getMissSendLength() == (short) 0xffff) {
473 this.state = SwitchState.OPERATIONAL;
477 logger.debug("<<<< BARRIER REPLY");
478 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.BARRIER_REPLY_RECEIVED);
479 processBarrierReply((OFBarrierReply) ofMessage);
482 logger.debug("<<<< ERROR");
483 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ERROR_MSG_RECEIVED);
484 processErrorReply((OFError) ofMessage);
487 logger.debug("<<<< PORT STATUS");
488 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.PORT_STATUS_RECEIVED);
489 processPortStatusMsg((OFPortStatus) ofMessage);
492 logger.debug("<<<< STATS REPLY");
493 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.STATS_RESPONSE_RECEIVED);
494 processStatsReply((OFStatisticsReply) ofMessage);
497 logger.debug("<<<< PACKET_IN");
498 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.PACKET_IN_RECEIVED);
499 trafficStatsHandler.countForRateMeasurement(TrafficStatisticsHandler.PACKET_IN_RECEIVED);
504 if (isOperational()) {
505 logger.debug("SWITCH IS OPERATIONAL ... forwarding");
506 SwitchEvent ev = new SwitchEvent(
507 SwitchEvent.SwitchEventType.SWITCH_MESSAGE, this, ofMessage);
508 controller.switchMessage(ev, switchChannelID);
513 private void startSwitchTimer(){
514 if (this.timer != null){
515 if (switchLivelinessTask == null){
516 switchLivelinessTask = new SwitchLivelinessTimerTask();
518 switchLivelinessTaskHandle = timer.newTimeout(switchLivelinessTask,
519 switchLivenessTimeout, TimeUnit.SECONDS);
526 * This method returns the switch liveness timeout value. If controller did
527 * not receive any message from the switch for such a long period,
528 * controller will tear down the connection to the switch.
530 * @return The timeout value
532 private static int getSwitchLivenessTimeout() {
533 String timeout = System.getProperty("of.switchLivenessTimeout");
536 if (timeout != null) {
537 rv = Integer.parseInt(timeout);
539 } catch (Exception e) {
545 private void processFeaturesReply(OFFeaturesReply reply) {
546 if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
547 this.sid = reply.getDatapathId();
548 this.buffers = reply.getBuffers();
549 this.capabilities = reply.getCapabilities();
550 this.tables = reply.getTables();
551 this.actions = reply.getActions();
552 // notify core of this error event
553 for (OFPhysicalPort port : reply.getPorts()) {
554 updatePhysicalPort(port);
556 // config the switch to send full data packet
557 OFSetConfig config = (OFSetConfig) factory
558 .getMessage(OFType.SET_CONFIG);
559 config.setMissSendLength((short) 0xffff).setLengthU(
560 OFSetConfig.MINIMUM_LENGTH);
561 asyncFastSend(config);
562 // send config request to make sure the switch can handle the set
564 OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
565 asyncFastSend(getConfig);
566 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.CONFIG_REQUEST_SENT);
567 this.state = SwitchState.WAIT_CONFIG_REPLY;
568 // inform core that a new switch is now operational
569 reportSwitchStateChange(true);
574 private void updatePhysicalPort(OFPhysicalPort port) {
575 Short portNumber = port.getPortNumber();
576 physicalPorts.put(portNumber, port);
579 port.getCurrentFeatures()
580 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
581 | OFPortFeatures.OFPPF_10MB_HD
583 | OFPortFeatures.OFPPF_100MB_FD
585 | OFPortFeatures.OFPPF_100MB_HD
587 | OFPortFeatures.OFPPF_1GB_FD
589 | OFPortFeatures.OFPPF_1GB_HD
590 .getValue() | OFPortFeatures.OFPPF_10GB_FD
592 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.UPDATE_PHYSICAL_PORT);
596 private void reportSwitchStateChange(boolean added) {
597 SwitchEvent ev = null;
599 ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ADD, this, null);
600 controller.switchAdded(ev, switchChannelID);
602 ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_DELETE, this, null);
603 controller.switchDeleted(ev, switchChannelID);
608 protected class SwitchLivelinessTimerTask implements TimerTask {
611 public void run(Timeout timeout) throws Exception {
613 // set this reference in parent so that cancellation is
615 switchLivelinessTaskHandle = timeout;
616 Long now = System.currentTimeMillis();
617 if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
618 if (state == SwitchState.WAIT_FEATURES_REPLY) {
619 // send another features request
620 OFMessage request = factory
621 .getMessage(OFType.FEATURES_REQUEST);
622 asyncFastSend(request);
624 if (state == SwitchState.WAIT_CONFIG_REPLY) {
625 // send another config request
626 OFSetConfig config = (OFSetConfig) factory
627 .getMessage(OFType.SET_CONFIG);
628 config.setMissSendLength((short) 0xffff)
629 .setLengthU(OFSetConfig.MINIMUM_LENGTH);
630 asyncFastSend(config);
631 OFMessage getConfig = factory
632 .getMessage(OFType.GET_CONFIG_REQUEST);
633 asyncFastSend(getConfig);
637 timer.newTimeout(this, switchLivenessTimeout, TimeUnit.SECONDS);
644 * Either a BarrierReply or a OFError is received. If this is a reply for an
645 * outstanding sync message, wake up associated task so that it can continue
647 private void processBarrierReply(OFBarrierReply msg) {
648 Integer xid = msg.getXid();
649 SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
651 if (worker == null) {
657 private void processErrorReply(OFError errorMsg) {
659 OFMessage offendingMsg = errorMsg.getOffendingMsg();
661 if (offendingMsg != null) {
662 xi = offendingMsg.getXid();
664 xi = errorMsg.getXid();
667 catch(MessageParseException mpe){
672 private void processPortStatusMsg(OFPortStatus msg) {
673 OFPhysicalPort port = msg.getDesc();
674 if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
675 updatePhysicalPort(port);
676 } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
677 updatePhysicalPort(port);
678 } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
680 deletePhysicalPort(port);
685 private void deletePhysicalPort(OFPhysicalPort port) {
686 Short portNumber = port.getPortNumber();
687 physicalPorts.remove(portNumber);
688 portBandwidth.remove(portNumber);
691 private void processStatsReply(OFStatisticsReply reply) {
692 Integer xid = reply.getXid();
693 StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
695 if (worker == null) {
698 if (worker.collect(reply)) {
699 // if all the stats records are received (collect() returns true)
701 messageWaitingDone.remove(xid);
708 * This method performs synchronous operations for a given message. If
709 * syncRequest is set to true, the message will be sent out followed by a
710 * Barrier request message. Then it's blocked until the Barrier rely arrives
711 * or timeout. If syncRequest is false, it simply skips the message send and
712 * just waits for the response back.
719 * If set to true, the message the message will be sent out
720 * followed by a Barrier request message. If set to false, it
721 * simply skips the sending and just waits for the Barrier reply.
724 private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
725 Object result = null;
727 SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
728 messageWaitingDone.put(xid, worker);
730 Boolean status = false;
731 Future<Object> submit = executor.submit(worker);
733 result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
734 messageWaitingDone.remove(xid);
735 if (result == null) {
736 // if result is null, then it means the switch can handle this
737 // message successfully
738 // convert the result into a Boolean with value true
740 // logger.debug("Successfully send " +
741 // msg.getType().toString());
744 // if result is not null, this means the switch can't handle
746 // the result if OFError already
747 logger.debug("Send {} failed --> {}", msg.getType().toString(),
748 ((OFError) result).toString());
751 } catch (Exception e) {
752 logger.warn("Timeout while waiting for {} reply", msg.getType()
754 // convert the result into a Boolean with value false
764 private void sendFirstHello() {
766 OFMessage msg = factory.getMessage(OFType.HELLO);
768 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.HELLO_SENT);
769 trafficStatsHandler.addEntityForCounter(switchChannelID, TrafficStatisticsHandler.ENTITY_COUNTER_RCV_MSG);
770 trafficStatsHandler.addEntityForCounter(switchChannelID, TrafficStatisticsHandler.ENTITY_COUNTER_SND_MSG);
771 } catch (Exception e) {
777 private void reportError(Exception e) {
778 if (e instanceof AsynchronousCloseException
779 || e instanceof InterruptedException
780 || e instanceof SocketException || e instanceof IOException
781 || e instanceof ClosedSelectorException) {
782 logger.error("Caught exception {}", e.getMessage());
784 logger.error("Caught exception ", e);
786 // notify core of this error event and disconnect the switch
788 // TODO: We do not need this because except-hanling is done by
789 // Controller's OFChannelHandler
792 SwitchEvent ev = new SwitchEvent(
793 SwitchEvent.SwitchEventType.SWITCH_ERROR, this, null);
795 controller.switchError(ev, switchChannelID);
801 public void flushBufferedMessages() {
803 //if (flushBatchTrack > BATCH_COUNT_FOR_FLUSHING){
804 synchronized (flushableMsgBuffer) {
805 if (flushableMsgBuffer.size() > 0){
806 channel.write(flushableMsgBuffer);
807 flushableMsgBuffer.clear();
810 // flushBatchTrack = 0;
816 public SocketAddress getRemoteAddress() {
817 return (channel != null) ? channel.getRemoteAddress() : null;
821 public SocketAddress getLocalAddress() {
822 return (channel != null) ? channel.getLocalAddress() : null;