Added Configurable parameter for the Connection Manager scheme and Container profile.
[controller.git] / opendaylight / connectionmanager / implementation / src / main / java / org / opendaylight / controller / connectionmanager / internal / ConnectionManager.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 /**
11  * Connection Manager provides south-bound connectivity services.
12  * The APIs are currently focused on Active-Active Clustering support,
13  * wherein a node can connect to any of the Active Controllers in the Cluster.
14  * This component can also host the necessary logic for south-bound connectivity
15  * when partial cluster is identified during Partition scenarios.
16  *
17  * This component (and its corresponding implementation) can also be used to provide
18  * basic connectivity mechanisms for various south-bound plugins.
19  */
20
21 package org.opendaylight.controller.connectionmanager.internal;
22
23 import java.net.InetAddress;
24 import java.net.UnknownHostException;
25 import java.util.HashSet;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentMap;
31 import java.util.concurrent.LinkedBlockingQueue;
32
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 import org.eclipse.osgi.framework.console.CommandInterpreter;
36 import org.eclipse.osgi.framework.console.CommandProvider;
37 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
38 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
39 import org.opendaylight.controller.clustering.services.ICoordinatorChangeAware;
40 import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
41 import org.opendaylight.controller.connectionmanager.IConnectionManager;
42 import org.opendaylight.controller.connectionmanager.scheme.AbstractScheme;
43 import org.opendaylight.controller.connectionmanager.scheme.SchemeFactory;
44 import org.opendaylight.controller.sal.connection.ConnectionConstants;
45 import org.opendaylight.controller.sal.connection.IConnectionListener;
46 import org.opendaylight.controller.sal.connection.IConnectionService;
47 import org.opendaylight.controller.sal.core.Node;
48 import org.opendaylight.controller.sal.core.NodeConnector;
49 import org.opendaylight.controller.sal.core.Property;
50 import org.opendaylight.controller.sal.core.UpdateType;
51 import org.opendaylight.controller.sal.inventory.IInventoryService;
52 import org.opendaylight.controller.sal.inventory.IListenInventoryUpdates;
53 import org.opendaylight.controller.sal.utils.Status;
54 import org.opendaylight.controller.sal.utils.StatusCode;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleContext;
import org.osgi.framework.FrameworkUtil;
57
58 public class ConnectionManager implements IConnectionManager, IConnectionListener,
59                                           ICoordinatorChangeAware, IListenInventoryUpdates,
60                                           ICacheUpdateAware<Node, Set<InetAddress>>,
61                                           CommandProvider {
62     private static final Logger logger = LoggerFactory.getLogger(ConnectionManager.class);
63     private ConnectionMgmtScheme activeScheme = ConnectionMgmtScheme.ANY_CONTROLLER_ONE_MASTER;
64     private IClusterGlobalServices clusterServices;
65     private ConcurrentMap<ConnectionMgmtScheme, AbstractScheme> schemes;
66     private IConnectionService connectionService;
67     private Thread connectionEventThread;
68     private BlockingQueue<ConnectionMgmtEvent> connectionEvents;
69     private IInventoryService inventoryService;
70
71     public void setClusterServices(IClusterGlobalServices i) {
72         this.clusterServices = i;
73     }
74
75     public void unsetClusterServices(IClusterGlobalServices i) {
76         if (this.clusterServices == i) {
77             this.clusterServices = null;
78         }
79     }
80
81     public void setConnectionService(IConnectionService i) {
82         this.connectionService = i;
83     }
84
85     public void unsetConnectionService(IConnectionService i) {
86         if (this.connectionService == i) {
87             this.connectionService = null;
88         }
89     }
90
91     public void setInventoryService(IInventoryService service) {
92         logger.trace("Got inventory service set request {}", service);
93         this.inventoryService = service;
94     }
95
96     public void unsetInventoryService(IInventoryService service) {
97         logger.trace("Got a service UNset request");
98         this.inventoryService = null;
99     }
100
101     private void getInventories() {
102         Map<Node, Map<String, Property>> nodeProp = this.inventoryService.getNodeProps();
103         for (Map.Entry<Node, Map<String, Property>> entry : nodeProp.entrySet()) {
104             Node node = entry.getKey();
105             logger.debug("getInventories for node:{}", new Object[] { node });
106             Map<String, Property> propMap = entry.getValue();
107             Set<Property> props = new HashSet<Property>();
108             for (Property property : propMap.values()) {
109                 props.add(property);
110             }
111             updateNode(node, UpdateType.ADDED, props);
112         }
113
114         Map<NodeConnector, Map<String, Property>> nodeConnectorProp = this.inventoryService.getNodeConnectorProps();
115         for (Map.Entry<NodeConnector, Map<String, Property>> entry : nodeConnectorProp.entrySet()) {
116             Map<String, Property> propMap = entry.getValue();
117             Set<Property> props = new HashSet<Property>();
118             for (Property property : propMap.values()) {
119                 props.add(property);
120             }
121             updateNodeConnector(entry.getKey(), UpdateType.ADDED, props);
122         }
123     }
124
125     public void started() {
126         connectionEventThread = new Thread(new EventHandler(), "ConnectionEvent Thread");
127         connectionEventThread.start();
128
129         registerWithOSGIConsole();
130         notifyClusterViewChanged();
131         // Should pull the Inventory updates in case we missed it
132         getInventories();
133     }
134
135     public void init() {
136         String schemeStr = System.getProperty("connection.scheme");
137         this.connectionEvents = new LinkedBlockingQueue<ConnectionMgmtEvent>();
138         schemes = new ConcurrentHashMap<ConnectionMgmtScheme, AbstractScheme>();
139         for (ConnectionMgmtScheme scheme : ConnectionMgmtScheme.values()) {
140             AbstractScheme schemeImpl = SchemeFactory.getScheme(scheme, clusterServices);
141             if (schemeImpl != null) {
142                 schemes.put(scheme, schemeImpl);
143                 if (scheme.name().equalsIgnoreCase(schemeStr)) {
144                     activeScheme = scheme;
145                 }
146             }
147         }
148     }
149
150     public void stop() {
151         connectionEventThread.interrupt();
152         Set<Node> localNodes = getLocalNodes();
153         if (localNodes != null) {
154             AbstractScheme scheme = schemes.get(activeScheme);
155             for (Node localNode : localNodes) {
156                 connectionService.disconnect(localNode);
157                 if (scheme != null) scheme.removeNode(localNode);
158             }
159         }
160     }
161
162     @Override
163     public ConnectionMgmtScheme getActiveScheme() {
164         return activeScheme;
165     }
166
167     @Override
168     public Set<Node> getNodes(InetAddress controller) {
169         AbstractScheme scheme = schemes.get(activeScheme);
170         if (scheme == null) return null;
171         return scheme.getNodes(controller);
172     }
173
174     @Override
175     public Set<Node> getLocalNodes() {
176         AbstractScheme scheme = schemes.get(activeScheme);
177         if (scheme == null) return null;
178         return scheme.getNodes();
179     }
180
181     @Override
182     public boolean isLocal(Node node) {
183         AbstractScheme scheme = schemes.get(activeScheme);
184         if (scheme == null) return false;
185         return scheme.isLocal(node);
186     }
187
188     @Override
189     public void updateNode(Node node, UpdateType type, Set<Property> props) {
190         logger.debug("updateNode: {} type {} props {}", node, type, props);
191         AbstractScheme scheme = schemes.get(activeScheme);
192         if (scheme == null) return;
193         switch (type) {
194         case ADDED:
195             scheme.addNode(node);
196             break;
197         case REMOVED:
198             scheme.removeNode(node);
199             break;
200         default:
201                 break;
202         }
203     }
204
205     @Override
206     public void updateNodeConnector(NodeConnector nodeConnector,
207             UpdateType type, Set<Property> props) {
208         logger.debug("updateNodeConnector: {} type {} props {}", nodeConnector, type, props);
209         AbstractScheme scheme = schemes.get(activeScheme);
210         if (scheme == null) return;
211         switch (type) {
212         case ADDED:
213             scheme.addNode(nodeConnector.getNode());
214             break;
215         default:
216                 break;
217         }
218     }
219
220     @Override
221     public void coordinatorChanged() {
222         notifyClusterViewChanged();
223     }
224
225     @Override
226     public Node connect(String connectionIdentifier, Map<ConnectionConstants, String> params) {
227         if (connectionService == null) return null;
228         return connectionService.connect(connectionIdentifier, params);
229     }
230
231     @Override
232     public Node connect(String type, String connectionIdentifier, Map<ConnectionConstants, String> params) {
233         if (connectionService == null) return null;
234         return connectionService.connect(type, connectionIdentifier, params);
235     }
236
237     @Override
238     public Status disconnect (Node node) {
239         if (connectionService == null) return new Status(StatusCode.NOSERVICE);
240         return connectionService.disconnect(node);
241     }
242
243     @Override
244     public void entryCreated(Node key, String cacheName, boolean originLocal) {
245         if (originLocal) return;
246     }
247
248     /*
249      * Clustering Services' doesnt provide the existing states in the cache update callbacks.
250      * Hence, using a scratch local cache to maintain the existing state.
251      *
252      */
253     private ConcurrentMap<Node, Set<InetAddress>> existingConnections = new ConcurrentHashMap<Node, Set<InetAddress>>();
254
255     @Override
256     public void entryUpdated(Node node, Set<InetAddress> newControllers, String cacheName, boolean originLocal) {
257         if (originLocal) return;
258         Set<InetAddress> existingControllers = existingConnections.get(node);
259         if (existingControllers != null) {
260             logger.debug("Processing Update for : {} NewControllers : {} existingControllers : {}", node,
261                     newControllers.toString(), existingControllers.toString());
262             if (newControllers.size() < existingControllers.size()) {
263                 Set<InetAddress> removed = new HashSet<InetAddress>(existingControllers);
264                 if (removed.removeAll(newControllers)) {
265                     logger.debug("notifyNodeDisconnectFromMaster({})", node);
266                     notifyNodeDisconnectedEvent(node);
267                 }
268             }
269         } else {
270             logger.debug("Ignoring the Update for : {} NewControllers : {}", node, newControllers.toString());
271         }
272         existingConnections.put(node, newControllers);
273     }
274
275     @Override
276     public void entryDeleted(Node key, String cacheName, boolean originLocal) {
277         if (originLocal) return;
278         logger.debug("Deleted : {} cache : {}", key, cacheName);
279         notifyNodeDisconnectedEvent(key);
280     }
281
282     private void enqueueConnectionEvent(ConnectionMgmtEvent event) {
283         try {
284             if (!connectionEvents.contains(event)) {
285                 this.connectionEvents.put(event);
286             }
287         } catch (InterruptedException e) {
288             logger.debug("enqueueConnectionEvent caught Interrupt Exception for event {}", event);
289         }
290     }
291
292     private void notifyClusterViewChanged() {
293         ConnectionMgmtEvent event = new ConnectionMgmtEvent(ConnectionMgmtEventType.CLUSTER_VIEW_CHANGED, null);
294         enqueueConnectionEvent(event);
295     }
296
297     private void notifyNodeDisconnectedEvent(Node node) {
298         ConnectionMgmtEvent event = new ConnectionMgmtEvent(ConnectionMgmtEventType.NODE_DISCONNECTED_FROM_MASTER, node);
299         enqueueConnectionEvent(event);
300     }
301
302     /*
303      * This thread monitors the connectionEvents queue and handles each incoming event.
304      */
305     private class EventHandler implements Runnable {
306         @Override
307         public void run() {
308
309             while (true) {
310                 try {
311                     ConnectionMgmtEvent ev = connectionEvents.take();
312                     ConnectionMgmtEventType eType = ev.getEvent();
313                     switch (eType) {
314                     case NODE_DISCONNECTED_FROM_MASTER:
315                         Node node = (Node)ev.getData();
316                         connectionService.notifyNodeDisconnectFromMaster(node);
317                         break;
318                     case CLUSTER_VIEW_CHANGED:
319                         AbstractScheme scheme = schemes.get(activeScheme);
320                         if (scheme == null) return;
321                         scheme.handleClusterViewChanged();
322                         connectionService.notifyClusterViewChanged();
323                         break;
324                     default:
325                         logger.error("Unknown Connection event {}", eType.ordinal());
326                     }
327                 } catch (InterruptedException e) {
328                     connectionEvents.clear();
329                     return;
330                 }
331             }
332         }
333     }
334
335     private void registerWithOSGIConsole() {
336         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
337                 .getBundleContext();
338         bundleContext.registerService(CommandProvider.class.getName(), this,
339                 null);
340     }
341
342     public void _scheme (CommandInterpreter ci) {
343         String schemeStr = ci.nextArgument();
344         if (schemeStr == null) {
345             ci.println("Please enter valid Scheme name");
346             ci.println("Current Scheme : " + activeScheme.name());
347             return;
348         }
349         ConnectionMgmtScheme scheme = ConnectionMgmtScheme.valueOf(schemeStr);
350         if (scheme == null) {
351             ci.println("Please enter a valid Scheme name");
352             return;
353         }
354         activeScheme = scheme;
355     }
356
357     public void _printNodes (CommandInterpreter ci) {
358         String controller = ci.nextArgument();
359         if (controller == null) {
360             ci.println("Nodes connected to this controller : ");
361             if (this.getLocalNodes() == null) {
362                 ci.println("None");
363             } else {
364                 ci.println(this.getLocalNodes().toString());
365             }
366             return;
367         }
368         try {
369             InetAddress address = InetAddress.getByName(controller);
370             ci.println("Nodes connected to controller "+controller);
371             if (this.getNodes(address) == null) {
372                 ci.println("None");
373             } else {
374                 ci.println(this.getNodes(address).toString());
375             }
376         } catch (UnknownHostException e) {
377            logger.error("An error occured",e);
378         }
379     }
380
381     @Override
382     public String getHelp() {
383         StringBuffer help = new StringBuffer();
384         help.append("---Connection Manager---\n");
385         help.append("\t scheme [<name>]                      - Print / Set scheme\n");
386         help.append("\t printNodes [<controller>]            - Print connected nodes\n");
387         return help.toString();
388     }
389 }