Merge "1. Controller switchEvents queue should be priority based. The queue holds...
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / SwitchHandler.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
10
11 import java.nio.channels.SelectionKey;
12 import java.nio.channels.Selector;
13 import java.nio.channels.SocketChannel;
14 import java.nio.channels.spi.SelectorProvider;
15 import java.util.ArrayList;
16 import java.util.Comparator;
17 import java.util.Date;
18 import java.util.HashMap;
19 import java.util.Iterator;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Set;
23 import java.util.Timer;
24 import java.util.TimerTask;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.PriorityBlockingQueue;
31 import java.util.concurrent.RejectedExecutionException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34
35 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
36 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
37 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
38 import org.openflow.protocol.OFBarrierReply;
39 import org.openflow.protocol.OFBarrierRequest;
40 import org.openflow.protocol.OFEchoReply;
41 import org.openflow.protocol.OFError;
42 import org.openflow.protocol.OFFeaturesReply;
43 import org.openflow.protocol.OFFlowMod;
44 import org.openflow.protocol.OFGetConfigReply;
45 import org.openflow.protocol.OFMatch;
46 import org.openflow.protocol.OFMessage;
47 import org.openflow.protocol.OFPhysicalPort;
48 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
49 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
50 import org.openflow.protocol.OFPhysicalPort.OFPortState;
51 import org.openflow.protocol.OFPort;
52 import org.openflow.protocol.OFPortStatus;
53 import org.openflow.protocol.OFPortStatus.OFPortReason;
54 import org.openflow.protocol.OFSetConfig;
55 import org.openflow.protocol.OFStatisticsReply;
56 import org.openflow.protocol.OFStatisticsRequest;
57 import org.openflow.protocol.OFType;
58 import org.openflow.protocol.factory.BasicFactory;
59 import org.openflow.util.HexString;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 public class SwitchHandler implements ISwitch {
64     private static final Logger logger = LoggerFactory.getLogger(SwitchHandler.class);
65     private static final int SWITCH_LIVENESS_TIMER = 5000;
66     private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
67     private final int MESSAGE_RESPONSE_TIMER = 2000;
68
69     private final String instanceName;
70     private final ISwitch thisISwitch;
71     private final IController core;
72     private Long sid;
73     private Integer buffers;
74     private Integer capabilities;
75     private Byte tables;
76     private Integer actions;
77     private Selector selector;
78     private final SocketChannel socket;
79     private final BasicFactory factory;
80     private final AtomicInteger xid;
81     private SwitchState state;
82     private Timer periodicTimer;
83     private final Map<Short, OFPhysicalPort> physicalPorts;
84     private final Map<Short, Integer> portBandwidth;
85     private final Date connectedDate;
86     private Long lastMsgReceivedTimeStamp;
87     private Boolean probeSent;
88     private final ExecutorService executor;
89     private final ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
90     private boolean running;
91     private IMessageReadWrite msgReadWriteService;
92     private Thread switchHandlerThread;
93     private Integer responseTimerValue;
94     private PriorityBlockingQueue<PriorityMessage> transmitQ;
95     private Thread transmitThread;
96
97     private enum SwitchState {
98         NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(3);
99
100         private int value;
101
102         private SwitchState(int value) {
103             this.value = value;
104         }
105
106         @SuppressWarnings("unused")
107         public int value() {
108             return this.value;
109         }
110     }
111
112     public SwitchHandler(Controller core, SocketChannel sc, String name) {
113         this.instanceName = name;
114         this.thisISwitch = this;
115         this.sid = (long) 0;
116         this.buffers = (int) 0;
117         this.capabilities = (int) 0;
118         this.tables = (byte) 0;
119         this.actions = (int) 0;
120         this.core = core;
121         this.socket = sc;
122         this.factory = new BasicFactory();
123         this.connectedDate = new Date();
124         this.lastMsgReceivedTimeStamp = connectedDate.getTime();
125         this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
126         this.portBandwidth = new HashMap<Short, Integer>();
127         this.state = SwitchState.NON_OPERATIONAL;
128         this.probeSent = false;
129         this.xid = new AtomicInteger(this.socket.hashCode());
130         this.periodicTimer = null;
131         this.executor = Executors.newFixedThreadPool(4);
132         this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
133         this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
134         String rTimer = System.getProperty("of.messageResponseTimer");
135         if (rTimer != null) {
136             try {
137                 responseTimerValue = Integer.decode(rTimer);
138             } catch (NumberFormatException e) {
139                 logger.warn("Invalid of.messageResponseTimer: {} use default({})", rTimer, MESSAGE_RESPONSE_TIMER);
140             }
141         }
142     }
143
144     public void start() {
145         try {
146             startTransmitThread();
147             setupCommChannel();
148             sendFirstHello();
149             startHandlerThread();
150         } catch (Exception e) {
151             reportError(e);
152         }
153     }
154
155     private void startHandlerThread() {
156         switchHandlerThread = new Thread(new Runnable() {
157             @Override
158             public void run() {
159                 running = true;
160                 while (running) {
161                     try {
162                         // wait for an incoming connection
163                         selector.select(0);
164                         Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
165                         while (selectedKeys.hasNext()) {
166                             SelectionKey skey = selectedKeys.next();
167                             selectedKeys.remove();
168                             if (skey.isValid() && skey.isWritable()) {
169                                 resumeSend();
170                             }
171                             if (skey.isValid() && skey.isReadable()) {
172                                 handleMessages();
173                             }
174                         }
175                     } catch (Exception e) {
176                         reportError(e);
177                     }
178                 }
179             }
180         }, instanceName);
181         switchHandlerThread.start();
182     }
183
184     private void stopInternal() {
185         logger.debug("{} receives stop signal",
186                 (isOperational() ? HexString.toHexString(sid) : "unknown"));
187         running = false;
188         cancelSwitchTimer();
189         try {
190             selector.wakeup();
191             selector.close();
192         } catch (Exception e) {
193         }
194         try {
195             socket.close();
196         } catch (Exception e) {
197         }
198         try {
199             msgReadWriteService.stop();
200         } catch (Exception e) {
201         }
202         logger.debug("executor shutdown now");
203         executor.shutdownNow();
204
205         msgReadWriteService = null;
206     }
207
208     public void stop() {
209         stopInternal();
210
211         if (switchHandlerThread != null) {
212             switchHandlerThread.interrupt();
213         }
214         if (transmitThread != null) {
215             transmitThread.interrupt();
216         }
217     }
218
219     @Override
220     public int getNextXid() {
221         return this.xid.incrementAndGet();
222     }
223
224     /**
225      * This method puts the message in an outgoing priority queue with normal
226      * priority. It will be served after high priority messages. The method
227      * should be used for non-critical messages such as statistics request,
228      * discovery packets, etc. An unique XID is generated automatically and
229      * inserted into the message.
230      *
231      * @param msg
232      *            The OF message to be sent
233      * @return The XID used
234      */
235     @Override
236     public Integer asyncSend(OFMessage msg) {
237         return asyncSend(msg, getNextXid());
238     }
239
240     private Object syncSend(OFMessage msg, int xid) {
241         return syncMessageInternal(msg, xid, true);
242     }
243
244     /**
245      * This method puts the message in an outgoing priority queue with normal
246      * priority. It will be served after high priority messages. The method
247      * should be used for non-critical messages such as statistics request,
248      * discovery packets, etc. The specified XID is inserted into the message.
249      *
250      * @param msg
251      *            The OF message to be Sent
252      * @param xid
253      *            The XID to be used in the message
254      * @return The XID used
255      */
256     @Override
257     public Integer asyncSend(OFMessage msg, int xid) {
258         msg.setXid(xid);
259         if (transmitQ != null) {
260             transmitQ.add(new PriorityMessage(msg, 0));
261         }
262         return xid;
263     }
264
265     /**
266      * This method puts the message in an outgoing priority queue with high
267      * priority. It will be served first before normal priority messages. The
268      * method should be used for critical messages such as hello, echo reply
269      * etc. An unique XID is generated automatically and inserted into the
270      * message.
271      *
272      * @param msg
273      *            The OF message to be sent
274      * @return The XID used
275      */
276     @Override
277     public Integer asyncFastSend(OFMessage msg) {
278         return asyncFastSend(msg, getNextXid());
279     }
280
281     /**
282      * This method puts the message in an outgoing priority queue with high
283      * priority. It will be served first before normal priority messages. The
284      * method should be used for critical messages such as hello, echo reply
285      * etc. The specified XID is inserted into the message.
286      *
287      * @param msg
288      *            The OF message to be sent
289      * @return The XID used
290      */
291     @Override
292     public Integer asyncFastSend(OFMessage msg, int xid) {
293         msg.setXid(xid);
294         if (transmitQ != null) {
295             transmitQ.add(new PriorityMessage(msg, 1));
296         }
297         return xid;
298     }
299
300     public void resumeSend() {
301         try {
302             if (msgReadWriteService != null) {
303                 msgReadWriteService.resumeSend();
304             }
305         } catch (Exception e) {
306             reportError(e);
307         }
308     }
309
310     /**
311      * This method bypasses the transmit queue and sends the message over the
312      * socket directly. If the input xid is not null, the specified xid is
313      * inserted into the message. Otherwise, an unique xid is generated
314      * automatically and inserted into the message.
315      *
316      * @param msg
317      *            Message to be sent
318      * @param xid
319      *            Message xid
320      */
321     private void asyncSendNow(OFMessage msg, Integer xid) {
322         if (xid == null) {
323             xid = getNextXid();
324         }
325         msg.setXid(xid);
326
327         asyncSendNow(msg);
328     }
329
330     /**
331      * This method bypasses the transmit queue and sends the message over the
332      * socket directly.
333      *
334      * @param msg
335      *            Message to be sent
336      */
337     private void asyncSendNow(OFMessage msg) {
338         if (msgReadWriteService == null) {
339             logger.warn("asyncSendNow: {} is not sent because Message ReadWrite Service is not available.", msg);
340             return;
341         }
342
343         try {
344             msgReadWriteService.asyncSend(msg);
345         } catch (Exception e) {
346             reportError(e);
347         }
348     }
349
350     public void handleMessages() {
351         List<OFMessage> msgs = null;
352
353         try {
354             if (msgReadWriteService != null) {
355                 msgs = msgReadWriteService.readMessages();
356             }
357         } catch (Exception e) {
358             reportError(e);
359         }
360
361         if (msgs == null) {
362             logger.info("{} is down", this);
363             reportSwitchStateChange(false);
364             return;
365         }
366         for (OFMessage msg : msgs) {
367             logger.trace("Message received: {}", msg);
368             this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
369             OFType type = msg.getType();
370             switch (type) {
371             case HELLO:
372                 // send feature request
373                 OFMessage featureRequest = factory.getMessage(OFType.FEATURES_REQUEST);
374                 asyncFastSend(featureRequest);
375                 // delete all pre-existing flows
376                 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
377                 OFFlowMod flowMod = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
378                 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE).setOutPort(OFPort.OFPP_NONE)
379                         .setLength((short) OFFlowMod.MINIMUM_LENGTH);
380                 asyncFastSend(flowMod);
381                 this.state = SwitchState.WAIT_FEATURES_REPLY;
382                 startSwitchTimer();
383                 break;
384             case ECHO_REQUEST:
385                 OFEchoReply echoReply = (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
386                 // respond immediately
387                 asyncSendNow(echoReply, msg.getXid());
388                 break;
389             case ECHO_REPLY:
390                 this.probeSent = false;
391                 break;
392             case FEATURES_REPLY:
393                 processFeaturesReply((OFFeaturesReply) msg);
394                 break;
395             case GET_CONFIG_REPLY:
396                 // make sure that the switch can send the whole packet to the
397                 // controller
398                 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
399                     this.state = SwitchState.OPERATIONAL;
400                 }
401                 break;
402             case BARRIER_REPLY:
403                 processBarrierReply((OFBarrierReply) msg);
404                 break;
405             case ERROR:
406                 processErrorReply((OFError) msg);
407                 break;
408             case PORT_STATUS:
409                 processPortStatusMsg((OFPortStatus) msg);
410                 break;
411             case STATS_REPLY:
412                 processStatsReply((OFStatisticsReply) msg);
413                 break;
414             case PACKET_IN:
415                 break;
416             default:
417                 break;
418             } // end of switch
419             if (isOperational()) {
420                 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
421             }
422         } // end of for
423     }
424
425     private void processPortStatusMsg(OFPortStatus msg) {
426         OFPhysicalPort port = msg.getDesc();
427         if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
428             updatePhysicalPort(port);
429         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
430             updatePhysicalPort(port);
431         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE.ordinal()) {
432             deletePhysicalPort(port);
433         }
434
435     }
436
437     private void startSwitchTimer() {
438         this.periodicTimer = new Timer();
439         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
440             @Override
441             public void run() {
442                 try {
443                     Long now = System.currentTimeMillis();
444                     if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
445                         if (probeSent) {
446                             // switch failed to respond to our probe, consider
447                             // it down
448                             logger.warn("{} sid {} is idle for too long, disconnect", socket.socket()
449                                     .getRemoteSocketAddress().toString().split("/")[1], (sid == 0) ? "unknown"
450                                     : HexString.toHexString(sid));
451                             reportSwitchStateChange(false);
452                         } else {
453                             // send a probe to see if the switch is still alive
454                             logger.debug("Send idle probe (Echo Request) to {}", this);
455                             probeSent = true;
456                             OFMessage echo = factory.getMessage(OFType.ECHO_REQUEST);
457                             asyncFastSend(echo);
458                         }
459                     } else {
460                         if (state == SwitchState.WAIT_FEATURES_REPLY) {
461                             // send another features request
462                             OFMessage request = factory.getMessage(OFType.FEATURES_REQUEST);
463                             asyncFastSend(request);
464                         } else {
465                             if (state == SwitchState.WAIT_CONFIG_REPLY) {
466                                 // send another config request
467                                 OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
468                                 config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
469                                 asyncFastSend(config);
470                                 OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
471                                 asyncFastSend(getConfig);
472                             }
473                         }
474                     }
475                 } catch (Exception e) {
476                     reportError(e);
477                 }
478             }
479         }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
480     }
481
482     private void cancelSwitchTimer() {
483         if (this.periodicTimer != null) {
484             this.periodicTimer.cancel();
485         }
486     }
487
488     private void reportError(Exception e) {
489         if (!running) {
490             logger.debug("Caught exception {} while switch {} is shutting down. Skip", e.getMessage(),
491                     (isOperational() ? HexString.toHexString(sid) : "unknown"));
492             return;
493         }
494         logger.debug("Caught exception: ", e);
495
496         // notify core of this error event and disconnect the switch
497         ((Controller) core).takeSwitchEventError(this);
498
499         // clean up some internal states immediately
500         stopInternal();
501     }
502
503     private void reportSwitchStateChange(boolean added) {
504         if (added) {
505             ((Controller) core).takeSwitchEventAdd(this);
506         } else {
507             ((Controller) core).takeSwitchEventDelete(this);
508         }
509     }
510
511     @Override
512     public Long getId() {
513         return this.sid;
514     }
515
516     private void processFeaturesReply(OFFeaturesReply reply) {
517         if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
518             this.sid = reply.getDatapathId();
519             this.buffers = reply.getBuffers();
520             this.capabilities = reply.getCapabilities();
521             this.tables = reply.getTables();
522             this.actions = reply.getActions();
523             // notify core of this error event
524             for (OFPhysicalPort port : reply.getPorts()) {
525                 updatePhysicalPort(port);
526             }
527             // config the switch to send full data packet
528             OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
529             config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
530             asyncFastSend(config);
531             // send config request to make sure the switch can handle the set
532             // config
533             OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
534             asyncFastSend(getConfig);
535             this.state = SwitchState.WAIT_CONFIG_REPLY;
536             // inform core that a new switch is now operational
537             reportSwitchStateChange(true);
538         }
539     }
540
541     private void updatePhysicalPort(OFPhysicalPort port) {
542         Short portNumber = port.getPortNumber();
543         physicalPorts.put(portNumber, port);
544         portBandwidth
545                 .put(portNumber,
546                         port.getCurrentFeatures()
547                                 & (OFPortFeatures.OFPPF_10MB_FD.getValue() | OFPortFeatures.OFPPF_10MB_HD.getValue()
548                                         | OFPortFeatures.OFPPF_100MB_FD.getValue()
549                                         | OFPortFeatures.OFPPF_100MB_HD.getValue()
550                                         | OFPortFeatures.OFPPF_1GB_FD.getValue()
551                                         | OFPortFeatures.OFPPF_1GB_HD.getValue() | OFPortFeatures.OFPPF_10GB_FD
552                                             .getValue()));
553     }
554
555     private void deletePhysicalPort(OFPhysicalPort port) {
556         Short portNumber = port.getPortNumber();
557         physicalPorts.remove(portNumber);
558         portBandwidth.remove(portNumber);
559     }
560
561     @Override
562     public boolean isOperational() {
563         return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
564     }
565
566     @Override
567     public String toString() {
568         try {
569             return ("Switch:" + socket.socket().getRemoteSocketAddress().toString().split("/")[1] + " SWID:" + (isOperational() ? HexString
570                     .toHexString(this.sid) : "unknown"));
571         } catch (Exception e) {
572             return (isOperational() ? HexString.toHexString(this.sid) : "unknown");
573         }
574
575     }
576
577     @Override
578     public Date getConnectedDate() {
579         return this.connectedDate;
580     }
581
582     public String getInstanceName() {
583         return instanceName;
584     }
585
586     @Override
587     public Object getStatistics(OFStatisticsRequest req) {
588         int xid = getNextXid();
589         StatisticsCollector worker = new StatisticsCollector(this, xid, req);
590         messageWaitingDone.put(xid, worker);
591         Future<Object> submit;
592         Object result = null;
593         try {
594             submit = executor.submit(worker);
595         } catch (RejectedExecutionException re) {
596             messageWaitingDone.remove(xid);
597             return result;
598         }
599         try {
600             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
601             return result;
602         } catch (Exception e) {
603             logger.warn("Timeout while waiting for {} replies from {}",
604                     req.getType(), (isOperational() ? HexString.toHexString(sid) : "unknown"));
605             result = null; // to indicate timeout has occurred
606             worker.wakeup();
607             return result;
608         }
609     }
610
611     @Override
612     public Object syncSend(OFMessage msg) {
613         if (!running) {
614             logger.debug("Switch is going down, ignore syncSend");
615             return null;
616         }
617         int xid = getNextXid();
618         return syncSend(msg, xid);
619     }
620
621     /*
622      * Either a BarrierReply or a OFError is received. If this is a reply for an
623      * outstanding sync message, wake up associated task so that it can continue
624      */
625     private void processBarrierReply(OFBarrierReply msg) {
626         Integer xid = msg.getXid();
627         SynchronousMessage worker = (SynchronousMessage) messageWaitingDone.remove(xid);
628         if (worker == null) {
629             return;
630         }
631         worker.wakeup();
632     }
633
634     private void processErrorReply(OFError errorMsg) {
635         OFMessage offendingMsg = errorMsg.getOffendingMsg();
636         Integer xid;
637         if (offendingMsg != null) {
638             xid = offendingMsg.getXid();
639         } else {
640             xid = errorMsg.getXid();
641         }
642         /*
643          * the error can be a reply to a synchronous message or to a statistic
644          * request message
645          */
646         Callable<?> worker = messageWaitingDone.remove(xid);
647         if (worker == null) {
648             return;
649         }
650         if (worker instanceof SynchronousMessage) {
651             ((SynchronousMessage) worker).wakeup(errorMsg);
652         } else {
653             ((StatisticsCollector) worker).wakeup(errorMsg);
654         }
655     }
656
657     private void processStatsReply(OFStatisticsReply reply) {
658         Integer xid = reply.getXid();
659         StatisticsCollector worker = (StatisticsCollector) messageWaitingDone.get(xid);
660         if (worker == null) {
661             return;
662         }
663         if (worker.collect(reply)) {
664             // if all the stats records are received (collect() returns true)
665             // then we are done.
666             messageWaitingDone.remove(xid);
667             worker.wakeup();
668         }
669     }
670
671     @Override
672     public Map<Short, OFPhysicalPort> getPhysicalPorts() {
673         return this.physicalPorts;
674     }
675
676     @Override
677     public OFPhysicalPort getPhysicalPort(Short portNumber) {
678         return this.physicalPorts.get(portNumber);
679     }
680
681     @Override
682     public Integer getPortBandwidth(Short portNumber) {
683         return this.portBandwidth.get(portNumber);
684     }
685
686     @Override
687     public Set<Short> getPorts() {
688         return this.physicalPorts.keySet();
689     }
690
691     @Override
692     public Byte getTables() {
693         return this.tables;
694     }
695
696     @Override
697     public Integer getActions() {
698         return this.actions;
699     }
700
701     @Override
702     public Integer getCapabilities() {
703         return this.capabilities;
704     }
705
706     @Override
707     public Integer getBuffers() {
708         return this.buffers;
709     }
710
711     @Override
712     public boolean isPortEnabled(short portNumber) {
713         return isPortEnabled(physicalPorts.get(portNumber));
714     }
715
716     @Override
717     public boolean isPortEnabled(OFPhysicalPort port) {
718         if (port == null) {
719             return false;
720         }
721         int portConfig = port.getConfig();
722         int portState = port.getState();
723         if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
724             return false;
725         }
726         if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
727             return false;
728         }
729         if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK.getValue()) {
730             return false;
731         }
732         return true;
733     }
734
735     @Override
736     public List<OFPhysicalPort> getEnabledPorts() {
737         List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
738         synchronized (this.physicalPorts) {
739             for (OFPhysicalPort port : physicalPorts.values()) {
740                 if (isPortEnabled(port)) {
741                     result.add(port);
742                 }
743             }
744         }
745         return result;
746     }
747
748     /*
749      * Transmit thread polls the message out of the priority queue and invokes
750      * messaging service to transmit it over the socket channel
751      */
752     class PriorityMessageTransmit implements Runnable {
753         @Override
754         public void run() {
755             running = true;
756             while (running) {
757                 try {
758                     PriorityMessage pmsg = transmitQ.take();
759                     msgReadWriteService.asyncSend(pmsg.msg);
760                     /*
761                      * If syncReply is set to true, wait for the response back.
762                      */
763                     if (pmsg.syncReply) {
764                         syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
765                     }
766                 } catch (InterruptedException ie) {
767                     reportError(new InterruptedException("PriorityMessageTransmit thread interrupted"));
768                 } catch (Exception e) {
769                     reportError(e);
770                 }
771             }
772             transmitQ = null;
773         }
774     }
775
776     /*
777      * Setup and start the transmit thread
778      */
779     private void startTransmitThread() {
780         this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11, new Comparator<PriorityMessage>() {
781             @Override
782             public int compare(PriorityMessage p1, PriorityMessage p2) {
783                 if (p2.priority != p1.priority) {
784                     return p2.priority - p1.priority;
785                 } else {
786                     return (p2.seqNum < p1.seqNum) ? 1 : -1;
787                 }
788             }
789         });
790         this.transmitThread = new Thread(new PriorityMessageTransmit());
791         this.transmitThread.start();
792     }
793
794     /*
795      * Setup communication services
796      */
797     private void setupCommChannel() throws Exception {
798         this.selector = SelectorProvider.provider().openSelector();
799         this.socket.configureBlocking(false);
800         this.socket.socket().setTcpNoDelay(true);
801         this.msgReadWriteService = getMessageReadWriteService();
802     }
803
804     private void sendFirstHello() {
805         try {
806             OFMessage msg = factory.getMessage(OFType.HELLO);
807             asyncFastSend(msg);
808         } catch (Exception e) {
809             reportError(e);
810         }
811     }
812
813     private IMessageReadWrite getMessageReadWriteService() throws Exception {
814         String str = System.getProperty("secureChannelEnabled");
815         return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(socket,
816                 selector) : new MessageReadWriteService(socket, selector);
817     }
818
819     /**
820      * Send Barrier message synchronously. The caller will be blocked until the
821      * Barrier reply is received.
822      */
823     @Override
824     public Object syncSendBarrierMessage() {
825         OFBarrierRequest barrierMsg = new OFBarrierRequest();
826         return syncSend(barrierMsg);
827     }
828
829     /**
830      * Send Barrier message asynchronously. The caller is not blocked. The
831      * Barrier message will be sent in a transmit thread which will be blocked
832      * until the Barrier reply is received.
833      */
834     @Override
835     public Object asyncSendBarrierMessage() {
836         if (transmitQ == null) {
837             return Boolean.FALSE;
838         }
839
840         OFBarrierRequest barrierMsg = new OFBarrierRequest();
841         int xid = getNextXid();
842
843         barrierMsg.setXid(xid);
844         transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
845
846         return Boolean.TRUE;
847     }
848
849     /**
850      * This method returns the switch liveness timeout value. If controller did
851      * not receive any message from the switch for such a long period,
852      * controller will tear down the connection to the switch.
853      *
854      * @return The timeout value
855      */
856     private static int getSwitchLivenessTimeout() {
857         String timeout = System.getProperty("of.switchLivenessTimeout");
858         int rv = 60500;
859
860         try {
861             if (timeout != null) {
862                 rv = Integer.parseInt(timeout);
863             }
864         } catch (Exception e) {
865         }
866
867         return rv;
868     }
869
870     /**
871      * This method performs synchronous operations for a given message. If
872      * syncRequest is set to true, the message will be sent out followed by a
873      * Barrier request message. Then it's blocked until the Barrier rely arrives
874      * or timeout. If syncRequest is false, it simply skips the message send and
875      * just waits for the response back.
876      *
877      * @param msg
878      *            Message to be sent
879      * @param xid
880      *            Message XID
881      * @param request
882      *            If set to true, the message the message will be sent out
883      *            followed by a Barrier request message. If set to false, it
884      *            simply skips the sending and just waits for the Barrier reply.
885      * @return the result
886      */
887     private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
888         SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
889         messageWaitingDone.put(xid, worker);
890         Object result = null;
891         Boolean status = false;
892         Future<Object> submit;
893         try {
894            submit = executor.submit(worker);
895         } catch (RejectedExecutionException re) {
896             messageWaitingDone.remove(xid);
897             return result;
898         }
899         try {
900             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
901             messageWaitingDone.remove(xid);
902             if (result == null) {
903                 // if result is null, then it means the switch can handle this
904                 // message successfully
905                 // convert the result into a Boolean with value true
906                 status = true;
907                 // logger.debug("Successfully send " +
908                 // msg.getType().toString());
909                 result = status;
910             } else {
911                 // if result is not null, this means the switch can't handle
912                 // this message
913                 // the result if OFError already
914                 if (logger.isDebugEnabled()) {
915                     logger.debug("Send {} failed --> {}", msg.getType(), (result));
916                 }
917             }
918             return result;
919         } catch (Exception e) {
920             logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
921             // convert the result into a Boolean with value false
922             status = false;
923             result = status;
924             worker.wakeup();
925             return result;
926         }
927     }
928 }