6b20909a877ce81de268c585c5e309be9fe7e2cf
[controller.git] / opendaylight / connectionmanager / implementation / src / main / java / org / opendaylight / controller / connectionmanager / scheme / AbstractScheme.java
1 package org.opendaylight.controller.connectionmanager.scheme;
2
3 import java.net.InetAddress;
4 import java.util.ArrayList;
5 import java.util.EnumSet;
6 import java.util.HashSet;
7 import java.util.Iterator;
8 import java.util.List;
9 import java.util.Set;
10 import java.util.concurrent.ConcurrentHashMap;
11 import java.util.concurrent.ConcurrentMap;
12
13 import javax.transaction.SystemException;
14
15 import org.opendaylight.controller.clustering.services.CacheConfigException;
16 import org.opendaylight.controller.clustering.services.CacheExistException;
17 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
18 import org.opendaylight.controller.clustering.services.IClusterServices;
19 import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
20 import org.opendaylight.controller.sal.core.Node;
21 import org.opendaylight.controller.sal.utils.Status;
22 import org.opendaylight.controller.sal.utils.StatusCode;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 public abstract class AbstractScheme {
27     private static final Logger log = LoggerFactory.getLogger(AbstractScheme.class);
28     protected IClusterGlobalServices clusterServices = null;
29     /*
30      * A more natural Map data-structure is to have a Key=Controller IP-address with value = a set of Nodes.
31      * But, such a data-structure results in some complex event processing during the Cluster operations
32      * to sync up the Connection states.
33      *
34      * A data-structure with Node as the key and set of controllers provides a good balance
35      * between the needed functionality and simpler clustering implementation for Connection Manager.
36      */
37     protected ConcurrentMap <Node, Set<InetAddress>> nodeConnections;
38     protected abstract boolean isConnectionAllowedInternal(Node node);
39     private String name;
40
41     protected AbstractScheme(IClusterGlobalServices clusterServices, ConnectionMgmtScheme type) {
42         this.clusterServices = clusterServices;
43         if (type != null) name = type.name();
44         else name = "UNKNOWN";
45         if (clusterServices != null) {
46             allocateCaches();
47             retrieveCaches();
48         }
49     }
50
51     protected ConcurrentMap <InetAddress, Set<Node>> getControllerToNodesMap() {
52         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = new ConcurrentHashMap <InetAddress, Set<Node>>();
53         for (Node node : nodeConnections.keySet()) {
54             Set<InetAddress> controllers = nodeConnections.get(node);
55             if (controllers == null) continue;
56             for (InetAddress controller : controllers) {
57                 Set<Node> nodes = controllerNodesMap.get(controller);
58                 if (nodes == null) {
59                     nodes = new HashSet<Node>();
60                 }
61                 nodes.add(node);
62                 controllerNodesMap.put(controller, nodes);
63             }
64         }
65         return controllerNodesMap;
66     }
67
68     public boolean isConnectionAllowed (Node node) {
69         if (clusterServices == null || nodeConnections == null) {
70             return false;
71         }
72
73         return isConnectionAllowedInternal(node);
74     }
75
76     @SuppressWarnings("deprecation")
77     public void handleClusterViewChanged() {
78         List<InetAddress> controllers = clusterServices.getClusteredControllers();
79         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
80         List<InetAddress> toRemove = new ArrayList<InetAddress>();
81         for (InetAddress c : controllerNodesMap.keySet()) {
82             if (!controllers.contains(c)) {
83                 toRemove.add(c);
84             }
85         }
86
87         boolean retry = false;
88         for (InetAddress c : toRemove) {
89             log.debug("Removing Controller : {} from the Connections table", c);
90             for (Iterator<Node> nodeIterator = nodeConnections.keySet().iterator();nodeIterator.hasNext();) {
91                 Node node = nodeIterator.next();
92                 Set <InetAddress> oldControllers = nodeConnections.get(node);
93                 Set <InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
94                 if (newControllers.remove(c)) {
95                     try {
96                         clusterServices.tbegin();
97                         if (!nodeConnections.replace(node, oldControllers, newControllers)) {
98                             log.debug("Replace Failed for {} ", node.toString());
99                             retry = true;
100                             clusterServices.trollback();
101                             break;
102                         } else {
103                             clusterServices.tcommit();
104                         }
105                     } catch (Exception e) {
106                         log.error("Exception in replacing nodeConnections ", e);
107                         retry = false;
108                         try {
109                             clusterServices.trollback();
110                         } catch (Exception e1) {}
111                         break;
112                     }
113                 }
114             }
115         }
116         if (retry) {
117             try {
118                 Thread.sleep(100);
119             } catch (InterruptedException e) {
120                 e.printStackTrace();
121             }
122             handleClusterViewChanged();
123         }
124     }
125
126     public Set<Node> getNodes(InetAddress controller) {
127         if (nodeConnections == null) return null;
128         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
129         return controllerNodesMap.get(controller);
130     }
131
132     @SuppressWarnings("deprecation")
133     public Set<Node> getNodes() {
134         return getNodes(clusterServices.getMyAddress());
135     }
136
137     public Set<InetAddress> getControllers(Node node) {
138         if (nodeConnections != null) return nodeConnections.get(node);
139         return null;
140     }
141
142     public ConcurrentMap<Node, Set<InetAddress>> getNodeConnections() {
143         return nodeConnections;
144     }
145
146     @SuppressWarnings("deprecation")
147     public boolean isLocal(Node node) {
148         if (nodeConnections == null) return false;
149         InetAddress myController = clusterServices.getMyAddress();
150         Set<InetAddress> controllers = nodeConnections.get(node);
151         return (controllers != null && controllers.contains(myController));
152     }
153
154     @SuppressWarnings("deprecation")
155     public Status removeNode (Node node) {
156         return removeNodeFromController(node, clusterServices.getMyAddress());
157     }
158
159     protected Status removeNodeFromController (Node node, InetAddress controller) {
160         if (node == null || controller == null) {
161             return new Status(StatusCode.BADREQUEST);
162         }
163
164         if (clusterServices == null || nodeConnections == null) {
165             return new Status(StatusCode.SUCCESS);
166         }
167
168         Set<InetAddress> oldControllers = nodeConnections.get(node);
169
170         if (oldControllers != null && oldControllers.contains(controller)) {
171             Set<InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
172             if (newControllers.remove(controller)) {
173                 try {
174                     clusterServices.tbegin();
175                     if (newControllers.size() > 0) {
176                         if (!nodeConnections.replace(node, oldControllers, newControllers)) {
177                             clusterServices.trollback();
178                             try {
179                                 Thread.sleep(100);
180                             } catch ( InterruptedException e) {}
181                             return removeNodeFromController(node, controller);
182                         }
183                     } else {
184                         nodeConnections.remove(node);
185                     }
186                     clusterServices.tcommit();
187                 } catch (Exception e) {
188                     log.error("Excepion in removing Controller from a Node", e);
189                     try {
190                         clusterServices.trollback();
191                     } catch (Exception e1) {
192                         log.error("Error Rolling back the node Connections Changes ", e);
193                     }
194                     return new Status(StatusCode.INTERNALERROR);
195                 }
196
197             }
198         }
199         return new Status(StatusCode.SUCCESS);
200
201     }
202
203     /*
204      * A few race-conditions were seen with the Clustered caches in putIfAbsent and replace
205      * functions. Leaving a few debug logs behind to assist in debugging if strange things happen.
206      */
207     private Status putNodeToController (Node node, InetAddress controller) {
208         if (clusterServices == null || nodeConnections == null) {
209             return new Status(StatusCode.SUCCESS);
210         }
211         log.debug("Trying to Put {} to {}", controller.getHostAddress(), node.toString());
212
213         Set <InetAddress> oldControllers = nodeConnections.get(node);
214         Set <InetAddress> newControllers = null;
215         if (oldControllers == null) {
216             newControllers = new HashSet<InetAddress>();
217         } else {
218             if (oldControllers.size() > 0 && !isConnectionAllowed(node)) {
219                 /*
220                  * In certain race conditions, the putIfAbsent fails to be atomic.
221                  * This check is added to identify such cases and report an warning
222                  * for debugging.
223                  */
224                 log.warn("States Exists for {} : {}", node, oldControllers.toString());
225             }
226             newControllers = new HashSet<InetAddress>(oldControllers);
227         }
228         newControllers.add(controller);
229
230         try {
231             clusterServices.tbegin();
232             if (nodeConnections.putIfAbsent(node, newControllers) != null) {
233                 log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
234                 /*
235                  * This check is needed again to take care of the case where some schemes
236                  * would not allow nodes to be connected to multiple controllers.
237                  * Hence, if putIfAbsent fails, that means, some other controller is competing
238                  * with this controller to take hold of a Node.
239                  */
240                 if (isConnectionAllowed(node)) {
241                     if (!nodeConnections.replace(node, oldControllers, newControllers)) {
242                         clusterServices.trollback();
243                         try {
244                             Thread.sleep(100);
245                         } catch ( InterruptedException e) {}
246                         log.debug("Replace failed... old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
247                                 controller.getHostAddress(), node.toString());
248                         return putNodeToController(node, controller);
249                     } else {
250                         log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
251                                 controller.getHostAddress(), node.toString());
252                     }
253                 } else {
254                     clusterServices.trollback();
255                     return new Status(StatusCode.CONFLICT);
256                 }
257             } else {
258                 log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
259             }
260             clusterServices.tcommit();
261         } catch (Exception e) {
262             log.error("Excepion in adding Controller to a Node", e);
263             try {
264                 clusterServices.trollback();
265             } catch (Exception e1) {
266                 log.error("Error Rolling back the node Connections Changes ", e);
267             }
268             return new Status(StatusCode.INTERNALERROR);
269         }
270         return new Status(StatusCode.SUCCESS);
271     }
272
273     public Status addNode (Node node, InetAddress controller) {
274         if (node == null || controller == null) {
275             return new Status(StatusCode.BADREQUEST);
276         }
277         if (isLocal(node)) return new Status(StatusCode.SUCCESS);
278         if (isConnectionAllowed(node)) {
279             return putNodeToController(node, controller);
280         } else {
281             return new Status(StatusCode.NOTALLOWED);
282         }
283     }
284
285     @SuppressWarnings("deprecation")
286     public Status addNode (Node node) {
287         return addNode(node, clusterServices.getMyAddress());
288     }
289
290     @SuppressWarnings({ "unchecked", "deprecation" })
291     private void retrieveCaches() {
292         if (this.clusterServices == null) {
293             log.error("un-initialized clusterServices, can't retrieve cache");
294             return;
295         }
296
297         nodeConnections = (ConcurrentMap<Node, Set<InetAddress>>) clusterServices.getCache("connectionmanager."+name+".nodeconnections");
298
299         if (nodeConnections == null) {
300             log.error("\nFailed to get caches");
301         }
302     }
303
304     @SuppressWarnings("deprecation")
305     private void allocateCaches() {
306         if (this.clusterServices == null) {
307             log.error("un-initialized clusterServices, can't create cache");
308             return;
309         }
310
311         try {
312             clusterServices.createCache("connectionmanager."+name+".nodeconnections", EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
313         } catch (CacheExistException cee) {
314             log.error("\nCache already exists - destroy and recreate if needed");
315         } catch (CacheConfigException cce) {
316             log.error("\nCache configuration invalid - check cache mode");
317         } catch (Exception e) {
318             e.printStackTrace();
319         }
320     }
321 }