3 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
5 * This program and the accompanying materials are made available under the
6 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7 * and is available at http://www.eclipse.org/legal/epl-v10.html
11 * Connection Manager provides south-bound connectivity services.
12 * The APIs are currently focused on Active-Active Clustering support,
13 * wherein a node can connect to any of the Active Controllers in the Cluster.
14 * This component can also host the necessary logic for south-bound connectivity
15 * when a partial cluster is identified during Partition scenarios.
17 * However, this component (and its corresponding implementation) can also be used for
18 * basic connectivity mechanisms for various south-bound plugins.
21 package org.opendaylight.controller.connectionmanager.internal;
23 import java.net.InetAddress;
24 import java.net.UnknownHostException;
25 import java.util.HashSet;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentMap;
31 import java.util.concurrent.LinkedBlockingQueue;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
36 import org.eclipse.osgi.framework.console.CommandInterpreter;
37 import org.eclipse.osgi.framework.console.CommandProvider;
38 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
39 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
40 import org.opendaylight.controller.clustering.services.ICoordinatorChangeAware;
41 import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
42 import org.opendaylight.controller.connectionmanager.IConnectionManager;
43 import org.opendaylight.controller.connectionmanager.scheme.AbstractScheme;
44 import org.opendaylight.controller.connectionmanager.scheme.SchemeFactory;
45 import org.opendaylight.controller.sal.connection.ConnectionConstants;
46 import org.opendaylight.controller.sal.connection.IConnectionListener;
47 import org.opendaylight.controller.sal.connection.IConnectionService;
48 import org.opendaylight.controller.sal.core.Node;
49 import org.opendaylight.controller.sal.core.NodeConnector;
50 import org.opendaylight.controller.sal.core.Property;
51 import org.opendaylight.controller.sal.core.UpdateType;
52 import org.opendaylight.controller.sal.inventory.IListenInventoryUpdates;
53 import org.opendaylight.controller.sal.utils.Status;
54 import org.opendaylight.controller.sal.utils.StatusCode;
55 import org.osgi.framework.BundleContext;
56 import org.osgi.framework.FrameworkUtil;
// Connection management component; the overall role is described in the file header comment.
// NOTE(review): the end of the implements list and the opening brace are not visible in this excerpt.
58 public class ConnectionManager implements IConnectionManager, IConnectionListener,
59 ICoordinatorChangeAware, IListenInventoryUpdates,
60 ICacheUpdateAware<Node, Set<InetAddress>>,
// Class-wide SLF4J logger.
62 private static final Logger logger = LoggerFactory.getLogger(ConnectionManager.class);
// Scheme currently in effect; starts as ANY_CONTROLLER_ONE_MASTER and is reassigned by the _scheme console command.
63 private ConnectionMgmtScheme activeScheme = ConnectionMgmtScheme.ANY_CONTROLLER_ONE_MASTER;
// Clustering services handle; set and cleared through setClusterServices/unsetClusterServices.
64 private IClusterGlobalServices clusterServices;
// Scheme implementations keyed by scheme type; most methods look up the entry for activeScheme.
65 private ConcurrentMap<ConnectionMgmtScheme, AbstractScheme> schemes;
// South-bound connection service that connect/disconnect calls are delegated to; may be null when unset.
66 private IConnectionService connectionService;
// Thread that runs EventHandler; created and started in started().
67 private Thread connectionEventThread;
// Pending connection-management events; filled by enqueueConnectionEvent and drained by EventHandler.
68 private BlockingQueue<ConnectionMgmtEvent> connectionEvents;
70 public void setClusterServices(IClusterGlobalServices i) {
71 this.clusterServices = i;
74 public void unsetClusterServices(IClusterGlobalServices i) {
75 if (this.clusterServices == i) {
76 this.clusterServices = null;
80 public void setConnectionService(IConnectionService i) {
81 this.connectionService = i;
84 public void unsetConnectionService(IConnectionService i) {
85 if (this.connectionService == i) {
86 this.connectionService = null;
90 public void started() {
91 connectionEventThread = new Thread(new EventHandler(), "ConnectionEvent Thread");
92 connectionEventThread.start();
94 registerWithOSGIConsole();
95 notifyClusterViewChanged();
// NOTE(review): the enclosing method header is not visible in this excerpt.
// Creates the event queue and builds the table of scheme implementations.
99 this.connectionEvents = new LinkedBlockingQueue<ConnectionMgmtEvent>();
100 schemes = new ConcurrentHashMap<ConnectionMgmtScheme, AbstractScheme>();
// Ask the factory for an implementation of every declared scheme type.
101 for (ConnectionMgmtScheme scheme : ConnectionMgmtScheme.values()) {
102 AbstractScheme schemeImpl = SchemeFactory.getScheme(scheme, clusterServices);
// A null result is skipped, so that scheme type has no entry and lookups for it return null.
103 if (schemeImpl != null) schemes.put(scheme, schemeImpl);
// NOTE(review): the enclosing method header is not visible in this excerpt.
// Interrupts the event thread, then disconnects each locally connected node and removes it from the active scheme.
108 connectionEventThread.interrupt();
109 Set<Node> localNodes = getLocalNodes();
// getLocalNodes() can return null, e.g. when no implementation is registered for the active scheme.
110 if (localNodes != null) {
111 AbstractScheme scheme = schemes.get(activeScheme);
112 for (Node localNode : localNodes) {
// NOTE(review): connectionService is dereferenced here without the null check that connect()/disconnect() perform - confirm it cannot be unset at this point.
113 connectionService.disconnect(localNode);
114 if (scheme != null) scheme.removeNode(localNode);
// Accessor for the connection management scheme in use.
// NOTE(review): the method body is not visible in this excerpt; presumably it returns activeScheme - confirm.
120 public ConnectionMgmtScheme getActiveScheme() {
125 public Set<Node> getNodes(InetAddress controller) {
126 AbstractScheme scheme = schemes.get(activeScheme);
127 if (scheme == null) return null;
128 return scheme.getNodes(controller);
132 public Set<Node> getLocalNodes() {
133 AbstractScheme scheme = schemes.get(activeScheme);
134 if (scheme == null) return null;
135 return scheme.getNodes();
139 public boolean isLocal(Node node) {
140 AbstractScheme scheme = schemes.get(activeScheme);
141 if (scheme == null) return false;
142 return scheme.isLocal(node);
// Inventory callback for node updates: logs the update and forwards it to the active scheme.
146 public void updateNode(Node node, UpdateType type, Set<Property> props) {
147 logger.debug("updateNode: {} type {} props {}", node, type, props);
148 AbstractScheme scheme = schemes.get(activeScheme);
// Nothing to do when no implementation is registered for the active scheme.
149 if (scheme == null) return;
// NOTE(review): the branching on `type` that selects between the two calls below is not visible in this excerpt.
152 scheme.addNode(node);
155 scheme.removeNode(node);
// Inventory callback for node-connector updates: logs the update and forwards the connector's owning node to the active scheme.
163 public void updateNodeConnector(NodeConnector nodeConnector,
164 UpdateType type, Set<Property> props) {
165 logger.debug("updateNodeConnector: {} type {} props {}", nodeConnector, type, props);
166 AbstractScheme scheme = schemes.get(activeScheme);
// Nothing to do when no implementation is registered for the active scheme.
167 if (scheme == null) return;
// NOTE(review): the branching on `type` that guards the call below is not visible in this excerpt.
170 scheme.addNode(nodeConnector.getNode());
178 public void coordinatorChanged() {
179 AbstractScheme scheme = schemes.get(activeScheme);
180 if (scheme == null) return;
181 scheme.handleClusterViewChanged();
182 notifyClusterViewChanged();
186 public Node connect(String connectionIdentifier, Map<ConnectionConstants, String> params) {
187 if (connectionService == null) return null;
188 return connectionService.connect(connectionIdentifier, params);
192 public Node connect(String type, String connectionIdentifier, Map<ConnectionConstants, String> params) {
193 if (connectionService == null) return null;
194 return connectionService.connect(type, connectionIdentifier, params);
198 public Status disconnect (Node node) {
199 if (connectionService == null) return new Status(StatusCode.NOSERVICE);
200 return connectionService.disconnect(node);
204 public void entryCreated(Node key, String cacheName, boolean originLocal) {
205 AbstractScheme scheme = schemes.get(activeScheme);
206 logger.debug("Created : {} cache : {} existingValue : {}", key, cacheName, scheme.getNodeConnections().get(key));
210 * Clustering Services doesn't provide the existing state in the cache update callbacks.
211 * Hence, a scratch local cache is used to maintain the existing state.
// Last controller set seen per node; read and then overwritten by entryUpdated.
214 private ConcurrentMap<Node, Set<InetAddress>> existingConnections = new ConcurrentHashMap<Node, Set<InetAddress>>();
// Cache callback for updated entries: compares the new controller set of a node with the
// locally remembered one and queues a node-disconnected event when the set shrank.
217 public void entryUpdated(Node node, Set<InetAddress> newControllers, String cacheName, boolean originLocal) {
// Updates that originated on this controller are ignored.
218 if (originLocal) return;
219 Set<InetAddress> existingControllers = existingConnections.get(node);
220 if (existingControllers != null) {
221 logger.debug("Processing Update for : {} NewControllers : {} existingControllers : {}", node,
222 newControllers.toString(), existingControllers.toString());
// Only a controller set that got smaller is examined further.
223 if (newControllers.size() < existingControllers.size()) {
// Work on a copy so the remembered set itself is not modified.
224 Set<InetAddress> removed = new HashSet<InetAddress>(existingControllers);
// NOTE(review): removeAll returns true when the copy changed, i.e. when the old and new sets overlap,
// not when `removed` ends up non-empty - confirm this is the intended condition.
225 if (removed.removeAll(newControllers)) {
226 logger.debug("notifyNodeDisconnectFromMaster({})", node);
227 notifyNodeDisconnectedEvent(node);
// NOTE(review): the closing braces and the else keyword preceding the next log call are not visible in this excerpt.
231 logger.debug("Ignoring the Update for : {} NewControllers : {}", node, newControllers.toString());
// Record the new controller set as the remembered state for this node.
233 existingConnections.put(node, newControllers);
237 public void entryDeleted(Node key, String cacheName, boolean originLocal) {
238 if (originLocal) return;
239 logger.debug("Deleted : {} cache : {}", key, cacheName);
240 notifyNodeDisconnectedEvent(key);
243 private void enqueueConnectionEvent(ConnectionMgmtEvent event) {
245 if (!connectionEvents.contains(event)) {
246 this.connectionEvents.put(event);
248 } catch (InterruptedException e) {
249 logger.debug("enqueueConnectionEvent caught Interrupt Exception for event {}", event);
253 private void notifyClusterViewChanged() {
254 ConnectionMgmtEvent event = new ConnectionMgmtEvent(ConnectionMgmtEventType.CLUSTER_VIEW_CHANGED, null);
255 enqueueConnectionEvent(event);
258 private void notifyNodeDisconnectedEvent(Node node) {
259 ConnectionMgmtEvent event = new ConnectionMgmtEvent(ConnectionMgmtEventType.NODE_DISCONNECTED_FROM_MASTER, node);
260 enqueueConnectionEvent(event);
264 * this thread monitors the connectionEvent queue for new incoming events from
// Runnable executed by connectionEventThread (created in started()): takes events off
// connectionEvents and forwards them to connectionService.
// NOTE(review): the run() header, loop, try, switch header, break and default lines are not visible in this excerpt.
266 private class EventHandler implements Runnable {
// take() blocks until an event is available.
272 ConnectionMgmtEvent ev = connectionEvents.take();
273 ConnectionMgmtEventType eType = ev.getEvent();
// The event payload is the node the notification refers to.
275 case NODE_DISCONNECTED_FROM_MASTER:
276 Node node = (Node)ev.getData();
277 connectionService.notifyNodeDisconnectFromMaster(node);
279 case CLUSTER_VIEW_CHANGED:
280 connectionService.notifyClusterViewChanged();
// NOTE(review): this logs the enum ordinal rather than the constant's name.
283 logger.error("Unknown Connection event {}", eType.ordinal());
// On interruption any events still pending in the queue are discarded.
285 } catch (InterruptedException e) {
286 connectionEvents.clear();
// Registers this object as a service under the CommandProvider class name.
// NOTE(review): the continuation lines of both calls below are not visible in this excerpt.
293 private void registerWithOSGIConsole() {
294 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
295-placeholder-free: see note above
296 bundleContext.registerService(CommandProvider.class.getName(), this,
300 public void _scheme (CommandInterpreter ci) {
301 String schemeStr = ci.nextArgument();
302 if (schemeStr == null) {
303 ci.println("Please enter valid Scheme name");
304 ci.println("Current Scheme : " + activeScheme.name());
307 ConnectionMgmtScheme scheme = ConnectionMgmtScheme.valueOf(schemeStr);
308 if (scheme == null) {
309 ci.println("Please enter a valid Scheme name");
312 activeScheme = scheme;
// Console command "printNodes [<controller>]": without an argument prints the nodes returned by
// getLocalNodes(); otherwise resolves the argument as a controller address and prints getNodes(address).
315 public void _printNodes (CommandInterpreter ci) {
316 String controller = ci.nextArgument();
317 if (controller == null) {
318 ci.println("Nodes connected to this controller : ");
// A null node set is reported as "None"; note the lookup is performed a second time for printing.
319 if (this.getLocalNodes() == null) ci.println("None");
320 else ci.println(this.getLocalNodes().toString());
// getByName accepts a host name or a literal address and throws UnknownHostException when it cannot be resolved.
324 InetAddress address = InetAddress.getByName(controller);
325 ci.println("Nodes connected to controller "+controller);
326 if (this.getNodes(address) == null) ci.println("None");
327 else ci.println(this.getNodes(address).toString());
// NOTE(review): the handler body for an unresolvable address is not visible in this excerpt.
329 } catch (UnknownHostException e) {
335 public String getHelp() {
336 StringBuffer help = new StringBuffer();
337 help.append("---Connection Manager---\n");
338 help.append("\t scheme [<name>] - Print / Set scheme\n");
339 help.append("\t printNodes [<controller>] - Print connected nodes\n");
340 return help.toString();