Porting Tristate logic to SAL Connection Service and make use of it in Inventory...
[controller.git] / opendaylight / connectionmanager / implementation / src / main / java / org / opendaylight / controller / connectionmanager / scheme / AbstractScheme.java
1 package org.opendaylight.controller.connectionmanager.scheme;
2
3 import java.net.InetAddress;
4 import java.util.ArrayList;
5 import java.util.EnumSet;
6 import java.util.HashSet;
7 import java.util.Iterator;
8 import java.util.List;
9 import java.util.Set;
10 import java.util.concurrent.ConcurrentHashMap;
11 import java.util.concurrent.ConcurrentMap;
12
13 import org.opendaylight.controller.clustering.services.CacheConfigException;
14 import org.opendaylight.controller.clustering.services.CacheExistException;
15 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
16 import org.opendaylight.controller.clustering.services.IClusterServices;
17 import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
18 import org.opendaylight.controller.sal.connection.ConnectionLocality;
19 import org.opendaylight.controller.sal.core.Node;
20 import org.opendaylight.controller.sal.utils.Status;
21 import org.opendaylight.controller.sal.utils.StatusCode;
22 import org.slf4j.Logger;
23 import org.slf4j.LoggerFactory;
24
25 public abstract class AbstractScheme {
26     private static final Logger log = LoggerFactory.getLogger(AbstractScheme.class);
27     protected IClusterGlobalServices clusterServices = null;
28     /*
29      * A more natural Map data-structure is to have a Key=Controller IP-address with value = a set of Nodes.
30      * But, such a data-structure results in some complex event processing during the Cluster operations
31      * to sync up the Connection states.
32      *
33      * A data-structure with Node as the key and set of controllers provides a good balance
34      * between the needed functionality and simpler clustering implementation for Connection Manager.
35      */
36     protected ConcurrentMap <Node, Set<InetAddress>> nodeConnections;
37     protected abstract boolean isConnectionAllowedInternal(Node node);
38     private final String name;
39     private final String nodeConnectionsCacheName;
40
41     protected AbstractScheme(IClusterGlobalServices clusterServices, ConnectionMgmtScheme type) {
42         this.clusterServices = clusterServices;
43         name = (type != null ? type.name() : "UNKNOWN");
44         nodeConnectionsCacheName = "connectionmanager."+name+".nodeconnections";
45         if (clusterServices != null) {
46             allocateCaches();
47             retrieveCaches();
48         } else {
49             log.error("Couldn't retrieve caches for scheme %s. Clustering service unavailable", name);
50         }
51     }
52
53     protected ConcurrentMap <InetAddress, Set<Node>> getControllerToNodesMap() {
54         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = new ConcurrentHashMap <InetAddress, Set<Node>>();
55         for (Node node : nodeConnections.keySet()) {
56             Set<InetAddress> controllers = nodeConnections.get(node);
57             if (controllers == null) continue;
58             for (InetAddress controller : controllers) {
59                 Set<Node> nodes = controllerNodesMap.get(controller);
60                 if (nodes == null) {
61                     nodes = new HashSet<Node>();
62                 }
63                 nodes.add(node);
64                 controllerNodesMap.put(controller, nodes);
65             }
66         }
67         return controllerNodesMap;
68     }
69
70     public boolean isConnectionAllowed (Node node) {
71         if (clusterServices == null || nodeConnections == null) {
72             return false;
73         }
74
75         return isConnectionAllowedInternal(node);
76     }
77
78     public void handleClusterViewChanged() {
79         log.debug("Handling Cluster View changed notification");
80         List<InetAddress> controllers = clusterServices.getClusteredControllers();
81         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
82         List<InetAddress> toRemove = new ArrayList<InetAddress>();
83         for (InetAddress c : controllerNodesMap.keySet()) {
84             if (!controllers.contains(c)) {
85                 toRemove.add(c);
86             }
87         }
88
89         boolean retry = false;
90         for (InetAddress c : toRemove) {
91             log.debug("Removing Controller : {} from the Connections table", c);
92             for (Iterator<Node> nodeIterator = nodeConnections.keySet().iterator();nodeIterator.hasNext();) {
93                 Node node = nodeIterator.next();
94                 Set <InetAddress> oldControllers = nodeConnections.get(node);
95                 Set <InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
96                 if (newControllers.remove(c)) {
97                     try {
98                         clusterServices.tbegin();
99                         if (!nodeConnections.replace(node, oldControllers, newControllers)) {
100                             log.debug("Replace Failed for {} ", node.toString());
101                             retry = true;
102                             clusterServices.trollback();
103                             break;
104                         } else {
105                             clusterServices.tcommit();
106                         }
107                     } catch (Exception e) {
108                         log.debug("Exception in replacing nodeConnections ", e);
109                         retry = true;
110                         try {
111                             clusterServices.trollback();
112                         } catch (Exception e1) {}
113                         break;
114                     }
115                 }
116             }
117         }
118         if (retry) {
119             try {
120                 Thread.sleep(1000);
121             } catch (InterruptedException e) {}
122             handleClusterViewChanged();
123         }
124     }
125
126     public Set<Node> getNodes(InetAddress controller) {
127         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
128         return controllerNodesMap.get(controller);
129     }
130
131     public Set<Node> getNodes() {
132         return getNodes(clusterServices.getMyAddress());
133     }
134
135     public Set<InetAddress> getControllers(Node node) {
136         if (nodeConnections != null) return nodeConnections.get(node);
137         return null;
138     }
139
140     public ConcurrentMap<Node, Set<InetAddress>> getNodeConnections() {
141         return nodeConnections;
142     }
143
144     public boolean isLocal(Node node) {
145         if (nodeConnections == null) return false;
146         InetAddress myController = clusterServices.getMyAddress();
147         Set<InetAddress> controllers = nodeConnections.get(node);
148         return (controllers != null && controllers.contains(myController));
149     }
150
151     public ConnectionLocality getLocalityStatus(Node node) {
152         if (nodeConnections == null) return ConnectionLocality.NOT_CONNECTED;
153         Set<InetAddress> controllers = nodeConnections.get(node);
154         if (controllers == null || controllers.size() == 0) return ConnectionLocality.NOT_CONNECTED;
155         InetAddress myController = clusterServices.getMyAddress();
156         return controllers.contains(myController) ? ConnectionLocality.LOCAL:
157                                                     ConnectionLocality.NOT_LOCAL;
158     }
159
160     public Status removeNode (Node node) {
161         return removeNodeFromController(node, clusterServices.getMyAddress());
162     }
163
164     protected Status removeNodeFromController (Node node, InetAddress controller) {
165         if (node == null || controller == null) {
166             return new Status(StatusCode.BADREQUEST, "Invalid Node or Controller Address Specified.");
167         }
168
169         if (clusterServices == null || nodeConnections == null) {
170             return new Status(StatusCode.SUCCESS);
171         }
172
173         Set<InetAddress> oldControllers = nodeConnections.get(node);
174
175         if (oldControllers != null && oldControllers.contains(controller)) {
176             Set<InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
177             if (newControllers.remove(controller)) {
178                 try {
179                     clusterServices.tbegin();
180                     if (newControllers.size() > 0) {
181                         if (!nodeConnections.replace(node, oldControllers, newControllers)) {
182                             clusterServices.trollback();
183                             try {
184                                 Thread.sleep(100);
185                             } catch ( InterruptedException e) {}
186                             return removeNodeFromController(node, controller);
187                         }
188                     } else {
189                         nodeConnections.remove(node);
190                     }
191                     clusterServices.tcommit();
192                 } catch (Exception e) {
193                     log.error("Exception in removing Controller from a Node", e);
194                     try {
195                         clusterServices.trollback();
196                     } catch (Exception e1) {
197                         log.error("Error Rolling back the node Connections Changes ", e);
198                     }
199                     return new Status(StatusCode.INTERNALERROR);
200                 }
201
202             }
203         }
204         return new Status(StatusCode.SUCCESS);
205
206     }
207
208     /*
209      * A few race-conditions were seen with the Clustered caches in putIfAbsent and replace
210      * functions. Leaving a few debug logs behind to assist in debugging if strange things happen.
211      */
212     private Status putNodeToController (Node node, InetAddress controller) {
213         if (clusterServices == null || nodeConnections == null) {
214             return new Status(StatusCode.INTERNALERROR, "Cluster service unavailable, or node connections info missing.");
215         }
216         log.debug("Trying to Put {} to {}", controller.getHostAddress(), node.toString());
217
218         Set <InetAddress> oldControllers = nodeConnections.get(node);
219         Set <InetAddress> newControllers = null;
220         if (oldControllers == null) {
221             newControllers = new HashSet<InetAddress>();
222         } else {
223             if (oldControllers.size() > 0 && !isConnectionAllowed(node)) {
224                 /*
225                  * In certain race conditions, the putIfAbsent fails to be atomic.
226                  * This check is added to identify such cases and report an warning
227                  * for debugging.
228                  */
229                 log.warn("States Exists for {} : {}", node, oldControllers.toString());
230             }
231             newControllers = new HashSet<InetAddress>(oldControllers);
232         }
233         newControllers.add(controller);
234
235         try {
236             clusterServices.tbegin();
237             if (nodeConnections.putIfAbsent(node, newControllers) != null) {
238                 log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
239                 /*
240                  * This check is needed again to take care of the case where some schemes
241                  * would not allow nodes to be connected to multiple controllers.
242                  * Hence, if putIfAbsent fails, that means, some other controller is competing
243                  * with this controller to take hold of a Node.
244                  */
245                 if (isConnectionAllowed(node)) {
246                     if (oldControllers == null || !nodeConnections.replace(node, oldControllers, newControllers)) {
247                         clusterServices.trollback();
248                         try {
249                             Thread.sleep(100);
250                         } catch ( InterruptedException e) {}
251                         log.debug("Retrying ... {} with {}", controller.getHostAddress(), node.toString());
252                         return putNodeToController(node, controller);
253                     } else {
254                         log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
255                                 controller.getHostAddress(), node.toString());
256                     }
257                 } else {
258                     clusterServices.trollback();
259                     return new Status(StatusCode.CONFLICT);
260                 }
261             } else {
262                 log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
263             }
264             clusterServices.tcommit();
265         } catch (Exception e) {
266             log.error("Excepion in adding Controller to a Node", e);
267             try {
268                 clusterServices.trollback();
269             } catch (Exception e1) {
270                 log.error("Error Rolling back the node Connections Changes ", e);
271             }
272             return new Status(StatusCode.INTERNALERROR);
273         }
274         return new Status(StatusCode.SUCCESS);
275     }
276
277     public Status addNode (Node node, InetAddress controller) {
278         if (node == null || controller == null) {
279             return new Status(StatusCode.BADREQUEST);
280         }
281         if (isLocal(node))  {
282             return new Status(StatusCode.SUCCESS);
283         }
284         if (isConnectionAllowed(node)) {
285             return putNodeToController(node, controller);
286         } else {
287             return new Status(StatusCode.NOTALLOWED);
288         }
289     }
290
291     public Status addNode (Node node) {
292         return addNode(node, clusterServices.getMyAddress());
293     }
294
295     @SuppressWarnings({ "unchecked" })
296     private void retrieveCaches() {
297         if (this.clusterServices == null) {
298             log.error("Un-initialized Cluster Services, can't retrieve caches for scheme: {}", name);
299             return;
300         }
301
302         nodeConnections = (ConcurrentMap<Node, Set<InetAddress>>) clusterServices.getCache(nodeConnectionsCacheName);
303
304         if (nodeConnections == null) {
305             log.error("\nFailed to get cache: {}", nodeConnectionsCacheName);
306         }
307     }
308
309     private void allocateCaches() {
310         if (this.clusterServices == null) {
311             log.error("Un-initialized clusterServices, can't create cache");
312             return;
313         }
314
315         try {
316             clusterServices.createCache(nodeConnectionsCacheName, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
317         } catch (CacheExistException cee) {
318             log.debug("\nCache already exists: {}", nodeConnectionsCacheName);
319         } catch (CacheConfigException cce) {
320             log.error("\nCache configuration invalid - check cache mode");
321         } catch (Exception e) {
322             log.error("An error occured",e);
323         }
324     }
325 }