b84b3ac8e1beae098682f02e64c554812499a80e
[controller.git] / opendaylight / protocol_plugins / openflow_netty / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / EnhancedSwitchHandler.java
1 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
2
3 import java.io.IOException;
4 import java.net.SocketAddress;
5 import java.net.SocketException;
6 import java.nio.channels.AsynchronousCloseException;
7 import java.nio.channels.ClosedSelectorException;
8 import java.util.ArrayList;
9 import java.util.Date;
10 import java.util.HashMap;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.Set;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Future;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.atomic.AtomicInteger;
20
21 import org.jboss.netty.channel.Channel;
22 import org.jboss.netty.util.HashedWheelTimer;
23 import org.jboss.netty.util.Timeout;
24 import org.jboss.netty.util.TimerTask;
25 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
26 import org.opendaylight.controller.protocol_plugin.openflow.core.internal.SwitchEvent.SwitchEventType;
27 import org.openflow.protocol.OFBarrierReply;
28 import org.openflow.protocol.OFBarrierRequest;
29 import org.openflow.protocol.OFEchoReply;
30 import org.openflow.protocol.OFError;
31 import org.openflow.protocol.OFFeaturesReply;
32 import org.openflow.protocol.OFFlowMod;
33 import org.openflow.protocol.OFGetConfigReply;
34 import org.openflow.protocol.OFMatch;
35 import org.openflow.protocol.OFMessage;
36 import org.openflow.protocol.OFPhysicalPort;
37 import org.openflow.protocol.OFPort;
38 import org.openflow.protocol.OFPortStatus;
39 import org.openflow.protocol.OFSetConfig;
40 import org.openflow.protocol.OFStatisticsReply;
41 import org.openflow.protocol.OFStatisticsRequest;
42 import org.openflow.protocol.OFType;
43 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
44 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
45 import org.openflow.protocol.OFPhysicalPort.OFPortState;
46 import org.openflow.protocol.OFPortStatus.OFPortReason;
47 import org.openflow.protocol.factory.BasicFactory;
48 import org.openflow.protocol.factory.MessageParseException;
49 import org.slf4j.Logger;
50 import org.slf4j.LoggerFactory;
51
52 public class EnhancedSwitchHandler implements ISwitch {
53
54
55     private static final Logger logger = LoggerFactory
56             .getLogger(EnhancedSwitchHandler.class);
57     private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
58     private int MESSAGE_RESPONSE_TIMER = 2000;
59
60     private EnhancedController controller = null;
61     private Integer switchChannelID = null;
62     private Channel channel;
63     private long lastMsgReceivedTimeStamp = 0;
64     private SwitchState state = null;
65     private BasicFactory factory = null;
66     private HashedWheelTimer timer = null;
67     private SwitchLivelinessTimerTask switchLivelinessTask = null;
68     private Timeout switchLivelinessTaskHandle = null;
69     private long sid;
70     private AtomicInteger xid;
71     private int buffers;
72     private int capabilities;
73     private byte tables;
74     private int actions;
75     private Map<Short, OFPhysicalPort> physicalPorts;
76     private Map<Short, Integer> portBandwidth;
77     private Date connectedDate;
78     private ExecutorService executor = null;
79     private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
80     private Integer responseTimerValue;
81     private TrafficStatisticsHandler trafficStatsHandler = null;
82     private static final boolean START_LIVELINESS_TIMER = false;
83
84     private static final int BATCH_COUNT_FOR_FLUSHING = 3;
85     private int flushBatchTrack = 0;
86
87     /*
88     private List<OFMessage> msgBuffer = new ArrayList<OFMessage>();
89     private int bufferTrack = 0;
90     private static final int BATCH_BUFFER_THRESHOLD = 100;
91     */
92
93
94     // PLEASE .. IF THERE IS SOMETHING CALLED GOD, HELP ME GET THE THROUGHPUT WITH THIS !!
95     private List<OFMessage> flushableMsgBuffer = new ArrayList<OFMessage>();
96
97
98     public enum SwitchState {
99         NON_OPERATIONAL(0),
100         WAIT_FEATURES_REPLY(1),
101         WAIT_CONFIG_REPLY(2),
102         OPERATIONAL(3);
103
104         private int value;
105
106         private SwitchState(int value) {
107             this.value = value;
108         }
109
110         @SuppressWarnings("unused")
111         public int value() {
112             return this.value;
113         }
114     }
115
116
117     public EnhancedSwitchHandler(EnhancedController controller,
118             Integer switchConnectionChannelID,
119             Channel channel,
120             HashedWheelTimer timer,
121             ExecutorService executor,
122             TrafficStatisticsHandler tHandler){
123
124         this.controller = controller;
125         this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
126         this.portBandwidth = new HashMap<Short, Integer>();
127         this.switchChannelID = switchConnectionChannelID;
128         this.timer = timer;
129         this.sid = (long) 0;
130         this.tables = (byte) 0;
131         this.actions = (int) 0;
132         this.capabilities = (int) 0;
133         this.buffers = (int) 0;
134         this.connectedDate = new Date();
135         this.state = SwitchState.NON_OPERATIONAL;
136         this.executor = executor;
137         this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
138         this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
139         this.channel = channel;
140         this.xid = new AtomicInteger(this.channel.hashCode());
141         this.trafficStatsHandler = tHandler;
142
143     }
144
145
146     public void startHandler(){
147         this.factory = new BasicFactory();
148         start();
149
150     }
151
152
153     public void shutDownHandler(){
154         stop();
155
156     }
157
158
159     public void handleChannelIdle(){
160         // TODO: this is already handled by OFChannelHandler
161         // so DON'T care
162
163
164     }
165
166
167     public void start() {
168         sendFirstHello();
169     }
170
171     public void stop() {
172         cancelSwitchTimer();
173         SwitchEvent ev = new SwitchEvent(SwitchEventType.SWITCH_DELETE, this, null);
174         controller.switchDeleted(ev, switchChannelID);
175     }
176
177     private void cancelSwitchTimer() {
178         if (switchLivelinessTaskHandle != null){
179             this.switchLivelinessTaskHandle.cancel();
180         }
181     }
182
183
184     public void handleCaughtException(){
185
186
187
188     }
189
190
191
192
193     @Override
194     public int getNextXid() {
195         return this.xid.incrementAndGet();
196     }
197
198     @Override
199     public Long getId() {
200         return this.sid;
201     }
202
203     @Override
204     public Byte getTables() {
205         return this.tables;
206     }
207
208     @Override
209     public Integer getActions() {
210         return this.actions;
211     }
212
213     @Override
214     public Integer getCapabilities() {
215         return this.capabilities;
216     }
217
218     @Override
219     public Integer getBuffers() {
220         return this.buffers;
221     }
222
223     @Override
224     public Date getConnectedDate() {
225         return this.connectedDate;
226     }
227
228     @Override
229     public Integer asyncSend(OFMessage msg) {
230         return asyncSend(msg, getNextXid());
231     }
232
233
234     @Override
235     public Integer asyncSend(OFMessage msg, int xid) {
236         // TODO:
237         // BATCHING IMPLEMENTATION. Please think hard before enablng this !!
238         // Some messages could be latency-sensitive and some could be batched
239         // for better throughput. So, below decision may not bring better
240         // throughput for latency-sensitive cases like FLOW-MODs or
241         // PACKET-OUTs
242
243         /*
244         if (bufferTrack == BUFFER_THRESHOLD){
245             this.channel.write(msgBuffer);
246             msgBuffer.clear();
247             bufferTrack = 0;
248
249         }
250         msg.setXid(xid);
251         msgBuffer.add(msg);
252         bufferTrack++;
253         */
254
255
256
257         //List<OFMessage> msglist = new ArrayList<OFMessage>(1);
258         msg.setXid(xid);
259         synchronized( flushableMsgBuffer ) {
260             flushableMsgBuffer.add(msg);
261         }
262
263         trafficStatsHandler.countForEntitySimpleMeasurement(switchChannelID,
264                 TrafficStatisticsHandler.ENTITY_COUNTER_SND_MSG);
265
266         //this.channel.write(msglist);
267
268         /*
269         if (msg.getType() == OFType.FLOW_MOD){
270             this.trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.FLOW_MOD_SENT);
271             this.trafficStatsHandler.countForRateMeasurement(TrafficStatisticsHandler.FLOW_MOD_SENT);
272         }
273         */
274
275
276         return xid;
277     }
278
279
280     @Override
281     public Integer asyncFastSend(OFMessage msg) {
282         return asyncFastSend(msg, getNextXid());
283     }
284
285     @Override
286     public Integer asyncFastSend(OFMessage msg, int xid) {
287         msg.setXid(xid);
288         List<OFMessage> msglist = new ArrayList<OFMessage>(1);
289         msglist.add(msg);
290         this.channel.write(msglist);
291         trafficStatsHandler.countForEntitySimpleMeasurement(switchChannelID,
292                 TrafficStatisticsHandler.ENTITY_COUNTER_SND_MSG);
293         return xid;
294     }
295
296     @Override
297     public Object syncSend(OFMessage msg) {
298         int xid = getNextXid();
299         return syncSend(msg, xid);
300     }
301
302     private Object syncSend(OFMessage msg, int xid) {
303         return syncMessageInternal(msg, xid, true);
304     }
305
306     @Override
307     public Map<Short, OFPhysicalPort> getPhysicalPorts() {
308         return this.physicalPorts;
309     }
310
311     @Override
312     public Set<Short> getPorts() {
313         return this.physicalPorts.keySet();
314     }
315
316     @Override
317     public OFPhysicalPort getPhysicalPort(Short portNumber) {
318         return this.physicalPorts.get(portNumber);
319     }
320
321     @Override
322     public Integer getPortBandwidth(Short portNumber) {
323         return this.portBandwidth.get(portNumber);
324     }
325
326     @Override
327     public boolean isPortEnabled(short portNumber) {
328         return isPortEnabled(physicalPorts.get(portNumber));
329     }
330
331     @Override
332     public boolean isPortEnabled(OFPhysicalPort port) {
333         if (port == null) {
334             return false;
335         }
336         int portConfig = port.getConfig();
337         int portState = port.getState();
338         if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
339             return false;
340         }
341         if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
342             return false;
343         }
344         if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
345                 .getValue()) {
346             return false;
347         }
348         return true;
349
350     }
351
352     @Override
353     public List<OFPhysicalPort> getEnabledPorts() {
354         List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
355         synchronized (this.physicalPorts) {
356             for (OFPhysicalPort port : physicalPorts.values()) {
357                 if (isPortEnabled(port)) {
358                     result.add(port);
359                 }
360             }
361         }
362         return result;
363     }
364
365
366     /**
367      * WARNING: CALLER WOULD BE BLOCKED
368      *
369      */
370     @Override
371     public Object getStatistics(OFStatisticsRequest req) {
372         int xid = getNextXid();
373         StatisticsCollector worker = new StatisticsCollector(this, xid, req);
374         messageWaitingDone.put(xid, worker);
375         Future<Object> submit = executor.submit(worker);
376         Object result = null;
377         try {
378             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
379             return result;
380         } catch (Exception e) {
381             logger.warn("Timeout while waiting for {} replies", req.getType());
382             result = null; // to indicate timeout has occurred
383             return result;
384         }
385     }
386
387     @Override
388     public boolean isOperational() {
389         return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
390     }
391
392     @Override
393     public Object syncSendBarrierMessage() {
394         OFBarrierRequest barrierMsg = new OFBarrierRequest();
395         return syncSend(barrierMsg);
396     }
397
398     @Override
399     public Object asyncSendBarrierMessage() {
400         List<OFMessage> msglist = new ArrayList<OFMessage>(1);
401         OFBarrierRequest barrierMsg = new OFBarrierRequest();
402         int xid = getNextXid();
403
404         barrierMsg.setXid(xid);
405         msglist.add(barrierMsg);
406
407         this.channel.write(msglist);
408         trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.BARRIER_REQUEST_SENT);
409         return Boolean.TRUE;
410     }
411
412
413     @Override
414     public void handleMessage(OFMessage ofMessage) {
415
416
417         logger.debug("Message received: {}", ofMessage.toString());
418         this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
419         OFType type = ofMessage.getType();
420         switch (type) {
421         case HELLO:
422             logger.debug("<<<< HELLO");
423             // send feature request
424             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.HELLO_RECEIVED);
425             OFMessage featureRequest = factory
426                     .getMessage(OFType.FEATURES_REQUEST);
427             asyncFastSend(featureRequest);
428             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.FEATURES_REQUEST_SENT);
429             // delete all pre-existing flows
430             OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
431             OFFlowMod flowMod = (OFFlowMod) factory
432                     .getMessage(OFType.FLOW_MOD);
433             flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
434                     .setOutPort(OFPort.OFPP_NONE)
435                     .setLength((short) OFFlowMod.MINIMUM_LENGTH);
436             asyncFastSend(flowMod);
437             this.state = SwitchState.WAIT_FEATURES_REPLY;
438             if (START_LIVELINESS_TIMER){
439                 startSwitchTimer();
440             }
441             break;
442         case ECHO_REQUEST:
443             logger.debug("<<<< ECHO REQUEST");
444             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ECHO_REQUEST_RECEIVED);
445             OFEchoReply echoReply = (OFEchoReply) factory
446                     .getMessage(OFType.ECHO_REPLY);
447             asyncFastSend(echoReply);
448             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ECHO_REPLY_SENT);
449
450             break;
451         case ECHO_REPLY:
452             logger.debug("<<<< ECHO REPLY");
453             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ECHO_REPLY_RECEIVED);
454             //this.probeSent = false;
455             break;
456         case FEATURES_REPLY:
457             logger.debug("<<<< FEATURES REPLY");
458             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.FEATURES_REPLY_RECEIVED);
459             processFeaturesReply((OFFeaturesReply) ofMessage);
460             break;
461         case GET_CONFIG_REPLY:
462             logger.debug("<<<< CONFIG REPLY");
463             // make sure that the switch can send the whole packet to the
464             // controller
465             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.CONFIG_REPLY_RECEIVED);
466             if (((OFGetConfigReply) ofMessage).getMissSendLength() == (short) 0xffff) {
467                 this.state = SwitchState.OPERATIONAL;
468             }
469             break;
470         case BARRIER_REPLY:
471             logger.debug("<<<< BARRIER REPLY");
472             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.BARRIER_REPLY_RECEIVED);
473             processBarrierReply((OFBarrierReply) ofMessage);
474             break;
475         case ERROR:
476             logger.debug("<<<< ERROR");
477             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ERROR_MSG_RECEIVED);
478             processErrorReply((OFError) ofMessage);
479             break;
480         case PORT_STATUS:
481             logger.debug("<<<< PORT STATUS");
482             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.PORT_STATUS_RECEIVED);
483             processPortStatusMsg((OFPortStatus) ofMessage);
484             break;
485         case STATS_REPLY:
486             logger.debug("<<<< STATS REPLY");
487             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.STATS_RESPONSE_RECEIVED);
488             processStatsReply((OFStatisticsReply) ofMessage);
489             break;
490         case PACKET_IN:
491             logger.debug("<<<< PACKET_IN");
492             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.PACKET_IN_RECEIVED);
493             trafficStatsHandler.countForRateMeasurement(TrafficStatisticsHandler.PACKET_IN_RECEIVED);
494             break;
495         default:
496             break;
497         } // end of switch
498         if (isOperational()) {
499             logger.debug("SWITCH IS OPERATIONAL ... forwarding");
500             SwitchEvent ev = new SwitchEvent(
501                     SwitchEvent.SwitchEventType.SWITCH_MESSAGE, this, ofMessage);
502             controller.switchMessage(ev, switchChannelID);
503         }
504     }
505
506
507     private void startSwitchTimer(){
508         if (this.timer != null){
509             if (switchLivelinessTask == null){
510                 switchLivelinessTask = new SwitchLivelinessTimerTask();
511             }
512             switchLivelinessTaskHandle = timer.newTimeout(switchLivelinessTask,
513                     switchLivenessTimeout, TimeUnit.SECONDS);
514         }
515     }
516
517
518
519     /**
520      * This method returns the switch liveness timeout value. If controller did
521      * not receive any message from the switch for such a long period,
522      * controller will tear down the connection to the switch.
523      *
524      * @return The timeout value
525      */
526     private static int getSwitchLivenessTimeout() {
527         String timeout = System.getProperty("of.switchLivenessTimeout");
528         int rv = 60500;
529         try {
530             if (timeout != null) {
531                 rv = Integer.parseInt(timeout);
532             }
533         } catch (Exception e) {
534         }
535         return rv;
536     }
537
538
539     private void processFeaturesReply(OFFeaturesReply reply) {
540         if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
541             this.sid = reply.getDatapathId();
542             this.buffers = reply.getBuffers();
543             this.capabilities = reply.getCapabilities();
544             this.tables = reply.getTables();
545             this.actions = reply.getActions();
546             // notify core of this error event
547             for (OFPhysicalPort port : reply.getPorts()) {
548                 updatePhysicalPort(port);
549             }
550             // config the switch to send full data packet
551             OFSetConfig config = (OFSetConfig) factory
552                     .getMessage(OFType.SET_CONFIG);
553             config.setMissSendLength((short) 0xffff).setLengthU(
554                     OFSetConfig.MINIMUM_LENGTH);
555             asyncFastSend(config);
556             // send config request to make sure the switch can handle the set
557             // config
558             OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
559             asyncFastSend(getConfig);
560             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.CONFIG_REQUEST_SENT);
561             this.state = SwitchState.WAIT_CONFIG_REPLY;
562             // inform core that a new switch is now operational
563             reportSwitchStateChange(true);
564         }
565     }
566
567
568     private void updatePhysicalPort(OFPhysicalPort port) {
569         Short portNumber = port.getPortNumber();
570         physicalPorts.put(portNumber, port);
571         portBandwidth
572                 .put(portNumber,
573                         port.getCurrentFeatures()
574                                 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
575                                         | OFPortFeatures.OFPPF_10MB_HD
576                                                 .getValue()
577                                         | OFPortFeatures.OFPPF_100MB_FD
578                                                 .getValue()
579                                         | OFPortFeatures.OFPPF_100MB_HD
580                                                 .getValue()
581                                         | OFPortFeatures.OFPPF_1GB_FD
582                                                 .getValue()
583                                         | OFPortFeatures.OFPPF_1GB_HD
584                                                 .getValue() | OFPortFeatures.OFPPF_10GB_FD
585                                             .getValue()));
586         trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.UPDATE_PHYSICAL_PORT);
587     }
588
589
590     private void reportSwitchStateChange(boolean added) {
591         SwitchEvent ev = null;
592         if (added) {
593             ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ADD, this, null);
594             controller.switchAdded(ev, switchChannelID);
595         } else {
596             ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_DELETE, this, null);
597             controller.switchDeleted(ev, switchChannelID);
598         }
599     }
600
601
602     protected class SwitchLivelinessTimerTask implements TimerTask {
603
604         @Override
605         public void run(Timeout timeout) throws Exception {
606
607             // set this reference in parent so that cancellation is
608             // possible
609             switchLivelinessTaskHandle = timeout;
610             Long now = System.currentTimeMillis();
611             if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
612                 if (state == SwitchState.WAIT_FEATURES_REPLY) {
613                     // send another features request
614                     OFMessage request = factory
615                             .getMessage(OFType.FEATURES_REQUEST);
616                     asyncFastSend(request);
617                 } else {
618                     if (state == SwitchState.WAIT_CONFIG_REPLY) {
619                         // send another config request
620                         OFSetConfig config = (OFSetConfig) factory
621                                 .getMessage(OFType.SET_CONFIG);
622                         config.setMissSendLength((short) 0xffff)
623                         .setLengthU(OFSetConfig.MINIMUM_LENGTH);
624                         asyncFastSend(config);
625                         OFMessage getConfig = factory
626                                 .getMessage(OFType.GET_CONFIG_REQUEST);
627                         asyncFastSend(getConfig);
628                     }
629                 }
630             }
631             timer.newTimeout(this, switchLivenessTimeout, TimeUnit.SECONDS);
632
633         }
634     }
635
636
637     /*
638      * Either a BarrierReply or a OFError is received. If this is a reply for an
639      * outstanding sync message, wake up associated task so that it can continue
640      */
641     private void processBarrierReply(OFBarrierReply msg) {
642         Integer xid = msg.getXid();
643         SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
644                 .remove(xid);
645         if (worker == null) {
646             return;
647         }
648         worker.wakeup();
649     }
650
651     private void processErrorReply(OFError errorMsg) {
652         try{
653             OFMessage offendingMsg = errorMsg.getOffendingMsg();
654             Integer xi = 0;
655             if (offendingMsg != null) {
656                 xi = offendingMsg.getXid();
657             } else {
658                 xi = errorMsg.getXid();
659             }
660         }
661         catch(MessageParseException mpe){
662             reportError(mpe);
663         }
664     }
665
666     private void processPortStatusMsg(OFPortStatus msg) {
667         OFPhysicalPort port = msg.getDesc();
668         if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
669             updatePhysicalPort(port);
670         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
671             updatePhysicalPort(port);
672         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
673                 .ordinal()) {
674             deletePhysicalPort(port);
675         }
676
677     }
678
679     private void deletePhysicalPort(OFPhysicalPort port) {
680         Short portNumber = port.getPortNumber();
681         physicalPorts.remove(portNumber);
682         portBandwidth.remove(portNumber);
683     }
684
685     private void processStatsReply(OFStatisticsReply reply) {
686         Integer xid = reply.getXid();
687         StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
688                 .get(xid);
689         if (worker == null) {
690             return;
691         }
692         if (worker.collect(reply)) {
693             // if all the stats records are received (collect() returns true)
694             // then we are done.
695             messageWaitingDone.remove(xid);
696             worker.wakeup();
697         }
698     }
699
700
701     /**
702      * This method performs synchronous operations for a given message. If
703      * syncRequest is set to true, the message will be sent out followed by a
704      * Barrier request message. Then it's blocked until the Barrier rely arrives
705      * or timeout. If syncRequest is false, it simply skips the message send and
706      * just waits for the response back.
707      *
708      * @param msg
709      *            Message to be sent
710      * @param xid
711      *            Message XID
712      * @param request
713      *            If set to true, the message the message will be sent out
714      *            followed by a Barrier request message. If set to false, it
715      *            simply skips the sending and just waits for the Barrier reply.
716      * @return the result
717      */
718     private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
719         Object result = null;
720
721         SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
722         messageWaitingDone.put(xid, worker);
723
724         Boolean status = false;
725         Future<Object> submit = executor.submit(worker);
726         try {
727             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
728             messageWaitingDone.remove(xid);
729             if (result == null) {
730                 // if result is null, then it means the switch can handle this
731                 // message successfully
732                 // convert the result into a Boolean with value true
733                 status = true;
734                 // logger.debug("Successfully send " +
735                 // msg.getType().toString());
736                 result = status;
737             } else {
738                 // if result is not null, this means the switch can't handle
739                 // this message
740                 // the result if OFError already
741                 logger.debug("Send {} failed --> {}", msg.getType().toString(),
742                         ((OFError) result).toString());
743             }
744             return result;
745         } catch (Exception e) {
746             logger.warn("Timeout while waiting for {} reply", msg.getType()
747                     .toString());
748             // convert the result into a Boolean with value false
749             status = false;
750             result = status;
751             return result;
752         }
753
754
755     }
756
757
758     private void sendFirstHello() {
759         try {
760             OFMessage msg = factory.getMessage(OFType.HELLO);
761             asyncFastSend(msg);
762             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.HELLO_SENT);
763             trafficStatsHandler.addEntityForCounter(switchChannelID, TrafficStatisticsHandler.ENTITY_COUNTER_RCV_MSG);
764             trafficStatsHandler.addEntityForCounter(switchChannelID, TrafficStatisticsHandler.ENTITY_COUNTER_SND_MSG);
765         } catch (Exception e) {
766             reportError(e);
767         }
768     }
769
770
771     private void reportError(Exception e) {
772         if (e instanceof AsynchronousCloseException
773                 || e instanceof InterruptedException
774                 || e instanceof SocketException || e instanceof IOException
775                 || e instanceof ClosedSelectorException) {
776             logger.error("Caught exception {}", e.getMessage());
777         } else {
778             logger.error("Caught exception ", e);
779         }
780         // notify core of this error event and disconnect the switch
781
782         // TODO: We do not need this because except-hanling is done by
783         // Controller's OFChannelHandler
784
785         /*
786         SwitchEvent ev = new SwitchEvent(
787                 SwitchEvent.SwitchEventType.SWITCH_ERROR, this, null);
788
789         controller.switchError(ev, switchChannelID);
790         */
791     }
792
793
794     @Override
795     public void flushBufferedMessages() {
796         //flushBatchTrack++;
797         //if (flushBatchTrack > BATCH_COUNT_FOR_FLUSHING){
798         synchronized (flushableMsgBuffer) {
799             if (flushableMsgBuffer.size() > 0){
800                 channel.write(flushableMsgBuffer);
801                 flushableMsgBuffer.clear();
802             }
803         }
804         //    flushBatchTrack = 0;
805         //}
806
807     }
808
809     @Override
810     public SocketAddress getRemoteAddress() {
811         return (channel != null) ? channel.getRemoteAddress() : null;
812     }
813
814     @Override
815     public SocketAddress getLocalAddress() {
816         return (channel != null) ? channel.getLocalAddress() : null;
817     }
818
819 }