Merge "Take advantage of MultipartTransactionAware"
[controller.git] / opendaylight / connectionmanager / implementation / src / main / java / org / opendaylight / controller / connectionmanager / scheme / AbstractScheme.java
1 /*
2  * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.connectionmanager.scheme;
9
10 import java.net.InetAddress;
11 import java.util.ArrayList;
12 import java.util.Collections;
13 import java.util.EnumSet;
14 import java.util.HashSet;
15 import java.util.Iterator;
16 import java.util.List;
17 import java.util.Set;
18 import java.util.concurrent.ConcurrentHashMap;
19 import java.util.concurrent.ConcurrentMap;
20
21 import org.opendaylight.controller.clustering.services.CacheConfigException;
22 import org.opendaylight.controller.clustering.services.CacheExistException;
23 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
24 import org.opendaylight.controller.clustering.services.IClusterServices;
25 import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
26 import org.opendaylight.controller.sal.connection.ConnectionLocality;
27 import org.opendaylight.controller.sal.core.Node;
28 import org.opendaylight.controller.sal.utils.Status;
29 import org.opendaylight.controller.sal.utils.StatusCode;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 public abstract class AbstractScheme {
34     private static final Logger log = LoggerFactory.getLogger(AbstractScheme.class);
35     protected IClusterGlobalServices clusterServices = null;
36     /*
37      * A more natural Map data-structure is to have a Key=Controller IP-address with value = a set of Nodes.
38      * But, such a data-structure results in some complex event processing during the Cluster operations
39      * to sync up the Connection states.
40      *
41      * A data-structure with Node as the key and set of controllers provides a good balance
42      * between the needed functionality and simpler clustering implementation for Connection Manager.
43      */
44     protected ConcurrentMap <Node, Set<InetAddress>> nodeConnections;
45     protected abstract boolean isConnectionAllowedInternal(Node node);
46     private final String name;
47     private final String nodeConnectionsCacheName;
48
49     protected AbstractScheme(IClusterGlobalServices clusterServices, ConnectionMgmtScheme type) {
50         this.clusterServices = clusterServices;
51         name = (type != null ? type.name() : "UNKNOWN");
52         nodeConnectionsCacheName = "connectionmanager."+name+".nodeconnections";
53         if (clusterServices != null) {
54             allocateCaches();
55             retrieveCaches();
56         } else {
57             log.error("Couldn't retrieve caches for scheme {}. Clustering service unavailable", name);
58         }
59     }
60
61     protected ConcurrentMap <InetAddress, Set<Node>> getControllerToNodesMap() {
62         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = new ConcurrentHashMap <InetAddress, Set<Node>>();
63         for (Node node : nodeConnections.keySet()) {
64             Set<InetAddress> controllers = nodeConnections.get(node);
65             if (controllers == null) continue;
66             for (InetAddress controller : controllers) {
67                 Set<Node> nodes = controllerNodesMap.get(controller);
68                 if (nodes == null) {
69                     nodes = new HashSet<Node>();
70                 }
71                 nodes.add(node);
72                 controllerNodesMap.put(controller, nodes);
73             }
74         }
75         return controllerNodesMap;
76     }
77
78     public boolean isConnectionAllowed (Node node) {
79         if (clusterServices == null || nodeConnections == null) {
80             return false;
81         }
82
83         return isConnectionAllowedInternal(node);
84     }
85
86     public void handleClusterViewChanged() {
87         log.debug("Handling Cluster View changed notification");
88         List<InetAddress> controllers = clusterServices.getClusteredControllers();
89         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
90         List<InetAddress> toRemove = new ArrayList<InetAddress>();
91         for (InetAddress c : controllerNodesMap.keySet()) {
92             if (!controllers.contains(c)) {
93                 toRemove.add(c);
94             }
95         }
96
97         boolean retry = false;
98         for (InetAddress c : toRemove) {
99             log.debug("Removing Controller : {} from the Connections table", c);
100             for (Iterator<Node> nodeIterator = nodeConnections.keySet().iterator();nodeIterator.hasNext();) {
101                 Node node = nodeIterator.next();
102                 Set <InetAddress> oldControllers = nodeConnections.get(node);
103                 Set <InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
104                 if (newControllers.remove(c)) {
105                     try {
106                         clusterServices.tbegin();
107                         if (!nodeConnections.replace(node, oldControllers, newControllers)) {
108                             log.debug("Replace Failed for {} ", node.toString());
109                             retry = true;
110                             clusterServices.trollback();
111                             break;
112                         } else {
113                             clusterServices.tcommit();
114                         }
115                     } catch (Exception e) {
116                         log.debug("Exception in replacing nodeConnections ", e);
117                         retry = true;
118                         try {
119                             clusterServices.trollback();
120                         } catch (Exception e1) {}
121                         break;
122                     }
123                 }
124             }
125         }
126         if (retry) {
127             try {
128                 Thread.sleep(1000);
129             } catch (InterruptedException e) {}
130             handleClusterViewChanged();
131         }
132     }
133
134     public Set<Node> getNodes(InetAddress controller) {
135         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
136         return controllerNodesMap.get(controller);
137     }
138
139     public Set<Node> getNodes() {
140         return getNodes(clusterServices.getMyAddress());
141     }
142
143     public Set<InetAddress> getControllers(Node node) {
144         if (nodeConnections != null) return nodeConnections.get(node);
145         return Collections.emptySet();
146     }
147
148     public ConcurrentMap<Node, Set<InetAddress>> getNodeConnections() {
149         return nodeConnections;
150     }
151
152     public boolean isLocal(Node node) {
153         if (nodeConnections == null) return false;
154         InetAddress myController = clusterServices.getMyAddress();
155         Set<InetAddress> controllers = nodeConnections.get(node);
156         return (controllers != null && controllers.contains(myController));
157     }
158
159     public ConnectionLocality getLocalityStatus(Node node) {
160         if (nodeConnections == null) return ConnectionLocality.NOT_CONNECTED;
161         Set<InetAddress> controllers = nodeConnections.get(node);
162         if (controllers == null || controllers.size() == 0) return ConnectionLocality.NOT_CONNECTED;
163         InetAddress myController = clusterServices.getMyAddress();
164         return controllers.contains(myController) ? ConnectionLocality.LOCAL:
165                                                     ConnectionLocality.NOT_LOCAL;
166     }
167
168     public Status removeNode (Node node) {
169         return removeNodeFromController(node, clusterServices.getMyAddress());
170     }
171
172     protected Status removeNodeFromController (Node node, InetAddress controller) {
173         if (node == null || controller == null) {
174             return new Status(StatusCode.BADREQUEST, "Invalid Node or Controller Address Specified.");
175         }
176
177         if (clusterServices == null || nodeConnections == null) {
178             return new Status(StatusCode.SUCCESS);
179         }
180
181         Set<InetAddress> oldControllers = nodeConnections.get(node);
182
183         if (oldControllers != null && oldControllers.contains(controller)) {
184             Set<InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
185             if (newControllers.remove(controller)) {
186                 try {
187                     clusterServices.tbegin();
188                     if (newControllers.size() > 0) {
189                         if (!nodeConnections.replace(node, oldControllers, newControllers)) {
190                             clusterServices.trollback();
191                             try {
192                                 Thread.sleep(100);
193                             } catch ( InterruptedException e) {}
194                             return removeNodeFromController(node, controller);
195                         }
196                     } else {
197                         nodeConnections.remove(node);
198                     }
199                     clusterServices.tcommit();
200                 } catch (Exception e) {
201                     log.error("Exception in removing Controller from a Node", e);
202                     try {
203                         clusterServices.trollback();
204                     } catch (Exception e1) {
205                         log.error("Error Rolling back the node Connections Changes ", e);
206                     }
207                     return new Status(StatusCode.INTERNALERROR);
208                 }
209
210             }
211         }
212         return new Status(StatusCode.SUCCESS);
213
214     }
215
216     /*
217      * A few race-conditions were seen with the Clustered caches in putIfAbsent and replace
218      * functions. Leaving a few debug logs behind to assist in debugging if strange things happen.
219      */
220     private Status putNodeToController (Node node, InetAddress controller) {
221         if (clusterServices == null || nodeConnections == null) {
222             return new Status(StatusCode.INTERNALERROR, "Cluster service unavailable, or node connections info missing.");
223         }
224         log.debug("Trying to Put {} to {}", controller.getHostAddress(), node.toString());
225
226         Set <InetAddress> oldControllers = nodeConnections.get(node);
227         Set <InetAddress> newControllers = null;
228         if (oldControllers == null) {
229             newControllers = new HashSet<InetAddress>();
230         } else {
231             if (oldControllers.size() > 0 && !isConnectionAllowed(node)) {
232                 /*
233                  * In certain race conditions, the putIfAbsent fails to be atomic.
234                  * This check is added to identify such cases and report an warning
235                  * for debugging.
236                  */
237                 log.warn("States Exists for {} : {}", node, oldControllers.toString());
238             }
239             newControllers = new HashSet<InetAddress>(oldControllers);
240         }
241         newControllers.add(controller);
242
243         try {
244             clusterServices.tbegin();
245             if (nodeConnections.putIfAbsent(node, newControllers) != null) {
246                 log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
247                 /*
248                  * This check is needed again to take care of the case where some schemes
249                  * would not allow nodes to be connected to multiple controllers.
250                  * Hence, if putIfAbsent fails, that means, some other controller is competing
251                  * with this controller to take hold of a Node.
252                  */
253                 if (isConnectionAllowed(node)) {
254                     if (oldControllers == null || !nodeConnections.replace(node, oldControllers, newControllers)) {
255                         clusterServices.trollback();
256                         try {
257                             Thread.sleep(100);
258                         } catch ( InterruptedException e) {}
259                         log.debug("Retrying ... {} with {}", controller.getHostAddress(), node.toString());
260                         return putNodeToController(node, controller);
261                     } else {
262                         log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
263                                 controller.getHostAddress(), node.toString());
264                     }
265                 } else {
266                     clusterServices.trollback();
267                     return new Status(StatusCode.CONFLICT);
268                 }
269             } else {
270                 log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
271             }
272             clusterServices.tcommit();
273         } catch (Exception e) {
274             log.error("Excepion in adding Controller to a Node", e);
275             try {
276                 clusterServices.trollback();
277             } catch (Exception e1) {
278                 log.error("Error Rolling back the node Connections Changes ", e);
279             }
280             return new Status(StatusCode.INTERNALERROR);
281         }
282         return new Status(StatusCode.SUCCESS);
283     }
284
285     public Status addNode (Node node, InetAddress controller) {
286         if (node == null || controller == null) {
287             if (node == null) {
288                 log.warn("addNode: node is null");
289             } else if (controller == null) {
290                 log.error("Failed to add node {}. The controller address retrieved from clusterServices is null.", node);
291             }
292             return new Status(StatusCode.BADREQUEST);
293         }
294         if (isLocal(node))  {
295             return new Status(StatusCode.SUCCESS);
296         }
297         if (isConnectionAllowed(node)) {
298             return putNodeToController(node, controller);
299         } else {
300             return new Status(StatusCode.NOTALLOWED);
301         }
302     }
303
304     public Status addNode (Node node) {
305         return addNode(node, clusterServices.getMyAddress());
306     }
307
308     @SuppressWarnings({ "unchecked" })
309     private void retrieveCaches() {
310         if (this.clusterServices == null) {
311             log.error("Un-initialized Cluster Services, can't retrieve caches for scheme: {}", name);
312             return;
313         }
314
315         nodeConnections = (ConcurrentMap<Node, Set<InetAddress>>) clusterServices.getCache(nodeConnectionsCacheName);
316
317         if (nodeConnections == null) {
318             log.error("\nFailed to get cache: {}", nodeConnectionsCacheName);
319         }
320     }
321
322     private void allocateCaches() {
323         if (this.clusterServices == null) {
324             log.error("Un-initialized clusterServices, can't create cache");
325             return;
326         }
327
328         try {
329             clusterServices.createCache(nodeConnectionsCacheName, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
330         } catch (CacheExistException cee) {
331             log.debug("\nCache already exists: {}", nodeConnectionsCacheName);
332         } catch (CacheConfigException cce) {
333             log.error("\nCache configuration invalid - check cache mode");
334         } catch (Exception e) {
335             log.error("An error occured",e);
336         }
337     }
338
339     /* (non-Javadoc)
340      * @see java.lang.Object#hashCode()
341      */
342     @Override
343     public int hashCode() {
344         final int prime = 31;
345         int result = 1;
346         result = prime * result + ((name == null) ? 0 : name.hashCode());
347         result = prime * result + ((nodeConnections == null) ? 0 : nodeConnections.hashCode());
348         result = prime * result + ((nodeConnectionsCacheName == null) ? 0 : nodeConnectionsCacheName.hashCode());
349         return result;
350     }
351
352     /* (non-Javadoc)
353      * @see java.lang.Object#equals(java.lang.Object)
354      */
355     @Override
356     public boolean equals(Object obj) {
357         if (this == obj) {
358             return true;
359         }
360         if (obj == null) {
361             return false;
362         }
363         if (!(obj instanceof AbstractScheme)) {
364             return false;
365         }
366         AbstractScheme other = (AbstractScheme) obj;
367         if (name == null) {
368             if (other.name != null) {
369                 return false;
370             }
371         } else if (!name.equals(other.name)) {
372             return false;
373         }
374         if (nodeConnections == null) {
375             if (other.nodeConnections != null) {
376                 return false;
377             }
378         } else if (!nodeConnections.equals(other.nodeConnections)) {
379             return false;
380         }
381         if (nodeConnectionsCacheName == null) {
382             if (other.nodeConnectionsCacheName != null) {
383                 return false;
384             }
385         } else if (!nodeConnectionsCacheName.equals(other.nodeConnectionsCacheName)) {
386             return false;
387         }
388         return true;
389     }
390 }