78ab3275a4ac674333f8b4cb26464b61488d2cda
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / SwitchHandler.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
11
12 import java.io.IOException;
13 import java.nio.channels.AsynchronousCloseException;
14 import java.nio.channels.SelectionKey;
15 import java.nio.channels.Selector;
16 import java.nio.channels.SocketChannel;
17 import java.nio.channels.spi.SelectorProvider;
18 import java.util.ArrayList;
19 import java.util.Comparator;
20 import java.util.Date;
21 import java.util.HashMap;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.Timer;
27 import java.util.TimerTask;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.PriorityBlockingQueue;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicInteger;
36
37 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
38 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
39 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
40 import org.openflow.protocol.OFBarrierReply;
41 import org.openflow.protocol.OFEchoReply;
42 import org.openflow.protocol.OFError;
43 import org.openflow.protocol.OFFeaturesReply;
44 import org.openflow.protocol.OFFlowMod;
45 import org.openflow.protocol.OFGetConfigReply;
46 import org.openflow.protocol.OFMatch;
47 import org.openflow.protocol.OFMessage;
48 import org.openflow.protocol.OFPhysicalPort;
49 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
50 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
51 import org.openflow.protocol.OFPhysicalPort.OFPortState;
52 import org.openflow.protocol.OFPort;
53 import org.openflow.protocol.OFPortStatus;
54 import org.openflow.protocol.OFPortStatus.OFPortReason;
55 import org.openflow.protocol.OFSetConfig;
56 import org.openflow.protocol.OFStatisticsReply;
57 import org.openflow.protocol.OFStatisticsRequest;
58 import org.openflow.protocol.OFType;
59 import org.openflow.protocol.factory.BasicFactory;
60 import org.openflow.util.HexString;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 public class SwitchHandler implements ISwitch {
65     private static final Logger logger = LoggerFactory
66             .getLogger(SwitchHandler.class);
67     private static final int SWITCH_LIVENESS_TIMER = 5000;
68     private static final int SWITCH_LIVENESS_TIMEOUT = 2 * SWITCH_LIVENESS_TIMER + 500;
69     private int MESSAGE_RESPONSE_TIMER = 2000;
70
71     private String instanceName;
72     private ISwitch thisISwitch;
73     private IController core;
74     private Long sid;
75     private Integer buffers;
76     private Integer capabilities;
77     private Byte tables;
78     private Integer actions;
79     private Selector selector;
80     private SocketChannel socket;
81     private BasicFactory factory;
82     private AtomicInteger xid;
83     private SwitchState state;
84     private Timer periodicTimer;
85     private Map<Short, OFPhysicalPort> physicalPorts;
86     private Map<Short, Integer> portBandwidth;
87     private Date connectedDate;
88     private Long lastMsgReceivedTimeStamp;
89     private Boolean probeSent;
90     private ExecutorService executor;
91     private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
92     private boolean running;
93     private IMessageReadWrite msgReadWriteService;
94     private Thread switchHandlerThread;
95     private Integer responseTimerValue;
96         private PriorityBlockingQueue<PriorityMessage> transmitQ;
97     private Thread transmitThread;
98     
99     private enum SwitchState {
100         NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
101                 3);
102
103         private int value;
104
105         private SwitchState(int value) {
106             this.value = value;
107         }
108
109         @SuppressWarnings("unused")
110         public int value() {
111             return this.value;
112         }
113     }
114
115     public SwitchHandler(Controller core, SocketChannel sc, String name) {
116         this.instanceName = name;
117         this.thisISwitch = this;
118         this.sid = (long) 0;
119         this.buffers = (int)0;
120         this.capabilities = (int)0;
121         this.tables = (byte)0;
122         this.actions = (int)0;
123         this.core = core;
124         this.socket = sc;
125         this.factory = new BasicFactory();
126         this.connectedDate = new Date();
127         this.lastMsgReceivedTimeStamp = connectedDate.getTime();
128         this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
129         this.portBandwidth = new HashMap<Short, Integer>();
130         this.state = SwitchState.NON_OPERATIONAL;
131         this.probeSent = false;
132         this.xid = new AtomicInteger(this.socket.hashCode());
133         this.periodicTimer = null;
134         this.executor = Executors.newFixedThreadPool(4);
135         this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
136         this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
137         String rTimer = System.getProperty("of.messageResponseTimer");
138         if (rTimer != null) {
139                 try {
140                         responseTimerValue = Integer.decode(rTimer);
141                 } catch (NumberFormatException e) {
142                                 logger.warn("Invalid of.messageResponseTimer: {} use default({})",
143                                                 rTimer, MESSAGE_RESPONSE_TIMER);
144                 }
145         }
146         }
147
148     public void start() {
149         try {
150                 startTransmitThread();
151                 setupCommChannel();
152                 sendFirstHello();
153             startHandlerThread();
154         } catch (Exception e) {
155             reportError(e);
156         }
157     }
158
159     private void startHandlerThread() {
160         switchHandlerThread = new Thread(new Runnable() {
161             @Override
162             public void run() {
163                 running = true;
164                 while (running) {
165                     try {
166                         // wait for an incoming connection
167                         selector.select(0);
168                         Iterator<SelectionKey> selectedKeys = selector
169                                 .selectedKeys().iterator();
170                         while (selectedKeys.hasNext()) {
171                             SelectionKey skey = selectedKeys.next();
172                             selectedKeys.remove();
173                             if (skey.isValid() && skey.isWritable()) {
174                                 resumeSend();
175                             }
176                             if (skey.isValid() && skey.isReadable()) {
177                                 handleMessages();
178                             }
179                         }
180                     } catch (Exception e) {
181                         reportError(e);
182                     }
183                 }
184             }
185         }, instanceName);
186         switchHandlerThread.start();
187     }
188
189     public void stop() {
190         running = false;
191         cancelSwitchTimer();
192         try {
193                 selector.wakeup();
194                 selector.close();
195                 } catch (Exception e) {
196                 }
197         try {
198                         socket.close();
199                 } catch (Exception e) {
200                 }
201         try {
202                         msgReadWriteService.stop();
203                 } catch (Exception e) {
204                 }
205         executor.shutdown();
206         
207         selector = null;
208         socket = null;
209                 msgReadWriteService = null;
210                 
211                 if (switchHandlerThread != null) {
212                         switchHandlerThread.interrupt();
213                 }
214                 if (transmitThread != null) {
215                         transmitThread.interrupt();
216                 }
217     }
218
219     @Override
220     public int getNextXid() {
221         return this.xid.incrementAndGet();
222     }
223
224         /**
225          * This method puts the message in an outgoing priority queue with normal
226          * priority. It will be served after high priority messages. The method
227          * should be used for non-critical messages such as statistics request,
228          * discovery packets, etc. An unique XID is generated automatically and
229          * inserted into the message.
230          * 
231          * @param msg The OF message to be sent
232          * @return The XID used
233          */
234     @Override
235     public Integer asyncSend(OFMessage msg) {
236         return asyncSend(msg, getNextXid());
237     }
238
239         /**
240          * This method puts the message in an outgoing priority queue with normal
241          * priority. It will be served after high priority messages. The method
242          * should be used for non-critical messages such as statistics request,
243          * discovery packets, etc. The specified XID is inserted into the message.
244          * 
245          * @param msg The OF message to be Sent
246          * @param xid The XID to be used in the message
247          * @return The XID used
248          */
249     @Override
250     public Integer asyncSend(OFMessage msg, int xid) {
251         msg.setXid(xid);
252         transmitQ.add(new PriorityMessage(msg, 0));
253         return xid;
254     }
255
256         /**
257          * This method puts the message in an outgoing priority queue with high
258          * priority. It will be served first before normal priority messages. The
259          * method should be used for critical messages such as hello, echo reply
260          * etc. An unique XID is generated automatically and inserted into the
261          * message.
262          * 
263          * @param msg The OF message to be sent
264          * @return The XID used
265          */
266     @Override
267     public Integer asyncFastSend(OFMessage msg) {
268         return asyncFastSend(msg, getNextXid());
269     }
270
271         /**
272          * This method puts the message in an outgoing priority queue with high
273          * priority. It will be served first before normal priority messages. The
274          * method should be used for critical messages such as hello, echo reply
275          * etc. The specified XID is inserted into the message.
276          * 
277          * @param msg The OF message to be sent
278          * @return The XID used
279          */
280     @Override
281     public Integer asyncFastSend(OFMessage msg, int xid) {
282         msg.setXid(xid);
283         transmitQ.add(new PriorityMessage(msg, 1));
284         return xid;
285     }
286
287    public void resumeSend() {
288         try {
289                         msgReadWriteService.resumeSend();
290                 } catch (Exception e) {
291                         reportError(e);
292                 }
293     }
294
295     public void handleMessages() {
296         List<OFMessage> msgs = null;
297         
298         try {
299                 msgs = msgReadWriteService.readMessages();
300                 } catch (Exception e) {
301                         reportError(e);
302                 }
303                 
304         if (msgs == null) {
305             logger.debug("{} is down", toString());
306             // the connection is down, inform core
307             reportSwitchStateChange(false);
308             return;
309         }
310         for (OFMessage msg : msgs) {
311             logger.trace("Message received: {}", msg.toString());
312             /*
313             if  ((msg.getType() != OFType.ECHO_REQUEST) &&
314                         (msg.getType() != OFType.ECHO_REPLY)) {
315                 logger.debug(msg.getType().toString() + " received from sw " + toString());
316             }
317              */
318             this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
319             OFType type = msg.getType();
320             switch (type) {
321             case HELLO:
322                 // send feature request
323                 OFMessage featureRequest = factory
324                         .getMessage(OFType.FEATURES_REQUEST);
325                 asyncFastSend(featureRequest);
326                 // delete all pre-existing flows
327                 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
328                 OFFlowMod flowMod = (OFFlowMod) factory
329                         .getMessage(OFType.FLOW_MOD);
330                 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
331                         .setOutPort(OFPort.OFPP_NONE).setLength(
332                                 (short) OFFlowMod.MINIMUM_LENGTH);
333                 asyncFastSend(flowMod);
334                 this.state = SwitchState.WAIT_FEATURES_REPLY;
335                 startSwitchTimer();
336                 break;
337             case ECHO_REQUEST:
338                 OFEchoReply echoReply = (OFEchoReply) factory
339                         .getMessage(OFType.ECHO_REPLY);
340                 asyncFastSend(echoReply);
341                 break;
342             case ECHO_REPLY:
343                 this.probeSent = false;
344                 break;
345             case FEATURES_REPLY:
346                 processFeaturesReply((OFFeaturesReply) msg);
347                 break;
348             case GET_CONFIG_REPLY:
349                 // make sure that the switch can send the whole packet to the controller
350                 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
351                     this.state = SwitchState.OPERATIONAL;
352                 }
353                 break;
354             case BARRIER_REPLY:
355                 processBarrierReply((OFBarrierReply) msg);
356                 break;
357             case ERROR:
358                 processErrorReply((OFError) msg);
359                 break;
360             case PORT_STATUS:
361                 processPortStatusMsg((OFPortStatus) msg);
362                 break;
363             case STATS_REPLY:
364                 processStatsReply((OFStatisticsReply) msg);
365                 break;
366             case PACKET_IN:
367                 break;
368             default:
369                 break;
370             } // end of switch
371             if (isOperational()) {
372                 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
373             }
374         } // end of for
375     }
376
377     private void processPortStatusMsg(OFPortStatus msg) {
378         //short portNumber = msg.getDesc().getPortNumber();
379         OFPhysicalPort port = msg.getDesc();
380         if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
381             updatePhysicalPort(port);
382             //logger.debug("Port " + portNumber + " on " + toString() + " modified");
383         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
384             updatePhysicalPort(port);
385             //logger.debug("Port " + portNumber + " on " + toString() + " added");
386         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
387                 .ordinal()) {
388             deletePhysicalPort(port);
389             //logger.debug("Port " + portNumber + " on " + toString() + " deleted");
390         }
391
392     }
393
394     private void startSwitchTimer() {
395         this.periodicTimer = new Timer();
396         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
397             @Override
398             public void run() {
399                 try {
400                     Long now = System.currentTimeMillis();
401                     if ((now - lastMsgReceivedTimeStamp) > SWITCH_LIVENESS_TIMEOUT) {
402                         if (probeSent) {
403                             // switch failed to respond to our probe, consider it down
404                             logger.warn("{} is idle for too long, disconnect", toString());
405                             reportSwitchStateChange(false);
406                         } else {
407                             // send a probe to see if the switch is still alive
408                             //logger.debug("Send idle probe (Echo Request) to " + switchName());
409                             probeSent = true;
410                             OFMessage echo = factory
411                                     .getMessage(OFType.ECHO_REQUEST);
412                             asyncFastSend(echo);
413                         }
414                     } else {
415                         if (state == SwitchState.WAIT_FEATURES_REPLY) {
416                             // send another features request
417                             OFMessage request = factory
418                                     .getMessage(OFType.FEATURES_REQUEST);
419                             asyncFastSend(request);
420                         } else {
421                             if (state == SwitchState.WAIT_CONFIG_REPLY) {
422                                 //  send another config request
423                                 OFSetConfig config = (OFSetConfig) factory
424                                         .getMessage(OFType.SET_CONFIG);
425                                 config.setMissSendLength((short) 0xffff)
426                                         .setLengthU(OFSetConfig.MINIMUM_LENGTH);
427                                 asyncFastSend(config);
428                                 OFMessage getConfig = factory
429                                         .getMessage(OFType.GET_CONFIG_REQUEST);
430                                 asyncFastSend(getConfig);
431                             }
432                         }
433                     }
434                 } catch (Exception e) {
435                     reportError(e);
436                 }
437             }
438         }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
439     }
440
441     private void cancelSwitchTimer() {
442         if (this.periodicTimer != null) {
443             this.periodicTimer.cancel();
444         }
445     }
446
447     private void reportError(Exception e) {
448         if (e instanceof AsynchronousCloseException) {
449                 logger.debug("Caught exception {}", e.getMessage());
450         } else {
451                 logger.warn("Caught exception {}", e.getMessage());
452         }
453         // notify core of this error event and disconnect the switch
454         ((Controller) core).takeSwitchEventError(this);
455     }
456
457     private void reportSwitchStateChange(boolean added) {
458         if (added) {
459             ((Controller) core).takeSwtichEventAdd(this);
460         } else {
461             ((Controller) core).takeSwitchEventDelete(this);
462         }
463     }
464
465     @Override
466     public Long getId() {
467         return this.sid;
468     }
469
470     private void processFeaturesReply(OFFeaturesReply reply) {
471         if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
472             this.sid = reply.getDatapathId();
473             this.buffers = reply.getBuffers();
474             this.capabilities = reply.getCapabilities();
475             this.tables = reply.getTables();
476             this.actions = reply.getActions();
477             // notify core of this error event
478             for (OFPhysicalPort port : reply.getPorts()) {
479                 updatePhysicalPort(port);
480             }
481             // config the switch to send full data packet
482             OFSetConfig config = (OFSetConfig) factory
483                     .getMessage(OFType.SET_CONFIG);
484             config.setMissSendLength((short) 0xffff).setLengthU(
485                     OFSetConfig.MINIMUM_LENGTH);
486             asyncFastSend(config);
487             // send config request to make sure the switch can handle the set config
488             OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
489             asyncFastSend(getConfig);
490             this.state = SwitchState.WAIT_CONFIG_REPLY;
491             // inform core that a new switch is now operational
492             reportSwitchStateChange(true);
493         }
494     }
495
496     private void updatePhysicalPort(OFPhysicalPort port) {
497         Short portNumber = port.getPortNumber();
498         physicalPorts.put(portNumber, port);
499         portBandwidth
500                 .put(
501                         portNumber,
502                         port.getCurrentFeatures()
503                                 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
504                                         | OFPortFeatures.OFPPF_10MB_HD
505                                                 .getValue()
506                                         | OFPortFeatures.OFPPF_100MB_FD
507                                                 .getValue()
508                                         | OFPortFeatures.OFPPF_100MB_HD
509                                                 .getValue()
510                                         | OFPortFeatures.OFPPF_1GB_FD
511                                                 .getValue()
512                                         | OFPortFeatures.OFPPF_1GB_HD
513                                                 .getValue() | OFPortFeatures.OFPPF_10GB_FD
514                                         .getValue()));
515     }
516
517     private void deletePhysicalPort(OFPhysicalPort port) {
518         Short portNumber = port.getPortNumber();
519         physicalPorts.remove(portNumber);
520         portBandwidth.remove(portNumber);
521     }
522
523     @Override
524     public boolean isOperational() {
525         return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
526     }
527
528     @Override
529     public String toString() {
530         return ("["
531                 + this.socket.toString()
532                 + " SWID "
533                 + (isOperational() ? HexString.toHexString(this.sid)
534                         : "unkbown") + "]");
535     }
536
537     @Override
538     public Date getConnectedDate() {
539         return this.connectedDate;
540     }
541
542     public String getInstanceName() {
543         return instanceName;
544     }
545
546     @Override
547     public Object getStatistics(OFStatisticsRequest req) {
548         int xid = getNextXid();
549         StatisticsCollector worker = new StatisticsCollector(this, xid, req);
550         messageWaitingDone.put(xid, worker);
551         Future<Object> submit = executor.submit(worker);
552         Object result = null;
553         try {
554             result = submit
555                     .get(MESSAGE_RESPONSE_TIMER, TimeUnit.MILLISECONDS);
556             return result;
557         } catch (Exception e) {
558             logger.warn("Timeout while waiting for {} replies", req.getType());
559             result = null; // to indicate timeout has occurred
560             return result;
561         }
562     }
563
564     @Override
565     public Object syncSend(OFMessage msg) {
566         Integer xid = getNextXid();
567         SynchronousMessage worker = new SynchronousMessage(this, xid, msg);
568         messageWaitingDone.put(xid, worker);
569         Object result = null;
570         Boolean status = false;
571         Future<Object> submit = executor.submit(worker);
572         try {
573             result = submit
574                     .get(responseTimerValue, TimeUnit.MILLISECONDS);
575             messageWaitingDone.remove(xid);
576             if (result == null) {
577                 // if result  is null, then it means the switch can handle this message successfully
578                 // convert the result into a Boolean with value true
579                 status = true;
580                 //logger.debug("Successfully send " + msg.getType().toString());
581                 result = status;
582             } else {
583                 // if result  is not null, this means the switch can't handle this message
584                 // the result if OFError already
585                 logger.debug("Send {} failed --> {}", 
586                                 msg.getType().toString(), ((OFError) result).toString());
587             }
588             return result;
589         } catch (Exception e) {
590             logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
591             // convert the result into a Boolean with value false
592             status = false;
593             result = status;
594             return result;
595         }
596     }
597
598     /*
599      * Either a BarrierReply or a OFError is received. If this is a reply for an outstanding sync message,
600      * wake up associated task so that it can continue
601      */
602     private void processBarrierReply(OFBarrierReply msg) {
603         Integer xid = msg.getXid();
604         SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
605                 .remove(xid);
606         if (worker == null) {
607             return;
608         }
609         worker.wakeup();
610     }
611
612     private void processErrorReply(OFError errorMsg) {
613         OFMessage offendingMsg = errorMsg.getOffendingMsg();
614         Integer xid;
615         if (offendingMsg != null) {
616             xid = offendingMsg.getXid();
617         } else {
618             xid = errorMsg.getXid();
619         }
620         /*
621          * the error can be a reply to a synchronous message or to a statistic request message
622          */
623         Callable<?> worker = messageWaitingDone.remove(xid);
624         if (worker == null) {
625             return;
626         }
627         if (worker instanceof SynchronousMessage) {
628             ((SynchronousMessage) worker).wakeup(errorMsg);
629         } else {
630             ((StatisticsCollector) worker).wakeup(errorMsg);
631         }
632     }
633
634     private void processStatsReply(OFStatisticsReply reply) {
635         Integer xid = reply.getXid();
636         StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
637                 .get(xid);
638         if (worker == null) {
639             return;
640         }
641         if (worker.collect(reply)) {
642             // if all the stats records are received (collect() returns true)
643             // then we are done.
644             messageWaitingDone.remove(xid);
645             worker.wakeup();
646         }
647     }
648     
649     @Override
650     public Map<Short, OFPhysicalPort> getPhysicalPorts() {
651         return this.physicalPorts;
652     }
653
654     @Override
655     public OFPhysicalPort getPhysicalPort(Short portNumber) {
656         return this.physicalPorts.get(portNumber);
657     }
658
659     @Override
660     public Integer getPortBandwidth(Short portNumber) {
661         return this.portBandwidth.get(portNumber);
662     }
663
664     @Override
665     public Set<Short> getPorts() {
666         return this.physicalPorts.keySet();
667     }
668
669     @Override
670     public Byte getTables() {
671         return this.tables;
672     }
673     
674     @Override
675     public Integer getActions() {
676         return this.actions;
677     }
678     
679     @Override
680     public Integer getCapabilities() {
681         return this.capabilities;
682     }
683
684     @Override
685     public Integer getBuffers() {
686         return this.buffers;
687     }
688
689     @Override
690     public boolean isPortEnabled(short portNumber) {
691         return isPortEnabled(physicalPorts.get(portNumber));
692     }
693
694     @Override
695     public boolean isPortEnabled(OFPhysicalPort port) {
696         if (port == null) {
697             return false;
698         }
699         int portConfig = port.getConfig();
700         int portState = port.getState();
701         if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
702             return false;
703         }
704         if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
705             return false;
706         }
707         if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
708                 .getValue()) {
709             return false;
710         }
711         return true;
712     }
713
714     @Override
715     public List<OFPhysicalPort> getEnabledPorts() {
716         List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
717         synchronized (this.physicalPorts) {
718             for (OFPhysicalPort port : physicalPorts.values()) {
719                 if (isPortEnabled(port)) {
720                     result.add(port);
721                 }
722             }
723         }
724         return result;
725     }
726
727         /*
728          * Transmit thread polls the message out of the priority queue and invokes
729          * messaging service to transmit it over the socket channel
730          */
731     class PriorityMessageTransmit implements Runnable {
732         public void run() {
733             running = true;
734             while (running) {
735                 try {
736                         if (!transmitQ.isEmpty()) {
737                                 PriorityMessage pmsg = transmitQ.poll();
738                                 msgReadWriteService.asyncSend(pmsg.msg);
739                                 logger.trace("Message sent: {}", pmsg.toString());
740                         }
741                         Thread.sleep(10);
742                 } catch (Exception e) {
743                         reportError(e);
744                 }
745             }
746                 transmitQ = null;
747         }
748     }
749
750     /*
751      * Setup and start the transmit thread
752      */
753     private void startTransmitThread() {        
754         this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11, 
755                                 new Comparator<PriorityMessage>() {
756                                         public int compare(PriorityMessage p1, PriorityMessage p2) {
757                                                 return p2.priority - p1.priority;
758                                         }
759                                 });
760         this.transmitThread = new Thread(new PriorityMessageTransmit());
761         this.transmitThread.start();
762     }
763     
764     /*
765      * Setup communication services
766      */
767     private void setupCommChannel() throws Exception {
768         this.selector = SelectorProvider.provider().openSelector();
769         this.socket.configureBlocking(false);
770         this.socket.socket().setTcpNoDelay(true);        
771         this.msgReadWriteService = getMessageReadWriteService();
772     }
773
774     private void sendFirstHello() {
775         try {
776                 OFMessage msg = factory.getMessage(OFType.HELLO);
777                 asyncFastSend(msg);
778         } catch (Exception e) {
779                 reportError(e);
780         }
781     }
782     
783     private IMessageReadWrite getMessageReadWriteService() throws Exception {
784         String str = System.getProperty("secureChannelEnabled").trim();
785         return ((str != null) && (str.equalsIgnoreCase("true"))) ? 
786                         new SecureMessageReadWriteService(socket, selector) : 
787                         new MessageReadWriteService(socket, selector);
788     }
789 }