Merge "Added javadoc to maven-yang-plugin."
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / SwitchHandler.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
11
12 import java.net.SocketException;
13 import java.nio.channels.AsynchronousCloseException;
14 import java.nio.channels.SelectionKey;
15 import java.nio.channels.Selector;
16 import java.nio.channels.SocketChannel;
17 import java.nio.channels.spi.SelectorProvider;
18 import java.util.ArrayList;
19 import java.util.Comparator;
20 import java.util.Date;
21 import java.util.HashMap;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.Timer;
27 import java.util.TimerTask;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.PriorityBlockingQueue;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicInteger;
36
37 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
38 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
39 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
40 import org.openflow.protocol.OFBarrierReply;
41 import org.openflow.protocol.OFEchoReply;
42 import org.openflow.protocol.OFError;
43 import org.openflow.protocol.OFFeaturesReply;
44 import org.openflow.protocol.OFFlowMod;
45 import org.openflow.protocol.OFGetConfigReply;
46 import org.openflow.protocol.OFMatch;
47 import org.openflow.protocol.OFMessage;
48 import org.openflow.protocol.OFPhysicalPort;
49 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
50 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
51 import org.openflow.protocol.OFPhysicalPort.OFPortState;
52 import org.openflow.protocol.OFPort;
53 import org.openflow.protocol.OFPortStatus;
54 import org.openflow.protocol.OFPortStatus.OFPortReason;
55 import org.openflow.protocol.OFSetConfig;
56 import org.openflow.protocol.OFStatisticsReply;
57 import org.openflow.protocol.OFStatisticsRequest;
58 import org.openflow.protocol.OFType;
59 import org.openflow.protocol.factory.BasicFactory;
60 import org.openflow.util.HexString;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 public class SwitchHandler implements ISwitch {
65     private static final Logger logger = LoggerFactory
66             .getLogger(SwitchHandler.class);
67     private static final int SWITCH_LIVENESS_TIMER = 5000;
68     private static final int SWITCH_LIVENESS_TIMEOUT = 2 * SWITCH_LIVENESS_TIMER + 500;
69     private int MESSAGE_RESPONSE_TIMER = 2000;
70
71     private String instanceName;
72     private ISwitch thisISwitch;
73     private IController core;
74     private Long sid;
75     private Integer buffers;
76     private Integer capabilities;
77     private Byte tables;
78     private Integer actions;
79     private Selector selector;
80     private SocketChannel socket;
81     private BasicFactory factory;
82     private AtomicInteger xid;
83     private SwitchState state;
84     private Timer periodicTimer;
85     private Map<Short, OFPhysicalPort> physicalPorts;
86     private Map<Short, Integer> portBandwidth;
87     private Date connectedDate;
88     private Long lastMsgReceivedTimeStamp;
89     private Boolean probeSent;
90     private ExecutorService executor;
91     private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
92     private boolean running;
93     private IMessageReadWrite msgReadWriteService;
94     private Thread switchHandlerThread;
95     private Integer responseTimerValue;
96         private PriorityBlockingQueue<PriorityMessage> transmitQ;
97     private Thread transmitThread;
98     
99     private enum SwitchState {
100         NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
101                 3);
102
103         private int value;
104
105         private SwitchState(int value) {
106             this.value = value;
107         }
108
109         @SuppressWarnings("unused")
110         public int value() {
111             return this.value;
112         }
113     }
114
115     public SwitchHandler(Controller core, SocketChannel sc, String name) {
116         this.instanceName = name;
117         this.thisISwitch = this;
118         this.sid = (long) 0;
119         this.buffers = (int)0;
120         this.capabilities = (int)0;
121         this.tables = (byte)0;
122         this.actions = (int)0;
123         this.core = core;
124         this.socket = sc;
125         this.factory = new BasicFactory();
126         this.connectedDate = new Date();
127         this.lastMsgReceivedTimeStamp = connectedDate.getTime();
128         this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
129         this.portBandwidth = new HashMap<Short, Integer>();
130         this.state = SwitchState.NON_OPERATIONAL;
131         this.probeSent = false;
132         this.xid = new AtomicInteger(this.socket.hashCode());
133         this.periodicTimer = null;
134         this.executor = Executors.newFixedThreadPool(4);
135         this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
136         this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
137         String rTimer = System.getProperty("of.messageResponseTimer");
138         if (rTimer != null) {
139                 try {
140                         responseTimerValue = Integer.decode(rTimer);
141                 } catch (NumberFormatException e) {
142                                 logger.warn("Invalid of.messageResponseTimer: {} use default({})",
143                                                 rTimer, MESSAGE_RESPONSE_TIMER);
144                 }
145         }
146         }
147
148     public void start() {
149         try {
150                 startTransmitThread();
151                 setupCommChannel();
152                 sendFirstHello();
153             startHandlerThread();
154         } catch (Exception e) {
155             reportError(e);
156         }
157     }
158
159     private void startHandlerThread() {
160         switchHandlerThread = new Thread(new Runnable() {
161             @Override
162             public void run() {
163                 running = true;
164                 while (running) {
165                     try {
166                         // wait for an incoming connection
167                         selector.select(0);
168                         Iterator<SelectionKey> selectedKeys = selector
169                                 .selectedKeys().iterator();
170                         while (selectedKeys.hasNext()) {
171                             SelectionKey skey = selectedKeys.next();
172                             selectedKeys.remove();
173                             if (skey.isValid() && skey.isWritable()) {
174                                 resumeSend();
175                             }
176                             if (skey.isValid() && skey.isReadable()) {
177                                 handleMessages();
178                             }
179                         }
180                     } catch (Exception e) {
181                         reportError(e);
182                     }
183                 }
184             }
185         }, instanceName);
186         switchHandlerThread.start();
187     }
188
189     public void stop() {
190         running = false;
191         cancelSwitchTimer();
192         try {
193                 selector.wakeup();
194                 selector.close();
195                 } catch (Exception e) {
196                 }
197         try {
198                         socket.close();
199                 } catch (Exception e) {
200                 }
201         try {
202                         msgReadWriteService.stop();
203                 } catch (Exception e) {
204                 }
205         executor.shutdown();
206         
207         selector = null;
208         socket = null;
209                 msgReadWriteService = null;
210                 
211                 if (switchHandlerThread != null) {
212                         switchHandlerThread.interrupt();
213                 }
214                 if (transmitThread != null) {
215                         transmitThread.interrupt();
216                 }
217     }
218
219     @Override
220     public int getNextXid() {
221         return this.xid.incrementAndGet();
222     }
223
224         /**
225          * This method puts the message in an outgoing priority queue with normal
226          * priority. It will be served after high priority messages. The method
227          * should be used for non-critical messages such as statistics request,
228          * discovery packets, etc. An unique XID is generated automatically and
229          * inserted into the message.
230          * 
231          * @param msg The OF message to be sent
232          * @return The XID used
233          */
234     @Override
235     public Integer asyncSend(OFMessage msg) {
236         return asyncSend(msg, getNextXid());
237     }
238
239         /**
240          * This method puts the message in an outgoing priority queue with normal
241          * priority. It will be served after high priority messages. The method
242          * should be used for non-critical messages such as statistics request,
243          * discovery packets, etc. The specified XID is inserted into the message.
244          * 
245          * @param msg The OF message to be Sent
246          * @param xid The XID to be used in the message
247          * @return The XID used
248          */
249     @Override
250     public Integer asyncSend(OFMessage msg, int xid) {
251         msg.setXid(xid);
252         if (transmitQ != null) {
253                 transmitQ.add(new PriorityMessage(msg, 0));
254         }
255         return xid;
256     }
257
258         /**
259          * This method puts the message in an outgoing priority queue with high
260          * priority. It will be served first before normal priority messages. The
261          * method should be used for critical messages such as hello, echo reply
262          * etc. An unique XID is generated automatically and inserted into the
263          * message.
264          * 
265          * @param msg The OF message to be sent
266          * @return The XID used
267          */
268     @Override
269     public Integer asyncFastSend(OFMessage msg) {
270         return asyncFastSend(msg, getNextXid());
271     }
272
273         /**
274          * This method puts the message in an outgoing priority queue with high
275          * priority. It will be served first before normal priority messages. The
276          * method should be used for critical messages such as hello, echo reply
277          * etc. The specified XID is inserted into the message.
278          * 
279          * @param msg The OF message to be sent
280          * @return The XID used
281          */
282     @Override
283     public Integer asyncFastSend(OFMessage msg, int xid) {
284         msg.setXid(xid);
285         if (transmitQ != null) {
286                 transmitQ.add(new PriorityMessage(msg, 1));
287         }
288         return xid;
289     }
290
291    public void resumeSend() {
292         try {
293                 if (msgReadWriteService != null) {
294                         msgReadWriteService.resumeSend();
295                 }
296                 } catch (Exception e) {
297                         reportError(e);
298                 }
299     }
300
301     public void handleMessages() {
302         List<OFMessage> msgs = null;
303         
304         try {
305                 msgs = msgReadWriteService.readMessages();
306                 } catch (Exception e) {
307                         reportError(e);
308                 }
309                 
310         if (msgs == null) {
311             logger.debug("{} is down", toString());
312             // the connection is down, inform core
313             reportSwitchStateChange(false);
314             return;
315         }
316         for (OFMessage msg : msgs) {
317             logger.trace("Message received: {}", msg.toString());
318             /*
319             if  ((msg.getType() != OFType.ECHO_REQUEST) &&
320                         (msg.getType() != OFType.ECHO_REPLY)) {
321                 logger.debug(msg.getType().toString() + " received from sw " + toString());
322             }
323              */
324             this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
325             OFType type = msg.getType();
326             switch (type) {
327             case HELLO:
328                 // send feature request
329                 OFMessage featureRequest = factory
330                         .getMessage(OFType.FEATURES_REQUEST);
331                 asyncFastSend(featureRequest);
332                 // delete all pre-existing flows
333                 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
334                 OFFlowMod flowMod = (OFFlowMod) factory
335                         .getMessage(OFType.FLOW_MOD);
336                 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
337                         .setOutPort(OFPort.OFPP_NONE).setLength(
338                                 (short) OFFlowMod.MINIMUM_LENGTH);
339                 asyncFastSend(flowMod);
340                 this.state = SwitchState.WAIT_FEATURES_REPLY;
341                 startSwitchTimer();
342                 break;
343             case ECHO_REQUEST:
344                 OFEchoReply echoReply = (OFEchoReply) factory
345                         .getMessage(OFType.ECHO_REPLY);
346                 asyncFastSend(echoReply);
347                 break;
348             case ECHO_REPLY:
349                 this.probeSent = false;
350                 break;
351             case FEATURES_REPLY:
352                 processFeaturesReply((OFFeaturesReply) msg);
353                 break;
354             case GET_CONFIG_REPLY:
355                 // make sure that the switch can send the whole packet to the controller
356                 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
357                     this.state = SwitchState.OPERATIONAL;
358                 }
359                 break;
360             case BARRIER_REPLY:
361                 processBarrierReply((OFBarrierReply) msg);
362                 break;
363             case ERROR:
364                 processErrorReply((OFError) msg);
365                 break;
366             case PORT_STATUS:
367                 processPortStatusMsg((OFPortStatus) msg);
368                 break;
369             case STATS_REPLY:
370                 processStatsReply((OFStatisticsReply) msg);
371                 break;
372             case PACKET_IN:
373                 break;
374             default:
375                 break;
376             } // end of switch
377             if (isOperational()) {
378                 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
379             }
380         } // end of for
381     }
382
383     private void processPortStatusMsg(OFPortStatus msg) {
384         //short portNumber = msg.getDesc().getPortNumber();
385         OFPhysicalPort port = msg.getDesc();
386         if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
387             updatePhysicalPort(port);
388             //logger.debug("Port " + portNumber + " on " + toString() + " modified");
389         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
390             updatePhysicalPort(port);
391             //logger.debug("Port " + portNumber + " on " + toString() + " added");
392         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
393                 .ordinal()) {
394             deletePhysicalPort(port);
395             //logger.debug("Port " + portNumber + " on " + toString() + " deleted");
396         }
397
398     }
399
400     private void startSwitchTimer() {
401         this.periodicTimer = new Timer();
402         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
403             @Override
404             public void run() {
405                 try {
406                     Long now = System.currentTimeMillis();
407                     if ((now - lastMsgReceivedTimeStamp) > SWITCH_LIVENESS_TIMEOUT) {
408                         if (probeSent) {
409                             // switch failed to respond to our probe, consider it down
410                             logger.warn("{} is idle for too long, disconnect", toString());
411                             reportSwitchStateChange(false);
412                         } else {
413                             // send a probe to see if the switch is still alive
414                             //logger.debug("Send idle probe (Echo Request) to " + switchName());
415                             probeSent = true;
416                             OFMessage echo = factory
417                                     .getMessage(OFType.ECHO_REQUEST);
418                             asyncFastSend(echo);
419                         }
420                     } else {
421                         if (state == SwitchState.WAIT_FEATURES_REPLY) {
422                             // send another features request
423                             OFMessage request = factory
424                                     .getMessage(OFType.FEATURES_REQUEST);
425                             asyncFastSend(request);
426                         } else {
427                             if (state == SwitchState.WAIT_CONFIG_REPLY) {
428                                 //  send another config request
429                                 OFSetConfig config = (OFSetConfig) factory
430                                         .getMessage(OFType.SET_CONFIG);
431                                 config.setMissSendLength((short) 0xffff)
432                                         .setLengthU(OFSetConfig.MINIMUM_LENGTH);
433                                 asyncFastSend(config);
434                                 OFMessage getConfig = factory
435                                         .getMessage(OFType.GET_CONFIG_REQUEST);
436                                 asyncFastSend(getConfig);
437                             }
438                         }
439                     }
440                 } catch (Exception e) {
441                     reportError(e);
442                 }
443             }
444         }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
445     }
446
447     private void cancelSwitchTimer() {
448         if (this.periodicTimer != null) {
449             this.periodicTimer.cancel();
450         }
451     }
452
453     private void reportError(Exception e) {
454         if (e instanceof AsynchronousCloseException ||
455                 e instanceof InterruptedException ||
456                 e instanceof SocketException) {
457                 logger.debug("Caught exception {}", e.getMessage());
458         } else {
459                 logger.warn("Caught exception {}", e.getMessage());
460         }
461         // notify core of this error event and disconnect the switch
462         ((Controller) core).takeSwitchEventError(this);
463     }
464
465     private void reportSwitchStateChange(boolean added) {
466         if (added) {
467             ((Controller) core).takeSwtichEventAdd(this);
468         } else {
469             ((Controller) core).takeSwitchEventDelete(this);
470         }
471     }
472
473     @Override
474     public Long getId() {
475         return this.sid;
476     }
477
478     private void processFeaturesReply(OFFeaturesReply reply) {
479         if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
480             this.sid = reply.getDatapathId();
481             this.buffers = reply.getBuffers();
482             this.capabilities = reply.getCapabilities();
483             this.tables = reply.getTables();
484             this.actions = reply.getActions();
485             // notify core of this error event
486             for (OFPhysicalPort port : reply.getPorts()) {
487                 updatePhysicalPort(port);
488             }
489             // config the switch to send full data packet
490             OFSetConfig config = (OFSetConfig) factory
491                     .getMessage(OFType.SET_CONFIG);
492             config.setMissSendLength((short) 0xffff).setLengthU(
493                     OFSetConfig.MINIMUM_LENGTH);
494             asyncFastSend(config);
495             // send config request to make sure the switch can handle the set config
496             OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
497             asyncFastSend(getConfig);
498             this.state = SwitchState.WAIT_CONFIG_REPLY;
499             // inform core that a new switch is now operational
500             reportSwitchStateChange(true);
501         }
502     }
503
504     private void updatePhysicalPort(OFPhysicalPort port) {
505         Short portNumber = port.getPortNumber();
506         physicalPorts.put(portNumber, port);
507         portBandwidth
508                 .put(
509                         portNumber,
510                         port.getCurrentFeatures()
511                                 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
512                                         | OFPortFeatures.OFPPF_10MB_HD
513                                                 .getValue()
514                                         | OFPortFeatures.OFPPF_100MB_FD
515                                                 .getValue()
516                                         | OFPortFeatures.OFPPF_100MB_HD
517                                                 .getValue()
518                                         | OFPortFeatures.OFPPF_1GB_FD
519                                                 .getValue()
520                                         | OFPortFeatures.OFPPF_1GB_HD
521                                                 .getValue() | OFPortFeatures.OFPPF_10GB_FD
522                                         .getValue()));
523     }
524
525     private void deletePhysicalPort(OFPhysicalPort port) {
526         Short portNumber = port.getPortNumber();
527         physicalPorts.remove(portNumber);
528         portBandwidth.remove(portNumber);
529     }
530
531     @Override
532     public boolean isOperational() {
533         return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
534     }
535
536     @Override
537     public String toString() {
538         return ("["
539                 + this.socket.toString()
540                 + " SWID "
541                 + (isOperational() ? HexString.toHexString(this.sid)
542                         : "unkbown") + "]");
543     }
544
545     @Override
546     public Date getConnectedDate() {
547         return this.connectedDate;
548     }
549
550     public String getInstanceName() {
551         return instanceName;
552     }
553
554     @Override
555     public Object getStatistics(OFStatisticsRequest req) {
556         int xid = getNextXid();
557         StatisticsCollector worker = new StatisticsCollector(this, xid, req);
558         messageWaitingDone.put(xid, worker);
559         Future<Object> submit = executor.submit(worker);
560         Object result = null;
561         try {
562             result = submit
563                     .get(MESSAGE_RESPONSE_TIMER, TimeUnit.MILLISECONDS);
564             return result;
565         } catch (Exception e) {
566             logger.warn("Timeout while waiting for {} replies", req.getType());
567             result = null; // to indicate timeout has occurred
568             return result;
569         }
570     }
571
572     @Override
573     public Object syncSend(OFMessage msg) {
574         Integer xid = getNextXid();
575         SynchronousMessage worker = new SynchronousMessage(this, xid, msg);
576         messageWaitingDone.put(xid, worker);
577         Object result = null;
578         Boolean status = false;
579         Future<Object> submit = executor.submit(worker);
580         try {
581             result = submit
582                     .get(responseTimerValue, TimeUnit.MILLISECONDS);
583             messageWaitingDone.remove(xid);
584             if (result == null) {
585                 // if result  is null, then it means the switch can handle this message successfully
586                 // convert the result into a Boolean with value true
587                 status = true;
588                 //logger.debug("Successfully send " + msg.getType().toString());
589                 result = status;
590             } else {
591                 // if result  is not null, this means the switch can't handle this message
592                 // the result if OFError already
593                 logger.debug("Send {} failed --> {}", 
594                                 msg.getType().toString(), ((OFError) result).toString());
595             }
596             return result;
597         } catch (Exception e) {
598             logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
599             // convert the result into a Boolean with value false
600             status = false;
601             result = status;
602             return result;
603         }
604     }
605
606     /*
607      * Either a BarrierReply or a OFError is received. If this is a reply for an outstanding sync message,
608      * wake up associated task so that it can continue
609      */
610     private void processBarrierReply(OFBarrierReply msg) {
611         Integer xid = msg.getXid();
612         SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
613                 .remove(xid);
614         if (worker == null) {
615             return;
616         }
617         worker.wakeup();
618     }
619
620     private void processErrorReply(OFError errorMsg) {
621         OFMessage offendingMsg = errorMsg.getOffendingMsg();
622         Integer xid;
623         if (offendingMsg != null) {
624             xid = offendingMsg.getXid();
625         } else {
626             xid = errorMsg.getXid();
627         }
628         /*
629          * the error can be a reply to a synchronous message or to a statistic request message
630          */
631         Callable<?> worker = messageWaitingDone.remove(xid);
632         if (worker == null) {
633             return;
634         }
635         if (worker instanceof SynchronousMessage) {
636             ((SynchronousMessage) worker).wakeup(errorMsg);
637         } else {
638             ((StatisticsCollector) worker).wakeup(errorMsg);
639         }
640     }
641
642     private void processStatsReply(OFStatisticsReply reply) {
643         Integer xid = reply.getXid();
644         StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
645                 .get(xid);
646         if (worker == null) {
647             return;
648         }
649         if (worker.collect(reply)) {
650             // if all the stats records are received (collect() returns true)
651             // then we are done.
652             messageWaitingDone.remove(xid);
653             worker.wakeup();
654         }
655     }
656     
657     @Override
658     public Map<Short, OFPhysicalPort> getPhysicalPorts() {
659         return this.physicalPorts;
660     }
661
662     @Override
663     public OFPhysicalPort getPhysicalPort(Short portNumber) {
664         return this.physicalPorts.get(portNumber);
665     }
666
667     @Override
668     public Integer getPortBandwidth(Short portNumber) {
669         return this.portBandwidth.get(portNumber);
670     }
671
672     @Override
673     public Set<Short> getPorts() {
674         return this.physicalPorts.keySet();
675     }
676
677     @Override
678     public Byte getTables() {
679         return this.tables;
680     }
681     
682     @Override
683     public Integer getActions() {
684         return this.actions;
685     }
686     
687     @Override
688     public Integer getCapabilities() {
689         return this.capabilities;
690     }
691
692     @Override
693     public Integer getBuffers() {
694         return this.buffers;
695     }
696
697     @Override
698     public boolean isPortEnabled(short portNumber) {
699         return isPortEnabled(physicalPorts.get(portNumber));
700     }
701
702     @Override
703     public boolean isPortEnabled(OFPhysicalPort port) {
704         if (port == null) {
705             return false;
706         }
707         int portConfig = port.getConfig();
708         int portState = port.getState();
709         if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
710             return false;
711         }
712         if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
713             return false;
714         }
715         if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
716                 .getValue()) {
717             return false;
718         }
719         return true;
720     }
721
722     @Override
723     public List<OFPhysicalPort> getEnabledPorts() {
724         List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
725         synchronized (this.physicalPorts) {
726             for (OFPhysicalPort port : physicalPorts.values()) {
727                 if (isPortEnabled(port)) {
728                     result.add(port);
729                 }
730             }
731         }
732         return result;
733     }
734
735         /*
736          * Transmit thread polls the message out of the priority queue and invokes
737          * messaging service to transmit it over the socket channel
738          */
739     class PriorityMessageTransmit implements Runnable {
740         public void run() {
741             running = true;
742             while (running) {
743                 try {
744                         if (!transmitQ.isEmpty()) {
745                                 PriorityMessage pmsg = transmitQ.poll();
746                                 msgReadWriteService.asyncSend(pmsg.msg);
747                                 logger.trace("Message sent: {}", pmsg.toString());
748                         }
749                         Thread.sleep(10);
750                 } catch (InterruptedException ie) {
751                         reportError(new InterruptedException("PriorityMessageTransmit thread interrupted"));
752                 } catch (Exception e) {
753                         reportError(e);
754                 }
755             }
756                 transmitQ = null;
757         }
758     }
759
760     /*
761      * Setup and start the transmit thread
762      */
763     private void startTransmitThread() {        
764         this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11, 
765                                 new Comparator<PriorityMessage>() {
766                                         public int compare(PriorityMessage p1, PriorityMessage p2) {
767                                                 return p2.priority - p1.priority;
768                                         }
769                                 });
770         this.transmitThread = new Thread(new PriorityMessageTransmit());
771         this.transmitThread.start();
772     }
773     
774     /*
775      * Setup communication services
776      */
777     private void setupCommChannel() throws Exception {
778         this.selector = SelectorProvider.provider().openSelector();
779         this.socket.configureBlocking(false);
780         this.socket.socket().setTcpNoDelay(true);        
781         this.msgReadWriteService = getMessageReadWriteService();
782     }
783
784     private void sendFirstHello() {
785         try {
786                 OFMessage msg = factory.getMessage(OFType.HELLO);
787                 asyncFastSend(msg);
788         } catch (Exception e) {
789                 reportError(e);
790         }
791     }
792     
793     private IMessageReadWrite getMessageReadWriteService() throws Exception {
794         String str = System.getProperty("secureChannelEnabled");
795         return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? 
796                         new SecureMessageReadWriteService(socket, selector) : 
797                         new MessageReadWriteService(socket, selector);
798     }
799 }