ed79d5c5a8f88a585dac9363478e6870bc77889c
[controller.git] / opendaylight / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / SwitchHandler.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
10
11 import java.nio.channels.SelectionKey;
12 import java.nio.channels.Selector;
13 import java.nio.channels.SocketChannel;
14 import java.nio.channels.spi.SelectorProvider;
15 import java.util.ArrayList;
16 import java.util.Comparator;
17 import java.util.Date;
18 import java.util.HashMap;
19 import java.util.Iterator;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Set;
23 import java.util.Timer;
24 import java.util.TimerTask;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.PriorityBlockingQueue;
31 import java.util.concurrent.RejectedExecutionException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34
35 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
36 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
37 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
38 import org.openflow.protocol.OFBarrierReply;
39 import org.openflow.protocol.OFBarrierRequest;
40 import org.openflow.protocol.OFEchoReply;
41 import org.openflow.protocol.OFEchoRequest;
42 import org.openflow.protocol.OFError;
43 import org.openflow.protocol.OFFeaturesReply;
44 import org.openflow.protocol.OFFlowMod;
45 import org.openflow.protocol.OFGetConfigReply;
46 import org.openflow.protocol.OFMatch;
47 import org.openflow.protocol.OFMessage;
48 import org.openflow.protocol.OFPhysicalPort;
49 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
50 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
51 import org.openflow.protocol.OFPhysicalPort.OFPortState;
52 import org.openflow.protocol.OFPort;
53 import org.openflow.protocol.OFPortStatus;
54 import org.openflow.protocol.OFPortStatus.OFPortReason;
55 import org.openflow.protocol.OFSetConfig;
56 import org.openflow.protocol.OFStatisticsReply;
57 import org.openflow.protocol.OFStatisticsRequest;
58 import org.openflow.protocol.OFType;
59 import org.openflow.protocol.factory.BasicFactory;
60 import org.openflow.util.HexString;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64 public class SwitchHandler implements ISwitch {
65     private static final Logger logger = LoggerFactory.getLogger(SwitchHandler.class);
66     private static final int SWITCH_LIVENESS_TIMER = 5000;
67     private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
68     private final int MESSAGE_RESPONSE_TIMER = 2000;
69
70     private final String instanceName;
71     private final ISwitch thisISwitch;
72     private final IController core;
73     private Long sid;
74     private Integer buffers;
75     private Integer capabilities;
76     private Byte tables;
77     private Integer actions;
78     private Selector selector;
79     private final SocketChannel socket;
80     private final BasicFactory factory;
81     private final AtomicInteger xid;
82     private SwitchState state;
83     private Timer periodicTimer;
84     private final Map<Short, OFPhysicalPort> physicalPorts;
85     private final Map<Short, Integer> portBandwidth;
86     private final Date connectedDate;
87     private Long lastMsgReceivedTimeStamp;
88     private Boolean probeSent;
89     private final ExecutorService executor;
90     private final ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
91     private boolean running;
92     private IMessageReadWrite msgReadWriteService;
93     private Thread switchHandlerThread;
94     private Integer responseTimerValue;
95     private PriorityBlockingQueue<PriorityMessage> transmitQ;
96     private Thread transmitThread;
97
98     private enum SwitchState {
99         NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(3);
100
101         private int value;
102
103         private SwitchState(int value) {
104             this.value = value;
105         }
106
107         @SuppressWarnings("unused")
108         public int value() {
109             return this.value;
110         }
111     }
112
113     public SwitchHandler(Controller core, SocketChannel sc, String name) {
114         this.instanceName = name;
115         this.thisISwitch = this;
116         this.sid = (long) 0;
117         this.buffers = (int) 0;
118         this.capabilities = (int) 0;
119         this.tables = (byte) 0;
120         this.actions = (int) 0;
121         this.core = core;
122         this.socket = sc;
123         this.factory = new BasicFactory();
124         this.connectedDate = new Date();
125         this.lastMsgReceivedTimeStamp = connectedDate.getTime();
126         this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
127         this.portBandwidth = new HashMap<Short, Integer>();
128         this.state = SwitchState.NON_OPERATIONAL;
129         this.probeSent = false;
130         this.xid = new AtomicInteger(this.socket.hashCode());
131         this.periodicTimer = null;
132         this.executor = Executors.newFixedThreadPool(4);
133         this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
134         this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
135         String rTimer = System.getProperty("of.messageResponseTimer");
136         if (rTimer != null) {
137             try {
138                 responseTimerValue = Integer.decode(rTimer);
139             } catch (NumberFormatException e) {
140                 logger.warn("Invalid of.messageResponseTimer: {} use default({})", rTimer, MESSAGE_RESPONSE_TIMER);
141             }
142         }
143     }
144
145     public void start() {
146         try {
147             startTransmitThread();
148             setupCommChannel();
149             sendFirstHello();
150             startHandlerThread();
151         } catch (Exception e) {
152             reportError(e);
153         }
154     }
155
156     private void startHandlerThread() {
157         switchHandlerThread = new Thread(new Runnable() {
158             @Override
159             public void run() {
160                 running = true;
161                 while (running) {
162                     try {
163                         // wait for an incoming connection
164                         selector.select(0);
165                         Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
166                         while (selectedKeys.hasNext()) {
167                             SelectionKey skey = selectedKeys.next();
168                             selectedKeys.remove();
169                             if (skey.isValid() && skey.isWritable()) {
170                                 resumeSend();
171                             }
172                             if (skey.isValid() && skey.isReadable()) {
173                                 handleMessages();
174                             }
175                         }
176                     } catch (Exception e) {
177                         reportError(e);
178                     }
179                 }
180             }
181         }, instanceName);
182         switchHandlerThread.start();
183     }
184
185     private void stopInternal() {
186         logger.debug("{} receives stop signal",
187                 (isOperational() ? HexString.toHexString(sid) : "unknown"));
188         running = false;
189         cancelSwitchTimer();
190         try {
191             selector.wakeup();
192             selector.close();
193         } catch (Exception e) {
194         }
195         try {
196             socket.close();
197         } catch (Exception e) {
198         }
199         try {
200             msgReadWriteService.stop();
201         } catch (Exception e) {
202         }
203         logger.debug("executor shutdown now");
204         executor.shutdownNow();
205
206         msgReadWriteService = null;
207     }
208
209     public void stop() {
210         stopInternal();
211
212         if (switchHandlerThread != null) {
213             switchHandlerThread.interrupt();
214         }
215         if (transmitThread != null) {
216             transmitThread.interrupt();
217         }
218     }
219
220     @Override
221     public int getNextXid() {
222         return this.xid.incrementAndGet();
223     }
224
225     /**
226      * This method puts the message in an outgoing priority queue with normal
227      * priority. It will be served after high priority messages. The method
228      * should be used for non-critical messages such as statistics request,
229      * discovery packets, etc. An unique XID is generated automatically and
230      * inserted into the message.
231      *
232      * @param msg
233      *            The OF message to be sent
234      * @return The XID used
235      */
236     @Override
237     public Integer asyncSend(OFMessage msg) {
238         return asyncSend(msg, getNextXid());
239     }
240
241     private Object syncSend(OFMessage msg, int xid) {
242         return syncMessageInternal(msg, xid, true);
243     }
244
245     /**
246      * This method puts the message in an outgoing priority queue with normal
247      * priority. It will be served after high priority messages. The method
248      * should be used for non-critical messages such as statistics request,
249      * discovery packets, etc. The specified XID is inserted into the message.
250      *
251      * @param msg
252      *            The OF message to be Sent
253      * @param xid
254      *            The XID to be used in the message
255      * @return The XID used
256      */
257     @Override
258     public Integer asyncSend(OFMessage msg, int xid) {
259         msg.setXid(xid);
260         if (transmitQ != null) {
261             transmitQ.add(new PriorityMessage(msg, 0));
262         }
263         return xid;
264     }
265
266     /**
267      * This method puts the message in an outgoing priority queue with high
268      * priority. It will be served first before normal priority messages. The
269      * method should be used for critical messages such as hello, echo reply
270      * etc. An unique XID is generated automatically and inserted into the
271      * message.
272      *
273      * @param msg
274      *            The OF message to be sent
275      * @return The XID used
276      */
277     @Override
278     public Integer asyncFastSend(OFMessage msg) {
279         return asyncFastSend(msg, getNextXid());
280     }
281
282     /**
283      * This method puts the message in an outgoing priority queue with high
284      * priority. It will be served first before normal priority messages. The
285      * method should be used for critical messages such as hello, echo reply
286      * etc. The specified XID is inserted into the message.
287      *
288      * @param msg
289      *            The OF message to be sent
290      * @return The XID used
291      */
292     @Override
293     public Integer asyncFastSend(OFMessage msg, int xid) {
294         msg.setXid(xid);
295         if (transmitQ != null) {
296             transmitQ.add(new PriorityMessage(msg, 1));
297         }
298         return xid;
299     }
300
301     public void resumeSend() {
302         try {
303             if (msgReadWriteService != null) {
304                 msgReadWriteService.resumeSend();
305             }
306         } catch (Exception e) {
307             reportError(e);
308         }
309     }
310
311     /**
312      * This method bypasses the transmit queue and sends the message over the
313      * socket directly. If the input xid is not null, the specified xid is
314      * inserted into the message. Otherwise, an unique xid is generated
315      * automatically and inserted into the message.
316      *
317      * @param msg
318      *            Message to be sent
319      * @param xid
320      *            Message xid
321      */
322     private void asyncSendNow(OFMessage msg, Integer xid) {
323         if (xid == null) {
324             xid = getNextXid();
325         }
326         msg.setXid(xid);
327
328         asyncSendNow(msg);
329     }
330
331     /**
332      * This method bypasses the transmit queue and sends the message over the
333      * socket directly.
334      *
335      * @param msg
336      *            Message to be sent
337      */
338     private void asyncSendNow(OFMessage msg) {
339         if (msgReadWriteService == null) {
340             logger.warn("asyncSendNow: {} is not sent because Message ReadWrite Service is not available.", msg);
341             return;
342         }
343
344         try {
345             msgReadWriteService.asyncSend(msg);
346         } catch (Exception e) {
347             reportError(e);
348         }
349     }
350
351     public void handleMessages() {
352         List<OFMessage> msgs = null;
353
354         try {
355             if (msgReadWriteService != null) {
356                 msgs = msgReadWriteService.readMessages();
357             }
358         } catch (Exception e) {
359             reportError(e);
360         }
361
362         if (msgs == null) {
363             return;
364         }
365         for (OFMessage msg : msgs) {
366             logger.trace("Message received: {}", msg);
367             this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
368             OFType type = msg.getType();
369             switch (type) {
370             case HELLO:
371                 sendFeaturesRequest();
372                 break;
373             case ECHO_REQUEST:
374                 OFEchoReply echoReply = (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
375
376                 // the EchoReply must have the same payload as the request
377                 byte []payload = ((OFEchoRequest)msg).getPayload();
378                 echoReply.setPayload(payload);
379                 echoReply.setLength( (short) (echoReply.getLength() + payload.length ));
380
381                 // respond immediately
382                 asyncSendNow(echoReply, msg.getXid());
383
384                 // send features request if not sent yet
385                 sendFeaturesRequest();
386                 break;
387             case ECHO_REPLY:
388                 this.probeSent = false;
389                 break;
390             case FEATURES_REPLY:
391                 processFeaturesReply((OFFeaturesReply) msg);
392                 break;
393             case GET_CONFIG_REPLY:
394                 // make sure that the switch can send the whole packet to the
395                 // controller
396                 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
397                     this.state = SwitchState.OPERATIONAL;
398                 }
399                 break;
400             case BARRIER_REPLY:
401                 processBarrierReply((OFBarrierReply) msg);
402                 break;
403             case ERROR:
404                 processErrorReply((OFError) msg);
405                 break;
406             case PORT_STATUS:
407                 processPortStatusMsg((OFPortStatus) msg);
408                 break;
409             case STATS_REPLY:
410                 processStatsReply((OFStatisticsReply) msg);
411                 break;
412             case PACKET_IN:
413                 break;
414             default:
415                 break;
416             } // end of switch
417             if (isOperational()) {
418                 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
419             }
420         } // end of for
421     }
422
423     private void processPortStatusMsg(OFPortStatus msg) {
424         OFPhysicalPort port = msg.getDesc();
425         if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
426             updatePhysicalPort(port);
427         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
428             updatePhysicalPort(port);
429         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE.ordinal()) {
430             deletePhysicalPort(port);
431         }
432
433     }
434
435     private void startSwitchTimer() {
436         this.periodicTimer = new Timer();
437         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
438             @Override
439             public void run() {
440                 try {
441                     Long now = System.currentTimeMillis();
442                     if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
443                         if (probeSent) {
444                             // switch failed to respond to our probe, consider
445                             // it down
446                             logger.warn("{} sid {} is idle for too long, disconnect", socket.socket()
447                                     .getRemoteSocketAddress().toString().split("/")[1], (sid == 0) ? "unknown"
448                                     : HexString.toHexString(sid));
449                             reportSwitchStateChange(false);
450                         } else {
451                             // send a probe to see if the switch is still alive
452                             logger.debug("Send idle probe (Echo Request) to {}", this);
453                             probeSent = true;
454                             OFMessage echo = factory.getMessage(OFType.ECHO_REQUEST);
455                             asyncFastSend(echo);
456                         }
457                     } else {
458                         if (state == SwitchState.WAIT_FEATURES_REPLY) {
459                             // send another features request
460                             OFMessage request = factory.getMessage(OFType.FEATURES_REQUEST);
461                             asyncFastSend(request);
462                         } else {
463                             if (state == SwitchState.WAIT_CONFIG_REPLY) {
464                                 // send another config request
465                                 OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
466                                 config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
467                                 asyncFastSend(config);
468                                 OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
469                                 asyncFastSend(getConfig);
470                             }
471                         }
472                     }
473                 } catch (Exception e) {
474                     reportError(e);
475                 }
476             }
477         }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
478     }
479
480     private void cancelSwitchTimer() {
481         if (this.periodicTimer != null) {
482             this.periodicTimer.cancel();
483         }
484     }
485
486     private void reportError(Exception e) {
487         if (!running) {
488             logger.debug("Caught exception {} while switch {} is shutting down. Skip", e.getMessage(),
489                     (isOperational() ? HexString.toHexString(sid) : "unknown"));
490             return;
491         }
492         logger.debug("Caught exception: ", e);
493
494         // notify core of this error event and disconnect the switch
495         ((Controller) core).takeSwitchEventError(this);
496
497         // clean up some internal states immediately
498         stopInternal();
499     }
500
501     private void reportSwitchStateChange(boolean added) {
502         if (added) {
503             ((Controller) core).takeSwitchEventAdd(this);
504         } else {
505             ((Controller) core).takeSwitchEventDelete(this);
506         }
507     }
508
509     @Override
510     public Long getId() {
511         return this.sid;
512     }
513
514     private void sendFeaturesRequest() {
515         if (!isOperational() && (this.state != SwitchState.WAIT_FEATURES_REPLY)) {
516             // send feature request
517             OFMessage featureRequest = factory.getMessage(OFType.FEATURES_REQUEST);
518             asyncFastSend(featureRequest);
519             this.state = SwitchState.WAIT_FEATURES_REPLY;
520             startSwitchTimer();
521         }
522     }
523
524     private void processFeaturesReply(OFFeaturesReply reply) {
525         if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
526             this.sid = reply.getDatapathId();
527             this.buffers = reply.getBuffers();
528             this.capabilities = reply.getCapabilities();
529             this.tables = reply.getTables();
530             this.actions = reply.getActions();
531             // notify core of this error event
532             for (OFPhysicalPort port : reply.getPorts()) {
533                 updatePhysicalPort(port);
534             }
535             // config the switch to send full data packet
536             OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
537             config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
538             asyncFastSend(config);
539             // send config request to make sure the switch can handle the set
540             // config
541             OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
542             asyncFastSend(getConfig);
543             this.state = SwitchState.WAIT_CONFIG_REPLY;
544             // inform core that a new switch is now operational
545             reportSwitchStateChange(true);
546         }
547     }
548
549     private void updatePhysicalPort(OFPhysicalPort port) {
550         Short portNumber = port.getPortNumber();
551         physicalPorts.put(portNumber, port);
552         portBandwidth
553                 .put(portNumber,
554                         port.getCurrentFeatures()
555                                 & (OFPortFeatures.OFPPF_10MB_FD.getValue() | OFPortFeatures.OFPPF_10MB_HD.getValue()
556                                         | OFPortFeatures.OFPPF_100MB_FD.getValue()
557                                         | OFPortFeatures.OFPPF_100MB_HD.getValue()
558                                         | OFPortFeatures.OFPPF_1GB_FD.getValue()
559                                         | OFPortFeatures.OFPPF_1GB_HD.getValue() | OFPortFeatures.OFPPF_10GB_FD
560                                             .getValue()));
561     }
562
563     private void deletePhysicalPort(OFPhysicalPort port) {
564         Short portNumber = port.getPortNumber();
565         physicalPorts.remove(portNumber);
566         portBandwidth.remove(portNumber);
567     }
568
569     @Override
570     public boolean isOperational() {
571         return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
572     }
573
574     @Override
575     public String toString() {
576         try {
577             return ("Switch:" + socket.socket().getRemoteSocketAddress().toString().split("/")[1] + " SWID:" + (isOperational() ? HexString
578                     .toHexString(this.sid) : "unknown"));
579         } catch (Exception e) {
580             return (isOperational() ? HexString.toHexString(this.sid) : "unknown");
581         }
582
583     }
584
585     @Override
586     public Date getConnectedDate() {
587         return this.connectedDate;
588     }
589
590     public String getInstanceName() {
591         return instanceName;
592     }
593
594     @Override
595     public Object getStatistics(OFStatisticsRequest req) {
596         int xid = getNextXid();
597         StatisticsCollector worker = new StatisticsCollector(this, xid, req);
598         messageWaitingDone.put(xid, worker);
599         Future<Object> submit;
600         Object result = null;
601         try {
602             submit = executor.submit(worker);
603         } catch (RejectedExecutionException re) {
604             messageWaitingDone.remove(xid);
605             return result;
606         }
607         try {
608             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
609             return result;
610         } catch (Exception e) {
611             logger.warn("Timeout while waiting for {} replies from {}",
612                     req.getType(), (isOperational() ? HexString.toHexString(sid) : "unknown"));
613             result = null; // to indicate timeout has occurred
614             worker.wakeup();
615             return result;
616         }
617     }
618
619     @Override
620     public Object syncSend(OFMessage msg) {
621         if (!running) {
622             logger.debug("Switch is going down, ignore syncSend");
623             return null;
624         }
625         int xid = getNextXid();
626         return syncSend(msg, xid);
627     }
628
629     /*
630      * Either a BarrierReply or a OFError is received. If this is a reply for an
631      * outstanding sync message, wake up associated task so that it can continue
632      */
633     private void processBarrierReply(OFBarrierReply msg) {
634         Integer xid = msg.getXid();
635         SynchronousMessage worker = (SynchronousMessage) messageWaitingDone.remove(xid);
636         if (worker == null) {
637             return;
638         }
639         worker.wakeup();
640     }
641
642     private void processErrorReply(OFError errorMsg) {
643         OFMessage offendingMsg = errorMsg.getOffendingMsg();
644         Integer xid;
645         if (offendingMsg != null) {
646             xid = offendingMsg.getXid();
647         } else {
648             xid = errorMsg.getXid();
649         }
650         /*
651          * the error can be a reply to a synchronous message or to a statistic
652          * request message
653          */
654         Callable<?> worker = messageWaitingDone.remove(xid);
655         if (worker == null) {
656             return;
657         }
658         if (worker instanceof SynchronousMessage) {
659             ((SynchronousMessage) worker).wakeup(errorMsg);
660         } else {
661             ((StatisticsCollector) worker).wakeup(errorMsg);
662         }
663     }
664
665     private void processStatsReply(OFStatisticsReply reply) {
666         Integer xid = reply.getXid();
667         StatisticsCollector worker = (StatisticsCollector) messageWaitingDone.get(xid);
668         if (worker == null) {
669             return;
670         }
671         if (worker.collect(reply)) {
672             // if all the stats records are received (collect() returns true)
673             // then we are done.
674             messageWaitingDone.remove(xid);
675             worker.wakeup();
676         }
677     }
678
679     @Override
680     public Map<Short, OFPhysicalPort> getPhysicalPorts() {
681         return this.physicalPorts;
682     }
683
684     @Override
685     public OFPhysicalPort getPhysicalPort(Short portNumber) {
686         return this.physicalPorts.get(portNumber);
687     }
688
689     @Override
690     public Integer getPortBandwidth(Short portNumber) {
691         return this.portBandwidth.get(portNumber);
692     }
693
694     @Override
695     public Set<Short> getPorts() {
696         return this.physicalPorts.keySet();
697     }
698
699     @Override
700     public Byte getTables() {
701         return this.tables;
702     }
703
704     @Override
705     public Integer getActions() {
706         return this.actions;
707     }
708
709     @Override
710     public Integer getCapabilities() {
711         return this.capabilities;
712     }
713
714     @Override
715     public Integer getBuffers() {
716         return this.buffers;
717     }
718
719     @Override
720     public boolean isPortEnabled(short portNumber) {
721         return isPortEnabled(physicalPorts.get(portNumber));
722     }
723
724     @Override
725     public boolean isPortEnabled(OFPhysicalPort port) {
726         if (port == null) {
727             return false;
728         }
729         int portConfig = port.getConfig();
730         int portState = port.getState();
731         if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
732             return false;
733         }
734         if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
735             return false;
736         }
737         if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK.getValue()) {
738             return false;
739         }
740         return true;
741     }
742
743     @Override
744     public List<OFPhysicalPort> getEnabledPorts() {
745         List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
746         synchronized (this.physicalPorts) {
747             for (OFPhysicalPort port : physicalPorts.values()) {
748                 if (isPortEnabled(port)) {
749                     result.add(port);
750                 }
751             }
752         }
753         return result;
754     }
755
756     /*
757      * Transmit thread polls the message out of the priority queue and invokes
758      * messaging service to transmit it over the socket channel
759      */
760     class PriorityMessageTransmit implements Runnable {
761         @Override
762         public void run() {
763             running = true;
764             while (running) {
765                 try {
766                     PriorityMessage pmsg = transmitQ.take();
767                     msgReadWriteService.asyncSend(pmsg.msg);
768                     /*
769                      * If syncReply is set to true, wait for the response back.
770                      */
771                     if (pmsg.syncReply) {
772                         syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
773                     }
774                 } catch (InterruptedException ie) {
775                     reportError(new InterruptedException("PriorityMessageTransmit thread interrupted"));
776                 } catch (Exception e) {
777                     reportError(e);
778                 }
779             }
780             transmitQ = null;
781         }
782     }
783
784     /*
785      * Setup and start the transmit thread
786      */
787     private void startTransmitThread() {
788         this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11, new Comparator<PriorityMessage>() {
789             @Override
790             public int compare(PriorityMessage p1, PriorityMessage p2) {
791                 if (p2.priority != p1.priority) {
792                     return p2.priority - p1.priority;
793                 } else {
794                     return (p2.seqNum < p1.seqNum) ? 1 : -1;
795                 }
796             }
797         });
798         this.transmitThread = new Thread(new PriorityMessageTransmit());
799         this.transmitThread.start();
800     }
801
802     /*
803      * Setup communication services
804      */
805     private void setupCommChannel() throws Exception {
806         this.selector = SelectorProvider.provider().openSelector();
807         this.socket.configureBlocking(false);
808         this.socket.socket().setTcpNoDelay(true);
809         this.msgReadWriteService = getMessageReadWriteService();
810     }
811
812     private void sendFirstHello() {
813         try {
814             OFMessage msg = factory.getMessage(OFType.HELLO);
815             asyncFastSend(msg);
816         } catch (Exception e) {
817             reportError(e);
818         }
819     }
820
821     private IMessageReadWrite getMessageReadWriteService() throws Exception {
822         String str = System.getProperty("secureChannelEnabled");
823         return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(socket,
824                 selector) : new MessageReadWriteService(socket, selector);
825     }
826
827     /**
828      * Send Barrier message synchronously. The caller will be blocked until the
829      * Barrier reply is received.
830      */
831     @Override
832     public Object syncSendBarrierMessage() {
833         OFBarrierRequest barrierMsg = new OFBarrierRequest();
834         return syncSend(barrierMsg);
835     }
836
837     /**
838      * Send Barrier message asynchronously. The caller is not blocked. The
839      * Barrier message will be sent in a transmit thread which will be blocked
840      * until the Barrier reply is received.
841      */
842     @Override
843     public Object asyncSendBarrierMessage() {
844         if (transmitQ == null) {
845             return Boolean.FALSE;
846         }
847
848         OFBarrierRequest barrierMsg = new OFBarrierRequest();
849         int xid = getNextXid();
850
851         barrierMsg.setXid(xid);
852         transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
853
854         return Boolean.TRUE;
855     }
856
857     /**
858      * This method returns the switch liveness timeout value. If controller did
859      * not receive any message from the switch for such a long period,
860      * controller will tear down the connection to the switch.
861      *
862      * @return The timeout value
863      */
864     private static int getSwitchLivenessTimeout() {
865         String timeout = System.getProperty("of.switchLivenessTimeout");
866         int rv = 60500;
867
868         try {
869             if (timeout != null) {
870                 rv = Integer.parseInt(timeout);
871             }
872         } catch (Exception e) {
873         }
874
875         return rv;
876     }
877
878     /**
879      * This method performs synchronous operations for a given message. If
880      * syncRequest is set to true, the message will be sent out followed by a
881      * Barrier request message. Then it's blocked until the Barrier rely arrives
882      * or timeout. If syncRequest is false, it simply skips the message send and
883      * just waits for the response back.
884      *
885      * @param msg
886      *            Message to be sent
887      * @param xid
888      *            Message XID
889      * @param request
890      *            If set to true, the message the message will be sent out
891      *            followed by a Barrier request message. If set to false, it
892      *            simply skips the sending and just waits for the Barrier reply.
893      * @return the result
894      */
895     private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
896         SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
897         messageWaitingDone.put(xid, worker);
898         Object result = null;
899         Boolean status = false;
900         Future<Object> submit;
901         try {
902            submit = executor.submit(worker);
903         } catch (RejectedExecutionException re) {
904             messageWaitingDone.remove(xid);
905             return result;
906         }
907         try {
908             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
909             messageWaitingDone.remove(xid);
910             if (result == null) {
911                 // if result is null, then it means the switch can handle this
912                 // message successfully
913                 // convert the result into a Boolean with value true
914                 status = true;
915                 // logger.debug("Successfully send " +
916                 // msg.getType().toString());
917                 result = status;
918             } else {
919                 // if result is not null, this means the switch can't handle
920                 // this message
921                 // the result if OFError already
922                 if (logger.isDebugEnabled()) {
923                     logger.debug("Send {} failed --> {}", msg.getType(), (result));
924                 }
925             }
926             return result;
927         } catch (Exception e) {
928             logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
929             // convert the result into a Boolean with value false
930             status = false;
931             result = status;
932             worker.wakeup();
933             return result;
934         }
935     }
936
937     @Override
938     public void deleteAllFlows() {
939         logger.trace("deleteAllFlows on switch {}", HexString.toHexString(this.sid));
940         OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
941         OFFlowMod flowMod = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
942         flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE).setOutPort(OFPort.OFPP_NONE)
943                 .setLength((short) OFFlowMod.MINIMUM_LENGTH);
944         asyncFastSend(flowMod);
945     }
946 }