Merge "Removed unused private variable containerAwareRegistration and unused import "
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / SwitchHandler.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
10
11 import java.io.IOException;
12 import java.net.SocketException;
13 import java.nio.channels.AsynchronousCloseException;
14 import java.nio.channels.ClosedSelectorException;
15 import java.nio.channels.SelectionKey;
16 import java.nio.channels.Selector;
17 import java.nio.channels.SocketChannel;
18 import java.nio.channels.spi.SelectorProvider;
19 import java.util.ArrayList;
20 import java.util.Comparator;
21 import java.util.Date;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.Timer;
28 import java.util.TimerTask;
29 import java.util.concurrent.Callable;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.PriorityBlockingQueue;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicInteger;
37
38 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
39 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
40 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
41 import org.openflow.protocol.OFBarrierReply;
42 import org.openflow.protocol.OFBarrierRequest;
43 import org.openflow.protocol.OFEchoReply;
44 import org.openflow.protocol.OFError;
45 import org.openflow.protocol.OFFeaturesReply;
46 import org.openflow.protocol.OFFlowMod;
47 import org.openflow.protocol.OFGetConfigReply;
48 import org.openflow.protocol.OFMatch;
49 import org.openflow.protocol.OFMessage;
50 import org.openflow.protocol.OFPhysicalPort;
51 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
52 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
53 import org.openflow.protocol.OFPhysicalPort.OFPortState;
54 import org.openflow.protocol.OFPort;
55 import org.openflow.protocol.OFPortStatus;
56 import org.openflow.protocol.OFPortStatus.OFPortReason;
57 import org.openflow.protocol.OFSetConfig;
58 import org.openflow.protocol.OFStatisticsReply;
59 import org.openflow.protocol.OFStatisticsRequest;
60 import org.openflow.protocol.OFType;
61 import org.openflow.protocol.factory.BasicFactory;
62 import org.openflow.util.HexString;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66 public class SwitchHandler implements ISwitch {
67     private static final Logger logger = LoggerFactory
68             .getLogger(SwitchHandler.class);
69     private static final int SWITCH_LIVENESS_TIMER = 5000;
70     private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
71     private int MESSAGE_RESPONSE_TIMER = 2000;
72
73     private String instanceName;
74     private ISwitch thisISwitch;
75     private IController core;
76     private Long sid;
77     private Integer buffers;
78     private Integer capabilities;
79     private Byte tables;
80     private Integer actions;
81     private Selector selector;
82     private SocketChannel socket;
83     private BasicFactory factory;
84     private AtomicInteger xid;
85     private SwitchState state;
86     private Timer periodicTimer;
87     private Map<Short, OFPhysicalPort> physicalPorts;
88     private Map<Short, Integer> portBandwidth;
89     private Date connectedDate;
90     private Long lastMsgReceivedTimeStamp;
91     private Boolean probeSent;
92     private ExecutorService executor;
93     private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
94     private boolean running;
95     private IMessageReadWrite msgReadWriteService;
96     private Thread switchHandlerThread;
97     private Integer responseTimerValue;
98     private PriorityBlockingQueue<PriorityMessage> transmitQ;
99     private Thread transmitThread;
100
101     private enum SwitchState {
102         NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
103                 3);
104
105         private int value;
106
107         private SwitchState(int value) {
108             this.value = value;
109         }
110
111         @SuppressWarnings("unused")
112         public int value() {
113             return this.value;
114         }
115     }
116
117     public SwitchHandler(Controller core, SocketChannel sc, String name) {
118         this.instanceName = name;
119         this.thisISwitch = this;
120         this.sid = (long) 0;
121         this.buffers = (int) 0;
122         this.capabilities = (int) 0;
123         this.tables = (byte) 0;
124         this.actions = (int) 0;
125         this.core = core;
126         this.socket = sc;
127         this.factory = new BasicFactory();
128         this.connectedDate = new Date();
129         this.lastMsgReceivedTimeStamp = connectedDate.getTime();
130         this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
131         this.portBandwidth = new HashMap<Short, Integer>();
132         this.state = SwitchState.NON_OPERATIONAL;
133         this.probeSent = false;
134         this.xid = new AtomicInteger(this.socket.hashCode());
135         this.periodicTimer = null;
136         this.executor = Executors.newFixedThreadPool(4);
137         this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
138         this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
139         String rTimer = System.getProperty("of.messageResponseTimer");
140         if (rTimer != null) {
141             try {
142                 responseTimerValue = Integer.decode(rTimer);
143             } catch (NumberFormatException e) {
144                 logger.warn(
145                         "Invalid of.messageResponseTimer: {} use default({})",
146                         rTimer, MESSAGE_RESPONSE_TIMER);
147             }
148         }
149     }
150
151     public void start() {
152         try {
153             startTransmitThread();
154             setupCommChannel();
155             sendFirstHello();
156             startHandlerThread();
157         } catch (Exception e) {
158             reportError(e);
159         }
160     }
161
162     private void startHandlerThread() {
163         switchHandlerThread = new Thread(new Runnable() {
164             @Override
165             public void run() {
166                 running = true;
167                 while (running) {
168                     try {
169                         // wait for an incoming connection
170                         selector.select(0);
171                         Iterator<SelectionKey> selectedKeys = selector
172                                 .selectedKeys().iterator();
173                         while (selectedKeys.hasNext()) {
174                             SelectionKey skey = selectedKeys.next();
175                             selectedKeys.remove();
176                             if (skey.isValid() && skey.isWritable()) {
177                                 resumeSend();
178                             }
179                             if (skey.isValid() && skey.isReadable()) {
180                                 handleMessages();
181                             }
182                         }
183                     } catch (Exception e) {
184                         reportError(e);
185                     }
186                 }
187             }
188         }, instanceName);
189         switchHandlerThread.start();
190     }
191
192     public void stop() {
193         running = false;
194         cancelSwitchTimer();
195         try {
196             selector.wakeup();
197             selector.close();
198         } catch (Exception e) {
199         }
200         try {
201             socket.close();
202         } catch (Exception e) {
203         }
204         try {
205             msgReadWriteService.stop();
206         } catch (Exception e) {
207         }
208         executor.shutdown();
209
210         msgReadWriteService = null;
211
212         if (switchHandlerThread != null) {
213             switchHandlerThread.interrupt();
214         }
215         if (transmitThread != null) {
216             transmitThread.interrupt();
217         }
218     }
219
220     @Override
221     public int getNextXid() {
222         return this.xid.incrementAndGet();
223     }
224
225     /**
226      * This method puts the message in an outgoing priority queue with normal
227      * priority. It will be served after high priority messages. The method
228      * should be used for non-critical messages such as statistics request,
229      * discovery packets, etc. An unique XID is generated automatically and
230      * inserted into the message.
231      * 
232      * @param msg
233      *            The OF message to be sent
234      * @return The XID used
235      */
236     @Override
237     public Integer asyncSend(OFMessage msg) {
238         return asyncSend(msg, getNextXid());
239     }
240
241     private Object syncSend(OFMessage msg, int xid) {
242         return syncMessageInternal(msg, xid, true);
243     }
244
245     /**
246      * This method puts the message in an outgoing priority queue with normal
247      * priority. It will be served after high priority messages. The method
248      * should be used for non-critical messages such as statistics request,
249      * discovery packets, etc. The specified XID is inserted into the message.
250      * 
251      * @param msg
252      *            The OF message to be Sent
253      * @param xid
254      *            The XID to be used in the message
255      * @return The XID used
256      */
257     @Override
258     public Integer asyncSend(OFMessage msg, int xid) {
259         msg.setXid(xid);
260         if (transmitQ != null) {
261             transmitQ.add(new PriorityMessage(msg, 0));
262         }
263         return xid;
264     }
265
266     /**
267      * This method puts the message in an outgoing priority queue with high
268      * priority. It will be served first before normal priority messages. The
269      * method should be used for critical messages such as hello, echo reply
270      * etc. An unique XID is generated automatically and inserted into the
271      * message.
272      * 
273      * @param msg
274      *            The OF message to be sent
275      * @return The XID used
276      */
277     @Override
278     public Integer asyncFastSend(OFMessage msg) {
279         return asyncFastSend(msg, getNextXid());
280     }
281
282     /**
283      * This method puts the message in an outgoing priority queue with high
284      * priority. It will be served first before normal priority messages. The
285      * method should be used for critical messages such as hello, echo reply
286      * etc. The specified XID is inserted into the message.
287      * 
288      * @param msg
289      *            The OF message to be sent
290      * @return The XID used
291      */
292     @Override
293     public Integer asyncFastSend(OFMessage msg, int xid) {
294         msg.setXid(xid);
295         if (transmitQ != null) {
296             transmitQ.add(new PriorityMessage(msg, 1));
297         }
298         return xid;
299     }
300
301     public void resumeSend() {
302         try {
303             if (msgReadWriteService != null) {
304                 msgReadWriteService.resumeSend();
305             }
306         } catch (Exception e) {
307             reportError(e);
308         }
309     }
310
311     public void handleMessages() {
312         List<OFMessage> msgs = null;
313
314         try {
315             if (msgReadWriteService != null) {
316                 msgs = msgReadWriteService.readMessages();
317             }
318         } catch (Exception e) {
319             reportError(e);
320         }
321
322         if (msgs == null) {
323             logger.debug("{} is down", this);
324             // the connection is down, inform core
325             reportSwitchStateChange(false);
326             return;
327         }
328         for (OFMessage msg : msgs) {
329             logger.trace("Message received: {}", msg);
330             this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
331             OFType type = msg.getType();
332             switch (type) {
333             case HELLO:
334                 // send feature request
335                 OFMessage featureRequest = factory
336                         .getMessage(OFType.FEATURES_REQUEST);
337                 asyncFastSend(featureRequest);
338                 // delete all pre-existing flows
339                 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
340                 OFFlowMod flowMod = (OFFlowMod) factory
341                         .getMessage(OFType.FLOW_MOD);
342                 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
343                         .setOutPort(OFPort.OFPP_NONE)
344                         .setLength((short) OFFlowMod.MINIMUM_LENGTH);
345                 asyncFastSend(flowMod);
346                 this.state = SwitchState.WAIT_FEATURES_REPLY;
347                 startSwitchTimer();
348                 break;
349             case ECHO_REQUEST:
350                 OFEchoReply echoReply = (OFEchoReply) factory
351                         .getMessage(OFType.ECHO_REPLY);
352                 asyncFastSend(echoReply);
353                 break;
354             case ECHO_REPLY:
355                 this.probeSent = false;
356                 break;
357             case FEATURES_REPLY:
358                 processFeaturesReply((OFFeaturesReply) msg);
359                 break;
360             case GET_CONFIG_REPLY:
361                 // make sure that the switch can send the whole packet to the
362                 // controller
363                 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
364                     this.state = SwitchState.OPERATIONAL;
365                 }
366                 break;
367             case BARRIER_REPLY:
368                 processBarrierReply((OFBarrierReply) msg);
369                 break;
370             case ERROR:
371                 processErrorReply((OFError) msg);
372                 break;
373             case PORT_STATUS:
374                 processPortStatusMsg((OFPortStatus) msg);
375                 break;
376             case STATS_REPLY:
377                 processStatsReply((OFStatisticsReply) msg);
378                 break;
379             case PACKET_IN:
380                 break;
381             default:
382                 break;
383             } // end of switch
384             if (isOperational()) {
385                 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
386             }
387         } // end of for
388     }
389
390     private void processPortStatusMsg(OFPortStatus msg) {
391         OFPhysicalPort port = msg.getDesc();
392         if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
393             updatePhysicalPort(port);
394         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
395             updatePhysicalPort(port);
396         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
397                 .ordinal()) {
398             deletePhysicalPort(port);
399         }
400
401     }
402
403     private void startSwitchTimer() {
404         this.periodicTimer = new Timer();
405         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
406             @Override
407             public void run() {
408                 try {
409                     Long now = System.currentTimeMillis();
410                     if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
411                         if (probeSent) {
412                             // switch failed to respond to our probe, consider
413                             // it down
414                             logger.warn("{} is idle for too long, disconnect",
415                                     toString());
416                             reportSwitchStateChange(false);
417                         } else {
418                             // send a probe to see if the switch is still alive
419                             logger.debug(
420                                     "Send idle probe (Echo Request) to {}",
421                                     this);
422                             probeSent = true;
423                             OFMessage echo = factory
424                                     .getMessage(OFType.ECHO_REQUEST);
425                             asyncFastSend(echo);
426                         }
427                     } else {
428                         if (state == SwitchState.WAIT_FEATURES_REPLY) {
429                             // send another features request
430                             OFMessage request = factory
431                                     .getMessage(OFType.FEATURES_REQUEST);
432                             asyncFastSend(request);
433                         } else {
434                             if (state == SwitchState.WAIT_CONFIG_REPLY) {
435                                 // send another config request
436                                 OFSetConfig config = (OFSetConfig) factory
437                                         .getMessage(OFType.SET_CONFIG);
438                                 config.setMissSendLength((short) 0xffff)
439                                         .setLengthU(OFSetConfig.MINIMUM_LENGTH);
440                                 asyncFastSend(config);
441                                 OFMessage getConfig = factory
442                                         .getMessage(OFType.GET_CONFIG_REQUEST);
443                                 asyncFastSend(getConfig);
444                             }
445                         }
446                     }
447                 } catch (Exception e) {
448                     reportError(e);
449                 }
450             }
451         }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
452     }
453
454     private void cancelSwitchTimer() {
455         if (this.periodicTimer != null) {
456             this.periodicTimer.cancel();
457         }
458     }
459
460     private void reportError(Exception e) {
461         if (e instanceof AsynchronousCloseException
462                 || e instanceof InterruptedException
463                 || e instanceof SocketException || e instanceof IOException
464                 || e instanceof ClosedSelectorException) {
465             if (logger.isDebugEnabled()) {
466               logger.debug("Caught exception {}", e.getMessage());
467             }
468         } else {
469             logger.warn("Caught exception ", e);
470         }
471         // notify core of this error event and disconnect the switch
472         ((Controller) core).takeSwitchEventError(this);
473     }
474
475     private void reportSwitchStateChange(boolean added) {
476         if (added) {
477             ((Controller) core).takeSwitchEventAdd(this);
478         } else {
479             ((Controller) core).takeSwitchEventDelete(this);
480         }
481     }
482
483     @Override
484     public Long getId() {
485         return this.sid;
486     }
487
488     private void processFeaturesReply(OFFeaturesReply reply) {
489         if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
490             this.sid = reply.getDatapathId();
491             this.buffers = reply.getBuffers();
492             this.capabilities = reply.getCapabilities();
493             this.tables = reply.getTables();
494             this.actions = reply.getActions();
495             // notify core of this error event
496             for (OFPhysicalPort port : reply.getPorts()) {
497                 updatePhysicalPort(port);
498             }
499             // config the switch to send full data packet
500             OFSetConfig config = (OFSetConfig) factory
501                     .getMessage(OFType.SET_CONFIG);
502             config.setMissSendLength((short) 0xffff).setLengthU(
503                     OFSetConfig.MINIMUM_LENGTH);
504             asyncFastSend(config);
505             // send config request to make sure the switch can handle the set
506             // config
507             OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
508             asyncFastSend(getConfig);
509             this.state = SwitchState.WAIT_CONFIG_REPLY;
510             // inform core that a new switch is now operational
511             reportSwitchStateChange(true);
512         }
513     }
514
515     private void updatePhysicalPort(OFPhysicalPort port) {
516         Short portNumber = port.getPortNumber();
517         physicalPorts.put(portNumber, port);
518         portBandwidth
519                 .put(portNumber,
520                         port.getCurrentFeatures()
521                                 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
522                                         | OFPortFeatures.OFPPF_10MB_HD
523                                                 .getValue()
524                                         | OFPortFeatures.OFPPF_100MB_FD
525                                                 .getValue()
526                                         | OFPortFeatures.OFPPF_100MB_HD
527                                                 .getValue()
528                                         | OFPortFeatures.OFPPF_1GB_FD
529                                                 .getValue()
530                                         | OFPortFeatures.OFPPF_1GB_HD
531                                                 .getValue() | OFPortFeatures.OFPPF_10GB_FD
532                                             .getValue()));
533     }
534
535     private void deletePhysicalPort(OFPhysicalPort port) {
536         Short portNumber = port.getPortNumber();
537         physicalPorts.remove(portNumber);
538         portBandwidth.remove(portNumber);
539     }
540
541     @Override
542     public boolean isOperational() {
543         return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
544     }
545
546     @Override
547     public String toString() {
548         try {
549             return ("Switch:"
550                     + socket.socket().getRemoteSocketAddress().toString().split("/")[1]
551                     + " SWID:" + (isOperational() ? HexString
552                     .toHexString(this.sid) : "unknown"));
553         } catch (Exception e) {
554             return (isOperational() ? HexString.toHexString(this.sid)
555                     : "unknown");
556         }
557
558     }
559
560     @Override
561     public Date getConnectedDate() {
562         return this.connectedDate;
563     }
564
565     public String getInstanceName() {
566         return instanceName;
567     }
568
569     @Override
570     public Object getStatistics(OFStatisticsRequest req) {
571         int xid = getNextXid();
572         StatisticsCollector worker = new StatisticsCollector(this, xid, req);
573         messageWaitingDone.put(xid, worker);
574         Future<Object> submit = executor.submit(worker);
575         Object result = null;
576         try {
577             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
578             return result;
579         } catch (Exception e) {
580             logger.warn("Timeout while waiting for {} replies", req.getType());
581             result = null; // to indicate timeout has occurred
582             return result;
583         }
584     }
585
586     @Override
587     public Object syncSend(OFMessage msg) {
588         int xid = getNextXid();
589         return syncSend(msg, xid);
590     }
591
592     /*
593      * Either a BarrierReply or a OFError is received. If this is a reply for an
594      * outstanding sync message, wake up associated task so that it can continue
595      */
596     private void processBarrierReply(OFBarrierReply msg) {
597         Integer xid = msg.getXid();
598         SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
599                 .remove(xid);
600         if (worker == null) {
601             return;
602         }
603         worker.wakeup();
604     }
605
606     private void processErrorReply(OFError errorMsg) {
607         OFMessage offendingMsg = errorMsg.getOffendingMsg();
608         Integer xid;
609         if (offendingMsg != null) {
610             xid = offendingMsg.getXid();
611         } else {
612             xid = errorMsg.getXid();
613         }
614         /*
615          * the error can be a reply to a synchronous message or to a statistic
616          * request message
617          */
618         Callable<?> worker = messageWaitingDone.remove(xid);
619         if (worker == null) {
620             return;
621         }
622         if (worker instanceof SynchronousMessage) {
623             ((SynchronousMessage) worker).wakeup(errorMsg);
624         } else {
625             ((StatisticsCollector) worker).wakeup(errorMsg);
626         }
627     }
628
629     private void processStatsReply(OFStatisticsReply reply) {
630         Integer xid = reply.getXid();
631         StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
632                 .get(xid);
633         if (worker == null) {
634             return;
635         }
636         if (worker.collect(reply)) {
637             // if all the stats records are received (collect() returns true)
638             // then we are done.
639             messageWaitingDone.remove(xid);
640             worker.wakeup();
641         }
642     }
643
644     @Override
645     public Map<Short, OFPhysicalPort> getPhysicalPorts() {
646         return this.physicalPorts;
647     }
648
649     @Override
650     public OFPhysicalPort getPhysicalPort(Short portNumber) {
651         return this.physicalPorts.get(portNumber);
652     }
653
654     @Override
655     public Integer getPortBandwidth(Short portNumber) {
656         return this.portBandwidth.get(portNumber);
657     }
658
659     @Override
660     public Set<Short> getPorts() {
661         return this.physicalPorts.keySet();
662     }
663
664     @Override
665     public Byte getTables() {
666         return this.tables;
667     }
668
669     @Override
670     public Integer getActions() {
671         return this.actions;
672     }
673
674     @Override
675     public Integer getCapabilities() {
676         return this.capabilities;
677     }
678
679     @Override
680     public Integer getBuffers() {
681         return this.buffers;
682     }
683
684     @Override
685     public boolean isPortEnabled(short portNumber) {
686         return isPortEnabled(physicalPorts.get(portNumber));
687     }
688
689     @Override
690     public boolean isPortEnabled(OFPhysicalPort port) {
691         if (port == null) {
692             return false;
693         }
694         int portConfig = port.getConfig();
695         int portState = port.getState();
696         if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
697             return false;
698         }
699         if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
700             return false;
701         }
702         if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
703                 .getValue()) {
704             return false;
705         }
706         return true;
707     }
708
709     @Override
710     public List<OFPhysicalPort> getEnabledPorts() {
711         List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
712         synchronized (this.physicalPorts) {
713             for (OFPhysicalPort port : physicalPorts.values()) {
714                 if (isPortEnabled(port)) {
715                     result.add(port);
716                 }
717             }
718         }
719         return result;
720     }
721
722     /*
723      * Transmit thread polls the message out of the priority queue and invokes
724      * messaging service to transmit it over the socket channel
725      */
726     class PriorityMessageTransmit implements Runnable {
727         public void run() {
728             running = true;
729             while (running) {
730                 try {
731                     if (!transmitQ.isEmpty()) {
732                         PriorityMessage pmsg = transmitQ.poll();
733                         msgReadWriteService.asyncSend(pmsg.msg);
734                         logger.trace("Message sent: {}", pmsg);
735                         /*
736                          * If syncReply is set to true, wait for the response
737                          * back.
738                          */
739                         if (pmsg.syncReply) {
740                             syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
741                         }
742                     }
743                     Thread.sleep(10);
744                 } catch (InterruptedException ie) {
745                     reportError(new InterruptedException(
746                             "PriorityMessageTransmit thread interrupted"));
747                 } catch (Exception e) {
748                     reportError(e);
749                 }
750             }
751             transmitQ = null;
752         }
753     }
754
755     /*
756      * Setup and start the transmit thread
757      */
758     private void startTransmitThread() {
759         this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
760                 new Comparator<PriorityMessage>() {
761                     public int compare(PriorityMessage p1, PriorityMessage p2) {
762                         if (p2.priority != p1.priority) {
763                             return p2.priority - p1.priority;
764                         } else {
765                             return (p2.seqNum < p1.seqNum) ? 1 : -1;
766                         }
767                     }
768                 });
769         this.transmitThread = new Thread(new PriorityMessageTransmit());
770         this.transmitThread.start();
771     }
772
773     /*
774      * Setup communication services
775      */
776     private void setupCommChannel() throws Exception {
777         this.selector = SelectorProvider.provider().openSelector();
778         this.socket.configureBlocking(false);
779         this.socket.socket().setTcpNoDelay(true);
780         this.msgReadWriteService = getMessageReadWriteService();
781     }
782
783     private void sendFirstHello() {
784         try {
785             OFMessage msg = factory.getMessage(OFType.HELLO);
786             asyncFastSend(msg);
787         } catch (Exception e) {
788             reportError(e);
789         }
790     }
791
792     private IMessageReadWrite getMessageReadWriteService() throws Exception {
793         String str = System.getProperty("secureChannelEnabled");
794         return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(
795                 socket, selector) : new MessageReadWriteService(socket,
796                 selector);
797     }
798
799     /**
800      * Send Barrier message synchronously. The caller will be blocked until the
801      * Barrier reply is received.
802      */
803     @Override
804     public Object syncSendBarrierMessage() {
805         OFBarrierRequest barrierMsg = new OFBarrierRequest();
806         return syncSend(barrierMsg);
807     }
808
809     /**
810      * Send Barrier message asynchronously. The caller is not blocked. The
811      * Barrier message will be sent in a transmit thread which will be blocked
812      * until the Barrier reply is received.
813      */
814     @Override
815     public Object asyncSendBarrierMessage() {
816         if (transmitQ == null) {
817             return Boolean.FALSE;
818         }
819
820         OFBarrierRequest barrierMsg = new OFBarrierRequest();
821         int xid = getNextXid();
822
823         barrierMsg.setXid(xid);
824         transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
825         
826         return Boolean.TRUE;
827     }
828
829     /**
830      * This method returns the switch liveness timeout value. If controller did
831      * not receive any message from the switch for such a long period,
832      * controller will tear down the connection to the switch.
833      * 
834      * @return The timeout value
835      */
836     private static int getSwitchLivenessTimeout() {
837         String timeout = System.getProperty("of.switchLivenessTimeout");
838         int rv = 60500;
839
840         try {
841             if (timeout != null) {
842                 rv = Integer.parseInt(timeout);
843             }
844         } catch (Exception e) {
845         }
846
847         return rv;
848     }
849
850     /**
851      * This method performs synchronous operations for a given message. If
852      * syncRequest is set to true, the message will be sent out followed by a
853      * Barrier request message. Then it's blocked until the Barrier rely arrives
854      * or timeout. If syncRequest is false, it simply skips the message send and
855      * just waits for the response back.
856      * 
857      * @param msg
858      *            Message to be sent
859      * @param xid
860      *            Message XID
861      * @param request
862      *            If set to true, the message the message will be sent out
863      *            followed by a Barrier request message. If set to false, it
864      *            simply skips the sending and just waits for the Barrier reply.
865      * @return the result
866      */
867     private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
868         SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
869         messageWaitingDone.put(xid, worker);
870         Object result = null;
871         Boolean status = false;
872         Future<Object> submit = executor.submit(worker);
873         try {
874             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
875             messageWaitingDone.remove(xid);
876             if (result == null) {
877                 // if result is null, then it means the switch can handle this
878                 // message successfully
879                 // convert the result into a Boolean with value true
880                 status = true;
881                 // logger.debug("Successfully send " +
882                 // msg.getType().toString());
883                 result = status;
884             } else {
885                 // if result is not null, this means the switch can't handle
886                 // this message
887                 // the result if OFError already
888                 if (logger.isDebugEnabled()) {
889                   logger.debug("Send {} failed --> {}", msg.getType(),
890                                ((OFError) result));
891                 }
892             }
893             return result;
894         } catch (Exception e) {
895             logger.warn("Timeout while waiting for {} reply", msg.getType()
896                     .toString());
897             // convert the result into a Boolean with value false
898             status = false;
899             result = status;
900             return result;
901         }
902     }
903 }