Move adsal into its own subdirectory.
[controller.git] / opendaylight / adsal / protocol_plugins / openflow / src / main / java / org / opendaylight / controller / protocol_plugin / openflow / core / internal / Controller.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
10
11 import java.io.IOException;
12 import java.nio.channels.SelectionKey;
13 import java.nio.channels.Selector;
14 import java.nio.channels.ServerSocketChannel;
15 import java.nio.channels.SocketChannel;
16 import java.util.Comparator;
17 import java.util.Date;
18 import java.util.Iterator;
19 import java.util.Map;
20 import java.util.Map.Entry;
21 import java.util.Set;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.PriorityBlockingQueue;
25 import java.util.concurrent.atomic.AtomicInteger;
26
27 import org.eclipse.osgi.framework.console.CommandInterpreter;
28 import org.eclipse.osgi.framework.console.CommandProvider;
29 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
30 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
31 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
32 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitchStateListener;
33 import org.opendaylight.controller.sal.connection.ConnectionConstants;
34 import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
35 import org.opendaylight.controller.sal.core.Node;
36 import org.opendaylight.controller.sal.utils.Status;
37 import org.opendaylight.controller.sal.utils.StatusCode;
38 import org.openflow.protocol.OFMessage;
39 import org.openflow.protocol.OFType;
40 import org.openflow.util.HexString;
41 import org.osgi.framework.BundleContext;
42 import org.osgi.framework.FrameworkUtil;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 public class Controller implements IController, CommandProvider, IPluginInConnectionService {
47     private static final Logger logger = LoggerFactory
48             .getLogger(Controller.class);
49     private ControllerIO controllerIO;
50     private Thread switchEventThread;
51     private volatile boolean shutdownSwitchEventThread;// default to false
52     private ConcurrentHashMap<Long, ISwitch> switches;
53     private PriorityBlockingQueue<SwitchEvent> switchEvents;
54     // only 1 message listener per OFType
55     private ConcurrentMap<OFType, IMessageListener> messageListeners;
56     // only 1 switch state listener
57     private ISwitchStateListener switchStateListener;
58     private AtomicInteger switchInstanceNumber;
59     private int MAXQUEUESIZE = 50000;
60
61     private static enum SwitchEventPriority { LOW, NORMAL, HIGH }
62
63     /*
64      * this thread monitors the switchEvents queue for new incoming events from
65      * switch
66      */
67     private class EventHandler implements Runnable {
68         @Override
69         public void run() {
70
71             while (true) {
72                 try {
73                     if(shutdownSwitchEventThread) {
74                         // break out of the infinite loop
75                         // if you are shutting down
76                         logger.info("Switch Event Thread is shutting down");
77                         break;
78                     }
79                     SwitchEvent ev = switchEvents.take();
80                     SwitchEvent.SwitchEventType eType = ev.getEventType();
81                     ISwitch sw = ev.getSwitch();
82                     switch (eType) {
83                     case SWITCH_ADD:
84                         Long sid = sw.getId();
85                         ISwitch existingSwitch = switches.get(sid);
86                         if (existingSwitch != null) {
87                             logger.info("Replacing existing {} with New {}",
88                                     existingSwitch, sw);
89                             disconnectSwitch(existingSwitch);
90                         }
91                         switches.put(sid, sw);
92                         notifySwitchAdded(sw);
93                         break;
94                     case SWITCH_DELETE:
95                         disconnectSwitch(sw);
96                         break;
97                     case SWITCH_ERROR:
98                         disconnectSwitch(sw);
99                         break;
100                     case SWITCH_MESSAGE:
101                         OFMessage msg = ev.getMsg();
102                         if (msg != null) {
103                             IMessageListener listener = messageListeners
104                                     .get(msg.getType());
105                             if (listener != null) {
106                                 listener.receive(sw, msg);
107                             }
108                         }
109                         break;
110                     default:
111                         logger.error("Unknown switch event {}", eType.ordinal());
112                     }
113                 } catch (InterruptedException e) {
114                     // nothing to do except retry
115                 } catch (Exception e) {
116                     // log the exception and retry
117                     logger.warn("Exception in Switch Event Thread is {}" ,e);
118                 }
119             }
120             switchEvents.clear();
121         }
122     }
123
124     /**
125      * Function called by the dependency manager when all the required
126      * dependencies are satisfied
127      *
128      */
129     public void init() {
130         logger.debug("Initializing!");
131         this.switches = new ConcurrentHashMap<Long, ISwitch>();
132         this.switchEvents = new PriorityBlockingQueue<SwitchEvent>(MAXQUEUESIZE, new Comparator<SwitchEvent>() {
133             @Override
134             public int compare(SwitchEvent p1, SwitchEvent p2) {
135                 return p2.getPriority() - p1.getPriority();
136             }
137         });
138         this.messageListeners = new ConcurrentHashMap<OFType, IMessageListener>();
139         this.switchStateListener = null;
140         this.switchInstanceNumber = new AtomicInteger(0);
141         registerWithOSGIConsole();
142     }
143
144     /**
145      * Function called by dependency manager after "init ()" is called and after
146      * the services provided by the class are registered in the service registry
147      *
148      */
149     public void start() {
150         logger.debug("Starting!");
151         /*
152          * start a thread to handle event coming from the switch
153          */
154         switchEventThread = new Thread(new EventHandler(), "SwitchEvent Thread");
155         switchEventThread.start();
156
157         // spawn a thread to start to listen on the open flow port
158         controllerIO = new ControllerIO(this);
159         try {
160             controllerIO.start();
161         } catch (IOException ex) {
162             logger.error("Caught exception while starting:", ex);
163         }
164     }
165
166     /**
167      * Function called by the dependency manager before the services exported by
168      * the component are unregistered, this will be followed by a "destroy ()"
169      * calls
170      *
171      */
172     public void stop() {
173         for (Iterator<Entry<Long, ISwitch>> it = switches.entrySet().iterator(); it
174                 .hasNext();) {
175             Entry<Long, ISwitch> entry = it.next();
176             ((SwitchHandler) entry.getValue()).stop();
177             it.remove();
178         }
179         shutdownSwitchEventThread = true;
180         switchEventThread.interrupt();
181         try {
182             controllerIO.shutDown();
183         } catch (IOException ex) {
184             logger.error("Caught exception while stopping:", ex);
185         }
186     }
187
188     /**
189      * Function called by the dependency manager when at least one dependency
190      * become unsatisfied or when the component is shutting down because for
191      * example bundle is being stopped.
192      *
193      */
194     public void destroy() {
195     }
196
197     @Override
198     public void addMessageListener(OFType type, IMessageListener listener) {
199         IMessageListener currentListener = this.messageListeners.get(type);
200         if (currentListener != null) {
201             logger.warn("{} is already listened by {}", type,
202                     currentListener);
203         }
204         this.messageListeners.put(type, listener);
205         logger.debug("{} is now listened by {}", type, listener);
206     }
207
208     @Override
209     public void removeMessageListener(OFType type, IMessageListener listener) {
210         IMessageListener currentListener = this.messageListeners.get(type);
211         if ((currentListener != null) && (currentListener == listener)) {
212             logger.debug("{} listener {} is Removed", type, listener);
213             this.messageListeners.remove(type);
214         }
215     }
216
217     @Override
218     public void addSwitchStateListener(ISwitchStateListener listener) {
219         if (this.switchStateListener != null) {
220             logger.warn("Switch events are already listened by {}",
221                     this.switchStateListener);
222         }
223         this.switchStateListener = listener;
224         logger.debug("Switch events are now listened by {}", listener);
225     }
226
227     @Override
228     public void removeSwitchStateListener(ISwitchStateListener listener) {
229         if ((this.switchStateListener != null)
230                 && (this.switchStateListener == listener)) {
231             logger.debug("SwitchStateListener {} is Removed", listener);
232             this.switchStateListener = null;
233         }
234     }
235
236     public void handleNewConnection(Selector selector,
237             SelectionKey serverSelectionKey) {
238         ServerSocketChannel ssc = (ServerSocketChannel) serverSelectionKey
239                 .channel();
240         SocketChannel sc = null;
241         try {
242             sc = ssc.accept();
243             // create new switch
244             int i = this.switchInstanceNumber.addAndGet(1);
245             String instanceName = "SwitchHandler-" + i;
246             SwitchHandler switchHandler = new SwitchHandler(this, sc, instanceName);
247             switchHandler.start();
248             if (sc.isConnected()) {
249                 logger.info("Switch:{} is connected to the Controller",
250                         sc.socket().getRemoteSocketAddress()
251                         .toString().split("/")[1]);
252             }
253
254         } catch (IOException e) {
255             return;
256         }
257     }
258
259     private void disconnectSwitch(ISwitch sw) {
260         if (((SwitchHandler) sw).isOperational()) {
261             Long sid = sw.getId();
262             if (this.switches.remove(sid, sw)) {
263                 logger.info("{} is removed", sw);
264                 notifySwitchDeleted(sw);
265             }
266         }
267         ((SwitchHandler) sw).stop();
268         sw = null;
269     }
270
271     private void notifySwitchAdded(ISwitch sw) {
272         if (switchStateListener != null) {
273             switchStateListener.switchAdded(sw);
274         }
275     }
276
277     private void notifySwitchDeleted(ISwitch sw) {
278         if (switchStateListener != null) {
279             switchStateListener.switchDeleted(sw);
280         }
281     }
282
283     private synchronized void addSwitchEvent(SwitchEvent event) {
284         this.switchEvents.put(event);
285     }
286
287     public void takeSwitchEventAdd(ISwitch sw) {
288         SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ADD, sw, null,
289                 SwitchEventPriority.HIGH.ordinal());
290         addSwitchEvent(ev);
291     }
292
293     public void takeSwitchEventDelete(ISwitch sw) {
294         SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_DELETE, sw, null,
295                 SwitchEventPriority.HIGH.ordinal());
296         addSwitchEvent(ev);
297     }
298
299     public void takeSwitchEventError(ISwitch sw) {
300         SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ERROR, sw, null,
301                 SwitchEventPriority.NORMAL.ordinal());
302         addSwitchEvent(ev);
303     }
304
305     public void takeSwitchEventMsg(ISwitch sw, OFMessage msg) {
306         if (messageListeners.get(msg.getType()) != null) {
307             SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_MESSAGE, sw, msg,
308                     SwitchEventPriority.LOW.ordinal());
309             addSwitchEvent(ev);
310         }
311     }
312
313     @Override
314     public Map<Long, ISwitch> getSwitches() {
315         return this.switches;
316     }
317
318     @Override
319     public ISwitch getSwitch(Long switchId) {
320         return this.switches.get(switchId);
321     }
322
323     public void _controllerShowQueueSize(CommandInterpreter ci) {
324         ci.print("switchEvents queue size: " + switchEvents.size() + "\n");
325     }
326
327     public void _controllerShowSwitches(CommandInterpreter ci) {
328         Set<Long> sids = switches.keySet();
329         StringBuffer s = new StringBuffer();
330         int size = sids.size();
331         if (size == 0) {
332             ci.print("switches: empty");
333             return;
334         }
335         Iterator<Long> iter = sids.iterator();
336         s.append("Total: " + size + " switches\n");
337         while (iter.hasNext()) {
338             Long sid = iter.next();
339             Date date = switches.get(sid).getConnectedDate();
340             String switchInstanceName = ((SwitchHandler) switches.get(sid))
341                     .getInstanceName();
342             s.append(switchInstanceName + "/" + HexString.toHexString(sid)
343                     + " connected since " + date.toString() + "\n");
344         }
345         ci.print(s.toString());
346         return;
347     }
348
349     public void _controllerReset(CommandInterpreter ci) {
350         ci.print("...Disconnecting the communication to all switches...\n");
351         stop();
352         try {
353             Thread.sleep(1000);
354         } catch (InterruptedException ie) {
355         } finally {
356             ci.print("...start to accept connections from switches...\n");
357             start();
358         }
359     }
360
361     public void _controllerShowConnConfig(CommandInterpreter ci) {
362         String str = System.getProperty("secureChannelEnabled");
363         if ((str != null) && (str.trim().equalsIgnoreCase("true"))) {
364             ci.print("The Controller and Switch should communicate through TLS connetion.\n");
365
366             String keyStoreFile = System.getProperty("controllerKeyStore");
367             String trustStoreFile = System.getProperty("controllerTrustStore");
368             if ((keyStoreFile == null) || keyStoreFile.trim().isEmpty()) {
369                 ci.print("controllerKeyStore not specified\n");
370             } else {
371                 ci.print("controllerKeyStore=" + keyStoreFile + "\n");
372             }
373             if ((trustStoreFile == null) || trustStoreFile.trim().isEmpty()) {
374                 ci.print("controllerTrustStore not specified\n");
375             } else {
376                 ci.print("controllerTrustStore=" + trustStoreFile + "\n");
377             }
378         } else {
379             ci.print("The Controller and Switch should communicate through TCP connetion.\n");
380         }
381     }
382
383     private void registerWithOSGIConsole() {
384         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
385                 .getBundleContext();
386         bundleContext.registerService(CommandProvider.class.getName(), this,
387                 null);
388     }
389
390     @Override
391     public String getHelp() {
392         StringBuffer help = new StringBuffer();
393         help.append("---Open Flow Controller---\n");
394         help.append("\t controllerShowSwitches\n");
395         help.append("\t controllerReset\n");
396         help.append("\t controllerShowConnConfig\n");
397         help.append("\t controllerShowQueueSize\n");
398         return help.toString();
399     }
400
401     @Override
402     public Status disconnect(Node node) {
403         ISwitch sw = getSwitch((Long) node.getID());
404         if (sw != null) disconnectSwitch(sw);
405         return new Status(StatusCode.SUCCESS);
406     }
407
408     @Override
409     public Node connect(String connectionIdentifier, Map<ConnectionConstants, String> params) {
410         return null;
411     }
412
413     /**
414      * View Change notification
415      */
416     public void notifyClusterViewChanged() {
417         for (ISwitch sw : switches.values()) {
418             notifySwitchAdded(sw);
419         }
420     }
421
422     /**
423      * Node Disconnected from the node's master controller.
424      */
425     @Override
426     public void notifyNodeDisconnectFromMaster(Node node) {
427         ISwitch sw = switches.get((Long)node.getID());
428         if (sw != null) notifySwitchAdded(sw);
429     }
430 }