Porting Tristate logic to SAL Connection Service and make use of it in Inventory...
[controller.git] / opendaylight / connectionmanager / implementation / src / main / java / org / opendaylight / controller / connectionmanager / internal / ConnectionManager.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 /**
11  * Connection Manager provides south-bound connectivity services.
12  * The APIs are currently focused towards Active-Active Clustering support
13  * wherein the node can connect to any of the Active Controller in the Cluster.
14  * This component can also host the necessary logic for south-bound connectivity
15  * when partial cluster is identified during Partition scenarios.
16  *
17  * But this (and its corresponding implementation) component can also be used for
18  * basic connectivity mechanisms for various south-bound plugins.
19  */
20
21 package org.opendaylight.controller.connectionmanager.internal;
22
23 import java.net.InetAddress;
24 import java.net.UnknownHostException;
25 import java.util.HashSet;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentMap;
31 import java.util.concurrent.LinkedBlockingQueue;
32
33 import org.eclipse.osgi.framework.console.CommandInterpreter;
34 import org.eclipse.osgi.framework.console.CommandProvider;
35 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
36 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
37 import org.opendaylight.controller.clustering.services.ICoordinatorChangeAware;
38 import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
39 import org.opendaylight.controller.connectionmanager.IConnectionManager;
40 import org.opendaylight.controller.connectionmanager.scheme.AbstractScheme;
41 import org.opendaylight.controller.connectionmanager.scheme.SchemeFactory;
42 import org.opendaylight.controller.sal.connection.ConnectionConstants;
43 import org.opendaylight.controller.sal.connection.ConnectionLocality;
44 import org.opendaylight.controller.sal.connection.IConnectionListener;
45 import org.opendaylight.controller.sal.connection.IConnectionService;
46 import org.opendaylight.controller.sal.core.Node;
47 import org.opendaylight.controller.sal.core.NodeConnector;
48 import org.opendaylight.controller.sal.core.Property;
49 import org.opendaylight.controller.sal.core.UpdateType;
50 import org.opendaylight.controller.sal.inventory.IInventoryService;
51 import org.opendaylight.controller.sal.inventory.IListenInventoryUpdates;
52 import org.opendaylight.controller.sal.utils.Status;
53 import org.opendaylight.controller.sal.utils.StatusCode;
54 import org.osgi.framework.BundleContext;
55 import org.osgi.framework.FrameworkUtil;
56 import org.slf4j.Logger;
57 import org.slf4j.LoggerFactory;
58
59 public class ConnectionManager implements IConnectionManager, IConnectionListener,
60                                           ICoordinatorChangeAware, IListenInventoryUpdates,
61                                           ICacheUpdateAware<Node, Set<InetAddress>>,
62                                           CommandProvider {
63     private static final Logger logger = LoggerFactory.getLogger(ConnectionManager.class);
64     private ConnectionMgmtScheme activeScheme = ConnectionMgmtScheme.ANY_CONTROLLER_ONE_MASTER;
65     private IClusterGlobalServices clusterServices;
66     private ConcurrentMap<ConnectionMgmtScheme, AbstractScheme> schemes;
67     private IConnectionService connectionService;
68     private Thread connectionEventThread;
69     private BlockingQueue<ConnectionMgmtEvent> connectionEvents;
70     private IInventoryService inventoryService;
71
72     public void setClusterServices(IClusterGlobalServices i) {
73         this.clusterServices = i;
74     }
75
76     public void unsetClusterServices(IClusterGlobalServices i) {
77         if (this.clusterServices == i) {
78             this.clusterServices = null;
79         }
80     }
81
82     public void setConnectionService(IConnectionService i) {
83         this.connectionService = i;
84     }
85
86     public void unsetConnectionService(IConnectionService i) {
87         if (this.connectionService == i) {
88             this.connectionService = null;
89         }
90     }
91
92     public void setInventoryService(IInventoryService service) {
93         logger.trace("Got inventory service set request {}", service);
94         this.inventoryService = service;
95     }
96
97     public void unsetInventoryService(IInventoryService service) {
98         logger.trace("Got a service UNset request");
99         this.inventoryService = null;
100     }
101
102     private void getInventories() {
103         Map<Node, Map<String, Property>> nodeProp = this.inventoryService.getNodeProps();
104         for (Map.Entry<Node, Map<String, Property>> entry : nodeProp.entrySet()) {
105             Node node = entry.getKey();
106             logger.debug("getInventories for node:{}", new Object[] { node });
107             Map<String, Property> propMap = entry.getValue();
108             Set<Property> props = new HashSet<Property>();
109             for (Property property : propMap.values()) {
110                 props.add(property);
111             }
112             updateNode(node, UpdateType.ADDED, props);
113         }
114
115         Map<NodeConnector, Map<String, Property>> nodeConnectorProp = this.inventoryService.getNodeConnectorProps();
116         for (Map.Entry<NodeConnector, Map<String, Property>> entry : nodeConnectorProp.entrySet()) {
117             Map<String, Property> propMap = entry.getValue();
118             Set<Property> props = new HashSet<Property>();
119             for (Property property : propMap.values()) {
120                 props.add(property);
121             }
122             updateNodeConnector(entry.getKey(), UpdateType.ADDED, props);
123         }
124     }
125
126
127    public void started() {
128        String schemeStr = System.getProperty("connection.scheme");
129        for (ConnectionMgmtScheme scheme : ConnectionMgmtScheme.values()) {
130            AbstractScheme schemeImpl = SchemeFactory.getScheme(scheme, clusterServices);
131            if (schemeImpl != null) {
132                schemes.put(scheme, schemeImpl);
133                if (scheme.name().equalsIgnoreCase(schemeStr)) {
134                    activeScheme = scheme;
135                }
136            }
137        }
138
139         connectionEventThread.start();
140
141         registerWithOSGIConsole();
142         notifyClusterViewChanged();
143         // Should pull the Inventory updates in case we missed it
144         getInventories();
145     }
146
147     public void init() {
148         connectionEventThread = new Thread(new EventHandler(), "ConnectionEvent Thread");
149         this.connectionEvents = new LinkedBlockingQueue<ConnectionMgmtEvent>();
150         schemes = new ConcurrentHashMap<ConnectionMgmtScheme, AbstractScheme>();
151     }
152
153     public void stopping() {
154         connectionEventThread.interrupt();
155         Set<Node> localNodes = getLocalNodes();
156         if (localNodes != null) {
157             AbstractScheme scheme = schemes.get(activeScheme);
158             for (Node localNode : localNodes) {
159                 connectionService.disconnect(localNode);
160                 if (scheme != null) scheme.removeNode(localNode);
161             }
162         }
163     }
164
165     @Override
166     public ConnectionMgmtScheme getActiveScheme() {
167         return activeScheme;
168     }
169
170     @Override
171     public Set<Node> getNodes(InetAddress controller) {
172         AbstractScheme scheme = schemes.get(activeScheme);
173         if (scheme == null) return null;
174         return scheme.getNodes(controller);
175     }
176
177     @Override
178     public Set<Node> getLocalNodes() {
179         AbstractScheme scheme = schemes.get(activeScheme);
180         if (scheme == null) return null;
181         return scheme.getNodes();
182     }
183
184     @Override
185     public boolean isLocal(Node node) {
186         AbstractScheme scheme = schemes.get(activeScheme);
187         if (scheme == null) return false;
188         return scheme.isLocal(node);
189     }
190
191     @Override
192     public ConnectionLocality getLocalityStatus(Node node) {
193         AbstractScheme scheme = schemes.get(activeScheme);
194         if (scheme == null) return ConnectionLocality.NOT_CONNECTED;
195         return scheme.getLocalityStatus(node);
196     }
197
198     @Override
199     public void updateNode(Node node, UpdateType type, Set<Property> props) {
200         logger.debug("updateNode: {} type {} props {}", node, type, props);
201         AbstractScheme scheme = schemes.get(activeScheme);
202         if (scheme == null) return;
203         switch (type) {
204         case ADDED:
205             scheme.addNode(node);
206             break;
207         case REMOVED:
208             scheme.removeNode(node);
209             break;
210         default:
211                 break;
212         }
213     }
214
215     @Override
216     public void updateNodeConnector(NodeConnector nodeConnector,
217             UpdateType type, Set<Property> props) {
218         logger.debug("updateNodeConnector: {} type {} props {}", nodeConnector, type, props);
219         AbstractScheme scheme = schemes.get(activeScheme);
220         if (scheme == null) return;
221         switch (type) {
222         case ADDED:
223             scheme.addNode(nodeConnector.getNode());
224             break;
225         default:
226                 break;
227         }
228     }
229
230     @Override
231     public void coordinatorChanged() {
232         notifyClusterViewChanged();
233     }
234
235     @Override
236     public Node connect(String connectionIdentifier, Map<ConnectionConstants, String> params) {
237         if (connectionService == null) return null;
238         return connectionService.connect(connectionIdentifier, params);
239     }
240
241     @Override
242     public Node connect(String type, String connectionIdentifier, Map<ConnectionConstants, String> params) {
243         if (connectionService == null) return null;
244         return connectionService.connect(type, connectionIdentifier, params);
245     }
246
247     @Override
248     public Status disconnect (Node node) {
249         if (connectionService == null) return new Status(StatusCode.NOSERVICE);
250         return connectionService.disconnect(node);
251     }
252
253     @Override
254     public void entryCreated(Node key, String cacheName, boolean originLocal) {
255         if (originLocal) return;
256     }
257
258     /*
259      * Clustering Services' doesnt provide the existing states in the cache update callbacks.
260      * Hence, using a scratch local cache to maintain the existing state.
261      *
262      */
263     private ConcurrentMap<Node, Set<InetAddress>> existingConnections = new ConcurrentHashMap<Node, Set<InetAddress>>();
264
265     @Override
266     public void entryUpdated(Node node, Set<InetAddress> newControllers, String cacheName, boolean originLocal) {
267         if (originLocal) return;
268         Set<InetAddress> existingControllers = existingConnections.get(node);
269         if (existingControllers != null) {
270             logger.debug("Processing Update for : {} NewControllers : {} existingControllers : {}", node,
271                     newControllers.toString(), existingControllers.toString());
272             if (newControllers.size() < existingControllers.size()) {
273                 Set<InetAddress> removed = new HashSet<InetAddress>(existingControllers);
274                 if (removed.removeAll(newControllers)) {
275                     logger.debug("notifyNodeDisconnectFromMaster({})", node);
276                     notifyNodeDisconnectedEvent(node);
277                 }
278             }
279         } else {
280             logger.debug("Ignoring the Update for : {} NewControllers : {}", node, newControllers.toString());
281         }
282         existingConnections.put(node, newControllers);
283     }
284
285     @Override
286     public void entryDeleted(Node key, String cacheName, boolean originLocal) {
287         if (originLocal) return;
288         logger.debug("Deleted : {} cache : {}", key, cacheName);
289         notifyNodeDisconnectedEvent(key);
290     }
291
292     private void enqueueConnectionEvent(ConnectionMgmtEvent event) {
293         try {
294             if (!connectionEvents.contains(event)) {
295                 this.connectionEvents.put(event);
296             }
297         } catch (InterruptedException e) {
298             logger.debug("enqueueConnectionEvent caught Interrupt Exception for event {}", event);
299         }
300     }
301
302     private void notifyClusterViewChanged() {
303         ConnectionMgmtEvent event = new ConnectionMgmtEvent(ConnectionMgmtEventType.CLUSTER_VIEW_CHANGED, null);
304         enqueueConnectionEvent(event);
305     }
306
307     private void notifyNodeDisconnectedEvent(Node node) {
308         ConnectionMgmtEvent event = new ConnectionMgmtEvent(ConnectionMgmtEventType.NODE_DISCONNECTED_FROM_MASTER, node);
309         enqueueConnectionEvent(event);
310     }
311
312     /*
313      * this thread monitors the connectionEvent queue for new incoming events from
314      */
315     private class EventHandler implements Runnable {
316         @Override
317         public void run() {
318
319             while (true) {
320                 try {
321                     ConnectionMgmtEvent ev = connectionEvents.take();
322                     ConnectionMgmtEventType eType = ev.getEvent();
323                     switch (eType) {
324                     case NODE_DISCONNECTED_FROM_MASTER:
325                         Node node = (Node)ev.getData();
326                         connectionService.notifyNodeDisconnectFromMaster(node);
327                         break;
328                     case CLUSTER_VIEW_CHANGED:
329                         AbstractScheme scheme = schemes.get(activeScheme);
330                         if (scheme == null) return;
331                         scheme.handleClusterViewChanged();
332                         connectionService.notifyClusterViewChanged();
333                         break;
334                     default:
335                         logger.error("Unknown Connection event {}", eType.ordinal());
336                     }
337                 } catch (InterruptedException e) {
338                     connectionEvents.clear();
339                     return;
340                 }
341             }
342         }
343     }
344
345     private void registerWithOSGIConsole() {
346         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
347                 .getBundleContext();
348         bundleContext.registerService(CommandProvider.class.getName(), this,
349                 null);
350     }
351
352     public void _scheme (CommandInterpreter ci) {
353         String schemeStr = ci.nextArgument();
354         if (schemeStr == null) {
355             ci.println("Please enter valid Scheme name");
356             ci.println("Current Scheme : " + activeScheme.name());
357             return;
358         }
359         ConnectionMgmtScheme scheme = ConnectionMgmtScheme.valueOf(schemeStr);
360         if (scheme == null) {
361             ci.println("Please enter a valid Scheme name");
362             return;
363         }
364         activeScheme = scheme;
365     }
366
367     public void _printNodes (CommandInterpreter ci) {
368         String controller = ci.nextArgument();
369         if (controller == null) {
370             ci.println("Nodes connected to this controller : ");
371             if (this.getLocalNodes() == null) {
372                 ci.println("None");
373             } else {
374                 ci.println(this.getLocalNodes().toString());
375             }
376             return;
377         }
378         try {
379             InetAddress address = InetAddress.getByName(controller);
380             ci.println("Nodes connected to controller "+controller);
381             if (this.getNodes(address) == null) {
382                 ci.println("None");
383             } else {
384                 ci.println(this.getNodes(address).toString());
385             }
386         } catch (UnknownHostException e) {
387            logger.error("An error occured",e);
388         }
389     }
390
391     @Override
392     public String getHelp() {
393         StringBuffer help = new StringBuffer();
394         help.append("---Connection Manager---\n");
395         help.append("\t scheme [<name>]                      - Print / Set scheme\n");
396         help.append("\t printNodes [<controller>]            - Print connected nodes\n");
397         return help.toString();
398     }
399 }