Merge "BUG-2329 Add test for anyxmls inside rpc resonse for netcfon-connector"
[controller.git] / opendaylight / adsal / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / SwitchHandler.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
10
11 import java.nio.channels.SelectionKey;
12 import java.nio.channels.Selector;
13 import java.nio.channels.SocketChannel;
14 import java.nio.channels.spi.SelectorProvider;
15 import java.util.ArrayList;
16 import java.util.Comparator;
17 import java.util.Date;
18 import java.util.HashMap;
19 import java.util.Iterator;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Set;
23 import java.util.Timer;
24 import java.util.TimerTask;
25 import java.util.concurrent.Callable;
26 import java.util.concurrent.ConcurrentHashMap;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.Future;
30 import java.util.concurrent.PriorityBlockingQueue;
31 import java.util.concurrent.RejectedExecutionException;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.atomic.AtomicInteger;
34
35 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
36 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageReadWrite;
37 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
38 import org.openflow.protocol.OFBarrierReply;
39 import org.openflow.protocol.OFBarrierRequest;
40 import org.openflow.protocol.OFEchoReply;
41 import org.openflow.protocol.OFEchoRequest;
42 import org.openflow.protocol.OFError;
43 import org.openflow.protocol.OFFeaturesReply;
44 import org.openflow.protocol.OFFlowMod;
45 import org.openflow.protocol.OFGetConfigReply;
46 import org.openflow.protocol.OFMatch;
47 import org.openflow.protocol.OFMessage;
48 import org.openflow.protocol.OFPhysicalPort;
49 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
50 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
51 import org.openflow.protocol.OFPhysicalPort.OFPortState;
52 import org.openflow.protocol.OFPort;
53 import org.openflow.protocol.OFPortStatus;
54 import org.openflow.protocol.OFPortStatus.OFPortReason;
55 import org.openflow.protocol.OFSetConfig;
56 import org.openflow.protocol.OFStatisticsReply;
57 import org.openflow.protocol.OFStatisticsRequest;
58 import org.openflow.protocol.OFType;
59 import org.openflow.protocol.factory.BasicFactory;
60 import org.openflow.util.HexString;
61 import org.slf4j.Logger;
62 import org.slf4j.LoggerFactory;
63
64
65 public class SwitchHandler implements ISwitch {
66     private static final Logger logger = LoggerFactory.getLogger(SwitchHandler.class);
67     private static final int SWITCH_LIVENESS_TIMER = 5000;
68     private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
69     private final int MESSAGE_RESPONSE_TIMER = 2000;
70
71     private final String instanceName;
72     private final ISwitch thisISwitch;
73     private final IController core;
74     private Long sid;
75     private Integer buffers;
76     private Integer capabilities;
77     private Byte tables;
78     private Integer actions;
79     private Selector selector;
80     private final SocketChannel socket;
81     private final BasicFactory factory;
82     private final AtomicInteger xid;
83     private SwitchState state;
84     private Timer periodicTimer;
85     private final Map<Short, OFPhysicalPort> physicalPorts;
86     private final Map<Short, Integer> portBandwidth;
87     private final Date connectedDate;
88     private Long lastMsgReceivedTimeStamp;
89     private Boolean probeSent;
90     private final ExecutorService executor;
91     private final ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
92     private boolean running;
93     private IMessageReadWrite msgReadWriteService;
94     private Thread switchHandlerThread;
95     private Integer responseTimerValue;
96     private PriorityBlockingQueue<PriorityMessage> transmitQ;
97     private Thread transmitThread;
98
99     private enum SwitchState {
100         NON_OPERATIONAL(0), WAIT_FEATURES_REPLY(1), WAIT_CONFIG_REPLY(2), OPERATIONAL(3);
101
102         private int value;
103
104         private SwitchState(int value) {
105             this.value = value;
106         }
107
108         @SuppressWarnings("unused")
109         public int value() {
110             return this.value;
111         }
112     }
113
114     public SwitchHandler(Controller core, SocketChannel sc, String name) {
115         this.instanceName = name;
116         this.thisISwitch = this;
117         this.sid = (long) 0;
118         this.buffers = (int) 0;
119         this.capabilities = (int) 0;
120         this.tables = (byte) 0;
121         this.actions = (int) 0;
122         this.core = core;
123         this.socket = sc;
124         this.factory = new BasicFactory();
125         this.connectedDate = new Date();
126         this.lastMsgReceivedTimeStamp = connectedDate.getTime();
127         this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
128         this.portBandwidth = new HashMap<Short, Integer>();
129         this.state = SwitchState.NON_OPERATIONAL;
130         this.probeSent = false;
131         this.xid = new AtomicInteger(this.socket.hashCode());
132         this.periodicTimer = null;
133         this.executor = Executors.newFixedThreadPool(4);
134         this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
135         this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
136         String rTimer = System.getProperty("of.messageResponseTimer");
137         if (rTimer != null) {
138             try {
139                 responseTimerValue = Integer.decode(rTimer);
140             } catch (NumberFormatException e) {
141                 logger.warn("Invalid of.messageResponseTimer: {} use default({})", rTimer, MESSAGE_RESPONSE_TIMER);
142             }
143         }
144     }
145
146     public void start() {
147         try {
148             startTransmitThread();
149             setupCommChannel();
150             sendFirstHello();
151             startHandlerThread();
152         } catch (Exception e) {
153             reportError(e);
154         }
155     }
156
157     private void startHandlerThread() {
158         switchHandlerThread = new Thread(new Runnable() {
159             @Override
160             public void run() {
161                 running = true;
162                 while (running) {
163                     try {
164                         // wait for an incoming connection
165                         selector.select(0);
166                         Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
167                         while (selectedKeys.hasNext()) {
168                             SelectionKey skey = selectedKeys.next();
169                             selectedKeys.remove();
170                             if (skey.isValid() && skey.isWritable()) {
171                                 resumeSend();
172                             }
173                             if (skey.isValid() && skey.isReadable()) {
174                                 handleMessages();
175                             }
176                         }
177                     } catch (Exception e) {
178                         reportError(e);
179                     }
180                 }
181             }
182         }, instanceName);
183         switchHandlerThread.start();
184     }
185
186     private void stopInternal() {
187         logger.debug("{} receives stop signal",
188                 (isOperational() ? HexString.toHexString(sid) : "unknown"));
189         running = false;
190         cancelSwitchTimer();
191         try {
192             selector.wakeup();
193             selector.close();
194         } catch (Exception e) {
195         }
196         try {
197             socket.close();
198         } catch (Exception e) {
199         }
200         try {
201             msgReadWriteService.stop();
202         } catch (Exception e) {
203         }
204         logger.debug("executor shutdown now");
205         executor.shutdownNow();
206
207         msgReadWriteService = null;
208     }
209
210     public void stop() {
211         stopInternal();
212
213         if (switchHandlerThread != null) {
214             switchHandlerThread.interrupt();
215         }
216         if (transmitThread != null) {
217             transmitThread.interrupt();
218         }
219     }
220
221     @Override
222     public int getNextXid() {
223         return this.xid.incrementAndGet();
224     }
225
226     /**
227      * This method puts the message in an outgoing priority queue with normal
228      * priority. It will be served after high priority messages. The method
229      * should be used for non-critical messages such as statistics request,
230      * discovery packets, etc. An unique XID is generated automatically and
231      * inserted into the message.
232      *
233      * @param msg
234      *            The OF message to be sent
235      * @return The XID used
236      */
237     @Override
238     public Integer asyncSend(OFMessage msg) {
239         return asyncSend(msg, getNextXid());
240     }
241
242     private Object syncSend(OFMessage msg, int xid) {
243         return syncMessageInternal(msg, xid, true);
244     }
245
246     /**
247      * This method puts the message in an outgoing priority queue with normal
248      * priority. It will be served after high priority messages. The method
249      * should be used for non-critical messages such as statistics request,
250      * discovery packets, etc. The specified XID is inserted into the message.
251      *
252      * @param msg
253      *            The OF message to be Sent
254      * @param xid
255      *            The XID to be used in the message
256      * @return The XID used
257      */
258     @Override
259     public Integer asyncSend(OFMessage msg, int xid) {
260         msg.setXid(xid);
261         if (transmitQ != null) {
262             transmitQ.add(new PriorityMessage(msg, 0));
263         }
264         return xid;
265     }
266
267     /**
268      * This method puts the message in an outgoing priority queue with high
269      * priority. It will be served first before normal priority messages. The
270      * method should be used for critical messages such as hello, echo reply
271      * etc. An unique XID is generated automatically and inserted into the
272      * message.
273      *
274      * @param msg
275      *            The OF message to be sent
276      * @return The XID used
277      */
278     @Override
279     public Integer asyncFastSend(OFMessage msg) {
280         return asyncFastSend(msg, getNextXid());
281     }
282
283     /**
284      * This method puts the message in an outgoing priority queue with high
285      * priority. It will be served first before normal priority messages. The
286      * method should be used for critical messages such as hello, echo reply
287      * etc. The specified XID is inserted into the message.
288      *
289      * @param msg
290      *            The OF message to be sent
291      * @return The XID used
292      */
293     @Override
294     public Integer asyncFastSend(OFMessage msg, int xid) {
295         msg.setXid(xid);
296         if (transmitQ != null) {
297             transmitQ.add(new PriorityMessage(msg, 1));
298         }
299         return xid;
300     }
301
302     public void resumeSend() {
303         try {
304             if (msgReadWriteService != null) {
305                 msgReadWriteService.resumeSend();
306             }
307         } catch (Exception e) {
308             reportError(e);
309         }
310     }
311
312     /**
313      * This method bypasses the transmit queue and sends the message over the
314      * socket directly. If the input xid is not null, the specified xid is
315      * inserted into the message. Otherwise, an unique xid is generated
316      * automatically and inserted into the message.
317      *
318      * @param msg
319      *            Message to be sent
320      * @param xid
321      *            Message xid
322      */
323     private void asyncSendNow(OFMessage msg, Integer xid) {
324         if (xid == null) {
325             xid = getNextXid();
326         }
327         msg.setXid(xid);
328
329         asyncSendNow(msg);
330     }
331
332     /**
333      * This method bypasses the transmit queue and sends the message over the
334      * socket directly.
335      *
336      * @param msg
337      *            Message to be sent
338      */
339     private void asyncSendNow(OFMessage msg) {
340         if (msgReadWriteService == null) {
341             logger.warn("asyncSendNow: {} is not sent because Message ReadWrite Service is not available.", msg);
342             return;
343         }
344
345         try {
346             msgReadWriteService.asyncSend(msg);
347         } catch (Exception e) {
348             reportError(e);
349         }
350     }
351
352     public void handleMessages() {
353         List<OFMessage> msgs = null;
354
355         try {
356             if (msgReadWriteService != null) {
357                 msgs = msgReadWriteService.readMessages();
358             }
359         } catch (Exception e) {
360             reportError(e);
361         }
362
363         if (msgs == null) {
364             return;
365         }
366         for (OFMessage msg : msgs) {
367             logger.trace("Message received: {}", msg);
368             this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
369             OFType type = msg.getType();
370             switch (type) {
371             case HELLO:
372                 sendFeaturesRequest();
373                 break;
374             case ECHO_REQUEST:
375                 OFEchoReply echoReply = (OFEchoReply) factory.getMessage(OFType.ECHO_REPLY);
376
377                 byte []payload = ((OFEchoRequest)msg).getPayload();
378                 if (payload != null && payload.length != 0 ) {
379                     // the response must have the same payload as the request
380                     echoReply.setPayload(payload);
381                     echoReply.setLength( (short)(echoReply.getLength() + payload.length) );
382                 }
383
384                 // respond immediately
385                 asyncSendNow(echoReply, msg.getXid());
386
387                 // send features request if not sent yet
388                 sendFeaturesRequest();
389                 break;
390             case ECHO_REPLY:
391                 this.probeSent = false;
392                 break;
393             case FEATURES_REPLY:
394                 processFeaturesReply((OFFeaturesReply) msg);
395                 break;
396             case GET_CONFIG_REPLY:
397                 // make sure that the switch can send the whole packet to the
398                 // controller
399                 if (((OFGetConfigReply) msg).getMissSendLength() == (short) 0xffff) {
400                     this.state = SwitchState.OPERATIONAL;
401                 }
402                 break;
403             case BARRIER_REPLY:
404                 processBarrierReply((OFBarrierReply) msg);
405                 break;
406             case ERROR:
407                 processErrorReply((OFError) msg);
408                 break;
409             case PORT_STATUS:
410                 processPortStatusMsg((OFPortStatus) msg);
411                 break;
412             case STATS_REPLY:
413                 processStatsReply((OFStatisticsReply) msg);
414                 break;
415             case PACKET_IN:
416                 break;
417             default:
418                 break;
419             } // end of switch
420             if (isOperational()) {
421                 ((Controller) core).takeSwitchEventMsg(thisISwitch, msg);
422             }
423         } // end of for
424     }
425
426     private void processPortStatusMsg(OFPortStatus msg) {
427         OFPhysicalPort port = msg.getDesc();
428         if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
429             updatePhysicalPort(port);
430         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
431             updatePhysicalPort(port);
432         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE.ordinal()) {
433             deletePhysicalPort(port);
434         }
435
436     }
437
438     private void startSwitchTimer() {
439         this.periodicTimer = new Timer();
440         this.periodicTimer.scheduleAtFixedRate(new TimerTask() {
441             @Override
442             public void run() {
443                 try {
444                     Long now = System.currentTimeMillis();
445                     if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
446                         if (probeSent) {
447                             // switch failed to respond to our probe, consider
448                             // it down
449                             logger.warn("{} sid {} is idle for too long, disconnect", socket.socket()
450                                     .getRemoteSocketAddress().toString().split("/")[1], (sid == 0) ? "unknown"
451                                     : HexString.toHexString(sid));
452                             reportSwitchStateChange(false);
453                         } else {
454                             // send a probe to see if the switch is still alive
455                             logger.debug("Send idle probe (Echo Request) to {}", this);
456                             probeSent = true;
457                             OFMessage echo = factory.getMessage(OFType.ECHO_REQUEST);
458                             asyncFastSend(echo);
459                         }
460                     } else {
461                         if (state == SwitchState.WAIT_FEATURES_REPLY) {
462                             // send another features request
463                             OFMessage request = factory.getMessage(OFType.FEATURES_REQUEST);
464                             asyncFastSend(request);
465                         } else {
466                             if (state == SwitchState.WAIT_CONFIG_REPLY) {
467                                 // send another config request
468                                 OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
469                                 config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
470                                 asyncFastSend(config);
471                                 OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
472                                 asyncFastSend(getConfig);
473                             }
474                         }
475                     }
476                 } catch (Exception e) {
477                     reportError(e);
478                 }
479             }
480         }, SWITCH_LIVENESS_TIMER, SWITCH_LIVENESS_TIMER);
481     }
482
483     private void cancelSwitchTimer() {
484         if (this.periodicTimer != null) {
485             this.periodicTimer.cancel();
486         }
487     }
488
489     private void reportError(Exception e) {
490         if (!running) {
491             logger.debug("Caught exception {} while switch {} is shutting down. Skip", e.getMessage(),
492                     (isOperational() ? HexString.toHexString(sid) : "unknown"));
493             return;
494         }
495         logger.debug("Caught exception: ", e);
496
497         // notify core of this error event and disconnect the switch
498         ((Controller) core).takeSwitchEventError(this);
499
500         // clean up some internal states immediately
501         stopInternal();
502     }
503
504     private void reportSwitchStateChange(boolean added) {
505         if (added) {
506             ((Controller) core).takeSwitchEventAdd(this);
507         } else {
508             ((Controller) core).takeSwitchEventDelete(this);
509         }
510     }
511
512     @Override
513     public Long getId() {
514         return this.sid;
515     }
516
517     private void sendFeaturesRequest() {
518         if (!isOperational() && (this.state != SwitchState.WAIT_FEATURES_REPLY)) {
519             // send feature request
520             OFMessage featureRequest = factory.getMessage(OFType.FEATURES_REQUEST);
521             asyncFastSend(featureRequest);
522             this.state = SwitchState.WAIT_FEATURES_REPLY;
523             startSwitchTimer();
524         }
525     }
526
527     private void processFeaturesReply(OFFeaturesReply reply) {
528         if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
529             this.sid = reply.getDatapathId();
530             this.buffers = reply.getBuffers();
531             this.capabilities = reply.getCapabilities();
532             this.tables = reply.getTables();
533             this.actions = reply.getActions();
534             // notify core of this error event
535             for (OFPhysicalPort port : reply.getPorts()) {
536                 updatePhysicalPort(port);
537             }
538             // config the switch to send full data packet
539             OFSetConfig config = (OFSetConfig) factory.getMessage(OFType.SET_CONFIG);
540             config.setMissSendLength((short) 0xffff).setLengthU(OFSetConfig.MINIMUM_LENGTH);
541             asyncFastSend(config);
542             // send config request to make sure the switch can handle the set
543             // config
544             OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
545             asyncFastSend(getConfig);
546             this.state = SwitchState.WAIT_CONFIG_REPLY;
547             // inform core that a new switch is now operational
548             reportSwitchStateChange(true);
549         }
550     }
551
552     private void updatePhysicalPort(OFPhysicalPort port) {
553         Short portNumber = port.getPortNumber();
554         physicalPorts.put(portNumber, port);
555         portBandwidth
556                 .put(portNumber,
557                         port.getCurrentFeatures()
558                                 & (OFPortFeatures.OFPPF_10MB_FD.getValue() | OFPortFeatures.OFPPF_10MB_HD.getValue()
559                                         | OFPortFeatures.OFPPF_100MB_FD.getValue()
560                                         | OFPortFeatures.OFPPF_100MB_HD.getValue()
561                                         | OFPortFeatures.OFPPF_1GB_FD.getValue()
562                                         | OFPortFeatures.OFPPF_1GB_HD.getValue() | OFPortFeatures.OFPPF_10GB_FD
563                                             .getValue()));
564     }
565
566     private void deletePhysicalPort(OFPhysicalPort port) {
567         Short portNumber = port.getPortNumber();
568         physicalPorts.remove(portNumber);
569         portBandwidth.remove(portNumber);
570     }
571
572     @Override
573     public boolean isOperational() {
574         return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
575     }
576
577     @Override
578     public String toString() {
579         try {
580             return ("Switch:" + socket.socket().getRemoteSocketAddress().toString().split("/")[1] + " SWID:" + (isOperational() ? HexString
581                     .toHexString(this.sid) : "unknown"));
582         } catch (Exception e) {
583             return (isOperational() ? HexString.toHexString(this.sid) : "unknown");
584         }
585
586     }
587
588     @Override
589     public Date getConnectedDate() {
590         return this.connectedDate;
591     }
592
593     public String getInstanceName() {
594         return instanceName;
595     }
596
597     @Override
598     public Object getStatistics(OFStatisticsRequest req) {
599         int xid = getNextXid();
600         StatisticsCollector worker = new StatisticsCollector(this, xid, req);
601         messageWaitingDone.put(xid, worker);
602         Future<Object> submit;
603         Object result = null;
604         try {
605             submit = executor.submit(worker);
606         } catch (RejectedExecutionException re) {
607             messageWaitingDone.remove(xid);
608             return result;
609         }
610         try {
611             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
612             return result;
613         } catch (Exception e) {
614             logger.warn("Timeout while waiting for {} replies from {}",
615                     req.getType(), (isOperational() ? HexString.toHexString(sid) : "unknown"));
616             result = null; // to indicate timeout has occurred
617             worker.wakeup();
618             return result;
619         }
620     }
621
622     @Override
623     public Object syncSend(OFMessage msg) {
624         if (!running) {
625             logger.debug("Switch is going down, ignore syncSend");
626             return null;
627         }
628         int xid = getNextXid();
629         return syncSend(msg, xid);
630     }
631
632     /*
633      * Either a BarrierReply or a OFError is received. If this is a reply for an
634      * outstanding sync message, wake up associated task so that it can continue
635      */
636     private void processBarrierReply(OFBarrierReply msg) {
637         Integer xid = msg.getXid();
638         SynchronousMessage worker = (SynchronousMessage) messageWaitingDone.remove(xid);
639         if (worker == null) {
640             return;
641         }
642         worker.wakeup();
643     }
644
645     private void processErrorReply(OFError errorMsg) {
646         OFMessage offendingMsg = errorMsg.getOffendingMsg();
647         Integer xid;
648         if (offendingMsg != null) {
649             xid = offendingMsg.getXid();
650         } else {
651             xid = errorMsg.getXid();
652         }
653         /*
654          * the error can be a reply to a synchronous message or to a statistic
655          * request message
656          */
657         Callable<?> worker = messageWaitingDone.remove(xid);
658         if (worker == null) {
659             return;
660         }
661         if (worker instanceof SynchronousMessage) {
662             ((SynchronousMessage) worker).wakeup(errorMsg);
663         } else {
664             ((StatisticsCollector) worker).wakeup(errorMsg);
665         }
666     }
667
668     private void processStatsReply(OFStatisticsReply reply) {
669         Integer xid = reply.getXid();
670         StatisticsCollector worker = (StatisticsCollector) messageWaitingDone.get(xid);
671         if (worker == null) {
672             return;
673         }
674         if (worker.collect(reply)) {
675             // if all the stats records are received (collect() returns true)
676             // then we are done.
677             messageWaitingDone.remove(xid);
678             worker.wakeup();
679         }
680     }
681
682     @Override
683     public Map<Short, OFPhysicalPort> getPhysicalPorts() {
684         return this.physicalPorts;
685     }
686
687     @Override
688     public OFPhysicalPort getPhysicalPort(Short portNumber) {
689         return this.physicalPorts.get(portNumber);
690     }
691
692     @Override
693     public Integer getPortBandwidth(Short portNumber) {
694         return this.portBandwidth.get(portNumber);
695     }
696
697     @Override
698     public Set<Short> getPorts() {
699         return this.physicalPorts.keySet();
700     }
701
702     @Override
703     public Byte getTables() {
704         return this.tables;
705     }
706
707     @Override
708     public Integer getActions() {
709         return this.actions;
710     }
711
712     @Override
713     public Integer getCapabilities() {
714         return this.capabilities;
715     }
716
717     @Override
718     public Integer getBuffers() {
719         return this.buffers;
720     }
721
722     @Override
723     public boolean isPortEnabled(short portNumber) {
724         return isPortEnabled(physicalPorts.get(portNumber));
725     }
726
727     @Override
728     public boolean isPortEnabled(OFPhysicalPort port) {
729         if (port == null) {
730             return false;
731         }
732         int portConfig = port.getConfig();
733         int portState = port.getState();
734         if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
735             return false;
736         }
737         if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
738             return false;
739         }
740         if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK.getValue()) {
741             return false;
742         }
743         return true;
744     }
745
746     @Override
747     public List<OFPhysicalPort> getEnabledPorts() {
748         List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
749         synchronized (this.physicalPorts) {
750             for (OFPhysicalPort port : physicalPorts.values()) {
751                 if (isPortEnabled(port)) {
752                     result.add(port);
753                 }
754             }
755         }
756         return result;
757     }
758
759     /*
760      * Transmit thread polls the message out of the priority queue and invokes
761      * messaging service to transmit it over the socket channel
762      */
763     class PriorityMessageTransmit implements Runnable {
764         @Override
765         public void run() {
766             running = true;
767             while (running) {
768                 try {
769                     PriorityMessage pmsg = transmitQ.take();
770                     msgReadWriteService.asyncSend(pmsg.msg);
771                     /*
772                      * If syncReply is set to true, wait for the response back.
773                      */
774                     if (pmsg.syncReply) {
775                         syncMessageInternal(pmsg.msg, pmsg.msg.getXid(), false);
776                     }
777                 } catch (InterruptedException ie) {
778                     reportError(new InterruptedException("PriorityMessageTransmit thread interrupted"));
779                 } catch (Exception e) {
780                     reportError(e);
781                 }
782             }
783             transmitQ = null;
784         }
785     }
786
787     /*
788      * Setup and start the transmit thread
789      */
790     private void startTransmitThread() {
791         this.transmitQ = new PriorityBlockingQueue<PriorityMessage>(11, new Comparator<PriorityMessage>() {
792             @Override
793             public int compare(PriorityMessage p1, PriorityMessage p2) {
794                 if (p2.priority != p1.priority) {
795                     return p2.priority - p1.priority;
796                 } else {
797                     return (p2.seqNum < p1.seqNum) ? 1 : -1;
798                 }
799             }
800         });
801         this.transmitThread = new Thread(new PriorityMessageTransmit());
802         this.transmitThread.start();
803     }
804
805     /*
806      * Setup communication services
807      */
808     private void setupCommChannel() throws Exception {
809         this.selector = SelectorProvider.provider().openSelector();
810         this.socket.configureBlocking(false);
811         this.socket.socket().setTcpNoDelay(true);
812         this.msgReadWriteService = getMessageReadWriteService();
813     }
814
815     private void sendFirstHello() {
816         try {
817             OFMessage msg = factory.getMessage(OFType.HELLO);
818             asyncFastSend(msg);
819         } catch (Exception e) {
820             reportError(e);
821         }
822     }
823
824     private IMessageReadWrite getMessageReadWriteService() throws Exception {
825         String str = System.getProperty("secureChannelEnabled");
826         return ((str != null) && (str.trim().equalsIgnoreCase("true"))) ? new SecureMessageReadWriteService(socket,
827                 selector) : new MessageReadWriteService(socket, selector);
828     }
829
830     /**
831      * Send Barrier message synchronously. The caller will be blocked until the
832      * Barrier reply is received.
833      */
834     @Override
835     public Object syncSendBarrierMessage() {
836         OFBarrierRequest barrierMsg = new OFBarrierRequest();
837         return syncSend(barrierMsg);
838     }
839
840     /**
841      * Send Barrier message asynchronously. The caller is not blocked. The
842      * Barrier message will be sent in a transmit thread which will be blocked
843      * until the Barrier reply is received.
844      */
845     @Override
846     public Object asyncSendBarrierMessage() {
847         if (transmitQ == null) {
848             return Boolean.FALSE;
849         }
850
851         OFBarrierRequest barrierMsg = new OFBarrierRequest();
852         int xid = getNextXid();
853
854         barrierMsg.setXid(xid);
855         transmitQ.add(new PriorityMessage(barrierMsg, 0, true));
856
857         return Boolean.TRUE;
858     }
859
860     /**
861      * This method returns the switch liveness timeout value. If controller did
862      * not receive any message from the switch for such a long period,
863      * controller will tear down the connection to the switch.
864      *
865      * @return The timeout value
866      */
867     private static int getSwitchLivenessTimeout() {
868         String timeout = System.getProperty("of.switchLivenessTimeout");
869         int rv = 60500;
870
871         try {
872             if (timeout != null) {
873                 rv = Integer.parseInt(timeout);
874             }
875         } catch (Exception e) {
876         }
877
878         return rv;
879     }
880
881     /**
882      * This method performs synchronous operations for a given message. If
883      * syncRequest is set to true, the message will be sent out followed by a
884      * Barrier request message. Then it's blocked until the Barrier rely arrives
885      * or timeout. If syncRequest is false, it simply skips the message send and
886      * just waits for the response back.
887      *
888      * @param msg
889      *            Message to be sent
890      * @param xid
891      *            Message XID
892      * @param request
893      *            If set to true, the message the message will be sent out
894      *            followed by a Barrier request message. If set to false, it
895      *            simply skips the sending and just waits for the Barrier reply.
896      * @return the result
897      */
898     private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
899         SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
900         messageWaitingDone.put(xid, worker);
901         Object result = null;
902         Boolean status = false;
903         Future<Object> submit;
904         try {
905            submit = executor.submit(worker);
906         } catch (RejectedExecutionException re) {
907             messageWaitingDone.remove(xid);
908             return result;
909         }
910         try {
911             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
912             messageWaitingDone.remove(xid);
913             if (result == null) {
914                 // if result is null, then it means the switch can handle this
915                 // message successfully
916                 // convert the result into a Boolean with value true
917                 status = true;
918                 // logger.debug("Successfully send " +
919                 // msg.getType().toString());
920                 result = status;
921             } else {
922                 // if result is not null, this means the switch can't handle
923                 // this message
924                 // the result if OFError already
925                 if (logger.isDebugEnabled()) {
926                     logger.debug("Send {} failed --> {}", msg.getType(), (result));
927                 }
928             }
929             return result;
930         } catch (Exception e) {
931             logger.warn("Timeout while waiting for {} reply", msg.getType().toString());
932             // convert the result into a Boolean with value false
933             status = false;
934             result = status;
935             worker.wakeup();
936             return result;
937         }
938     }
939
940     @Override
941     public void deleteAllFlows() {
942         logger.trace("deleteAllFlows on switch {}", HexString.toHexString(this.sid));
943         OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
944         OFFlowMod flowMod = (OFFlowMod) factory.getMessage(OFType.FLOW_MOD);
945         flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE).setOutPort(OFPort.OFPP_NONE)
946                 .setLength((short) OFFlowMod.MINIMUM_LENGTH);
947         asyncFastSend(flowMod);
948     }
949 }