Fix NPEs on switch disconnect in cluster mode
[controller.git] / opendaylight / connectionmanager / implementation / src / main / java / org / opendaylight / controller / connectionmanager / scheme / AbstractScheme.java
1 package org.opendaylight.controller.connectionmanager.scheme;
2
3 import java.net.InetAddress;
4 import java.util.ArrayList;
5 import java.util.EnumSet;
6 import java.util.HashSet;
7 import java.util.Iterator;
8 import java.util.List;
9 import java.util.Set;
10 import java.util.concurrent.ConcurrentHashMap;
11 import java.util.concurrent.ConcurrentMap;
12
13 import org.opendaylight.controller.clustering.services.CacheConfigException;
14 import org.opendaylight.controller.clustering.services.CacheExistException;
15 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
16 import org.opendaylight.controller.clustering.services.IClusterServices;
17 import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
18 import org.opendaylight.controller.sal.core.Node;
19 import org.opendaylight.controller.sal.utils.Status;
20 import org.opendaylight.controller.sal.utils.StatusCode;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 public abstract class AbstractScheme {
25     private static final Logger log = LoggerFactory.getLogger(AbstractScheme.class);
26     protected IClusterGlobalServices clusterServices = null;
27     /*
28      * A more natural Map data-structure is to have a Key=Controller IP-address with value = a set of Nodes.
29      * But, such a data-structure results in some complex event processing during the Cluster operations
30      * to sync up the Connection states.
31      *
32      * A data-structure with Node as the key and set of controllers provides a good balance
33      * between the needed functionality and simpler clustering implementation for Connection Manager.
34      */
35     protected ConcurrentMap <Node, Set<InetAddress>> nodeConnections;
36     protected abstract boolean isConnectionAllowedInternal(Node node);
37     private String name;
38
39     protected AbstractScheme(IClusterGlobalServices clusterServices, ConnectionMgmtScheme type) {
40         this.clusterServices = clusterServices;
41         if (type != null) name = type.name();
42         else name = "UNKNOWN";
43         if (clusterServices != null) {
44             allocateCaches();
45             retrieveCaches();
46         }
47     }
48
49     protected ConcurrentMap <InetAddress, Set<Node>> getControllerToNodesMap() {
50         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = new ConcurrentHashMap <InetAddress, Set<Node>>();
51         for (Node node : nodeConnections.keySet()) {
52             Set<InetAddress> controllers = nodeConnections.get(node);
53             if (controllers == null) continue;
54             for (InetAddress controller : controllers) {
55                 Set<Node> nodes = controllerNodesMap.get(controller);
56                 if (nodes == null) {
57                     nodes = new HashSet<Node>();
58                 }
59                 nodes.add(node);
60                 controllerNodesMap.put(controller, nodes);
61             }
62         }
63         return controllerNodesMap;
64     }
65
66     public boolean isConnectionAllowed (Node node) {
67         if (clusterServices == null || nodeConnections == null) {
68             return false;
69         }
70
71         return isConnectionAllowedInternal(node);
72     }
73
74     @SuppressWarnings("deprecation")
75     public void handleClusterViewChanged() {
76         log.debug("Handling Cluster View changed notification");
77         List<InetAddress> controllers = clusterServices.getClusteredControllers();
78         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
79         List<InetAddress> toRemove = new ArrayList<InetAddress>();
80         for (InetAddress c : controllerNodesMap.keySet()) {
81             if (!controllers.contains(c)) {
82                 toRemove.add(c);
83             }
84         }
85
86         boolean retry = false;
87         for (InetAddress c : toRemove) {
88             log.debug("Removing Controller : {} from the Connections table", c);
89             for (Iterator<Node> nodeIterator = nodeConnections.keySet().iterator();nodeIterator.hasNext();) {
90                 Node node = nodeIterator.next();
91                 Set <InetAddress> oldControllers = nodeConnections.get(node);
92                 Set <InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
93                 if (newControllers.remove(c)) {
94                     try {
95                         clusterServices.tbegin();
96                         if (!nodeConnections.replace(node, oldControllers, newControllers)) {
97                             log.debug("Replace Failed for {} ", node.toString());
98                             retry = true;
99                             clusterServices.trollback();
100                             break;
101                         } else {
102                             clusterServices.tcommit();
103                         }
104                     } catch (Exception e) {
105                         log.debug("Exception in replacing nodeConnections ", e);
106                         retry = true;
107                         try {
108                             clusterServices.trollback();
109                         } catch (Exception e1) {}
110                         break;
111                     }
112                 }
113             }
114         }
115         if (retry) {
116             try {
117                 Thread.sleep(1000);
118             } catch (InterruptedException e) {}
119             handleClusterViewChanged();
120         }
121     }
122
123     public Set<Node> getNodes(InetAddress controller) {
124         if (nodeConnections == null) return null;
125         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
126         return controllerNodesMap.get(controller);
127     }
128
129     public Set<Node> getNodes() {
130         return getNodes(clusterServices.getMyAddress());
131     }
132
133     public Set<InetAddress> getControllers(Node node) {
134         if (nodeConnections != null) return nodeConnections.get(node);
135         return null;
136     }
137
138     public ConcurrentMap<Node, Set<InetAddress>> getNodeConnections() {
139         return nodeConnections;
140     }
141
142     public boolean isLocal(Node node) {
143         if (nodeConnections == null) return false;
144         InetAddress myController = clusterServices.getMyAddress();
145         Set<InetAddress> controllers = nodeConnections.get(node);
146         return (controllers != null && controllers.contains(myController));
147     }
148
149     public Status removeNode (Node node) {
150         return removeNodeFromController(node, clusterServices.getMyAddress());
151     }
152
153     protected Status removeNodeFromController (Node node, InetAddress controller) {
154         if (node == null || controller == null) {
155             return new Status(StatusCode.BADREQUEST);
156         }
157
158         if (clusterServices == null || nodeConnections == null) {
159             return new Status(StatusCode.SUCCESS);
160         }
161
162         Set<InetAddress> oldControllers = nodeConnections.get(node);
163
164         if (oldControllers != null && oldControllers.contains(controller)) {
165             Set<InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
166             if (newControllers.remove(controller)) {
167                 try {
168                     clusterServices.tbegin();
169                     if (newControllers.size() > 0) {
170                         if (!nodeConnections.replace(node, oldControllers, newControllers)) {
171                             clusterServices.trollback();
172                             try {
173                                 Thread.sleep(100);
174                             } catch ( InterruptedException e) {}
175                             return removeNodeFromController(node, controller);
176                         }
177                     } else {
178                         nodeConnections.remove(node);
179                     }
180                     clusterServices.tcommit();
181                 } catch (Exception e) {
182                     log.error("Excepion in removing Controller from a Node", e);
183                     try {
184                         clusterServices.trollback();
185                     } catch (Exception e1) {
186                         log.error("Error Rolling back the node Connections Changes ", e);
187                     }
188                     return new Status(StatusCode.INTERNALERROR);
189                 }
190
191             }
192         }
193         return new Status(StatusCode.SUCCESS);
194
195     }
196
197     /*
198      * A few race-conditions were seen with the Clustered caches in putIfAbsent and replace
199      * functions. Leaving a few debug logs behind to assist in debugging if strange things happen.
200      */
201     private Status putNodeToController (Node node, InetAddress controller) {
202         if (clusterServices == null || nodeConnections == null) {
203             return new Status(StatusCode.SUCCESS);
204         }
205         log.debug("Trying to Put {} to {}", controller.getHostAddress(), node.toString());
206
207         Set <InetAddress> oldControllers = nodeConnections.get(node);
208         Set <InetAddress> newControllers = null;
209         if (oldControllers == null) {
210             newControllers = new HashSet<InetAddress>();
211         } else {
212             if (oldControllers.size() > 0 && !isConnectionAllowed(node)) {
213                 /*
214                  * In certain race conditions, the putIfAbsent fails to be atomic.
215                  * This check is added to identify such cases and report an warning
216                  * for debugging.
217                  */
218                 log.warn("States Exists for {} : {}", node, oldControllers.toString());
219             }
220             newControllers = new HashSet<InetAddress>(oldControllers);
221         }
222         newControllers.add(controller);
223
224         try {
225             clusterServices.tbegin();
226             if (nodeConnections.putIfAbsent(node, newControllers) != null) {
227                 log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
228                 /*
229                  * This check is needed again to take care of the case where some schemes
230                  * would not allow nodes to be connected to multiple controllers.
231                  * Hence, if putIfAbsent fails, that means, some other controller is competing
232                  * with this controller to take hold of a Node.
233                  */
234                 if (isConnectionAllowed(node)) {
235                     if (oldControllers == null || !nodeConnections.replace(node, oldControllers, newControllers)) {
236                         clusterServices.trollback();
237                         try {
238                             Thread.sleep(100);
239                         } catch ( InterruptedException e) {}
240                         log.debug("Retrying ... {} with {}", controller.getHostAddress(), node.toString());
241                         return putNodeToController(node, controller);
242                     } else {
243                         log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
244                                 controller.getHostAddress(), node.toString());
245                     }
246                 } else {
247                     clusterServices.trollback();
248                     return new Status(StatusCode.CONFLICT);
249                 }
250             } else {
251                 log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
252             }
253             clusterServices.tcommit();
254         } catch (Exception e) {
255             log.error("Excepion in adding Controller to a Node", e);
256             try {
257                 clusterServices.trollback();
258             } catch (Exception e1) {
259                 log.error("Error Rolling back the node Connections Changes ", e);
260             }
261             return new Status(StatusCode.INTERNALERROR);
262         }
263         return new Status(StatusCode.SUCCESS);
264     }
265
266     public Status addNode (Node node, InetAddress controller) {
267         if (node == null || controller == null) {
268             return new Status(StatusCode.BADREQUEST);
269         }
270         if (isLocal(node)) return new Status(StatusCode.SUCCESS);
271         if (isConnectionAllowed(node)) {
272             return putNodeToController(node, controller);
273         } else {
274             return new Status(StatusCode.NOTALLOWED);
275         }
276     }
277
278     @SuppressWarnings("deprecation")
279     public Status addNode (Node node) {
280         return addNode(node, clusterServices.getMyAddress());
281     }
282
283     @SuppressWarnings({ "unchecked", "deprecation" })
284     private void retrieveCaches() {
285         if (this.clusterServices == null) {
286             log.error("un-initialized clusterServices, can't retrieve cache");
287             return;
288         }
289
290         nodeConnections = (ConcurrentMap<Node, Set<InetAddress>>) clusterServices.getCache("connectionmanager."+name+".nodeconnections");
291
292         if (nodeConnections == null) {
293             log.error("\nFailed to get caches");
294         }
295     }
296
297     @SuppressWarnings("deprecation")
298     private void allocateCaches() {
299         if (this.clusterServices == null) {
300             log.error("un-initialized clusterServices, can't create cache");
301             return;
302         }
303
304         try {
305             clusterServices.createCache("connectionmanager."+name+".nodeconnections", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
306         } catch (CacheExistException cee) {
307             log.error("\nCache already exists - destroy and recreate if needed");
308         } catch (CacheConfigException cce) {
309             log.error("\nCache configuration invalid - check cache mode");
310         } catch (Exception e) {
311             log.error("An error occured",e);
312         }
313     }
314 }