a490916da2f6bb65a8e3c2ba288b71f356320386
[controller.git] / opendaylight / connectionmanager / implementation / src / main / java / org / opendaylight / controller / connectionmanager / scheme / AbstractScheme.java
1 package org.opendaylight.controller.connectionmanager.scheme;
2
3 import java.net.InetAddress;
4 import java.util.ArrayList;
5 import java.util.EnumSet;
6 import java.util.HashSet;
7 import java.util.Iterator;
8 import java.util.List;
9 import java.util.Set;
10 import java.util.concurrent.ConcurrentHashMap;
11 import java.util.concurrent.ConcurrentMap;
12
13 import org.opendaylight.controller.clustering.services.CacheConfigException;
14 import org.opendaylight.controller.clustering.services.CacheExistException;
15 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
16 import org.opendaylight.controller.clustering.services.IClusterServices;
17 import org.opendaylight.controller.connectionmanager.ConnectionMgmtScheme;
18 import org.opendaylight.controller.sal.core.Node;
19 import org.opendaylight.controller.sal.utils.Status;
20 import org.opendaylight.controller.sal.utils.StatusCode;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 public abstract class AbstractScheme {
25     private static final Logger log = LoggerFactory.getLogger(AbstractScheme.class);
26     protected IClusterGlobalServices clusterServices = null;
27     /*
28      * A more natural Map data-structure is to have a Key=Controller IP-address with value = a set of Nodes.
29      * But, such a data-structure results in some complex event processing during the Cluster operations
30      * to sync up the Connection states.
31      *
32      * A data-structure with Node as the key and set of controllers provides a good balance
33      * between the needed functionality and simpler clustering implementation for Connection Manager.
34      */
35     protected ConcurrentMap <Node, Set<InetAddress>> nodeConnections;
36     protected abstract boolean isConnectionAllowedInternal(Node node);
37     private String name;
38
39     protected AbstractScheme(IClusterGlobalServices clusterServices, ConnectionMgmtScheme type) {
40         this.clusterServices = clusterServices;
41         if (type != null) name = type.name();
42         else name = "UNKNOWN";
43         if (clusterServices != null) {
44             allocateCaches();
45             retrieveCaches();
46         }
47     }
48
49     protected ConcurrentMap <InetAddress, Set<Node>> getControllerToNodesMap() {
50         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = new ConcurrentHashMap <InetAddress, Set<Node>>();
51         for (Node node : nodeConnections.keySet()) {
52             Set<InetAddress> controllers = nodeConnections.get(node);
53             if (controllers == null) continue;
54             for (InetAddress controller : controllers) {
55                 Set<Node> nodes = controllerNodesMap.get(controller);
56                 if (nodes == null) {
57                     nodes = new HashSet<Node>();
58                 }
59                 nodes.add(node);
60                 controllerNodesMap.put(controller, nodes);
61             }
62         }
63         return controllerNodesMap;
64     }
65
66     public boolean isConnectionAllowed (Node node) {
67         if (clusterServices == null || nodeConnections == null) {
68             return false;
69         }
70
71         return isConnectionAllowedInternal(node);
72     }
73
74     @SuppressWarnings("deprecation")
75     public void handleClusterViewChanged() {
76         List<InetAddress> controllers = clusterServices.getClusteredControllers();
77         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
78         List<InetAddress> toRemove = new ArrayList<InetAddress>();
79         for (InetAddress c : controllerNodesMap.keySet()) {
80             if (!controllers.contains(c)) {
81                 toRemove.add(c);
82             }
83         }
84
85         for (InetAddress c : toRemove) {
86             log.debug("Removing Controller : {} from the Connections table", c);
87             for (Iterator<Node> nodeIterator = nodeConnections.keySet().iterator();nodeIterator.hasNext();) {
88                 Node node = nodeIterator.next();
89                 Set <InetAddress> oldControllers = nodeConnections.get(node);
90                 Set <InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
91                 if (newControllers.remove(c)) {
92                     boolean replaced = false;
93                     try {
94                         replaced = nodeConnections.replace(node, oldControllers, newControllers);
95                     } catch (Exception e) {
96                         log.debug("Replace exception : ", e);
97                         replaced = false;
98                     }
99                     if (!replaced) {
100                         try {
101                             Thread.sleep(10);
102                         } catch (InterruptedException e) {
103                         }
104                         handleClusterViewChanged();
105                     }
106                 }
107             }
108         }
109     }
110
111     public Set<Node> getNodes(InetAddress controller) {
112         if (nodeConnections == null) return null;
113         ConcurrentMap <InetAddress, Set<Node>> controllerNodesMap = getControllerToNodesMap();
114         return controllerNodesMap.get(controller);
115     }
116
117     @SuppressWarnings("deprecation")
118     public Set<Node> getNodes() {
119         return getNodes(clusterServices.getMyAddress());
120     }
121
122     public Set<InetAddress> getControllers(Node node) {
123         if (nodeConnections != null) return nodeConnections.get(node);
124         return null;
125     }
126
127     public ConcurrentMap<Node, Set<InetAddress>> getNodeConnections() {
128         return nodeConnections;
129     }
130
131     @SuppressWarnings("deprecation")
132     public boolean isLocal(Node node) {
133         if (nodeConnections == null) return false;
134         InetAddress myController = clusterServices.getMyAddress();
135         Set<InetAddress> controllers = nodeConnections.get(node);
136         return (controllers != null && controllers.contains(myController));
137     }
138
139     @SuppressWarnings("deprecation")
140     public Status removeNode (Node node) {
141         return removeNodeFromController(node, clusterServices.getMyAddress());
142     }
143
144     protected Status removeNodeFromController (Node node, InetAddress controller) {
145         if (node == null || controller == null) {
146             return new Status(StatusCode.BADREQUEST);
147         }
148
149         if (clusterServices == null || nodeConnections == null) {
150             return new Status(StatusCode.SUCCESS);
151         }
152
153         Set<InetAddress> oldControllers = nodeConnections.get(node);
154
155         if (oldControllers != null && oldControllers.contains(controller)) {
156             Set<InetAddress> newControllers = new HashSet<InetAddress>(oldControllers);
157             if (newControllers.remove(controller)) {
158                 if (newControllers.size() > 0) {
159                     boolean replaced = false;
160                     try {
161                     replaced = nodeConnections.replace(node, oldControllers, newControllers);
162                     } catch (Exception e) {
163                         log.debug("Replace exception : ", e);
164                         replaced = false;
165                     }
166                     if (!replaced) {
167                         try {
168                             Thread.sleep(10);
169                         } catch (InterruptedException e) {
170                         }
171                         return removeNodeFromController(node, controller);
172                     }
173                 } else {
174                     nodeConnections.remove(node);
175                 }
176             }
177         }
178         return new Status(StatusCode.SUCCESS);
179
180     }
181
182     /*
183      * A few race-conditions were seen with the Clustered caches in putIfAbsent and replace
184      * functions. Leaving a few debug logs behind to assist in debugging if strange things happen.
185      */
186     private Status putNodeToController (Node node, InetAddress controller) {
187         if (clusterServices == null || nodeConnections == null) {
188             return new Status(StatusCode.SUCCESS);
189         }
190         log.debug("Trying to Put {} to {}", controller.getHostAddress(), node.toString());
191
192         Set <InetAddress> oldControllers = nodeConnections.get(node);
193         Set <InetAddress> newControllers = null;
194         if (oldControllers == null) {
195             newControllers = new HashSet<InetAddress>();
196         } else {
197             if (oldControllers.size() > 0 && !isConnectionAllowed(node)) {
198                 /*
199                  * In certain race conditions, the putIfAbsent fails to be atomic.
200                  * This check is added to identify such cases and report an warning
201                  * for debugging.
202                  */
203                 log.warn("States Exists for {} : {}", node, oldControllers.toString());
204             }
205             newControllers = new HashSet<InetAddress>(oldControllers);
206         }
207         newControllers.add(controller);
208
209         if (nodeConnections.putIfAbsent(node, newControllers) != null) {
210             log.debug("PutIfAbsent failed {} to {}", controller.getHostAddress(), node.toString());
211             /*
212              * This check is needed again to take care of the case where some schemes
213              * would not allow nodes to be connected to multiple controllers.
214              * Hence, if putIfAbsent fails, that means, some other controller is competing
215              * with this controller to take hold of a Node.
216              */
217             if (isConnectionAllowed(node)) {
218                 log.debug("Trying to replace old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
219                         controller.getHostAddress(), node.toString());
220                 if (!nodeConnections.replace(node, oldControllers, newControllers)) {
221                     try {
222                         Thread.sleep(10);
223                     } catch (InterruptedException e) {
224                     }
225                     log.debug("Replace failed... old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
226                             controller.getHostAddress(), node.toString());
227                     return putNodeToController(node, controller);
228                 } else {
229                     log.debug("Replace successful old={} with new={} for {} to {}", oldControllers.toString(), newControllers.toString(),
230                             controller.getHostAddress(), node.toString());
231                 }
232             } else {
233                 return new Status(StatusCode.CONFLICT);
234             }
235         } else {
236             log.debug("Added {} to {}", controller.getHostAddress(), node.toString());
237         }
238         return new Status(StatusCode.SUCCESS);
239     }
240
241     public Status addNode (Node node, InetAddress controller) {
242         if (node == null || controller == null) {
243             return new Status(StatusCode.BADREQUEST);
244         }
245         if (isLocal(node)) return new Status(StatusCode.SUCCESS);
246         if (isConnectionAllowed(node)) {
247             return putNodeToController(node, controller);
248         } else {
249             return new Status(StatusCode.NOTALLOWED);
250         }
251     }
252
253     @SuppressWarnings("deprecation")
254     public Status addNode (Node node) {
255         return addNode(node, clusterServices.getMyAddress());
256     }
257
258     @SuppressWarnings({ "unchecked", "deprecation" })
259     private void retrieveCaches() {
260         if (this.clusterServices == null) {
261             log.error("un-initialized clusterServices, can't retrieve cache");
262             return;
263         }
264
265         nodeConnections = (ConcurrentMap<Node, Set<InetAddress>>) clusterServices.getCache("connectionmanager."+name+".nodeconnections");
266
267         if (nodeConnections == null) {
268             log.error("\nFailed to get caches");
269         }
270     }
271
272     @SuppressWarnings("deprecation")
273     private void allocateCaches() {
274         if (this.clusterServices == null) {
275             log.error("un-initialized clusterServices, can't create cache");
276             return;
277         }
278
279         try {
280             clusterServices.createCache("connectionmanager."+name+".nodeconnections", EnumSet.of(IClusterServices.cacheMode.NON_TRANSACTIONAL));
281         } catch (CacheExistException cee) {
282             log.error("\nCache already exists - destroy and recreate if needed");
283         } catch (CacheConfigException cce) {
284             log.error("\nCache configuration invalid - check cache mode");
285         } catch (Exception e) {
286             e.printStackTrace();
287         }
288     }
289 }