protocol.openflow_netty cleanup
[controller.git] / opendaylight / protocol_plugins / openflow_netty / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / EnhancedSwitchHandler.java
1 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
2
3 import java.io.IOException;
4 import java.net.SocketAddress;
5 import java.net.SocketException;
6 import java.nio.channels.AsynchronousCloseException;
7 import java.nio.channels.ClosedSelectorException;
8 import java.util.ArrayList;
9 import java.util.Date;
10 import java.util.HashMap;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.Set;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.ConcurrentHashMap;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Future;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.atomic.AtomicInteger;
20
21 import org.jboss.netty.channel.Channel;
22 import org.jboss.netty.util.HashedWheelTimer;
23 import org.jboss.netty.util.Timeout;
24 import org.jboss.netty.util.TimerTask;
25 import org.opendaylight.controller.protocol_plugin.openflow.core.IEnhancedSwitch;
26 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
27 import org.opendaylight.controller.protocol_plugin.openflow.core.internal.SwitchEvent.SwitchEventType;
28 import org.openflow.protocol.OFBarrierReply;
29 import org.openflow.protocol.OFBarrierRequest;
30 import org.openflow.protocol.OFEchoReply;
31 import org.openflow.protocol.OFError;
32 import org.openflow.protocol.OFFeaturesReply;
33 import org.openflow.protocol.OFFlowMod;
34 import org.openflow.protocol.OFGetConfigReply;
35 import org.openflow.protocol.OFMatch;
36 import org.openflow.protocol.OFMessage;
37 import org.openflow.protocol.OFPhysicalPort;
38 import org.openflow.protocol.OFPort;
39 import org.openflow.protocol.OFPortStatus;
40 import org.openflow.protocol.OFSetConfig;
41 import org.openflow.protocol.OFStatisticsReply;
42 import org.openflow.protocol.OFStatisticsRequest;
43 import org.openflow.protocol.OFType;
44 import org.openflow.protocol.OFPhysicalPort.OFPortConfig;
45 import org.openflow.protocol.OFPhysicalPort.OFPortFeatures;
46 import org.openflow.protocol.OFPhysicalPort.OFPortState;
47 import org.openflow.protocol.OFPortStatus.OFPortReason;
48 import org.openflow.protocol.factory.BasicFactory;
49 import org.openflow.protocol.factory.MessageParseException;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 public class EnhancedSwitchHandler implements IEnhancedSwitch {
54
55
56     private static final Logger logger = LoggerFactory
57             .getLogger(EnhancedSwitchHandler.class);
58     private static final int switchLivenessTimeout = getSwitchLivenessTimeout();
59     private int MESSAGE_RESPONSE_TIMER = 2000;
60
61     private EnhancedController controller = null;
62     private Integer switchChannelID = null;
63     private Channel channel;
64     private long lastMsgReceivedTimeStamp = 0;
65     private SwitchState state = null;
66     private BasicFactory factory = null;
67     private HashedWheelTimer timer = null;
68     private SwitchLivelinessTimerTask switchLivelinessTask = null;
69     private Timeout switchLivelinessTaskHandle = null;
70     private long sid;
71     private AtomicInteger xid;
72     private int buffers;
73     private int capabilities;
74     private byte tables;
75     private int actions;
76     private Map<Short, OFPhysicalPort> physicalPorts;
77     private Map<Short, Integer> portBandwidth;
78     private Date connectedDate;
79     private ExecutorService executor = null;
80     private ConcurrentHashMap<Integer, Callable<Object>> messageWaitingDone;
81     private Integer responseTimerValue;
82     private TrafficStatisticsHandler trafficStatsHandler = null;
83     private static final boolean START_LIVELINESS_TIMER = false;
84
85     private static final int BATCH_COUNT_FOR_FLUSHING = 3;
86     private int flushBatchTrack = 0;
87
88     /*
89     private List<OFMessage> msgBuffer = new ArrayList<OFMessage>();
90     private int bufferTrack = 0;
91     private static final int BATCH_BUFFER_THRESHOLD = 100;
92     */
93
94
95     // PLEASE .. IF THERE IS SOMETHING CALLED GOD, HELP ME GET THE THROUGHPUT WITH THIS !!
96     private List<OFMessage> flushableMsgBuffer = new ArrayList<OFMessage>();
97
98
99     public enum SwitchState {
100         NON_OPERATIONAL(0),
101         WAIT_FEATURES_REPLY(1),
102         WAIT_CONFIG_REPLY(2),
103         OPERATIONAL(3);
104
105         private int value;
106
107         private SwitchState(int value) {
108             this.value = value;
109         }
110
111         @SuppressWarnings("unused")
112         public int value() {
113             return this.value;
114         }
115     }
116
117
118     public EnhancedSwitchHandler(EnhancedController controller,
119             Integer switchConnectionChannelID,
120             Channel channel,
121             HashedWheelTimer timer,
122             ExecutorService executor,
123             TrafficStatisticsHandler tHandler){
124
125         this.controller = controller;
126         this.physicalPorts = new HashMap<Short, OFPhysicalPort>();
127         this.portBandwidth = new HashMap<Short, Integer>();
128         this.switchChannelID = switchConnectionChannelID;
129         this.timer = timer;
130         this.sid = (long) 0;
131         this.tables = (byte) 0;
132         this.actions = (int) 0;
133         this.capabilities = (int) 0;
134         this.buffers = (int) 0;
135         this.connectedDate = new Date();
136         this.state = SwitchState.NON_OPERATIONAL;
137         this.executor = executor;
138         this.messageWaitingDone = new ConcurrentHashMap<Integer, Callable<Object>>();
139         this.responseTimerValue = MESSAGE_RESPONSE_TIMER;
140         this.channel = channel;
141         this.xid = new AtomicInteger(this.channel.hashCode());
142         this.trafficStatsHandler = tHandler;
143
144     }
145
146     Integer getSwitchChannelID() {
147         return this.switchChannelID;
148     }
149
150     public void startHandler(){
151         this.factory = new BasicFactory();
152         start();
153
154     }
155
156
157     public void shutDownHandler(){
158         stop();
159
160     }
161
162
163     public void handleChannelIdle(){
164         // TODO: this is already handled by OFChannelHandler
165         // so DON'T care
166
167
168     }
169
170
171     public void start() {
172         sendFirstHello();
173     }
174
175     public void stop() {
176         cancelSwitchTimer();
177         SwitchEvent ev = new SwitchEvent(SwitchEventType.SWITCH_DELETE, this, null);
178         controller.switchDeleted(ev, switchChannelID);
179     }
180
181     private void cancelSwitchTimer() {
182         if (switchLivelinessTaskHandle != null){
183             this.switchLivelinessTaskHandle.cancel();
184         }
185     }
186
187
188     public void handleCaughtException(){
189
190
191
192     }
193
194
195
196
197     @Override
198     public int getNextXid() {
199         return this.xid.incrementAndGet();
200     }
201
202     @Override
203     public Long getId() {
204         return this.sid;
205     }
206
207     @Override
208     public Byte getTables() {
209         return this.tables;
210     }
211
212     @Override
213     public Integer getActions() {
214         return this.actions;
215     }
216
217     @Override
218     public Integer getCapabilities() {
219         return this.capabilities;
220     }
221
222     @Override
223     public Integer getBuffers() {
224         return this.buffers;
225     }
226
227     @Override
228     public Date getConnectedDate() {
229         return this.connectedDate;
230     }
231
232     @Override
233     public Integer asyncSend(OFMessage msg) {
234         return asyncSend(msg, getNextXid());
235     }
236
237
238     @Override
239     public Integer asyncSend(OFMessage msg, int xid) {
240         // TODO:
241         // BATCHING IMPLEMENTATION. Please think hard before enablng this !!
242         // Some messages could be latency-sensitive and some could be batched
243         // for better throughput. So, below decision may not bring better
244         // throughput for latency-sensitive cases like FLOW-MODs or
245         // PACKET-OUTs
246
247         /*
248         if (bufferTrack == BUFFER_THRESHOLD){
249             this.channel.write(msgBuffer);
250             msgBuffer.clear();
251             bufferTrack = 0;
252
253         }
254         msg.setXid(xid);
255         msgBuffer.add(msg);
256         bufferTrack++;
257         */
258
259
260
261         //List<OFMessage> msglist = new ArrayList<OFMessage>(1);
262         msg.setXid(xid);
263         synchronized( flushableMsgBuffer ) {
264             flushableMsgBuffer.add(msg);
265         }
266
267         trafficStatsHandler.countForEntitySimpleMeasurement(switchChannelID,
268                 TrafficStatisticsHandler.ENTITY_COUNTER_SND_MSG);
269
270         //this.channel.write(msglist);
271
272         /*
273         if (msg.getType() == OFType.FLOW_MOD){
274             this.trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.FLOW_MOD_SENT);
275             this.trafficStatsHandler.countForRateMeasurement(TrafficStatisticsHandler.FLOW_MOD_SENT);
276         }
277         */
278
279
280         return xid;
281     }
282
283
284     @Override
285     public Integer asyncFastSend(OFMessage msg) {
286         return asyncFastSend(msg, getNextXid());
287     }
288
289     @Override
290     public Integer asyncFastSend(OFMessage msg, int xid) {
291         msg.setXid(xid);
292         List<OFMessage> msglist = new ArrayList<OFMessage>(1);
293         msglist.add(msg);
294         this.channel.write(msglist);
295         trafficStatsHandler.countForEntitySimpleMeasurement(switchChannelID,
296                 TrafficStatisticsHandler.ENTITY_COUNTER_SND_MSG);
297         return xid;
298     }
299
300     @Override
301     public Object syncSend(OFMessage msg) {
302         int xid = getNextXid();
303         return syncSend(msg, xid);
304     }
305
306     private Object syncSend(OFMessage msg, int xid) {
307         return syncMessageInternal(msg, xid, true);
308     }
309
310     @Override
311     public Map<Short, OFPhysicalPort> getPhysicalPorts() {
312         return this.physicalPorts;
313     }
314
315     @Override
316     public Set<Short> getPorts() {
317         return this.physicalPorts.keySet();
318     }
319
320     @Override
321     public OFPhysicalPort getPhysicalPort(Short portNumber) {
322         return this.physicalPorts.get(portNumber);
323     }
324
325     @Override
326     public Integer getPortBandwidth(Short portNumber) {
327         return this.portBandwidth.get(portNumber);
328     }
329
330     @Override
331     public boolean isPortEnabled(short portNumber) {
332         return isPortEnabled(physicalPorts.get(portNumber));
333     }
334
335     @Override
336     public boolean isPortEnabled(OFPhysicalPort port) {
337         if (port == null) {
338             return false;
339         }
340         int portConfig = port.getConfig();
341         int portState = port.getState();
342         if ((portConfig & OFPortConfig.OFPPC_PORT_DOWN.getValue()) > 0) {
343             return false;
344         }
345         if ((portState & OFPortState.OFPPS_LINK_DOWN.getValue()) > 0) {
346             return false;
347         }
348         if ((portState & OFPortState.OFPPS_STP_MASK.getValue()) == OFPortState.OFPPS_STP_BLOCK
349                 .getValue()) {
350             return false;
351         }
352         return true;
353
354     }
355
356     @Override
357     public List<OFPhysicalPort> getEnabledPorts() {
358         List<OFPhysicalPort> result = new ArrayList<OFPhysicalPort>();
359         synchronized (this.physicalPorts) {
360             for (OFPhysicalPort port : physicalPorts.values()) {
361                 if (isPortEnabled(port)) {
362                     result.add(port);
363                 }
364             }
365         }
366         return result;
367     }
368
369
370     /**
371      * WARNING: CALLER WOULD BE BLOCKED
372      *
373      */
374     @Override
375     public Object getStatistics(OFStatisticsRequest req) {
376         int xid = getNextXid();
377         StatisticsCollector worker = new StatisticsCollector(this, xid, req);
378         messageWaitingDone.put(xid, worker);
379         Future<Object> submit = executor.submit(worker);
380         Object result = null;
381         try {
382             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
383             return result;
384         } catch (Exception e) {
385             logger.warn("Timeout while waiting for {} replies", req.getType());
386             result = null; // to indicate timeout has occurred
387             return result;
388         }
389     }
390
391     @Override
392     public boolean isOperational() {
393         return ((this.state == SwitchState.WAIT_CONFIG_REPLY) || (this.state == SwitchState.OPERATIONAL));
394     }
395
396     @Override
397     public Object syncSendBarrierMessage() {
398         OFBarrierRequest barrierMsg = new OFBarrierRequest();
399         return syncSend(barrierMsg);
400     }
401
402     @Override
403     public Object asyncSendBarrierMessage() {
404         List<OFMessage> msglist = new ArrayList<OFMessage>(1);
405         OFBarrierRequest barrierMsg = new OFBarrierRequest();
406         int xid = getNextXid();
407
408         barrierMsg.setXid(xid);
409         msglist.add(barrierMsg);
410
411         this.channel.write(msglist);
412         trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.BARRIER_REQUEST_SENT);
413         return Boolean.TRUE;
414     }
415
416
417     @Override
418     public void handleMessage(OFMessage ofMessage) {
419
420
421         logger.debug("Message received: {}", ofMessage.toString());
422         this.lastMsgReceivedTimeStamp = System.currentTimeMillis();
423         OFType type = ofMessage.getType();
424         switch (type) {
425         case HELLO:
426             logger.debug("<<<< HELLO");
427             // send feature request
428             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.HELLO_RECEIVED);
429             OFMessage featureRequest = factory
430                     .getMessage(OFType.FEATURES_REQUEST);
431             asyncFastSend(featureRequest);
432             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.FEATURES_REQUEST_SENT);
433             // delete all pre-existing flows
434             OFMatch match = new OFMatch().setWildcards(OFMatch.OFPFW_ALL);
435             OFFlowMod flowMod = (OFFlowMod) factory
436                     .getMessage(OFType.FLOW_MOD);
437             flowMod.setMatch(match).setCommand(OFFlowMod.OFPFC_DELETE)
438                     .setOutPort(OFPort.OFPP_NONE)
439                     .setLength((short) OFFlowMod.MINIMUM_LENGTH);
440             asyncFastSend(flowMod);
441             this.state = SwitchState.WAIT_FEATURES_REPLY;
442             if (START_LIVELINESS_TIMER){
443                 startSwitchTimer();
444             }
445             break;
446         case ECHO_REQUEST:
447             logger.debug("<<<< ECHO REQUEST");
448             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ECHO_REQUEST_RECEIVED);
449             OFEchoReply echoReply = (OFEchoReply) factory
450                     .getMessage(OFType.ECHO_REPLY);
451             asyncFastSend(echoReply);
452             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ECHO_REPLY_SENT);
453
454             break;
455         case ECHO_REPLY:
456             logger.debug("<<<< ECHO REPLY");
457             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ECHO_REPLY_RECEIVED);
458             //this.probeSent = false;
459             break;
460         case FEATURES_REPLY:
461             logger.debug("<<<< FEATURES REPLY");
462             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.FEATURES_REPLY_RECEIVED);
463             processFeaturesReply((OFFeaturesReply) ofMessage);
464             break;
465         case GET_CONFIG_REPLY:
466             logger.debug("<<<< CONFIG REPLY");
467             // make sure that the switch can send the whole packet to the
468             // controller
469             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.CONFIG_REPLY_RECEIVED);
470             if (((OFGetConfigReply) ofMessage).getMissSendLength() == (short) 0xffff) {
471                 this.state = SwitchState.OPERATIONAL;
472             }
473             break;
474         case BARRIER_REPLY:
475             logger.debug("<<<< BARRIER REPLY");
476             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.BARRIER_REPLY_RECEIVED);
477             processBarrierReply((OFBarrierReply) ofMessage);
478             break;
479         case ERROR:
480             logger.debug("<<<< ERROR");
481             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ERROR_MSG_RECEIVED);
482             processErrorReply((OFError) ofMessage);
483             break;
484         case PORT_STATUS:
485             logger.debug("<<<< PORT STATUS");
486             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.PORT_STATUS_RECEIVED);
487             processPortStatusMsg((OFPortStatus) ofMessage);
488             break;
489         case STATS_REPLY:
490             logger.debug("<<<< STATS REPLY");
491             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.STATS_RESPONSE_RECEIVED);
492             processStatsReply((OFStatisticsReply) ofMessage);
493             break;
494         case PACKET_IN:
495             logger.debug("<<<< PACKET_IN");
496             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.PACKET_IN_RECEIVED);
497             trafficStatsHandler.countForRateMeasurement(TrafficStatisticsHandler.PACKET_IN_RECEIVED);
498             break;
499         default:
500             break;
501         } // end of switch
502         if (isOperational()) {
503             logger.debug("SWITCH IS OPERATIONAL ... forwarding");
504             SwitchEvent ev = new SwitchEvent(
505                     SwitchEvent.SwitchEventType.SWITCH_MESSAGE, this, ofMessage);
506             controller.switchMessage(ev, switchChannelID);
507         }
508     }
509
510
511     private void startSwitchTimer(){
512         if (this.timer != null){
513             if (switchLivelinessTask == null){
514                 switchLivelinessTask = new SwitchLivelinessTimerTask();
515             }
516             switchLivelinessTaskHandle = timer.newTimeout(switchLivelinessTask,
517                     switchLivenessTimeout, TimeUnit.SECONDS);
518         }
519     }
520
521
522
523     /**
524      * This method returns the switch liveness timeout value. If controller did
525      * not receive any message from the switch for such a long period,
526      * controller will tear down the connection to the switch.
527      *
528      * @return The timeout value
529      */
530     private static int getSwitchLivenessTimeout() {
531         String timeout = System.getProperty("of.switchLivenessTimeout");
532         int rv = 60500;
533         try {
534             if (timeout != null) {
535                 rv = Integer.parseInt(timeout);
536             }
537         } catch (Exception e) {
538         }
539         return rv;
540     }
541
542
543     private void processFeaturesReply(OFFeaturesReply reply) {
544         if (this.state == SwitchState.WAIT_FEATURES_REPLY) {
545             this.sid = reply.getDatapathId();
546             this.buffers = reply.getBuffers();
547             this.capabilities = reply.getCapabilities();
548             this.tables = reply.getTables();
549             this.actions = reply.getActions();
550             // notify core of this error event
551             for (OFPhysicalPort port : reply.getPorts()) {
552                 updatePhysicalPort(port);
553             }
554             // config the switch to send full data packet
555             OFSetConfig config = (OFSetConfig) factory
556                     .getMessage(OFType.SET_CONFIG);
557             config.setMissSendLength((short) 0xffff).setLengthU(
558                     OFSetConfig.MINIMUM_LENGTH);
559             asyncFastSend(config);
560             // send config request to make sure the switch can handle the set
561             // config
562             OFMessage getConfig = factory.getMessage(OFType.GET_CONFIG_REQUEST);
563             asyncFastSend(getConfig);
564             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.CONFIG_REQUEST_SENT);
565             this.state = SwitchState.WAIT_CONFIG_REPLY;
566             // inform core that a new switch is now operational
567             reportSwitchStateChange(true);
568         }
569     }
570
571
572     private void updatePhysicalPort(OFPhysicalPort port) {
573         Short portNumber = port.getPortNumber();
574         physicalPorts.put(portNumber, port);
575         portBandwidth
576                 .put(portNumber,
577                         port.getCurrentFeatures()
578                                 & (OFPortFeatures.OFPPF_10MB_FD.getValue()
579                                         | OFPortFeatures.OFPPF_10MB_HD
580                                                 .getValue()
581                                         | OFPortFeatures.OFPPF_100MB_FD
582                                                 .getValue()
583                                         | OFPortFeatures.OFPPF_100MB_HD
584                                                 .getValue()
585                                         | OFPortFeatures.OFPPF_1GB_FD
586                                                 .getValue()
587                                         | OFPortFeatures.OFPPF_1GB_HD
588                                                 .getValue() | OFPortFeatures.OFPPF_10GB_FD
589                                             .getValue()));
590         trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.UPDATE_PHYSICAL_PORT);
591     }
592
593
594     private void reportSwitchStateChange(boolean added) {
595         SwitchEvent ev = null;
596         if (added) {
597             ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ADD, this, null);
598             controller.switchAdded(ev, switchChannelID);
599         } else {
600             ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_DELETE, this, null);
601             controller.switchDeleted(ev, switchChannelID);
602         }
603     }
604
605
606     protected class SwitchLivelinessTimerTask implements TimerTask {
607
608         @Override
609         public void run(Timeout timeout) throws Exception {
610
611             // set this reference in parent so that cancellation is
612             // possible
613             switchLivelinessTaskHandle = timeout;
614             Long now = System.currentTimeMillis();
615             if ((now - lastMsgReceivedTimeStamp) > switchLivenessTimeout) {
616                 if (state == SwitchState.WAIT_FEATURES_REPLY) {
617                     // send another features request
618                     OFMessage request = factory
619                             .getMessage(OFType.FEATURES_REQUEST);
620                     asyncFastSend(request);
621                 } else {
622                     if (state == SwitchState.WAIT_CONFIG_REPLY) {
623                         // send another config request
624                         OFSetConfig config = (OFSetConfig) factory
625                                 .getMessage(OFType.SET_CONFIG);
626                         config.setMissSendLength((short) 0xffff)
627                         .setLengthU(OFSetConfig.MINIMUM_LENGTH);
628                         asyncFastSend(config);
629                         OFMessage getConfig = factory
630                                 .getMessage(OFType.GET_CONFIG_REQUEST);
631                         asyncFastSend(getConfig);
632                     }
633                 }
634             }
635             timer.newTimeout(this, switchLivenessTimeout, TimeUnit.SECONDS);
636
637         }
638     }
639
640
641     /*
642      * Either a BarrierReply or a OFError is received. If this is a reply for an
643      * outstanding sync message, wake up associated task so that it can continue
644      */
645     private void processBarrierReply(OFBarrierReply msg) {
646         Integer xid = msg.getXid();
647         SynchronousMessage worker = (SynchronousMessage) messageWaitingDone
648                 .remove(xid);
649         if (worker == null) {
650             return;
651         }
652         worker.wakeup();
653     }
654
655     private void processErrorReply(OFError errorMsg) {
656         try{
657             OFMessage offendingMsg = errorMsg.getOffendingMsg();
658             Integer xi = 0;
659             if (offendingMsg != null) {
660                 xi = offendingMsg.getXid();
661             } else {
662                 xi = errorMsg.getXid();
663             }
664         }
665         catch(MessageParseException mpe){
666             reportError(mpe);
667         }
668     }
669
670     private void processPortStatusMsg(OFPortStatus msg) {
671         OFPhysicalPort port = msg.getDesc();
672         if (msg.getReason() == (byte) OFPortReason.OFPPR_MODIFY.ordinal()) {
673             updatePhysicalPort(port);
674         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_ADD.ordinal()) {
675             updatePhysicalPort(port);
676         } else if (msg.getReason() == (byte) OFPortReason.OFPPR_DELETE
677                 .ordinal()) {
678             deletePhysicalPort(port);
679         }
680
681     }
682
683     private void deletePhysicalPort(OFPhysicalPort port) {
684         Short portNumber = port.getPortNumber();
685         physicalPorts.remove(portNumber);
686         portBandwidth.remove(portNumber);
687     }
688
689     private void processStatsReply(OFStatisticsReply reply) {
690         Integer xid = reply.getXid();
691         StatisticsCollector worker = (StatisticsCollector) messageWaitingDone
692                 .get(xid);
693         if (worker == null) {
694             return;
695         }
696         if (worker.collect(reply)) {
697             // if all the stats records are received (collect() returns true)
698             // then we are done.
699             messageWaitingDone.remove(xid);
700             worker.wakeup();
701         }
702     }
703
704
705     /**
706      * This method performs synchronous operations for a given message. If
707      * syncRequest is set to true, the message will be sent out followed by a
708      * Barrier request message. Then it's blocked until the Barrier rely arrives
709      * or timeout. If syncRequest is false, it simply skips the message send and
710      * just waits for the response back.
711      *
712      * @param msg
713      *            Message to be sent
714      * @param xid
715      *            Message XID
716      * @param request
717      *            If set to true, the message the message will be sent out
718      *            followed by a Barrier request message. If set to false, it
719      *            simply skips the sending and just waits for the Barrier reply.
720      * @return the result
721      */
722     private Object syncMessageInternal(OFMessage msg, int xid, boolean syncRequest) {
723         Object result = null;
724
725         SynchronousMessage worker = new SynchronousMessage(this, xid, msg, syncRequest);
726         messageWaitingDone.put(xid, worker);
727
728         Boolean status = false;
729         Future<Object> submit = executor.submit(worker);
730         try {
731             result = submit.get(responseTimerValue, TimeUnit.MILLISECONDS);
732             messageWaitingDone.remove(xid);
733             if (result == null) {
734                 // if result is null, then it means the switch can handle this
735                 // message successfully
736                 // convert the result into a Boolean with value true
737                 status = true;
738                 // logger.debug("Successfully send " +
739                 // msg.getType().toString());
740                 result = status;
741             } else {
742                 // if result is not null, this means the switch can't handle
743                 // this message
744                 // the result if OFError already
745                 logger.debug("Send {} failed --> {}", msg.getType().toString(),
746                         ((OFError) result).toString());
747             }
748             return result;
749         } catch (Exception e) {
750             logger.warn("Timeout while waiting for {} reply", msg.getType()
751                     .toString());
752             // convert the result into a Boolean with value false
753             status = false;
754             result = status;
755             return result;
756         }
757
758
759     }
760
761
762     private void sendFirstHello() {
763         try {
764             OFMessage msg = factory.getMessage(OFType.HELLO);
765             asyncFastSend(msg);
766             trafficStatsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.HELLO_SENT);
767             trafficStatsHandler.addEntityForCounter(switchChannelID, TrafficStatisticsHandler.ENTITY_COUNTER_RCV_MSG);
768             trafficStatsHandler.addEntityForCounter(switchChannelID, TrafficStatisticsHandler.ENTITY_COUNTER_SND_MSG);
769         } catch (Exception e) {
770             reportError(e);
771         }
772     }
773
774
775     private void reportError(Exception e) {
776         if (e instanceof AsynchronousCloseException
777                 || e instanceof InterruptedException
778                 || e instanceof SocketException || e instanceof IOException
779                 || e instanceof ClosedSelectorException) {
780             logger.error("Caught exception {}", e.getMessage());
781         } else {
782             logger.error("Caught exception ", e);
783         }
784         // notify core of this error event and disconnect the switch
785
786         // TODO: We do not need this because except-hanling is done by
787         // Controller's OFChannelHandler
788
789         /*
790         SwitchEvent ev = new SwitchEvent(
791                 SwitchEvent.SwitchEventType.SWITCH_ERROR, this, null);
792
793         controller.switchError(ev, switchChannelID);
794         */
795     }
796
797
798     @Override
799     public void flushBufferedMessages() {
800         //flushBatchTrack++;
801         //if (flushBatchTrack > BATCH_COUNT_FOR_FLUSHING){
802         synchronized (flushableMsgBuffer) {
803             if (flushableMsgBuffer.size() > 0){
804                 channel.write(flushableMsgBuffer);
805                 flushableMsgBuffer.clear();
806             }
807         }
808         //    flushBatchTrack = 0;
809         //}
810
811     }
812
813     @Override
814     public SocketAddress getRemoteAddress() {
815         return (channel != null) ? channel.getRemoteAddress() : null;
816     }
817
818     @Override
819     public SocketAddress getLocalAddress() {
820         return (channel != null) ? channel.getLocalAddress() : null;
821     }
822
823 }