Merge "Added support for annotations in generated APIs."
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / SwitchHandler.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
10
11 import java.io.IOException;
12 import java.net.SocketException;
13 import java.nio.channels.AsynchronousCloseException;
14 import java.nio.channels.SelectionKey;
15 import java.nio.channels.Selector;
16 import java.nio.channels.SocketChannel;
17 import java.nio.channels.spi.SelectorProvider;
18 import java.util.ArrayList;
19 import java.util.Comparator;
20 import java.util.Date;
21 import java.util.HashMap;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.Timer;
27 import java.util.TimerTask;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.PriorityBlockingQueue;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicInteger;
36
37 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
38 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
39 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
40 import org.openflow.protocol.OFBarrierReply;
41 import org.openflow.protocol.OFBarrierRequest;
42 import org.openflow.protocol.OFEchoReply;
43 import org.openflow.protocol.OFError;
44 import org.openflow.protocol.OFFeaturesReply;
45 import org.openflow.protocol.OFFlowMod;
46 import org.openflow.protocol.OFGetConfigReply;
47 import org.openflow.protocol.OFMatch;
48 import org.openflow.protocol.OFMessage;
49 import org.openflow.protocol.OFPhysicalPort;
50 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
51 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
52 import org.openflow.protocol.OFPhysicalPort.OFPortState;
53 import org.openflow.protocol.OFPort;
54 import org.openflow.protocol.OFPortStatus;
55 import org.openflow.protocol.OFPortStatus.OFPortReason;
56 import org.openflow.protocol.OFSetConfig;
57 import org.openflow.protocol.OFStatisticsReply;
58 import org.openflow.protocol.OFStatisticsRequest;
59 import org.openflow.protocol.OFType;
60 import org.openflow.protocol.factory.BasicFactory;
61 import org.openflow.util.HexString;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 public class SwitchHandler implements ISwitch {
66     private static final Logger logger = LoggerFactory
67             .getLogger(SwitchHandler.class);
68     private static final int SWITCH_LIVENESS_TIMER = 5000;
69     private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
70     private int MESSAGE_RESPONSE_TIMER = 2000;
71
72     private String instanceName;
73     private ISwitch thisISwitch;
74     private IController core;
75     private Long sid;
76     private Integer buffers;
77     private Integer capabilities;
78     private Byte tables;
79     private Integer actions;
80     private Selector selector;
81     private SocketChannel socket;
82     private BasicFactory factory;
83     private AtomicInteger xid;
84     private SwitchState state;
85     private Timer periodicTimer;
86     private Map<Short, OFPhysicalPort> physicalPorts;
87     private Map<Short, Integer> portBandwidth;
88     private Date connectedDate;
89     private Long lastMsgReceivedTimeStamp;
90     private Boolean probeSent;
91     private ExecutorService executor;
92     private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
93     private boolean running;
94     private IMessageReadWrite msgReadWriteService;
95     private Thread switchHandlerThread;
96     private Integer responseTimerValue;
97     private PriorityBlockingQueue<PriorityMessage> transmitQ;
98     private Thread transmitThread;
99
100     private enum SwitchState {
101         NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
102                 3);
103
104         private int value;
105
106         private SwitchState(int value) {
107             this.value = value;
108         }
109
110         @SuppressWarnings("unused")
111         public int value() {
112             return this.value;
113         }
114     }
115
116     public SwitchHandler(Controller core, SocketChannel sc, String name) {
117         this.instanceName = name;
118         this.thisISwitch = this;
119         this.sid = (long) 0;
120         this.buffers = (int) 0;
121         this.capabilities = (int) 0;
122         this.tables = (byte) 0;
123         this.actions = (int) 0;
124         this.core = core;
125         this.socket = sc;
126         this.factory = new BasicFactory();
127         this.connectedDate = new Date();
128         this.lastMsgReceivedTimeStamp = connectedDate.getTime();
129         this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
130         this.portBandwidth = new HashMap<Short, Integer>();
131         this.state = SwitchState.NON_OPERATIONAL;
132         this.probeSent = false;
133         this.xid = new AtomicInteger(this.socket.hashCode());
134         this.periodicTimer = null;
135         this.executor = Executors.newFixedThreadPool(4);
136         this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
137         this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
138         String rTimer = System.getProperty("of.messageResponseTimer");
139         if (rTimer != null) {
140             try {
141                 responseTimerValue = Integer.decode(rTimer);
142             } catch (NumberFormatException e) {
143                 logger.warn(
144                         "Invalid of.messageResponseTimer: {} use default({})",
145                         rTimer, MESSAGE_RESPONSE_TIMER);
146             }
147         }
148     }
149
150     public void start() {
151         try {
152             startTransmitThread();
153             setupCommChannel();
154             sendFirstHello();
155             startHandlerThread();
156         } catch (Exception e) {
157             reportError(e);
158         }
159     }
160
161     private void startHandlerThread() {
162         switchHandlerThread = new Thread(new Runnable() {
163             @Override
164             public void run() {
165                 running = true;
166                 while (running) {
167                     try {
168                         // wait for an incoming connection
169                         selector.select(0);
170                         Iterator<SelectionKey> selectedKeys = selector
171                                 .selectedKeys().iterator();
172                         while (selectedKeys.hasNext()) {
173                             SelectionKey skey = selectedKeys.next();
174                             selectedKeys.remove();
175                             if (skey.isValid() && skey.isWritable()) {
176                                 resumeSend();
177                             }
178                             if (skey.isValid() && skey.isReadable()) {
179                                 handleMessages();
180                             }
181                         }
182                     } catch (Exception e) {
183                         reportError(e);
184                     }
185                 }
186             }
187         }, instanceName);
188         switchHandlerThread.start();
189     }
190
191     public void stop() {
192         running = false;
193         cancelSwitchTimer();
194         try {
195             selector.wakeup();
196             selector.close();
197         } catch (Exception e) {
198         }
199         try {
200             socket.close();
201         } catch (Exception e) {
202         }
203         try {
204             msgReadWriteService.stop();
205         } catch (Exception e) {
206         }
207         executor.shutdown();
208
209         selector = null;
210         msgReadWriteService = null;
211
212         if (switchHandlerThread != null) {
213             switchHandlerThread.interrupt();
214         }
215         if (transmitThread != null) {
216             transmitThread.interrupt();
217         }
218     }
219
220     @Override
221     public int getNextXid() {
222         return this.xid.incrementAndGet();
223     }
224
225     /**
226      * This method puts the message in an outgoing priority queue with normal
227      * priority. It will be served after high priority messages. The method
228      * should be used for non-critical messages such as statistics request,
229      * discovery packets, etc. An unique XID is generated automatically and
230      * inserted into the message.
231      * 
232      * @param msg
233      *            The OF message to be sent
234      * @return The XID used
235      */
236     @Override
237     public Integer asyncSend(OFMessage msg) {
238         return asyncSend(msg, getNextXid());
239     }
240
241     private Object syncSend(OFMessage msg, int xid) {
242         SynchronousMessage worker = new SynchronousMessage(this, xid, msg);
243         messageWaitingDone.put(xid, worker);
244         Object result = null;
245         Boolean status = false;
246         Future<Object> submit = executor.submit(worker);
247         try {
248             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
249             messageWaitingDone.remove(xid);
250             if (result == null) {
251                 // if result is null, then it means the switch can handle this
252                 // message successfully
253                 // convert the result into a Boolean with value true
254                 status = true;
255                 // logger.debug("Successfully send " +
256                 // msg.getType().toString());
257                 result = status;
258             } else {
259                 // if result is not null, this means the switch can't handle
260                 // this message
261                 // the result if OFError already
262                 logger.debug("Send {} failed --> {}", msg.getType().toString(),
263                         ((OFError) result).toString());
264             }
265             return result;
266         } catch (Exception e) {
267             logger.warn("Timeout while waiting for {} reply", msg.getType()
268                     .toString());
269             // convert the result into a Boolean with value false
270             status = false;
271             result = status;
272             return result;
273         }
274     }
275
276     /**
277      * This method puts the message in an outgoing priority queue with normal
278      * priority. It will be served after high priority messages. The method
279      * should be used for non-critical messages such as statistics request,
280      * discovery packets, etc. The specified XID is inserted into the message.
281      * 
282      * @param msg
283      *            The OF message to be Sent
284      * @param xid
285      *            The XID to be used in the message
286      * @return The XID used
287      */
288     @Override
289     public Integer asyncSend(OFMessage msg, int xid) {
290         msg.setXid(xid);
291         if (transmitQ != null) {
292             transmitQ.add(new PriorityMessage(msg, 0));
293         }
294         return xid;
295     }
296
297     /**
298      * This method puts the message in an outgoing priority queue with high
299      * priority. It will be served first before normal priority messages. The
300      * method should be used for critical messages such as hello, echo reply
301      * etc. An unique XID is generated automatically and inserted into the
302      * message.
303      * 
304      * @param msg
305      *            The OF message to be sent
306      * @return The XID used
307      */
308     @Override
309     public Integer asyncFastSend(OFMessage msg) {
310         return asyncFastSend(msg, getNextXid());
311     }
312
313     /**
314      * This method puts the message in an outgoing priority queue with high
315      * priority. It will be served first before normal priority messages. The
316      * method should be used for critical messages such as hello, echo reply
317      * etc. The specified XID is inserted into the message.
318      * 
319      * @param msg
320      *            The OF message to be sent
321      * @return The XID used
322      */
323     @Override
324     public Integer asyncFastSend(OFMessage msg, int xid) {
325         msg.setXid(xid);
326         if (transmitQ != null) {
327             transmitQ.add(new PriorityMessage(msg, 1));
328         }
329         return xid;
330     }
331
332     public void resumeSend() {
333         try {
334             if (msgReadWriteService != null) {
335                 msgReadWriteService.resumeSend();
336             }
337         } catch (Exception e) {
338             reportError(e);
339         }
340     }
341
342     public void handleMessages() {
343         List<OFMessage> msgs = null;
344
345         try {
346             msgs = msgReadWriteService.readMessages();
347         } catch (Exception e) {
348             reportError(e);
349         }
350
351         if (msgs == null) {
352             logger.debug("{} is down", toString());
353             // the connection is down, inform core
354             reportSwitchStateChange(false);
355             return;
356         }
357         for (OFMessage msg : msgs) {
358             logger.trace("Message received: {}", msg.toString());
359             this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
360             OFType type = msg.getType();
361             switch (type) {
362             case HELLO:
363                 // send feature request
364                 OFMessage featureRequest = factory
365                         .getMessage(OFType.FEATURES_REQUEST);
366                 asyncFastSend(featureRequest);
367                 // delete all pre-existing flows
368                 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
369                 OFFlowMod flowMod = (OFFlowMod) factory
370                         .getMessage(OFType.FLOW_MOD);
371                 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
372                         .setOutPort(OFPort.OFPP_NONE)
373                         .setLength((short) OFFlowMod.MINIMUM_LENGTH);
374                 asyncFastSend(flowMod);
375                 this.state = SwitchState.WAIT_FEATURES_REPLY;
376                 startSwitchTimer();
377                 break;
378             case ECHO_REQUEST:
379                 OFEchoReply echoReply = (OFEchoReply) factory
380                         .getMessage(OFType.ECHO_REPLY);
381                 asyncFastSend(echoReply);
382                 break;
383             case ECHO_REPLY:
384                 this.probeSent = false;
385                 break;
386             case FEATURES_REPLY:
387                 processFeaturesReply((OFFeaturesReply) msg);
388                 break;
389             case GET_CONFIG_REPLY:
390                 // make sure that the switch can send the whole packet to the
391                 // controller
392                 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
393                     this.state = SwitchState.OPERATIONAL;
394                 }
395                 break;
396             case BARRIER_REPLY:
397                 processBarrierReply((OFBarrierReply) msg);
398                 break;
399             case ERROR:
400                 processErrorReply((OFError) msg);
401                 break;
402             case PORT_STATUS:
403                 processPortStatusMsg((OFPortStatus) msg);
404                 break;
405             case STATS_REPLY:
406                 processStatsReply((OFStatisticsReply) msg);
407                 break;
408             case PACKET_IN:
409                 break;
410             default:
411                 break;
412             } // end of switch
413             if (isOperational()) {
414                 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
415             }
416         } // end of for
417     }
418
419     private void processPortStatusMsg(OFPortStatus msg) {
420         OFPhysicalPort port = msg.getDesc();
421         if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
422             updatePhysicalPort(port);
423         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
424             updatePhysicalPort(port);
425         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
426                 .ordinal()) {
427             deletePhysicalPort(port);
428         }
429
430     }
431
432     private void startSwitchTimer() {
433         this.periodicTimer = new Timer();
434         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
435             @Override
436             public void run() {
437                 try {
438                     Long now = System.currentTimeMillis();
439                     if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
440                         if (probeSent) {
441                             // switch failed to respond to our probe, consider
442                             // it down
443                             logger.warn("{} is idle for too long, disconnect",
444                                     toString());
445                             reportSwitchStateChange(false);
446                         } else {
447                             // send a probe to see if the switch is still alive
448                             logger.debug(
449                                     "Send idle probe (Echo Request) to {}",
450                                     toString());
451                             probeSent = true;
452                             OFMessage echo = factory
453                                     .getMessage(OFType.ECHO_REQUEST);
454                             asyncFastSend(echo);
455                         }
456                     } else {
457                         if (state == SwitchState.WAIT_FEATURES_REPLY) {
458                             // send another features request
459                             OFMessage request = factory
460                                     .getMessage(OFType.FEATURES_REQUEST);
461                             asyncFastSend(request);
462                         } else {
463                             if (state == SwitchState.WAIT_CONFIG_REPLY) {
464                                 // send another config request
465                                 OFSetConfig config = (OFSetConfig) factory
466                                         .getMessage(OFType.SET_CONFIG);
467                                 config.setMissSendLength((short) 0xffff)
468                                         .setLengthU(OFSetConfig.MINIMUM_LENGTH);
469                                 asyncFastSend(config);
470                                 OFMessage getConfig = factory
471                                         .getMessage(OFType.GET_CONFIG_REQUEST);
472                                 asyncFastSend(getConfig);
473                             }
474                         }
475                     }
476                 } catch (Exception e) {
477                     reportError(e);
478                 }
479             }
480         }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
481     }
482
483     private void cancelSwitchTimer() {
484         if (this.periodicTimer != null) {
485             this.periodicTimer.cancel();
486         }
487     }
488
489     private void reportError(Exception e) {
490         if (e instanceof AsynchronousCloseException
491                 || e instanceof InterruptedException
492                 || e instanceof SocketException || e instanceof IOException) {
493             logger.debug("Caught exception {}", e.getMessage());
494         } else {
495             logger.warn("Caught exception ", e);
496         }
497         // notify core of this error event and disconnect the switch
498         ((Controller) core).takeSwitchEventError(this);
499     }
500
501     private void reportSwitchStateChange(boolean added) {
502         if (added) {
503             ((Controller) core).takeSwitchEventAdd(this);
504         } else {
505             ((Controller) core).takeSwitchEventDelete(this);
506         }
507     }
508
509     @Override
510     public Long getId() {
511         return this.sid;
512     }
513
514     private void processFeaturesReply(OFFeaturesReply reply) {
515         if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
516             this.sid = reply.getDatapathId();
517             this.buffers = reply.getBuffers();
518             this.capabilities = reply.getCapabilities();
519             this.tables = reply.getTables();
520             this.actions = reply.getActions();
521             // notify core of this error event
522             for (OFPhysicalPort port : reply.getPorts()) {
523                 updatePhysicalPort(port);
524             }
525             // config the switch to send full data packet
526             OFSetConfig config = (OFSetConfig) factory
527                     .getMessage(OFType.SET_CONFIG);
528             config.setMissSendLength((short) 0xffff).setLengthU(
529                     OFSetConfig.MINIMUM_LENGTH);
530             asyncFastSend(config);
531             // send config request to make sure the switch can handle the set
532             // config
533             OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
534             asyncFastSend(getConfig);
535             this.state = SwitchState.WAIT_CONFIG_REPLY;
536             // inform core that a new switch is now operational
537             reportSwitchStateChange(true);
538         }
539     }
540
541     private void updatePhysicalPort(OFPhysicalPort port) {
542         Short portNumber = port.getPortNumber();
543         physicalPorts.put(portNumber, port);
544         portBandwidth
545                 .put(portNumber,
546                         port.getCurrentFeatures()
547                                 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
548                                         | OFPortFeatures.OFPPF_10MB_HD
549                                                 .getValue()
550                                         | OFPortFeatures.OFPPF_100MB_FD
551                                                 .getValue()
552                                         | OFPortFeatures.OFPPF_100MB_HD
553                                                 .getValue()
554                                         | OFPortFeatures.OFPPF_1GB_FD
555                                                 .getValue()
556                                         | OFPortFeatures.OFPPF_1GB_HD
557                                                 .getValue() | OFPortFeatures.OFPPF_10GB_FD
558                                             .getValue()));
559     }
560
561     private void deletePhysicalPort(OFPhysicalPort port) {
562         Short portNumber = port.getPortNumber();
563         physicalPorts.remove(portNumber);
564         portBandwidth.remove(portNumber);
565     }
566
567     @Override
568     public boolean isOperational() {
569         return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
570     }
571
572     @Override
573     public String toString() {
574         try {
575             return ("Switch:"
576                     + socket.socket().getRemoteSocketAddress().toString().split("/")[1]
577                     + " SWID:" + (isOperational() ? HexString
578                     .toHexString(this.sid) : "unknown"));
579         } catch (Exception e) {
580             return (isOperational() ? HexString.toHexString(this.sid)
581                     : "unknown");
582         }
583
584     }
585
586     @Override
587     public Date getConnectedDate() {
588         return this.connectedDate;
589     }
590
591     public String getInstanceName() {
592         return instanceName;
593     }
594
595     @Override
596     public Object getStatistics(OFStatisticsRequest req) {
597         int xid = getNextXid();
598         StatisticsCollector worker = new StatisticsCollector(this, xid, req);
599         messageWaitingDone.put(xid, worker);
600         Future<Object> submit = executor.submit(worker);
601         Object result = null;
602         try {
603             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
604             return result;
605         } catch (Exception e) {
606             logger.warn("Timeout while waiting for {} replies", req.getType());
607             result = null; // to indicate timeout has occurred
608             return result;
609         }
610     }
611
612     @Override
613     public Object syncSend(OFMessage msg) {
614         int xid = getNextXid();
615         return syncSend(msg, xid);
616     }
617
618     /*
619      * Either a BarrierReply or a OFError is received. If this is a reply for an
620      * outstanding sync message, wake up associated task so that it can continue
621      */
622     private void processBarrierReply(OFBarrierReply msg) {
623         Integer xid = msg.getXid();
624         SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
625                 .remove(xid);
626         if (worker == null) {
627             return;
628         }
629         worker.wakeup();
630     }
631
632     private void processErrorReply(OFError errorMsg) {
633         OFMessage offendingMsg = errorMsg.getOffendingMsg();
634         Integer xid;
635         if (offendingMsg != null) {
636             xid = offendingMsg.getXid();
637         } else {
638             xid = errorMsg.getXid();
639         }
640         /*
641          * the error can be a reply to a synchronous message or to a statistic
642          * request message
643          */
644         Callable<?> worker = messageWaitingDone.remove(xid);
645         if (worker == null) {
646             return;
647         }
648         if (worker instanceof SynchronousMessage) {
649             ((SynchronousMessage) worker).wakeup(errorMsg);
650         } else {
651             ((StatisticsCollector) worker).wakeup(errorMsg);
652         }
653     }
654
655     private void processStatsReply(OFStatisticsReply reply) {
656         Integer xid = reply.getXid();
657         StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
658                 .get(xid);
659         if (worker == null) {
660             return;
661         }
662         if (worker.collect(reply)) {
663             // if all the stats records are received (collect() returns true)
664             // then we are done.
665             messageWaitingDone.remove(xid);
666             worker.wakeup();
667         }
668     }
669
670     @Override
671     public Map<Short, OFPhysicalPort> getPhysicalPorts() {
672         return this.physicalPorts;
673     }
674
675     @Override
676     public OFPhysicalPort getPhysicalPort(Short portNumber) {
677         return this.physicalPorts.get(portNumber);
678     }
679
680     @Override
681     public Integer getPortBandwidth(Short portNumber) {
682         return this.portBandwidth.get(portNumber);
683     }
684
685     @Override
686     public Set<Short> getPorts() {
687         return this.physicalPorts.keySet();
688     }
689
690     @Override
691     public Byte getTables() {
692         return this.tables;
693     }
694
695     @Override
696     public Integer getActions() {
697         return this.actions;
698     }
699
700     @Override
701     public Integer getCapabilities() {
702         return this.capabilities;
703     }
704
705     @Override
706     public Integer getBuffers() {
707         return this.buffers;
708     }
709
710     @Override
711     public boolean isPortEnabled(short portNumber) {
712         return isPortEnabled(physicalPorts.get(portNumber));
713     }
714
715     @Override
716     public boolean isPortEnabled(OFPhysicalPort port) {
717         if (port == null) {
718             return false;
719         }
720         int portConfig = port.getConfig();
721         int portState = port.getState();
722         if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
723             return false;
724         }
725         if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
726             return false;
727         }
728         if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
729                 .getValue()) {
730             return false;
731         }
732         return true;
733     }
734
735     @Override
736     public List<OFPhysicalPort> getEnabledPorts() {
737         List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
738         synchronized (this.physicalPorts) {
739             for (OFPhysicalPort port : physicalPorts.values()) {
740                 if (isPortEnabled(port)) {
741                     result.add(port);
742                 }
743             }
744         }
745         return result;
746     }
747
748     /*
749      * Transmit thread polls the message out of the priority queue and invokes
750      * messaging service to transmit it over the socket channel
751      */
752     class PriorityMessageTransmit implements Runnable {
753         public void run() {
754             running = true;
755             while (running) {
756                 try {
757                     if (!transmitQ.isEmpty()) {
758                         PriorityMessage pmsg = transmitQ.poll();
759                         msgReadWriteService.asyncSend(pmsg.msg);
760                         logger.trace("Message sent: {}", pmsg.toString());
761                     }
762                     Thread.sleep(10);
763                 } catch (InterruptedException ie) {
764                     reportError(new InterruptedException(
765                             "PriorityMessageTransmit thread interrupted"));
766                 } catch (Exception e) {
767                     reportError(e);
768                 }
769             }
770             transmitQ = null;
771         }
772     }
773
774     /*
775      * Setup and start the transmit thread
776      */
777     private void startTransmitThread() {
778         this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
779                 new Comparator<PriorityMessage>() {
780                     public int compare(PriorityMessage p1, PriorityMessage p2) {
781                         if (p2.priority != p1.priority) {
782                             return p2.priority - p1.priority;
783                         } else {
784                             return (p2.seqNum < p1.seqNum) ? 1 : -1;
785                         }
786                     }
787                 });
788         this.transmitThread = new Thread(new PriorityMessageTransmit());
789         this.transmitThread.start();
790     }
791
792     /*
793      * Setup communication services
794      */
795     private void setupCommChannel() throws Exception {
796         this.selector = SelectorProvider.provider().openSelector();
797         this.socket.configureBlocking(false);
798         this.socket.socket().setTcpNoDelay(true);
799         this.msgReadWriteService = getMessageReadWriteService();
800     }
801
802     private void sendFirstHello() {
803         try {
804             OFMessage msg = factory.getMessage(OFType.HELLO);
805             asyncFastSend(msg);
806         } catch (Exception e) {
807             reportError(e);
808         }
809     }
810
811     private IMessageReadWrite getMessageReadWriteService() throws Exception {
812         String str = System.getProperty("secureChannelEnabled");
813         return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(
814                 socket, selector) : new MessageReadWriteService(socket,
815                 selector);
816     }
817
818     /**
819      * Sends synchronous Barrier message
820      */
821     @Override
822     public Object sendBarrierMessage() {
823         OFBarrierRequest barrierMsg = new OFBarrierRequest();
824         return syncSend(barrierMsg);
825     }
826
827     /**
828      * This method returns the switch liveness timeout value. If controller did
829      * not receive any message from the switch for such a long period,
830      * controller will tear down the connection to the switch.
831      * 
832      * @return The timeout value
833      */
834     private static int getSwitchLivenessTimeout() {
835         String timeout = System.getProperty("of.switchLivenessTimeout");
836         int rv = 60500;
837
838         try {
839             if (timeout != null) {
840                 rv = Integer.parseInt(timeout);
841             }
842         } catch (Exception e) {
843         }
844
845         return rv;
846     }
847 }