Small fix for 3037-5 build failure
[openflowplugin.git] / openflow_netty / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / EnhancedController.java
1 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
2
3 import java.io.IOException;
4 import java.net.InetAddress;
5 import java.net.InetSocketAddress;
6 import java.net.SocketAddress;
7 import java.util.ArrayList;
8 import java.util.Iterator;
9 import java.util.List;
10 import java.util.Map;
11 import java.util.Map.Entry;
12 import java.util.concurrent.ConcurrentHashMap;
13 import java.util.concurrent.ConcurrentMap;
14 import java.util.concurrent.ExecutorService;
15 import java.util.concurrent.Executors;
16 import java.util.concurrent.ThreadPoolExecutor;
17 import java.util.concurrent.TimeUnit;
18 import java.util.concurrent.atomic.AtomicInteger;
19 import java.util.concurrent.locks.ReentrantLock;
20 import java.util.concurrent.RejectedExecutionException;
21 import java.nio.channels.ClosedChannelException;
22 import java.nio.channels.SocketChannel;
23
24 import org.jboss.netty.bootstrap.ServerBootstrap;
25 import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory;
26 import org.jboss.netty.channel.Channel;
27 import org.jboss.netty.channel.ChannelHandlerContext;
28 import org.jboss.netty.channel.ChannelPipelineFactory;
29 import org.jboss.netty.channel.ChannelStateEvent;
30 import org.jboss.netty.channel.Channels;
31 import org.jboss.netty.channel.ExceptionEvent;
32 import org.jboss.netty.channel.MessageEvent;
33 import org.jboss.netty.channel.group.ChannelGroup;
34 import org.jboss.netty.channel.group.DefaultChannelGroup;
35 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
36 import org.jboss.netty.handler.execution.ExecutionHandler;
37 import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor;
38 import org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler;
39 import org.jboss.netty.handler.timeout.IdleStateEvent;
40 import org.jboss.netty.util.HashedWheelTimer;
41 import org.jboss.netty.util.ObjectSizeEstimator;
42 import org.jboss.netty.handler.timeout.ReadTimeoutException;
43
44
45
46
47 import org.openflow.protocol.OFMessage;
48 import org.openflow.protocol.OFType;
49 import org.openflow.protocol.factory.BasicFactory;
50 import org.openflow.protocol.factory.MessageParseException;
51 import org.opendaylight.controller.protocol_plugin.openflow.core.IEnhancedSwitch;
52 import org.opendaylight.controller.sal.connection.ConnectionConstants;
53 import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
54 import org.opendaylight.controller.sal.core.Node;
55 import org.opendaylight.controller.sal.utils.Status;
56 import org.opendaylight.controller.sal.utils.StatusCode;
57 import org.opendaylight.openflowplugin.openflow.core.IController;
58 import org.opendaylight.openflowplugin.openflow.core.IMessageListener;
59 import org.opendaylight.openflowplugin.openflow.core.ISwitch;
60 import org.opendaylight.openflowplugin.openflow.core.ISwitchStateListener;
61 import org.opendaylight.openflowplugin.openflow.core.internal.SwitchEvent;
62 //import org.opendaylight.controller.protocol_plugin.openflow.core.internal.OFChannelState.HandshakeState;
63 //import org.openflow.protocol.OFType;
64 import org.slf4j.Logger;
65 import org.slf4j.LoggerFactory;
66
67
68
69
70 public class EnhancedController implements IController, IPluginInConnectionService {
71
72
73     protected BasicFactory factory;
74
75
76     private static final Logger logger = LoggerFactory
77             .getLogger(EnhancedController.class);
78
79
80     // Track connected switches via SwitchID
81     private ConcurrentHashMap<Long, ISwitch> connectedSwitches;
82
83     // Track connected switches via ChannelID. Whenever the message
84     private ConcurrentHashMap<Integer, IEnhancedSwitch> channelIDToSwitchMap;
85
86     // only 1 message listener per OFType
87     private ConcurrentMap<OFType, IMessageListener> messageListeners;
88
89     // only 1 switch state listener
90     private ISwitchStateListener switchStateListener;
91     private AtomicInteger switchInstanceNumber;
92
93
94     private OFChannelHandler ofChannelHandler = null;
95     private ControllerServerBootstrap bootstrap = null;
96
97     private ThreadPoolExecutor execHandler = null;
98
99     private static final int SEND_BUFFER_SIZE = 1 * 1024 * 1024;
100     private static final int RECEIVE_BUFFER_SIZE = 1 * 1024 * 1024;
101     private static final int WRITE_BUFFER_LOW_WATERMARK = 32 * 1024;
102     private static final int WRITE_BUFFER_HIGH_WATERMARK = 64 * 1024;
103     private static final String CONTROLLER_HOST = null;
104     private static final int CONTROLLER_PORT = 6633;
105
106     private static final int OMATPE_CORE_POOL_SIZE = 200;
107     private static final int OMATPE_PER_CHANNEL_SIZE = 2 * 1048576;
108     private static final int OMATPE_POOL_WIDE_SIZE = 0; //1073741824;
109     private static final int OMATPE_THREAD_KEEP_ALIVE_IN_MILLISECONDS = 100;
110     private static final int EXPERIMENTAL_OMATPE_OBJECT_SIZE = 1000; // bytes
111
112     private HashedWheelTimer hashedWheelTimer = null;
113
114     // This executor would be used by individual switches to handle
115     // cases like Stats Request/Response or Sync* methods which sends request and
116     // waits via Future for responses. Please note that threads in this
117     // pool are shared across multiple threads. So, if all threads are busy,
118     // Socket IO thread would get blocked creating sharp decline in performance
119     // If possible TOTALLY avoid any thread usage which does network level
120     // request / response by making a thread in this pool wait for response
121     // Consider storing the Future reference against the "sent" request and
122     // fire-event to wake-up the same when response is received rather than making the
123     // sender thread getting into a "wait" mode. That would never scale
124     private ExecutorService executorService = null;
125
126     // IMPORTANT: DO NOT REDUCE THIS THREAD COUNT TO 0
127     // THIS THREAD COUNT WOULD BE USED FOR SOCKET-IO + FOLLOWING EXECUTION CHAIN
128     // Plugin + SAL + North-to-SAL + Egress (flow_provisioning)
129     private static final int WORKER_THREAD_COUNT = 4;
130
131     // This is a handy thread-pool if WORKER_THREAD_COUNT is not able to cope with
132     // Socket IO + Execution of the following handling chain
133     // Plugin + SAL + North-to-SAL + Egress (flow_provisioning)
134     private static final int EXECUTION_HANDLER_THREAD_POOL_SIZE = 0;
135
136     // This is the thread-pool which can be optionally used for
137     // building synchronous semantics for flow_mod and stats handling cycle
138     // Flow_Mod in synchronous model could involve FLOW_MOD + BARRIER_MSG
139     // sending and receiving with wait timeout for reply
140     // Stats handling in synchronous model could involve STATS_REQUEST + STATS_REPLY
141     // sending and receiving with wait timeout for reply
142     private static final int THREAD_POOL_SIZE_FOR_EGRESS_SYNC_MSGS = 30;
143
144     private TrafficStatisticsHandler statsHandler = null;
145
146     // Lock for locking messagelisteners list while escalating the switch
147     // messages
148     private ReentrantLock lock = new ReentrantLock();
149
150     private static final int FLUSH_BATCH_SIZE = 100;
151
152     //****************** IController Interface Methods Begin ******************
153
154     @Override
155     public void addMessageListener(OFType type, IMessageListener listener) {
156         IMessageListener currentListener = this.messageListeners.get(type);
157         if (currentListener != null) {
158             logger.warn("{} is already listened by {}", type.toString(),
159                     currentListener.toString());
160         }
161         this.messageListeners.put(type, listener);
162         logger.debug("{} is now listened by {}", type.toString(),
163                 listener.toString());
164
165     }
166
167     @Override
168     public void removeMessageListener(OFType type, IMessageListener listener) {
169         IMessageListener currentListener = this.messageListeners.get(type);
170         if ((currentListener != null) && (currentListener == listener)) {
171             logger.debug("{} listener {} is Removed", type.toString(),
172                     listener.toString());
173             this.messageListeners.remove(type);
174         }
175
176     }
177
178     @Override
179     public void addSwitchStateListener(ISwitchStateListener listener) {
180         if (this.switchStateListener != null) {
181             logger.warn("Switch events are already listened by {}",
182                     this.switchStateListener.toString());
183         }
184         this.switchStateListener = listener;
185         logger.debug("Switch events are now listened by {}",
186                 listener.toString());
187
188     }
189
190     @Override
191     public void removeSwitchStateListener(ISwitchStateListener listener) {
192         if ((this.switchStateListener != null)
193                 && (this.switchStateListener == listener)) {
194             logger.debug("SwitchStateListener {} is Removed",
195                     listener.toString());
196             this.switchStateListener = null;
197         }
198
199     }
200
201     @Override
202     public Map<Long, ISwitch> getSwitches() {
203         return this.connectedSwitches;
204     }
205
206     @Override
207     public ISwitch getSwitch(Long switchId) {
208         return this.connectedSwitches.get(switchId);
209     }
210
211     //****************** IController Interface Methods End ******************
212
213
214
215     //****************** Dependency-manager callbacks Begin ******************
216     /**
217      * Function called by the dependency manager when all the required
218      * dependencies are satisfied
219      *
220      */
221     public void init() {
222         logger.debug("Initializing!");
223         this.connectedSwitches = new ConcurrentHashMap<Long, ISwitch>();
224         this.channelIDToSwitchMap = new ConcurrentHashMap<Integer, IEnhancedSwitch>();
225         this.messageListeners = new ConcurrentHashMap<OFType, IMessageListener>();
226         this.switchStateListener = null;
227         this.hashedWheelTimer = new HashedWheelTimer();
228         this.statsHandler = new TrafficStatisticsHandler(hashedWheelTimer);
229         this.switchInstanceNumber = new AtomicInteger(0);
230         this.factory = new BasicFactory();
231         this.bootstrap = new ControllerServerBootstrap(this);
232         this.executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE_FOR_EGRESS_SYNC_MSGS);
233
234
235     }
236
237     /**
238      * Function called by dependency manager after "init ()" is called and after
239      * the services provided by the class are registered in the service registry
240      *
241      */
242     public void start() {
243         this.statsHandler.init();
244         logger.debug("Starting!");
245         bootstrap.startServer(WORKER_THREAD_COUNT,
246                 CONTROLLER_HOST,
247                 CONTROLLER_PORT,
248                 ofChannelHandler);
249
250
251     }
252
253     /**
254      * Function called by the dependency manager before the services exported by
255      * the component are unregistered, this will be followed by a "destroy ()"
256      * calls
257      *
258      */
259     public void stop() {
260         for (Iterator<Entry<Integer, IEnhancedSwitch>> it = channelIDToSwitchMap.entrySet().iterator(); it
261                 .hasNext();) {
262             Entry<Integer, IEnhancedSwitch> entry = it.next();
263             ((EnhancedSwitchHandler) entry.getValue()).stop();
264         }
265
266         hashedWheelTimer.stop();
267
268         executorService.shutdown();
269     }
270
271     /**
272      * Function called by the dependency manager when at least one dependency
273      * become unsatisfied or when the component is shutting down because for
274      * example bundle is being stopped.
275      *
276      */
277     public void destroy() {
278     }
279     //****************** Dependency-manager callbacks End ******************
280
281
282
283     public OFChannelHandler getChannelHandler(){
284         return new OFChannelHandler(this);
285     }
286
287
288     protected class OFChannelHandler extends IdleStateAwareChannelUpstreamHandler{
289
290
291         protected EnhancedController controller = null;
292         protected Channel channel = null;
293
294
295         public OFChannelHandler(EnhancedController controller){
296             this.controller = controller;
297         }
298
299
300         @Override
301         public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
302                 throws Exception {
303             List<OFMessage> msglist = new ArrayList<OFMessage>(1);
304             msglist.add(factory.getMessage(OFType.ECHO_REQUEST));
305             e.getChannel().write(msglist);
306             statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.ECHO_REQUEST_SENT);
307         }
308
309         @Override
310         public void channelConnected(ChannelHandlerContext ctx,
311                 ChannelStateEvent e) throws Exception {
312             channel = e.getChannel();
313             logger.info("New switch connection from {}",
314                      channel.getRemoteAddress());
315
316             Integer channelID = e.getChannel().getId();
317
318             IEnhancedSwitch switchHandler = new EnhancedSwitchHandler(controller,
319                     channelID, channel, hashedWheelTimer, executorService, statsHandler);
320             switchHandler.startHandler();
321             channelIDToSwitchMap.put(channelID, switchHandler);
322             statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.CONNECTED_SWITCHES);
323
324           }
325
326         @Override
327         public void channelDisconnected(ChannelHandlerContext ctx,
328                 ChannelStateEvent e) throws Exception {
329             // when SwitchHandler.shutDownHandler is called, Controller would
330             // get the feedback via switchDeleted method. So that both SwitchHandler and
331             // controller both release resources of the switch concerned
332
333             Integer channelID = e.getChannel().getId();
334             IEnhancedSwitch switchHandler = channelIDToSwitchMap.get(channelID);
335             if (switchHandler != null){
336                 switchHandler.shutDownHandler();
337             }
338             statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.DISCONNECTED_SWITCHES);
339
340         }
341
342         @Override
343         public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
344                 throws Exception {
345
346             EnhancedSwitchHandler sw = null;
347
348             if (e.getCause() instanceof ReadTimeoutException) {
349                 // switch timeout
350                 logger.error("Disconnecting switch {} due to read timeout",
351                         e.getChannel().getId(), e.getCause().getMessage());
352                 ctx.getChannel().close();
353                 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
354                 sw.stop();
355             /*
356             } else if (e.getCause() instanceof HandshakeTimeoutException) {
357                 logger.error("Disconnecting switch {}: failed to complete handshake",
358                         e.getChannel().getId());
359                 ctx.getChannel().close();
360                 channelIDToSwitchMap.remove(e.getChannel().getId());
361                 */
362             } else if (e.getCause() instanceof ClosedChannelException) {
363                 logger.warn("Channel for sw {} already closed Error : {}",
364                         e.getChannel().getId(), e.getCause().getMessage());
365                 ctx.getChannel().close();
366                 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
367                 sw.stop();
368             } else if (e.getCause() instanceof IOException) {
369                 logger.error("Disconnecting switch {} due to IO Error: {}",
370                         e.getChannel().getId(), e.getCause().getMessage());
371                 ctx.getChannel().close();
372                 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
373                 sw.stop();
374             /*
375             } else if (e.getCause() instanceof SwitchStateException) {
376                 logger.error("Disconnecting switch {} due to switch state error: {}",
377                         e.getChannel().getId(), e.getCause().getMessage());
378                 ctx.getChannel().close();
379                 channelIDToSwitchMap.remove(e.getChannel().getId());
380
381             } else if (e.getCause() instanceof MessageParseException) {
382                 logger.error("Disconnecting switch {} due to message parse error Error : {}",
383                         e.getChannel().getId(), e.getCause().getMessage());
384                 ctx.getChannel().close();
385                 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
386                 sw.stop(); */
387             } else if (e.getCause() instanceof RejectedExecutionException) {
388                 logger.warn("Could not process message: queue full");
389                 ctx.getChannel().close();
390                 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
391                 sw.stop();
392             } else {
393                 logger.error("Error while processing message from switch {} Error : {}",
394                         e.getChannel().getId(), e.getCause().getMessage());
395                 e.getCause().printStackTrace();
396                 ctx.getChannel().close();
397                 sw = (EnhancedSwitchHandler)channelIDToSwitchMap.get(e.getChannel().getId());
398                 sw.stop();
399             }
400
401             statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.EXCEPTION_CAUGHT);
402         }
403
404         @Override
405         public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
406                 throws Exception {
407             Integer messageChannelId = e.getChannel().getId();
408             IEnhancedSwitch swHan = (EnhancedSwitchHandler)channelIDToSwitchMap.get(messageChannelId);
409
410             if (e.getMessage() instanceof List) {
411                 //@SuppressWarnings("unchecked")
412                 List<OFMessage> msglist = (List<OFMessage>)e.getMessage();
413                 if (msglist != null){ // this check actually brought down rate to some extent - weird !!!
414                     for (OFMessage ofm : msglist) {
415                         try {
416
417                             // Do the actual packet processing
418                             processOFMessage(ofm, messageChannelId);
419                         }
420                         catch (Exception ex) {
421                             // We are the last handler in the stream, so run the
422                             // exception through the channel again by passing in
423                             // ctx.getChannel().
424                             Channels.fireExceptionCaught(ctx.getChannel(), ex);
425                         }
426                     }
427                 }
428             }
429
430             // Flush all flow-mods/packet-out/stats generated from this "train"
431             swHan.flushBufferedMessages();
432
433             statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.MESSAGE_RECEIVED);
434
435
436         }
437
438
439         public void processOFMessage(OFMessage ofm, Integer channelID){
440             IEnhancedSwitch switchHandler = (IEnhancedSwitch) channelIDToSwitchMap.get(channelID);
441             statsHandler.countForEntitySimpleMeasurement(channelID, TrafficStatisticsHandler.ENTITY_COUNTER_RCV_MSG);
442             if (switchHandler != null){
443                 switchHandler.handleMessage(ofm);
444             }
445         }
446
447
448     }
449
450
451     protected class ControllerServerBootstrap{
452
453         private int workerThreads = 0;
454         private EnhancedController controller = null;
455
456         public ControllerServerBootstrap(EnhancedController controller){
457             this.controller = controller;
458         }
459
460
461         public void startServer(int numWorkerThreads, String openFlowHost, int openFlowPort, OFChannelHandler ofchan){
462             this.workerThreads = numWorkerThreads;
463             try {
464                 final ServerBootstrap bootstrap = createServerBootStrap();
465
466                  bootstrap.setOption("reuseAddr", true);
467                  bootstrap.setOption("child.keepAlive", true);
468                  bootstrap.setOption("child.tcpNoDelay", true);
469                  bootstrap.setOption("child.receiveBufferSize", EnhancedController.RECEIVE_BUFFER_SIZE);
470                  bootstrap.setOption("child.sendBufferSize", EnhancedController.SEND_BUFFER_SIZE);
471
472                  // better to have an receive buffer predictor
473                  //bootstrap.setOption("receiveBufferSizePredictorFactory",
474                  //      new AdaptiveReceiveBufferSizePredictorFactory());
475                  //if the server is sending 1000 messages per sec, optimum write buffer water marks will
476                  //prevent unnecessary throttling, Check NioSocketChannelConfig doc
477                  //bootstrap.setOption("writeBufferLowWaterMark", WRITE_BUFFER_LOW_WATERMARK);
478                  //bootstrap.setOption("writeBufferHighWaterMark", WRITE_BUFFER_HIGH_WATERMARK);
479
480                  // TODO: IMPORTANT: If the threadpool is supplied as null, ExecutionHandler would
481                  // not be present in pipeline. If the load increases and ordering is required ,
482                  // use OrderedMemoryAwareThreadPoolExecutor as argument instead of null
483
484                  /*
485                  execHandler = new OrderedMemoryAwareThreadPoolExecutor(
486                                  OMATPE_CORE_POOL_SIZE,
487                                  OMATPE_PER_CHANNEL_SIZE,
488                                  OMATPE_POOL_WIDE_SIZE,
489                                  OMATPE_THREAD_KEEP_ALIVE_IN_MILLISECONDS,
490                                  TimeUnit.MILLISECONDS,
491                                  new ObjectSizeEstimator() {
492
493                                     @Override
494                                     public int estimateSize(Object o) {
495                                         return 30000;
496                                     }
497                                 },
498                                 Executors.defaultThreadFactory());     */
499
500                  execHandler = new OrderedMemoryAwareThreadPoolExecutor(
501                          OMATPE_CORE_POOL_SIZE,
502                          OMATPE_PER_CHANNEL_SIZE,
503                          OMATPE_POOL_WIDE_SIZE,
504                          OMATPE_THREAD_KEEP_ALIVE_IN_MILLISECONDS,
505                          TimeUnit.MILLISECONDS);
506
507
508
509                  ChannelPipelineFactory pfact =
510                          new OpenflowPipelineFactory(controller, execHandler);
511                  bootstrap.setPipelineFactory(pfact);
512                  InetSocketAddress sa =
513                          (openFlowHost == null)
514                          ? new InetSocketAddress(openFlowPort)
515                          : new InetSocketAddress(openFlowHost, openFlowPort);
516                  final ChannelGroup cg = new DefaultChannelGroup();
517                  cg.add(bootstrap.bind(sa));
518
519
520              } catch (Exception e) {
521                  throw new RuntimeException(e);
522              }
523
524         }
525
526         private ServerBootstrap createServerBootStrap() {
527             if (workerThreads == 0) {
528                 return new ServerBootstrap(
529                         new NioServerSocketChannelFactory(
530                                 Executors.newCachedThreadPool(),
531                                 Executors.newCachedThreadPool()));
532             } else {
533                 return new ServerBootstrap(
534                         new NioServerSocketChannelFactory(
535                                 Executors.newCachedThreadPool(),
536                                 Executors.newCachedThreadPool(), workerThreads));
537             }
538         }
539
540
541
542     }
543
544
545     /**
546      * Method called by SwitchHandler once the handshake state is completed
547      *
548      * @param sw
549      */
550     public void switchAdded(SwitchEvent switchEv, Integer switchChannelID){
551
552         ISwitch sw = switchEv.getSwitch();
553         Long switchId = sw.getId();
554
555         connectedSwitches.put(switchId, sw);
556         statsHandler.countForSimpleMeasurement(TrafficStatisticsHandler.CONNECTED_SWITCHES);
557
558         logger.info("Switch with DPID : {} connected ", switchId);
559
560         notifySwitchAdded(sw);
561     }
562
563
564     /**
565      * Method called by SwitchHandler switch is disconnected
566      *
567      * @param sw
568      */
569
570     public void switchDeleted(SwitchEvent switchEv, Integer switchChannelID){
571         ISwitch sw = switchEv.getSwitch();
572         disconnectSwitch(sw, switchChannelID);
573     }
574
575
576     /**
577      * Method called by SwitchHandler when it encounters any errors
578      *
579      *
580      * @param sw
581      */
582
583     public void switchError(SwitchEvent switchEv, Integer switchChannelID){
584
585     }
586
587
588     public void switchMessage(SwitchEvent switchEv, Integer switchChannelID){
589         long startTime = 0L;
590         long endTime = 0L;
591
592
593         OFMessage msg = switchEv.getMsg();
594         ISwitch sw = switchEv.getSwitch();
595         if (msg != null) {
596             //try{
597             //    lock.lock();
598                 IMessageListener listener = messageListeners
599                         .get(msg.getType());
600                 if (listener != null) {
601                     //logger.debug("delegating to msg-receiver");
602                     //startTime = System.nanoTime();
603                     listener.receive(sw, msg);
604                     //endTime = System.nanoTime();
605                     //this.statsHandler.reportPacketInProcessingTime(endTime - startTime);
606                 }
607             //}
608             //finally{
609             //    lock.unlock();
610             //}
611         }
612     }
613
614     public void disconnectSwitch(ISwitch sw, Integer switchChannelID){
615         Long sid = null;
616         if (((EnhancedSwitchHandler) sw).isOperational()) {
617             sid = sw.getId();
618
619             this.connectedSwitches.remove(sid);
620             this.channelIDToSwitchMap.remove(switchChannelID);
621             notifySwitchDeleted(sw);
622         }
623         //((EnhancedSwitchHandler) sw).stop();
624         logger.info("Switch with DPID {} disconnected", sid);
625         sw = null;
626     }
627
628
629     private void notifySwitchAdded(ISwitch sw) {
630         if (switchStateListener != null) {
631             switchStateListener.switchAdded(sw);
632         }
633     }
634
635     private void notifySwitchDeleted(ISwitch sw) {
636         if (switchStateListener != null) {
637             switchStateListener.switchDeleted(sw);
638         }
639     }
640
641     @Override
642     public Status disconnect(Node node) {
643         ISwitch sw = getSwitch((Long)node.getID());
644         if (sw != null) {
645             if (sw instanceof EnhancedSwitchHandler) {
646                 EnhancedSwitchHandler eSw = (EnhancedSwitchHandler)sw;
647                 disconnectSwitch(sw, eSw.getSwitchChannelID());
648             }
649         }
650         return new Status(StatusCode.SUCCESS);
651     }
652
653     @Override
654     public Node connect(String connectionIdentifier, Map<ConnectionConstants, String> params) {
655         return null;
656     }
657
658     /**
659      * View Change notification
660      */
661     public void notifyClusterViewChanged() {
662         for (ISwitch sw : connectedSwitches.values()) {
663             notifySwitchAdded(sw);
664         }
665     }
666
667     /**
668      * Node Disconnected from the node's master controller.
669      */
670     @Override
671     public void notifyNodeDisconnectFromMaster(Node node) {
672         ISwitch sw = connectedSwitches.get((Long)node.getID());
673         if (sw != null) notifySwitchAdded(sw);
674     }
675 }