b87b785e3c1aa8f0d855c12208bba8088a421cf1
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / SwitchHandler.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
11
12 import java.nio.channels.AsynchronousCloseException;
13 import java.nio.channels.SelectionKey;
14 import java.nio.channels.Selector;
15 import java.nio.channels.SocketChannel;
16 import java.nio.channels.spi.SelectorProvider;
17 import java.util.ArrayList;
18 import java.util.Comparator;
19 import java.util.Date;
20 import java.util.HashMap;
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Set;
25 import java.util.Timer;
26 import java.util.TimerTask;
27 import java.util.concurrent.Callable;
28 import java.util.concurrent.ConcurrentHashMap;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.PriorityBlockingQueue;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicInteger;
35
36 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
37 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
38 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
39 import org.openflow.protocol.OFBarrierReply;
40 import org.openflow.protocol.OFEchoReply;
41 import org.openflow.protocol.OFError;
42 import org.openflow.protocol.OFFeaturesReply;
43 import org.openflow.protocol.OFFlowMod;
44 import org.openflow.protocol.OFGetConfigReply;
45 import org.openflow.protocol.OFMatch;
46 import org.openflow.protocol.OFMessage;
47 import org.openflow.protocol.OFPhysicalPort;
48 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
49 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
50 import org.openflow.protocol.OFPhysicalPort.OFPortState;
51 import org.openflow.protocol.OFPort;
52 import org.openflow.protocol.OFPortStatus;
53 import org.openflow.protocol.OFPortStatus.OFPortReason;
54 import org.openflow.protocol.OFSetConfig;
55 import org.openflow.protocol.OFStatisticsReply;
56 import org.openflow.protocol.OFStatisticsRequest;
57 import org.openflow.protocol.OFType;
58 import org.openflow.protocol.factory.BasicFactory;
59 import org.openflow.util.HexString;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 public class SwitchHandler implements ISwitch {
64     private static final Logger logger = LoggerFactory
65             .getLogger(SwitchHandler.class);
66     private static final int SWITCH_LIVENESS_TIMER = 5000;
67     private static final int SWITCH_LIVENESS_TIMEOUT = 2 * SWITCH_LIVENESS_TIMER + 500;
68     private int MESSAGE_RESPONSE_TIMER = 2000;
69
70     private String instanceName;
71     private ISwitch thisISwitch;
72     private IController core;
73     private Long sid;
74     private Integer buffers;
75     private Integer capabilities;
76     private Byte tables;
77     private Integer actions;
78     private Selector selector;
79     private SocketChannel socket;
80     private BasicFactory factory;
81     private AtomicInteger xid;
82     private SwitchState state;
83     private Timer periodicTimer;
84     private Map<Short, OFPhysicalPort> physicalPorts;
85     private Map<Short, Integer> portBandwidth;
86     private Date connectedDate;
87     private Long lastMsgReceivedTimeStamp;
88     private Boolean probeSent;
89     private ExecutorService executor;
90     private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
91     private boolean running;
92     private IMessageReadWrite msgReadWriteService;
93     private Thread switchHandlerThread;
94     private Integer responseTimerValue;
95         private PriorityBlockingQueue<PriorityMessage> transmitQ;
96     private Thread transmitThread;
97     
98     private enum SwitchState {
99         NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
100                 3);
101
102         private int value;
103
104         private SwitchState(int value) {
105             this.value = value;
106         }
107
108         @SuppressWarnings("unused")
109         public int value() {
110             return this.value;
111         }
112     }
113
114     public SwitchHandler(Controller core, SocketChannel sc, String name) {
115         this.instanceName = name;
116         this.thisISwitch = this;
117         this.sid = (long) 0;
118         this.buffers = (int)0;
119         this.capabilities = (int)0;
120         this.tables = (byte)0;
121         this.actions = (int)0;
122         this.core = core;
123         this.socket = sc;
124         this.factory = new BasicFactory();
125         this.connectedDate = new Date();
126         this.lastMsgReceivedTimeStamp = connectedDate.getTime();
127         this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
128         this.portBandwidth = new HashMap<Short, Integer>();
129         this.state = SwitchState.NON_OPERATIONAL;
130         this.probeSent = false;
131         this.xid = new AtomicInteger(this.socket.hashCode());
132         this.periodicTimer = null;
133         this.executor = Executors.newFixedThreadPool(4);
134         this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
135         this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
136         String rTimer = System.getProperty("of.messageResponseTimer");
137         if (rTimer != null) {
138                 try {
139                         responseTimerValue = Integer.decode(rTimer);
140                 } catch (NumberFormatException e) {
141                                 logger.warn("Invalid of.messageResponseTimer: {} use default({})",
142                                                 rTimer, MESSAGE_RESPONSE_TIMER);
143                 }
144         }
145         }
146
147     public void start() {
148         try {
149                 startTransmitThread();
150                 setupCommChannel();
151                 sendFirstHello();
152             startHandlerThread();
153         } catch (Exception e) {
154             reportError(e);
155         }
156     }
157
158     private void startHandlerThread() {
159         switchHandlerThread = new Thread(new Runnable() {
160             @Override
161             public void run() {
162                 running = true;
163                 while (running) {
164                     try {
165                         // wait for an incoming connection
166                         selector.select(0);
167                         Iterator<SelectionKey> selectedKeys = selector
168                                 .selectedKeys().iterator();
169                         while (selectedKeys.hasNext()) {
170                             SelectionKey skey = selectedKeys.next();
171                             selectedKeys.remove();
172                             if (skey.isValid() && skey.isWritable()) {
173                                 resumeSend();
174                             }
175                             if (skey.isValid() && skey.isReadable()) {
176                                 handleMessages();
177                             }
178                         }
179                     } catch (Exception e) {
180                         reportError(e);
181                     }
182                 }
183             }
184         }, instanceName);
185         switchHandlerThread.start();
186     }
187
188     public void stop() {
189         try {
190             running = false;
191             selector.wakeup();
192             cancelSwitchTimer();
193             this.selector.close();
194             this.socket.close();
195             executor.shutdown();
196         } catch (Exception e) {
197                 // do nothing since we are shutting down.
198                 return;
199         }
200     }
201
202     @Override
203     public int getNextXid() {
204         return this.xid.incrementAndGet();
205     }
206
207         /**
208          * This method puts the message in an outgoing priority queue with normal
209          * priority. It will be served after high priority messages. The method
210          * should be used for non-critical messages such as statistics request,
211          * discovery packets, etc. An unique XID is generated automatically and
212          * inserted into the message.
213          * 
214          * @param msg The OF message to be sent
215          * @return The XID used
216          */
217     @Override
218     public Integer asyncSend(OFMessage msg) {
219         return asyncSend(msg, getNextXid());
220     }
221
222         /**
223          * This method puts the message in an outgoing priority queue with normal
224          * priority. It will be served after high priority messages. The method
225          * should be used for non-critical messages such as statistics request,
226          * discovery packets, etc. The specified XID is inserted into the message.
227          * 
228          * @param msg The OF message to be Sent
229          * @param xid The XID to be used in the message
230          * @return The XID used
231          */
232     @Override
233     public Integer asyncSend(OFMessage msg, int xid) {
234         msg.setXid(xid);
235         transmitQ.add(new PriorityMessage(msg, 0));
236         return xid;
237     }
238
239         /**
240          * This method puts the message in an outgoing priority queue with high
241          * priority. It will be served first before normal priority messages. The
242          * method should be used for critical messages such as hello, echo reply
243          * etc. An unique XID is generated automatically and inserted into the
244          * message.
245          * 
246          * @param msg The OF message to be sent
247          * @return The XID used
248          */
249     @Override
250     public Integer asyncFastSend(OFMessage msg) {
251         return asyncFastSend(msg, getNextXid());
252     }
253
254         /**
255          * This method puts the message in an outgoing priority queue with high
256          * priority. It will be served first before normal priority messages. The
257          * method should be used for critical messages such as hello, echo reply
258          * etc. The specified XID is inserted into the message.
259          * 
260          * @param msg The OF message to be sent
261          * @return The XID used
262          */
263     @Override
264     public Integer asyncFastSend(OFMessage msg, int xid) {
265         msg.setXid(xid);
266         transmitQ.add(new PriorityMessage(msg, 1));
267         return xid;
268     }
269
270    public void resumeSend() {
271         try {
272                         msgReadWriteService.resumeSend();
273                 } catch (Exception e) {
274                         reportError(e);
275                 }
276     }
277
278     public void handleMessages() {
279         List<OFMessage> msgs = null;
280         
281         try {
282                 msgs = msgReadWriteService.readMessages();
283                 } catch (Exception e) {
284                         reportError(e);
285                 }
286                 
287         if (msgs == null) {
288             logger.debug("{} is down", toString());
289             // the connection is down, inform core
290             reportSwitchStateChange(false);
291             return;
292         }
293         for (OFMessage msg : msgs) {
294             logger.trace("Message received: {}", msg.toString());
295             /*
296             if  ((msg.getType() != OFType.ECHO_REQUEST) &&
297                         (msg.getType() != OFType.ECHO_REPLY)) {
298                 logger.debug(msg.getType().toString() + " received from sw " + toString());
299             }
300              */
301             this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
302             OFType type = msg.getType();
303             switch (type) {
304             case HELLO:
305                 // send feature request
306                 OFMessage featureRequest = factory
307                         .getMessage(OFType.FEATURES_REQUEST);
308                 asyncFastSend(featureRequest);
309                 // delete all pre-existing flows
310                 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
311                 OFFlowMod flowMod = (OFFlowMod) factory
312                         .getMessage(OFType.FLOW_MOD);
313                 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
314                         .setOutPort(OFPort.OFPP_NONE).setLength(
315                                 (short) OFFlowMod.MINIMUM_LENGTH);
316                 asyncFastSend(flowMod);
317                 this.state = SwitchState.WAIT_FEATURES_REPLY;
318                 startSwitchTimer();
319                 break;
320             case ECHO_REQUEST:
321                 OFEchoReply echoReply = (OFEchoReply) factory
322                         .getMessage(OFType.ECHO_REPLY);
323                 asyncFastSend(echoReply);
324                 break;
325             case ECHO_REPLY:
326                 this.probeSent = false;
327                 break;
328             case FEATURES_REPLY:
329                 processFeaturesReply((OFFeaturesReply) msg);
330                 break;
331             case GET_CONFIG_REPLY:
332                 // make sure that the switch can send the whole packet to the controller
333                 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
334                     this.state = SwitchState.OPERATIONAL;
335                 }
336                 break;
337             case BARRIER_REPLY:
338                 processBarrierReply((OFBarrierReply) msg);
339                 break;
340             case ERROR:
341                 processErrorReply((OFError) msg);
342                 break;
343             case PORT_STATUS:
344                 processPortStatusMsg((OFPortStatus) msg);
345                 break;
346             case STATS_REPLY:
347                 processStatsReply((OFStatisticsReply) msg);
348                 break;
349             case PACKET_IN:
350                 break;
351             default:
352                 break;
353             } // end of switch
354             if (isOperational()) {
355                 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
356             }
357         } // end of for
358     }
359
360     private void processPortStatusMsg(OFPortStatus msg) {
361         //short portNumber = msg.getDesc().getPortNumber();
362         OFPhysicalPort port = msg.getDesc();
363         if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
364             updatePhysicalPort(port);
365             //logger.debug("Port " + portNumber + " on " + toString() + " modified");
366         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
367             updatePhysicalPort(port);
368             //logger.debug("Port " + portNumber + " on " + toString() + " added");
369         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
370                 .ordinal()) {
371             deletePhysicalPort(port);
372             //logger.debug("Port " + portNumber + " on " + toString() + " deleted");
373         }
374
375     }
376
377     private void startSwitchTimer() {
378         this.periodicTimer = new Timer();
379         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
380             @Override
381             public void run() {
382                 try {
383                     Long now = System.currentTimeMillis();
384                     if ((now - lastMsgReceivedTimeStamp) > SWITCH_LIVENESS_TIMEOUT) {
385                         if (probeSent) {
386                             // switch failed to respond to our probe, consider it down
387                             logger.warn("{} is idle for too long, disconnect", toString());
388                             reportSwitchStateChange(false);
389                         } else {
390                             // send a probe to see if the switch is still alive
391                             //logger.debug("Send idle probe (Echo Request) to " + switchName());
392                             probeSent = true;
393                             OFMessage echo = factory
394                                     .getMessage(OFType.ECHO_REQUEST);
395                             asyncFastSend(echo);
396                         }
397                     } else {
398                         if (state == SwitchState.WAIT_FEATURES_REPLY) {
399                             // send another features request
400                             OFMessage request = factory
401                                     .getMessage(OFType.FEATURES_REQUEST);
402                             asyncFastSend(request);
403                         } else {
404                             if (state == SwitchState.WAIT_CONFIG_REPLY) {
405                                 //  send another config request
406                                 OFSetConfig config = (OFSetConfig) factory
407                                         .getMessage(OFType.SET_CONFIG);
408                                 config.setMissSendLength((short) 0xffff)
409                                         .setLengthU(OFSetConfig.MINIMUM_LENGTH);
410                                 asyncFastSend(config);
411                                 OFMessage getConfig = factory
412                                         .getMessage(OFType.GET_CONFIG_REQUEST);
413                                 asyncFastSend(getConfig);
414                             }
415                         }
416                     }
417                 } catch (Exception e) {
418                     reportError(e);
419                 }
420             }
421         }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
422     }
423
424     private void cancelSwitchTimer() {
425         if (this.periodicTimer != null) {
426             this.periodicTimer.cancel();
427         }
428     }
429
430     private void reportError(Exception e) {
431         if (e instanceof AsynchronousCloseException) {
432                 logger.debug("Caught exception {}", e.getMessage());
433         } else {
434                 logger.warn("Caught exception {}", e.getMessage());
435         }
436         // notify core of this error event and disconnect the switch
437         ((Controller) core).takeSwitchEventError(this);
438     }
439
440     private void reportSwitchStateChange(boolean added) {
441         if (added) {
442             ((Controller) core).takeSwtichEventAdd(this);
443         } else {
444             ((Controller) core).takeSwitchEventDelete(this);
445         }
446     }
447
448     @Override
449     public Long getId() {
450         return this.sid;
451     }
452
453     private void processFeaturesReply(OFFeaturesReply reply) {
454         if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
455             this.sid = reply.getDatapathId();
456             this.buffers = reply.getBuffers();
457             this.capabilities = reply.getCapabilities();
458             this.tables = reply.getTables();
459             this.actions = reply.getActions();
460             // notify core of this error event
461             for (OFPhysicalPort port : reply.getPorts()) {
462                 updatePhysicalPort(port);
463             }
464             // config the switch to send full data packet
465             OFSetConfig config = (OFSetConfig) factory
466                     .getMessage(OFType.SET_CONFIG);
467             config.setMissSendLength((short) 0xffff).setLengthU(
468                     OFSetConfig.MINIMUM_LENGTH);
469             asyncFastSend(config);
470             // send config request to make sure the switch can handle the set config
471             OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
472             asyncFastSend(getConfig);
473             this.state = SwitchState.WAIT_CONFIG_REPLY;
474             // inform core that a new switch is now operational
475             reportSwitchStateChange(true);
476         }
477     }
478
479     private void updatePhysicalPort(OFPhysicalPort port) {
480         Short portNumber = port.getPortNumber();
481         physicalPorts.put(portNumber, port);
482         portBandwidth
483                 .put(
484                         portNumber,
485                         port.getCurrentFeatures()
486                                 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
487                                         | OFPortFeatures.OFPPF_10MB_HD
488                                                 .getValue()
489                                         | OFPortFeatures.OFPPF_100MB_FD
490                                                 .getValue()
491                                         | OFPortFeatures.OFPPF_100MB_HD
492                                                 .getValue()
493                                         | OFPortFeatures.OFPPF_1GB_FD
494                                                 .getValue()
495                                         | OFPortFeatures.OFPPF_1GB_HD
496                                                 .getValue() | OFPortFeatures.OFPPF_10GB_FD
497                                         .getValue()));
498     }
499
500     private void deletePhysicalPort(OFPhysicalPort port) {
501         Short portNumber = port.getPortNumber();
502         physicalPorts.remove(portNumber);
503         portBandwidth.remove(portNumber);
504     }
505
506     @Override
507     public boolean isOperational() {
508         return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
509     }
510
511     @Override
512     public String toString() {
513         return ("["
514                 + this.socket.toString()
515                 + " SWID "
516                 + (isOperational() ? HexString.toHexString(this.sid)
517                         : "unkbown") + "]");
518     }
519
520     @Override
521     public Date getConnectedDate() {
522         return this.connectedDate;
523     }
524
525     public String getInstanceName() {
526         return instanceName;
527     }
528
529     @Override
530     public Object getStatistics(OFStatisticsRequest req) {
531         int xid = getNextXid();
532         StatisticsCollector worker = new StatisticsCollector(this, xid, req);
533         messageWaitingDone.put(xid, worker);
534         Future<Object> submit = executor.submit(worker);
535         Object result = null;
536         try {
537             result = submit
538                     .get(MESSAGE_RESPONSE_TIMER, TimeUnit.MILLISECONDS);
539             return result;
540         } catch (Exception e) {
541             logger.warn("Timeout while waiting for {} replies", req.getType());
542             result = null; // to indicate timeout has occurred
543             return result;
544         }
545     }
546
547     @Override
548     public Object syncSend(OFMessage msg) {
549         Integer xid = getNextXid();
550         SynchronousMessage worker = new SynchronousMessage(this, xid, msg);
551         messageWaitingDone.put(xid, worker);
552         Object result = null;
553         Boolean status = false;
554         Future<Object> submit = executor.submit(worker);
555         try {
556             result = submit
557                     .get(responseTimerValue, TimeUnit.MILLISECONDS);
558             messageWaitingDone.remove(xid);
559             if (result == null) {
560                 // if result  is null, then it means the switch can handle this message successfully
561                 // convert the result into a Boolean with value true
562                 status = true;
563                 //logger.debug("Successfully send " + msg.getType().toString());
564                 result = status;
565             } else {
566                 // if result  is not null, this means the switch can't handle this message
567                 // the result if OFError already
568                 logger.debug("Send {} failed --> {}", 
569                                 msg.getType().toString(), ((OFError) result).toString());
570             }
571             return result;
572         } catch (Exception e) {
573             logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
574             // convert the result into a Boolean with value false
575             status = false;
576             result = status;
577             return result;
578         }
579     }
580
581     /*
582      * Either a BarrierReply or a OFError is received. If this is a reply for an outstanding sync message,
583      * wake up associated task so that it can continue
584      */
585     private void processBarrierReply(OFBarrierReply msg) {
586         Integer xid = msg.getXid();
587         SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
588                 .remove(xid);
589         if (worker == null) {
590             return;
591         }
592         worker.wakeup();
593     }
594
595     private void processErrorReply(OFError errorMsg) {
596         OFMessage offendingMsg = errorMsg.getOffendingMsg();
597         Integer xid;
598         if (offendingMsg != null) {
599             xid = offendingMsg.getXid();
600         } else {
601             xid = errorMsg.getXid();
602         }
603         /*
604          * the error can be a reply to a synchronous message or to a statistic request message
605          */
606         Callable<?> worker = messageWaitingDone.remove(xid);
607         if (worker == null) {
608             return;
609         }
610         if (worker instanceof SynchronousMessage) {
611             ((SynchronousMessage) worker).wakeup(errorMsg);
612         } else {
613             ((StatisticsCollector) worker).wakeup(errorMsg);
614         }
615     }
616
617     private void processStatsReply(OFStatisticsReply reply) {
618         Integer xid = reply.getXid();
619         StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
620                 .get(xid);
621         if (worker == null) {
622             return;
623         }
624         if (worker.collect(reply)) {
625             // if all the stats records are received (collect() returns true)
626             // then we are done.
627             messageWaitingDone.remove(xid);
628             worker.wakeup();
629         }
630     }
631     
632     @Override
633     public Map<Short, OFPhysicalPort> getPhysicalPorts() {
634         return this.physicalPorts;
635     }
636
637     @Override
638     public OFPhysicalPort getPhysicalPort(Short portNumber) {
639         return this.physicalPorts.get(portNumber);
640     }
641
642     @Override
643     public Integer getPortBandwidth(Short portNumber) {
644         return this.portBandwidth.get(portNumber);
645     }
646
647     @Override
648     public Set<Short> getPorts() {
649         return this.physicalPorts.keySet();
650     }
651
652     @Override
653     public Byte getTables() {
654         return this.tables;
655     }
656     
657     @Override
658     public Integer getActions() {
659         return this.actions;
660     }
661     
662     @Override
663     public Integer getCapabilities() {
664         return this.capabilities;
665     }
666
667     @Override
668     public Integer getBuffers() {
669         return this.buffers;
670     }
671
672     @Override
673     public boolean isPortEnabled(short portNumber) {
674         return isPortEnabled(physicalPorts.get(portNumber));
675     }
676
677     @Override
678     public boolean isPortEnabled(OFPhysicalPort port) {
679         if (port == null) {
680             return false;
681         }
682         int portConfig = port.getConfig();
683         int portState = port.getState();
684         if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
685             return false;
686         }
687         if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
688             return false;
689         }
690         if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
691                 .getValue()) {
692             return false;
693         }
694         return true;
695     }
696
697     @Override
698     public List<OFPhysicalPort> getEnabledPorts() {
699         List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
700         synchronized (this.physicalPorts) {
701             for (OFPhysicalPort port : physicalPorts.values()) {
702                 if (isPortEnabled(port)) {
703                     result.add(port);
704                 }
705             }
706         }
707         return result;
708     }
709
710         /*
711          * Transmit thread polls the message out of the priority queue and invokes
712          * messaging service to transmit it over the socket channel
713          */
714     class PriorityMessageTransmit implements Runnable {
715         public void run() {
716             while (true) {
717                 try {
718                         if (!transmitQ.isEmpty()) {
719                                 PriorityMessage pmsg = transmitQ.poll();
720                                 msgReadWriteService.asyncSend(pmsg.msg);
721                                 logger.trace("Message sent: {}", pmsg.toString());
722                         }
723                         Thread.sleep(10);
724                 } catch (Exception e) {
725                         reportError(e);
726                 }
727             }
728         }
729     }
730
731     /*
732      * Setup and start the transmit thread
733      */
734     private void startTransmitThread() {        
735         this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11, 
736                                 new Comparator<PriorityMessage>() {
737                                         public int compare(PriorityMessage p1, PriorityMessage p2) {
738                                                 return p2.priority - p1.priority;
739                                         }
740                                 });
741         this.transmitThread = new Thread(new PriorityMessageTransmit());
742         this.transmitThread.start();
743     }
744     
745     /*
746      * Setup communication services
747      */
748     private void setupCommChannel() throws Exception {
749         this.selector = SelectorProvider.provider().openSelector();
750         this.socket.configureBlocking(false);
751         this.socket.socket().setTcpNoDelay(true);        
752         this.msgReadWriteService = getMessageReadWriteService();
753     }
754
755     private void sendFirstHello() {
756         try {
757                 OFMessage msg = factory.getMessage(OFType.HELLO);
758                 asyncFastSend(msg);
759         } catch (Exception e) {
760                 reportError(e);
761         }
762     }
763     
764     private IMessageReadWrite getMessageReadWriteService() throws Exception {
765         String str = System.getProperty("secureChannelEnabled");
766         return ((str != null) && (str.equalsIgnoreCase("true"))) ? 
767                         new SecureMessageReadWriteService(socket, selector) : 
768                         new MessageReadWriteService(socket, selector);
769     }
770 }