2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.protocol_plugin.openflow.core.internal;
11 import java.io.IOException;
12 import java.nio.channels.SelectionKey;
13 import java.nio.channels.Selector;
14 import java.nio.channels.ServerSocketChannel;
15 import java.nio.channels.SocketChannel;
16 import java.util.Comparator;
17 import java.util.Date;
18 import java.util.Iterator;
20 import java.util.Map.Entry;
22 import java.util.concurrent.ConcurrentHashMap;
23 import java.util.concurrent.ConcurrentMap;
24 import java.util.concurrent.PriorityBlockingQueue;
25 import java.util.concurrent.atomic.AtomicInteger;
27 import org.eclipse.osgi.framework.console.CommandInterpreter;
28 import org.eclipse.osgi.framework.console.CommandProvider;
29 import org.opendaylight.controller.protocol_plugin.openflow.core.IController;
30 import org.opendaylight.controller.protocol_plugin.openflow.core.IMessageListener;
31 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitch;
32 import org.opendaylight.controller.protocol_plugin.openflow.core.ISwitchStateListener;
33 import org.opendaylight.controller.sal.connection.ConnectionConstants;
34 import org.opendaylight.controller.sal.connection.IPluginInConnectionService;
35 import org.opendaylight.controller.sal.core.Node;
36 import org.opendaylight.controller.sal.utils.Status;
37 import org.opendaylight.controller.sal.utils.StatusCode;
38 import org.openflow.protocol.OFMessage;
39 import org.openflow.protocol.OFType;
40 import org.openflow.util.HexString;
41 import org.osgi.framework.BundleContext;
42 import org.osgi.framework.FrameworkUtil;
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
46 public class Controller implements IController, CommandProvider, IPluginInConnectionService {
47 private static final Logger logger = LoggerFactory
48 .getLogger(Controller.class);
49 private ControllerIO controllerIO;
50 private Thread switchEventThread;
51 private volatile boolean shutdownSwitchEventThread;// default to false
52 private ConcurrentHashMap<Long, ISwitch> switches;
53 private PriorityBlockingQueue<SwitchEvent> switchEvents;
54 // only 1 message listener per OFType
55 private ConcurrentMap<OFType, IMessageListener> messageListeners;
56 // only 1 switch state listener
57 private ISwitchStateListener switchStateListener;
58 private AtomicInteger switchInstanceNumber;
59 private int MAXQUEUESIZE = 50000;
61 private static enum SwitchEventPriority { LOW, NORMAL, HIGH }
// Background worker that takes SwitchEvent objects from the switchEvents queue and acts on
// each one according to its event type and the switch it refers to.
// NOTE(review): this listing omits several original lines (the run() signature, the loop and
// try openers, the case labels, some one-line case bodies and the closing braces). The
// comments below describe only the statements that are visible here.
64 * this thread monitors the switchEvents queue for new incoming events from
67 private class EventHandler implements Runnable {
// The volatile shutdown flag is tested before the queue is read.
73 if(shutdownSwitchEventThread) {
74 // break out of the infinite loop
75 // if you are shutting down
76 logger.info("Switch Event Thread is shutting down");
// take() blocks until an event is available and throws InterruptedException when the
// thread is interrupted while waiting.
79 SwitchEvent ev = switchEvents.take();
80 SwitchEvent.SwitchEventType eType = ev.getEventType();
81 ISwitch sw = ev.getSwitch();
// A switch already registered under the same id is disconnected before the new one is
// stored in the switches map and announced through notifySwitchAdded().
84 Long sid = sw.getId();
85 ISwitch existingSwitch = switches.get(sid);
86 if (existingSwitch != null) {
87 logger.info("Replacing existing {} with New {}",
89 disconnectSwitch(existingSwitch);
91 switches.put(sid, sw);
92 notifySwitchAdded(sw);
// The message carried by the event is handed to a listener looked up in messageListeners.
// NOTE(review): the lookup key is on a line missing from this listing; presumably the
// message's OFType - confirm.
101 OFMessage msg = ev.getMsg();
103 IMessageListener listener = messageListeners
105 if (listener != null) {
106 listener.receive(sw, msg);
// Event types that are not handled are reported by their ordinal.
111 logger.error("Unknown switch event {}", eType.ordinal());
113 } catch (InterruptedException e) {
114 // nothing to do except retry
115 } catch (Exception e) {
116 // log the exception and retry
// NOTE(review): the only argument is a Throwable, which SLF4J treats as the exception to log,
// so the "{}" placeholder is probably left unfilled in the output - consider removing it.
117 logger.warn("Exception in Switch Event Thread is {}" ,e);
// Whatever is still queued is discarded.
120 switchEvents.clear();
125 * Function called by the dependency manager when all the required
126 * dependencies are satisfied
130 logger.debug("Initializing!");
131 this.switches = new ConcurrentHashMap<Long, ISwitch>();
132 this.switchEvents = new PriorityBlockingQueue<SwitchEvent>(MAXQUEUESIZE, new Comparator<SwitchEvent>() {
134 public int compare(SwitchEvent p1, SwitchEvent p2) {
135 return p2.getPriority() - p1.getPriority();
138 this.messageListeners = new ConcurrentHashMap<OFType, IMessageListener>();
139 this.switchStateListener = null;
140 this.switchInstanceNumber = new AtomicInteger(0);
141 registerWithOSGIConsole();
145 * Function called by dependency manager after "init ()" is called and after
146 * the services provided by the class are registered in the service registry
149 public void start() {
150 logger.debug("Starting!");
152 * start a thread to handle event coming from the switch
154 switchEventThread = new Thread(new EventHandler(), "SwitchEvent Thread");
155 switchEventThread.start();
157 // spawn a thread to start to listen on the open flow port
158 controllerIO = new ControllerIO(this);
160 controllerIO.start();
161 } catch (IOException ex) {
162 logger.error("Caught exception while starting:", ex);
// Shutdown path: stops every known switch handler, tells the switch event thread to exit and
// shuts down controllerIO.
// NOTE(review): the method signature, the continuation of the for header and at least one line
// inside the loop body are missing from this listing - confirm against the full source before
// changing this block.
167 * Function called by the dependency manager before the services exported by
168 * the component are unregistered, this will be followed by a "destroy ()"
173 for (Iterator<Entry<Long, ISwitch>> it = switches.entrySet().iterator(); it
175 Entry<Long, ISwitch> entry = it.next();
// Every value is cast to SwitchHandler; an ISwitch of any other type would fail the cast.
176 ((SwitchHandler) entry.getValue()).stop();
// The volatile flag is set before the interrupt is issued; the event thread tests the flag
// at the top of its loop.
179 shutdownSwitchEventThread = true;
180 switchEventThread.interrupt();
182 controllerIO.shutDown();
183 } catch (IOException ex) {
// An I/O failure during shutdown is logged and not propagated.
184 logger.error("Caught exception while stopping:", ex);
189 * Function called by the dependency manager when at least one dependency
190 * become unsatisfied or when the component is shutting down because for
191 * example bundle is being stopped.
194 public void destroy() {
198 public void addMessageListener(OFType type, IMessageListener listener) {
199 IMessageListener currentListener = this.messageListeners.get(type);
200 if (currentListener != null) {
201 logger.warn("{} is already listened by {}", type,
204 this.messageListeners.put(type, listener);
205 logger.debug("{} is now listened by {}", type, listener);
209 public void removeMessageListener(OFType type, IMessageListener listener) {
210 IMessageListener currentListener = this.messageListeners.get(type);
211 if ((currentListener != null) && (currentListener == listener)) {
212 logger.debug("{} listener {} is Removed", type, listener);
213 this.messageListeners.remove(type);
218 public void addSwitchStateListener(ISwitchStateListener listener) {
219 if (this.switchStateListener != null) {
220 logger.warn("Switch events are already listened by {}",
221 this.switchStateListener);
223 this.switchStateListener = listener;
224 logger.debug("Switch events are now listened by {}", listener);
228 public void removeSwitchStateListener(ISwitchStateListener listener) {
229 if ((this.switchStateListener != null)
230 && (this.switchStateListener == listener)) {
231 logger.debug("SwitchStateListener {} is Removed", listener);
232 this.switchStateListener = null;
// Handles a new connection on the server channel: wraps the socket channel in a SwitchHandler
// named "SwitchHandler-<n>", starts it and, once the channel reports being connected, logs the
// remote address.
// NOTE(review): the line that assigns sc (presumably an accept on ssc), the try opener and the
// body of the IOException handler are missing from this listing - confirm against the full
// source. The selector parameter is not used by any visible line.
236 public void handleNewConnection(Selector selector,
237 SelectionKey serverSelectionKey) {
238 ServerSocketChannel ssc = (ServerSocketChannel) serverSelectionKey
240 SocketChannel sc = null;
// n comes from the shared counter, so each handler gets a distinct instance name.
244 int i = this.switchInstanceNumber.addAndGet(1);
245 String instanceName = "SwitchHandler-" + i;
246 SwitchHandler switchHandler = new SwitchHandler(this, sc, instanceName);
247 switchHandler.start();
248 if (sc.isConnected()) {
// Only the part of the address string that follows the "/" is logged.
249 logger.info("Switch:{} is connected to the Controller",
250 sc.socket().getRemoteSocketAddress()
251 .toString().split("/")[1]);
254 } catch (IOException e) {
259 private void disconnectSwitch(ISwitch sw) {
260 if (((SwitchHandler) sw).isOperational()) {
261 Long sid = sw.getId();
262 if (this.switches.remove(sid, sw)) {
263 logger.info("{} is removed", sw);
264 notifySwitchDeleted(sw);
267 ((SwitchHandler) sw).stop();
271 private void notifySwitchAdded(ISwitch sw) {
272 if (switchStateListener != null) {
273 switchStateListener.switchAdded(sw);
277 private void notifySwitchDeleted(ISwitch sw) {
278 if (switchStateListener != null) {
279 switchStateListener.switchDeleted(sw);
283 private synchronized void addSwitchEvent(SwitchEvent event) {
284 this.switchEvents.put(event);
287 public void takeSwitchEventAdd(ISwitch sw) {
288 SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ADD, sw, null,
289 SwitchEventPriority.HIGH.ordinal());
293 public void takeSwitchEventDelete(ISwitch sw) {
294 SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_DELETE, sw, null,
295 SwitchEventPriority.HIGH.ordinal());
299 public void takeSwitchEventError(ISwitch sw) {
300 SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_ERROR, sw, null,
301 SwitchEventPriority.NORMAL.ordinal());
305 public void takeSwitchEventMsg(ISwitch sw, OFMessage msg) {
306 if (messageListeners.get(msg.getType()) != null) {
307 SwitchEvent ev = new SwitchEvent(SwitchEvent.SwitchEventType.SWITCH_MESSAGE, sw, msg,
308 SwitchEventPriority.LOW.ordinal());
314 public Map<Long, ISwitch> getSwitches() {
315 return this.switches;
319 public ISwitch getSwitch(Long switchId) {
320 return this.switches.get(switchId);
323 public void _controllerShowQueueSize(CommandInterpreter ci) {
324 ci.print("switchEvents queue size: " + switchEvents.size() + "\n");
327 public void _controllerShowSwitches(CommandInterpreter ci) {
328 Set<Long> sids = switches.keySet();
329 StringBuffer s = new StringBuffer();
330 int size = sids.size();
332 ci.print("switches: empty");
335 Iterator<Long> iter = sids.iterator();
336 s.append("Total: " + size + " switches\n");
337 while (iter.hasNext()) {
338 Long sid = iter.next();
339 Date date = switches.get(sid).getConnectedDate();
340 String switchInstanceName = ((SwitchHandler) switches.get(sid))
342 s.append(switchInstanceName + "/" + HexString.toHexString(sid)
343 + " connected since " + date.toString() + "\n");
345 ci.print(s.toString());
// Console command: announces that communication with all switches is being disconnected and,
// afterwards, that connections from switches are accepted again.
// NOTE(review): the statements between the two messages, and the try opener matching the
// InterruptedException handler, are missing from this listing. Presumably the controller is
// stopped, paused and started again there - confirm against the full source.
349 public void _controllerReset(CommandInterpreter ci) {
350 ci.print("...Disconnecting the communication to all switches...\n");
354 } catch (InterruptedException ie) {
356 ci.print("...start to accept connections from switches...\n");
361 public void _controllerShowConnConfig(CommandInterpreter ci) {
362 String str = System.getProperty("secureChannelEnabled");
363 if ((str != null) && (str.trim().equalsIgnoreCase("true"))) {
364 ci.print("The Controller and Switch should communicate through TLS connetion.\n");
366 String keyStoreFile = System.getProperty("controllerKeyStore");
367 String trustStoreFile = System.getProperty("controllerTrustStore");
368 if ((keyStoreFile == null) || keyStoreFile.trim().isEmpty()) {
369 ci.print("controllerKeyStore not specified\n");
371 ci.print("controllerKeyStore=" + keyStoreFile + "\n");
373 if ((trustStoreFile == null) || trustStoreFile.trim().isEmpty()) {
374 ci.print("controllerTrustStore not specified\n");
376 ci.print("controllerTrustStore=" + trustStoreFile + "\n");
379 ci.print("The Controller and Switch should communicate through TCP connetion.\n");
383 private void registerWithOSGIConsole() {
384 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
386 bundleContext.registerService(CommandProvider.class.getName(), this,
391 public String getHelp() {
392 StringBuffer help = new StringBuffer();
393 help.append("---Open Flow Controller---\n");
394 help.append("\t controllerShowSwitches\n");
395 help.append("\t controllerReset\n");
396 help.append("\t controllerShowConnConfig\n");
397 help.append("\t controllerShowQueueSize\n");
398 return help.toString();
402 public Status disconnect(Node node) {
403 ISwitch sw = getSwitch((Long) node.getID());
404 if (sw != null) disconnectSwitch(sw);
405 return new Status(StatusCode.SUCCESS);
// Takes a connection identifier and a map of connection parameters and returns a Node.
// NOTE(review): the body of this method is missing from this listing, so nothing can be
// stated here about what it does or returns - confirm against the full source.
409 public Node connect(String connectionIdentifier, Map<ConnectionConstants, String> params) {
414 * View Change notification
416 public void notifyClusterViewChanged() {
417 for (ISwitch sw : switches.values()) {
418 notifySwitchAdded(sw);
423 * Node Disconnected from the node's master controller.
426 public void notifyNodeDisconnectFromMaster(Node node) {
427 ISwitch sw = switches.get((Long)node.getID());
428 if (sw != null) notifySwitchAdded(sw);