52ea7fd575a76ae4f2053c5b1c96c7a10e622c0e
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / SwitchHandler.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
10
11 import java.io.IOException;
12 import java.net.SocketException;
13 import java.nio.channels.AsynchronousCloseException;
14 import java.nio.channels.ClosedSelectorException;
15 import java.nio.channels.SelectionKey;
16 import java.nio.channels.Selector;
17 import java.nio.channels.SocketChannel;
18 import java.nio.channels.spi.SelectorProvider;
19 import java.util.ArrayList;
20 import java.util.Comparator;
21 import java.util.Date;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.Timer;
28 import java.util.TimerTask;
29 import java.util.concurrent.Callable;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.PriorityBlockingQueue;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicInteger;
37
38 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
39 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
40 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
41 import org.openflow.protocol.OFBarrierReply;
42 import org.openflow.protocol.OFBarrierRequest;
43 import org.openflow.protocol.OFEchoReply;
44 import org.openflow.protocol.OFError;
45 import org.openflow.protocol.OFFeaturesReply;
46 import org.openflow.protocol.OFFlowMod;
47 import org.openflow.protocol.OFGetConfigReply;
48 import org.openflow.protocol.OFMatch;
49 import org.openflow.protocol.OFMessage;
50 import org.openflow.protocol.OFPhysicalPort;
51 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
52 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
53 import org.openflow.protocol.OFPhysicalPort.OFPortState;
54 import org.openflow.protocol.OFPort;
55 import org.openflow.protocol.OFPortStatus;
56 import org.openflow.protocol.OFPortStatus.OFPortReason;
57 import org.openflow.protocol.OFSetConfig;
58 import org.openflow.protocol.OFStatisticsReply;
59 import org.openflow.protocol.OFStatisticsRequest;
60 import org.openflow.protocol.OFType;
61 import org.openflow.protocol.factory.BasicFactory;
62 import org.openflow.util.HexString;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66 public class SwitchHandler implements ISwitch {
67     private static final Logger logger = LoggerFactory
68             .getLogger(SwitchHandler.class);
69     private static final int SWITCH_LIVENESS_TIMER = 5000;
70     private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
71     private final int MESSAGE_RESPONSE_TIMER = 2000;
72
73     private final String instanceName;
74     private final ISwitch thisISwitch;
75     private final IController core;
76     private Long sid;
77     private Integer buffers;
78     private Integer capabilities;
79     private Byte tables;
80     private Integer actions;
81     private Selector selector;
82     private final SocketChannel socket;
83     private final BasicFactory factory;
84     private final AtomicInteger xid;
85     private SwitchState state;
86     private Timer periodicTimer;
87     private final Map<Short, OFPhysicalPort> physicalPorts;
88     private final Map<Short, Integer> portBandwidth;
89     private final Date connectedDate;
90     private Long lastMsgReceivedTimeStamp;
91     private Boolean probeSent;
92     private final ExecutorService executor;
93     private final ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
94     private boolean running;
95     private IMessageReadWrite msgReadWriteService;
96     private Thread switchHandlerThread;
97     private Integer responseTimerValue;
98     private PriorityBlockingQueue<PriorityMessage> transmitQ;
99     private Thread transmitThread;
100
101     private enum SwitchState {
102         NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
103                 3);
104
105         private int value;
106
107         private SwitchState(int value) {
108             this.value = value;
109         }
110
111         @SuppressWarnings("unused")
112         public int value() {
113             return this.value;
114         }
115     }
116
117     public SwitchHandler(Controller core, SocketChannel sc, String name) {
118         this.instanceName = name;
119         this.thisISwitch = this;
120         this.sid = (long) 0;
121         this.buffers = (int) 0;
122         this.capabilities = (int) 0;
123         this.tables = (byte) 0;
124         this.actions = (int) 0;
125         this.core = core;
126         this.socket = sc;
127         this.factory = new BasicFactory();
128         this.connectedDate = new Date();
129         this.lastMsgReceivedTimeStamp = connectedDate.getTime();
130         this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
131         this.portBandwidth = new HashMap<Short, Integer>();
132         this.state = SwitchState.NON_OPERATIONAL;
133         this.probeSent = false;
134         this.xid = new AtomicInteger(this.socket.hashCode());
135         this.periodicTimer = null;
136         this.executor = Executors.newFixedThreadPool(4);
137         this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
138         this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
139         String rTimer = System.getProperty("of.messageResponseTimer");
140         if (rTimer != null) {
141             try {
142                 responseTimerValue = Integer.decode(rTimer);
143             } catch (NumberFormatException e) {
144                 logger.warn(
145                         "Invalid of.messageResponseTimer: {} use default({})",
146                         rTimer, MESSAGE_RESPONSE_TIMER);
147             }
148         }
149     }
150
151     public void start() {
152         try {
153             startTransmitThread();
154             setupCommChannel();
155             sendFirstHello();
156             startHandlerThread();
157         } catch (Exception e) {
158             reportError(e);
159         }
160     }
161
162     private void startHandlerThread() {
163         switchHandlerThread = new Thread(new Runnable() {
164             @Override
165             public void run() {
166                 running = true;
167                 while (running) {
168                     try {
169                         // wait for an incoming connection
170                         selector.select(0);
171                         Iterator<SelectionKey> selectedKeys = selector
172                                 .selectedKeys().iterator();
173                         while (selectedKeys.hasNext()) {
174                             SelectionKey skey = selectedKeys.next();
175                             selectedKeys.remove();
176                             if (skey.isValid() && skey.isWritable()) {
177                                 resumeSend();
178                             }
179                             if (skey.isValid() && skey.isReadable()) {
180                                 handleMessages();
181                             }
182                         }
183                     } catch (Exception e) {
184                         reportError(e);
185                     }
186                 }
187             }
188         }, instanceName);
189         switchHandlerThread.start();
190     }
191
192     public void stop() {
193         running = false;
194         cancelSwitchTimer();
195         try {
196             selector.wakeup();
197             selector.close();
198         } catch (Exception e) {
199         }
200         try {
201             socket.close();
202         } catch (Exception e) {
203         }
204         try {
205             msgReadWriteService.stop();
206         } catch (Exception e) {
207         }
208         executor.shutdown();
209
210         msgReadWriteService = null;
211
212         if (switchHandlerThread != null) {
213             switchHandlerThread.interrupt();
214         }
215         if (transmitThread != null) {
216             transmitThread.interrupt();
217         }
218     }
219
220     @Override
221     public int getNextXid() {
222         return this.xid.incrementAndGet();
223     }
224
225     /**
226      * This method puts the message in an outgoing priority queue with normal
227      * priority. It will be served after high priority messages. The method
228      * should be used for non-critical messages such as statistics request,
229      * discovery packets, etc. An unique XID is generated automatically and
230      * inserted into the message.
231      *
232      * @param msg
233      *            The OF message to be sent
234      * @return The XID used
235      */
236     @Override
237     public Integer asyncSend(OFMessage msg) {
238         return asyncSend(msg, getNextXid());
239     }
240
241     private Object syncSend(OFMessage msg, int xid) {
242         return syncMessageInternal(msg, xid, true);
243     }
244
245     /**
246      * This method puts the message in an outgoing priority queue with normal
247      * priority. It will be served after high priority messages. The method
248      * should be used for non-critical messages such as statistics request,
249      * discovery packets, etc. The specified XID is inserted into the message.
250      *
251      * @param msg
252      *            The OF message to be Sent
253      * @param xid
254      *            The XID to be used in the message
255      * @return The XID used
256      */
257     @Override
258     public Integer asyncSend(OFMessage msg, int xid) {
259         msg.setXid(xid);
260         if (transmitQ != null) {
261             transmitQ.add(new PriorityMessage(msg, 0));
262         }
263         return xid;
264     }
265
266     /**
267      * This method puts the message in an outgoing priority queue with high
268      * priority. It will be served first before normal priority messages. The
269      * method should be used for critical messages such as hello, echo reply
270      * etc. An unique XID is generated automatically and inserted into the
271      * message.
272      *
273      * @param msg
274      *            The OF message to be sent
275      * @return The XID used
276      */
277     @Override
278     public Integer asyncFastSend(OFMessage msg) {
279         return asyncFastSend(msg, getNextXid());
280     }
281
282     /**
283      * This method puts the message in an outgoing priority queue with high
284      * priority. It will be served first before normal priority messages. The
285      * method should be used for critical messages such as hello, echo reply
286      * etc. The specified XID is inserted into the message.
287      *
288      * @param msg
289      *            The OF message to be sent
290      * @return The XID used
291      */
292     @Override
293     public Integer asyncFastSend(OFMessage msg, int xid) {
294         msg.setXid(xid);
295         if (transmitQ != null) {
296             transmitQ.add(new PriorityMessage(msg, 1));
297         }
298         return xid;
299     }
300
301     public void resumeSend() {
302         try {
303             if (msgReadWriteService != null) {
304                 msgReadWriteService.resumeSend();
305             }
306         } catch (Exception e) {
307             reportError(e);
308         }
309     }
310
311     /**
312      * This method bypasses the transmit queue and sends the message over the
313      * socket directly. If the input xid is not null, the specified xid is
314      * inserted into the message. Otherwise, an unique xid is generated
315      * automatically and inserted into the message.
316      *
317      * @param msg
318      *            Message to be sent
319      * @param xid
320      *            Message xid
321      */
322     private void asyncSendNow(OFMessage msg, Integer xid) {
323         if (xid == null) {
324             xid = getNextXid();
325         }
326         msg.setXid(xid);
327
328         asyncSendNow(msg);
329     }
330
331     /**
332      * This method bypasses the transmit queue and sends the message over the
333      * socket directly.
334      *
335      * @param msg
336      *            Message to be sent
337      */
338     private void asyncSendNow(OFMessage msg) {
339         if (msgReadWriteService == null) {
340             logger.warn(
341                     "asyncSendNow: {} is not sent because Message ReadWrite Service is not available.",
342                     msg);
343             return;
344         }
345
346         try {
347             msgReadWriteService.asyncSend(msg);
348         } catch (Exception e) {
349             reportError(e);
350         }
351     }
352
353     public void handleMessages() {
354         List<OFMessage> msgs = null;
355
356         try {
357             if (msgReadWriteService != null) {
358                 msgs = msgReadWriteService.readMessages();
359             }
360         } catch (Exception e) {
361             reportError(e);
362         }
363
364         if (msgs == null) {
365             logger.debug("{} is down", this);
366             // the connection is down, inform core
367             reportSwitchStateChange(false);
368             return;
369         }
370         for (OFMessage msg : msgs) {
371             logger.trace("Message received: {}", msg);
372             this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
373             OFType type = msg.getType();
374             switch (type) {
375             case HELLO:
376                 // send feature request
377                 OFMessage featureRequest = factory
378                         .getMessage(OFType.FEATURES_REQUEST);
379                 asyncFastSend(featureRequest);
380                 // delete all pre-existing flows
381                 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
382                 OFFlowMod flowMod = (OFFlowMod) factory
383                         .getMessage(OFType.FLOW_MOD);
384                 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
385                         .setOutPort(OFPort.OFPP_NONE)
386                         .setLength((short) OFFlowMod.MINIMUM_LENGTH);
387                 asyncFastSend(flowMod);
388                 this.state = SwitchState.WAIT_FEATURES_REPLY;
389                 startSwitchTimer();
390                 break;
391             case ECHO_REQUEST:
392                 OFEchoReply echoReply = (OFEchoReply) factory
393                         .getMessage(OFType.ECHO_REPLY);
394                 // respond immediately
395                 asyncSendNow(echoReply, msg.getXid());
396                 break;
397             case ECHO_REPLY:
398                 this.probeSent = false;
399                 break;
400             case FEATURES_REPLY:
401                 processFeaturesReply((OFFeaturesReply) msg);
402                 break;
403             case GET_CONFIG_REPLY:
404                 // make sure that the switch can send the whole packet to the
405                 // controller
406                 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
407                     this.state = SwitchState.OPERATIONAL;
408                 }
409                 break;
410             case BARRIER_REPLY:
411                 processBarrierReply((OFBarrierReply) msg);
412                 break;
413             case ERROR:
414                 processErrorReply((OFError) msg);
415                 break;
416             case PORT_STATUS:
417                 processPortStatusMsg((OFPortStatus) msg);
418                 break;
419             case STATS_REPLY:
420                 processStatsReply((OFStatisticsReply) msg);
421                 break;
422             case PACKET_IN:
423                 break;
424             default:
425                 break;
426             } // end of switch
427             if (isOperational()) {
428                 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
429             }
430         } // end of for
431     }
432
433     private void processPortStatusMsg(OFPortStatus msg) {
434         OFPhysicalPort port = msg.getDesc();
435         if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
436             updatePhysicalPort(port);
437         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
438             updatePhysicalPort(port);
439         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
440                 .ordinal()) {
441             deletePhysicalPort(port);
442         }
443
444     }
445
446     private void startSwitchTimer() {
447         this.periodicTimer = new Timer();
448         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
449             @Override
450             public void run() {
451                 try {
452                     Long now = System.currentTimeMillis();
453                     if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
454                         if (probeSent) {
455                             // switch failed to respond to our probe, consider
456                             // it down
457                             logger.warn("{} is idle for too long, disconnect",
458                                     toString());
459                             reportSwitchStateChange(false);
460                         } else {
461                             // send a probe to see if the switch is still alive
462                             logger.debug(
463                                     "Send idle probe (Echo Request) to {}",
464                                     this);
465                             probeSent = true;
466                             OFMessage echo = factory
467                                     .getMessage(OFType.ECHO_REQUEST);
468                             asyncFastSend(echo);
469                         }
470                     } else {
471                         if (state == SwitchState.WAIT_FEATURES_REPLY) {
472                             // send another features request
473                             OFMessage request = factory
474                                     .getMessage(OFType.FEATURES_REQUEST);
475                             asyncFastSend(request);
476                         } else {
477                             if (state == SwitchState.WAIT_CONFIG_REPLY) {
478                                 // send another config request
479                                 OFSetConfig config = (OFSetConfig) factory
480                                         .getMessage(OFType.SET_CONFIG);
481                                 config.setMissSendLength((short) 0xffff)
482                                         .setLengthU(OFSetConfig.MINIMUM_LENGTH);
483                                 asyncFastSend(config);
484                                 OFMessage getConfig = factory
485                                         .getMessage(OFType.GET_CONFIG_REQUEST);
486                                 asyncFastSend(getConfig);
487                             }
488                         }
489                     }
490                 } catch (Exception e) {
491                     reportError(e);
492                 }
493             }
494         }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
495     }
496
497     private void cancelSwitchTimer() {
498         if (this.periodicTimer != null) {
499             this.periodicTimer.cancel();
500         }
501     }
502
503     private void reportError(Exception e) {
504         if (e instanceof AsynchronousCloseException
505                 || e instanceof InterruptedException
506                 || e instanceof SocketException || e instanceof IOException
507                 || e instanceof ClosedSelectorException) {
508             if (logger.isDebugEnabled()) {
509               logger.debug("Caught exception {}", e.getMessage());
510             }
511         } else {
512             logger.warn("Caught exception ", e);
513         }
514         // notify core of this error event and disconnect the switch
515         ((Controller) core).takeSwitchEventError(this);
516     }
517
518     private void reportSwitchStateChange(boolean added) {
519         if (added) {
520             ((Controller) core).takeSwitchEventAdd(this);
521         } else {
522             ((Controller) core).takeSwitchEventDelete(this);
523         }
524     }
525
526     @Override
527     public Long getId() {
528         return this.sid;
529     }
530
531     private void processFeaturesReply(OFFeaturesReply reply) {
532         if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
533             this.sid = reply.getDatapathId();
534             this.buffers = reply.getBuffers();
535             this.capabilities = reply.getCapabilities();
536             this.tables = reply.getTables();
537             this.actions = reply.getActions();
538             // notify core of this error event
539             for (OFPhysicalPort port : reply.getPorts()) {
540                 updatePhysicalPort(port);
541             }
542             // config the switch to send full data packet
543             OFSetConfig config = (OFSetConfig) factory
544                     .getMessage(OFType.SET_CONFIG);
545             config.setMissSendLength((short) 0xffff).setLengthU(
546                     OFSetConfig.MINIMUM_LENGTH);
547             asyncFastSend(config);
548             // send config request to make sure the switch can handle the set
549             // config
550             OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
551             asyncFastSend(getConfig);
552             this.state = SwitchState.WAIT_CONFIG_REPLY;
553             // inform core that a new switch is now operational
554             reportSwitchStateChange(true);
555         }
556     }
557
558     private void updatePhysicalPort(OFPhysicalPort port) {
559         Short portNumber = port.getPortNumber();
560         physicalPorts.put(portNumber, port);
561         portBandwidth
562                 .put(portNumber,
563                         port.getCurrentFeatures()
564                                 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
565                                         | OFPortFeatures.OFPPF_10MB_HD
566                                                 .getValue()
567                                         | OFPortFeatures.OFPPF_100MB_FD
568                                                 .getValue()
569                                         | OFPortFeatures.OFPPF_100MB_HD
570                                                 .getValue()
571                                         | OFPortFeatures.OFPPF_1GB_FD
572                                                 .getValue()
573                                         | OFPortFeatures.OFPPF_1GB_HD
574                                                 .getValue() | OFPortFeatures.OFPPF_10GB_FD
575                                             .getValue()));
576     }
577
578     private void deletePhysicalPort(OFPhysicalPort port) {
579         Short portNumber = port.getPortNumber();
580         physicalPorts.remove(portNumber);
581         portBandwidth.remove(portNumber);
582     }
583
584     @Override
585     public boolean isOperational() {
586         return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
587     }
588
589     @Override
590     public String toString() {
591         try {
592             return ("Switch:"
593                     + socket.socket().getRemoteSocketAddress().toString().split("/")[1]
594                     + " SWID:" + (isOperational() ? HexString
595                     .toHexString(this.sid) : "unknown"));
596         } catch (Exception e) {
597             return (isOperational() ? HexString.toHexString(this.sid)
598                     : "unknown");
599         }
600
601     }
602
603     @Override
604     public Date getConnectedDate() {
605         return this.connectedDate;
606     }
607
608     public String getInstanceName() {
609         return instanceName;
610     }
611
612     @Override
613     public Object getStatistics(OFStatisticsRequest req) {
614         int xid = getNextXid();
615         StatisticsCollector worker = new StatisticsCollector(this, xid, req);
616         messageWaitingDone.put(xid, worker);
617         Future<Object> submit = executor.submit(worker);
618         Object result = null;
619         try {
620             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
621             return result;
622         } catch (Exception e) {
623             logger.warn("Timeout while waiting for {} replies", req.getType());
624             result = null; // to indicate timeout has occurred
625             worker.wakeup();
626             return result;
627         }
628     }
629
630     @Override
631     public Object syncSend(OFMessage msg) {
632         int xid = getNextXid();
633         return syncSend(msg, xid);
634     }
635
636     /*
637      * Either a BarrierReply or a OFError is received. If this is a reply for an
638      * outstanding sync message, wake up associated task so that it can continue
639      */
640     private void processBarrierReply(OFBarrierReply msg) {
641         Integer xid = msg.getXid();
642         SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
643                 .remove(xid);
644         if (worker == null) {
645             return;
646         }
647         worker.wakeup();
648     }
649
650     private void processErrorReply(OFError errorMsg) {
651         OFMessage offendingMsg = errorMsg.getOffendingMsg();
652         Integer xid;
653         if (offendingMsg != null) {
654             xid = offendingMsg.getXid();
655         } else {
656             xid = errorMsg.getXid();
657         }
658         /*
659          * the error can be a reply to a synchronous message or to a statistic
660          * request message
661          */
662         Callable<?> worker = messageWaitingDone.remove(xid);
663         if (worker == null) {
664             return;
665         }
666         if (worker instanceof SynchronousMessage) {
667             ((SynchronousMessage) worker).wakeup(errorMsg);
668         } else {
669             ((StatisticsCollector) worker).wakeup(errorMsg);
670         }
671     }
672
673     private void processStatsReply(OFStatisticsReply reply) {
674         Integer xid = reply.getXid();
675         StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
676                 .get(xid);
677         if (worker == null) {
678             return;
679         }
680         if (worker.collect(reply)) {
681             // if all the stats records are received (collect() returns true)
682             // then we are done.
683             messageWaitingDone.remove(xid);
684             worker.wakeup();
685         }
686     }
687
688     @Override
689     public Map<Short, OFPhysicalPort> getPhysicalPorts() {
690         return this.physicalPorts;
691     }
692
693     @Override
694     public OFPhysicalPort getPhysicalPort(Short portNumber) {
695         return this.physicalPorts.get(portNumber);
696     }
697
698     @Override
699     public Integer getPortBandwidth(Short portNumber) {
700         return this.portBandwidth.get(portNumber);
701     }
702
703     @Override
704     public Set<Short> getPorts() {
705         return this.physicalPorts.keySet();
706     }
707
708     @Override
709     public Byte getTables() {
710         return this.tables;
711     }
712
713     @Override
714     public Integer getActions() {
715         return this.actions;
716     }
717
718     @Override
719     public Integer getCapabilities() {
720         return this.capabilities;
721     }
722
723     @Override
724     public Integer getBuffers() {
725         return this.buffers;
726     }
727
728     @Override
729     public boolean isPortEnabled(short portNumber) {
730         return isPortEnabled(physicalPorts.get(portNumber));
731     }
732
733     @Override
734     public boolean isPortEnabled(OFPhysicalPort port) {
735         if (port == null) {
736             return false;
737         }
738         int portConfig = port.getConfig();
739         int portState = port.getState();
740         if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
741             return false;
742         }
743         if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
744             return false;
745         }
746         if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
747                 .getValue()) {
748             return false;
749         }
750         return true;
751     }
752
753     @Override
754     public List<OFPhysicalPort> getEnabledPorts() {
755         List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
756         synchronized (this.physicalPorts) {
757             for (OFPhysicalPort port : physicalPorts.values()) {
758                 if (isPortEnabled(port)) {
759                     result.add(port);
760                 }
761             }
762         }
763         return result;
764     }
765
766     /*
767      * Transmit thread polls the message out of the priority queue and invokes
768      * messaging service to transmit it over the socket channel
769      */
770     class PriorityMessageTransmit implements Runnable {
771         @Override
772         public void run() {
773             running = true;
774             while (running) {
775                 try {
776                     PriorityMessage pmsg = transmitQ.take();
777                     msgReadWriteService.asyncSend(pmsg.msg);
778                     /*
779                      * If syncReply is set to true, wait for the response back.
780                      */
781                     if (pmsg.syncReply) {
782                         syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
783                     }
784                 } catch (InterruptedException ie) {
785                     reportError(new InterruptedException(
786                             "PriorityMessageTransmit thread interrupted"));
787                 } catch (Exception e) {
788                     reportError(e);
789                 }
790             }
791             transmitQ = null;
792         }
793     }
794
795     /*
796      * Setup and start the transmit thread
797      */
798     private void startTransmitThread() {
799         this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
800                 new Comparator<PriorityMessage>() {
801                     @Override
802                     public int compare(PriorityMessage p1, PriorityMessage p2) {
803                         if (p2.priority != p1.priority) {
804                             return p2.priority - p1.priority;
805                         } else {
806                             return (p2.seqNum < p1.seqNum) ? 1 : -1;
807                         }
808                     }
809                 });
810         this.transmitThread = new Thread(new PriorityMessageTransmit());
811         this.transmitThread.start();
812     }
813
814     /*
815      * Setup communication services
816      */
817     private void setupCommChannel() throws Exception {
818         this.selector = SelectorProvider.provider().openSelector();
819         this.socket.configureBlocking(false);
820         this.socket.socket().setTcpNoDelay(true);
821         this.msgReadWriteService = getMessageReadWriteService();
822     }
823
824     private void sendFirstHello() {
825         try {
826             OFMessage msg = factory.getMessage(OFType.HELLO);
827             asyncFastSend(msg);
828         } catch (Exception e) {
829             reportError(e);
830         }
831     }
832
833     private IMessageReadWrite getMessageReadWriteService() throws Exception {
834         String str = System.getProperty("secureChannelEnabled");
835         return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(
836                 socket, selector) : new MessageReadWriteService(socket,
837                 selector);
838     }
839
840     /**
841      * Send Barrier message synchronously. The caller will be blocked until the
842      * Barrier reply is received.
843      */
844     @Override
845     public Object syncSendBarrierMessage() {
846         OFBarrierRequest barrierMsg = new OFBarrierRequest();
847         return syncSend(barrierMsg);
848     }
849
850     /**
851      * Send Barrier message asynchronously. The caller is not blocked. The
852      * Barrier message will be sent in a transmit thread which will be blocked
853      * until the Barrier reply is received.
854      */
855     @Override
856     public Object asyncSendBarrierMessage() {
857         if (transmitQ == null) {
858             return Boolean.FALSE;
859         }
860
861         OFBarrierRequest barrierMsg = new OFBarrierRequest();
862         int xid = getNextXid();
863
864         barrierMsg.setXid(xid);
865         transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
866
867         return Boolean.TRUE;
868     }
869
870     /**
871      * This method returns the switch liveness timeout value. If controller did
872      * not receive any message from the switch for such a long period,
873      * controller will tear down the connection to the switch.
874      *
875      * @return The timeout value
876      */
877     private static int getSwitchLivenessTimeout() {
878         String timeout = System.getProperty("of.switchLivenessTimeout");
879         int rv = 60500;
880
881         try {
882             if (timeout != null) {
883                 rv = Integer.parseInt(timeout);
884             }
885         } catch (Exception e) {
886         }
887
888         return rv;
889     }
890
891     /**
892      * This method performs synchronous operations for a given message. If
893      * syncRequest is set to true, the message will be sent out followed by a
894      * Barrier request message. Then it's blocked until the Barrier rely arrives
895      * or timeout. If syncRequest is false, it simply skips the message send and
896      * just waits for the response back.
897      *
898      * @param msg
899      *            Message to be sent
900      * @param xid
901      *            Message XID
902      * @param request
903      *            If set to true, the message the message will be sent out
904      *            followed by a Barrier request message. If set to false, it
905      *            simply skips the sending and just waits for the Barrier reply.
906      * @return the result
907      */
908     private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
909         SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
910         messageWaitingDone.put(xid, worker);
911         Object result = null;
912         Boolean status = false;
913         Future<Object> submit = executor.submit(worker);
914         try {
915             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
916             messageWaitingDone.remove(xid);
917             if (result == null) {
918                 // if result is null, then it means the switch can handle this
919                 // message successfully
920                 // convert the result into a Boolean with value true
921                 status = true;
922                 // logger.debug("Successfully send " +
923                 // msg.getType().toString());
924                 result = status;
925             } else {
926                 // if result is not null, this means the switch can't handle
927                 // this message
928                 // the result if OFError already
929                 if (logger.isDebugEnabled()) {
930                   logger.debug("Send {} failed --> {}", msg.getType(),
931                                (result));
932                 }
933             }
934             return result;
935         } catch (Exception e) {
936             logger.warn("Timeout while waiting for {} reply", msg.getType()
937                     .toString());
938             // convert the result into a Boolean with value false
939             status = false;
940             result = status;
941             worker.wakeup();
942             return result;
943         }
944     }
945 }