1 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
3 import java.io.IOException;
4 import java.net.SocketAddress;
5 import java.net.SocketException;
6 import java.nio.channels.AsynchronousCloseException;
7 import java.nio.channels.ClosedSelectorException;
8 import java.util.ArrayList;
10 import java.util.HashMap;
11 import java.util.List;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Future;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.atomic.AtomicInteger;
21 import org.jboss.netty.channel.Channel;
22 import org.jboss.netty.util.HashedWheelTimer;
23 import org.jboss.netty.util.Timeout;
24 import org.jboss.netty.util.TimerTask;
25 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
26 import org.opendaylight.controller.protocol_plugin.openflow.core.internal.SwitchEvent.SwitchEventType;
27 import org.openflow.protocol.OFBarrierReply;
28 import org.openflow.protocol.OFBarrierRequest;
29 import org.openflow.protocol.OFEchoReply;
30 import org.openflow.protocol.OFError;
31 import org.openflow.protocol.OFFeaturesReply;
32 import org.openflow.protocol.OFFlowMod;
33 import org.openflow.protocol.OFGetConfigReply;
34 import org.openflow.protocol.OFMatch;
35 import org.openflow.protocol.OFMessage;
36 import org.openflow.protocol.OFPhysicalPort;
37 import org.openflow.protocol.OFPort;
38 import org.openflow.protocol.OFPortStatus;
39 import org.openflow.protocol.OFSetConfig;
40 import org.openflow.protocol.OFStatisticsReply;
41 import org.openflow.protocol.OFStatisticsRequest;
42 import org.openflow.protocol.OFType;
43 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
44 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
45 import org.openflow.protocol.OFPhysicalPort.OFPortState;
46 import org.openflow.protocol.OFPortStatus.OFPortReason;
47 import org.openflow.protocol.factory.BasicFactory;
48 import org.openflow.protocol.factory.MessageParseException;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
52 public class EnhancedSwitchHandler implements ISwitch {
55 private static final Logger logger = LoggerFactory
56 .getLogger(EnhancedSwitchHandler.class);
57 private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
58 private int MESSAGE_RESPONSE_TIMER = 2000;
60 private EnhancedController controller = null;
61 private Integer switchChannelID = null;
62 private Channel channel;
63 private long lastMsgReceivedTimeStamp = 0;
64 private SwitchState state = null;
65 private BasicFactory factory = null;
66 private HashedWheelTimer timer = null;
67 private SwitchLivelinessTimerTask switchLivelinessTask = null;
68 private Timeout switchLivelinessTaskHandle = null;
70 private AtomicInteger xid;
72 private int capabilities;
75 private Map<Short, OFPhysicalPort> physicalPorts;
76 private Map<Short, Integer> portBandwidth;
77 private Date connectedDate;
78 private ExecutorService executor = null;
79 private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
80 private Integer responseTimerValue;
81 private TrafficStatisticsHandler trafficStatsHandler = null;
82 private static final boolean START_LIVELINESS_TIMER = false;
84 private static final int BATCH_COUNT_FOR_FLUSHING = 3;
85 private int flushBatchTrack = 0;
88 private List<OFMessage> msgBuffer = new ArrayList<OFMessage>();
89 private int bufferTrack = 0;
90 private static final int BATCH_BUFFER_THRESHOLD = 100;
94 // PLEASE .. IF THERE IS SOMETHING CALLED GOD, HELP ME GET THE THROUGHPUT WITH THIS !!
95 private List<OFMessage> flushableMsgBuffer = new ArrayList<OFMessage>();
98 public enum SwitchState {
100 WAIT_FEATURES_REPLY(1),
101 WAIT_CONFIG_REPLY(2),
106 private SwitchState(int value) {
110 @SuppressWarnings("unused")
117 public EnhancedSwitchHandler(EnhancedController controller,
118 Integer switchConnectionChannelID,
120 HashedWheelTimer timer,
121 ExecutorService executor,
122 TrafficStatisticsHandler tHandler){
124 this.controller = controller;
125 this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
126 this.portBandwidth = new HashMap<Short, Integer>();
127 this.switchChannelID = switchConnectionChannelID;
130 this.tables = (byte) 0;
131 this.actions = (int) 0;
132 this.capabilities = (int) 0;
133 this.buffers = (int) 0;
134 this.connectedDate = new Date();
135 this.state = SwitchState.NON_OPERATIONAL;
136 this.executor = executor;
137 this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
138 this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
139 this.channel = channel;
140 this.xid = new AtomicInteger(this.channel.hashCode());
141 this.trafficStatsHandler = tHandler;
146 public void startHandler(){
147 this.factory = new BasicFactory();
153 public void shutDownHandler(){
159 public void handleChannelIdle(){
160 // TODO: this is already handled by OFChannelHandler
167 public void start() {
173 SwitchEvent ev = new SwitchEvent(SwitchEventType.SWITCH_DELETE, this, null);
174 controller.switchDeleted(ev, switchChannelID);
177 private void cancelSwitchTimer() {
178 if (switchLivelinessTaskHandle != null){
179 this.switchLivelinessTaskHandle.cancel();
184 public void handleCaughtException(){
194 public int getNextXid() {
195 return this.xid.incrementAndGet();
199 public Long getId() {
204 public Byte getTables() {
209 public Integer getActions() {
214 public Integer getCapabilities() {
215 return this.capabilities;
219 public Integer getBuffers() {
224 public Date getConnectedDate() {
225 return this.connectedDate;
229 public Integer asyncSend(OFMessage msg) {
230 return asyncSend(msg, getNextXid());
235 public Integer asyncSend(OFMessage msg, int xid) {
237 // BATCHING IMPLEMENTATION. Please think hard before enablng this !!
238 // Some messages could be latency-sensitive and some could be batched
239 // for better throughput. So, below decision may not bring better
240 // throughput for latency-sensitive cases like FLOW-MODs or
244 if (bufferTrack == BUFFER_THRESHOLD){
245 this.channel.write(msgBuffer);
257 //List<OFMessage> msglist = new ArrayList<OFMessage>(1);
259 synchronized( flushableMsgBuffer ) {
260 flushableMsgBuffer.add(msg);
263 trafficStatsHandler.countForEntitySimpleMeasurement(switchChannelID,
264 TrafficStatisticsHandler.ENTITY_COUNTER_SND_MSG);
266 //this.channel.write(msglist);
269 if (msg.getType() == OFType.FLOW_MOD){
270 this.trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.FLOW_MOD_SENT);
271 this.trafficStatsHandler.countForRateMeasurement(TrafficStatisticsHandler.FLOW_MOD_SENT);
281 public Integer asyncFastSend(OFMessage msg) {
282 return asyncFastSend(msg, getNextXid());
286 public Integer asyncFastSend(OFMessage msg, int xid) {
288 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
290 this.channel.write(msglist);
291 trafficStatsHandler.countForEntitySimpleMeasurement(switchChannelID,
292 TrafficStatisticsHandler.ENTITY_COUNTER_SND_MSG);
297 public Object syncSend(OFMessage msg) {
298 int xid = getNextXid();
299 return syncSend(msg, xid);
302 private Object syncSend(OFMessage msg, int xid) {
303 return syncMessageInternal(msg, xid, true);
307 public Map<Short, OFPhysicalPort> getPhysicalPorts() {
308 return this.physicalPorts;
312 public Set<Short> getPorts() {
313 return this.physicalPorts.keySet();
317 public OFPhysicalPort getPhysicalPort(Short portNumber) {
318 return this.physicalPorts.get(portNumber);
322 public Integer getPortBandwidth(Short portNumber) {
323 return this.portBandwidth.get(portNumber);
327 public boolean isPortEnabled(short portNumber) {
328 return isPortEnabled(physicalPorts.get(portNumber));
332 public boolean isPortEnabled(OFPhysicalPort port) {
336 int portConfig = port.getConfig();
337 int portState = port.getState();
338 if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
341 if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
344 if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
353 public List<OFPhysicalPort> getEnabledPorts() {
354 List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
355 synchronized (this.physicalPorts) {
356 for (OFPhysicalPort port : physicalPorts.values()) {
357 if (isPortEnabled(port)) {
367 * WARNING: CALLER WOULD BE BLOCKED
371 public Object getStatistics(OFStatisticsRequest req) {
372 int xid = getNextXid();
373 StatisticsCollector worker = new StatisticsCollector(this, xid, req);
374 messageWaitingDone.put(xid, worker);
375 Future<Object> submit = executor.submit(worker);
376 Object result = null;
378 result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
380 } catch (Exception e) {
381 logger.warn("Timeout while waiting for {} replies", req.getType());
382 result = null; // to indicate timeout has occurred
388 public boolean isOperational() {
389 return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
393 public Object syncSendBarrierMessage() {
394 OFBarrierRequest barrierMsg = new OFBarrierRequest();
395 return syncSend(barrierMsg);
399 public Object asyncSendBarrierMessage() {
400 List<OFMessage> msglist = new ArrayList<OFMessage>(1);
401 OFBarrierRequest barrierMsg = new OFBarrierRequest();
402 int xid = getNextXid();
404 barrierMsg.setXid(xid);
405 msglist.add(barrierMsg);
407 this.channel.write(msglist);
408 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.BARRIER_REQUEST_SENT);
414 public void handleMessage(OFMessage ofMessage) {
417 logger.debug("Message received: {}", ofMessage.toString());
418 this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
419 OFType type = ofMessage.getType();
422 logger.debug("<<<< HELLO");
423 // send feature request
424 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.HELLO_RECEIVED);
425 OFMessage featureRequest = factory
426 .getMessage(OFType.FEATURES_REQUEST);
427 asyncFastSend(featureRequest);
428 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.FEATURES_REQUEST_SENT);
429 // delete all pre-existing flows
430 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
431 OFFlowMod flowMod = (OFFlowMod) factory
432 .getMessage(OFType.FLOW_MOD);
433 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
434 .setOutPort(OFPort.OFPP_NONE)
435 .setLength((short) OFFlowMod.MINIMUM_LENGTH);
436 asyncFastSend(flowMod);
437 this.state = SwitchState.WAIT_FEATURES_REPLY;
438 if (START_LIVELINESS_TIMER){
443 logger.debug("<<<< ECHO REQUEST");
444 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ECHO_REQUEST_RECEIVED);
445 OFEchoReply echoReply = (OFEchoReply) factory
446 .getMessage(OFType.ECHO_REPLY);
447 asyncFastSend(echoReply);
448 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ECHO_REPLY_SENT);
452 logger.debug("<<<< ECHO REPLY");
453 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ECHO_REPLY_RECEIVED);
454 //this.probeSent = false;
457 logger.debug("<<<< FEATURES REPLY");
458 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.FEATURES_REPLY_RECEIVED);
459 processFeaturesReply((OFFeaturesReply) ofMessage);
461 case GET_CONFIG_REPLY:
462 logger.debug("<<<< CONFIG REPLY");
463 // make sure that the switch can send the whole packet to the
465 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.CONFIG_REPLY_RECEIVED);
466 if (((OFGetConfigReply) ofMessage).getMissSendLength() == (short) 0xffff) {
467 this.state = SwitchState.OPERATIONAL;
471 logger.debug("<<<< BARRIER REPLY");
472 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.BARRIER_REPLY_RECEIVED);
473 processBarrierReply((OFBarrierReply) ofMessage);
476 logger.debug("<<<< ERROR");
477 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ERROR_MSG_RECEIVED);
478 processErrorReply((OFError) ofMessage);
481 logger.debug("<<<< PORT STATUS");
482 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.PORT_STATUS_RECEIVED);
483 processPortStatusMsg((OFPortStatus) ofMessage);
486 logger.debug("<<<< STATS REPLY");
487 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.STATS_RESPONSE_RECEIVED);
488 processStatsReply((OFStatisticsReply) ofMessage);
491 logger.debug("<<<< PACKET_IN");
492 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.PACKET_IN_RECEIVED);
493 trafficStatsHandler.countForRateMeasurement(TrafficStatisticsHandler.PACKET_IN_RECEIVED);
498 if (isOperational()) {
499 logger.debug("SWITCH IS OPERATIONAL ... forwarding");
500 SwitchEvent ev = new SwitchEvent(
501 SwitchEvent.SwitchEventType.SWITCH_MESSAGE, this, ofMessage);
502 controller.switchMessage(ev, switchChannelID);
507 private void startSwitchTimer(){
508 if (this.timer != null){
509 if (switchLivelinessTask == null){
510 switchLivelinessTask = new SwitchLivelinessTimerTask();
512 switchLivelinessTaskHandle = timer.newTimeout(switchLivelinessTask,
513 switchLivenessTimeout, TimeUnit.SECONDS);
520 * This method returns the switch liveness timeout value. If controller did
521 * not receive any message from the switch for such a long period,
522 * controller will tear down the connection to the switch.
524 * @return The timeout value
526 private static int getSwitchLivenessTimeout() {
527 String timeout = System.getProperty("of.switchLivenessTimeout");
530 if (timeout != null) {
531 rv = Integer.parseInt(timeout);
533 } catch (Exception e) {
539 private void processFeaturesReply(OFFeaturesReply reply) {
540 if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
541 this.sid = reply.getDatapathId();
542 this.buffers = reply.getBuffers();
543 this.capabilities = reply.getCapabilities();
544 this.tables = reply.getTables();
545 this.actions = reply.getActions();
546 // notify core of this error event
547 for (OFPhysicalPort port : reply.getPorts()) {
548 updatePhysicalPort(port);
550 // config the switch to send full data packet
551 OFSetConfig config = (OFSetConfig) factory
552 .getMessage(OFType.SET_CONFIG);
553 config.setMissSendLength((short) 0xffff).setLengthU(
554 OFSetConfig.MINIMUM_LENGTH);
555 asyncFastSend(config);
556 // send config request to make sure the switch can handle the set
558 OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
559 asyncFastSend(getConfig);
560 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.CONFIG_REQUEST_SENT);
561 this.state = SwitchState.WAIT_CONFIG_REPLY;
562 // inform core that a new switch is now operational
563 reportSwitchStateChange(true);
568 private void updatePhysicalPort(OFPhysicalPort port) {
569 Short portNumber = port.getPortNumber();
570 physicalPorts.put(portNumber, port);
573 port.getCurrentFeatures()
574 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
575 | OFPortFeatures.OFPPF_10MB_HD
577 | OFPortFeatures.OFPPF_100MB_FD
579 | OFPortFeatures.OFPPF_100MB_HD
581 | OFPortFeatures.OFPPF_1GB_FD
583 | OFPortFeatures.OFPPF_1GB_HD
584 .getValue() | OFPortFeatures.OFPPF_10GB_FD
586 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.UPDATE_PHYSICAL_PORT);
590 private void reportSwitchStateChange(boolean added) {
591 SwitchEvent ev = null;
593 ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ADD, this, null);
594 controller.switchAdded(ev, switchChannelID);
596 ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_DELETE, this, null);
597 controller.switchDeleted(ev, switchChannelID);
602 protected class SwitchLivelinessTimerTask implements TimerTask {
605 public void run(Timeout timeout) throws Exception {
607 // set this reference in parent so that cancellation is
609 switchLivelinessTaskHandle = timeout;
610 Long now = System.currentTimeMillis();
611 if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
612 if (state == SwitchState.WAIT_FEATURES_REPLY) {
613 // send another features request
614 OFMessage request = factory
615 .getMessage(OFType.FEATURES_REQUEST);
616 asyncFastSend(request);
618 if (state == SwitchState.WAIT_CONFIG_REPLY) {
619 // send another config request
620 OFSetConfig config = (OFSetConfig) factory
621 .getMessage(OFType.SET_CONFIG);
622 config.setMissSendLength((short) 0xffff)
623 .setLengthU(OFSetConfig.MINIMUM_LENGTH);
624 asyncFastSend(config);
625 OFMessage getConfig = factory
626 .getMessage(OFType.GET_CONFIG_REQUEST);
627 asyncFastSend(getConfig);
631 timer.newTimeout(this, switchLivenessTimeout, TimeUnit.SECONDS);
638 * Either a BarrierReply or a OFError is received. If this is a reply for an
639 * outstanding sync message, wake up associated task so that it can continue
641 private void processBarrierReply(OFBarrierReply msg) {
642 Integer xid = msg.getXid();
643 SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
645 if (worker == null) {
651 private void processErrorReply(OFError errorMsg) {
653 OFMessage offendingMsg = errorMsg.getOffendingMsg();
655 if (offendingMsg != null) {
656 xi = offendingMsg.getXid();
658 xi = errorMsg.getXid();
661 catch(MessageParseException mpe){
666 private void processPortStatusMsg(OFPortStatus msg) {
667 OFPhysicalPort port = msg.getDesc();
668 if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
669 updatePhysicalPort(port);
670 } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
671 updatePhysicalPort(port);
672 } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
674 deletePhysicalPort(port);
679 private void deletePhysicalPort(OFPhysicalPort port) {
680 Short portNumber = port.getPortNumber();
681 physicalPorts.remove(portNumber);
682 portBandwidth.remove(portNumber);
685 private void processStatsReply(OFStatisticsReply reply) {
686 Integer xid = reply.getXid();
687 StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
689 if (worker == null) {
692 if (worker.collect(reply)) {
693 // if all the stats records are received (collect() returns true)
695 messageWaitingDone.remove(xid);
702 * This method performs synchronous operations for a given message. If
703 * syncRequest is set to true, the message will be sent out followed by a
704 * Barrier request message. Then it's blocked until the Barrier rely arrives
705 * or timeout. If syncRequest is false, it simply skips the message send and
706 * just waits for the response back.
713 * If set to true, the message the message will be sent out
714 * followed by a Barrier request message. If set to false, it
715 * simply skips the sending and just waits for the Barrier reply.
718 private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
719 Object result = null;
721 SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
722 messageWaitingDone.put(xid, worker);
724 Boolean status = false;
725 Future<Object> submit = executor.submit(worker);
727 result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
728 messageWaitingDone.remove(xid);
729 if (result == null) {
730 // if result is null, then it means the switch can handle this
731 // message successfully
732 // convert the result into a Boolean with value true
734 // logger.debug("Successfully send " +
735 // msg.getType().toString());
738 // if result is not null, this means the switch can't handle
740 // the result if OFError already
741 logger.debug("Send {} failed --> {}", msg.getType().toString(),
742 ((OFError) result).toString());
745 } catch (Exception e) {
746 logger.warn("Timeout while waiting for {} reply", msg.getType()
748 // convert the result into a Boolean with value false
758 private void sendFirstHello() {
760 OFMessage msg = factory.getMessage(OFType.HELLO);
762 trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.HELLO_SENT);
763 trafficStatsHandler.addEntityForCounter(switchChannelID, TrafficStatisticsHandler.ENTITY_COUNTER_RCV_MSG);
764 trafficStatsHandler.addEntityForCounter(switchChannelID, TrafficStatisticsHandler.ENTITY_COUNTER_SND_MSG);
765 } catch (Exception e) {
771 private void reportError(Exception e) {
772 if (e instanceof AsynchronousCloseException
773 || e instanceof InterruptedException
774 || e instanceof SocketException || e instanceof IOException
775 || e instanceof ClosedSelectorException) {
776 logger.error("Caught exception {}", e.getMessage());
778 logger.error("Caught exception ", e);
780 // notify core of this error event and disconnect the switch
782 // TODO: We do not need this because except-hanling is done by
783 // Controller's OFChannelHandler
786 SwitchEvent ev = new SwitchEvent(
787 SwitchEvent.SwitchEventType.SWITCH_ERROR, this, null);
789 controller.switchError(ev, switchChannelID);
795 public void flushBufferedMessages() {
797 //if (flushBatchTrack > BATCH_COUNT_FOR_FLUSHING){
798 synchronized (flushableMsgBuffer) {
799 if (flushableMsgBuffer.size() > 0){
800 channel.write(flushableMsgBuffer);
801 flushableMsgBuffer.clear();
804 // flushBatchTrack = 0;
810 public SocketAddress getRemoteAddress() {
811 return (channel != null) ? channel.getRemoteAddress() : null;
815 public SocketAddress getLocalAddress() {
816 return (channel != null) ? channel.getLocalAddress() : null;