1 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
3 import java.io.IOException;
4 import java.net.SocketAddress;
5 import java.net.SocketException;
6 import java.nio.channels.AsynchronousCloseException;
7 import java.nio.channels.ClosedSelectorException;
8 import java.util.ArrayList;
10 import java.util.HashMap;
11 import java.util.List;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Future;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.atomic.AtomicInteger;
21 import org.jboss.netty.channel.Channel;
22 import org.jboss.netty.util.HashedWheelTimer;
23 import org.jboss.netty.util.Timeout;
24 import org.jboss.netty.util.TimerTask;
25 import org.opendaylight.controller.protocol_plugin.openflow.core.IEnhancedSwitch;
26 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
27 import org.opendaylight.controller.protocol_plugin.openflow.core.internal.SwitchEvent.SwitchEventType;
28 import org.openflow.protocol.OFBarrierReply;
29 import org.openflow.protocol.OFBarrierRequest;
30 import org.openflow.protocol.OFEchoReply;
31 import org.openflow.protocol.OFError;
32 import org.openflow.protocol.OFFeaturesReply;
33 import org.openflow.protocol.OFFlowMod;
34 import org.openflow.protocol.OFGetConfigReply;
35 import org.openflow.protocol.OFMatch;
36 import org.openflow.protocol.OFMessage;
37 import org.openflow.protocol.OFPhysicalPort;
38 import org.openflow.protocol.OFPort;
39 import org.openflow.protocol.OFPortStatus;
40 import org.openflow.protocol.OFSetConfig;
41 import org.openflow.protocol.OFStatisticsReply;
42 import org.openflow.protocol.OFStatisticsRequest;
43 import org.openflow.protocol.OFType;
44 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
45 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
46 import org.openflow.protocol.OFPhysicalPort.OFPortState;
47 import org.openflow.protocol.OFPortStatus.OFPortReason;
48 import org.openflow.protocol.factory.BasicFactory;
49 import org.openflow.protocol.factory.MessageParseException;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
53 public class EnhancedSwitchHandler implements IEnhancedSwitch {
56 private static final Logger logger = LoggerFactory
57 .getLogger(EnhancedSwitchHandler.class);
58 private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
59 private int MESSAGE_RESPONSE_TIMER = 2000;
61 private EnhancedController controller = null;
62 private Integer switchChannelID = null;
63 private Channel channel;
64 private long lastMsgReceivedTimeStamp = 0;
65 private SwitchState state = null;
66 private BasicFactory factory = null;
67 private HashedWheelTimer timer = null;
68 private SwitchLivelinessTimerTask switchLivelinessTask = null;
69 private Timeout switchLivelinessTaskHandle = null;
71 private AtomicInteger xid;
73 private int capabilities;
76 private Map<Short, OFPhysicalPort> physicalPorts;
77 private Map<Short, Integer> portBandwidth;
78 private Date connectedDate;
79 private ExecutorService executor = null;
80 private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
81 private Integer responseTimerValue;
82 private TrafficStatisticsHandler trafficStatsHandler = null;
83 private static final boolean START_LIVELINESS_TIMER = false;
85 private static final int BATCH_COUNT_FOR_FLUSHING = 3;
86 private int flushBatchTrack = 0;
89 private List<OFMessage> msgBuffer = new ArrayList<OFMessage>();
90 private int bufferTrack = 0;
91 private static final int BATCH_BUFFER_THRESHOLD = 100;
95 // PLEASE .. IF THERE IS SOMETHING CALLED GOD, HELP ME GET THE THROUGHPUT WITH THIS !!
96 private List<OFMessage> flushableMsgBuffer = new ArrayList<OFMessage>();
99 public enum SwitchState {
101 WAIT_FEATURES_REPLY(1),
102 WAIT_CONFIG_REPLY(2),
107 private SwitchState(int value) {
111 @SuppressWarnings("unused")
118 public EnhancedSwitchHandler(EnhancedController controller,
119 Integer switchConnectionChannelID,
121 HashedWheelTimer timer,
122 ExecutorService executor,
123 TrafficStatisticsHandler tHandler){
125 this.controller = controller;
126 this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
127 this.portBandwidth = new HashMap<Short, Integer>();
128 this.switchChannelID = switchConnectionChannelID;
131 this.tables = (byte) 0;
132 this.actions = (int) 0;
133 this.capabilities = (int) 0;
134 this.buffers = (int) 0;
135 this.connectedDate = new Date();
136 this.state = SwitchState.NON_OPERATIONAL;
137 this.executor = executor;
138 this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
139 this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
140 this.channel = channel;
141 this.xid = new AtomicInteger(this.channel.hashCode());
142 this.trafficStatsHandler = tHandler;
146 Integer getSwitchChannelID() {
147 return this.switchChannelID;
150 public void startHandler(){
151 this.factory = new BasicFactory();
157 public void shutDownHandler(){
163 public void handleChannelIdle(){
164 // TODO: this is already handled by OFChannelHandler
171 public void start() {
177 SwitchEvent ev = new SwitchEvent(SwitchEventType.SWITCH_DELETE, this, null);
178 controller.switchDeleted(ev, switchChannelID);
181 private void cancelSwitchTimer() {
182 if (switchLivelinessTaskHandle != null){
183 this.switchLivelinessTaskHandle.cancel();
188 public void handleCaughtException(){
198 public int getNextXid() {
199 return this.xid.incrementAndGet();
203 public Long getId() {
208 public Byte getTables() {
213 public Integer getActions() {
218 public Integer getCapabilities() {
219 return this.capabilities;
223 public Integer getBuffers() {
228 public Date getConnectedDate() {
229 return this.connectedDate;
233 public Integer asyncSend(OFMessage msg) {
234 return asyncSend(msg, getNextXid());
239 public Integer asyncSend(OFMessage msg, int xid) {
241 // BATCHING IMPLEMENTATION. Please think hard before enablng this !!
242 // Some messages could be latency-sensitive and some could be batched
243 // for better throughput. So, below decision may not bring better
244 // throughput for latency-sensitive cases like FLOW-MODs or
248 if (bufferTrack == BUFFER_THRESHOLD){
249 this.channel.write(msgBuffer);
261 //List<OFMessage> msglist = new ArrayList<OFMessage>(1);
263 synchronized( flushableMsgBuffer ) {
264 flushableMsgBuffer.add(msg);
267 trafficStatsHandler.countForEntitySimpleMeasurement(switchChannelID,
268 TrafficStatisticsHandler.ENTITY_COUNTER_SND_MSG);
270 //this.channel.write(msglist);
273 if (msg.getType() == OFType.FLOW_MOD){
274 this.trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.FLOW_MOD_SENT);
275 this.trafficStatsHandler.countForRateMeasurement(TrafficStatisticsHandler.FLOW_MOD_SENT);
285 public Integer asyncFastSend(OFMessage msg) {
286 return asyncFastSend(msg, getNextXid());
290 public Integer asyncFastSend(OFMessage msg, int xid) {
292 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
294 this.channel.write(msglist);
295 trafficStatsHandler.countForEntitySimpleMeasurement(switchChannelID,
296 TrafficStatisticsHandler.ENTITY_COUNTER_SND_MSG);
301 public Object syncSend(OFMessage msg) {
302 int xid = getNextXid();
303 return syncSend(msg, xid);
306 private Object syncSend(OFMessage msg, int xid) {
307 return syncMessageInternal(msg, xid, true);
311 public Map<Short, OFPhysicalPort> getPhysicalPorts() {
312 return this.physicalPorts;
316 public Set<Short> getPorts() {
317 return this.physicalPorts.keySet();
321 public OFPhysicalPort getPhysicalPort(Short portNumber) {
322 return this.physicalPorts.get(portNumber);
326 public Integer getPortBandwidth(Short portNumber) {
327 return this.portBandwidth.get(portNumber);
331 public boolean isPortEnabled(short portNumber) {
332 return isPortEnabled(physicalPorts.get(portNumber));
336 public boolean isPortEnabled(OFPhysicalPort port) {
340 int portConfig = port.getConfig();
341 int portState = port.getState();
342 if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
345 if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
348 if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
357 public List<OFPhysicalPort> getEnabledPorts() {
358 List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
359 synchronized (this.physicalPorts) {
360 for (OFPhysicalPort port : physicalPorts.values()) {
361 if (isPortEnabled(port)) {
371 * WARNING: CALLER WOULD BE BLOCKED
375 public Object getStatistics(OFStatisticsRequest req) {
376 int xid = getNextXid();
377 StatisticsCollector worker = new StatisticsCollector(this, xid, req);
378 messageWaitingDone.put(xid, worker);
379 Future<Object> submit = executor.submit(worker);
380 Object result = null;
382 result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
384 } catch (Exception e) {
385 logger.warn("Timeout while waiting for {} replies", req.getType());
386 result = null; // to indicate timeout has occurred
392 public boolean isOperational() {
393 return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
397 public Object syncSendBarrierMessage() {
398 OFBarrierRequest barrierMsg = new OFBarrierRequest();
399 return syncSend(barrierMsg);
403 public Object asyncSendBarrierMessage() {
404 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
405 OFBarrierRequest barrierMsg = new OFBarrierRequest();
406 int xid = getNextXid();
408 barrierMsg.setXid(xid);
409 msglist.add(barrierMsg);
411 this.channel.write(msglist);
412 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.BARRIER_REQUEST_SENT);
418 public void handleMessage(OFMessage ofMessage) {
421 logger.debug("Message received: {}", ofMessage.toString());
422 this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
423 OFType type = ofMessage.getType();
426 logger.debug("<<<< HELLO");
427 // send feature request
428 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.HELLO_RECEIVED);
429 OFMessage featureRequest = factory
430 .getMessage(OFType.FEATURES_REQUEST);
431 asyncFastSend(featureRequest);
432 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.FEATURES_REQUEST_SENT);
433 // delete all pre-existing flows
434 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
435 OFFlowMod flowMod = (OFFlowMod) factory
436 .getMessage(OFType.FLOW_MOD);
437 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
438 .setOutPort(OFPort.OFPP_NONE)
439 .setLength((short) OFFlowMod.MINIMUM_LENGTH);
440 asyncFastSend(flowMod);
441 this.state = SwitchState.WAIT_FEATURES_REPLY;
442 if (START_LIVELINESS_TIMER){
447 logger.debug("<<<< ECHO REQUEST");
448 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ECHO_REQUEST_RECEIVED);
449 OFEchoReply echoReply = (OFEchoReply) factory
450 .getMessage(OFType.ECHO_REPLY);
451 asyncFastSend(echoReply);
452 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ECHO_REPLY_SENT);
456 logger.debug("<<<< ECHO REPLY");
457 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ECHO_REPLY_RECEIVED);
458 //this.probeSent = false;
461 logger.debug("<<<< FEATURES REPLY");
462 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.FEATURES_REPLY_RECEIVED);
463 processFeaturesReply((OFFeaturesReply) ofMessage);
465 case GET_CONFIG_REPLY:
466 logger.debug("<<<< CONFIG REPLY");
467 // make sure that the switch can send the whole packet to the
469 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.CONFIG_REPLY_RECEIVED);
470 if (((OFGetConfigReply) ofMessage).getMissSendLength() == (short) 0xffff) {
471 this.state = SwitchState.OPERATIONAL;
475 logger.debug("<<<< BARRIER REPLY");
476 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.BARRIER_REPLY_RECEIVED);
477 processBarrierReply((OFBarrierReply) ofMessage);
480 logger.debug("<<<< ERROR");
481 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ERROR_MSG_RECEIVED);
482 processErrorReply((OFError) ofMessage);
485 logger.debug("<<<< PORT STATUS");
486 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.PORT_STATUS_RECEIVED);
487 processPortStatusMsg((OFPortStatus) ofMessage);
490 logger.debug("<<<< STATS REPLY");
491 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.STATS_RESPONSE_RECEIVED);
492 processStatsReply((OFStatisticsReply) ofMessage);
495 logger.debug("<<<< PACKET_IN");
496 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.PACKET_IN_RECEIVED);
497 trafficStatsHandler.countForRateMeasurement(TrafficStatisticsHandler.PACKET_IN_RECEIVED);
502 if (isOperational()) {
503 logger.debug("SWITCH IS OPERATIONAL ... forwarding");
504 SwitchEvent ev = new SwitchEvent(
505 SwitchEvent.SwitchEventType.SWITCH_MESSAGE, this, ofMessage);
506 controller.switchMessage(ev, switchChannelID);
511 private void startSwitchTimer(){
512 if (this.timer != null){
513 if (switchLivelinessTask == null){
514 switchLivelinessTask = new SwitchLivelinessTimerTask();
516 switchLivelinessTaskHandle = timer.newTimeout(switchLivelinessTask,
517 switchLivenessTimeout, TimeUnit.SECONDS);
524 * This method returns the switch liveness timeout value. If controller did
525 * not receive any message from the switch for such a long period,
526 * controller will tear down the connection to the switch.
528 * @return The timeout value
530 private static int getSwitchLivenessTimeout() {
531 String timeout = System.getProperty("of.switchLivenessTimeout");
534 if (timeout != null) {
535 rv = Integer.parseInt(timeout);
537 } catch (Exception e) {
543 private void processFeaturesReply(OFFeaturesReply reply) {
544 if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
545 this.sid = reply.getDatapathId();
546 this.buffers = reply.getBuffers();
547 this.capabilities = reply.getCapabilities();
548 this.tables = reply.getTables();
549 this.actions = reply.getActions();
550 // notify core of this error event
551 for (OFPhysicalPort port : reply.getPorts()) {
552 updatePhysicalPort(port);
554 // config the switch to send full data packet
555 OFSetConfig config = (OFSetConfig) factory
556 .getMessage(OFType.SET_CONFIG);
557 config.setMissSendLength((short) 0xffff).setLengthU(
558 OFSetConfig.MINIMUM_LENGTH);
559 asyncFastSend(config);
560 // send config request to make sure the switch can handle the set
562 OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
563 asyncFastSend(getConfig);
564 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.CONFIG_REQUEST_SENT);
565 this.state = SwitchState.WAIT_CONFIG_REPLY;
566 // inform core that a new switch is now operational
567 reportSwitchStateChange(true);
572 private void updatePhysicalPort(OFPhysicalPort port) {
573 Short portNumber = port.getPortNumber();
574 physicalPorts.put(portNumber, port);
577 port.getCurrentFeatures()
578 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
579 | OFPortFeatures.OFPPF_10MB_HD
581 | OFPortFeatures.OFPPF_100MB_FD
583 | OFPortFeatures.OFPPF_100MB_HD
585 | OFPortFeatures.OFPPF_1GB_FD
587 | OFPortFeatures.OFPPF_1GB_HD
588 .getValue() | OFPortFeatures.OFPPF_10GB_FD
590 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.UPDATE_PHYSICAL_PORT);
594 private void reportSwitchStateChange(boolean added) {
595 SwitchEvent ev = null;
597 ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ADD, this, null);
598 controller.switchAdded(ev, switchChannelID);
600 ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_DELETE, this, null);
601 controller.switchDeleted(ev, switchChannelID);
606 protected class SwitchLivelinessTimerTask implements TimerTask {
609 public void run(Timeout timeout) throws Exception {
611 // set this reference in parent so that cancellation is
613 switchLivelinessTaskHandle = timeout;
614 Long now = System.currentTimeMillis();
615 if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
616 if (state == SwitchState.WAIT_FEATURES_REPLY) {
617 // send another features request
618 OFMessage request = factory
619 .getMessage(OFType.FEATURES_REQUEST);
620 asyncFastSend(request);
622 if (state == SwitchState.WAIT_CONFIG_REPLY) {
623 // send another config request
624 OFSetConfig config = (OFSetConfig) factory
625 .getMessage(OFType.SET_CONFIG);
626 config.setMissSendLength((short) 0xffff)
627 .setLengthU(OFSetConfig.MINIMUM_LENGTH);
628 asyncFastSend(config);
629 OFMessage getConfig = factory
630 .getMessage(OFType.GET_CONFIG_REQUEST);
631 asyncFastSend(getConfig);
635 timer.newTimeout(this, switchLivenessTimeout, TimeUnit.SECONDS);
642 * Either a BarrierReply or a OFError is received. If this is a reply for an
643 * outstanding sync message, wake up associated task so that it can continue
645 private void processBarrierReply(OFBarrierReply msg) {
646 Integer xid = msg.getXid();
647 SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
649 if (worker == null) {
655 private void processErrorReply(OFError errorMsg) {
657 OFMessage offendingMsg = errorMsg.getOffendingMsg();
659 if (offendingMsg != null) {
660 xi = offendingMsg.getXid();
662 xi = errorMsg.getXid();
665 catch(MessageParseException mpe){
670 private void processPortStatusMsg(OFPortStatus msg) {
671 OFPhysicalPort port = msg.getDesc();
672 if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
673 updatePhysicalPort(port);
674 } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
675 updatePhysicalPort(port);
676 } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
678 deletePhysicalPort(port);
683 private void deletePhysicalPort(OFPhysicalPort port) {
684 Short portNumber = port.getPortNumber();
685 physicalPorts.remove(portNumber);
686 portBandwidth.remove(portNumber);
689 private void processStatsReply(OFStatisticsReply reply) {
690 Integer xid = reply.getXid();
691 StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
693 if (worker == null) {
696 if (worker.collect(reply)) {
697 // if all the stats records are received (collect() returns true)
699 messageWaitingDone.remove(xid);
706 * This method performs synchronous operations for a given message. If
707 * syncRequest is set to true, the message will be sent out followed by a
708 * Barrier request message. Then it's blocked until the Barrier rely arrives
709 * or timeout. If syncRequest is false, it simply skips the message send and
710 * just waits for the response back.
717 * If set to true, the message the message will be sent out
718 * followed by a Barrier request message. If set to false, it
719 * simply skips the sending and just waits for the Barrier reply.
722 private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
723 Object result = null;
725 SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
726 messageWaitingDone.put(xid, worker);
728 Boolean status = false;
729 Future<Object> submit = executor.submit(worker);
731 result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
732 messageWaitingDone.remove(xid);
733 if (result == null) {
734 // if result is null, then it means the switch can handle this
735 // message successfully
736 // convert the result into a Boolean with value true
738 // logger.debug("Successfully send " +
739 // msg.getType().toString());
742 // if result is not null, this means the switch can't handle
744 // the result if OFError already
745 logger.debug("Send {} failed --> {}", msg.getType().toString(),
746 ((OFError) result).toString());
749 } catch (Exception e) {
750 logger.warn("Timeout while waiting for {} reply", msg.getType()
752 // convert the result into a Boolean with value false
762 private void sendFirstHello() {
764 OFMessage msg = factory.getMessage(OFType.HELLO);
766 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.HELLO_SENT);
767 trafficStatsHandler.addEntityForCounter(switchChannelID, TrafficStatisticsHandler.ENTITY_COUNTER_RCV_MSG);
768 trafficStatsHandler.addEntityForCounter(switchChannelID, TrafficStatisticsHandler.ENTITY_COUNTER_SND_MSG);
769 } catch (Exception e) {
775 private void reportError(Exception e) {
776 if (e instanceof AsynchronousCloseException
777 || e instanceof InterruptedException
778 || e instanceof SocketException || e instanceof IOException
779 || e instanceof ClosedSelectorException) {
780 logger.error("Caught exception {}", e.getMessage());
782 logger.error("Caught exception ", e);
784 // notify core of this error event and disconnect the switch
786 // TODO: We do not need this because except-hanling is done by
787 // Controller's OFChannelHandler
790 SwitchEvent ev = new SwitchEvent(
791 SwitchEvent.SwitchEventType.SWITCH_ERROR, this, null);
793 controller.switchError(ev, switchChannelID);
799 public void flushBufferedMessages() {
801 //if (flushBatchTrack > BATCH_COUNT_FOR_FLUSHING){
802 synchronized (flushableMsgBuffer) {
803 if (flushableMsgBuffer.size() > 0){
804 channel.write(flushableMsgBuffer);
805 flushableMsgBuffer.clear();
808 // flushBatchTrack = 0;
814 public SocketAddress getRemoteAddress() {
815 return (channel != null) ? channel.getRemoteAddress() : null;
819 public SocketAddress getLocalAddress() {
820 return (channel != null) ? channel.getLocalAddress() : null;