- In SwitchHandler.java, Transmit Thread waits if the priority queue is empty.
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / SwitchHandler.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
10
11 import java.io.IOException;
12 import java.net.SocketException;
13 import java.nio.channels.AsynchronousCloseException;
14 import java.nio.channels.ClosedSelectorException;
15 import java.nio.channels.SelectionKey;
16 import java.nio.channels.Selector;
17 import java.nio.channels.SocketChannel;
18 import java.nio.channels.spi.SelectorProvider;
19 import java.util.ArrayList;
20 import java.util.Comparator;
21 import java.util.Date;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Set;
27 import java.util.Timer;
28 import java.util.TimerTask;
29 import java.util.concurrent.Callable;
30 import java.util.concurrent.ConcurrentHashMap;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.PriorityBlockingQueue;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicInteger;
37
38 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
39 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
40 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
41 import org.openflow.protocol.OFBarrierReply;
42 import org.openflow.protocol.OFBarrierRequest;
43 import org.openflow.protocol.OFEchoReply;
44 import org.openflow.protocol.OFError;
45 import org.openflow.protocol.OFFeaturesReply;
46 import org.openflow.protocol.OFFlowMod;
47 import org.openflow.protocol.OFGetConfigReply;
48 import org.openflow.protocol.OFMatch;
49 import org.openflow.protocol.OFMessage;
50 import org.openflow.protocol.OFPhysicalPort;
51 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
52 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
53 import org.openflow.protocol.OFPhysicalPort.OFPortState;
54 import org.openflow.protocol.OFPort;
55 import org.openflow.protocol.OFPortStatus;
56 import org.openflow.protocol.OFPortStatus.OFPortReason;
57 import org.openflow.protocol.OFSetConfig;
58 import org.openflow.protocol.OFStatisticsReply;
59 import org.openflow.protocol.OFStatisticsRequest;
60 import org.openflow.protocol.OFType;
61 import org.openflow.protocol.factory.BasicFactory;
62 import org.openflow.util.HexString;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66 public class SwitchHandler implements ISwitch {
67     private static final Logger logger = LoggerFactory
68             .getLogger(SwitchHandler.class);
69     private static final int SWITCH_LIVENESS_TIMER = 5000;
70     private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
71     private final int MESSAGE_RESPONSE_TIMER = 2000;
72
73     private final String instanceName;
74     private final ISwitch thisISwitch;
75     private final IController core;
76     private Long sid;
77     private Integer buffers;
78     private Integer capabilities;
79     private Byte tables;
80     private Integer actions;
81     private Selector selector;
82     private final SocketChannel socket;
83     private final BasicFactory factory;
84     private final AtomicInteger xid;
85     private SwitchState state;
86     private Timer periodicTimer;
87     private final Map<Short, OFPhysicalPort> physicalPorts;
88     private final Map<Short, Integer> portBandwidth;
89     private final Date connectedDate;
90     private Long lastMsgReceivedTimeStamp;
91     private Boolean probeSent;
92     private final ExecutorService executor;
93     private final ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
94     private boolean running;
95     private IMessageReadWrite msgReadWriteService;
96     private Thread switchHandlerThread;
97     private Integer responseTimerValue;
98     private PriorityBlockingQueue<PriorityMessage> transmitQ;
99     private Thread transmitThread;
100
101     private enum SwitchState {
102         NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(
103                 3);
104
105         private int value;
106
107         private SwitchState(int value) {
108             this.value = value;
109         }
110
111         @SuppressWarnings("unused")
112         public int value() {
113             return this.value;
114         }
115     }
116
117     public SwitchHandler(Controller core, SocketChannel sc, String name) {
118         this.instanceName = name;
119         this.thisISwitch = this;
120         this.sid = (long) 0;
121         this.buffers = (int) 0;
122         this.capabilities = (int) 0;
123         this.tables = (byte) 0;
124         this.actions = (int) 0;
125         this.core = core;
126         this.socket = sc;
127         this.factory = new BasicFactory();
128         this.connectedDate = new Date();
129         this.lastMsgReceivedTimeStamp = connectedDate.getTime();
130         this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
131         this.portBandwidth = new HashMap<Short, Integer>();
132         this.state = SwitchState.NON_OPERATIONAL;
133         this.probeSent = false;
134         this.xid = new AtomicInteger(this.socket.hashCode());
135         this.periodicTimer = null;
136         this.executor = Executors.newFixedThreadPool(4);
137         this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
138         this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
139         String rTimer = System.getProperty("of.messageResponseTimer");
140         if (rTimer != null) {
141             try {
142                 responseTimerValue = Integer.decode(rTimer);
143             } catch (NumberFormatException e) {
144                 logger.warn(
145                         "Invalid of.messageResponseTimer: {} use default({})",
146                         rTimer, MESSAGE_RESPONSE_TIMER);
147             }
148         }
149     }
150
151     public void start() {
152         try {
153             startTransmitThread();
154             setupCommChannel();
155             sendFirstHello();
156             startHandlerThread();
157         } catch (Exception e) {
158             reportError(e);
159         }
160     }
161
162     private void startHandlerThread() {
163         switchHandlerThread = new Thread(new Runnable() {
164             @Override
165             public void run() {
166                 running = true;
167                 while (running) {
168                     try {
169                         // wait for an incoming connection
170                         selector.select(0);
171                         Iterator<SelectionKey> selectedKeys = selector
172                                 .selectedKeys().iterator();
173                         while (selectedKeys.hasNext()) {
174                             SelectionKey skey = selectedKeys.next();
175                             selectedKeys.remove();
176                             if (skey.isValid() && skey.isWritable()) {
177                                 resumeSend();
178                             }
179                             if (skey.isValid() && skey.isReadable()) {
180                                 handleMessages();
181                             }
182                         }
183                     } catch (Exception e) {
184                         reportError(e);
185                     }
186                 }
187             }
188         }, instanceName);
189         switchHandlerThread.start();
190     }
191
192     public void stop() {
193         running = false;
194         cancelSwitchTimer();
195         try {
196             selector.wakeup();
197             selector.close();
198         } catch (Exception e) {
199         }
200         try {
201             socket.close();
202         } catch (Exception e) {
203         }
204         try {
205             msgReadWriteService.stop();
206         } catch (Exception e) {
207         }
208         executor.shutdown();
209
210         msgReadWriteService = null;
211
212         if (switchHandlerThread != null) {
213             switchHandlerThread.interrupt();
214         }
215         if (transmitThread != null) {
216             transmitThread.interrupt();
217         }
218     }
219
220     @Override
221     public int getNextXid() {
222         return this.xid.incrementAndGet();
223     }
224
225     /**
226      * This method puts the message in an outgoing priority queue with normal
227      * priority. It will be served after high priority messages. The method
228      * should be used for non-critical messages such as statistics request,
229      * discovery packets, etc. An unique XID is generated automatically and
230      * inserted into the message.
231      *
232      * @param msg
233      *            The OF message to be sent
234      * @return The XID used
235      */
236     @Override
237     public Integer asyncSend(OFMessage msg) {
238         return asyncSend(msg, getNextXid());
239     }
240
241     private Object syncSend(OFMessage msg, int xid) {
242         return syncMessageInternal(msg, xid, true);
243     }
244
245     /**
246      * This method puts the message in an outgoing priority queue with normal
247      * priority. It will be served after high priority messages. The method
248      * should be used for non-critical messages such as statistics request,
249      * discovery packets, etc. The specified XID is inserted into the message.
250      *
251      * @param msg
252      *            The OF message to be Sent
253      * @param xid
254      *            The XID to be used in the message
255      * @return The XID used
256      */
257     @Override
258     public Integer asyncSend(OFMessage msg, int xid) {
259         msg.setXid(xid);
260         if (transmitQ != null) {
261             transmitQ.add(new PriorityMessage(msg, 0));
262         }
263         return xid;
264     }
265
266     /**
267      * This method puts the message in an outgoing priority queue with high
268      * priority. It will be served first before normal priority messages. The
269      * method should be used for critical messages such as hello, echo reply
270      * etc. An unique XID is generated automatically and inserted into the
271      * message.
272      *
273      * @param msg
274      *            The OF message to be sent
275      * @return The XID used
276      */
277     @Override
278     public Integer asyncFastSend(OFMessage msg) {
279         return asyncFastSend(msg, getNextXid());
280     }
281
282     /**
283      * This method puts the message in an outgoing priority queue with high
284      * priority. It will be served first before normal priority messages. The
285      * method should be used for critical messages such as hello, echo reply
286      * etc. The specified XID is inserted into the message.
287      *
288      * @param msg
289      *            The OF message to be sent
290      * @return The XID used
291      */
292     @Override
293     public Integer asyncFastSend(OFMessage msg, int xid) {
294         msg.setXid(xid);
295         if (transmitQ != null) {
296             transmitQ.add(new PriorityMessage(msg, 1));
297         }
298         return xid;
299     }
300
301     public void resumeSend() {
302         try {
303             if (msgReadWriteService != null) {
304                 msgReadWriteService.resumeSend();
305             }
306         } catch (Exception e) {
307             reportError(e);
308         }
309     }
310
311     /**
312      * This method bypasses the transmit queue and sends the message over the
313      * socket directly. If the input xid is not null, the specified xid is
314      * inserted into the message. Otherwise, an unique xid is generated
315      * automatically and inserted into the message.
316      *
317      * @param msg
318      *            Message to be sent
319      * @param xid
320      *            Message xid
321      */
322     private void asyncSendNow(OFMessage msg, Integer xid) {
323         if (xid == null) {
324             xid = getNextXid();
325         }
326         msg.setXid(xid);
327
328         asyncSendNow(msg);
329     }
330
331     /**
332      * This method bypasses the transmit queue and sends the message over the
333      * socket directly.
334      *
335      * @param msg
336      *            Message to be sent
337      */
338     private void asyncSendNow(OFMessage msg) {
339         if (msgReadWriteService == null) {
340             logger.warn(
341                     "asyncSendNow: {} is not sent because Message ReadWrite Service is not available.",
342                     msg);
343             return;
344         }
345
346         try {
347             msgReadWriteService.asyncSend(msg);
348         } catch (Exception e) {
349             reportError(e);
350         }
351     }
352
353     public void handleMessages() {
354         List<OFMessage> msgs = null;
355
356         try {
357             if (msgReadWriteService != null) {
358                 msgs = msgReadWriteService.readMessages();
359             }
360         } catch (Exception e) {
361             reportError(e);
362         }
363
364         if (msgs == null) {
365             logger.debug("{} is down", this);
366             // the connection is down, inform core
367             reportSwitchStateChange(false);
368             return;
369         }
370         for (OFMessage msg : msgs) {
371             logger.trace("Message received: {}", msg);
372             this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
373             OFType type = msg.getType();
374             switch (type) {
375             case HELLO:
376                 // send feature request
377                 OFMessage featureRequest = factory
378                         .getMessage(OFType.FEATURES_REQUEST);
379                 asyncFastSend(featureRequest);
380                 // delete all pre-existing flows
381                 OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
382                 OFFlowMod flowMod = (OFFlowMod) factory
383                         .getMessage(OFType.FLOW_MOD);
384                 flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
385                         .setOutPort(OFPort.OFPP_NONE)
386                         .setLength((short) OFFlowMod.MINIMUM_LENGTH);
387                 asyncFastSend(flowMod);
388                 this.state = SwitchState.WAIT_FEATURES_REPLY;
389                 startSwitchTimer();
390                 break;
391             case ECHO_REQUEST:
392                 OFEchoReply echoReply = (OFEchoReply) factory
393                         .getMessage(OFType.ECHO_REPLY);
394                 // respond immediately
395                 asyncSendNow(echoReply, msg.getXid());
396                 break;
397             case ECHO_REPLY:
398                 this.probeSent = false;
399                 break;
400             case FEATURES_REPLY:
401                 processFeaturesReply((OFFeaturesReply) msg);
402                 break;
403             case GET_CONFIG_REPLY:
404                 // make sure that the switch can send the whole packet to the
405                 // controller
406                 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
407                     this.state = SwitchState.OPERATIONAL;
408                 }
409                 break;
410             case BARRIER_REPLY:
411                 processBarrierReply((OFBarrierReply) msg);
412                 break;
413             case ERROR:
414                 processErrorReply((OFError) msg);
415                 break;
416             case PORT_STATUS:
417                 processPortStatusMsg((OFPortStatus) msg);
418                 break;
419             case STATS_REPLY:
420                 processStatsReply((OFStatisticsReply) msg);
421                 break;
422             case PACKET_IN:
423                 break;
424             default:
425                 break;
426             } // end of switch
427             if (isOperational()) {
428                 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
429             }
430         } // end of for
431     }
432
433     private void processPortStatusMsg(OFPortStatus msg) {
434         OFPhysicalPort port = msg.getDesc();
435         if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
436             updatePhysicalPort(port);
437         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
438             updatePhysicalPort(port);
439         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
440                 .ordinal()) {
441             deletePhysicalPort(port);
442         }
443
444     }
445
446     private void startSwitchTimer() {
447         this.periodicTimer = new Timer();
448         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
449             @Override
450             public void run() {
451                 try {
452                     Long now = System.currentTimeMillis();
453                     if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
454                         if (probeSent) {
455                             // switch failed to respond to our probe, consider
456                             // it down
457                             logger.warn("{} is idle for too long, disconnect",
458                                     toString());
459                             reportSwitchStateChange(false);
460                         } else {
461                             // send a probe to see if the switch is still alive
462                             logger.debug(
463                                     "Send idle probe (Echo Request) to {}",
464                                     this);
465                             probeSent = true;
466                             OFMessage echo = factory
467                                     .getMessage(OFType.ECHO_REQUEST);
468                             asyncFastSend(echo);
469                         }
470                     } else {
471                         if (state == SwitchState.WAIT_FEATURES_REPLY) {
472                             // send another features request
473                             OFMessage request = factory
474                                     .getMessage(OFType.FEATURES_REQUEST);
475                             asyncFastSend(request);
476                         } else {
477                             if (state == SwitchState.WAIT_CONFIG_REPLY) {
478                                 // send another config request
479                                 OFSetConfig config = (OFSetConfig) factory
480                                         .getMessage(OFType.SET_CONFIG);
481                                 config.setMissSendLength((short) 0xffff)
482                                         .setLengthU(OFSetConfig.MINIMUM_LENGTH);
483                                 asyncFastSend(config);
484                                 OFMessage getConfig = factory
485                                         .getMessage(OFType.GET_CONFIG_REQUEST);
486                                 asyncFastSend(getConfig);
487                             }
488                         }
489                     }
490                 } catch (Exception e) {
491                     reportError(e);
492                 }
493             }
494         }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
495     }
496
497     private void cancelSwitchTimer() {
498         if (this.periodicTimer != null) {
499             this.periodicTimer.cancel();
500         }
501     }
502
503     private void reportError(Exception e) {
504         if (e instanceof AsynchronousCloseException
505                 || e instanceof InterruptedException
506                 || e instanceof SocketException || e instanceof IOException
507                 || e instanceof ClosedSelectorException) {
508             if (logger.isDebugEnabled()) {
509               logger.debug("Caught exception {}", e.getMessage());
510             }
511         } else {
512             logger.warn("Caught exception ", e);
513         }
514         // notify core of this error event and disconnect the switch
515         ((Controller) core).takeSwitchEventError(this);
516     }
517
518     private void reportSwitchStateChange(boolean added) {
519         if (added) {
520             ((Controller) core).takeSwitchEventAdd(this);
521         } else {
522             ((Controller) core).takeSwitchEventDelete(this);
523         }
524     }
525
526     @Override
527     public Long getId() {
528         return this.sid;
529     }
530
531     private void processFeaturesReply(OFFeaturesReply reply) {
532         if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
533             this.sid = reply.getDatapathId();
534             this.buffers = reply.getBuffers();
535             this.capabilities = reply.getCapabilities();
536             this.tables = reply.getTables();
537             this.actions = reply.getActions();
538             // notify core of this error event
539             for (OFPhysicalPort port : reply.getPorts()) {
540                 updatePhysicalPort(port);
541             }
542             // config the switch to send full data packet
543             OFSetConfig config = (OFSetConfig) factory
544                     .getMessage(OFType.SET_CONFIG);
545             config.setMissSendLength((short) 0xffff).setLengthU(
546                     OFSetConfig.MINIMUM_LENGTH);
547             asyncFastSend(config);
548             // send config request to make sure the switch can handle the set
549             // config
550             OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
551             asyncFastSend(getConfig);
552             this.state = SwitchState.WAIT_CONFIG_REPLY;
553             // inform core that a new switch is now operational
554             reportSwitchStateChange(true);
555         }
556     }
557
558     private void updatePhysicalPort(OFPhysicalPort port) {
559         Short portNumber = port.getPortNumber();
560         physicalPorts.put(portNumber, port);
561         portBandwidth
562                 .put(portNumber,
563                         port.getCurrentFeatures()
564                                 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
565                                         | OFPortFeatures.OFPPF_10MB_HD
566                                                 .getValue()
567                                         | OFPortFeatures.OFPPF_100MB_FD
568                                                 .getValue()
569                                         | OFPortFeatures.OFPPF_100MB_HD
570                                                 .getValue()
571                                         | OFPortFeatures.OFPPF_1GB_FD
572                                                 .getValue()
573                                         | OFPortFeatures.OFPPF_1GB_HD
574                                                 .getValue() | OFPortFeatures.OFPPF_10GB_FD
575                                             .getValue()));
576     }
577
578     private void deletePhysicalPort(OFPhysicalPort port) {
579         Short portNumber = port.getPortNumber();
580         physicalPorts.remove(portNumber);
581         portBandwidth.remove(portNumber);
582     }
583
584     @Override
585     public boolean isOperational() {
586         return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
587     }
588
589     @Override
590     public String toString() {
591         try {
592             return ("Switch:"
593                     + socket.socket().getRemoteSocketAddress().toString().split("/")[1]
594                     + " SWID:" + (isOperational() ? HexString
595                     .toHexString(this.sid) : "unknown"));
596         } catch (Exception e) {
597             return (isOperational() ? HexString.toHexString(this.sid)
598                     : "unknown");
599         }
600
601     }
602
603     @Override
604     public Date getConnectedDate() {
605         return this.connectedDate;
606     }
607
608     public String getInstanceName() {
609         return instanceName;
610     }
611
612     @Override
613     public Object getStatistics(OFStatisticsRequest req) {
614         int xid = getNextXid();
615         StatisticsCollector worker = new StatisticsCollector(this, xid, req);
616         messageWaitingDone.put(xid, worker);
617         Future<Object> submit = executor.submit(worker);
618         Object result = null;
619         try {
620             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
621             return result;
622         } catch (Exception e) {
623             logger.warn("Timeout while waiting for {} replies", req.getType());
624             result = null; // to indicate timeout has occurred
625             return result;
626         }
627     }
628
629     @Override
630     public Object syncSend(OFMessage msg) {
631         int xid = getNextXid();
632         return syncSend(msg, xid);
633     }
634
635     /*
636      * Either a BarrierReply or a OFError is received. If this is a reply for an
637      * outstanding sync message, wake up associated task so that it can continue
638      */
639     private void processBarrierReply(OFBarrierReply msg) {
640         Integer xid = msg.getXid();
641         SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
642                 .remove(xid);
643         if (worker == null) {
644             return;
645         }
646         worker.wakeup();
647     }
648
649     private void processErrorReply(OFError errorMsg) {
650         OFMessage offendingMsg = errorMsg.getOffendingMsg();
651         Integer xid;
652         if (offendingMsg != null) {
653             xid = offendingMsg.getXid();
654         } else {
655             xid = errorMsg.getXid();
656         }
657         /*
658          * the error can be a reply to a synchronous message or to a statistic
659          * request message
660          */
661         Callable<?> worker = messageWaitingDone.remove(xid);
662         if (worker == null) {
663             return;
664         }
665         if (worker instanceof SynchronousMessage) {
666             ((SynchronousMessage) worker).wakeup(errorMsg);
667         } else {
668             ((StatisticsCollector) worker).wakeup(errorMsg);
669         }
670     }
671
672     private void processStatsReply(OFStatisticsReply reply) {
673         Integer xid = reply.getXid();
674         StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
675                 .get(xid);
676         if (worker == null) {
677             return;
678         }
679         if (worker.collect(reply)) {
680             // if all the stats records are received (collect() returns true)
681             // then we are done.
682             messageWaitingDone.remove(xid);
683             worker.wakeup();
684         }
685     }
686
687     @Override
688     public Map<Short, OFPhysicalPort> getPhysicalPorts() {
689         return this.physicalPorts;
690     }
691
692     @Override
693     public OFPhysicalPort getPhysicalPort(Short portNumber) {
694         return this.physicalPorts.get(portNumber);
695     }
696
697     @Override
698     public Integer getPortBandwidth(Short portNumber) {
699         return this.portBandwidth.get(portNumber);
700     }
701
702     @Override
703     public Set<Short> getPorts() {
704         return this.physicalPorts.keySet();
705     }
706
707     @Override
708     public Byte getTables() {
709         return this.tables;
710     }
711
712     @Override
713     public Integer getActions() {
714         return this.actions;
715     }
716
717     @Override
718     public Integer getCapabilities() {
719         return this.capabilities;
720     }
721
722     @Override
723     public Integer getBuffers() {
724         return this.buffers;
725     }
726
727     @Override
728     public boolean isPortEnabled(short portNumber) {
729         return isPortEnabled(physicalPorts.get(portNumber));
730     }
731
732     @Override
733     public boolean isPortEnabled(OFPhysicalPort port) {
734         if (port == null) {
735             return false;
736         }
737         int portConfig = port.getConfig();
738         int portState = port.getState();
739         if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
740             return false;
741         }
742         if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
743             return false;
744         }
745         if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
746                 .getValue()) {
747             return false;
748         }
749         return true;
750     }
751
752     @Override
753     public List<OFPhysicalPort> getEnabledPorts() {
754         List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
755         synchronized (this.physicalPorts) {
756             for (OFPhysicalPort port : physicalPorts.values()) {
757                 if (isPortEnabled(port)) {
758                     result.add(port);
759                 }
760             }
761         }
762         return result;
763     }
764
765     /*
766      * Transmit thread polls the message out of the priority queue and invokes
767      * messaging service to transmit it over the socket channel
768      */
769     class PriorityMessageTransmit implements Runnable {
770         @Override
771         public void run() {
772             running = true;
773             while (running) {
774                 try {
775                     PriorityMessage pmsg = transmitQ.take();
776                     msgReadWriteService.asyncSend(pmsg.msg);
777                     /*
778                      * If syncReply is set to true, wait for the response back.
779                      */
780                     if (pmsg.syncReply) {
781                         syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
782                     }
783                 } catch (InterruptedException ie) {
784                     reportError(new InterruptedException(
785                             "PriorityMessageTransmit thread interrupted"));
786                 } catch (Exception e) {
787                     reportError(e);
788                 }
789             }
790             transmitQ = null;
791         }
792     }
793
794     /*
795      * Setup and start the transmit thread
796      */
797     private void startTransmitThread() {
798         this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11,
799                 new Comparator<PriorityMessage>() {
800                     @Override
801                     public int compare(PriorityMessage p1, PriorityMessage p2) {
802                         if (p2.priority != p1.priority) {
803                             return p2.priority - p1.priority;
804                         } else {
805                             return (p2.seqNum < p1.seqNum) ? 1 : -1;
806                         }
807                     }
808                 });
809         this.transmitThread = new Thread(new PriorityMessageTransmit());
810         this.transmitThread.start();
811     }
812
813     /*
814      * Setup communication services
815      */
816     private void setupCommChannel() throws Exception {
817         this.selector = SelectorProvider.provider().openSelector();
818         this.socket.configureBlocking(false);
819         this.socket.socket().setTcpNoDelay(true);
820         this.msgReadWriteService = getMessageReadWriteService();
821     }
822
823     private void sendFirstHello() {
824         try {
825             OFMessage msg = factory.getMessage(OFType.HELLO);
826             asyncFastSend(msg);
827         } catch (Exception e) {
828             reportError(e);
829         }
830     }
831
832     private IMessageReadWrite getMessageReadWriteService() throws Exception {
833         String str = System.getProperty("secureChannelEnabled");
834         return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(
835                 socket, selector) : new MessageReadWriteService(socket,
836                 selector);
837     }
838
839     /**
840      * Send Barrier message synchronously. The caller will be blocked until the
841      * Barrier reply is received.
842      */
843     @Override
844     public Object syncSendBarrierMessage() {
845         OFBarrierRequest barrierMsg = new OFBarrierRequest();
846         return syncSend(barrierMsg);
847     }
848
849     /**
850      * Send Barrier message asynchronously. The caller is not blocked. The
851      * Barrier message will be sent in a transmit thread which will be blocked
852      * until the Barrier reply is received.
853      */
854     @Override
855     public Object asyncSendBarrierMessage() {
856         if (transmitQ == null) {
857             return Boolean.FALSE;
858         }
859
860         OFBarrierRequest barrierMsg = new OFBarrierRequest();
861         int xid = getNextXid();
862
863         barrierMsg.setXid(xid);
864         transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
865
866         return Boolean.TRUE;
867     }
868
869     /**
870      * This method returns the switch liveness timeout value. If controller did
871      * not receive any message from the switch for such a long period,
872      * controller will tear down the connection to the switch.
873      *
874      * @return The timeout value
875      */
876     private static int getSwitchLivenessTimeout() {
877         String timeout = System.getProperty("of.switchLivenessTimeout");
878         int rv = 60500;
879
880         try {
881             if (timeout != null) {
882                 rv = Integer.parseInt(timeout);
883             }
884         } catch (Exception e) {
885         }
886
887         return rv;
888     }
889
890     /**
891      * This method performs synchronous operations for a given message. If
892      * syncRequest is set to true, the message will be sent out followed by a
893      * Barrier request message. Then it's blocked until the Barrier rely arrives
894      * or timeout. If syncRequest is false, it simply skips the message send and
895      * just waits for the response back.
896      *
897      * @param msg
898      *            Message to be sent
899      * @param xid
900      *            Message XID
901      * @param request
902      *            If set to true, the message the message will be sent out
903      *            followed by a Barrier request message. If set to false, it
904      *            simply skips the sending and just waits for the Barrier reply.
905      * @return the result
906      */
907     private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
908         SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
909         messageWaitingDone.put(xid, worker);
910         Object result = null;
911         Boolean status = false;
912         Future<Object> submit = executor.submit(worker);
913         try {
914             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
915             messageWaitingDone.remove(xid);
916             if (result == null) {
917                 // if result is null, then it means the switch can handle this
918                 // message successfully
919                 // convert the result into a Boolean with value true
920                 status = true;
921                 // logger.debug("Successfully send " +
922                 // msg.getType().toString());
923                 result = status;
924             } else {
925                 // if result is not null, this means the switch can't handle
926                 // this message
927                 // the result if OFError already
928                 if (logger.isDebugEnabled()) {
929                   logger.debug("Send {} failed --> {}", msg.getType(),
930                                (result));
931                 }
932             }
933             return result;
934         } catch (Exception e) {
935             logger.warn("Timeout while waiting for {} reply", msg.getType()
936                     .toString());
937             // convert the result into a Boolean with value false
938             status = false;
939             result = status;
940             return result;
941         }
942     }
943 }