Take advantage of MultipartTransactionAware
[controller.git] / opendaylight / connectionmanager / implementation / src / main / java / org / opendaylight / controller / connectionmanager / internal / ConnectionManager.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 /**
10  * Connection Manager provides south-bound connectivity services.
11  * The APIs are currently focused towards Active-Active Clustering support
12  * wherein the node can connect to any of the Active Controller in the Cluster.
13  * This component can also host the necessary logic for south-bound connectivity
14  * when partial cluster is identified during Partition scenarios.
15  *
16  * But this (and its corresponding implementation) component can also be used for
17  * basic connectivity mechanisms for various south-bound plugins.
18  */
19
20 package org.opendaylight.controller.connectionmanager.internal;
21
22 import java.net.InetAddress;
23 import java.net.UnknownHostException;
24 import java.util.Collections;
25 import java.util.HashSet;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentMap;
31 import java.util.concurrent.LinkedBlockingQueue;
32
33 import org.eclipse.osgi.framework.console.CommandInterpreter;
34 import org.eclipse.osgi.framework.console.CommandProvider;
35 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
36 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
37 import org.opendaylight.controller.clustering.services.ICoordinatorChangeAware;
38 import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
39 import org.opendaylight.controller.connectionmanager.IConnectionManager;
40 import org.opendaylight.controller.connectionmanager.scheme.AbstractScheme;
41 import org.opendaylight.controller.connectionmanager.scheme.SchemeFactory;
42 import org.opendaylight.controller.sal.connection.ConnectionConstants;
43 import org.opendaylight.controller.sal.connection.ConnectionLocality;
44 import org.opendaylight.controller.sal.connection.IConnectionListener;
45 import org.opendaylight.controller.sal.connection.IConnectionService;
46 import org.opendaylight.controller.sal.core.Node;
47 import org.opendaylight.controller.sal.core.NodeConnector;
48 import org.opendaylight.controller.sal.core.Property;
49 import org.opendaylight.controller.sal.core.UpdateType;
50 import org.opendaylight.controller.sal.inventory.IInventoryService;
51 import org.opendaylight.controller.sal.inventory.IListenInventoryUpdates;
52 import org.opendaylight.controller.sal.utils.Status;
53 import org.opendaylight.controller.sal.utils.StatusCode;
54 import org.osgi.framework.BundleContext;
55 import org.osgi.framework.FrameworkUtil;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 public class ConnectionManager implements IConnectionManager,
60         IConnectionListener, ICoordinatorChangeAware, IListenInventoryUpdates,
61         ICacheUpdateAware<Node, Set<InetAddress>>, CommandProvider {
62     private static final Logger logger = LoggerFactory
63             .getLogger(ConnectionManager.class);
64     private ConnectionMgmtScheme activeScheme = ConnectionMgmtScheme.ANY_CONTROLLER_ONE_MASTER;
65     private IClusterGlobalServices clusterServices;
66     private ConcurrentMap<ConnectionMgmtScheme, AbstractScheme> schemes;
67     private IConnectionService connectionService;
68     private Thread connectionEventThread;
69     private BlockingQueue<ConnectionMgmtEvent> connectionEvents;
70     private IInventoryService inventoryService;
71
72     public void setClusterServices(IClusterGlobalServices i) {
73         this.clusterServices = i;
74     }
75
76     public void unsetClusterServices(IClusterGlobalServices i) {
77         if (this.clusterServices == i) {
78             this.clusterServices = null;
79         }
80     }
81
82     public void setConnectionService(IConnectionService i) {
83         this.connectionService = i;
84     }
85
86     public void unsetConnectionService(IConnectionService i) {
87         if (this.connectionService == i) {
88             this.connectionService = null;
89         }
90     }
91
92     public void setInventoryService(IInventoryService service) {
93         logger.trace("Got inventory service set request {}", service);
94         this.inventoryService = service;
95     }
96
97     public void unsetInventoryService(IInventoryService service) {
98         logger.trace("Got a service UNset request");
99         this.inventoryService = null;
100     }
101
102     private void getInventories() {
103         Map<Node, Map<String, Property>> nodeProp = this.inventoryService
104                 .getNodeProps();
105         for (Map.Entry<Node, Map<String, Property>> entry : nodeProp.entrySet()) {
106             Node node = entry.getKey();
107             logger.debug("getInventories for node:{}", new Object[] { node });
108             Map<String, Property> propMap = entry.getValue();
109             Set<Property> props = new HashSet<Property>();
110             for (Property property : propMap.values()) {
111                 props.add(property);
112             }
113             updateNode(node, UpdateType.ADDED, props);
114         }
115
116         Map<NodeConnector, Map<String, Property>> nodeConnectorProp = this.inventoryService
117                 .getNodeConnectorProps();
118         for (Map.Entry<NodeConnector, Map<String, Property>> entry : nodeConnectorProp
119                 .entrySet()) {
120             Map<String, Property> propMap = entry.getValue();
121             Set<Property> props = new HashSet<Property>();
122             for (Property property : propMap.values()) {
123                 props.add(property);
124             }
125             updateNodeConnector(entry.getKey(), UpdateType.ADDED, props);
126         }
127     }
128
129     public void started() {
130         String schemeStr = System.getProperty("connection.scheme");
131         for (ConnectionMgmtScheme scheme : ConnectionMgmtScheme.values()) {
132             AbstractScheme schemeImpl = SchemeFactory.getScheme(scheme,
133                     clusterServices);
134             if (schemeImpl != null) {
135                 schemes.put(scheme, schemeImpl);
136                 if (scheme.name().equalsIgnoreCase(schemeStr)) {
137                     activeScheme = scheme;
138                 }
139             }
140         }
141
142         connectionEventThread.start();
143
144         registerWithOSGIConsole();
145         notifyClusterViewChanged();
146         // Should pull the Inventory updates in case we missed it
147         getInventories();
148     }
149
150     public void init() {
151         connectionEventThread = new Thread(new EventHandler(),
152                 "ConnectionEvent Thread");
153         this.connectionEvents = new LinkedBlockingQueue<ConnectionMgmtEvent>();
154         schemes = new ConcurrentHashMap<ConnectionMgmtScheme, AbstractScheme>();
155     }
156
157     public void stopping() {
158         connectionEventThread.interrupt();
159         Set<Node> localNodes = getLocalNodes();
160         if (localNodes != null) {
161             AbstractScheme scheme = schemes.get(activeScheme);
162             for (Node localNode : localNodes) {
163                 connectionService.disconnect(localNode);
164                 if (scheme != null)
165                     scheme.removeNode(localNode);
166             }
167         }
168     }
169
170     @Override
171     public ConnectionMgmtScheme getActiveScheme() {
172         return activeScheme;
173     }
174
175     @Override
176     public Set<Node> getNodes(InetAddress controller) {
177         AbstractScheme scheme = schemes.get(activeScheme);
178         if (scheme == null)
179             return null;
180         return scheme.getNodes(controller);
181     }
182
183     @Override
184     public Set<Node> getLocalNodes() {
185         AbstractScheme scheme = schemes.get(activeScheme);
186         if (scheme == null)
187             return null;
188         return scheme.getNodes();
189     }
190
191     @Override
192     public boolean isLocal(Node node) {
193         AbstractScheme scheme = schemes.get(activeScheme);
194         if (scheme == null)
195             return false;
196         return scheme.isLocal(node);
197     }
198
199     @Override
200     public ConnectionLocality getLocalityStatus(Node node) {
201         AbstractScheme scheme = schemes.get(activeScheme);
202         if (scheme == null)
203             return ConnectionLocality.NOT_CONNECTED;
204         return scheme.getLocalityStatus(node);
205     }
206
207     @Override
208     public void updateNode(Node node, UpdateType type, Set<Property> props) {
209         logger.debug("updateNode: {} type {} props {}", node, type, props);
210         AbstractScheme scheme = schemes.get(activeScheme);
211         if (scheme == null)
212             return;
213         switch (type) {
214         case ADDED:
215             scheme.addNode(node);
216             break;
217         case REMOVED:
218             scheme.removeNode(node);
219             break;
220         default:
221             break;
222         }
223     }
224
225     @Override
226     public void updateNodeConnector(NodeConnector nodeConnector,
227             UpdateType type, Set<Property> props) {
228         logger.debug("updateNodeConnector: {} type {} props {}", nodeConnector,
229                 type, props);
230         AbstractScheme scheme = schemes.get(activeScheme);
231         if (scheme == null)
232             return;
233         switch (type) {
234         case ADDED:
235             scheme.addNode(nodeConnector.getNode());
236             break;
237         default:
238             break;
239         }
240     }
241
242     @Override
243     public void coordinatorChanged() {
244         notifyClusterViewChanged();
245     }
246
247     @Override
248     public Node connect(String connectionIdentifier,
249             Map<ConnectionConstants, String> params) {
250         if (connectionService == null)
251             return null;
252         Node node = connectionService.connect(connectionIdentifier, params);
253         AbstractScheme scheme = schemes.get(activeScheme);
254         if (scheme != null && node != null)
255             scheme.addNode(node);
256         return node;
257     }
258
259     @Override
260     public Node connect(String type, String connectionIdentifier,
261             Map<ConnectionConstants, String> params) {
262         if (connectionService == null)
263             return null;
264         Node node = connectionService.connect(connectionIdentifier, params);
265         AbstractScheme scheme = schemes.get(activeScheme);
266         if (scheme != null && node != null)
267             scheme.addNode(node);
268         return node;
269     }
270
271     @Override
272     public Status disconnect(Node node) {
273         if (node == null)
274             return new Status(StatusCode.BADREQUEST);
275         if (connectionService == null)
276             return new Status(StatusCode.NOSERVICE);
277         Status status = connectionService.disconnect(node);
278         if (status.isSuccess()) {
279             AbstractScheme scheme = schemes.get(activeScheme);
280             if (scheme != null)
281                 scheme.removeNode(node);
282         }
283         return status;
284     }
285
286     @Override
287     public void entryCreated(Node key, String cacheName, boolean originLocal) {
288         if (originLocal)
289             return;
290     }
291
292     /*
293      * Clustering Services' doesnt provide the existing states in the cache
294      * update callbacks. Hence, using a scratch local cache to maintain the
295      * existing state.
296      */
297     private ConcurrentMap<Node, Set<InetAddress>> existingConnections = new ConcurrentHashMap<Node, Set<InetAddress>>();
298
299     @Override
300     public void entryUpdated(Node node, Set<InetAddress> newControllers,
301             String cacheName, boolean originLocal) {
302         if (originLocal)
303             return;
304         Set<InetAddress> existingControllers = existingConnections.get(node);
305         if (existingControllers != null) {
306             logger.debug(
307                     "Processing Update for : {} NewControllers : {} existingControllers : {}",
308                     node, newControllers.toString(),
309                     existingControllers.toString());
310             if (newControllers.size() < existingControllers.size()) {
311                 Set<InetAddress> removed = new HashSet<InetAddress>(
312                         existingControllers);
313                 if (removed.removeAll(newControllers)) {
314                     logger.debug("notifyNodeDisconnectFromMaster({})", node);
315                     notifyNodeDisconnectedEvent(node);
316                 }
317             }
318         } else {
319             logger.debug("Ignoring the Update for : {} NewControllers : {}",
320                     node, newControllers.toString());
321         }
322         existingConnections.put(node, newControllers);
323     }
324
325     @Override
326     public void entryDeleted(Node key, String cacheName, boolean originLocal) {
327         if (originLocal)
328             return;
329         logger.debug("Deleted : {} cache : {}", key, cacheName);
330         notifyNodeDisconnectedEvent(key);
331     }
332
333     private void enqueueConnectionEvent(ConnectionMgmtEvent event) {
334         try {
335             if (!connectionEvents.contains(event)) {
336                 this.connectionEvents.put(event);
337             }
338         } catch (InterruptedException e) {
339             logger.debug(
340                     "enqueueConnectionEvent caught Interrupt Exception for event {}",
341                     event);
342         }
343     }
344
345     private void notifyClusterViewChanged() {
346         ConnectionMgmtEvent event = new ConnectionMgmtEvent(
347                 ConnectionMgmtEventType.CLUSTER_VIEW_CHANGED, null);
348         enqueueConnectionEvent(event);
349     }
350
351     private void notifyNodeDisconnectedEvent(Node node) {
352         ConnectionMgmtEvent event = new ConnectionMgmtEvent(
353                 ConnectionMgmtEventType.NODE_DISCONNECTED_FROM_MASTER, node);
354         enqueueConnectionEvent(event);
355     }
356
357     /*
358      * this thread monitors the connectionEvent queue for new incoming events
359      * from
360      */
361     private class EventHandler implements Runnable {
362         @Override
363         public void run() {
364
365             while (true) {
366                 try {
367                     ConnectionMgmtEvent ev = connectionEvents.take();
368                     ConnectionMgmtEventType eType = ev.getEvent();
369                     switch (eType) {
370                     case NODE_DISCONNECTED_FROM_MASTER:
371                         Node node = (Node) ev.getData();
372                         connectionService.notifyNodeDisconnectFromMaster(node);
373                         break;
374                     case CLUSTER_VIEW_CHANGED:
375                         AbstractScheme scheme = schemes.get(activeScheme);
376                         if (scheme == null)
377                             return;
378                         scheme.handleClusterViewChanged();
379                         connectionService.notifyClusterViewChanged();
380                         break;
381                     default:
382                         logger.error("Unknown Connection event {}",
383                                 eType.ordinal());
384                     }
385                 } catch (InterruptedException e) {
386                     connectionEvents.clear();
387                     return;
388                 }
389             }
390         }
391     }
392
393     private void registerWithOSGIConsole() {
394         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
395                 .getBundleContext();
396         bundleContext.registerService(CommandProvider.class.getName(), this,
397                 null);
398     }
399
400     public void _scheme(CommandInterpreter ci) {
401         String schemeStr = ci.nextArgument();
402         if (schemeStr == null) {
403             ci.println("Please enter valid Scheme name");
404             ci.println("Current Scheme : " + activeScheme.name());
405             return;
406         }
407         ConnectionMgmtScheme scheme = ConnectionMgmtScheme.valueOf(schemeStr);
408         if (scheme == null) {
409             ci.println("Please enter a valid Scheme name");
410             return;
411         }
412         activeScheme = scheme;
413     }
414
415     public void _printNodes(CommandInterpreter ci) {
416         String controller = ci.nextArgument();
417         if (controller == null) {
418             ci.println("Nodes connected to this controller : ");
419             if (this.getLocalNodes() == null) {
420                 ci.println("None");
421             } else {
422                 ci.println(this.getLocalNodes().toString());
423             }
424             return;
425         }
426         try {
427             InetAddress address = InetAddress.getByName(controller);
428             ci.println("Nodes connected to controller " + controller);
429             if (this.getNodes(address) == null) {
430                 ci.println("None");
431             } else {
432                 ci.println(this.getNodes(address).toString());
433             }
434         } catch (UnknownHostException e) {
435             logger.error("An error occured", e);
436         }
437     }
438
439     @Override
440     public String getHelp() {
441         StringBuffer help = new StringBuffer();
442         help.append("---Connection Manager---\n");
443         help.append("\t scheme [<name>]                      - Print / Set scheme\n");
444         help.append("\t printNodes [<controller>]            - Print connected nodes\n");
445         return help.toString();
446     }
447
448     @Override
449     public Set<InetAddress> getControllers(Node node) {
450         AbstractScheme scheme = schemes.get(activeScheme);
451         if (scheme == null)
452             return Collections.emptySet();
453         return scheme.getControllers(node);
454     }
455 }