Merge changes I63383291,I1c9f10e9,I9cac529f,I269d373b,I7ede3ba5,I4afc1e15
[controller.git] / opendaylight / md-sal / remoterpc-routingtable / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / impl / RoutingTableImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.sal.connector.remoterpc.impl;
10
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.ImmutableSet;
14 import org.apache.felix.dm.Component;
15 import org.opendaylight.controller.clustering.services.*;
16 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
17 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
18 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
21
22 import javax.transaction.HeuristicMixedException;
23 import javax.transaction.HeuristicRollbackException;
24 import javax.transaction.NotSupportedException;
25 import javax.transaction.RollbackException;
26 import java.util.*;
27 import java.util.concurrent.ConcurrentMap;
28
29 public class RoutingTableImpl<I, R> implements RoutingTable<I, R>, ICacheUpdateAware<I, R> {
30
31   private Logger log = LoggerFactory.getLogger(RoutingTableImpl.class);
32
33   private IClusterGlobalServices clusterGlobalServices = null;
34
35   private ConcurrentMap<I,R> globalRpcCache = null;
36   private ConcurrentMap<I, LinkedHashSet<R>> rpcCache = null;  //need routes to ordered by insert-order
37
38   public static final String GLOBALRPC_CACHE = "remoterpc_routingtable.globalrpc_cache";
39   public static final String RPC_CACHE = "remoterpc_routingtable.rpc_cache";
40
41   public RoutingTableImpl() {
42   }
43
44   @Override
45   public R getGlobalRoute(I routeId) throws RoutingTableException, SystemException {
46     Preconditions.checkNotNull(routeId, "getGlobalRoute: routeId cannot be null!");
47     return globalRpcCache.get(routeId);
48   }
49
50   @Override
51   public void addGlobalRoute(I routeId, R route) throws RoutingTableException, SystemException {
52     Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!");
53     Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!");
54     try {
55
56       log.debug("addGlobalRoute: adding  a new route with id[{}] and value [{}]", routeId, route);
57       clusterGlobalServices.tbegin();
58       if (globalRpcCache.putIfAbsent(routeId, route) != null) {
59         throw new DuplicateRouteException(" There is already existing route " + routeId);
60       }
61       clusterGlobalServices.tcommit();
62
63     } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
64       throw new RoutingTableException("Transaction error - while trying to create route id="
65           + routeId + "with route" + route, e);
66     } catch (javax.transaction.SystemException e) {
67       throw new SystemException("System error occurred - while trying to create with value", e);
68     }
69
70   }
71
72   @Override
73   public void removeGlobalRoute(I routeId) throws RoutingTableException, SystemException {
74     Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!");
75     try {
76       log.debug("removeGlobalRoute: removing  a new route with id [{}]", routeId);
77
78       clusterGlobalServices.tbegin();
79       globalRpcCache.remove(routeId);
80       clusterGlobalServices.tcommit();
81
82     } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
83       throw new RoutingTableException("Transaction error - while trying to remove route id="
84           + routeId, e);
85     } catch (javax.transaction.SystemException e) {
86       throw new SystemException("System error occurred - while trying to remove with value", e);
87     }
88   }
89
90
91   @Override
92   public Set<R> getRoutes(I routeId) {
93     Preconditions.checkNotNull(routeId, "getRoutes: routeId cannot be null!");
94     Set<R> routes = rpcCache.get(routeId);
95
96     if (routes == null) return Collections.emptySet();
97
98     return ImmutableSet.copyOf(routes);
99   }
100
101
102
103   public R getLastAddedRoute(I routeId) {
104
105     Set<R> routes = getRoutes(routeId);
106
107     if (routes.isEmpty()) return null;
108
109     R route = null;
110     Iterator<R> iter = routes.iterator();
111     while (iter.hasNext())
112       route = iter.next();
113
114     return route;
115   }
116
117   @Override
118   public void addRoute(I routeId, R route)  throws RoutingTableException, SystemException {
119     Preconditions.checkNotNull(routeId, "addRoute: routeId cannot be null");
120     Preconditions.checkNotNull(route, "addRoute: route cannot be null");
121
122     try{
123       clusterGlobalServices.tbegin();
124       log.debug("addRoute: adding a route with k/v [{}/{}]", routeId, route);
125       threadSafeAdd(routeId, route);
126       clusterGlobalServices.tcommit();
127
128     } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
129       throw new RoutingTableException("Transaction error - while trying to remove route id="
130           + routeId, e);
131     } catch (javax.transaction.SystemException e) {
132       throw new SystemException("System error occurred - while trying to remove with value", e);
133     }
134   }
135
136   @Override
137   public void addRoutes(Set<I> routeIds, R route) throws RoutingTableException, SystemException {
138     Preconditions.checkNotNull(routeIds, "addRoutes: routeIds must not be null");
139     for (I routeId : routeIds){
140       addRoute(routeId, route);
141     }
142   }
143
144   @Override
145   public void removeRoute(I routeId, R route) throws RoutingTableException, SystemException {
146     Preconditions.checkNotNull(routeId, "removeRoute: routeId cannot be null!");
147     Preconditions.checkNotNull(route, "removeRoute: route cannot be null!");
148
149     LinkedHashSet<R> routes = rpcCache.get(routeId);
150     if (routes == null) return;
151
152     try {
153       log.debug("removeRoute: removing  a new route with k/v [{}/{}]", routeId, route);
154
155       clusterGlobalServices.tbegin();
156       threadSafeRemove(routeId, route);
157       clusterGlobalServices.tcommit();
158
159     } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
160       throw new RoutingTableException("Transaction error - while trying to remove route id="
161           + routeId, e);
162     } catch (javax.transaction.SystemException e) {
163       throw new SystemException("System error occurred - while trying to remove with value", e);
164     }
165   }
166
167   @Override
168   public void removeRoutes(Set<I> routeIds, R route) throws RoutingTableException, SystemException {
169     Preconditions.checkNotNull(routeIds, "removeRoutes: routeIds must not be null");
170     for (I routeId : routeIds){
171       removeRoute(routeId, route);
172     }
173   }
174
175   /**
176    * This method guarantees that no 2 thread over write each other's changes.
177    * Just so that we dont end up in infinite loop, it tries for 100 times then throw
178    */
179   private void threadSafeAdd(I routeId, R route) {
180
181     for (int i=0;i<100;i++){
182
183       LinkedHashSet<R> updatedRoutes = new LinkedHashSet<>();
184       updatedRoutes.add(route);
185       LinkedHashSet<R> oldRoutes = rpcCache.putIfAbsent(routeId, updatedRoutes);
186       if (oldRoutes == null) return;
187
188       updatedRoutes = new LinkedHashSet<>(oldRoutes);
189       updatedRoutes.add(route);
190
191       if (rpcCache.replace(routeId, oldRoutes, updatedRoutes)) return;
192     }
193     //the method did not already return means it failed to add route in 10 attempts
194     throw new IllegalStateException("Failed to add route [" + routeId + "]");
195   }
196
197   /**
198    * This method guarantees that no 2 thread over write each other's changes.
199    * Just so that we dont end up in infinite loop, it tries for 10 times then throw
200    */
201   private void threadSafeRemove(I routeId, R route) {
202     LinkedHashSet<R> updatedRoutes = null;
203     for (int i=0;i<10;i++){
204       LinkedHashSet<R> oldRoutes = rpcCache.get(routeId);
205
206       // if route to be deleted is the only entry in the set then remove routeId from the cache
207       if ((oldRoutes.size() == 1) && oldRoutes.contains(route)){
208         rpcCache.remove(routeId);
209         return;
210       }
211
212       // if there are multiple routes for this routeId, remove the route to be deleted only from the set.
213       updatedRoutes = new LinkedHashSet<>(oldRoutes);
214       updatedRoutes.remove(route);
215       if (rpcCache.replace(routeId, oldRoutes, updatedRoutes)) return;
216
217     }
218     //the method did not already return means it failed to remove route in 10 attempts
219     throw new IllegalStateException("Failed to remove route [" + routeId + "]");
220   }
221
222
223 //    /**
224 //     * @deprecated doesn't do anything will be removed once listeners used
225 //     *             whiteboard pattern Registers listener for sending any change
226 //     *             notification
227 //     * @param listener
228 //     */
229 //    @Override
230 //    public void registerRouteChangeListener(RouteChangeListener listener) {
231 //
232 //    }
233
234 //    public void setRouteChangeListener(RouteChangeListener rcl) {
235 //        if(rcl != null){
236 //            routeChangeListeners.add(rcl);
237 //        }else{
238 //            log.warn("setRouteChangeListener called with null listener");
239 //        }
240 //    }
241 //
242 //    public void unSetRouteChangeListener(RouteChangeListener rcl) {
243 //        if(rcl != null){
244 //         routeChangeListeners.remove(rcl);
245 //        }else{
246 //            log.warn("unSetRouteChangeListener called with null listener");
247 //        }
248 //    }
249
250   /**
251    * Returning the set of route change listeners for Unit testing Note: the
252    * package scope is default
253    *
254    * @return List of registered RouteChangeListener<I,R> listeners
255    */
256 //    Set<RouteChangeListener> getRegisteredRouteChangeListeners() {
257 //        return routeChangeListeners;
258 //    }
259   public void setClusterGlobalServices(IClusterGlobalServices clusterGlobalServices) {
260     this.clusterGlobalServices = clusterGlobalServices;
261   }
262
263   public void unsetClusterGlobalServices(IClusterGlobalServices clusterGlobalServices) {
264     if ((clusterGlobalServices != null) && (this.clusterGlobalServices.equals(clusterGlobalServices))) {
265       this.clusterGlobalServices = null;
266     }
267   }
268
269   /**
270    * Finds OR Creates clustered cache for Global RPCs
271    *
272    * @throws CacheExistException       -- cluster global services exception when cache exist
273    * @throws CacheConfigException      -- cluster global services exception during cache config
274    * @throws CacheListenerAddException -- cluster global services exception during adding of listener
275    */
276
277   void findOrCreateGlobalRpcCache() throws CacheExistException, CacheConfigException,
278       CacheListenerAddException {
279     // TBD: HOW DO WE DECIDE ON PROPERTIES OF THE CACHE i.e. what duration it
280     // should be caching?
281
282     // let us check here if the cache already exists -- if so don't create
283     if (!clusterGlobalServices.existCache(GLOBALRPC_CACHE)) {
284
285       globalRpcCache = (ConcurrentMap<I,R>) clusterGlobalServices.createCache(GLOBALRPC_CACHE,
286           EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
287       log.debug("Cache created [{}] ", GLOBALRPC_CACHE);
288
289     } else {
290       globalRpcCache = (ConcurrentMap<I,R>) clusterGlobalServices.getCache(GLOBALRPC_CACHE);
291       log.debug("Cache exists [{}] ", GLOBALRPC_CACHE);
292     }
293   }
294
295   /**
296    * Finds OR Creates clustered cache for Routed RPCs
297    *
298    * @throws CacheExistException       -- cluster global services exception when cache exist
299    * @throws CacheConfigException      -- cluster global services exception during cache config
300    * @throws CacheListenerAddException -- cluster global services exception during adding of listener
301    */
302
303   void findOrCreateRpcCache() throws CacheExistException, CacheConfigException,
304       CacheListenerAddException {
305     // TBD: HOW DO WE DECIDE ON PROPERTIES OF THE CACHE i.e. what duration it
306     // should be caching?
307
308     if (clusterGlobalServices.existCache(RPC_CACHE)){
309       rpcCache = (ConcurrentMap<I,LinkedHashSet<R>>) clusterGlobalServices.getCache(RPC_CACHE);
310       log.debug("Cache exists [{}] ", RPC_CACHE);
311       return;
312     }
313
314     //cache doesnt exist, create one
315     rpcCache = (ConcurrentMap<I,LinkedHashSet<R>>) clusterGlobalServices.createCache(RPC_CACHE,
316           EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
317     log.debug("Cache created [{}] ", RPC_CACHE);
318   }
319
320
321   /**
322    * Function called by the dependency manager when all the required
323    * dependencies are satisfied
324    */
325   void init(Component c) {
326     try {
327
328       findOrCreateGlobalRpcCache();
329       findOrCreateRpcCache();
330
331     } catch (CacheExistException|CacheConfigException|CacheListenerAddException e) {
332       throw new IllegalStateException("could not construct routing table cache");
333     }
334   }
335
336   /**
337    * Useful for unit testing <note>It has package
338    * scope</note>
339    */
340   ConcurrentMap getGlobalRpcCache() {
341     return this.globalRpcCache;
342   }
343
344   /**
345    * Useful for unit testing <note>It has package
346    * scope</note>
347    */
348   ConcurrentMap getRpcCache() {
349     return this.rpcCache;
350   }
351
352   /**
353    * This is used from integration test NP rest API to check out the result of the
354    * cache population
355    * <Note> For testing purpose only-- use it wisely</Note>
356    *
357    * @return
358    */
359   public String dumpGlobalRpcCache() {
360     Set<Map.Entry<I, R>> cacheEntrySet = this.globalRpcCache.entrySet();
361     StringBuilder sb = new StringBuilder();
362     for (Map.Entry<I, R> entry : cacheEntrySet) {
363       sb.append("Key:").append(entry.getKey()).append("---->Value:")
364           .append((entry.getValue() != null) ? entry.getValue() : "null")
365           .append("\n");
366     }
367     return sb.toString();
368   }
369
370   public String dumpRpcCache() {
371     Set<Map.Entry<I, LinkedHashSet<R>>> cacheEntrySet = this.rpcCache.entrySet();
372     StringBuilder sb = new StringBuilder();
373     for (Map.Entry<I, LinkedHashSet<R>> entry : cacheEntrySet) {
374       sb.append("Key:").append(entry.getKey()).append("---->Value:")
375           .append((entry.getValue() != null) ? entry.getValue() : "null")
376           .append("\n");
377     }
378     return sb.toString();
379   }
380   /**
381    * Invoked when a new entry is available in the cache, the key is only
382    * provided, the value will come as an entryUpdate invocation
383    *
384    * @param key         Key for the entry just created
385    * @param cacheName   name of the cache for which update has been received
386    * @param originLocal true if the event is generated from this node
387    */
388   @Override
389   public void entryCreated(I key, String cacheName, boolean originLocal) {
390     // TBD: do we require this.
391     if (log.isDebugEnabled()) {
392       log.debug("RoutingTableUpdates: entryCreated  routeId = " + key + " cacheName=" + cacheName);
393     }
394   }
395
396   /**
397    * Called anytime a given entry is updated
398    *
399    * @param key         Key for the entry modified
400    * @param new_value   the new value the key will have
401    * @param cacheName   name of the cache for which update has been received
402    * @param originLocal true if the event is generated from this node
403    */
404   @Override
405   public void entryUpdated(I key, R new_value, String cacheName, boolean originLocal) {
406     if (log.isDebugEnabled()) {
407       log.debug("RoutingTableUpdates: entryUpdated  routeId = " + key + ",value = " + new_value
408           + " ,cacheName=" + cacheName + " originLocal=" + originLocal);
409     }
410 //        if (!originLocal) {
411 //            for (RouteChangeListener rcl : routeChangeListeners) {
412 //                rcl.onRouteUpdated(key, new_value);
413 //            }
414 //        }
415   }
416
417   /**
418    * Called anytime a given key is removed from the ConcurrentHashMap we are
419    * listening to.
420    *
421    * @param key         Key of the entry removed
422    * @param cacheName   name of the cache for which update has been received
423    * @param originLocal true if the event is generated from this node
424    */
425   @Override
426   public void entryDeleted(I key, String cacheName, boolean originLocal) {
427     if (log.isDebugEnabled()) {
428       log.debug("RoutingTableUpdates: entryUpdated  routeId = " + key + " local = " + originLocal
429           + " cacheName=" + cacheName + " originLocal=" + originLocal);
430     }
431 //        if (!originLocal) {
432 //            for (RouteChangeListener rcl : routeChangeListeners) {
433 //                rcl.onRouteDeleted(key);
434 //            }
435 //        }
436   }
437 }