Revert "Fix bug 171. EchoReply payload must be the same as the correspoding EchoReque...
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / SwitchHandler.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
10
11 import java.nio.channels.SelectionKey;
12 import java.nio.channels.Selector;
13 import java.nio.channels.SocketChannel;
14 import java.nio.channels.spi.SelectorProvider;
15 import java.util.ArrayList;
16 import java.util.Comparator;
17 import java.util.Date;
18 import java.util.HashMap;
19 import java.util.Iterator;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Set;
23 import java.util.Timer;
24 import java.util.TimerTask;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.PriorityBlockingQueue;
31 import java.util.concurrent.RejectedExecutionException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34
35 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
36 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
37 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
38 import org.openflow.protocol.OFBarrierReply;
39 import org.openflow.protocol.OFBarrierRequest;
40 import org.openflow.protocol.OFEchoReply;
41 import org.openflow.protocol.OFError;
42 import org.openflow.protocol.OFFeaturesReply;
43 import org.openflow.protocol.OFFlowMod;
44 import org.openflow.protocol.OFGetConfigReply;
45 import org.openflow.protocol.OFMatch;
46 import org.openflow.protocol.OFMessage;
47 import org.openflow.protocol.OFPhysicalPort;
48 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
49 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
50 import org.openflow.protocol.OFPhysicalPort.OFPortState;
51 import org.openflow.protocol.OFPort;
52 import org.openflow.protocol.OFPortStatus;
53 import org.openflow.protocol.OFPortStatus.OFPortReason;
54 import org.openflow.protocol.OFSetConfig;
55 import org.openflow.protocol.OFStatisticsReply;
56 import org.openflow.protocol.OFStatisticsRequest;
57 import org.openflow.protocol.OFType;
58 import org.openflow.protocol.factory.BasicFactory;
59 import org.openflow.util.HexString;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 public class SwitchHandler implements ISwitch {
64     private static final Logger logger = LoggerFactory.getLogger(SwitchHandler.class);
65     private static final int SWITCH_LIVENESS_TIMER = 5000;
66     private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
67     private final int MESSAGE_RESPONSE_TIMER = 2000;
68
69     private final String instanceName;
70     private final ISwitch thisISwitch;
71     private final IController core;
72     private Long sid;
73     private Integer buffers;
74     private Integer capabilities;
75     private Byte tables;
76     private Integer actions;
77     private Selector selector;
78     private final SocketChannel socket;
79     private final BasicFactory factory;
80     private final AtomicInteger xid;
81     private SwitchState state;
82     private Timer periodicTimer;
83     private final Map<Short, OFPhysicalPort> physicalPorts;
84     private final Map<Short, Integer> portBandwidth;
85     private final Date connectedDate;
86     private Long lastMsgReceivedTimeStamp;
87     private Boolean probeSent;
88     private final ExecutorService executor;
89     private final ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
90     private boolean running;
91     private IMessageReadWrite msgReadWriteService;
92     private Thread switchHandlerThread;
93     private Integer responseTimerValue;
94     private PriorityBlockingQueue<PriorityMessage> transmitQ;
95     private Thread transmitThread;
96
97     private enum SwitchState {
98         NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(3);
99
100         private int value;
101
102         private SwitchState(int value) {
103             this.value = value;
104         }
105
106         @SuppressWarnings("unused")
107         public int value() {
108             return this.value;
109         }
110     }
111
112     public SwitchHandler(Controller core, SocketChannel sc, String name) {
113         this.instanceName = name;
114         this.thisISwitch = this;
115         this.sid = (long) 0;
116         this.buffers = (int) 0;
117         this.capabilities = (int) 0;
118         this.tables = (byte) 0;
119         this.actions = (int) 0;
120         this.core = core;
121         this.socket = sc;
122         this.factory = new BasicFactory();
123         this.connectedDate = new Date();
124         this.lastMsgReceivedTimeStamp = connectedDate.getTime();
125         this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
126         this.portBandwidth = new HashMap<Short, Integer>();
127         this.state = SwitchState.NON_OPERATIONAL;
128         this.probeSent = false;
129         this.xid = new AtomicInteger(this.socket.hashCode());
130         this.periodicTimer = null;
131         this.executor = Executors.newFixedThreadPool(4);
132         this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
133         this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
134         String rTimer = System.getProperty("of.messageResponseTimer");
135         if (rTimer != null) {
136             try {
137                 responseTimerValue = Integer.decode(rTimer);
138             } catch (NumberFormatException e) {
139                 logger.warn("Invalid of.messageResponseTimer: {} use default({})", rTimer, MESSAGE_RESPONSE_TIMER);
140             }
141         }
142     }
143
144     public void start() {
145         try {
146             startTransmitThread();
147             setupCommChannel();
148             sendFirstHello();
149             startHandlerThread();
150         } catch (Exception e) {
151             reportError(e);
152         }
153     }
154
155     private void startHandlerThread() {
156         switchHandlerThread = new Thread(new Runnable() {
157             @Override
158             public void run() {
159                 running = true;
160                 while (running) {
161                     try {
162                         // wait for an incoming connection
163                         selector.select(0);
164                         Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
165                         while (selectedKeys.hasNext()) {
166                             SelectionKey skey = selectedKeys.next();
167                             selectedKeys.remove();
168                             if (skey.isValid() && skey.isWritable()) {
169                                 resumeSend();
170                             }
171                             if (skey.isValid() && skey.isReadable()) {
172                                 handleMessages();
173                             }
174                         }
175                     } catch (Exception e) {
176                         reportError(e);
177                     }
178                 }
179             }
180         }, instanceName);
181         switchHandlerThread.start();
182     }
183
184     private void stopInternal() {
185         logger.debug("{} receives stop signal",
186                 (isOperational() ? HexString.toHexString(sid) : "unknown"));
187         running = false;
188         cancelSwitchTimer();
189         try {
190             selector.wakeup();
191             selector.close();
192         } catch (Exception e) {
193         }
194         try {
195             socket.close();
196         } catch (Exception e) {
197         }
198         try {
199             msgReadWriteService.stop();
200         } catch (Exception e) {
201         }
202         logger.debug("executor shutdown now");
203         executor.shutdownNow();
204
205         msgReadWriteService = null;
206     }
207
208     public void stop() {
209         stopInternal();
210
211         if (switchHandlerThread != null) {
212             switchHandlerThread.interrupt();
213         }
214         if (transmitThread != null) {
215             transmitThread.interrupt();
216         }
217     }
218
219     @Override
220     public int getNextXid() {
221         return this.xid.incrementAndGet();
222     }
223
224     /**
225      * This method puts the message in an outgoing priority queue with normal
226      * priority. It will be served after high priority messages. The method
227      * should be used for non-critical messages such as statistics request,
228      * discovery packets, etc. An unique XID is generated automatically and
229      * inserted into the message.
230      *
231      * @param msg
232      *            The OF message to be sent
233      * @return The XID used
234      */
235     @Override
236     public Integer asyncSend(OFMessage msg) {
237         return asyncSend(msg, getNextXid());
238     }
239
240     private Object syncSend(OFMessage msg, int xid) {
241         return syncMessageInternal(msg, xid, true);
242     }
243
244     /**
245      * This method puts the message in an outgoing priority queue with normal
246      * priority. It will be served after high priority messages. The method
247      * should be used for non-critical messages such as statistics request,
248      * discovery packets, etc. The specified XID is inserted into the message.
249      *
250      * @param msg
251      *            The OF message to be Sent
252      * @param xid
253      *            The XID to be used in the message
254      * @return The XID used
255      */
256     @Override
257     public Integer asyncSend(OFMessage msg, int xid) {
258         msg.setXid(xid);
259         if (transmitQ != null) {
260             transmitQ.add(new PriorityMessage(msg, 0));
261         }
262         return xid;
263     }
264
265     /**
266      * This method puts the message in an outgoing priority queue with high
267      * priority. It will be served first before normal priority messages. The
268      * method should be used for critical messages such as hello, echo reply
269      * etc. An unique XID is generated automatically and inserted into the
270      * message.
271      *
272      * @param msg
273      *            The OF message to be sent
274      * @return The XID used
275      */
276     @Override
277     public Integer asyncFastSend(OFMessage msg) {
278         return asyncFastSend(msg, getNextXid());
279     }
280
281     /**
282      * This method puts the message in an outgoing priority queue with high
283      * priority. It will be served first before normal priority messages. The
284      * method should be used for critical messages such as hello, echo reply
285      * etc. The specified XID is inserted into the message.
286      *
287      * @param msg
288      *            The OF message to be sent
289      * @return The XID used
290      */
291     @Override
292     public Integer asyncFastSend(OFMessage msg, int xid) {
293         msg.setXid(xid);
294         if (transmitQ != null) {
295             transmitQ.add(new PriorityMessage(msg, 1));
296         }
297         return xid;
298     }
299
300     public void resumeSend() {
301         try {
302             if (msgReadWriteService != null) {
303                 msgReadWriteService.resumeSend();
304             }
305         } catch (Exception e) {
306             reportError(e);
307         }
308     }
309
310     /**
311      * This method bypasses the transmit queue and sends the message over the
312      * socket directly. If the input xid is not null, the specified xid is
313      * inserted into the message. Otherwise, an unique xid is generated
314      * automatically and inserted into the message.
315      *
316      * @param msg
317      *            Message to be sent
318      * @param xid
319      *            Message xid
320      */
321     private void asyncSendNow(OFMessage msg, Integer xid) {
322         if (xid == null) {
323             xid = getNextXid();
324         }
325         msg.setXid(xid);
326
327         asyncSendNow(msg);
328     }
329
330     /**
331      * This method bypasses the transmit queue and sends the message over the
332      * socket directly.
333      *
334      * @param msg
335      *            Message to be sent
336      */
337     private void asyncSendNow(OFMessage msg) {
338         if (msgReadWriteService == null) {
339             logger.warn("asyncSendNow: {} is not sent because Message ReadWrite Service is not available.", msg);
340             return;
341         }
342
343         try {
344             msgReadWriteService.asyncSend(msg);
345         } catch (Exception e) {
346             reportError(e);
347         }
348     }
349
350     public void handleMessages() {
351         List<OFMessage> msgs = null;
352
353         try {
354             if (msgReadWriteService != null) {
355                 msgs = msgReadWriteService.readMessages();
356             }
357         } catch (Exception e) {
358             reportError(e);
359         }
360
361         if (msgs == null) {
362             return;
363         }
364         for (OFMessage msg : msgs) {
365             logger.trace("Message received: {}", msg);
366             this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
367             OFType type = msg.getType();
368             switch (type) {
369             case HELLO:
370                 sendFeaturesRequest();
371                 break;
372             case ECHO_REQUEST:
373                 OFEchoReply echoReply = (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
374                 // respond immediately
375                 asyncSendNow(echoReply, msg.getXid());
376
377                 // send features request if not sent yet
378                 sendFeaturesRequest();
379                 break;
380             case ECHO_REPLY:
381                 this.probeSent = false;
382                 break;
383             case FEATURES_REPLY:
384                 processFeaturesReply((OFFeaturesReply) msg);
385                 break;
386             case GET_CONFIG_REPLY:
387                 // make sure that the switch can send the whole packet to the
388                 // controller
389                 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
390                     this.state = SwitchState.OPERATIONAL;
391                 }
392                 break;
393             case BARRIER_REPLY:
394                 processBarrierReply((OFBarrierReply) msg);
395                 break;
396             case ERROR:
397                 processErrorReply((OFError) msg);
398                 break;
399             case PORT_STATUS:
400                 processPortStatusMsg((OFPortStatus) msg);
401                 break;
402             case STATS_REPLY:
403                 processStatsReply((OFStatisticsReply) msg);
404                 break;
405             case PACKET_IN:
406                 break;
407             default:
408                 break;
409             } // end of switch
410             if (isOperational()) {
411                 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
412             }
413         } // end of for
414     }
415
416     private void processPortStatusMsg(OFPortStatus msg) {
417         OFPhysicalPort port = msg.getDesc();
418         if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
419             updatePhysicalPort(port);
420         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
421             updatePhysicalPort(port);
422         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE.ordinal()) {
423             deletePhysicalPort(port);
424         }
425
426     }
427
428     private void startSwitchTimer() {
429         this.periodicTimer = new Timer();
430         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
431             @Override
432             public void run() {
433                 try {
434                     Long now = System.currentTimeMillis();
435                     if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
436                         if (probeSent) {
437                             // switch failed to respond to our probe, consider
438                             // it down
439                             logger.warn("{} sid {} is idle for too long, disconnect", socket.socket()
440                                     .getRemoteSocketAddress().toString().split("/")[1], (sid == 0) ? "unknown"
441                                     : HexString.toHexString(sid));
442                             reportSwitchStateChange(false);
443                         } else {
444                             // send a probe to see if the switch is still alive
445                             logger.debug("Send idle probe (Echo Request) to {}", this);
446                             probeSent = true;
447                             OFMessage echo = factory.getMessage(OFType.ECHO_REQUEST);
448                             asyncFastSend(echo);
449                         }
450                     } else {
451                         if (state == SwitchState.WAIT_FEATURES_REPLY) {
452                             // send another features request
453                             OFMessage request = factory.getMessage(OFType.FEATURES_REQUEST);
454                             asyncFastSend(request);
455                         } else {
456                             if (state == SwitchState.WAIT_CONFIG_REPLY) {
457                                 // send another config request
458                                 OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
459                                 config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
460                                 asyncFastSend(config);
461                                 OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
462                                 asyncFastSend(getConfig);
463                             }
464                         }
465                     }
466                 } catch (Exception e) {
467                     reportError(e);
468                 }
469             }
470         }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
471     }
472
473     private void cancelSwitchTimer() {
474         if (this.periodicTimer != null) {
475             this.periodicTimer.cancel();
476         }
477     }
478
479     private void reportError(Exception e) {
480         if (!running) {
481             logger.debug("Caught exception {} while switch {} is shutting down. Skip", e.getMessage(),
482                     (isOperational() ? HexString.toHexString(sid) : "unknown"));
483             return;
484         }
485         logger.debug("Caught exception: ", e);
486
487         // notify core of this error event and disconnect the switch
488         ((Controller) core).takeSwitchEventError(this);
489
490         // clean up some internal states immediately
491         stopInternal();
492     }
493
494     private void reportSwitchStateChange(boolean added) {
495         if (added) {
496             ((Controller) core).takeSwitchEventAdd(this);
497         } else {
498             ((Controller) core).takeSwitchEventDelete(this);
499         }
500     }
501
502     @Override
503     public Long getId() {
504         return this.sid;
505     }
506
507     private void sendFeaturesRequest() {
508         if (!isOperational() && (this.state != SwitchState.WAIT_FEATURES_REPLY)) {
509             // send feature request
510             OFMessage featureRequest = factory.getMessage(OFType.FEATURES_REQUEST);
511             asyncFastSend(featureRequest);
512             this.state = SwitchState.WAIT_FEATURES_REPLY;
513             startSwitchTimer();
514         }
515     }
516
517     private void processFeaturesReply(OFFeaturesReply reply) {
518         if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
519             this.sid = reply.getDatapathId();
520             this.buffers = reply.getBuffers();
521             this.capabilities = reply.getCapabilities();
522             this.tables = reply.getTables();
523             this.actions = reply.getActions();
524             // notify core of this error event
525             for (OFPhysicalPort port : reply.getPorts()) {
526                 updatePhysicalPort(port);
527             }
528             // config the switch to send full data packet
529             OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
530             config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
531             asyncFastSend(config);
532             // send config request to make sure the switch can handle the set
533             // config
534             OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
535             asyncFastSend(getConfig);
536             this.state = SwitchState.WAIT_CONFIG_REPLY;
537             // inform core that a new switch is now operational
538             reportSwitchStateChange(true);
539         }
540     }
541
542     private void updatePhysicalPort(OFPhysicalPort port) {
543         Short portNumber = port.getPortNumber();
544         physicalPorts.put(portNumber, port);
545         portBandwidth
546                 .put(portNumber,
547                         port.getCurrentFeatures()
548                                 & (OFPortFeatures.OFPPF_10MB_FD.getValue() | OFPortFeatures.OFPPF_10MB_HD.getValue()
549                                         | OFPortFeatures.OFPPF_100MB_FD.getValue()
550                                         | OFPortFeatures.OFPPF_100MB_HD.getValue()
551                                         | OFPortFeatures.OFPPF_1GB_FD.getValue()
552                                         | OFPortFeatures.OFPPF_1GB_HD.getValue() | OFPortFeatures.OFPPF_10GB_FD
553                                             .getValue()));
554     }
555
556     private void deletePhysicalPort(OFPhysicalPort port) {
557         Short portNumber = port.getPortNumber();
558         physicalPorts.remove(portNumber);
559         portBandwidth.remove(portNumber);
560     }
561
562     @Override
563     public boolean isOperational() {
564         return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
565     }
566
567     @Override
568     public String toString() {
569         try {
570             return ("Switch:" + socket.socket().getRemoteSocketAddress().toString().split("/")[1] + " SWID:" + (isOperational() ? HexString
571                     .toHexString(this.sid) : "unknown"));
572         } catch (Exception e) {
573             return (isOperational() ? HexString.toHexString(this.sid) : "unknown");
574         }
575
576     }
577
578     @Override
579     public Date getConnectedDate() {
580         return this.connectedDate;
581     }
582
583     public String getInstanceName() {
584         return instanceName;
585     }
586
587     @Override
588     public Object getStatistics(OFStatisticsRequest req) {
589         int xid = getNextXid();
590         StatisticsCollector worker = new StatisticsCollector(this, xid, req);
591         messageWaitingDone.put(xid, worker);
592         Future<Object> submit;
593         Object result = null;
594         try {
595             submit = executor.submit(worker);
596         } catch (RejectedExecutionException re) {
597             messageWaitingDone.remove(xid);
598             return result;
599         }
600         try {
601             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
602             return result;
603         } catch (Exception e) {
604             logger.warn("Timeout while waiting for {} replies from {}",
605                     req.getType(), (isOperational() ? HexString.toHexString(sid) : "unknown"));
606             result = null; // to indicate timeout has occurred
607             worker.wakeup();
608             return result;
609         }
610     }
611
612     @Override
613     public Object syncSend(OFMessage msg) {
614         if (!running) {
615             logger.debug("Switch is going down, ignore syncSend");
616             return null;
617         }
618         int xid = getNextXid();
619         return syncSend(msg, xid);
620     }
621
622     /*
623      * Either a BarrierReply or a OFError is received. If this is a reply for an
624      * outstanding sync message, wake up associated task so that it can continue
625      */
626     private void processBarrierReply(OFBarrierReply msg) {
627         Integer xid = msg.getXid();
628         SynchronousMessage worker = (SynchronousMessage) messageWaitingDone.remove(xid);
629         if (worker == null) {
630             return;
631         }
632         worker.wakeup();
633     }
634
635     private void processErrorReply(OFError errorMsg) {
636         OFMessage offendingMsg = errorMsg.getOffendingMsg();
637         Integer xid;
638         if (offendingMsg != null) {
639             xid = offendingMsg.getXid();
640         } else {
641             xid = errorMsg.getXid();
642         }
643         /*
644          * the error can be a reply to a synchronous message or to a statistic
645          * request message
646          */
647         Callable<?> worker = messageWaitingDone.remove(xid);
648         if (worker == null) {
649             return;
650         }
651         if (worker instanceof SynchronousMessage) {
652             ((SynchronousMessage) worker).wakeup(errorMsg);
653         } else {
654             ((StatisticsCollector) worker).wakeup(errorMsg);
655         }
656     }
657
658     private void processStatsReply(OFStatisticsReply reply) {
659         Integer xid = reply.getXid();
660         StatisticsCollector worker = (StatisticsCollector) messageWaitingDone.get(xid);
661         if (worker == null) {
662             return;
663         }
664         if (worker.collect(reply)) {
665             // if all the stats records are received (collect() returns true)
666             // then we are done.
667             messageWaitingDone.remove(xid);
668             worker.wakeup();
669         }
670     }
671
672     @Override
673     public Map<Short, OFPhysicalPort> getPhysicalPorts() {
674         return this.physicalPorts;
675     }
676
677     @Override
678     public OFPhysicalPort getPhysicalPort(Short portNumber) {
679         return this.physicalPorts.get(portNumber);
680     }
681
682     @Override
683     public Integer getPortBandwidth(Short portNumber) {
684         return this.portBandwidth.get(portNumber);
685     }
686
687     @Override
688     public Set<Short> getPorts() {
689         return this.physicalPorts.keySet();
690     }
691
692     @Override
693     public Byte getTables() {
694         return this.tables;
695     }
696
697     @Override
698     public Integer getActions() {
699         return this.actions;
700     }
701
702     @Override
703     public Integer getCapabilities() {
704         return this.capabilities;
705     }
706
707     @Override
708     public Integer getBuffers() {
709         return this.buffers;
710     }
711
712     @Override
713     public boolean isPortEnabled(short portNumber) {
714         return isPortEnabled(physicalPorts.get(portNumber));
715     }
716
717     @Override
718     public boolean isPortEnabled(OFPhysicalPort port) {
719         if (port == null) {
720             return false;
721         }
722         int portConfig = port.getConfig();
723         int portState = port.getState();
724         if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
725             return false;
726         }
727         if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
728             return false;
729         }
730         if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK.getValue()) {
731             return false;
732         }
733         return true;
734     }
735
736     @Override
737     public List<OFPhysicalPort> getEnabledPorts() {
738         List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
739         synchronized (this.physicalPorts) {
740             for (OFPhysicalPort port : physicalPorts.values()) {
741                 if (isPortEnabled(port)) {
742                     result.add(port);
743                 }
744             }
745         }
746         return result;
747     }
748
749     /*
750      * Transmit thread polls the message out of the priority queue and invokes
751      * messaging service to transmit it over the socket channel
752      */
753     class PriorityMessageTransmit implements Runnable {
754         @Override
755         public void run() {
756             running = true;
757             while (running) {
758                 try {
759                     PriorityMessage pmsg = transmitQ.take();
760                     msgReadWriteService.asyncSend(pmsg.msg);
761                     /*
762                      * If syncReply is set to true, wait for the response back.
763                      */
764                     if (pmsg.syncReply) {
765                         syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
766                     }
767                 } catch (InterruptedException ie) {
768                     reportError(new InterruptedException("PriorityMessageTransmit thread interrupted"));
769                 } catch (Exception e) {
770                     reportError(e);
771                 }
772             }
773             transmitQ = null;
774         }
775     }
776
777     /*
778      * Setup and start the transmit thread
779      */
780     private void startTransmitThread() {
781         this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11, new Comparator<PriorityMessage>() {
782             @Override
783             public int compare(PriorityMessage p1, PriorityMessage p2) {
784                 if (p2.priority != p1.priority) {
785                     return p2.priority - p1.priority;
786                 } else {
787                     return (p2.seqNum < p1.seqNum) ? 1 : -1;
788                 }
789             }
790         });
791         this.transmitThread = new Thread(new PriorityMessageTransmit());
792         this.transmitThread.start();
793     }
794
795     /*
796      * Setup communication services
797      */
798     private void setupCommChannel() throws Exception {
799         this.selector = SelectorProvider.provider().openSelector();
800         this.socket.configureBlocking(false);
801         this.socket.socket().setTcpNoDelay(true);
802         this.msgReadWriteService = getMessageReadWriteService();
803     }
804
805     private void sendFirstHello() {
806         try {
807             OFMessage msg = factory.getMessage(OFType.HELLO);
808             asyncFastSend(msg);
809         } catch (Exception e) {
810             reportError(e);
811         }
812     }
813
814     private IMessageReadWrite getMessageReadWriteService() throws Exception {
815         String str = System.getProperty("secureChannelEnabled");
816         return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(socket,
817                 selector) : new MessageReadWriteService(socket, selector);
818     }
819
820     /**
821      * Send Barrier message synchronously. The caller will be blocked until the
822      * Barrier reply is received.
823      */
824     @Override
825     public Object syncSendBarrierMessage() {
826         OFBarrierRequest barrierMsg = new OFBarrierRequest();
827         return syncSend(barrierMsg);
828     }
829
830     /**
831      * Send Barrier message asynchronously. The caller is not blocked. The
832      * Barrier message will be sent in a transmit thread which will be blocked
833      * until the Barrier reply is received.
834      */
835     @Override
836     public Object asyncSendBarrierMessage() {
837         if (transmitQ == null) {
838             return Boolean.FALSE;
839         }
840
841         OFBarrierRequest barrierMsg = new OFBarrierRequest();
842         int xid = getNextXid();
843
844         barrierMsg.setXid(xid);
845         transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
846
847         return Boolean.TRUE;
848     }
849
850     /**
851      * This method returns the switch liveness timeout value. If controller did
852      * not receive any message from the switch for such a long period,
853      * controller will tear down the connection to the switch.
854      *
855      * @return The timeout value
856      */
857     private static int getSwitchLivenessTimeout() {
858         String timeout = System.getProperty("of.switchLivenessTimeout");
859         int rv = 60500;
860
861         try {
862             if (timeout != null) {
863                 rv = Integer.parseInt(timeout);
864             }
865         } catch (Exception e) {
866         }
867
868         return rv;
869     }
870
871     /**
872      * This method performs synchronous operations for a given message. If
873      * syncRequest is set to true, the message will be sent out followed by a
874      * Barrier request message. Then it's blocked until the Barrier rely arrives
875      * or timeout. If syncRequest is false, it simply skips the message send and
876      * just waits for the response back.
877      *
878      * @param msg
879      *            Message to be sent
880      * @param xid
881      *            Message XID
882      * @param request
883      *            If set to true, the message the message will be sent out
884      *            followed by a Barrier request message. If set to false, it
885      *            simply skips the sending and just waits for the Barrier reply.
886      * @return the result
887      */
888     private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
889         SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
890         messageWaitingDone.put(xid, worker);
891         Object result = null;
892         Boolean status = false;
893         Future<Object> submit;
894         try {
895            submit = executor.submit(worker);
896         } catch (RejectedExecutionException re) {
897             messageWaitingDone.remove(xid);
898             return result;
899         }
900         try {
901             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
902             messageWaitingDone.remove(xid);
903             if (result == null) {
904                 // if result is null, then it means the switch can handle this
905                 // message successfully
906                 // convert the result into a Boolean with value true
907                 status = true;
908                 // logger.debug("Successfully send " +
909                 // msg.getType().toString());
910                 result = status;
911             } else {
912                 // if result is not null, this means the switch can't handle
913                 // this message
914                 // the result if OFError already
915                 if (logger.isDebugEnabled()) {
916                     logger.debug("Send {} failed --> {}", msg.getType(), (result));
917                 }
918             }
919             return result;
920         } catch (Exception e) {
921             logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
922             // convert the result into a Boolean with value false
923             status = false;
924             result = status;
925             worker.wakeup();
926             return result;
927         }
928     }
929
930     @Override
931     public void deleteAllFlows() {
932         logger.trace("deleteAllFlows on switch {}", HexString.toHexString(this.sid));
933         OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
934         OFFlowMod flowMod = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
935         flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE).setOutPort(OFPort.OFPP_NONE)
936                 .setLength((short) OFFlowMod.MINIMUM_LENGTH);
937         asyncFastSend(flowMod);
938     }
939 }