Merge "Add some log messages in case controller failed to add connected node."
[controller.git] / opendaylight / connectionmanager / implementation / src / main / java / org / opendaylight / controller / connectionmanager / scheme / AbstractScheme.java
1 package org.opendaylight.controller.connectionmanager.scheme;
2
3 import java.net.InetAddress;
4 import java.util.ArrayList;
5 import java.util.Collections;
6 import java.util.EnumSet;
7 import java.util.HashSet;
8 import java.util.Iterator;
9 import java.util.List;
10 import java.util.Set;
11 import java.util.concurrent.ConcurrentHashMap;
12 import java.util.concurrent.ConcurrentMap;
13
14 import org.opendaylight.controller.clustering.services.CacheConfigException;
15 import org.opendaylight.controller.clustering.services.CacheExistException;
16 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
17 import org.opendaylight.controller.clustering.services.IClusterServices;
18 import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
19 import org.opendaylight.controller.sal.connection.ConnectionLocality;
20 import org.opendaylight.controller.sal.core.Node;
21 import org.opendaylight.controller.sal.utils.Status;
22 import org.opendaylight.controller.sal.utils.StatusCode;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 public abstract class AbstractScheme {
27     private static final Logger log = LoggerFactory.getLogger(AbstractScheme.class);
28     protected IClusterGlobalServices clusterServices = null;
29     /*
30      * A more natural Map data-structure is to have a Key=Controller IP-address with value = a set of Nodes.
31      * But, such a data-structure results in some complex event processing during the Cluster operations
32      * to sync up the Connection states.
33      *
34      * A data-structure with Node as the key and set of controllers provides a good balance
35      * between the needed functionality and simpler clustering implementation for Connection Manager.
36      */
37     protected ConcurrentMap <Node, Set<InetAddress>> nodeConnections;
38     protected abstract boolean isConnectionAllowedInternal(Node node);
39     private final String name;
40     private final String nodeConnectionsCacheName;
41
42     protected AbstractScheme(IClusterGlobalServices clusterServices, ConnectionMgmtScheme type) {
43         this.clusterServices = clusterServices;
44         name = (type != null ? type.name() : "UNKNOWN");
45         nodeConnectionsCacheName = "connectionmanager."+name+".nodeconnections";
46         if (clusterServices != null) {
47             allocateCaches();
48             retrieveCaches();
49         } else {
50             log.error("Couldn't retrieve caches for scheme %s. Clustering service unavailable", name);
51         }
52     }
53
54     protected ConcurrentMap <InetAddress, Set<Node>> getControllerToNodesMap() {
55         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = new ConcurrentHashMap <InetAddress, Set<Node>>();
56         for (Node node : nodeConnections.keySet()) {
57             Set<InetAddress> controllers = nodeConnections.get(node);
58             if (controllers == null) continue;
59             for (InetAddress controller : controllers) {
60                 Set<Node> nodes = controllerNodesMap.get(controller);
61                 if (nodes == null) {
62                     nodes = new HashSet<Node>();
63                 }
64                 nodes.add(node);
65                 controllerNodesMap.put(controller, nodes);
66             }
67         }
68         return controllerNodesMap;
69     }
70
71     public boolean isConnectionAllowed (Node node) {
72         if (clusterServices == null || nodeConnections == null) {
73             return false;
74         }
75
76         return isConnectionAllowedInternal(node);
77     }
78
79     public void handleClusterViewChanged() {
80         log.debug("Handling Cluster View changed notification");
81         List<InetAddress> controllers = clusterServices.getClusteredControllers();
82         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
83         List<InetAddress> toRemove = new ArrayList<InetAddress>();
84         for (InetAddress c : controllerNodesMap.keySet()) {
85             if (!controllers.contains(c)) {
86                 toRemove.add(c);
87             }
88         }
89
90         boolean retry = false;
91         for (InetAddress c : toRemove) {
92             log.debug("Removing Controller : {} from the Connections table", c);
93             for (Iterator<Node> nodeIterator = nodeConnections.keySet().iterator();nodeIterator.hasNext();) {
94                 Node node = nodeIterator.next();
95                 Set <InetAddress> oldControllers = nodeConnections.get(node);
96                 Set <InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
97                 if (newControllers.remove(c)) {
98                     try {
99                         clusterServices.tbegin();
100                         if (!nodeConnections.replace(node, oldControllers, newControllers)) {
101                             log.debug("Replace Failed for {} ", node.toString());
102                             retry = true;
103                             clusterServices.trollback();
104                             break;
105                         } else {
106                             clusterServices.tcommit();
107                         }
108                     } catch (Exception e) {
109                         log.debug("Exception in replacing nodeConnections ", e);
110                         retry = true;
111                         try {
112                             clusterServices.trollback();
113                         } catch (Exception e1) {}
114                         break;
115                     }
116                 }
117             }
118         }
119         if (retry) {
120             try {
121                 Thread.sleep(1000);
122             } catch (InterruptedException e) {}
123             handleClusterViewChanged();
124         }
125     }
126
127     public Set<Node> getNodes(InetAddress controller) {
128         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
129         return controllerNodesMap.get(controller);
130     }
131
132     public Set<Node> getNodes() {
133         return getNodes(clusterServices.getMyAddress());
134     }
135
136     public Set<InetAddress> getControllers(Node node) {
137         if (nodeConnections != null) return nodeConnections.get(node);
138         return Collections.emptySet();
139     }
140
141     public ConcurrentMap<Node, Set<InetAddress>> getNodeConnections() {
142         return nodeConnections;
143     }
144
145     public boolean isLocal(Node node) {
146         if (nodeConnections == null) return false;
147         InetAddress myController = clusterServices.getMyAddress();
148         Set<InetAddress> controllers = nodeConnections.get(node);
149         return (controllers != null && controllers.contains(myController));
150     }
151
152     public ConnectionLocality getLocalityStatus(Node node) {
153         if (nodeConnections == null) return ConnectionLocality.NOT_CONNECTED;
154         Set<InetAddress> controllers = nodeConnections.get(node);
155         if (controllers == null || controllers.size() == 0) return ConnectionLocality.NOT_CONNECTED;
156         InetAddress myController = clusterServices.getMyAddress();
157         return controllers.contains(myController) ? ConnectionLocality.LOCAL:
158                                                     ConnectionLocality.NOT_LOCAL;
159     }
160
161     public Status removeNode (Node node) {
162         return removeNodeFromController(node, clusterServices.getMyAddress());
163     }
164
165     protected Status removeNodeFromController (Node node, InetAddress controller) {
166         if (node == null || controller == null) {
167             return new Status(StatusCode.BADREQUEST, "Invalid Node or Controller Address Specified.");
168         }
169
170         if (clusterServices == null || nodeConnections == null) {
171             return new Status(StatusCode.SUCCESS);
172         }
173
174         Set<InetAddress> oldControllers = nodeConnections.get(node);
175
176         if (oldControllers != null && oldControllers.contains(controller)) {
177             Set<InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
178             if (newControllers.remove(controller)) {
179                 try {
180                     clusterServices.tbegin();
181                     if (newControllers.size() > 0) {
182                         if (!nodeConnections.replace(node, oldControllers, newControllers)) {
183                             clusterServices.trollback();
184                             try {
185                                 Thread.sleep(100);
186                             } catch ( InterruptedException e) {}
187                             return removeNodeFromController(node, controller);
188                         }
189                     } else {
190                         nodeConnections.remove(node);
191                     }
192                     clusterServices.tcommit();
193                 } catch (Exception e) {
194                     log.error("Exception in removing Controller from a Node", e);
195                     try {
196                         clusterServices.trollback();
197                     } catch (Exception e1) {
198                         log.error("Error Rolling back the node Connections Changes ", e);
199                     }
200                     return new Status(StatusCode.INTERNALERROR);
201                 }
202
203             }
204         }
205         return new Status(StatusCode.SUCCESS);
206
207     }
208
209     /*
210      * A few race-conditions were seen with the Clustered caches in putIfAbsent and replace
211      * functions. Leaving a few debug logs behind to assist in debugging if strange things happen.
212      */
213     private Status putNodeToController (Node node, InetAddress controller) {
214         if (clusterServices == null || nodeConnections == null) {
215             return new Status(StatusCode.INTERNALERROR, "Cluster service unavailable, or node connections info missing.");
216         }
217         log.debug("Trying to Put {} to {}", controller.getHostAddress(), node.toString());
218
219         Set <InetAddress> oldControllers = nodeConnections.get(node);
220         Set <InetAddress> newControllers = null;
221         if (oldControllers == null) {
222             newControllers = new HashSet<InetAddress>();
223         } else {
224             if (oldControllers.size() > 0 && !isConnectionAllowed(node)) {
225                 /*
226                  * In certain race conditions, the putIfAbsent fails to be atomic.
227                  * This check is added to identify such cases and report an warning
228                  * for debugging.
229                  */
230                 log.warn("States Exists for {} : {}", node, oldControllers.toString());
231             }
232             newControllers = new HashSet<InetAddress>(oldControllers);
233         }
234         newControllers.add(controller);
235
236         try {
237             clusterServices.tbegin();
238             if (nodeConnections.putIfAbsent(node, newControllers) != null) {
239                 log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
240                 /*
241                  * This check is needed again to take care of the case where some schemes
242                  * would not allow nodes to be connected to multiple controllers.
243                  * Hence, if putIfAbsent fails, that means, some other controller is competing
244                  * with this controller to take hold of a Node.
245                  */
246                 if (isConnectionAllowed(node)) {
247                     if (oldControllers == null || !nodeConnections.replace(node, oldControllers, newControllers)) {
248                         clusterServices.trollback();
249                         try {
250                             Thread.sleep(100);
251                         } catch ( InterruptedException e) {}
252                         log.debug("Retrying ... {} with {}", controller.getHostAddress(), node.toString());
253                         return putNodeToController(node, controller);
254                     } else {
255                         log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
256                                 controller.getHostAddress(), node.toString());
257                     }
258                 } else {
259                     clusterServices.trollback();
260                     return new Status(StatusCode.CONFLICT);
261                 }
262             } else {
263                 log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
264             }
265             clusterServices.tcommit();
266         } catch (Exception e) {
267             log.error("Excepion in adding Controller to a Node", e);
268             try {
269                 clusterServices.trollback();
270             } catch (Exception e1) {
271                 log.error("Error Rolling back the node Connections Changes ", e);
272             }
273             return new Status(StatusCode.INTERNALERROR);
274         }
275         return new Status(StatusCode.SUCCESS);
276     }
277
278     public Status addNode (Node node, InetAddress controller) {
279         if (node == null || controller == null) {
280             if (node == null) {
281                 log.warn("addNode: node is null");
282             } else if (controller == null) {
283                 log.error("Failed to add node {}. The controller address retrieved from clusterServices is null.", node);
284             }
285             return new Status(StatusCode.BADREQUEST);
286         }
287         if (isLocal(node))  {
288             return new Status(StatusCode.SUCCESS);
289         }
290         if (isConnectionAllowed(node)) {
291             return putNodeToController(node, controller);
292         } else {
293             return new Status(StatusCode.NOTALLOWED);
294         }
295     }
296
297     public Status addNode (Node node) {
298         return addNode(node, clusterServices.getMyAddress());
299     }
300
301     @SuppressWarnings({ "unchecked" })
302     private void retrieveCaches() {
303         if (this.clusterServices == null) {
304             log.error("Un-initialized Cluster Services, can't retrieve caches for scheme: {}", name);
305             return;
306         }
307
308         nodeConnections = (ConcurrentMap<Node, Set<InetAddress>>) clusterServices.getCache(nodeConnectionsCacheName);
309
310         if (nodeConnections == null) {
311             log.error("\nFailed to get cache: {}", nodeConnectionsCacheName);
312         }
313     }
314
315     private void allocateCaches() {
316         if (this.clusterServices == null) {
317             log.error("Un-initialized clusterServices, can't create cache");
318             return;
319         }
320
321         try {
322             clusterServices.createCache(nodeConnectionsCacheName, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
323         } catch (CacheExistException cee) {
324             log.debug("\nCache already exists: {}", nodeConnectionsCacheName);
325         } catch (CacheConfigException cce) {
326             log.error("\nCache configuration invalid - check cache mode");
327         } catch (Exception e) {
328             log.error("An error occured",e);
329         }
330     }
331 }