5c51cc5862d5f4c391cd956c1c8b59e1f03059ce
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / SwitchHandler.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
10
11 import java.io.IOException;
12 import java.net.SocketException;
13 import java.nio.channels.AsynchronousCloseException;
14 import java.nio.channels.ClosedSelectorException;
15 import java.nio.channels.SelectionKey;
16 import java.nio.channels.Selector;
17 import java.nio.channels.SocketChannel;
18 import java.nio.channels.spi.SelectorProvider;
19 import java.util.ArrayList;
20 import java.util.Comparator;
21 import java.util.Date;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.Timer;
28 import java.util.TimerTask;
29 import java.util.concurrent.Callable;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.PriorityBlockingQueue;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicInteger;
37
38 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
39 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
40 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
41 import org.openflow.protocol.OFBarrierReply;
42 import org.openflow.protocol.OFBarrierRequest;
43 import org.openflow.protocol.OFEchoReply;
44 import org.openflow.protocol.OFError;
45 import org.openflow.protocol.OFFeaturesReply;
46 import org.openflow.protocol.OFFlowMod;
47 import org.openflow.protocol.OFGetConfigReply;
48 import org.openflow.protocol.OFMatch;
49 import org.openflow.protocol.OFMessage;
50 import org.openflow.protocol.OFPhysicalPort;
51 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
52 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
53 import org.openflow.protocol.OFPhysicalPort.OFPortState;
54 import org.openflow.protocol.OFPort;
55 import org.openflow.protocol.OFPortStatus;
56 import org.openflow.protocol.OFPortStatus.OFPortReason;
57 import org.openflow.protocol.OFSetConfig;
58 import org.openflow.protocol.OFStatisticsReply;
59 import org.openflow.protocol.OFStatisticsRequest;
60 import org.openflow.protocol.OFType;
61 import org.openflow.protocol.factory.BasicFactory;
62 import org.openflow.util.HexString;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66 public class SwitchHandler implements ISwitch {
67     private static final Logger logger = LoggerFactory
68             .getLogger(SwitchHandler.class);
69     private static final int SWITCH_LIVENESS_TIMER = 5000;
70     private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
71     private int MESSAGE_RESPONSE_TIMER = 2000;
72
73     private String instanceName;
74     private ISwitch thisISwitch;
75     private IController core;
76     private Long sid;
77     private Integer buffers;
78     private Integer capabilities;
79     private Byte tables;
80     private Integer actions;
81     private Selector selector;
82     private SocketChannel socket;
83     private BasicFactory factory;
84     private AtomicInteger xid;
85     private SwitchState state;
86     private Timer periodicTimer;
87     private Map<Short, OFPhysicalPort> physicalPorts;
88     private Map<Short, Integer> portBandwidth;
89     private Date connectedDate;
90     private Long lastMsgReceivedTimeStamp;
91     private Boolean probeSent;
92     private ExecutorService executor;
93     private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
94     private boolean running;
95     private IMessageReadWrite msgReadWriteService;
96     private Thread switchHandlerThread;
97     private Integer responseTimerValue;
98     private PriorityBlockingQueue<PriorityMessage> transmitQ;
99     private Thread transmitThread;
100
101     private enum SwitchState {
102         NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
103                 3);
104
105         private int value;
106
107         private SwitchState(int value) {
108             this.value = value;
109         }
110
111         @SuppressWarnings("unused")
112         public int value() {
113             return this.value;
114         }
115     }
116
117     public SwitchHandler(Controller core, SocketChannel sc, String name) {
118         this.instanceName = name;
119         this.thisISwitch = this;
120         this.sid = (long) 0;
121         this.buffers = (int) 0;
122         this.capabilities = (int) 0;
123         this.tables = (byte) 0;
124         this.actions = (int) 0;
125         this.core = core;
126         this.socket = sc;
127         this.factory = new BasicFactory();
128         this.connectedDate = new Date();
129         this.lastMsgReceivedTimeStamp = connectedDate.getTime();
130         this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
131         this.portBandwidth = new HashMap<Short, Integer>();
132         this.state = SwitchState.NON_OPERATIONAL;
133         this.probeSent = false;
134         this.xid = new AtomicInteger(this.socket.hashCode());
135         this.periodicTimer = null;
136         this.executor = Executors.newFixedThreadPool(4);
137         this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
138         this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
139         String rTimer = System.getProperty("of.messageResponseTimer");
140         if (rTimer != null) {
141             try {
142                 responseTimerValue = Integer.decode(rTimer);
143             } catch (NumberFormatException e) {
144                 logger.warn(
145                         "Invalid of.messageResponseTimer: {} use default({})",
146                         rTimer, MESSAGE_RESPONSE_TIMER);
147             }
148         }
149     }
150
151     public void start() {
152         try {
153             startTransmitThread();
154             setupCommChannel();
155             sendFirstHello();
156             startHandlerThread();
157         } catch (Exception e) {
158             reportError(e);
159         }
160     }
161
162     private void startHandlerThread() {
163         switchHandlerThread = new Thread(new Runnable() {
164             @Override
165             public void run() {
166                 running = true;
167                 while (running) {
168                     try {
169                         // wait for an incoming connection
170                         selector.select(0);
171                         Iterator<SelectionKey> selectedKeys = selector
172                                 .selectedKeys().iterator();
173                         while (selectedKeys.hasNext()) {
174                             SelectionKey skey = selectedKeys.next();
175                             selectedKeys.remove();
176                             if (skey.isValid() && skey.isWritable()) {
177                                 resumeSend();
178                             }
179                             if (skey.isValid() && skey.isReadable()) {
180                                 handleMessages();
181                             }
182                         }
183                     } catch (Exception e) {
184                         reportError(e);
185                     }
186                 }
187             }
188         }, instanceName);
189         switchHandlerThread.start();
190     }
191
192     public void stop() {
193         running = false;
194         cancelSwitchTimer();
195         try {
196             selector.wakeup();
197             selector.close();
198         } catch (Exception e) {
199         }
200         try {
201             socket.close();
202         } catch (Exception e) {
203         }
204         try {
205             msgReadWriteService.stop();
206         } catch (Exception e) {
207         }
208         executor.shutdown();
209
210         selector = null;
211         msgReadWriteService = null;
212
213         if (switchHandlerThread != null) {
214             switchHandlerThread.interrupt();
215         }
216         if (transmitThread != null) {
217             transmitThread.interrupt();
218         }
219     }
220
221     @Override
222     public int getNextXid() {
223         return this.xid.incrementAndGet();
224     }
225
226     /**
227      * This method puts the message in an outgoing priority queue with normal
228      * priority. It will be served after high priority messages. The method
229      * should be used for non-critical messages such as statistics request,
230      * discovery packets, etc. An unique XID is generated automatically and
231      * inserted into the message.
232      * 
233      * @param msg
234      *            The OF message to be sent
235      * @return The XID used
236      */
237     @Override
238     public Integer asyncSend(OFMessage msg) {
239         return asyncSend(msg, getNextXid());
240     }
241
242     private Object syncSend(OFMessage msg, int xid) {
243         return syncMessageInternal(msg, xid, true);
244     }
245
246     /**
247      * This method puts the message in an outgoing priority queue with normal
248      * priority. It will be served after high priority messages. The method
249      * should be used for non-critical messages such as statistics request,
250      * discovery packets, etc. The specified XID is inserted into the message.
251      * 
252      * @param msg
253      *            The OF message to be Sent
254      * @param xid
255      *            The XID to be used in the message
256      * @return The XID used
257      */
258     @Override
259     public Integer asyncSend(OFMessage msg, int xid) {
260         msg.setXid(xid);
261         if (transmitQ != null) {
262             transmitQ.add(new PriorityMessage(msg, 0));
263         }
264         return xid;
265     }
266
267     /**
268      * This method puts the message in an outgoing priority queue with high
269      * priority. It will be served first before normal priority messages. The
270      * method should be used for critical messages such as hello, echo reply
271      * etc. An unique XID is generated automatically and inserted into the
272      * message.
273      * 
274      * @param msg
275      *            The OF message to be sent
276      * @return The XID used
277      */
278     @Override
279     public Integer asyncFastSend(OFMessage msg) {
280         return asyncFastSend(msg, getNextXid());
281     }
282
283     /**
284      * This method puts the message in an outgoing priority queue with high
285      * priority. It will be served first before normal priority messages. The
286      * method should be used for critical messages such as hello, echo reply
287      * etc. The specified XID is inserted into the message.
288      * 
289      * @param msg
290      *            The OF message to be sent
291      * @return The XID used
292      */
293     @Override
294     public Integer asyncFastSend(OFMessage msg, int xid) {
295         msg.setXid(xid);
296         if (transmitQ != null) {
297             transmitQ.add(new PriorityMessage(msg, 1));
298         }
299         return xid;
300     }
301
302     public void resumeSend() {
303         try {
304             if (msgReadWriteService != null) {
305                 msgReadWriteService.resumeSend();
306             }
307         } catch (Exception e) {
308             reportError(e);
309         }
310     }
311
312     public void handleMessages() {
313         List<OFMessage> msgs = null;
314
315         try {
316             msgs = msgReadWriteService.readMessages();
317         } catch (Exception e) {
318             reportError(e);
319         }
320
321         if (msgs == null) {
322             logger.debug("{} is down", toString());
323             // the connection is down, inform core
324             reportSwitchStateChange(false);
325             return;
326         }
327         for (OFMessage msg : msgs) {
328             logger.trace("Message received: {}", msg.toString());
329             this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
330             OFType type = msg.getType();
331             switch (type) {
332             case HELLO:
333                 // send feature request
334                 OFMessage featureRequest = factory
335                         .getMessage(OFType.FEATURES_REQUEST);
336                 asyncFastSend(featureRequest);
337                 // delete all pre-existing flows
338                 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
339                 OFFlowMod flowMod = (OFFlowMod) factory
340                         .getMessage(OFType.FLOW_MOD);
341                 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
342                         .setOutPort(OFPort.OFPP_NONE)
343                         .setLength((short) OFFlowMod.MINIMUM_LENGTH);
344                 asyncFastSend(flowMod);
345                 this.state = SwitchState.WAIT_FEATURES_REPLY;
346                 startSwitchTimer();
347                 break;
348             case ECHO_REQUEST:
349                 OFEchoReply echoReply = (OFEchoReply) factory
350                         .getMessage(OFType.ECHO_REPLY);
351                 asyncFastSend(echoReply);
352                 break;
353             case ECHO_REPLY:
354                 this.probeSent = false;
355                 break;
356             case FEATURES_REPLY:
357                 processFeaturesReply((OFFeaturesReply) msg);
358                 break;
359             case GET_CONFIG_REPLY:
360                 // make sure that the switch can send the whole packet to the
361                 // controller
362                 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
363                     this.state = SwitchState.OPERATIONAL;
364                 }
365                 break;
366             case BARRIER_REPLY:
367                 processBarrierReply((OFBarrierReply) msg);
368                 break;
369             case ERROR:
370                 processErrorReply((OFError) msg);
371                 break;
372             case PORT_STATUS:
373                 processPortStatusMsg((OFPortStatus) msg);
374                 break;
375             case STATS_REPLY:
376                 processStatsReply((OFStatisticsReply) msg);
377                 break;
378             case PACKET_IN:
379                 break;
380             default:
381                 break;
382             } // end of switch
383             if (isOperational()) {
384                 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
385             }
386         } // end of for
387     }
388
389     private void processPortStatusMsg(OFPortStatus msg) {
390         OFPhysicalPort port = msg.getDesc();
391         if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
392             updatePhysicalPort(port);
393         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
394             updatePhysicalPort(port);
395         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
396                 .ordinal()) {
397             deletePhysicalPort(port);
398         }
399
400     }
401
402     private void startSwitchTimer() {
403         this.periodicTimer = new Timer();
404         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
405             @Override
406             public void run() {
407                 try {
408                     Long now = System.currentTimeMillis();
409                     if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
410                         if (probeSent) {
411                             // switch failed to respond to our probe, consider
412                             // it down
413                             logger.warn("{} is idle for too long, disconnect",
414                                     toString());
415                             reportSwitchStateChange(false);
416                         } else {
417                             // send a probe to see if the switch is still alive
418                             logger.debug(
419                                     "Send idle probe (Echo Request) to {}",
420                                     toString());
421                             probeSent = true;
422                             OFMessage echo = factory
423                                     .getMessage(OFType.ECHO_REQUEST);
424                             asyncFastSend(echo);
425                         }
426                     } else {
427                         if (state == SwitchState.WAIT_FEATURES_REPLY) {
428                             // send another features request
429                             OFMessage request = factory
430                                     .getMessage(OFType.FEATURES_REQUEST);
431                             asyncFastSend(request);
432                         } else {
433                             if (state == SwitchState.WAIT_CONFIG_REPLY) {
434                                 // send another config request
435                                 OFSetConfig config = (OFSetConfig) factory
436                                         .getMessage(OFType.SET_CONFIG);
437                                 config.setMissSendLength((short) 0xffff)
438                                         .setLengthU(OFSetConfig.MINIMUM_LENGTH);
439                                 asyncFastSend(config);
440                                 OFMessage getConfig = factory
441                                         .getMessage(OFType.GET_CONFIG_REQUEST);
442                                 asyncFastSend(getConfig);
443                             }
444                         }
445                     }
446                 } catch (Exception e) {
447                     reportError(e);
448                 }
449             }
450         }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
451     }
452
453     private void cancelSwitchTimer() {
454         if (this.periodicTimer != null) {
455             this.periodicTimer.cancel();
456         }
457     }
458
459     private void reportError(Exception e) {
460         if (e instanceof AsynchronousCloseException
461                 || e instanceof InterruptedException
462                 || e instanceof SocketException || e instanceof IOException
463                 || e instanceof ClosedSelectorException) {
464             logger.debug("Caught exception {}", e.getMessage());
465         } else {
466             logger.warn("Caught exception ", e);
467         }
468         // notify core of this error event and disconnect the switch
469         ((Controller) core).takeSwitchEventError(this);
470     }
471
472     private void reportSwitchStateChange(boolean added) {
473         if (added) {
474             ((Controller) core).takeSwitchEventAdd(this);
475         } else {
476             ((Controller) core).takeSwitchEventDelete(this);
477         }
478     }
479
480     @Override
481     public Long getId() {
482         return this.sid;
483     }
484
485     private void processFeaturesReply(OFFeaturesReply reply) {
486         if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
487             this.sid = reply.getDatapathId();
488             this.buffers = reply.getBuffers();
489             this.capabilities = reply.getCapabilities();
490             this.tables = reply.getTables();
491             this.actions = reply.getActions();
492             // notify core of this error event
493             for (OFPhysicalPort port : reply.getPorts()) {
494                 updatePhysicalPort(port);
495             }
496             // config the switch to send full data packet
497             OFSetConfig config = (OFSetConfig) factory
498                     .getMessage(OFType.SET_CONFIG);
499             config.setMissSendLength((short) 0xffff).setLengthU(
500                     OFSetConfig.MINIMUM_LENGTH);
501             asyncFastSend(config);
502             // send config request to make sure the switch can handle the set
503             // config
504             OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
505             asyncFastSend(getConfig);
506             this.state = SwitchState.WAIT_CONFIG_REPLY;
507             // inform core that a new switch is now operational
508             reportSwitchStateChange(true);
509         }
510     }
511
512     private void updatePhysicalPort(OFPhysicalPort port) {
513         Short portNumber = port.getPortNumber();
514         physicalPorts.put(portNumber, port);
515         portBandwidth
516                 .put(portNumber,
517                         port.getCurrentFeatures()
518                                 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
519                                         | OFPortFeatures.OFPPF_10MB_HD
520                                                 .getValue()
521                                         | OFPortFeatures.OFPPF_100MB_FD
522                                                 .getValue()
523                                         | OFPortFeatures.OFPPF_100MB_HD
524                                                 .getValue()
525                                         | OFPortFeatures.OFPPF_1GB_FD
526                                                 .getValue()
527                                         | OFPortFeatures.OFPPF_1GB_HD
528                                                 .getValue() | OFPortFeatures.OFPPF_10GB_FD
529                                             .getValue()));
530     }
531
532     private void deletePhysicalPort(OFPhysicalPort port) {
533         Short portNumber = port.getPortNumber();
534         physicalPorts.remove(portNumber);
535         portBandwidth.remove(portNumber);
536     }
537
538     @Override
539     public boolean isOperational() {
540         return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
541     }
542
543     @Override
544     public String toString() {
545         try {
546             return ("Switch:"
547                     + socket.socket().getRemoteSocketAddress().toString().split("/")[1]
548                     + " SWID:" + (isOperational() ? HexString
549                     .toHexString(this.sid) : "unknown"));
550         } catch (Exception e) {
551             return (isOperational() ? HexString.toHexString(this.sid)
552                     : "unknown");
553         }
554
555     }
556
557     @Override
558     public Date getConnectedDate() {
559         return this.connectedDate;
560     }
561
562     public String getInstanceName() {
563         return instanceName;
564     }
565
566     @Override
567     public Object getStatistics(OFStatisticsRequest req) {
568         int xid = getNextXid();
569         StatisticsCollector worker = new StatisticsCollector(this, xid, req);
570         messageWaitingDone.put(xid, worker);
571         Future<Object> submit = executor.submit(worker);
572         Object result = null;
573         try {
574             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
575             return result;
576         } catch (Exception e) {
577             logger.warn("Timeout while waiting for {} replies", req.getType());
578             result = null; // to indicate timeout has occurred
579             return result;
580         }
581     }
582
583     @Override
584     public Object syncSend(OFMessage msg) {
585         int xid = getNextXid();
586         return syncSend(msg, xid);
587     }
588
589     /*
590      * Either a BarrierReply or a OFError is received. If this is a reply for an
591      * outstanding sync message, wake up associated task so that it can continue
592      */
593     private void processBarrierReply(OFBarrierReply msg) {
594         Integer xid = msg.getXid();
595         SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
596                 .remove(xid);
597         if (worker == null) {
598             return;
599         }
600         worker.wakeup();
601     }
602
603     private void processErrorReply(OFError errorMsg) {
604         OFMessage offendingMsg = errorMsg.getOffendingMsg();
605         Integer xid;
606         if (offendingMsg != null) {
607             xid = offendingMsg.getXid();
608         } else {
609             xid = errorMsg.getXid();
610         }
611         /*
612          * the error can be a reply to a synchronous message or to a statistic
613          * request message
614          */
615         Callable<?> worker = messageWaitingDone.remove(xid);
616         if (worker == null) {
617             return;
618         }
619         if (worker instanceof SynchronousMessage) {
620             ((SynchronousMessage) worker).wakeup(errorMsg);
621         } else {
622             ((StatisticsCollector) worker).wakeup(errorMsg);
623         }
624     }
625
626     private void processStatsReply(OFStatisticsReply reply) {
627         Integer xid = reply.getXid();
628         StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
629                 .get(xid);
630         if (worker == null) {
631             return;
632         }
633         if (worker.collect(reply)) {
634             // if all the stats records are received (collect() returns true)
635             // then we are done.
636             messageWaitingDone.remove(xid);
637             worker.wakeup();
638         }
639     }
640
641     @Override
642     public Map<Short, OFPhysicalPort> getPhysicalPorts() {
643         return this.physicalPorts;
644     }
645
646     @Override
647     public OFPhysicalPort getPhysicalPort(Short portNumber) {
648         return this.physicalPorts.get(portNumber);
649     }
650
651     @Override
652     public Integer getPortBandwidth(Short portNumber) {
653         return this.portBandwidth.get(portNumber);
654     }
655
656     @Override
657     public Set<Short> getPorts() {
658         return this.physicalPorts.keySet();
659     }
660
661     @Override
662     public Byte getTables() {
663         return this.tables;
664     }
665
666     @Override
667     public Integer getActions() {
668         return this.actions;
669     }
670
671     @Override
672     public Integer getCapabilities() {
673         return this.capabilities;
674     }
675
676     @Override
677     public Integer getBuffers() {
678         return this.buffers;
679     }
680
681     @Override
682     public boolean isPortEnabled(short portNumber) {
683         return isPortEnabled(physicalPorts.get(portNumber));
684     }
685
686     @Override
687     public boolean isPortEnabled(OFPhysicalPort port) {
688         if (port == null) {
689             return false;
690         }
691         int portConfig = port.getConfig();
692         int portState = port.getState();
693         if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
694             return false;
695         }
696         if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
697             return false;
698         }
699         if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
700                 .getValue()) {
701             return false;
702         }
703         return true;
704     }
705
706     @Override
707     public List<OFPhysicalPort> getEnabledPorts() {
708         List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
709         synchronized (this.physicalPorts) {
710             for (OFPhysicalPort port : physicalPorts.values()) {
711                 if (isPortEnabled(port)) {
712                     result.add(port);
713                 }
714             }
715         }
716         return result;
717     }
718
719     /*
720      * Transmit thread polls the message out of the priority queue and invokes
721      * messaging service to transmit it over the socket channel
722      */
723     class PriorityMessageTransmit implements Runnable {
724         public void run() {
725             running = true;
726             while (running) {
727                 try {
728                     if (!transmitQ.isEmpty()) {
729                         PriorityMessage pmsg = transmitQ.poll();
730                         msgReadWriteService.asyncSend(pmsg.msg);
731                         logger.trace("Message sent: {}", pmsg.toString());
732                         /*
733                          * If syncReply is set to true, wait for the response
734                          * back.
735                          */
736                         if (pmsg.syncReply) {
737                             syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
738                         }
739                     }
740                     Thread.sleep(10);
741                 } catch (InterruptedException ie) {
742                     reportError(new InterruptedException(
743                             "PriorityMessageTransmit thread interrupted"));
744                 } catch (Exception e) {
745                     reportError(e);
746                 }
747             }
748             transmitQ = null;
749         }
750     }
751
752     /*
753      * Setup and start the transmit thread
754      */
755     private void startTransmitThread() {
756         this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
757                 new Comparator<PriorityMessage>() {
758                     public int compare(PriorityMessage p1, PriorityMessage p2) {
759                         if (p2.priority != p1.priority) {
760                             return p2.priority - p1.priority;
761                         } else {
762                             return (p2.seqNum < p1.seqNum) ? 1 : -1;
763                         }
764                     }
765                 });
766         this.transmitThread = new Thread(new PriorityMessageTransmit());
767         this.transmitThread.start();
768     }
769
770     /*
771      * Setup communication services
772      */
773     private void setupCommChannel() throws Exception {
774         this.selector = SelectorProvider.provider().openSelector();
775         this.socket.configureBlocking(false);
776         this.socket.socket().setTcpNoDelay(true);
777         this.msgReadWriteService = getMessageReadWriteService();
778     }
779
780     private void sendFirstHello() {
781         try {
782             OFMessage msg = factory.getMessage(OFType.HELLO);
783             asyncFastSend(msg);
784         } catch (Exception e) {
785             reportError(e);
786         }
787     }
788
789     private IMessageReadWrite getMessageReadWriteService() throws Exception {
790         String str = System.getProperty("secureChannelEnabled");
791         return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(
792                 socket, selector) : new MessageReadWriteService(socket,
793                 selector);
794     }
795
796     /**
797      * Send Barrier message synchronously. The caller will be blocked until the
798      * Barrier reply is received.
799      */
800     @Override
801     public Object syncSendBarrierMessage() {
802         OFBarrierRequest barrierMsg = new OFBarrierRequest();
803         return syncSend(barrierMsg);
804     }
805
806     /**
807      * Send Barrier message asynchronously. The caller is not blocked. The
808      * Barrier message will be sent in a transmit thread which will be blocked
809      * until the Barrier reply is received.
810      */
811     @Override
812     public Object asyncSendBarrierMessage() {
813         if (transmitQ == null) {
814             return Boolean.FALSE;
815         }
816
817         OFBarrierRequest barrierMsg = new OFBarrierRequest();
818         int xid = getNextXid();
819
820         barrierMsg.setXid(xid);
821         transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
822         
823         return Boolean.TRUE;
824     }
825
826     /**
827      * This method returns the switch liveness timeout value. If controller did
828      * not receive any message from the switch for such a long period,
829      * controller will tear down the connection to the switch.
830      * 
831      * @return The timeout value
832      */
833     private static int getSwitchLivenessTimeout() {
834         String timeout = System.getProperty("of.switchLivenessTimeout");
835         int rv = 60500;
836
837         try {
838             if (timeout != null) {
839                 rv = Integer.parseInt(timeout);
840             }
841         } catch (Exception e) {
842         }
843
844         return rv;
845     }
846
847     /**
848      * This method performs synchronous operations for a given message. If
849      * syncRequest is set to true, the message will be sent out followed by a
850      * Barrier request message. Then it's blocked until the Barrier rely arrives
851      * or timeout. If syncRequest is false, it simply skips the message send and
852      * just waits for the response back.
853      * 
854      * @param msg
855      *            Message to be sent
856      * @param xid
857      *            Message XID
858      * @param request
859      *            If set to true, the message the message will be sent out
860      *            followed by a Barrier request message. If set to false, it
861      *            simply skips the sending and just waits for the Barrier reply.
862      * @return the result
863      */
864     private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
865         SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
866         messageWaitingDone.put(xid, worker);
867         Object result = null;
868         Boolean status = false;
869         Future<Object> submit = executor.submit(worker);
870         try {
871             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
872             messageWaitingDone.remove(xid);
873             if (result == null) {
874                 // if result is null, then it means the switch can handle this
875                 // message successfully
876                 // convert the result into a Boolean with value true
877                 status = true;
878                 // logger.debug("Successfully send " +
879                 // msg.getType().toString());
880                 result = status;
881             } else {
882                 // if result is not null, this means the switch can't handle
883                 // this message
884                 // the result if OFError already
885                 logger.debug("Send {} failed --> {}", msg.getType().toString(),
886                         ((OFError) result).toString());
887             }
888             return result;
889         } catch (Exception e) {
890             logger.warn("Timeout while waiting for {} reply", msg.getType()
891                     .toString());
892             // convert the result into a Boolean with value false
893             status = false;
894             result = status;
895             return result;
896         }
897     }
898 }