protocol.openflow_netty cleanup
[controller.git] / opendaylight / protocol_plugins / openflow_netty / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / EnhancedController.java
1 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
2
3 import java.io.IOException;
4 import java.net.InetAddress;
5 import java.net.InetSocketAddress;
6 import java.net.SocketAddress;
7 import java.util.ArrayList;
8 import java.util.Iterator;
9 import java.util.List;
10 import java.util.Map;
11 import java.util.Map.Entry;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.Executors;
16 import java.util.concurrent.ThreadPoolExecutor;
17 import java.util.concurrent.TimeUnit;
18 import java.util.concurrent.atomic.AtomicInteger;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.concurrent.RejectedExecutionException;
21 import java.nio.channels.ClosedChannelException;
22 import java.nio.channels.SocketChannel;
23
24 import org.jboss.netty.bootstrap.ServerBootstrap;
25 import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory;
26 import org.jboss.netty.channel.Channel;
27 import org.jboss.netty.channel.ChannelHandlerContext;
28 import org.jboss.netty.channel.ChannelPipelineFactory;
29 import org.jboss.netty.channel.ChannelStateEvent;
30 import org.jboss.netty.channel.Channels;
31 import org.jboss.netty.channel.ExceptionEvent;
32 import org.jboss.netty.channel.MessageEvent;
33 import org.jboss.netty.channel.group.ChannelGroup;
34 import org.jboss.netty.channel.group.DefaultChannelGroup;
35 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
36 import org.jboss.netty.handler.execution.ExecutionHandler;
37 import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
38 import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
39 import org.jboss.netty.handler.timeout.IdleStateEvent;
40 import org.jboss.netty.util.HashedWheelTimer;
41 import org.jboss.netty.util.ObjectSizeEstimator;
42 import org.jboss.netty.handler.timeout.ReadTimeoutException;
43
44
45
46
47 import org.openflow.protocol.OFMessage;
48 import org.openflow.protocol.OFType;
49 import org.openflow.protocol.factory.BasicFactory;
50 import org.openflow.protocol.factory.MessageParseException;
51 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
52 import org.opendaylight.controller.protocol_plugin.openflow.core.IEnhancedSwitch;
53 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
54 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
55 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitchStateListener;
56 import org.opendaylight.controller.sal.connection.ConnectionConstants;
57 import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
58 import org.opendaylight.controller.sal.core.Node;
59 import org.opendaylight.controller.sal.utils.Status;
60 import org.opendaylight.controller.sal.utils.StatusCode;
61 //import org.opendaylight.controller.protocol_plugin.openflow.core.internal.OFChannelState.HandshakeState;
62 //import org.openflow.protocol.OFType;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66
67
68
69 public class EnhancedController implements IController, IPluginInConnectionService {
70
71
72     protected BasicFactory factory;
73
74
75     private static final Logger logger = LoggerFactory
76             .getLogger(EnhancedController.class);
77
78
79     // Track connected switches via SwitchID
80     private ConcurrentHashMap<Long, ISwitch> connectedSwitches;
81
    // Track connected switches via ChannelID. Whenever a message arrives on a
    // channel, its channel ID is used to look up the owning switch handler.
83     private ConcurrentHashMap<Integer, IEnhancedSwitch> channelIDToSwitchMap;
84
85     // only 1 message listener per OFType
86     private ConcurrentMap<OFType, IMessageListener> messageListeners;
87
88     // only 1 switch state listener
89     private ISwitchStateListener switchStateListener;
90     private AtomicInteger switchInstanceNumber;
91
92
93     private OFChannelHandler ofChannelHandler = null;
94     private ControllerServerBootstrap bootstrap = null;
95
96     private ThreadPoolExecutor execHandler = null;
97
98     private static final int SEND_BUFFER_SIZE = 1 * 1024 * 1024;
99     private static final int RECEIVE_BUFFER_SIZE = 1 * 1024 * 1024;
100     private static final int WRITE_BUFFER_LOW_WATERMARK = 32 * 1024;
101     private static final int WRITE_BUFFER_HIGH_WATERMARK = 64 * 1024;
102     private static final String CONTROLLER_HOST = null;
103     private static final int CONTROLLER_PORT = 6633;
104
105     private static final int OMATPE_CORE_POOL_SIZE = 200;
106     private static final int OMATPE_PER_CHANNEL_SIZE = 2 * 1048576;
107     private static final int OMATPE_POOL_WIDE_SIZE = 0; //1073741824;
108     private static final int OMATPE_THREAD_KEEP_ALIVE_IN_MILLISECONDS = 100;
109     private static final int EXPERIMENTAL_OMATPE_OBJECT_SIZE = 1000; // bytes
110
111     private HashedWheelTimer hashedWheelTimer = null;
112
    // This executor is used by individual switches to handle cases such as
    // Stats Request/Response or Sync* methods, which send a request and wait
    // via a Future for the response. Please note that the threads in this
    // pool are shared across all switches. So, if all threads are busy, the
    // Socket IO thread gets blocked, causing a sharp decline in performance.
    // If possible, TOTALLY avoid any usage that performs a network-level
    // request/response by making a thread in this pool wait for the response.
    // Consider storing the Future reference against the "sent" request and
    // firing an event to wake it up when the response is received, rather than
    // putting the sender thread into a "wait" mode. That would never scale.
123     private ExecutorService executorService = null;
124
125     // IMPORTANT: DO NOT REDUCE THIS THREAD COUNT TO 0
126     // THIS THREAD COUNT WOULD BE USED FOR SOCKET-IO + FOLLOWING EXECUTION CHAIN
127     // Plugin + SAL + North-to-SAL + Egress (flow_provisioning)
128     private static final int WORKER_THREAD_COUNT = 4;
129
130     // This is a handy thread-pool if WORKER_THREAD_COUNT is not able to cope with
131     // Socket IO + Execution of the following handling chain
132     // Plugin + SAL + North-to-SAL + Egress (flow_provisioning)
133     private static final int EXECUTION_HANDLER_THREAD_POOL_SIZE = 0;
134
135     // This is the thread-pool which can be optionally used for
136     // building synchronous semantics for flow_mod and stats handling cycle
137     // Flow_Mod in synchronous model could involve FLOW_MOD + BARRIER_MSG
138     // sending and receiving with wait timeout for reply
139     // Stats handling in synchronous model could involve STATS_REQUEST + STATS_REPLY
140     // sending and receiving with wait timeout for reply
141     private static final int THREAD_POOL_SIZE_FOR_EGRESS_SYNC_MSGS = 30;
142
143     private TrafficStatisticsHandler statsHandler = null;
144
145     // Lock for locking messagelisteners list while escalating the switch
146     // messages
147     private ReentrantLock lock = new ReentrantLock();
148
149     private static final int FLUSH_BATCH_SIZE = 100;
150
    //****************** IController Interface Methods Begin ******************
152
153     @Override
154     public void addMessageListener(OFType type, IMessageListener listener) {
155         IMessageListener currentListener = this.messageListeners.get(type);
156         if (currentListener != null) {
157             logger.warn("{} is already listened by {}", type.toString(),
158                     currentListener.toString());
159         }
160         this.messageListeners.put(type, listener);
161         logger.debug("{} is now listened by {}", type.toString(),
162                 listener.toString());
163
164     }
165
166     @Override
167     public void removeMessageListener(OFType type, IMessageListener listener) {
168         IMessageListener currentListener = this.messageListeners.get(type);
169         if ((currentListener != null) && (currentListener == listener)) {
170             logger.debug("{} listener {} is Removed", type.toString(),
171                     listener.toString());
172             this.messageListeners.remove(type);
173         }
174
175     }
176
177     @Override
178     public void addSwitchStateListener(ISwitchStateListener listener) {
179         if (this.switchStateListener != null) {
180             logger.warn("Switch events are already listened by {}",
181                     this.switchStateListener.toString());
182         }
183         this.switchStateListener = listener;
184         logger.debug("Switch events are now listened by {}",
185                 listener.toString());
186
187     }
188
189     @Override
190     public void removeSwitchStateListener(ISwitchStateListener listener) {
191         if ((this.switchStateListener != null)
192                 && (this.switchStateListener == listener)) {
193             logger.debug("SwitchStateListener {} is Removed",
194                     listener.toString());
195             this.switchStateListener = null;
196         }
197
198     }
199
200     @Override
201     public Map<Long, ISwitch> getSwitches() {
202         return this.connectedSwitches;
203     }
204
205     @Override
206     public ISwitch getSwitch(Long switchId) {
207         return this.connectedSwitches.get(switchId);
208     }
209
    //****************** IController Interface Methods End ******************
211
212
213
214     //****************** Dependency-manager callbacks Begin ******************
215     /**
216      * Function called by the dependency manager when all the required
217      * dependencies are satisfied
218      *
219      */
220     public void init() {
221         logger.debug("Initializing!");
222         this.connectedSwitches = new ConcurrentHashMap<Long, ISwitch>();
223         this.channelIDToSwitchMap = new ConcurrentHashMap<Integer, IEnhancedSwitch>();
224         this.messageListeners = new ConcurrentHashMap<OFType, IMessageListener>();
225         this.switchStateListener = null;
226         this.hashedWheelTimer = new HashedWheelTimer();
227         this.statsHandler = new TrafficStatisticsHandler(hashedWheelTimer);
228         this.switchInstanceNumber = new AtomicInteger(0);
229         this.factory = new BasicFactory();
230         this.bootstrap = new ControllerServerBootstrap(this);
231         this.executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE_FOR_EGRESS_SYNC_MSGS);
232
233
234     }
235
236     /**
237      * Function called by dependency manager after "init ()" is called and after
238      * the services provided by the class are registered in the service registry
239      *
240      */
241     public void start() {
242         this.statsHandler.init();
243         logger.debug("Starting!");
244         bootstrap.startServer(WORKER_THREAD_COUNT,
245                 CONTROLLER_HOST,
246                 CONTROLLER_PORT,
247                 ofChannelHandler);
248
249
250     }
251
252     /**
253      * Function called by the dependency manager before the services exported by
254      * the component are unregistered, this will be followed by a "destroy ()"
255      * calls
256      *
257      */
258     public void stop() {
259         for (Iterator<Entry<Integer, IEnhancedSwitch>> it = channelIDToSwitchMap.entrySet().iterator(); it
260                 .hasNext();) {
261             Entry<Integer, IEnhancedSwitch> entry = it.next();
262             ((EnhancedSwitchHandler) entry.getValue()).stop();
263         }
264
265         hashedWheelTimer.stop();
266
267         executorService.shutdown();
268     }
269
270     /**
271      * Function called by the dependency manager when at least one dependency
272      * become unsatisfied or when the component is shutting down because for
273      * example bundle is being stopped.
274      *
275      */
276     public void destroy() {
277     }
278     //****************** Dependency-manager callbacks End ******************
279
280
281
282     public OFChannelHandler getChannelHandler(){
283         return new OFChannelHandler(this);
284     }
285
286
287     protected class OFChannelHandler extends IdleStateAwareChannelUpstreamHandler{
288
289
290         protected EnhancedController controller = null;
291         protected Channel channel = null;
292
293
294         public OFChannelHandler(EnhancedController controller){
295             this.controller = controller;
296         }
297
298
299         @Override
300         public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
301                 throws Exception {
302             List<OFMessage> msglist = new ArrayList<OFMessage>(1);
303             msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
304             e.getChannel().write(msglist);
305             statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ECHO_REQUEST_SENT);
306         }
307
308         @Override
309         public void channelConnected(ChannelHandlerContext ctx,
310                 ChannelStateEvent e) throws Exception {
311             channel = e.getChannel();
312             logger.info("New switch connection from {}",
313                      channel.getRemoteAddress());
314
315             Integer channelID = e.getChannel().getId();
316
317             IEnhancedSwitch switchHandler = new EnhancedSwitchHandler(controller,
318                     channelID, channel, hashedWheelTimer, executorService, statsHandler);
319             switchHandler.startHandler();
320             channelIDToSwitchMap.put(channelID, switchHandler);
321             statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.CONNECTED_SWITCHES);
322
323           }
324
325         @Override
326         public void channelDisconnected(ChannelHandlerContext ctx,
327                 ChannelStateEvent e) throws Exception {
328             // when SwitchHandler.shutDownHandler is called, Controller would
329             // get the feedback via switchDeleted method. So that both SwitchHandler and
330             // controller both release resources of the switch concerned
331
332             Integer channelID = e.getChannel().getId();
333             IEnhancedSwitch switchHandler = channelIDToSwitchMap.get(channelID);
334             if (switchHandler != null){
335                 switchHandler.shutDownHandler();
336             }
337             statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.DISCONNECTED_SWITCHES);
338
339         }
340
341         @Override
342         public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
343                 throws Exception {
344
345             EnhancedSwitchHandler sw = null;
346
347             if (e.getCause() instanceof ReadTimeoutException) {
348                 // switch timeout
349                 logger.error("Disconnecting switch {} due to read timeout",
350                         e.getChannel().getId(), e.getCause().getMessage());
351                 ctx.getChannel().close();
352                 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
353                 sw.stop();
354             /*
355             } else if (e.getCause() instanceof HandshakeTimeoutException) {
356                 logger.error("Disconnecting switch {}: failed to complete handshake",
357                         e.getChannel().getId());
358                 ctx.getChannel().close();
359                 channelIDToSwitchMap.remove(e.getChannel().getId());
360                 */
361             } else if (e.getCause() instanceof ClosedChannelException) {
362                 logger.warn("Channel for sw {} already closed Error : {}",
363                         e.getChannel().getId(), e.getCause().getMessage());
364                 ctx.getChannel().close();
365                 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
366                 sw.stop();
367             } else if (e.getCause() instanceof IOException) {
368                 logger.error("Disconnecting switch {} due to IO Error: {}",
369                         e.getChannel().getId(), e.getCause().getMessage());
370                 ctx.getChannel().close();
371                 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
372                 sw.stop();
373             /*
374             } else if (e.getCause() instanceof SwitchStateException) {
375                 logger.error("Disconnecting switch {} due to switch state error: {}",
376                         e.getChannel().getId(), e.getCause().getMessage());
377                 ctx.getChannel().close();
378                 channelIDToSwitchMap.remove(e.getChannel().getId());
379
380             } else if (e.getCause() instanceof MessageParseException) {
381                 logger.error("Disconnecting switch {} due to message parse error Error : {}",
382                         e.getChannel().getId(), e.getCause().getMessage());
383                 ctx.getChannel().close();
384                 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
385                 sw.stop(); */
386             } else if (e.getCause() instanceof RejectedExecutionException) {
387                 logger.warn("Could not process message: queue full");
388                 ctx.getChannel().close();
389                 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
390                 sw.stop();
391             } else {
392                 logger.error("Error while processing message from switch {} Error : {}",
393                         e.getChannel().getId(), e.getCause().getMessage());
394                 e.getCause().printStackTrace();
395                 ctx.getChannel().close();
396                 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
397                 sw.stop();
398             }
399
400             statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.EXCEPTION_CAUGHT);
401         }
402
403         @Override
404         public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
405                 throws Exception {
406             Integer messageChannelId = e.getChannel().getId();
407             IEnhancedSwitch swHan = (EnhancedSwitchHandler)channelIDToSwitchMap.get(messageChannelId);
408
409             if (e.getMessage() instanceof List) {
410                 //@SuppressWarnings("unchecked")
411                 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
412                 if (msglist != null){ // this check actually brought down rate to some extent - weird !!!
413                     for (OFMessage ofm : msglist) {
414                         try {
415
416                             // Do the actual packet processing
417                             processOFMessage(ofm, messageChannelId);
418                         }
419                         catch (Exception ex) {
420                             // We are the last handler in the stream, so run the
421                             // exception through the channel again by passing in
422                             // ctx.getChannel().
423                             Channels.fireExceptionCaught(ctx.getChannel(), ex);
424                         }
425                     }
426                 }
427             }
428
429             // Flush all flow-mods/packet-out/stats generated from this "train"
430             swHan.flushBufferedMessages();
431
432             statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.MESSAGE_RECEIVED);
433
434
435         }
436
437
438         public void processOFMessage(OFMessage ofm, Integer channelID){
439             IEnhancedSwitch switchHandler = (IEnhancedSwitch) channelIDToSwitchMap.get(channelID);
440             statsHandler.countForEntitySimpleMeasurement(channelID, TrafficStatisticsHandler.ENTITY_COUNTER_RCV_MSG);
441             if (switchHandler != null){
442                 switchHandler.handleMessage(ofm);
443             }
444         }
445
446
447     }
448
449
450     protected class ControllerServerBootstrap{
451
452         private int workerThreads = 0;
453         private EnhancedController controller = null;
454
455         public ControllerServerBootstrap(EnhancedController controller){
456             this.controller = controller;
457         }
458
459
460         public void startServer(int numWorkerThreads, String openFlowHost, int openFlowPort, OFChannelHandler ofchan){
461             this.workerThreads = numWorkerThreads;
462             try {
463                 final ServerBootstrap bootstrap = createServerBootStrap();
464
465                  bootstrap.setOption("reuseAddr", true);
466                  bootstrap.setOption("child.keepAlive", true);
467                  bootstrap.setOption("child.tcpNoDelay", true);
468                  bootstrap.setOption("child.receiveBufferSize", EnhancedController.RECEIVE_BUFFER_SIZE);
469                  bootstrap.setOption("child.sendBufferSize", EnhancedController.SEND_BUFFER_SIZE);
470
471                  // better to have an receive buffer predictor
472                  //bootstrap.setOption("receiveBufferSizePredictorFactory",
473                  //      new AdaptiveReceiveBufferSizePredictorFactory());
474                  //if the server is sending 1000 messages per sec, optimum write buffer water marks will
475                  //prevent unnecessary throttling, Check NioSocketChannelConfig doc
476                  //bootstrap.setOption("writeBufferLowWaterMark", WRITE_BUFFER_LOW_WATERMARK);
477                  //bootstrap.setOption("writeBufferHighWaterMark", WRITE_BUFFER_HIGH_WATERMARK);
478
479                  // TODO: IMPORTANT: If the threadpool is supplied as null, ExecutionHandler would
480                  // not be present in pipeline. If the load increases and ordering is required ,
481                  // use OrderedMemoryAwareThreadPoolExecutor as argument instead of null
482
483                  /*
484                  execHandler = new OrderedMemoryAwareThreadPoolExecutor(
485                                  OMATPE_CORE_POOL_SIZE,
486                                  OMATPE_PER_CHANNEL_SIZE,
487                                  OMATPE_POOL_WIDE_SIZE,
488                                  OMATPE_THREAD_KEEP_ALIVE_IN_MILLISECONDS,
489                                  TimeUnit.MILLISECONDS,
490                                  new ObjectSizeEstimator() {
491
492                                     @Override
493                                     public int estimateSize(Object o) {
494                                         return 30000;
495                                     }
496                                 },
497                                 Executors.defaultThreadFactory());     */
498
499                  execHandler = new OrderedMemoryAwareThreadPoolExecutor(
500                          OMATPE_CORE_POOL_SIZE,
501                          OMATPE_PER_CHANNEL_SIZE,
502                          OMATPE_POOL_WIDE_SIZE,
503                          OMATPE_THREAD_KEEP_ALIVE_IN_MILLISECONDS,
504                          TimeUnit.MILLISECONDS);
505
506
507
508                  ChannelPipelineFactory pfact =
509                          new OpenflowPipelineFactory(controller, execHandler);
510                  bootstrap.setPipelineFactory(pfact);
511                  InetSocketAddress sa =
512                          (openFlowHost == null)
513                          ? new InetSocketAddress(openFlowPort)
514                          : new InetSocketAddress(openFlowHost, openFlowPort);
515                  final ChannelGroup cg = new DefaultChannelGroup();
516                  cg.add(bootstrap.bind(sa));
517
518
519              } catch (Exception e) {
520                  throw new RuntimeException(e);
521              }
522
523         }
524
525         private ServerBootstrap createServerBootStrap() {
526             if (workerThreads == 0) {
527                 return new ServerBootstrap(
528                         new NioServerSocketChannelFactory(
529                                 Executors.newCachedThreadPool(),
530                                 Executors.newCachedThreadPool()));
531             } else {
532                 return new ServerBootstrap(
533                         new NioServerSocketChannelFactory(
534                                 Executors.newCachedThreadPool(),
535                                 Executors.newCachedThreadPool(), workerThreads));
536             }
537         }
538
539
540
541     }
542
543
544     /**
545      * Method called by SwitchHandler once the handshake state is completed
546      *
547      * @param sw
548      */
549     public void switchAdded(SwitchEvent switchEv, Integer switchChannelID){
550
551         ISwitch sw = switchEv.getSwitch();
552         Long switchId = sw.getId();
553
554         connectedSwitches.put(switchId, sw);
555         statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.CONNECTED_SWITCHES);
556
557         logger.info("Switch with DPID : {} connected ", switchId);
558
559         notifySwitchAdded(sw);
560     }
561
562
563     /**
564      * Method called by SwitchHandler switch is disconnected
565      *
566      * @param sw
567      */
568
569     public void switchDeleted(SwitchEvent switchEv, Integer switchChannelID){
570         ISwitch sw = switchEv.getSwitch();
571         disconnectSwitch(sw, switchChannelID);
572     }
573
574
575     /**
576      * Method called by SwitchHandler when it encounters any errors
577      *
578      *
579      * @param sw
580      */
581
582     public void switchError(SwitchEvent switchEv, Integer switchChannelID){
583
584     }
585
586
587     public void switchMessage(SwitchEvent switchEv, Integer switchChannelID){
588         long startTime = 0L;
589         long endTime = 0L;
590
591
592         OFMessage msg = switchEv.getMsg();
593         ISwitch sw = switchEv.getSwitch();
594         if (msg != null) {
595             //try{
596             //    lock.lock();
597                 IMessageListener listener = messageListeners
598                         .get(msg.getType());
599                 if (listener != null) {
600                     //logger.debug("delegating to msg-receiver");
601                     //startTime = System.nanoTime();
602                     listener.receive(sw, msg);
603                     //endTime = System.nanoTime();
604                     //this.statsHandler.reportPacketInProcessingTime(endTime - startTime);
605                 }
606             //}
607             //finally{
608             //    lock.unlock();
609             //}
610         }
611     }
612
613     public void disconnectSwitch(ISwitch sw, Integer switchChannelID){
614         Long sid = null;
615         if (((EnhancedSwitchHandler) sw).isOperational()) {
616             sid = sw.getId();
617
618             this.connectedSwitches.remove(sid);
619             this.channelIDToSwitchMap.remove(switchChannelID);
620             notifySwitchDeleted(sw);
621         }
622         //((EnhancedSwitchHandler) sw).stop();
623         logger.info("Switch with DPID {} disconnected", sid);
624         sw = null;
625     }
626
627
628     private void notifySwitchAdded(ISwitch sw) {
629         if (switchStateListener != null) {
630             switchStateListener.switchAdded(sw);
631         }
632     }
633
634     private void notifySwitchDeleted(ISwitch sw) {
635         if (switchStateListener != null) {
636             switchStateListener.switchDeleted(sw);
637         }
638     }
639
640     @Override
641     public Status disconnect(Node node) {
642         ISwitch sw = getSwitch((Long)node.getID());
643         if (sw != null) {
644             if (sw instanceof EnhancedSwitchHandler) {
645                 EnhancedSwitchHandler eSw = (EnhancedSwitchHandler)sw;
646                 disconnectSwitch(sw, eSw.getSwitchChannelID());
647             }
648         }
649         return new Status(StatusCode.SUCCESS);
650     }
651
652     @Override
653     public Node connect(String connectionIdentifier, Map<ConnectionConstants, String> params) {
654         return null;
655     }
656
657     /**
658      * View Change notification
659      */
660     public void notifyClusterViewChanged() {
661         for (ISwitch sw : connectedSwitches.values()) {
662             notifySwitchAdded(sw);
663         }
664     }
665
666     /**
667      * Node Disconnected from the node's master controller.
668      */
669     @Override
670     public void notifyNodeDisconnectFromMaster(Node node) {
671         ISwitch sw = connectedSwitches.get((Long)node.getID());
672         if (sw != null) notifySwitchAdded(sw);
673     }
674 }