Merge "API 2 Documentation for opendaylight.controller.usermanager"
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / SwitchHandler.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
10
11 import java.io.IOException;
12 import java.net.SocketException;
13 import java.nio.channels.AsynchronousCloseException;
14 import java.nio.channels.ClosedSelectorException;
15 import java.nio.channels.SelectionKey;
16 import java.nio.channels.Selector;
17 import java.nio.channels.SocketChannel;
18 import java.nio.channels.spi.SelectorProvider;
19 import java.util.ArrayList;
20 import java.util.Comparator;
21 import java.util.Date;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.Timer;
28 import java.util.TimerTask;
29 import java.util.concurrent.Callable;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.PriorityBlockingQueue;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicInteger;
37
38 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
39 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
40 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
41 import org.openflow.protocol.OFBarrierReply;
42 import org.openflow.protocol.OFBarrierRequest;
43 import org.openflow.protocol.OFEchoReply;
44 import org.openflow.protocol.OFError;
45 import org.openflow.protocol.OFFeaturesReply;
46 import org.openflow.protocol.OFFlowMod;
47 import org.openflow.protocol.OFGetConfigReply;
48 import org.openflow.protocol.OFMatch;
49 import org.openflow.protocol.OFMessage;
50 import org.openflow.protocol.OFPhysicalPort;
51 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
52 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
53 import org.openflow.protocol.OFPhysicalPort.OFPortState;
54 import org.openflow.protocol.OFPort;
55 import org.openflow.protocol.OFPortStatus;
56 import org.openflow.protocol.OFPortStatus.OFPortReason;
57 import org.openflow.protocol.OFSetConfig;
58 import org.openflow.protocol.OFStatisticsReply;
59 import org.openflow.protocol.OFStatisticsRequest;
60 import org.openflow.protocol.OFType;
61 import org.openflow.protocol.factory.BasicFactory;
62 import org.openflow.util.HexString;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66 public class SwitchHandler implements ISwitch {
67     private static final Logger logger = LoggerFactory
68             .getLogger(SwitchHandler.class);
69     private static final int SWITCH_LIVENESS_TIMER = 5000;
70     private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
71     private int MESSAGE_RESPONSE_TIMER = 2000;
72
73     private String instanceName;
74     private ISwitch thisISwitch;
75     private IController core;
76     private Long sid;
77     private Integer buffers;
78     private Integer capabilities;
79     private Byte tables;
80     private Integer actions;
81     private Selector selector;
82     private SocketChannel socket;
83     private BasicFactory factory;
84     private AtomicInteger xid;
85     private SwitchState state;
86     private Timer periodicTimer;
87     private Map<Short, OFPhysicalPort> physicalPorts;
88     private Map<Short, Integer> portBandwidth;
89     private Date connectedDate;
90     private Long lastMsgReceivedTimeStamp;
91     private Boolean probeSent;
92     private ExecutorService executor;
93     private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
94     private boolean running;
95     private IMessageReadWrite msgReadWriteService;
96     private Thread switchHandlerThread;
97     private Integer responseTimerValue;
98     private PriorityBlockingQueue<PriorityMessage> transmitQ;
99     private Thread transmitThread;
100
101     private enum SwitchState {
102         NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
103                 3);
104
105         private int value;
106
107         private SwitchState(int value) {
108             this.value = value;
109         }
110
111         @SuppressWarnings("unused")
112         public int value() {
113             return this.value;
114         }
115     }
116
117     public SwitchHandler(Controller core, SocketChannel sc, String name) {
118         this.instanceName = name;
119         this.thisISwitch = this;
120         this.sid = (long) 0;
121         this.buffers = (int) 0;
122         this.capabilities = (int) 0;
123         this.tables = (byte) 0;
124         this.actions = (int) 0;
125         this.core = core;
126         this.socket = sc;
127         this.factory = new BasicFactory();
128         this.connectedDate = new Date();
129         this.lastMsgReceivedTimeStamp = connectedDate.getTime();
130         this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
131         this.portBandwidth = new HashMap<Short, Integer>();
132         this.state = SwitchState.NON_OPERATIONAL;
133         this.probeSent = false;
134         this.xid = new AtomicInteger(this.socket.hashCode());
135         this.periodicTimer = null;
136         this.executor = Executors.newFixedThreadPool(4);
137         this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
138         this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
139         String rTimer = System.getProperty("of.messageResponseTimer");
140         if (rTimer != null) {
141             try {
142                 responseTimerValue = Integer.decode(rTimer);
143             } catch (NumberFormatException e) {
144                 logger.warn(
145                         "Invalid of.messageResponseTimer: {} use default({})",
146                         rTimer, MESSAGE_RESPONSE_TIMER);
147             }
148         }
149     }
150
151     public void start() {
152         try {
153             startTransmitThread();
154             setupCommChannel();
155             sendFirstHello();
156             startHandlerThread();
157         } catch (Exception e) {
158             reportError(e);
159         }
160     }
161
162     private void startHandlerThread() {
163         switchHandlerThread = new Thread(new Runnable() {
164             @Override
165             public void run() {
166                 running = true;
167                 while (running) {
168                     try {
169                         // wait for an incoming connection
170                         selector.select(0);
171                         Iterator<SelectionKey> selectedKeys = selector
172                                 .selectedKeys().iterator();
173                         while (selectedKeys.hasNext()) {
174                             SelectionKey skey = selectedKeys.next();
175                             selectedKeys.remove();
176                             if (skey.isValid() && skey.isWritable()) {
177                                 resumeSend();
178                             }
179                             if (skey.isValid() && skey.isReadable()) {
180                                 handleMessages();
181                             }
182                         }
183                     } catch (Exception e) {
184                         reportError(e);
185                     }
186                 }
187             }
188         }, instanceName);
189         switchHandlerThread.start();
190     }
191
192     public void stop() {
193         running = false;
194         cancelSwitchTimer();
195         try {
196             selector.wakeup();
197             selector.close();
198         } catch (Exception e) {
199         }
200         try {
201             socket.close();
202         } catch (Exception e) {
203         }
204         try {
205             msgReadWriteService.stop();
206         } catch (Exception e) {
207         }
208         executor.shutdown();
209
210         msgReadWriteService = null;
211
212         if (switchHandlerThread != null) {
213             switchHandlerThread.interrupt();
214         }
215         if (transmitThread != null) {
216             transmitThread.interrupt();
217         }
218     }
219
220     @Override
221     public int getNextXid() {
222         return this.xid.incrementAndGet();
223     }
224
225     /**
226      * This method puts the message in an outgoing priority queue with normal
227      * priority. It will be served after high priority messages. The method
228      * should be used for non-critical messages such as statistics request,
229      * discovery packets, etc. An unique XID is generated automatically and
230      * inserted into the message.
231      * 
232      * @param msg
233      *            The OF message to be sent
234      * @return The XID used
235      */
236     @Override
237     public Integer asyncSend(OFMessage msg) {
238         return asyncSend(msg, getNextXid());
239     }
240
241     private Object syncSend(OFMessage msg, int xid) {
242         return syncMessageInternal(msg, xid, true);
243     }
244
245     /**
246      * This method puts the message in an outgoing priority queue with normal
247      * priority. It will be served after high priority messages. The method
248      * should be used for non-critical messages such as statistics request,
249      * discovery packets, etc. The specified XID is inserted into the message.
250      * 
251      * @param msg
252      *            The OF message to be Sent
253      * @param xid
254      *            The XID to be used in the message
255      * @return The XID used
256      */
257     @Override
258     public Integer asyncSend(OFMessage msg, int xid) {
259         msg.setXid(xid);
260         if (transmitQ != null) {
261             transmitQ.add(new PriorityMessage(msg, 0));
262         }
263         return xid;
264     }
265
266     /**
267      * This method puts the message in an outgoing priority queue with high
268      * priority. It will be served first before normal priority messages. The
269      * method should be used for critical messages such as hello, echo reply
270      * etc. An unique XID is generated automatically and inserted into the
271      * message.
272      * 
273      * @param msg
274      *            The OF message to be sent
275      * @return The XID used
276      */
277     @Override
278     public Integer asyncFastSend(OFMessage msg) {
279         return asyncFastSend(msg, getNextXid());
280     }
281
282     /**
283      * This method puts the message in an outgoing priority queue with high
284      * priority. It will be served first before normal priority messages. The
285      * method should be used for critical messages such as hello, echo reply
286      * etc. The specified XID is inserted into the message.
287      * 
288      * @param msg
289      *            The OF message to be sent
290      * @return The XID used
291      */
292     @Override
293     public Integer asyncFastSend(OFMessage msg, int xid) {
294         msg.setXid(xid);
295         if (transmitQ != null) {
296             transmitQ.add(new PriorityMessage(msg, 1));
297         }
298         return xid;
299     }
300
301     public void resumeSend() {
302         try {
303             if (msgReadWriteService != null) {
304                 msgReadWriteService.resumeSend();
305             }
306         } catch (Exception e) {
307             reportError(e);
308         }
309     }
310
311     public void handleMessages() {
312         List<OFMessage> msgs = null;
313
314         try {
315             if (msgReadWriteService != null) {
316                 msgs = msgReadWriteService.readMessages();
317             }
318         } catch (Exception e) {
319             reportError(e);
320         }
321
322         if (msgs == null) {
323             logger.debug("{} is down", toString());
324             // the connection is down, inform core
325             reportSwitchStateChange(false);
326             return;
327         }
328         for (OFMessage msg : msgs) {
329             logger.trace("Message received: {}", msg.toString());
330             this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
331             OFType type = msg.getType();
332             switch (type) {
333             case HELLO:
334                 // send feature request
335                 OFMessage featureRequest = factory
336                         .getMessage(OFType.FEATURES_REQUEST);
337                 asyncFastSend(featureRequest);
338                 // delete all pre-existing flows
339                 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
340                 OFFlowMod flowMod = (OFFlowMod) factory
341                         .getMessage(OFType.FLOW_MOD);
342                 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
343                         .setOutPort(OFPort.OFPP_NONE)
344                         .setLength((short) OFFlowMod.MINIMUM_LENGTH);
345                 asyncFastSend(flowMod);
346                 this.state = SwitchState.WAIT_FEATURES_REPLY;
347                 startSwitchTimer();
348                 break;
349             case ECHO_REQUEST:
350                 OFEchoReply echoReply = (OFEchoReply) factory
351                         .getMessage(OFType.ECHO_REPLY);
352                 asyncFastSend(echoReply);
353                 break;
354             case ECHO_REPLY:
355                 this.probeSent = false;
356                 break;
357             case FEATURES_REPLY:
358                 processFeaturesReply((OFFeaturesReply) msg);
359                 break;
360             case GET_CONFIG_REPLY:
361                 // make sure that the switch can send the whole packet to the
362                 // controller
363                 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
364                     this.state = SwitchState.OPERATIONAL;
365                 }
366                 break;
367             case BARRIER_REPLY:
368                 processBarrierReply((OFBarrierReply) msg);
369                 break;
370             case ERROR:
371                 processErrorReply((OFError) msg);
372                 break;
373             case PORT_STATUS:
374                 processPortStatusMsg((OFPortStatus) msg);
375                 break;
376             case STATS_REPLY:
377                 processStatsReply((OFStatisticsReply) msg);
378                 break;
379             case PACKET_IN:
380                 break;
381             default:
382                 break;
383             } // end of switch
384             if (isOperational()) {
385                 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
386             }
387         } // end of for
388     }
389
390     private void processPortStatusMsg(OFPortStatus msg) {
391         OFPhysicalPort port = msg.getDesc();
392         if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
393             updatePhysicalPort(port);
394         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
395             updatePhysicalPort(port);
396         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
397                 .ordinal()) {
398             deletePhysicalPort(port);
399         }
400
401     }
402
403     private void startSwitchTimer() {
404         this.periodicTimer = new Timer();
405         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
406             @Override
407             public void run() {
408                 try {
409                     Long now = System.currentTimeMillis();
410                     if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
411                         if (probeSent) {
412                             // switch failed to respond to our probe, consider
413                             // it down
414                             logger.warn("{} is idle for too long, disconnect",
415                                     toString());
416                             reportSwitchStateChange(false);
417                         } else {
418                             // send a probe to see if the switch is still alive
419                             logger.debug(
420                                     "Send idle probe (Echo Request) to {}",
421                                     toString());
422                             probeSent = true;
423                             OFMessage echo = factory
424                                     .getMessage(OFType.ECHO_REQUEST);
425                             asyncFastSend(echo);
426                         }
427                     } else {
428                         if (state == SwitchState.WAIT_FEATURES_REPLY) {
429                             // send another features request
430                             OFMessage request = factory
431                                     .getMessage(OFType.FEATURES_REQUEST);
432                             asyncFastSend(request);
433                         } else {
434                             if (state == SwitchState.WAIT_CONFIG_REPLY) {
435                                 // send another config request
436                                 OFSetConfig config = (OFSetConfig) factory
437                                         .getMessage(OFType.SET_CONFIG);
438                                 config.setMissSendLength((short) 0xffff)
439                                         .setLengthU(OFSetConfig.MINIMUM_LENGTH);
440                                 asyncFastSend(config);
441                                 OFMessage getConfig = factory
442                                         .getMessage(OFType.GET_CONFIG_REQUEST);
443                                 asyncFastSend(getConfig);
444                             }
445                         }
446                     }
447                 } catch (Exception e) {
448                     reportError(e);
449                 }
450             }
451         }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
452     }
453
454     private void cancelSwitchTimer() {
455         if (this.periodicTimer != null) {
456             this.periodicTimer.cancel();
457         }
458     }
459
460     private void reportError(Exception e) {
461         if (e instanceof AsynchronousCloseException
462                 || e instanceof InterruptedException
463                 || e instanceof SocketException || e instanceof IOException
464                 || e instanceof ClosedSelectorException) {
465             logger.debug("Caught exception {}", e.getMessage());
466         } else {
467             logger.warn("Caught exception ", e);
468         }
469         // notify core of this error event and disconnect the switch
470         ((Controller) core).takeSwitchEventError(this);
471     }
472
473     private void reportSwitchStateChange(boolean added) {
474         if (added) {
475             ((Controller) core).takeSwitchEventAdd(this);
476         } else {
477             ((Controller) core).takeSwitchEventDelete(this);
478         }
479     }
480
481     @Override
482     public Long getId() {
483         return this.sid;
484     }
485
486     private void processFeaturesReply(OFFeaturesReply reply) {
487         if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
488             this.sid = reply.getDatapathId();
489             this.buffers = reply.getBuffers();
490             this.capabilities = reply.getCapabilities();
491             this.tables = reply.getTables();
492             this.actions = reply.getActions();
493             // notify core of this error event
494             for (OFPhysicalPort port : reply.getPorts()) {
495                 updatePhysicalPort(port);
496             }
497             // config the switch to send full data packet
498             OFSetConfig config = (OFSetConfig) factory
499                     .getMessage(OFType.SET_CONFIG);
500             config.setMissSendLength((short) 0xffff).setLengthU(
501                     OFSetConfig.MINIMUM_LENGTH);
502             asyncFastSend(config);
503             // send config request to make sure the switch can handle the set
504             // config
505             OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
506             asyncFastSend(getConfig);
507             this.state = SwitchState.WAIT_CONFIG_REPLY;
508             // inform core that a new switch is now operational
509             reportSwitchStateChange(true);
510         }
511     }
512
513     private void updatePhysicalPort(OFPhysicalPort port) {
514         Short portNumber = port.getPortNumber();
515         physicalPorts.put(portNumber, port);
516         portBandwidth
517                 .put(portNumber,
518                         port.getCurrentFeatures()
519                                 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
520                                         | OFPortFeatures.OFPPF_10MB_HD
521                                                 .getValue()
522                                         | OFPortFeatures.OFPPF_100MB_FD
523                                                 .getValue()
524                                         | OFPortFeatures.OFPPF_100MB_HD
525                                                 .getValue()
526                                         | OFPortFeatures.OFPPF_1GB_FD
527                                                 .getValue()
528                                         | OFPortFeatures.OFPPF_1GB_HD
529                                                 .getValue() | OFPortFeatures.OFPPF_10GB_FD
530                                             .getValue()));
531     }
532
533     private void deletePhysicalPort(OFPhysicalPort port) {
534         Short portNumber = port.getPortNumber();
535         physicalPorts.remove(portNumber);
536         portBandwidth.remove(portNumber);
537     }
538
539     @Override
540     public boolean isOperational() {
541         return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
542     }
543
544     @Override
545     public String toString() {
546         try {
547             return ("Switch:"
548                     + socket.socket().getRemoteSocketAddress().toString().split("/")[1]
549                     + " SWID:" + (isOperational() ? HexString
550                     .toHexString(this.sid) : "unknown"));
551         } catch (Exception e) {
552             return (isOperational() ? HexString.toHexString(this.sid)
553                     : "unknown");
554         }
555
556     }
557
558     @Override
559     public Date getConnectedDate() {
560         return this.connectedDate;
561     }
562
563     public String getInstanceName() {
564         return instanceName;
565     }
566
567     @Override
568     public Object getStatistics(OFStatisticsRequest req) {
569         int xid = getNextXid();
570         StatisticsCollector worker = new StatisticsCollector(this, xid, req);
571         messageWaitingDone.put(xid, worker);
572         Future<Object> submit = executor.submit(worker);
573         Object result = null;
574         try {
575             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
576             return result;
577         } catch (Exception e) {
578             logger.warn("Timeout while waiting for {} replies", req.getType());
579             result = null; // to indicate timeout has occurred
580             return result;
581         }
582     }
583
584     @Override
585     public Object syncSend(OFMessage msg) {
586         int xid = getNextXid();
587         return syncSend(msg, xid);
588     }
589
590     /*
591      * Either a BarrierReply or a OFError is received. If this is a reply for an
592      * outstanding sync message, wake up associated task so that it can continue
593      */
594     private void processBarrierReply(OFBarrierReply msg) {
595         Integer xid = msg.getXid();
596         SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
597                 .remove(xid);
598         if (worker == null) {
599             return;
600         }
601         worker.wakeup();
602     }
603
604     private void processErrorReply(OFError errorMsg) {
605         OFMessage offendingMsg = errorMsg.getOffendingMsg();
606         Integer xid;
607         if (offendingMsg != null) {
608             xid = offendingMsg.getXid();
609         } else {
610             xid = errorMsg.getXid();
611         }
612         /*
613          * the error can be a reply to a synchronous message or to a statistic
614          * request message
615          */
616         Callable<?> worker = messageWaitingDone.remove(xid);
617         if (worker == null) {
618             return;
619         }
620         if (worker instanceof SynchronousMessage) {
621             ((SynchronousMessage) worker).wakeup(errorMsg);
622         } else {
623             ((StatisticsCollector) worker).wakeup(errorMsg);
624         }
625     }
626
627     private void processStatsReply(OFStatisticsReply reply) {
628         Integer xid = reply.getXid();
629         StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
630                 .get(xid);
631         if (worker == null) {
632             return;
633         }
634         if (worker.collect(reply)) {
635             // if all the stats records are received (collect() returns true)
636             // then we are done.
637             messageWaitingDone.remove(xid);
638             worker.wakeup();
639         }
640     }
641
642     @Override
643     public Map<Short, OFPhysicalPort> getPhysicalPorts() {
644         return this.physicalPorts;
645     }
646
647     @Override
648     public OFPhysicalPort getPhysicalPort(Short portNumber) {
649         return this.physicalPorts.get(portNumber);
650     }
651
652     @Override
653     public Integer getPortBandwidth(Short portNumber) {
654         return this.portBandwidth.get(portNumber);
655     }
656
657     @Override
658     public Set<Short> getPorts() {
659         return this.physicalPorts.keySet();
660     }
661
662     @Override
663     public Byte getTables() {
664         return this.tables;
665     }
666
667     @Override
668     public Integer getActions() {
669         return this.actions;
670     }
671
672     @Override
673     public Integer getCapabilities() {
674         return this.capabilities;
675     }
676
677     @Override
678     public Integer getBuffers() {
679         return this.buffers;
680     }
681
682     @Override
683     public boolean isPortEnabled(short portNumber) {
684         return isPortEnabled(physicalPorts.get(portNumber));
685     }
686
687     @Override
688     public boolean isPortEnabled(OFPhysicalPort port) {
689         if (port == null) {
690             return false;
691         }
692         int portConfig = port.getConfig();
693         int portState = port.getState();
694         if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
695             return false;
696         }
697         if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
698             return false;
699         }
700         if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
701                 .getValue()) {
702             return false;
703         }
704         return true;
705     }
706
707     @Override
708     public List<OFPhysicalPort> getEnabledPorts() {
709         List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
710         synchronized (this.physicalPorts) {
711             for (OFPhysicalPort port : physicalPorts.values()) {
712                 if (isPortEnabled(port)) {
713                     result.add(port);
714                 }
715             }
716         }
717         return result;
718     }
719
720     /*
721      * Transmit thread polls the message out of the priority queue and invokes
722      * messaging service to transmit it over the socket channel
723      */
724     class PriorityMessageTransmit implements Runnable {
725         public void run() {
726             running = true;
727             while (running) {
728                 try {
729                     if (!transmitQ.isEmpty()) {
730                         PriorityMessage pmsg = transmitQ.poll();
731                         msgReadWriteService.asyncSend(pmsg.msg);
732                         logger.trace("Message sent: {}", pmsg.toString());
733                         /*
734                          * If syncReply is set to true, wait for the response
735                          * back.
736                          */
737                         if (pmsg.syncReply) {
738                             syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
739                         }
740                     }
741                     Thread.sleep(10);
742                 } catch (InterruptedException ie) {
743                     reportError(new InterruptedException(
744                             "PriorityMessageTransmit thread interrupted"));
745                 } catch (Exception e) {
746                     reportError(e);
747                 }
748             }
749             transmitQ = null;
750         }
751     }
752
753     /*
754      * Setup and start the transmit thread
755      */
756     private void startTransmitThread() {
757         this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
758                 new Comparator<PriorityMessage>() {
759                     public int compare(PriorityMessage p1, PriorityMessage p2) {
760                         if (p2.priority != p1.priority) {
761                             return p2.priority - p1.priority;
762                         } else {
763                             return (p2.seqNum < p1.seqNum) ? 1 : -1;
764                         }
765                     }
766                 });
767         this.transmitThread = new Thread(new PriorityMessageTransmit());
768         this.transmitThread.start();
769     }
770
771     /*
772      * Setup communication services
773      */
774     private void setupCommChannel() throws Exception {
775         this.selector = SelectorProvider.provider().openSelector();
776         this.socket.configureBlocking(false);
777         this.socket.socket().setTcpNoDelay(true);
778         this.msgReadWriteService = getMessageReadWriteService();
779     }
780
781     private void sendFirstHello() {
782         try {
783             OFMessage msg = factory.getMessage(OFType.HELLO);
784             asyncFastSend(msg);
785         } catch (Exception e) {
786             reportError(e);
787         }
788     }
789
790     private IMessageReadWrite getMessageReadWriteService() throws Exception {
791         String str = System.getProperty("secureChannelEnabled");
792         return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(
793                 socket, selector) : new MessageReadWriteService(socket,
794                 selector);
795     }
796
797     /**
798      * Send Barrier message synchronously. The caller will be blocked until the
799      * Barrier reply is received.
800      */
801     @Override
802     public Object syncSendBarrierMessage() {
803         OFBarrierRequest barrierMsg = new OFBarrierRequest();
804         return syncSend(barrierMsg);
805     }
806
807     /**
808      * Send Barrier message asynchronously. The caller is not blocked. The
809      * Barrier message will be sent in a transmit thread which will be blocked
810      * until the Barrier reply is received.
811      */
812     @Override
813     public Object asyncSendBarrierMessage() {
814         if (transmitQ == null) {
815             return Boolean.FALSE;
816         }
817
818         OFBarrierRequest barrierMsg = new OFBarrierRequest();
819         int xid = getNextXid();
820
821         barrierMsg.setXid(xid);
822         transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
823         
824         return Boolean.TRUE;
825     }
826
827     /**
828      * This method returns the switch liveness timeout value. If controller did
829      * not receive any message from the switch for such a long period,
830      * controller will tear down the connection to the switch.
831      * 
832      * @return The timeout value
833      */
834     private static int getSwitchLivenessTimeout() {
835         String timeout = System.getProperty("of.switchLivenessTimeout");
836         int rv = 60500;
837
838         try {
839             if (timeout != null) {
840                 rv = Integer.parseInt(timeout);
841             }
842         } catch (Exception e) {
843         }
844
845         return rv;
846     }
847
848     /**
849      * This method performs synchronous operations for a given message. If
850      * syncRequest is set to true, the message will be sent out followed by a
851      * Barrier request message. Then it's blocked until the Barrier rely arrives
852      * or timeout. If syncRequest is false, it simply skips the message send and
853      * just waits for the response back.
854      * 
855      * @param msg
856      *            Message to be sent
857      * @param xid
858      *            Message XID
859      * @param request
860      *            If set to true, the message the message will be sent out
861      *            followed by a Barrier request message. If set to false, it
862      *            simply skips the sending and just waits for the Barrier reply.
863      * @return the result
864      */
865     private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
866         SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
867         messageWaitingDone.put(xid, worker);
868         Object result = null;
869         Boolean status = false;
870         Future<Object> submit = executor.submit(worker);
871         try {
872             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
873             messageWaitingDone.remove(xid);
874             if (result == null) {
875                 // if result is null, then it means the switch can handle this
876                 // message successfully
877                 // convert the result into a Boolean with value true
878                 status = true;
879                 // logger.debug("Successfully send " +
880                 // msg.getType().toString());
881                 result = status;
882             } else {
883                 // if result is not null, this means the switch can't handle
884                 // this message
885                 // the result if OFError already
886                 logger.debug("Send {} failed --> {}", msg.getType().toString(),
887                         ((OFError) result).toString());
888             }
889             return result;
890         } catch (Exception e) {
891             logger.warn("Timeout while waiting for {} reply", msg.getType()
892                     .toString());
893             // convert the result into a Boolean with value false
894             status = false;
895             result = status;
896             return result;
897         }
898     }
899 }