Connection Manager infrastructure supporting the Clustering Active-Active requirement.
[controller.git] / opendaylight / connectionmanager / implementation / src / main / java / org / opendaylight / controller / connectionmanager / internal / ConnectionManager.java
1
2 /*
3  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
4  *
5  * This program and the accompanying materials are made available under the
6  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
7  * and is available at http://www.eclipse.org/legal/epl-v10.html
8  */
9
10 /**
11  * Connection Manager provides south-bound connectivity services.
12  * The APIs are currently focused towards Active-Active Clustering support
13  * wherein the node can connect to any of the Active Controller in the Cluster.
14  * This component can also host the necessary logic for south-bound connectivity
15  * when partial cluster is identified during Partition scenarios.
16  *
17  * But this (and its corresponding implementation) component can also be used for
18  * basic connectivity mechansims for various south-bound plugins.
19  */
20
21 package org.opendaylight.controller.connectionmanager.internal;
22
23 import java.net.InetAddress;
24 import java.net.UnknownHostException;
25 import java.util.HashSet;
26 import java.util.Map;
27 import java.util.Set;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.ConcurrentHashMap;
30 import java.util.concurrent.ConcurrentMap;
31 import java.util.concurrent.LinkedBlockingQueue;
32
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 import org.eclipse.osgi.framework.console.CommandInterpreter;
37 import org.eclipse.osgi.framework.console.CommandProvider;
38 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
39 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
40 import org.opendaylight.controller.clustering.services.ICoordinatorChangeAware;
41 import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
42 import org.opendaylight.controller.connectionmanager.IConnectionManager;
43 import org.opendaylight.controller.connectionmanager.scheme.AbstractScheme;
44 import org.opendaylight.controller.connectionmanager.scheme.SchemeFactory;
45 import org.opendaylight.controller.sal.connection.ConnectionConstants;
46 import org.opendaylight.controller.sal.connection.IConnectionListener;
47 import org.opendaylight.controller.sal.connection.IConnectionService;
48 import org.opendaylight.controller.sal.core.Node;
49 import org.opendaylight.controller.sal.core.NodeConnector;
50 import org.opendaylight.controller.sal.core.Property;
51 import org.opendaylight.controller.sal.core.UpdateType;
52 import org.opendaylight.controller.sal.inventory.IListenInventoryUpdates;
53 import org.opendaylight.controller.sal.utils.Status;
54 import org.opendaylight.controller.sal.utils.StatusCode;
55 import org.osgi.framework.BundleContext;
56 import org.osgi.framework.FrameworkUtil;
57
58 public class ConnectionManager implements IConnectionManager, IConnectionListener,
59                                           ICoordinatorChangeAware, IListenInventoryUpdates,
60                                           ICacheUpdateAware<Node, Set<InetAddress>>,
61                                           CommandProvider {
62     private static final Logger logger = LoggerFactory.getLogger(ConnectionManager.class);
63     private ConnectionMgmtScheme activeScheme = ConnectionMgmtScheme.ANY_CONTROLLER_ONE_MASTER;
64     private IClusterGlobalServices clusterServices;
65     private ConcurrentMap<ConnectionMgmtScheme, AbstractScheme> schemes;
66     private IConnectionService connectionService;
67     private Thread connectionEventThread;
68     private BlockingQueue<ConnectionMgmtEvent> connectionEvents;
69
70     public void setClusterServices(IClusterGlobalServices i) {
71         this.clusterServices = i;
72     }
73
74     public void unsetClusterServices(IClusterGlobalServices i) {
75         if (this.clusterServices == i) {
76             this.clusterServices = null;
77         }
78     }
79
80     public void setConnectionService(IConnectionService i) {
81         this.connectionService = i;
82     }
83
84     public void unsetConnectionService(IConnectionService i) {
85         if (this.connectionService == i) {
86             this.connectionService = null;
87         }
88     }
89
90     public void started() {
91         connectionEventThread = new Thread(new EventHandler(), "ConnectionEvent Thread");
92         connectionEventThread.start();
93
94         registerWithOSGIConsole();
95         notifyClusterViewChanged();
96     }
97
98     public void init() {
99         this.connectionEvents = new LinkedBlockingQueue<ConnectionMgmtEvent>();
100         schemes = new ConcurrentHashMap<ConnectionMgmtScheme, AbstractScheme>();
101         for (ConnectionMgmtScheme scheme : ConnectionMgmtScheme.values()) {
102             AbstractScheme schemeImpl = SchemeFactory.getScheme(scheme, clusterServices);
103             if (schemeImpl != null) schemes.put(scheme, schemeImpl);
104         }
105     }
106
107     public void stop() {
108         connectionEventThread.interrupt();
109         Set<Node> localNodes = getLocalNodes();
110         if (localNodes != null) {
111             AbstractScheme scheme = schemes.get(activeScheme);
112             for (Node localNode : localNodes) {
113                 connectionService.disconnect(localNode);
114                 if (scheme != null) scheme.removeNode(localNode);
115             }
116         }
117     }
118
119     @Override
120     public ConnectionMgmtScheme getActiveScheme() {
121         return activeScheme;
122     }
123
124     @Override
125     public Set<Node> getNodes(InetAddress controller) {
126         AbstractScheme scheme = schemes.get(activeScheme);
127         if (scheme == null) return null;
128         return scheme.getNodes(controller);
129     }
130
131     @Override
132     public Set<Node> getLocalNodes() {
133         AbstractScheme scheme = schemes.get(activeScheme);
134         if (scheme == null) return null;
135         return scheme.getNodes();
136     }
137
138     @Override
139     public boolean isLocal(Node node) {
140         AbstractScheme scheme = schemes.get(activeScheme);
141         if (scheme == null) return false;
142         return scheme.isLocal(node);
143     }
144
145     @Override
146     public void updateNode(Node node, UpdateType type, Set<Property> props) {
147         logger.debug("updateNode: {} type {} props {}", node, type, props);
148         AbstractScheme scheme = schemes.get(activeScheme);
149         if (scheme == null) return;
150         switch (type) {
151         case ADDED:
152             scheme.addNode(node);
153             break;
154         case REMOVED:
155             scheme.removeNode(node);
156             break;
157         default:
158                 break;
159         }
160     }
161
162     @Override
163     public void updateNodeConnector(NodeConnector nodeConnector,
164             UpdateType type, Set<Property> props) {
165         logger.debug("updateNodeConnector: {} type {} props {}", nodeConnector, type, props);
166         AbstractScheme scheme = schemes.get(activeScheme);
167         if (scheme == null) return;
168         switch (type) {
169         case ADDED:
170             scheme.addNode(nodeConnector.getNode());
171             break;
172         default:
173                 break;
174         }
175     }
176
177     @Override
178     public void coordinatorChanged() {
179         AbstractScheme scheme = schemes.get(activeScheme);
180         if (scheme == null) return;
181         scheme.handleClusterViewChanged();
182         notifyClusterViewChanged();
183     }
184
185     @Override
186     public Node connect(String connectionIdentifier, Map<ConnectionConstants, String> params) {
187         if (connectionService == null) return null;
188         return connectionService.connect(connectionIdentifier, params);
189     }
190
191     @Override
192     public Node connect(String type, String connectionIdentifier, Map<ConnectionConstants, String> params) {
193         if (connectionService == null) return null;
194         return connectionService.connect(type, connectionIdentifier, params);
195     }
196
197     @Override
198     public Status disconnect (Node node) {
199         if (connectionService == null) return new Status(StatusCode.NOSERVICE);
200         return connectionService.disconnect(node);
201     }
202
203     @Override
204     public void entryCreated(Node key, String cacheName, boolean originLocal) {
205         AbstractScheme scheme = schemes.get(activeScheme);
206         logger.debug("Created : {} cache : {} existingValue : {}", key, cacheName, scheme.getNodeConnections().get(key));
207     }
208
209     /*
210      * Clustering Services' doesnt provide the existing states in the cache update callbacks.
211      * Hence, using a scratch local cache to maintain the existing state.
212      *
213      */
214     private ConcurrentMap<Node, Set<InetAddress>> existingConnections = new ConcurrentHashMap<Node, Set<InetAddress>>();
215
216     @Override
217     public void entryUpdated(Node node, Set<InetAddress> newControllers, String cacheName, boolean originLocal) {
218         if (originLocal) return;
219         Set<InetAddress> existingControllers = existingConnections.get(node);
220         if (existingControllers != null) {
221             logger.debug("Processing Update for : {} NewControllers : {} existingControllers : {}", node,
222                     newControllers.toString(), existingControllers.toString());
223             if (newControllers.size() < existingControllers.size()) {
224                 Set<InetAddress> removed = new HashSet<InetAddress>(existingControllers);
225                 if (removed.removeAll(newControllers)) {
226                     logger.debug("notifyNodeDisconnectFromMaster({})", node);
227                     notifyNodeDisconnectedEvent(node);
228                 }
229             }
230         } else {
231             logger.debug("Ignoring the Update for : {} NewControllers : {}", node, newControllers.toString());
232         }
233         existingConnections.put(node, newControllers);
234     }
235
236     @Override
237     public void entryDeleted(Node key, String cacheName, boolean originLocal) {
238         if (originLocal) return;
239         logger.debug("Deleted : {} cache : {}", key, cacheName);
240         notifyNodeDisconnectedEvent(key);
241     }
242
243     private void enqueueConnectionEvent(ConnectionMgmtEvent event) {
244         try {
245             if (!connectionEvents.contains(event)) {
246                 this.connectionEvents.put(event);
247             }
248         } catch (InterruptedException e) {
249             logger.debug("enqueueConnectionEvent caught Interrupt Exception for event {}", event);
250         }
251     }
252
253     private void notifyClusterViewChanged() {
254         ConnectionMgmtEvent event = new ConnectionMgmtEvent(ConnectionMgmtEventType.CLUSTER_VIEW_CHANGED, null);
255         enqueueConnectionEvent(event);
256     }
257
258     private void notifyNodeDisconnectedEvent(Node node) {
259         ConnectionMgmtEvent event = new ConnectionMgmtEvent(ConnectionMgmtEventType.NODE_DISCONNECTED_FROM_MASTER, node);
260         enqueueConnectionEvent(event);
261     }
262
263     /*
264      * this thread monitors the connectionEvent queue for new incoming events from
265      */
266     private class EventHandler implements Runnable {
267         @Override
268         public void run() {
269
270             while (true) {
271                 try {
272                     ConnectionMgmtEvent ev = connectionEvents.take();
273                     ConnectionMgmtEventType eType = ev.getEvent();
274                     switch (eType) {
275                     case NODE_DISCONNECTED_FROM_MASTER:
276                         Node node = (Node)ev.getData();
277                         connectionService.notifyNodeDisconnectFromMaster(node);
278                         break;
279                     case CLUSTER_VIEW_CHANGED:
280                         connectionService.notifyClusterViewChanged();
281                         break;
282                     default:
283                         logger.error("Unknown Connection event {}", eType.ordinal());
284                     }
285                 } catch (InterruptedException e) {
286                     connectionEvents.clear();
287                     return;
288                 }
289             }
290         }
291     }
292
293     private void registerWithOSGIConsole() {
294         BundleContext bundleContext = FrameworkUtil.getBundle(this.getClass())
295                 .getBundleContext();
296         bundleContext.registerService(CommandProvider.class.getName(), this,
297                 null);
298     }
299
300     public void _scheme (CommandInterpreter ci) {
301         String schemeStr = ci.nextArgument();
302         if (schemeStr == null) {
303             ci.println("Please enter valid Scheme name");
304             ci.println("Current Scheme : " + activeScheme.name());
305             return;
306         }
307         ConnectionMgmtScheme scheme = ConnectionMgmtScheme.valueOf(schemeStr);
308         if (scheme == null) {
309             ci.println("Please enter a valid Scheme name");
310             return;
311         }
312         activeScheme = scheme;
313     }
314
315     public void _printNodes (CommandInterpreter ci) {
316         String controller = ci.nextArgument();
317         if (controller == null) {
318             ci.println("Nodes connected to this controller : ");
319             if (this.getLocalNodes() == null) ci.println("None");
320             else ci.println(this.getLocalNodes().toString());
321             return;
322         }
323         try {
324             InetAddress address = InetAddress.getByName(controller);
325             ci.println("Nodes connected to controller "+controller);
326             if (this.getNodes(address) == null) ci.println("None");
327             else ci.println(this.getNodes(address).toString());
328             return;
329         } catch (UnknownHostException e) {
330             e.printStackTrace();
331         }
332     }
333
334     @Override
335     public String getHelp() {
336         StringBuffer help = new StringBuffer();
337         help.append("---Connection Manager---\n");
338         help.append("\t scheme [<name>]                      - Print / Set scheme\n");
339         help.append("\t printNodes [<controller>]            - Print connected nodes\n");
340         return help.toString();
341     }
342 }