Clean all unused and redundant imports in controller.
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / SwitchHandler.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
10
11 import java.nio.channels.SelectionKey;
12 import java.nio.channels.Selector;
13 import java.nio.channels.SocketChannel;
14 import java.nio.channels.spi.SelectorProvider;
15 import java.util.ArrayList;
16 import java.util.Comparator;
17 import java.util.Date;
18 import java.util.HashMap;
19 import java.util.Iterator;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Set;
23 import java.util.Timer;
24 import java.util.TimerTask;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.PriorityBlockingQueue;
31 import java.util.concurrent.RejectedExecutionException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34
35 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
36 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
37 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
38 import org.openflow.protocol.OFBarrierReply;
39 import org.openflow.protocol.OFBarrierRequest;
40 import org.openflow.protocol.OFEchoReply;
41 import org.openflow.protocol.OFError;
42 import org.openflow.protocol.OFFeaturesReply;
43 import org.openflow.protocol.OFFlowMod;
44 import org.openflow.protocol.OFGetConfigReply;
45 import org.openflow.protocol.OFMatch;
46 import org.openflow.protocol.OFMessage;
47 import org.openflow.protocol.OFPhysicalPort;
48 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
49 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
50 import org.openflow.protocol.OFPhysicalPort.OFPortState;
51 import org.openflow.protocol.OFPort;
52 import org.openflow.protocol.OFPortStatus;
53 import org.openflow.protocol.OFPortStatus.OFPortReason;
54 import org.openflow.protocol.OFSetConfig;
55 import org.openflow.protocol.OFStatisticsReply;
56 import org.openflow.protocol.OFStatisticsRequest;
57 import org.openflow.protocol.OFType;
58 import org.openflow.protocol.factory.BasicFactory;
59 import org.openflow.util.HexString;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 public class SwitchHandler implements ISwitch {
64     private static final Logger logger = LoggerFactory.getLogger(SwitchHandler.class);
65     private static final int SWITCH_LIVENESS_TIMER = 5000;
66     private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
67     private final int MESSAGE_RESPONSE_TIMER = 2000;
68
69     private final String instanceName;
70     private final ISwitch thisISwitch;
71     private final IController core;
72     private Long sid;
73     private Integer buffers;
74     private Integer capabilities;
75     private Byte tables;
76     private Integer actions;
77     private Selector selector;
78     private final SocketChannel socket;
79     private final BasicFactory factory;
80     private final AtomicInteger xid;
81     private SwitchState state;
82     private Timer periodicTimer;
83     private final Map<Short, OFPhysicalPort> physicalPorts;
84     private final Map<Short, Integer> portBandwidth;
85     private final Date connectedDate;
86     private Long lastMsgReceivedTimeStamp;
87     private Boolean probeSent;
88     private final ExecutorService executor;
89     private final ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
90     private boolean running;
91     private IMessageReadWrite msgReadWriteService;
92     private Thread switchHandlerThread;
93     private Integer responseTimerValue;
94     private PriorityBlockingQueue<PriorityMessage> transmitQ;
95     private Thread transmitThread;
96
97     private enum SwitchState {
98         NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(3);
99
100         private int value;
101
102         private SwitchState(int value) {
103             this.value = value;
104         }
105
106         @SuppressWarnings("unused")
107         public int value() {
108             return this.value;
109         }
110     }
111
112     public SwitchHandler(Controller core, SocketChannel sc, String name) {
113         this.instanceName = name;
114         this.thisISwitch = this;
115         this.sid = (long) 0;
116         this.buffers = (int) 0;
117         this.capabilities = (int) 0;
118         this.tables = (byte) 0;
119         this.actions = (int) 0;
120         this.core = core;
121         this.socket = sc;
122         this.factory = new BasicFactory();
123         this.connectedDate = new Date();
124         this.lastMsgReceivedTimeStamp = connectedDate.getTime();
125         this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
126         this.portBandwidth = new HashMap<Short, Integer>();
127         this.state = SwitchState.NON_OPERATIONAL;
128         this.probeSent = false;
129         this.xid = new AtomicInteger(this.socket.hashCode());
130         this.periodicTimer = null;
131         this.executor = Executors.newFixedThreadPool(4);
132         this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
133         this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
134         String rTimer = System.getProperty("of.messageResponseTimer");
135         if (rTimer != null) {
136             try {
137                 responseTimerValue = Integer.decode(rTimer);
138             } catch (NumberFormatException e) {
139                 logger.warn("Invalid of.messageResponseTimer: {} use default({})", rTimer, MESSAGE_RESPONSE_TIMER);
140             }
141         }
142     }
143
144     public void start() {
145         try {
146             startTransmitThread();
147             setupCommChannel();
148             sendFirstHello();
149             startHandlerThread();
150         } catch (Exception e) {
151             reportError(e);
152         }
153     }
154
155     private void startHandlerThread() {
156         switchHandlerThread = new Thread(new Runnable() {
157             @Override
158             public void run() {
159                 running = true;
160                 while (running) {
161                     try {
162                         // wait for an incoming connection
163                         selector.select(0);
164                         Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
165                         while (selectedKeys.hasNext()) {
166                             SelectionKey skey = selectedKeys.next();
167                             selectedKeys.remove();
168                             if (skey.isValid() && skey.isWritable()) {
169                                 resumeSend();
170                             }
171                             if (skey.isValid() && skey.isReadable()) {
172                                 handleMessages();
173                             }
174                         }
175                     } catch (Exception e) {
176                         reportError(e);
177                     }
178                 }
179             }
180         }, instanceName);
181         switchHandlerThread.start();
182     }
183
184     private void stopInternal() {
185         logger.debug("{} receives stop signal",
186                 (isOperational() ? HexString.toHexString(sid) : "unknown"));
187         running = false;
188         cancelSwitchTimer();
189         try {
190             selector.wakeup();
191             selector.close();
192         } catch (Exception e) {
193         }
194         try {
195             socket.close();
196         } catch (Exception e) {
197         }
198         try {
199             msgReadWriteService.stop();
200         } catch (Exception e) {
201         }
202         logger.debug("executor shutdown now");
203         executor.shutdownNow();
204
205         msgReadWriteService = null;
206     }
207
208     public void stop() {
209         stopInternal();
210
211         if (switchHandlerThread != null) {
212             switchHandlerThread.interrupt();
213         }
214         if (transmitThread != null) {
215             transmitThread.interrupt();
216         }
217     }
218
219     @Override
220     public int getNextXid() {
221         return this.xid.incrementAndGet();
222     }
223
224     /**
225      * This method puts the message in an outgoing priority queue with normal
226      * priority. It will be served after high priority messages. The method
227      * should be used for non-critical messages such as statistics request,
228      * discovery packets, etc. An unique XID is generated automatically and
229      * inserted into the message.
230      *
231      * @param msg
232      *            The OF message to be sent
233      * @return The XID used
234      */
235     @Override
236     public Integer asyncSend(OFMessage msg) {
237         return asyncSend(msg, getNextXid());
238     }
239
240     private Object syncSend(OFMessage msg, int xid) {
241         return syncMessageInternal(msg, xid, true);
242     }
243
244     /**
245      * This method puts the message in an outgoing priority queue with normal
246      * priority. It will be served after high priority messages. The method
247      * should be used for non-critical messages such as statistics request,
248      * discovery packets, etc. The specified XID is inserted into the message.
249      *
250      * @param msg
251      *            The OF message to be Sent
252      * @param xid
253      *            The XID to be used in the message
254      * @return The XID used
255      */
256     @Override
257     public Integer asyncSend(OFMessage msg, int xid) {
258         msg.setXid(xid);
259         if (transmitQ != null) {
260             transmitQ.add(new PriorityMessage(msg, 0));
261         }
262         return xid;
263     }
264
265     /**
266      * This method puts the message in an outgoing priority queue with high
267      * priority. It will be served first before normal priority messages. The
268      * method should be used for critical messages such as hello, echo reply
269      * etc. An unique XID is generated automatically and inserted into the
270      * message.
271      *
272      * @param msg
273      *            The OF message to be sent
274      * @return The XID used
275      */
276     @Override
277     public Integer asyncFastSend(OFMessage msg) {
278         return asyncFastSend(msg, getNextXid());
279     }
280
281     /**
282      * This method puts the message in an outgoing priority queue with high
283      * priority. It will be served first before normal priority messages. The
284      * method should be used for critical messages such as hello, echo reply
285      * etc. The specified XID is inserted into the message.
286      *
287      * @param msg
288      *            The OF message to be sent
289      * @return The XID used
290      */
291     @Override
292     public Integer asyncFastSend(OFMessage msg, int xid) {
293         msg.setXid(xid);
294         if (transmitQ != null) {
295             transmitQ.add(new PriorityMessage(msg, 1));
296         }
297         return xid;
298     }
299
300     public void resumeSend() {
301         try {
302             if (msgReadWriteService != null) {
303                 msgReadWriteService.resumeSend();
304             }
305         } catch (Exception e) {
306             reportError(e);
307         }
308     }
309
310     /**
311      * This method bypasses the transmit queue and sends the message over the
312      * socket directly. If the input xid is not null, the specified xid is
313      * inserted into the message. Otherwise, an unique xid is generated
314      * automatically and inserted into the message.
315      *
316      * @param msg
317      *            Message to be sent
318      * @param xid
319      *            Message xid
320      */
321     private void asyncSendNow(OFMessage msg, Integer xid) {
322         if (xid == null) {
323             xid = getNextXid();
324         }
325         msg.setXid(xid);
326
327         asyncSendNow(msg);
328     }
329
330     /**
331      * This method bypasses the transmit queue and sends the message over the
332      * socket directly.
333      *
334      * @param msg
335      *            Message to be sent
336      */
337     private void asyncSendNow(OFMessage msg) {
338         if (msgReadWriteService == null) {
339             logger.warn("asyncSendNow: {} is not sent because Message ReadWrite Service is not available.", msg);
340             return;
341         }
342
343         try {
344             msgReadWriteService.asyncSend(msg);
345         } catch (Exception e) {
346             reportError(e);
347         }
348     }
349
350     public void handleMessages() {
351         List<OFMessage> msgs = null;
352
353         try {
354             if (msgReadWriteService != null) {
355                 msgs = msgReadWriteService.readMessages();
356             }
357         } catch (Exception e) {
358             reportError(e);
359         }
360
361         if (msgs == null) {
362             logger.info("{} is down", this);
363             reportSwitchStateChange(false);
364             return;
365         }
366         for (OFMessage msg : msgs) {
367             logger.trace("Message received: {}", msg);
368             this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
369             OFType type = msg.getType();
370             switch (type) {
371             case HELLO:
372                 // send feature request
373                 OFMessage featureRequest = factory.getMessage(OFType.FEATURES_REQUEST);
374                 asyncFastSend(featureRequest);
375                 this.state = SwitchState.WAIT_FEATURES_REPLY;
376                 startSwitchTimer();
377                 break;
378             case ECHO_REQUEST:
379                 OFEchoReply echoReply = (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
380                 // respond immediately
381                 asyncSendNow(echoReply, msg.getXid());
382                 break;
383             case ECHO_REPLY:
384                 this.probeSent = false;
385                 break;
386             case FEATURES_REPLY:
387                 processFeaturesReply((OFFeaturesReply) msg);
388                 break;
389             case GET_CONFIG_REPLY:
390                 // make sure that the switch can send the whole packet to the
391                 // controller
392                 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
393                     this.state = SwitchState.OPERATIONAL;
394                 }
395                 break;
396             case BARRIER_REPLY:
397                 processBarrierReply((OFBarrierReply) msg);
398                 break;
399             case ERROR:
400                 processErrorReply((OFError) msg);
401                 break;
402             case PORT_STATUS:
403                 processPortStatusMsg((OFPortStatus) msg);
404                 break;
405             case STATS_REPLY:
406                 processStatsReply((OFStatisticsReply) msg);
407                 break;
408             case PACKET_IN:
409                 break;
410             default:
411                 break;
412             } // end of switch
413             if (isOperational()) {
414                 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
415             }
416         } // end of for
417     }
418
419     private void processPortStatusMsg(OFPortStatus msg) {
420         OFPhysicalPort port = msg.getDesc();
421         if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
422             updatePhysicalPort(port);
423         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
424             updatePhysicalPort(port);
425         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE.ordinal()) {
426             deletePhysicalPort(port);
427         }
428
429     }
430
431     private void startSwitchTimer() {
432         this.periodicTimer = new Timer();
433         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
434             @Override
435             public void run() {
436                 try {
437                     Long now = System.currentTimeMillis();
438                     if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
439                         if (probeSent) {
440                             // switch failed to respond to our probe, consider
441                             // it down
442                             logger.warn("{} sid {} is idle for too long, disconnect", socket.socket()
443                                     .getRemoteSocketAddress().toString().split("/")[1], (sid == 0) ? "unknown"
444                                     : HexString.toHexString(sid));
445                             reportSwitchStateChange(false);
446                         } else {
447                             // send a probe to see if the switch is still alive
448                             logger.debug("Send idle probe (Echo Request) to {}", this);
449                             probeSent = true;
450                             OFMessage echo = factory.getMessage(OFType.ECHO_REQUEST);
451                             asyncFastSend(echo);
452                         }
453                     } else {
454                         if (state == SwitchState.WAIT_FEATURES_REPLY) {
455                             // send another features request
456                             OFMessage request = factory.getMessage(OFType.FEATURES_REQUEST);
457                             asyncFastSend(request);
458                         } else {
459                             if (state == SwitchState.WAIT_CONFIG_REPLY) {
460                                 // send another config request
461                                 OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
462                                 config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
463                                 asyncFastSend(config);
464                                 OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
465                                 asyncFastSend(getConfig);
466                             }
467                         }
468                     }
469                 } catch (Exception e) {
470                     reportError(e);
471                 }
472             }
473         }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
474     }
475
476     private void cancelSwitchTimer() {
477         if (this.periodicTimer != null) {
478             this.periodicTimer.cancel();
479         }
480     }
481
482     private void reportError(Exception e) {
483         if (!running) {
484             logger.debug("Caught exception {} while switch {} is shutting down. Skip", e.getMessage(),
485                     (isOperational() ? HexString.toHexString(sid) : "unknown"));
486             return;
487         }
488         logger.debug("Caught exception: ", e);
489
490         // notify core of this error event and disconnect the switch
491         ((Controller) core).takeSwitchEventError(this);
492
493         // clean up some internal states immediately
494         stopInternal();
495     }
496
497     private void reportSwitchStateChange(boolean added) {
498         if (added) {
499             ((Controller) core).takeSwitchEventAdd(this);
500         } else {
501             ((Controller) core).takeSwitchEventDelete(this);
502         }
503     }
504
505     @Override
506     public Long getId() {
507         return this.sid;
508     }
509
510     private void processFeaturesReply(OFFeaturesReply reply) {
511         if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
512             this.sid = reply.getDatapathId();
513             this.buffers = reply.getBuffers();
514             this.capabilities = reply.getCapabilities();
515             this.tables = reply.getTables();
516             this.actions = reply.getActions();
517             // notify core of this error event
518             for (OFPhysicalPort port : reply.getPorts()) {
519                 updatePhysicalPort(port);
520             }
521             // config the switch to send full data packet
522             OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
523             config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
524             asyncFastSend(config);
525             // send config request to make sure the switch can handle the set
526             // config
527             OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
528             asyncFastSend(getConfig);
529             this.state = SwitchState.WAIT_CONFIG_REPLY;
530             // inform core that a new switch is now operational
531             reportSwitchStateChange(true);
532         }
533     }
534
535     private void updatePhysicalPort(OFPhysicalPort port) {
536         Short portNumber = port.getPortNumber();
537         physicalPorts.put(portNumber, port);
538         portBandwidth
539                 .put(portNumber,
540                         port.getCurrentFeatures()
541                                 & (OFPortFeatures.OFPPF_10MB_FD.getValue() | OFPortFeatures.OFPPF_10MB_HD.getValue()
542                                         | OFPortFeatures.OFPPF_100MB_FD.getValue()
543                                         | OFPortFeatures.OFPPF_100MB_HD.getValue()
544                                         | OFPortFeatures.OFPPF_1GB_FD.getValue()
545                                         | OFPortFeatures.OFPPF_1GB_HD.getValue() | OFPortFeatures.OFPPF_10GB_FD
546                                             .getValue()));
547     }
548
549     private void deletePhysicalPort(OFPhysicalPort port) {
550         Short portNumber = port.getPortNumber();
551         physicalPorts.remove(portNumber);
552         portBandwidth.remove(portNumber);
553     }
554
555     @Override
556     public boolean isOperational() {
557         return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
558     }
559
560     @Override
561     public String toString() {
562         try {
563             return ("Switch:" + socket.socket().getRemoteSocketAddress().toString().split("/")[1] + " SWID:" + (isOperational() ? HexString
564                     .toHexString(this.sid) : "unknown"));
565         } catch (Exception e) {
566             return (isOperational() ? HexString.toHexString(this.sid) : "unknown");
567         }
568
569     }
570
571     @Override
572     public Date getConnectedDate() {
573         return this.connectedDate;
574     }
575
576     public String getInstanceName() {
577         return instanceName;
578     }
579
580     @Override
581     public Object getStatistics(OFStatisticsRequest req) {
582         int xid = getNextXid();
583         StatisticsCollector worker = new StatisticsCollector(this, xid, req);
584         messageWaitingDone.put(xid, worker);
585         Future<Object> submit;
586         Object result = null;
587         try {
588             submit = executor.submit(worker);
589         } catch (RejectedExecutionException re) {
590             messageWaitingDone.remove(xid);
591             return result;
592         }
593         try {
594             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
595             return result;
596         } catch (Exception e) {
597             logger.warn("Timeout while waiting for {} replies from {}",
598                     req.getType(), (isOperational() ? HexString.toHexString(sid) : "unknown"));
599             result = null; // to indicate timeout has occurred
600             worker.wakeup();
601             return result;
602         }
603     }
604
605     @Override
606     public Object syncSend(OFMessage msg) {
607         if (!running) {
608             logger.debug("Switch is going down, ignore syncSend");
609             return null;
610         }
611         int xid = getNextXid();
612         return syncSend(msg, xid);
613     }
614
615     /*
616      * Either a BarrierReply or a OFError is received. If this is a reply for an
617      * outstanding sync message, wake up associated task so that it can continue
618      */
619     private void processBarrierReply(OFBarrierReply msg) {
620         Integer xid = msg.getXid();
621         SynchronousMessage worker = (SynchronousMessage) messageWaitingDone.remove(xid);
622         if (worker == null) {
623             return;
624         }
625         worker.wakeup();
626     }
627
628     private void processErrorReply(OFError errorMsg) {
629         OFMessage offendingMsg = errorMsg.getOffendingMsg();
630         Integer xid;
631         if (offendingMsg != null) {
632             xid = offendingMsg.getXid();
633         } else {
634             xid = errorMsg.getXid();
635         }
636         /*
637          * the error can be a reply to a synchronous message or to a statistic
638          * request message
639          */
640         Callable<?> worker = messageWaitingDone.remove(xid);
641         if (worker == null) {
642             return;
643         }
644         if (worker instanceof SynchronousMessage) {
645             ((SynchronousMessage) worker).wakeup(errorMsg);
646         } else {
647             ((StatisticsCollector) worker).wakeup(errorMsg);
648         }
649     }
650
651     private void processStatsReply(OFStatisticsReply reply) {
652         Integer xid = reply.getXid();
653         StatisticsCollector worker = (StatisticsCollector) messageWaitingDone.get(xid);
654         if (worker == null) {
655             return;
656         }
657         if (worker.collect(reply)) {
658             // if all the stats records are received (collect() returns true)
659             // then we are done.
660             messageWaitingDone.remove(xid);
661             worker.wakeup();
662         }
663     }
664
665     @Override
666     public Map<Short, OFPhysicalPort> getPhysicalPorts() {
667         return this.physicalPorts;
668     }
669
670     @Override
671     public OFPhysicalPort getPhysicalPort(Short portNumber) {
672         return this.physicalPorts.get(portNumber);
673     }
674
675     @Override
676     public Integer getPortBandwidth(Short portNumber) {
677         return this.portBandwidth.get(portNumber);
678     }
679
680     @Override
681     public Set<Short> getPorts() {
682         return this.physicalPorts.keySet();
683     }
684
685     @Override
686     public Byte getTables() {
687         return this.tables;
688     }
689
690     @Override
691     public Integer getActions() {
692         return this.actions;
693     }
694
695     @Override
696     public Integer getCapabilities() {
697         return this.capabilities;
698     }
699
700     @Override
701     public Integer getBuffers() {
702         return this.buffers;
703     }
704
705     @Override
706     public boolean isPortEnabled(short portNumber) {
707         return isPortEnabled(physicalPorts.get(portNumber));
708     }
709
710     @Override
711     public boolean isPortEnabled(OFPhysicalPort port) {
712         if (port == null) {
713             return false;
714         }
715         int portConfig = port.getConfig();
716         int portState = port.getState();
717         if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
718             return false;
719         }
720         if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
721             return false;
722         }
723         if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK.getValue()) {
724             return false;
725         }
726         return true;
727     }
728
729     @Override
730     public List<OFPhysicalPort> getEnabledPorts() {
731         List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
732         synchronized (this.physicalPorts) {
733             for (OFPhysicalPort port : physicalPorts.values()) {
734                 if (isPortEnabled(port)) {
735                     result.add(port);
736                 }
737             }
738         }
739         return result;
740     }
741
742     /*
743      * Transmit thread polls the message out of the priority queue and invokes
744      * messaging service to transmit it over the socket channel
745      */
746     class PriorityMessageTransmit implements Runnable {
747         @Override
748         public void run() {
749             running = true;
750             while (running) {
751                 try {
752                     PriorityMessage pmsg = transmitQ.take();
753                     msgReadWriteService.asyncSend(pmsg.msg);
754                     /*
755                      * If syncReply is set to true, wait for the response back.
756                      */
757                     if (pmsg.syncReply) {
758                         syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
759                     }
760                 } catch (InterruptedException ie) {
761                     reportError(new InterruptedException("PriorityMessageTransmit thread interrupted"));
762                 } catch (Exception e) {
763                     reportError(e);
764                 }
765             }
766             transmitQ = null;
767         }
768     }
769
770     /*
771      * Setup and start the transmit thread
772      */
773     private void startTransmitThread() {
774         this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11, new Comparator<PriorityMessage>() {
775             @Override
776             public int compare(PriorityMessage p1, PriorityMessage p2) {
777                 if (p2.priority != p1.priority) {
778                     return p2.priority - p1.priority;
779                 } else {
780                     return (p2.seqNum < p1.seqNum) ? 1 : -1;
781                 }
782             }
783         });
784         this.transmitThread = new Thread(new PriorityMessageTransmit());
785         this.transmitThread.start();
786     }
787
788     /*
789      * Setup communication services
790      */
791     private void setupCommChannel() throws Exception {
792         this.selector = SelectorProvider.provider().openSelector();
793         this.socket.configureBlocking(false);
794         this.socket.socket().setTcpNoDelay(true);
795         this.msgReadWriteService = getMessageReadWriteService();
796     }
797
798     private void sendFirstHello() {
799         try {
800             OFMessage msg = factory.getMessage(OFType.HELLO);
801             asyncFastSend(msg);
802         } catch (Exception e) {
803             reportError(e);
804         }
805     }
806
807     private IMessageReadWrite getMessageReadWriteService() throws Exception {
808         String str = System.getProperty("secureChannelEnabled");
809         return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(socket,
810                 selector) : new MessageReadWriteService(socket, selector);
811     }
812
813     /**
814      * Send Barrier message synchronously. The caller will be blocked until the
815      * Barrier reply is received.
816      */
817     @Override
818     public Object syncSendBarrierMessage() {
819         OFBarrierRequest barrierMsg = new OFBarrierRequest();
820         return syncSend(barrierMsg);
821     }
822
823     /**
824      * Send Barrier message asynchronously. The caller is not blocked. The
825      * Barrier message will be sent in a transmit thread which will be blocked
826      * until the Barrier reply is received.
827      */
828     @Override
829     public Object asyncSendBarrierMessage() {
830         if (transmitQ == null) {
831             return Boolean.FALSE;
832         }
833
834         OFBarrierRequest barrierMsg = new OFBarrierRequest();
835         int xid = getNextXid();
836
837         barrierMsg.setXid(xid);
838         transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
839
840         return Boolean.TRUE;
841     }
842
843     /**
844      * This method returns the switch liveness timeout value. If controller did
845      * not receive any message from the switch for such a long period,
846      * controller will tear down the connection to the switch.
847      *
848      * @return The timeout value
849      */
850     private static int getSwitchLivenessTimeout() {
851         String timeout = System.getProperty("of.switchLivenessTimeout");
852         int rv = 60500;
853
854         try {
855             if (timeout != null) {
856                 rv = Integer.parseInt(timeout);
857             }
858         } catch (Exception e) {
859         }
860
861         return rv;
862     }
863
864     /**
865      * This method performs synchronous operations for a given message. If
866      * syncRequest is set to true, the message will be sent out followed by a
867      * Barrier request message. Then it's blocked until the Barrier rely arrives
868      * or timeout. If syncRequest is false, it simply skips the message send and
869      * just waits for the response back.
870      *
871      * @param msg
872      *            Message to be sent
873      * @param xid
874      *            Message XID
875      * @param request
876      *            If set to true, the message the message will be sent out
877      *            followed by a Barrier request message. If set to false, it
878      *            simply skips the sending and just waits for the Barrier reply.
879      * @return the result
880      */
881     private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
882         SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
883         messageWaitingDone.put(xid, worker);
884         Object result = null;
885         Boolean status = false;
886         Future<Object> submit;
887         try {
888            submit = executor.submit(worker);
889         } catch (RejectedExecutionException re) {
890             messageWaitingDone.remove(xid);
891             return result;
892         }
893         try {
894             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
895             messageWaitingDone.remove(xid);
896             if (result == null) {
897                 // if result is null, then it means the switch can handle this
898                 // message successfully
899                 // convert the result into a Boolean with value true
900                 status = true;
901                 // logger.debug("Successfully send " +
902                 // msg.getType().toString());
903                 result = status;
904             } else {
905                 // if result is not null, this means the switch can't handle
906                 // this message
907                 // the result if OFError already
908                 if (logger.isDebugEnabled()) {
909                     logger.debug("Send {} failed --> {}", msg.getType(), (result));
910                 }
911             }
912             return result;
913         } catch (Exception e) {
914             logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
915             // convert the result into a Boolean with value false
916             status = false;
917             result = status;
918             worker.wakeup();
919             return result;
920         }
921     }
922
923     @Override
924     public void deleteAllFlows() {
925         logger.trace("deleteAllFlows on switch {}", HexString.toHexString(this.sid));
926         OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
927         OFFlowMod flowMod = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
928         flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE).setOutPort(OFPort.OFPP_NONE)
929                 .setLength((short) OFFlowMod.MINIMUM_LENGTH);
930         asyncFastSend(flowMod);
931     }
932 }