/*
 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
/**
 * Connection Manager provides south-bound connectivity services.
 * The APIs are currently focused on Active-Active Clustering support,
 * wherein a node can connect to any of the Active Controllers in the Cluster.
 * This component can also host the necessary logic for south-bound connectivity
 * when a partial cluster is identified during Partition scenarios.
 *
 * This component (and its corresponding implementation) can also be used to
 * provide basic connectivity mechanisms for various south-bound plugins.
 */
package org.opendaylight.controller.connectionmanager.internal;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

import org.eclipse.osgi.framework.console.CommandInterpreter;
import org.eclipse.osgi.framework.console.CommandProvider;
import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
import org.opendaylight.controller.clustering.services.ICoordinatorChangeAware;
import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
import org.opendaylight.controller.connectionmanager.IConnectionManager;
import org.opendaylight.controller.connectionmanager.scheme.AbstractScheme;
import org.opendaylight.controller.connectionmanager.scheme.SchemeFactory;
import org.opendaylight.controller.sal.connection.ConnectionConstants;
import org.opendaylight.controller.sal.connection.ConnectionLocality;
import org.opendaylight.controller.sal.connection.IConnectionListener;
import org.opendaylight.controller.sal.connection.IConnectionService;
import org.opendaylight.controller.sal.core.Node;
import org.opendaylight.controller.sal.core.NodeConnector;
import org.opendaylight.controller.sal.core.Property;
import org.opendaylight.controller.sal.core.UpdateType;
import org.opendaylight.controller.sal.inventory.IInventoryService;
import org.opendaylight.controller.sal.inventory.IListenInventoryUpdates;
import org.opendaylight.controller.sal.utils.Status;
import org.opendaylight.controller.sal.utils.StatusCode;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
59 public class ConnectionManager implements IConnectionManager,
60 IConnectionListener, ICoordinatorChangeAware, IListenInventoryUpdates,
61 ICacheUpdateAware<Node, Set<InetAddress>>, CommandProvider {
62 private static final Logger logger = LoggerFactory
63 .getLogger(ConnectionManager.class);
64 private ConnectionMgmtScheme activeScheme = ConnectionMgmtScheme.ANY_CONTROLLER_ONE_MASTER;
65 private IClusterGlobalServices clusterServices;
66 private ConcurrentMap<ConnectionMgmtScheme, AbstractScheme> schemes;
67 private IConnectionService connectionService;
68 private Thread connectionEventThread;
69 private BlockingQueue<ConnectionMgmtEvent> connectionEvents;
70 private IInventoryService inventoryService;
72 public void setClusterServices(IClusterGlobalServices i) {
73 this.clusterServices = i;
76 public void unsetClusterServices(IClusterGlobalServices i) {
77 if (this.clusterServices == i) {
78 this.clusterServices = null;
82 public void setConnectionService(IConnectionService i) {
83 this.connectionService = i;
86 public void unsetConnectionService(IConnectionService i) {
87 if (this.connectionService == i) {
88 this.connectionService = null;
92 public void setInventoryService(IInventoryService service) {
93 logger.trace("Got inventory service set request {}", service);
94 this.inventoryService = service;
97 public void unsetInventoryService(IInventoryService service) {
98 logger.trace("Got a service UNset request");
99 this.inventoryService = null;
102 private void getInventories() {
103 Map<Node, Map<String, Property>> nodeProp = this.inventoryService
105 for (Map.Entry<Node, Map<String, Property>> entry : nodeProp.entrySet()) {
106 Node node = entry.getKey();
107 logger.debug("getInventories for node:{}", new Object[] { node });
108 Map<String, Property> propMap = entry.getValue();
109 Set<Property> props = new HashSet<Property>();
110 for (Property property : propMap.values()) {
113 updateNode(node, UpdateType.ADDED, props);
116 Map<NodeConnector, Map<String, Property>> nodeConnectorProp = this.inventoryService
117 .getNodeConnectorProps();
118 for (Map.Entry<NodeConnector, Map<String, Property>> entry : nodeConnectorProp
120 Map<String, Property> propMap = entry.getValue();
121 Set<Property> props = new HashSet<Property>();
122 for (Property property : propMap.values()) {
125 updateNodeConnector(entry.getKey(), UpdateType.ADDED, props);
129 public void started() {
130 String schemeStr = System.getProperty("connection.scheme");
131 for (ConnectionMgmtScheme scheme : ConnectionMgmtScheme.values()) {
132 AbstractScheme schemeImpl = SchemeFactory.getScheme(scheme,
134 if (schemeImpl != null) {
135 schemes.put(scheme, schemeImpl);
136 if (scheme.name().equalsIgnoreCase(schemeStr)) {
137 activeScheme = scheme;
142 connectionEventThread.start();
144 registerWithOSGIConsole();
145 notifyClusterViewChanged();
146 // Should pull the Inventory updates in case we missed it
151 connectionEventThread = new Thread(new EventHandler(),
152 "ConnectionEvent Thread");
153 this.connectionEvents = new LinkedBlockingQueue<ConnectionMgmtEvent>();
154 schemes = new ConcurrentHashMap<ConnectionMgmtScheme, AbstractScheme>();
157 public void stopping() {
158 connectionEventThread.interrupt();
159 Set<Node> localNodes = getLocalNodes();
160 if (localNodes != null) {
161 AbstractScheme scheme = schemes.get(activeScheme);
162 for (Node localNode : localNodes) {
163 connectionService.disconnect(localNode);
165 scheme.removeNode(localNode);
171 public ConnectionMgmtScheme getActiveScheme() {
176 public Set<Node> getNodes(InetAddress controller) {
177 AbstractScheme scheme = schemes.get(activeScheme);
180 return scheme.getNodes(controller);
184 public Set<Node> getLocalNodes() {
185 AbstractScheme scheme = schemes.get(activeScheme);
188 return scheme.getNodes();
192 public boolean isLocal(Node node) {
193 AbstractScheme scheme = schemes.get(activeScheme);
196 return scheme.isLocal(node);
200 public ConnectionLocality getLocalityStatus(Node node) {
201 AbstractScheme scheme = schemes.get(activeScheme);
203 return ConnectionLocality.NOT_CONNECTED;
204 return scheme.getLocalityStatus(node);
208 public void updateNode(Node node, UpdateType type, Set<Property> props) {
209 logger.debug("updateNode: {} type {} props {}", node, type, props);
210 AbstractScheme scheme = schemes.get(activeScheme);
215 scheme.addNode(node);
218 scheme.removeNode(node);
226 public void updateNodeConnector(NodeConnector nodeConnector,
227 UpdateType type, Set<Property> props) {
228 logger.debug("updateNodeConnector: {} type {} props {}", nodeConnector,
230 AbstractScheme scheme = schemes.get(activeScheme);
235 scheme.addNode(nodeConnector.getNode());
243 public void coordinatorChanged() {
244 notifyClusterViewChanged();
248 public Node connect(String connectionIdentifier,
249 Map<ConnectionConstants, String> params) {
250 if (connectionService == null)
252 Node node = connectionService.connect(connectionIdentifier, params);
253 AbstractScheme scheme = schemes.get(activeScheme);
254 if (scheme != null && node != null)
255 scheme.addNode(node);
260 public Node connect(String type, String connectionIdentifier,
261 Map<ConnectionConstants, String> params) {
262 if (connectionService == null)
264 Node node = connectionService.connect(connectionIdentifier, params);
265 AbstractScheme scheme = schemes.get(activeScheme);
266 if (scheme != null && node != null)
267 scheme.addNode(node);
272 public Status disconnect(Node node) {
274 return new Status(StatusCode.BADREQUEST);
275 if (connectionService == null)
276 return new Status(StatusCode.NOSERVICE);
277 Status status = connectionService.disconnect(node);
278 if (status.isSuccess()) {
279 AbstractScheme scheme = schemes.get(activeScheme);
281 scheme.removeNode(node);
287 public void entryCreated(Node key, String cacheName, boolean originLocal) {
293 * Clustering Services' doesnt provide the existing states in the cache
294 * update callbacks. Hence, using a scratch local cache to maintain the
297 private ConcurrentMap<Node, Set<InetAddress>> existingConnections = new ConcurrentHashMap<Node, Set<InetAddress>>();
300 public void entryUpdated(Node node, Set<InetAddress> newControllers,
301 String cacheName, boolean originLocal) {
304 Set<InetAddress> existingControllers = existingConnections.get(node);
305 if (existingControllers != null) {
307 "Processing Update for : {} NewControllers : {} existingControllers : {}",
308 node, newControllers.toString(),
309 existingControllers.toString());
310 if (newControllers.size() < existingControllers.size()) {
311 Set<InetAddress> removed = new HashSet<InetAddress>(
312 existingControllers);
313 if (removed.removeAll(newControllers)) {
314 logger.debug("notifyNodeDisconnectFromMaster({})", node);
315 notifyNodeDisconnectedEvent(node);
319 logger.debug("Ignoring the Update for : {} NewControllers : {}",
320 node, newControllers.toString());
322 existingConnections.put(node, newControllers);
326 public void entryDeleted(Node key, String cacheName, boolean originLocal) {
329 logger.debug("Deleted : {} cache : {}", key, cacheName);
330 notifyNodeDisconnectedEvent(key);
333 private void enqueueConnectionEvent(ConnectionMgmtEvent event) {
335 if (!connectionEvents.contains(event)) {
336 this.connectionEvents.put(event);
338 } catch (InterruptedException e) {
340 "enqueueConnectionEvent caught Interrupt Exception for event {}",
345 private void notifyClusterViewChanged() {
346 ConnectionMgmtEvent event = new ConnectionMgmtEvent(
347 ConnectionMgmtEventType.CLUSTER_VIEW_CHANGED, null);
348 enqueueConnectionEvent(event);
351 private void notifyNodeDisconnectedEvent(Node node) {
352 ConnectionMgmtEvent event = new ConnectionMgmtEvent(
353 ConnectionMgmtEventType.NODE_DISCONNECTED_FROM_MASTER, node);
354 enqueueConnectionEvent(event);
358 * this thread monitors the connectionEvent queue for new incoming events
361 private class EventHandler implements Runnable {
367 ConnectionMgmtEvent ev = connectionEvents.take();
368 ConnectionMgmtEventType eType = ev.getEvent();
370 case NODE_DISCONNECTED_FROM_MASTER:
371 Node node = (Node) ev.getData();
372 connectionService.notifyNodeDisconnectFromMaster(node);
374 case CLUSTER_VIEW_CHANGED:
375 AbstractScheme scheme = schemes.get(activeScheme);
378 scheme.handleClusterViewChanged();
379 connectionService.notifyClusterViewChanged();
382 logger.error("Unknown Connection event {}",
385 } catch (InterruptedException e) {
386 connectionEvents.clear();
393 private void registerWithOSGIConsole() {
394 BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
396 bundleContext.registerService(CommandProvider.class.getName(), this,
400 public void _scheme(CommandInterpreter ci) {
401 String schemeStr = ci.nextArgument();
402 if (schemeStr == null) {
403 ci.println("Please enter valid Scheme name");
404 ci.println("Current Scheme : " + activeScheme.name());
407 ConnectionMgmtScheme scheme = ConnectionMgmtScheme.valueOf(schemeStr);
408 if (scheme == null) {
409 ci.println("Please enter a valid Scheme name");
412 activeScheme = scheme;
415 public void _printNodes(CommandInterpreter ci) {
416 String controller = ci.nextArgument();
417 if (controller == null) {
418 ci.println("Nodes connected to this controller : ");
419 if (this.getLocalNodes() == null) {
422 ci.println(this.getLocalNodes().toString());
427 InetAddress address = InetAddress.getByName(controller);
428 ci.println("Nodes connected to controller " + controller);
429 if (this.getNodes(address) == null) {
432 ci.println(this.getNodes(address).toString());
434 } catch (UnknownHostException e) {
435 logger.error("An error occured", e);
440 public String getHelp() {
441 StringBuffer help = new StringBuffer();
442 help.append("---Connection Manager---\n");
443 help.append("\t scheme [<name>] - Print / Set scheme\n");
444 help.append("\t printNodes [<controller>] - Print connected nodes\n");
445 return help.toString();
449 public Set<InetAddress> getControllers(Node node) {
450 AbstractScheme scheme = schemes.get(activeScheme);
452 return Collections.emptySet();
453 return scheme.getControllers(node);