Added support for annotations in generated APIs.
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / SwitchHandler.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
10
11 import java.io.IOException;
12 import java.net.SocketException;
13 import java.nio.channels.AsynchronousCloseException;
14 import java.nio.channels.SelectionKey;
15 import java.nio.channels.Selector;
16 import java.nio.channels.SocketChannel;
17 import java.nio.channels.spi.SelectorProvider;
18 import java.util.ArrayList;
19 import java.util.Comparator;
20 import java.util.Date;
21 import java.util.HashMap;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.Timer;
27 import java.util.TimerTask;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ExecutorService;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.Future;
33 import java.util.concurrent.PriorityBlockingQueue;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicInteger;
36
37 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
38 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
39 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
40 import org.openflow.protocol.OFBarrierReply;
41 import org.openflow.protocol.OFBarrierRequest;
42 import org.openflow.protocol.OFEchoReply;
43 import org.openflow.protocol.OFError;
44 import org.openflow.protocol.OFFeaturesReply;
45 import org.openflow.protocol.OFFlowMod;
46 import org.openflow.protocol.OFGetConfigReply;
47 import org.openflow.protocol.OFMatch;
48 import org.openflow.protocol.OFMessage;
49 import org.openflow.protocol.OFPhysicalPort;
50 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
51 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
52 import org.openflow.protocol.OFPhysicalPort.OFPortState;
53 import org.openflow.protocol.OFPort;
54 import org.openflow.protocol.OFPortStatus;
55 import org.openflow.protocol.OFPortStatus.OFPortReason;
56 import org.openflow.protocol.OFSetConfig;
57 import org.openflow.protocol.OFStatisticsReply;
58 import org.openflow.protocol.OFStatisticsRequest;
59 import org.openflow.protocol.OFType;
60 import org.openflow.protocol.factory.BasicFactory;
61 import org.openflow.util.HexString;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 public class SwitchHandler implements ISwitch {
66     private static final Logger logger = LoggerFactory
67             .getLogger(SwitchHandler.class);
68     private static final int SWITCH_LIVENESS_TIMER = 5000;
69     private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
70     private int MESSAGE_RESPONSE_TIMER = 2000;
71
72     private String instanceName;
73     private ISwitch thisISwitch;
74     private IController core;
75     private Long sid;
76     private Integer buffers;
77     private Integer capabilities;
78     private Byte tables;
79     private Integer actions;
80     private Selector selector;
81     private SocketChannel socket;
82     private BasicFactory factory;
83     private AtomicInteger xid;
84     private SwitchState state;
85     private Timer periodicTimer;
86     private Map<Short, OFPhysicalPort> physicalPorts;
87     private Map<Short, Integer> portBandwidth;
88     private Date connectedDate;
89     private Long lastMsgReceivedTimeStamp;
90     private Boolean probeSent;
91     private ExecutorService executor;
92     private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
93     private boolean running;
94     private IMessageReadWrite msgReadWriteService;
95     private Thread switchHandlerThread;
96     private Integer responseTimerValue;
97     private PriorityBlockingQueue<PriorityMessage> transmitQ;
98     private Thread transmitThread;
99
100     private enum SwitchState {
101         NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
102                 3);
103
104         private int value;
105
106         private SwitchState(int value) {
107             this.value = value;
108         }
109
110         @SuppressWarnings("unused")
111         public int value() {
112             return this.value;
113         }
114     }
115
116     public SwitchHandler(Controller core, SocketChannel sc, String name) {
117         this.instanceName = name;
118         this.thisISwitch = this;
119         this.sid = (long) 0;
120         this.buffers = (int) 0;
121         this.capabilities = (int) 0;
122         this.tables = (byte) 0;
123         this.actions = (int) 0;
124         this.core = core;
125         this.socket = sc;
126         this.factory = new BasicFactory();
127         this.connectedDate = new Date();
128         this.lastMsgReceivedTimeStamp = connectedDate.getTime();
129         this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
130         this.portBandwidth = new HashMap<Short, Integer>();
131         this.state = SwitchState.NON_OPERATIONAL;
132         this.probeSent = false;
133         this.xid = new AtomicInteger(this.socket.hashCode());
134         this.periodicTimer = null;
135         this.executor = Executors.newFixedThreadPool(4);
136         this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
137         this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
138         String rTimer = System.getProperty("of.messageResponseTimer");
139         if (rTimer != null) {
140             try {
141                 responseTimerValue = Integer.decode(rTimer);
142             } catch (NumberFormatException e) {
143                 logger.warn(
144                         "Invalid of.messageResponseTimer: {} use default({})",
145                         rTimer, MESSAGE_RESPONSE_TIMER);
146             }
147         }
148     }
149
150     public void start() {
151         try {
152             startTransmitThread();
153             setupCommChannel();
154             sendFirstHello();
155             startHandlerThread();
156         } catch (Exception e) {
157             reportError(e);
158         }
159     }
160
161     private void startHandlerThread() {
162         switchHandlerThread = new Thread(new Runnable() {
163             @Override
164             public void run() {
165                 running = true;
166                 while (running) {
167                     try {
168                         // wait for an incoming connection
169                         selector.select(0);
170                         Iterator<SelectionKey> selectedKeys = selector
171                                 .selectedKeys().iterator();
172                         while (selectedKeys.hasNext()) {
173                             SelectionKey skey = selectedKeys.next();
174                             selectedKeys.remove();
175                             if (skey.isValid() && skey.isWritable()) {
176                                 resumeSend();
177                             }
178                             if (skey.isValid() && skey.isReadable()) {
179                                 handleMessages();
180                             }
181                         }
182                     } catch (Exception e) {
183                         reportError(e);
184                     }
185                 }
186             }
187         }, instanceName);
188         switchHandlerThread.start();
189     }
190
191     public void stop() {
192         running = false;
193         cancelSwitchTimer();
194         try {
195             selector.wakeup();
196             selector.close();
197         } catch (Exception e) {
198         }
199         try {
200             socket.close();
201         } catch (Exception e) {
202         }
203         try {
204             msgReadWriteService.stop();
205         } catch (Exception e) {
206         }
207         executor.shutdown();
208
209         selector = null;
210         msgReadWriteService = null;
211
212         if (switchHandlerThread != null) {
213             switchHandlerThread.interrupt();
214         }
215         if (transmitThread != null) {
216             transmitThread.interrupt();
217         }
218     }
219
220     @Override
221     public int getNextXid() {
222         return this.xid.incrementAndGet();
223     }
224
225     /**
226      * This method puts the message in an outgoing priority queue with normal
227      * priority. It will be served after high priority messages. The method
228      * should be used for non-critical messages such as statistics request,
229      * discovery packets, etc. An unique XID is generated automatically and
230      * inserted into the message.
231      * 
232      * @param msg
233      *            The OF message to be sent
234      * @return The XID used
235      */
236     @Override
237     public Integer asyncSend(OFMessage msg) {
238         return asyncSend(msg, getNextXid());
239     }
240
241     private Object syncSend(OFMessage msg, int xid) {
242         SynchronousMessage worker = new SynchronousMessage(this, xid, msg);
243         messageWaitingDone.put(xid, worker);
244         Object result = null;
245         Boolean status = false;
246         Future<Object> submit = executor.submit(worker);
247         try {
248             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
249             messageWaitingDone.remove(xid);
250             if (result == null) {
251                 // if result is null, then it means the switch can handle this
252                 // message successfully
253                 // convert the result into a Boolean with value true
254                 status = true;
255                 // logger.debug("Successfully send " +
256                 // msg.getType().toString());
257                 result = status;
258             } else {
259                 // if result is not null, this means the switch can't handle
260                 // this message
261                 // the result if OFError already
262                 logger.debug("Send {} failed --> {}", msg.getType().toString(),
263                         ((OFError) result).toString());
264             }
265             return result;
266         } catch (Exception e) {
267             logger.warn("Timeout while waiting for {} reply", msg.getType()
268                     .toString());
269             // convert the result into a Boolean with value false
270             status = false;
271             result = status;
272             return result;
273         }
274     }
275
276     /**
277      * This method puts the message in an outgoing priority queue with normal
278      * priority. It will be served after high priority messages. The method
279      * should be used for non-critical messages such as statistics request,
280      * discovery packets, etc. The specified XID is inserted into the message.
281      * 
282      * @param msg
283      *            The OF message to be Sent
284      * @param xid
285      *            The XID to be used in the message
286      * @return The XID used
287      */
288     @Override
289     public Integer asyncSend(OFMessage msg, int xid) {
290         msg.setXid(xid);
291         if (transmitQ != null) {
292             transmitQ.add(new PriorityMessage(msg, 0));
293         }
294         return xid;
295     }
296
297     /**
298      * This method puts the message in an outgoing priority queue with high
299      * priority. It will be served first before normal priority messages. The
300      * method should be used for critical messages such as hello, echo reply
301      * etc. An unique XID is generated automatically and inserted into the
302      * message.
303      * 
304      * @param msg
305      *            The OF message to be sent
306      * @return The XID used
307      */
308     @Override
309     public Integer asyncFastSend(OFMessage msg) {
310         return asyncFastSend(msg, getNextXid());
311     }
312
313     /**
314      * This method puts the message in an outgoing priority queue with high
315      * priority. It will be served first before normal priority messages. The
316      * method should be used for critical messages such as hello, echo reply
317      * etc. The specified XID is inserted into the message.
318      * 
319      * @param msg
320      *            The OF message to be sent
321      * @return The XID used
322      */
323     @Override
324     public Integer asyncFastSend(OFMessage msg, int xid) {
325         msg.setXid(xid);
326         if (transmitQ != null) {
327             transmitQ.add(new PriorityMessage(msg, 1));
328         }
329         return xid;
330     }
331
332     public void resumeSend() {
333         try {
334             if (msgReadWriteService != null) {
335                 msgReadWriteService.resumeSend();
336             }
337         } catch (Exception e) {
338             reportError(e);
339         }
340     }
341
342     public void handleMessages() {
343         List<OFMessage> msgs = null;
344
345         try {
346             msgs = msgReadWriteService.readMessages();
347         } catch (Exception e) {
348             reportError(e);
349         }
350
351         if (msgs == null) {
352             logger.debug("{} is down", toString());
353             // the connection is down, inform core
354             reportSwitchStateChange(false);
355             return;
356         }
357         for (OFMessage msg : msgs) {
358             logger.trace("Message received: {}", msg.toString());
359             /*
360              * if ((msg.getType() != OFType.ECHO_REQUEST) && (msg.getType() !=
361              * OFType.ECHO_REPLY)) { logger.debug(msg.getType().toString() +
362              * " received from sw " + toString()); }
363              */
364             this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
365             OFType type = msg.getType();
366             switch (type) {
367             case HELLO:
368                 // send feature request
369                 OFMessage featureRequest = factory
370                         .getMessage(OFType.FEATURES_REQUEST);
371                 asyncFastSend(featureRequest);
372                 // delete all pre-existing flows
373                 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
374                 OFFlowMod flowMod = (OFFlowMod) factory
375                         .getMessage(OFType.FLOW_MOD);
376                 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
377                         .setOutPort(OFPort.OFPP_NONE)
378                         .setLength((short) OFFlowMod.MINIMUM_LENGTH);
379                 asyncFastSend(flowMod);
380                 this.state = SwitchState.WAIT_FEATURES_REPLY;
381                 startSwitchTimer();
382                 break;
383             case ECHO_REQUEST:
384                 OFEchoReply echoReply = (OFEchoReply) factory
385                         .getMessage(OFType.ECHO_REPLY);
386                 asyncFastSend(echoReply);
387                 break;
388             case ECHO_REPLY:
389                 this.probeSent = false;
390                 break;
391             case FEATURES_REPLY:
392                 processFeaturesReply((OFFeaturesReply) msg);
393                 break;
394             case GET_CONFIG_REPLY:
395                 // make sure that the switch can send the whole packet to the
396                 // controller
397                 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
398                     this.state = SwitchState.OPERATIONAL;
399                 }
400                 break;
401             case BARRIER_REPLY:
402                 processBarrierReply((OFBarrierReply) msg);
403                 break;
404             case ERROR:
405                 processErrorReply((OFError) msg);
406                 break;
407             case PORT_STATUS:
408                 processPortStatusMsg((OFPortStatus) msg);
409                 break;
410             case STATS_REPLY:
411                 processStatsReply((OFStatisticsReply) msg);
412                 break;
413             case PACKET_IN:
414                 break;
415             default:
416                 break;
417             } // end of switch
418             if (isOperational()) {
419                 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
420             }
421         } // end of for
422     }
423
424     private void processPortStatusMsg(OFPortStatus msg) {
425         // short portNumber = msg.getDesc().getPortNumber();
426         OFPhysicalPort port = msg.getDesc();
427         if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
428             updatePhysicalPort(port);
429             // logger.debug("Port " + portNumber + " on " + toString() +
430             // " modified");
431         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
432             updatePhysicalPort(port);
433             // logger.debug("Port " + portNumber + " on " + toString() +
434             // " added");
435         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
436                 .ordinal()) {
437             deletePhysicalPort(port);
438             // logger.debug("Port " + portNumber + " on " + toString() +
439             // " deleted");
440         }
441
442     }
443
444     private void startSwitchTimer() {
445         this.periodicTimer = new Timer();
446         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
447             @Override
448             public void run() {
449                 try {
450                     Long now = System.currentTimeMillis();
451                     if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
452                         if (probeSent) {
453                             // switch failed to respond to our probe, consider
454                             // it down
455                             logger.warn("{} is idle for too long, disconnect",
456                                     toString());
457                             reportSwitchStateChange(false);
458                         } else {
459                             // send a probe to see if the switch is still alive
460                             // logger.debug("Send idle probe (Echo Request) to "
461                             // + switchName());
462                             probeSent = true;
463                             OFMessage echo = factory
464                                     .getMessage(OFType.ECHO_REQUEST);
465                             asyncFastSend(echo);
466                         }
467                     } else {
468                         if (state == SwitchState.WAIT_FEATURES_REPLY) {
469                             // send another features request
470                             OFMessage request = factory
471                                     .getMessage(OFType.FEATURES_REQUEST);
472                             asyncFastSend(request);
473                         } else {
474                             if (state == SwitchState.WAIT_CONFIG_REPLY) {
475                                 // send another config request
476                                 OFSetConfig config = (OFSetConfig) factory
477                                         .getMessage(OFType.SET_CONFIG);
478                                 config.setMissSendLength((short) 0xffff)
479                                         .setLengthU(OFSetConfig.MINIMUM_LENGTH);
480                                 asyncFastSend(config);
481                                 OFMessage getConfig = factory
482                                         .getMessage(OFType.GET_CONFIG_REQUEST);
483                                 asyncFastSend(getConfig);
484                             }
485                         }
486                     }
487                 } catch (Exception e) {
488                     reportError(e);
489                 }
490             }
491         }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
492     }
493
494     private void cancelSwitchTimer() {
495         if (this.periodicTimer != null) {
496             this.periodicTimer.cancel();
497         }
498     }
499
500     private void reportError(Exception e) {
501         if (e instanceof AsynchronousCloseException
502                 || e instanceof InterruptedException
503                 || e instanceof SocketException
504                 || e instanceof IOException) {
505             logger.debug("Caught exception {}", e.getMessage());
506         } else {
507             logger.warn("Caught exception ", e);
508         }
509         // notify core of this error event and disconnect the switch
510         ((Controller) core).takeSwitchEventError(this);
511     }
512
513     private void reportSwitchStateChange(boolean added) {
514         if (added) {
515             ((Controller) core).takeSwtichEventAdd(this);
516         } else {
517             ((Controller) core).takeSwitchEventDelete(this);
518         }
519     }
520
521     @Override
522     public Long getId() {
523         return this.sid;
524     }
525
526     private void processFeaturesReply(OFFeaturesReply reply) {
527         if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
528             this.sid = reply.getDatapathId();
529             this.buffers = reply.getBuffers();
530             this.capabilities = reply.getCapabilities();
531             this.tables = reply.getTables();
532             this.actions = reply.getActions();
533             // notify core of this error event
534             for (OFPhysicalPort port : reply.getPorts()) {
535                 updatePhysicalPort(port);
536             }
537             // config the switch to send full data packet
538             OFSetConfig config = (OFSetConfig) factory
539                     .getMessage(OFType.SET_CONFIG);
540             config.setMissSendLength((short) 0xffff).setLengthU(
541                     OFSetConfig.MINIMUM_LENGTH);
542             asyncFastSend(config);
543             // send config request to make sure the switch can handle the set
544             // config
545             OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
546             asyncFastSend(getConfig);
547             this.state = SwitchState.WAIT_CONFIG_REPLY;
548             // inform core that a new switch is now operational
549             reportSwitchStateChange(true);
550         }
551     }
552
553     private void updatePhysicalPort(OFPhysicalPort port) {
554         Short portNumber = port.getPortNumber();
555         physicalPorts.put(portNumber, port);
556         portBandwidth
557                 .put(portNumber,
558                         port.getCurrentFeatures()
559                                 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
560                                         | OFPortFeatures.OFPPF_10MB_HD
561                                                 .getValue()
562                                         | OFPortFeatures.OFPPF_100MB_FD
563                                                 .getValue()
564                                         | OFPortFeatures.OFPPF_100MB_HD
565                                                 .getValue()
566                                         | OFPortFeatures.OFPPF_1GB_FD
567                                                 .getValue()
568                                         | OFPortFeatures.OFPPF_1GB_HD
569                                                 .getValue() | OFPortFeatures.OFPPF_10GB_FD
570                                         .getValue()));
571     }
572
573     private void deletePhysicalPort(OFPhysicalPort port) {
574         Short portNumber = port.getPortNumber();
575         physicalPorts.remove(portNumber);
576         portBandwidth.remove(portNumber);
577     }
578
579     @Override
580     public boolean isOperational() {
581         return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
582     }
583
584     @Override
585     public String toString() {
586         return ("["
587                 + this.socket.toString()
588                 + " SWID "
589                 + (isOperational() ? HexString.toHexString(this.sid)
590                         : "unkbown") + "]");
591     }
592
593     @Override
594     public Date getConnectedDate() {
595         return this.connectedDate;
596     }
597
598     public String getInstanceName() {
599         return instanceName;
600     }
601
602     @Override
603     public Object getStatistics(OFStatisticsRequest req) {
604         int xid = getNextXid();
605         StatisticsCollector worker = new StatisticsCollector(this, xid, req);
606         messageWaitingDone.put(xid, worker);
607         Future<Object> submit = executor.submit(worker);
608         Object result = null;
609         try {
610             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
611             return result;
612         } catch (Exception e) {
613             logger.warn("Timeout while waiting for {} replies", req.getType());
614             result = null; // to indicate timeout has occurred
615             return result;
616         }
617     }
618
619     @Override
620     public Object syncSend(OFMessage msg) {
621         int xid = getNextXid();
622         return syncSend(msg, xid);
623     }
624
625     /*
626      * Either a BarrierReply or a OFError is received. If this is a reply for an
627      * outstanding sync message, wake up associated task so that it can continue
628      */
629     private void processBarrierReply(OFBarrierReply msg) {
630         Integer xid = msg.getXid();
631         SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
632                 .remove(xid);
633         if (worker == null) {
634             return;
635         }
636         worker.wakeup();
637     }
638
639     private void processErrorReply(OFError errorMsg) {
640         OFMessage offendingMsg = errorMsg.getOffendingMsg();
641         Integer xid;
642         if (offendingMsg != null) {
643             xid = offendingMsg.getXid();
644         } else {
645             xid = errorMsg.getXid();
646         }
647         /*
648          * the error can be a reply to a synchronous message or to a statistic
649          * request message
650          */
651         Callable<?> worker = messageWaitingDone.remove(xid);
652         if (worker == null) {
653             return;
654         }
655         if (worker instanceof SynchronousMessage) {
656             ((SynchronousMessage) worker).wakeup(errorMsg);
657         } else {
658             ((StatisticsCollector) worker).wakeup(errorMsg);
659         }
660     }
661
662     private void processStatsReply(OFStatisticsReply reply) {
663         Integer xid = reply.getXid();
664         StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
665                 .get(xid);
666         if (worker == null) {
667             return;
668         }
669         if (worker.collect(reply)) {
670             // if all the stats records are received (collect() returns true)
671             // then we are done.
672             messageWaitingDone.remove(xid);
673             worker.wakeup();
674         }
675     }
676
677     @Override
678     public Map<Short, OFPhysicalPort> getPhysicalPorts() {
679         return this.physicalPorts;
680     }
681
682     @Override
683     public OFPhysicalPort getPhysicalPort(Short portNumber) {
684         return this.physicalPorts.get(portNumber);
685     }
686
687     @Override
688     public Integer getPortBandwidth(Short portNumber) {
689         return this.portBandwidth.get(portNumber);
690     }
691
692     @Override
693     public Set<Short> getPorts() {
694         return this.physicalPorts.keySet();
695     }
696
697     @Override
698     public Byte getTables() {
699         return this.tables;
700     }
701
702     @Override
703     public Integer getActions() {
704         return this.actions;
705     }
706
707     @Override
708     public Integer getCapabilities() {
709         return this.capabilities;
710     }
711
712     @Override
713     public Integer getBuffers() {
714         return this.buffers;
715     }
716
717     @Override
718     public boolean isPortEnabled(short portNumber) {
719         return isPortEnabled(physicalPorts.get(portNumber));
720     }
721
722     @Override
723     public boolean isPortEnabled(OFPhysicalPort port) {
724         if (port == null) {
725             return false;
726         }
727         int portConfig = port.getConfig();
728         int portState = port.getState();
729         if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
730             return false;
731         }
732         if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
733             return false;
734         }
735         if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
736                 .getValue()) {
737             return false;
738         }
739         return true;
740     }
741
742     @Override
743     public List<OFPhysicalPort> getEnabledPorts() {
744         List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
745         synchronized (this.physicalPorts) {
746             for (OFPhysicalPort port : physicalPorts.values()) {
747                 if (isPortEnabled(port)) {
748                     result.add(port);
749                 }
750             }
751         }
752         return result;
753     }
754
755     /*
756      * Transmit thread polls the message out of the priority queue and invokes
757      * messaging service to transmit it over the socket channel
758      */
759     class PriorityMessageTransmit implements Runnable {
760         public void run() {
761             running = true;
762             while (running) {
763                 try {
764                     if (!transmitQ.isEmpty()) {
765                         PriorityMessage pmsg = transmitQ.poll();
766                         msgReadWriteService.asyncSend(pmsg.msg);
767                         logger.trace("Message sent: {}", pmsg.toString());
768                     }
769                     Thread.sleep(10);
770                 } catch (InterruptedException ie) {
771                     reportError(new InterruptedException(
772                             "PriorityMessageTransmit thread interrupted"));
773                 } catch (Exception e) {
774                     reportError(e);
775                 }
776             }
777             transmitQ = null;
778         }
779     }
780
781     /*
782      * Setup and start the transmit thread
783      */
784     private void startTransmitThread() {
785         this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
786                 new Comparator<PriorityMessage>() {
787                     public int compare(PriorityMessage p1, PriorityMessage p2) {
788                         if (p2.priority != p1.priority) {
789                             return p2.priority - p1.priority;
790                         } else {
791                             return (p2.seqNum < p1.seqNum) ? 1 : -1;
792                         }
793                     }
794                 });
795         this.transmitThread = new Thread(new PriorityMessageTransmit());
796         this.transmitThread.start();
797     }
798
799     /*
800      * Setup communication services
801      */
802     private void setupCommChannel() throws Exception {
803         this.selector = SelectorProvider.provider().openSelector();
804         this.socket.configureBlocking(false);
805         this.socket.socket().setTcpNoDelay(true);
806         this.msgReadWriteService = getMessageReadWriteService();
807     }
808
809     private void sendFirstHello() {
810         try {
811             OFMessage msg = factory.getMessage(OFType.HELLO);
812             asyncFastSend(msg);
813         } catch (Exception e) {
814             reportError(e);
815         }
816     }
817
818     private IMessageReadWrite getMessageReadWriteService() throws Exception {
819         String str = System.getProperty("secureChannelEnabled");
820         return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(
821                 socket, selector) : new MessageReadWriteService(socket,
822                 selector);
823     }
824
825     /**
826      * Sends synchronous Barrier message 
827      */
828     @Override
829     public Object sendBarrierMessage() {
830         OFBarrierRequest barrierMsg = new OFBarrierRequest();
831         return syncSend(barrierMsg);        
832     }
833     
834     /**
835      * This method returns the switch liveness timeout value. If controller did
836      * not receive any message from the switch for such a long period,
837      * controller will tear down the connection to the switch.
838      * 
839      * @return The timeout value
840      */
841     private static int getSwitchLivenessTimeout() {
842         String timeout = System.getProperty("of.switchLivenessTimeout");
843         int rv = 60500;
844
845         try {
846             if (timeout != null) {
847                 rv = Integer.parseInt(timeout);
848             }
849         } catch (Exception e) {
850         }
851
852         return rv;
853     }
854 }