Merge "Fixing a bug to show node name if present instead of node id while adding...
[controller.git] / opendaylight / connectionmanager / implementation / src / main / java / org / opendaylight / controller / connectionmanager / scheme / AbstractScheme.java
1 package org.opendaylight.controller.connectionmanager.scheme;
2
3 import java.net.InetAddress;
4 import java.util.ArrayList;
5 import java.util.EnumSet;
6 import java.util.HashSet;
7 import java.util.Iterator;
8 import java.util.List;
9 import java.util.Set;
10 import java.util.concurrent.ConcurrentHashMap;
11 import java.util.concurrent.ConcurrentMap;
12
13 import javax.transaction.SystemException;
14
15 import org.opendaylight.controller.clustering.services.CacheConfigException;
16 import org.opendaylight.controller.clustering.services.CacheExistException;
17 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
18 import org.opendaylight.controller.clustering.services.IClusterServices;
19 import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
20 import org.opendaylight.controller.sal.core.Node;
21 import org.opendaylight.controller.sal.utils.Status;
22 import org.opendaylight.controller.sal.utils.StatusCode;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 public abstract class AbstractScheme {
27     private static final Logger log = LoggerFactory.getLogger(AbstractScheme.class);
28     protected IClusterGlobalServices clusterServices = null;
29     /*
30      * A more natural Map data-structure is to have a Key=Controller IP-address with value = a set of Nodes.
31      * But, such a data-structure results in some complex event processing during the Cluster operations
32      * to sync up the Connection states.
33      *
34      * A data-structure with Node as the key and set of controllers provides a good balance
35      * between the needed functionality and simpler clustering implementation for Connection Manager.
36      */
37     protected ConcurrentMap <Node, Set<InetAddress>> nodeConnections;
    /**
     * Scheme-specific admission decision supplied by each concrete scheme.
     *
     * Within this class it is invoked only by {@code isConnectionAllowed(Node)}, and only
     * after that method has confirmed that both {@code clusterServices} and
     * {@code nodeConnections} are non-null.
     *
     * @param node the node that was passed to {@code isConnectionAllowed(Node)}
     * @return the decision that {@code isConnectionAllowed(Node)} returns unchanged to its caller
     */
    protected abstract boolean isConnectionAllowedInternal(Node node);
39     private String name;
40
41     protected AbstractScheme(IClusterGlobalServices clusterServices, ConnectionMgmtScheme type) {
42         this.clusterServices = clusterServices;
43         if (type != null) name = type.name();
44         else name = "UNKNOWN";
45         if (clusterServices != null) {
46             allocateCaches();
47             retrieveCaches();
48         }
49     }
50
51     protected ConcurrentMap <InetAddress, Set<Node>> getControllerToNodesMap() {
52         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = new ConcurrentHashMap <InetAddress, Set<Node>>();
53         for (Node node : nodeConnections.keySet()) {
54             Set<InetAddress> controllers = nodeConnections.get(node);
55             if (controllers == null) continue;
56             for (InetAddress controller : controllers) {
57                 Set<Node> nodes = controllerNodesMap.get(controller);
58                 if (nodes == null) {
59                     nodes = new HashSet<Node>();
60                 }
61                 nodes.add(node);
62                 controllerNodesMap.put(controller, nodes);
63             }
64         }
65         return controllerNodesMap;
66     }
67
68     public boolean isConnectionAllowed (Node node) {
69         if (clusterServices == null || nodeConnections == null) {
70             return false;
71         }
72
73         return isConnectionAllowedInternal(node);
74     }
75
76     @SuppressWarnings("deprecation")
77     public void handleClusterViewChanged() {
78         log.debug("Handling Cluster View changed notification");
79         List<InetAddress> controllers = clusterServices.getClusteredControllers();
80         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
81         List<InetAddress> toRemove = new ArrayList<InetAddress>();
82         for (InetAddress c : controllerNodesMap.keySet()) {
83             if (!controllers.contains(c)) {
84                 toRemove.add(c);
85             }
86         }
87
88         boolean retry = false;
89         for (InetAddress c : toRemove) {
90             log.debug("Removing Controller : {} from the Connections table", c);
91             for (Iterator<Node> nodeIterator = nodeConnections.keySet().iterator();nodeIterator.hasNext();) {
92                 Node node = nodeIterator.next();
93                 Set <InetAddress> oldControllers = nodeConnections.get(node);
94                 Set <InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
95                 if (newControllers.remove(c)) {
96                     try {
97                         clusterServices.tbegin();
98                         if (!nodeConnections.replace(node, oldControllers, newControllers)) {
99                             log.debug("Replace Failed for {} ", node.toString());
100                             retry = true;
101                             clusterServices.trollback();
102                             break;
103                         } else {
104                             clusterServices.tcommit();
105                         }
106                     } catch (Exception e) {
107                         log.debug("Exception in replacing nodeConnections ", e);
108                         retry = true;
109                         try {
110                             clusterServices.trollback();
111                         } catch (Exception e1) {}
112                         break;
113                     }
114                 }
115             }
116         }
117         if (retry) {
118             try {
119                 Thread.sleep(1000);
120             } catch (InterruptedException e) {}
121             handleClusterViewChanged();
122         }
123     }
124
125     public Set<Node> getNodes(InetAddress controller) {
126         if (nodeConnections == null) return null;
127         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
128         return controllerNodesMap.get(controller);
129     }
130
131     @SuppressWarnings("deprecation")
132     public Set<Node> getNodes() {
133         return getNodes(clusterServices.getMyAddress());
134     }
135
136     public Set<InetAddress> getControllers(Node node) {
137         if (nodeConnections != null) return nodeConnections.get(node);
138         return null;
139     }
140
141     public ConcurrentMap<Node, Set<InetAddress>> getNodeConnections() {
142         return nodeConnections;
143     }
144
145     @SuppressWarnings("deprecation")
146     public boolean isLocal(Node node) {
147         if (nodeConnections == null) return false;
148         InetAddress myController = clusterServices.getMyAddress();
149         Set<InetAddress> controllers = nodeConnections.get(node);
150         return (controllers != null && controllers.contains(myController));
151     }
152
153     @SuppressWarnings("deprecation")
154     public Status removeNode (Node node) {
155         return removeNodeFromController(node, clusterServices.getMyAddress());
156     }
157
158     protected Status removeNodeFromController (Node node, InetAddress controller) {
159         if (node == null || controller == null) {
160             return new Status(StatusCode.BADREQUEST);
161         }
162
163         if (clusterServices == null || nodeConnections == null) {
164             return new Status(StatusCode.SUCCESS);
165         }
166
167         Set<InetAddress> oldControllers = nodeConnections.get(node);
168
169         if (oldControllers != null && oldControllers.contains(controller)) {
170             Set<InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
171             if (newControllers.remove(controller)) {
172                 try {
173                     clusterServices.tbegin();
174                     if (newControllers.size() > 0) {
175                         if (!nodeConnections.replace(node, oldControllers, newControllers)) {
176                             clusterServices.trollback();
177                             try {
178                                 Thread.sleep(100);
179                             } catch ( InterruptedException e) {}
180                             return removeNodeFromController(node, controller);
181                         }
182                     } else {
183                         nodeConnections.remove(node);
184                     }
185                     clusterServices.tcommit();
186                 } catch (Exception e) {
187                     log.error("Excepion in removing Controller from a Node", e);
188                     try {
189                         clusterServices.trollback();
190                     } catch (Exception e1) {
191                         log.error("Error Rolling back the node Connections Changes ", e);
192                     }
193                     return new Status(StatusCode.INTERNALERROR);
194                 }
195
196             }
197         }
198         return new Status(StatusCode.SUCCESS);
199
200     }
201
202     /*
203      * A few race-conditions were seen with the Clustered caches in putIfAbsent and replace
204      * functions. Leaving a few debug logs behind to assist in debugging if strange things happen.
205      */
206     private Status putNodeToController (Node node, InetAddress controller) {
207         if (clusterServices == null || nodeConnections == null) {
208             return new Status(StatusCode.SUCCESS);
209         }
210         log.debug("Trying to Put {} to {}", controller.getHostAddress(), node.toString());
211
212         Set <InetAddress> oldControllers = nodeConnections.get(node);
213         Set <InetAddress> newControllers = null;
214         if (oldControllers == null) {
215             newControllers = new HashSet<InetAddress>();
216         } else {
217             if (oldControllers.size() > 0 && !isConnectionAllowed(node)) {
218                 /*
219                  * In certain race conditions, the putIfAbsent fails to be atomic.
220                  * This check is added to identify such cases and report an warning
221                  * for debugging.
222                  */
223                 log.warn("States Exists for {} : {}", node, oldControllers.toString());
224             }
225             newControllers = new HashSet<InetAddress>(oldControllers);
226         }
227         newControllers.add(controller);
228
229         try {
230             clusterServices.tbegin();
231             if (nodeConnections.putIfAbsent(node, newControllers) != null) {
232                 log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
233                 /*
234                  * This check is needed again to take care of the case where some schemes
235                  * would not allow nodes to be connected to multiple controllers.
236                  * Hence, if putIfAbsent fails, that means, some other controller is competing
237                  * with this controller to take hold of a Node.
238                  */
239                 if (isConnectionAllowed(node)) {
240                     if (oldControllers == null || !nodeConnections.replace(node, oldControllers, newControllers)) {
241                         clusterServices.trollback();
242                         try {
243                             Thread.sleep(100);
244                         } catch ( InterruptedException e) {}
245                         log.debug("Retrying ... {} with {}", controller.getHostAddress(), node.toString());
246                         return putNodeToController(node, controller);
247                     } else {
248                         log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
249                                 controller.getHostAddress(), node.toString());
250                     }
251                 } else {
252                     clusterServices.trollback();
253                     return new Status(StatusCode.CONFLICT);
254                 }
255             } else {
256                 log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
257             }
258             clusterServices.tcommit();
259         } catch (Exception e) {
260             log.error("Excepion in adding Controller to a Node", e);
261             try {
262                 clusterServices.trollback();
263             } catch (Exception e1) {
264                 log.error("Error Rolling back the node Connections Changes ", e);
265             }
266             return new Status(StatusCode.INTERNALERROR);
267         }
268         return new Status(StatusCode.SUCCESS);
269     }
270
271     public Status addNode (Node node, InetAddress controller) {
272         if (node == null || controller == null) {
273             return new Status(StatusCode.BADREQUEST);
274         }
275         if (isLocal(node)) return new Status(StatusCode.SUCCESS);
276         if (isConnectionAllowed(node)) {
277             return putNodeToController(node, controller);
278         } else {
279             return new Status(StatusCode.NOTALLOWED);
280         }
281     }
282
283     @SuppressWarnings("deprecation")
284     public Status addNode (Node node) {
285         return addNode(node, clusterServices.getMyAddress());
286     }
287
288     @SuppressWarnings({ "unchecked", "deprecation" })
289     private void retrieveCaches() {
290         if (this.clusterServices == null) {
291             log.error("un-initialized clusterServices, can't retrieve cache");
292             return;
293         }
294
295         nodeConnections = (ConcurrentMap<Node, Set<InetAddress>>) clusterServices.getCache("connectionmanager."+name+".nodeconnections");
296
297         if (nodeConnections == null) {
298             log.error("\nFailed to get caches");
299         }
300     }
301
302     @SuppressWarnings("deprecation")
303     private void allocateCaches() {
304         if (this.clusterServices == null) {
305             log.error("un-initialized clusterServices, can't create cache");
306             return;
307         }
308
309         try {
310             clusterServices.createCache("connectionmanager."+name+".nodeconnections", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
311         } catch (CacheExistException cee) {
312             log.error("\nCache already exists - destroy and recreate if needed");
313         } catch (CacheConfigException cce) {
314             log.error("\nCache configuration invalid - check cache mode");
315         } catch (Exception e) {
316             e.printStackTrace();
317         }
318     }
319 }