Small fix for 3037-5 build failure
[openflowplugin.git] / openflow_netty / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / EnhancedSwitchHandler.java
1 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
2
3 import java.io.IOException;
4 import java.net.SocketAddress;
5 import java.net.SocketException;
6 import java.nio.channels.AsynchronousCloseException;
7 import java.nio.channels.ClosedSelectorException;
8 import java.util.ArrayList;
9 import java.util.Date;
10 import java.util.HashMap;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.Set;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Future;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.atomic.AtomicInteger;
20
21 import org.jboss.netty.channel.Channel;
22 import org.jboss.netty.util.HashedWheelTimer;
23 import org.jboss.netty.util.Timeout;
24 import org.jboss.netty.util.TimerTask;
25 import org.opendaylight.controller.protocol_plugin.openflow.core.IEnhancedSwitch;
26 import org.opendaylight.openflowplugin.openflow.core.internal.StatisticsCollector;
27 import org.opendaylight.openflowplugin.openflow.core.internal.SwitchEvent;
28 import org.opendaylight.openflowplugin.openflow.core.internal.SwitchEvent.SwitchEventType;
29 import org.opendaylight.openflowplugin.openflow.core.internal.SynchronousMessage;
30 import org.openflow.protocol.OFBarrierReply;
31 import org.openflow.protocol.OFBarrierRequest;
32 import org.openflow.protocol.OFEchoReply;
33 import org.openflow.protocol.OFError;
34 import org.openflow.protocol.OFFeaturesReply;
35 import org.openflow.protocol.OFFlowMod;
36 import org.openflow.protocol.OFGetConfigReply;
37 import org.openflow.protocol.OFMatch;
38 import org.openflow.protocol.OFMessage;
39 import org.openflow.protocol.OFPhysicalPort;
40 import org.openflow.protocol.OFPort;
41 import org.openflow.protocol.OFPortStatus;
42 import org.openflow.protocol.OFSetConfig;
43 import org.openflow.protocol.OFStatisticsReply;
44 import org.openflow.protocol.OFStatisticsRequest;
45 import org.openflow.protocol.OFType;
46 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
47 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
48 import org.openflow.protocol.OFPhysicalPort.OFPortState;
49 import org.openflow.protocol.OFPortStatus.OFPortReason;
50 import org.openflow.protocol.factory.BasicFactory;
51 import org.openflow.protocol.factory.MessageParseException;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55 public class EnhancedSwitchHandler implements IEnhancedSwitch {
56
57
58     private static final Logger logger = LoggerFactory
59             .getLogger(EnhancedSwitchHandler.class);
60     private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
61     private int MESSAGE_RESPONSE_TIMER = 2000;
62
63     private EnhancedController controller = null;
64     private Integer switchChannelID = null;
65     private Channel channel;
66     private long lastMsgReceivedTimeStamp = 0;
67     private SwitchState state = null;
68     private BasicFactory factory = null;
69     private HashedWheelTimer timer = null;
70     private SwitchLivelinessTimerTask switchLivelinessTask = null;
71     private Timeout switchLivelinessTaskHandle = null;
72     private long sid;
73     private AtomicInteger xid;
74     private int buffers;
75     private int capabilities;
76     private byte tables;
77     private int actions;
78     private Map<Short, OFPhysicalPort> physicalPorts;
79     private Map<Short, Integer> portBandwidth;
80     private Date connectedDate;
81     private ExecutorService executor = null;
82     private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
83     private Integer responseTimerValue;
84     private TrafficStatisticsHandler trafficStatsHandler = null;
85     private static final boolean START_LIVELINESS_TIMER = false;
86
87     private static final int BATCH_COUNT_FOR_FLUSHING = 3;
88     private int flushBatchTrack = 0;
89
90     /*
91     private List<OFMessage> msgBuffer = new ArrayList<OFMessage>();
92     private int bufferTrack = 0;
93     private static final int BATCH_BUFFER_THRESHOLD = 100;
94     */
95
96
97     // PLEASE .. IF THERE IS SOMETHING CALLED GOD, HELP ME GET THE THROUGHPUT WITH THIS !!
98     private List<OFMessage> flushableMsgBuffer = new ArrayList<OFMessage>();
99
100
101     public enum SwitchState {
102         NON_OPERATIONAL(0),
103         WAIT_FEATURES_REPLY(1),
104         WAIT_CONFIG_REPLY(2),
105         OPERATIONAL(3);
106
107         private int value;
108
109         private SwitchState(int value) {
110             this.value = value;
111         }
112
113         @SuppressWarnings("unused")
114         public int value() {
115             return this.value;
116         }
117     }
118
119
120     public EnhancedSwitchHandler(EnhancedController controller,
121             Integer switchConnectionChannelID,
122             Channel channel,
123             HashedWheelTimer timer,
124             ExecutorService executor,
125             TrafficStatisticsHandler tHandler){
126
127         this.controller = controller;
128         this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
129         this.portBandwidth = new HashMap<Short, Integer>();
130         this.switchChannelID = switchConnectionChannelID;
131         this.timer = timer;
132         this.sid = (long) 0;
133         this.tables = (byte) 0;
134         this.actions = (int) 0;
135         this.capabilities = (int) 0;
136         this.buffers = (int) 0;
137         this.connectedDate = new Date();
138         this.state = SwitchState.NON_OPERATIONAL;
139         this.executor = executor;
140         this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
141         this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
142         this.channel = channel;
143         this.xid = new AtomicInteger(this.channel.hashCode());
144         this.trafficStatsHandler = tHandler;
145
146     }
147
148     Integer getSwitchChannelID() {
149         return this.switchChannelID;
150     }
151
152     public void startHandler(){
153         this.factory = new BasicFactory();
154         start();
155
156     }
157
158
159     public void shutDownHandler(){
160         stop();
161
162     }
163
164
165     public void handleChannelIdle(){
166         // TODO: this is already handled by OFChannelHandler
167         // so DON'T care
168
169
170     }
171
172
173     public void start() {
174         sendFirstHello();
175     }
176
177     public void stop() {
178         cancelSwitchTimer();
179         SwitchEvent ev = new SwitchEvent(SwitchEventType.SWITCH_DELETE, this, null);
180         controller.switchDeleted(ev, switchChannelID);
181     }
182
183     private void cancelSwitchTimer() {
184         if (switchLivelinessTaskHandle != null){
185             this.switchLivelinessTaskHandle.cancel();
186         }
187     }
188
189
190     public void handleCaughtException(){
191
192
193
194     }
195
196
197
198
199     @Override
200     public int getNextXid() {
201         return this.xid.incrementAndGet();
202     }
203
204     @Override
205     public Long getId() {
206         return this.sid;
207     }
208
209     @Override
210     public Byte getTables() {
211         return this.tables;
212     }
213
214     @Override
215     public Integer getActions() {
216         return this.actions;
217     }
218
219     @Override
220     public Integer getCapabilities() {
221         return this.capabilities;
222     }
223
224     @Override
225     public Integer getBuffers() {
226         return this.buffers;
227     }
228
229     @Override
230     public Date getConnectedDate() {
231         return this.connectedDate;
232     }
233
234     @Override
235     public Integer asyncSend(OFMessage msg) {
236         return asyncSend(msg, getNextXid());
237     }
238
239
240     @Override
241     public Integer asyncSend(OFMessage msg, int xid) {
242         // TODO:
243         // BATCHING IMPLEMENTATION. Please think hard before enablng this !!
244         // Some messages could be latency-sensitive and some could be batched
245         // for better throughput. So, below decision may not bring better
246         // throughput for latency-sensitive cases like FLOW-MODs or
247         // PACKET-OUTs
248
249         /*
250         if (bufferTrack == BUFFER_THRESHOLD){
251             this.channel.write(msgBuffer);
252             msgBuffer.clear();
253             bufferTrack = 0;
254
255         }
256         msg.setXid(xid);
257         msgBuffer.add(msg);
258         bufferTrack++;
259         */
260
261
262
263         //List<OFMessage> msglist = new ArrayList<OFMessage>(1);
264         msg.setXid(xid);
265         synchronized( flushableMsgBuffer ) {
266             flushableMsgBuffer.add(msg);
267         }
268
269         trafficStatsHandler.countForEntitySimpleMeasurement(switchChannelID,
270                 TrafficStatisticsHandler.ENTITY_COUNTER_SND_MSG);
271
272         //this.channel.write(msglist);
273
274         /*
275         if (msg.getType() == OFType.FLOW_MOD){
276             this.trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.FLOW_MOD_SENT);
277             this.trafficStatsHandler.countForRateMeasurement(TrafficStatisticsHandler.FLOW_MOD_SENT);
278         }
279         */
280
281
282         return xid;
283     }
284
285
286     @Override
287     public Integer asyncFastSend(OFMessage msg) {
288         return asyncFastSend(msg, getNextXid());
289     }
290
291     @Override
292     public Integer asyncFastSend(OFMessage msg, int xid) {
293         msg.setXid(xid);
294         List<OFMessage> msglist = new ArrayList<OFMessage>(1);
295         msglist.add(msg);
296         this.channel.write(msglist);
297         trafficStatsHandler.countForEntitySimpleMeasurement(switchChannelID,
298                 TrafficStatisticsHandler.ENTITY_COUNTER_SND_MSG);
299         return xid;
300     }
301
302     @Override
303     public Object syncSend(OFMessage msg) {
304         int xid = getNextXid();
305         return syncSend(msg, xid);
306     }
307
308     private Object syncSend(OFMessage msg, int xid) {
309         return syncMessageInternal(msg, xid, true);
310     }
311
312     @Override
313     public Map<Short, OFPhysicalPort> getPhysicalPorts() {
314         return this.physicalPorts;
315     }
316
317     @Override
318     public Set<Short> getPorts() {
319         return this.physicalPorts.keySet();
320     }
321
322     @Override
323     public OFPhysicalPort getPhysicalPort(Short portNumber) {
324         return this.physicalPorts.get(portNumber);
325     }
326
327     @Override
328     public Integer getPortBandwidth(Short portNumber) {
329         return this.portBandwidth.get(portNumber);
330     }
331
332     @Override
333     public boolean isPortEnabled(short portNumber) {
334         return isPortEnabled(physicalPorts.get(portNumber));
335     }
336
337     @Override
338     public boolean isPortEnabled(OFPhysicalPort port) {
339         if (port == null) {
340             return false;
341         }
342         int portConfig = port.getConfig();
343         int portState = port.getState();
344         if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
345             return false;
346         }
347         if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
348             return false;
349         }
350         if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
351                 .getValue()) {
352             return false;
353         }
354         return true;
355
356     }
357
358     @Override
359     public List<OFPhysicalPort> getEnabledPorts() {
360         List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
361         synchronized (this.physicalPorts) {
362             for (OFPhysicalPort port : physicalPorts.values()) {
363                 if (isPortEnabled(port)) {
364                     result.add(port);
365                 }
366             }
367         }
368         return result;
369     }
370
371
372     /**
373      * WARNING: CALLER WOULD BE BLOCKED
374      *
375      */
376     @Override
377     public Object getStatistics(OFStatisticsRequest req) {
378         int xid = getNextXid();
379         StatisticsCollector worker = new StatisticsCollector(this, xid, req);
380         messageWaitingDone.put(xid, worker);
381         Future<Object> submit = executor.submit(worker);
382         Object result = null;
383         try {
384             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
385             return result;
386         } catch (Exception e) {
387             logger.warn("Timeout while waiting for {} replies", req.getType());
388             result = null; // to indicate timeout has occurred
389             return result;
390         }
391     }
392
393     @Override
394     public boolean isOperational() {
395         return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
396     }
397
398     @Override
399     public Object syncSendBarrierMessage() {
400         OFBarrierRequest barrierMsg = new OFBarrierRequest();
401         return syncSend(barrierMsg);
402     }
403
404     @Override
405     public Object asyncSendBarrierMessage() {
406         List<OFMessage> msglist = new ArrayList<OFMessage>(1);
407         OFBarrierRequest barrierMsg = new OFBarrierRequest();
408         int xid = getNextXid();
409
410         barrierMsg.setXid(xid);
411         msglist.add(barrierMsg);
412
413         this.channel.write(msglist);
414         trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.BARRIER_REQUEST_SENT);
415         return Boolean.TRUE;
416     }
417
418
419     @Override
420     public void handleMessage(OFMessage ofMessage) {
421
422
423         logger.debug("Message received: {}", ofMessage.toString());
424         this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
425         OFType type = ofMessage.getType();
426         switch (type) {
427         case HELLO:
428             logger.debug("<<<< HELLO");
429             // send feature request
430             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.HELLO_RECEIVED);
431             OFMessage featureRequest = factory
432                     .getMessage(OFType.FEATURES_REQUEST);
433             asyncFastSend(featureRequest);
434             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.FEATURES_REQUEST_SENT);
435             // delete all pre-existing flows
436             OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
437             OFFlowMod flowMod = (OFFlowMod) factory
438                     .getMessage(OFType.FLOW_MOD);
439             flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
440                     .setOutPort(OFPort.OFPP_NONE)
441                     .setLength((short) OFFlowMod.MINIMUM_LENGTH);
442             asyncFastSend(flowMod);
443             this.state = SwitchState.WAIT_FEATURES_REPLY;
444             if (START_LIVELINESS_TIMER){
445                 startSwitchTimer();
446             }
447             break;
448         case ECHO_REQUEST:
449             logger.debug("<<<< ECHO REQUEST");
450             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ECHO_REQUEST_RECEIVED);
451             OFEchoReply echoReply = (OFEchoReply) factory
452                     .getMessage(OFType.ECHO_REPLY);
453             asyncFastSend(echoReply);
454             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ECHO_REPLY_SENT);
455
456             break;
457         case ECHO_REPLY:
458             logger.debug("<<<< ECHO REPLY");
459             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ECHO_REPLY_RECEIVED);
460             //this.probeSent = false;
461             break;
462         case FEATURES_REPLY:
463             logger.debug("<<<< FEATURES REPLY");
464             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.FEATURES_REPLY_RECEIVED);
465             processFeaturesReply((OFFeaturesReply) ofMessage);
466             break;
467         case GET_CONFIG_REPLY:
468             logger.debug("<<<< CONFIG REPLY");
469             // make sure that the switch can send the whole packet to the
470             // controller
471             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.CONFIG_REPLY_RECEIVED);
472             if (((OFGetConfigReply) ofMessage).getMissSendLength() == (short) 0xffff) {
473                 this.state = SwitchState.OPERATIONAL;
474             }
475             break;
476         case BARRIER_REPLY:
477             logger.debug("<<<< BARRIER REPLY");
478             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.BARRIER_REPLY_RECEIVED);
479             processBarrierReply((OFBarrierReply) ofMessage);
480             break;
481         case ERROR:
482             logger.debug("<<<< ERROR");
483             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ERROR_MSG_RECEIVED);
484             processErrorReply((OFError) ofMessage);
485             break;
486         case PORT_STATUS:
487             logger.debug("<<<< PORT STATUS");
488             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.PORT_STATUS_RECEIVED);
489             processPortStatusMsg((OFPortStatus) ofMessage);
490             break;
491         case STATS_REPLY:
492             logger.debug("<<<< STATS REPLY");
493             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.STATS_RESPONSE_RECEIVED);
494             processStatsReply((OFStatisticsReply) ofMessage);
495             break;
496         case PACKET_IN:
497             logger.debug("<<<< PACKET_IN");
498             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.PACKET_IN_RECEIVED);
499             trafficStatsHandler.countForRateMeasurement(TrafficStatisticsHandler.PACKET_IN_RECEIVED);
500             break;
501         default:
502             break;
503         } // end of switch
504         if (isOperational()) {
505             logger.debug("SWITCH IS OPERATIONAL ... forwarding");
506             SwitchEvent ev = new SwitchEvent(
507                     SwitchEvent.SwitchEventType.SWITCH_MESSAGE, this, ofMessage);
508             controller.switchMessage(ev, switchChannelID);
509         }
510     }
511
512
513     private void startSwitchTimer(){
514         if (this.timer != null){
515             if (switchLivelinessTask == null){
516                 switchLivelinessTask = new SwitchLivelinessTimerTask();
517             }
518             switchLivelinessTaskHandle = timer.newTimeout(switchLivelinessTask,
519                     switchLivenessTimeout, TimeUnit.SECONDS);
520         }
521     }
522
523
524
525     /**
526      * This method returns the switch liveness timeout value. If controller did
527      * not receive any message from the switch for such a long period,
528      * controller will tear down the connection to the switch.
529      *
530      * @return The timeout value
531      */
532     private static int getSwitchLivenessTimeout() {
533         String timeout = System.getProperty("of.switchLivenessTimeout");
534         int rv = 60500;
535         try {
536             if (timeout != null) {
537                 rv = Integer.parseInt(timeout);
538             }
539         } catch (Exception e) {
540         }
541         return rv;
542     }
543
544
545     private void processFeaturesReply(OFFeaturesReply reply) {
546         if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
547             this.sid = reply.getDatapathId();
548             this.buffers = reply.getBuffers();
549             this.capabilities = reply.getCapabilities();
550             this.tables = reply.getTables();
551             this.actions = reply.getActions();
552             // notify core of this error event
553             for (OFPhysicalPort port : reply.getPorts()) {
554                 updatePhysicalPort(port);
555             }
556             // config the switch to send full data packet
557             OFSetConfig config = (OFSetConfig) factory
558                     .getMessage(OFType.SET_CONFIG);
559             config.setMissSendLength((short) 0xffff).setLengthU(
560                     OFSetConfig.MINIMUM_LENGTH);
561             asyncFastSend(config);
562             // send config request to make sure the switch can handle the set
563             // config
564             OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
565             asyncFastSend(getConfig);
566             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.CONFIG_REQUEST_SENT);
567             this.state = SwitchState.WAIT_CONFIG_REPLY;
568             // inform core that a new switch is now operational
569             reportSwitchStateChange(true);
570         }
571     }
572
573
574     private void updatePhysicalPort(OFPhysicalPort port) {
575         Short portNumber = port.getPortNumber();
576         physicalPorts.put(portNumber, port);
577         portBandwidth
578                 .put(portNumber,
579                         port.getCurrentFeatures()
580                                 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
581                                         | OFPortFeatures.OFPPF_10MB_HD
582                                                 .getValue()
583                                         | OFPortFeatures.OFPPF_100MB_FD
584                                                 .getValue()
585                                         | OFPortFeatures.OFPPF_100MB_HD
586                                                 .getValue()
587                                         | OFPortFeatures.OFPPF_1GB_FD
588                                                 .getValue()
589                                         | OFPortFeatures.OFPPF_1GB_HD
590                                                 .getValue() | OFPortFeatures.OFPPF_10GB_FD
591                                             .getValue()));
592         trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.UPDATE_PHYSICAL_PORT);
593     }
594
595
596     private void reportSwitchStateChange(boolean added) {
597         SwitchEvent ev = null;
598         if (added) {
599             ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ADD, this, null);
600             controller.switchAdded(ev, switchChannelID);
601         } else {
602             ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_DELETE, this, null);
603             controller.switchDeleted(ev, switchChannelID);
604         }
605     }
606
607
608     protected class SwitchLivelinessTimerTask implements TimerTask {
609
610         @Override
611         public void run(Timeout timeout) throws Exception {
612
613             // set this reference in parent so that cancellation is
614             // possible
615             switchLivelinessTaskHandle = timeout;
616             Long now = System.currentTimeMillis();
617             if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
618                 if (state == SwitchState.WAIT_FEATURES_REPLY) {
619                     // send another features request
620                     OFMessage request = factory
621                             .getMessage(OFType.FEATURES_REQUEST);
622                     asyncFastSend(request);
623                 } else {
624                     if (state == SwitchState.WAIT_CONFIG_REPLY) {
625                         // send another config request
626                         OFSetConfig config = (OFSetConfig) factory
627                                 .getMessage(OFType.SET_CONFIG);
628                         config.setMissSendLength((short) 0xffff)
629                         .setLengthU(OFSetConfig.MINIMUM_LENGTH);
630                         asyncFastSend(config);
631                         OFMessage getConfig = factory
632                                 .getMessage(OFType.GET_CONFIG_REQUEST);
633                         asyncFastSend(getConfig);
634                     }
635                 }
636             }
637             timer.newTimeout(this, switchLivenessTimeout, TimeUnit.SECONDS);
638
639         }
640     }
641
642
643     /*
644      * Either a BarrierReply or a OFError is received. If this is a reply for an
645      * outstanding sync message, wake up associated task so that it can continue
646      */
647     private void processBarrierReply(OFBarrierReply msg) {
648         Integer xid = msg.getXid();
649         SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
650                 .remove(xid);
651         if (worker == null) {
652             return;
653         }
654         worker.wakeup();
655     }
656
657     private void processErrorReply(OFError errorMsg) {
658         try{
659             OFMessage offendingMsg = errorMsg.getOffendingMsg();
660             Integer xi = 0;
661             if (offendingMsg != null) {
662                 xi = offendingMsg.getXid();
663             } else {
664                 xi = errorMsg.getXid();
665             }
666         }
667         catch(MessageParseException mpe){
668             reportError(mpe);
669         }
670     }
671
672     private void processPortStatusMsg(OFPortStatus msg) {
673         OFPhysicalPort port = msg.getDesc();
674         if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
675             updatePhysicalPort(port);
676         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
677             updatePhysicalPort(port);
678         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
679                 .ordinal()) {
680             deletePhysicalPort(port);
681         }
682
683     }
684
685     private void deletePhysicalPort(OFPhysicalPort port) {
686         Short portNumber = port.getPortNumber();
687         physicalPorts.remove(portNumber);
688         portBandwidth.remove(portNumber);
689     }
690
691     private void processStatsReply(OFStatisticsReply reply) {
692         Integer xid = reply.getXid();
693         StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
694                 .get(xid);
695         if (worker == null) {
696             return;
697         }
698         if (worker.collect(reply)) {
699             // if all the stats records are received (collect() returns true)
700             // then we are done.
701             messageWaitingDone.remove(xid);
702             worker.wakeup();
703         }
704     }
705
706
707     /**
708      * This method performs synchronous operations for a given message. If
709      * syncRequest is set to true, the message will be sent out followed by a
710      * Barrier request message. Then it's blocked until the Barrier rely arrives
711      * or timeout. If syncRequest is false, it simply skips the message send and
712      * just waits for the response back.
713      *
714      * @param msg
715      *            Message to be sent
716      * @param xid
717      *            Message XID
718      * @param request
719      *            If set to true, the message the message will be sent out
720      *            followed by a Barrier request message. If set to false, it
721      *            simply skips the sending and just waits for the Barrier reply.
722      * @return the result
723      */
724     private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
725         Object result = null;
726
727         SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
728         messageWaitingDone.put(xid, worker);
729
730         Boolean status = false;
731         Future<Object> submit = executor.submit(worker);
732         try {
733             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
734             messageWaitingDone.remove(xid);
735             if (result == null) {
736                 // if result is null, then it means the switch can handle this
737                 // message successfully
738                 // convert the result into a Boolean with value true
739                 status = true;
740                 // logger.debug("Successfully send " +
741                 // msg.getType().toString());
742                 result = status;
743             } else {
744                 // if result is not null, this means the switch can't handle
745                 // this message
746                 // the result if OFError already
747                 logger.debug("Send {} failed --> {}", msg.getType().toString(),
748                         ((OFError) result).toString());
749             }
750             return result;
751         } catch (Exception e) {
752             logger.warn("Timeout while waiting for {} reply", msg.getType()
753                     .toString());
754             // convert the result into a Boolean with value false
755             status = false;
756             result = status;
757             return result;
758         }
759
760
761     }
762
763
764     private void sendFirstHello() {
765         try {
766             OFMessage msg = factory.getMessage(OFType.HELLO);
767             asyncFastSend(msg);
768             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.HELLO_SENT);
769             trafficStatsHandler.addEntityForCounter(switchChannelID, TrafficStatisticsHandler.ENTITY_COUNTER_RCV_MSG);
770             trafficStatsHandler.addEntityForCounter(switchChannelID, TrafficStatisticsHandler.ENTITY_COUNTER_SND_MSG);
771         } catch (Exception e) {
772             reportError(e);
773         }
774     }
775
776
777     private void reportError(Exception e) {
778         if (e instanceof AsynchronousCloseException
779                 || e instanceof InterruptedException
780                 || e instanceof SocketException || e instanceof IOException
781                 || e instanceof ClosedSelectorException) {
782             logger.error("Caught exception {}", e.getMessage());
783         } else {
784             logger.error("Caught exception ", e);
785         }
786         // notify core of this error event and disconnect the switch
787
788         // TODO: We do not need this because except-hanling is done by
789         // Controller's OFChannelHandler
790
791         /*
792         SwitchEvent ev = new SwitchEvent(
793                 SwitchEvent.SwitchEventType.SWITCH_ERROR, this, null);
794
795         controller.switchError(ev, switchChannelID);
796         */
797     }
798
799
800     @Override
801     public void flushBufferedMessages() {
802         //flushBatchTrack++;
803         //if (flushBatchTrack > BATCH_COUNT_FOR_FLUSHING){
804         synchronized (flushableMsgBuffer) {
805             if (flushableMsgBuffer.size() > 0){
806                 channel.write(flushableMsgBuffer);
807                 flushableMsgBuffer.clear();
808             }
809         }
810         //    flushBatchTrack = 0;
811         //}
812
813     }
814
815     @Override
816     public SocketAddress getRemoteAddress() {
817         return (channel != null) ? channel.getRemoteAddress() : null;
818     }
819
820     @Override
821     public SocketAddress getLocalAddress() {
822         return (channel != null) ? channel.getLocalAddress() : null;
823     }
824
825 }