Merge changes I52e0dd0d,I6dbf3316,Iafae27bc,Ibbb1250b,Icdb56d14,I7ede1482,Ib335fd1d...
[controller.git] / opendaylight / md-sal / remoterpc-routingtable / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / impl / RoutingTableImpl.java
1 /*
2  * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.controller.sal.connector.remoterpc.impl;
10
11 import java.util.Collections;
12 import java.util.EnumSet;
13 import java.util.Iterator;
14 import java.util.LinkedHashSet;
15 import java.util.Map;
16 import java.util.Set;
17 import java.util.concurrent.ConcurrentMap;
18
19 import javax.transaction.HeuristicMixedException;
20 import javax.transaction.HeuristicRollbackException;
21 import javax.transaction.NotSupportedException;
22 import javax.transaction.RollbackException;
23
24 import org.apache.felix.dm.Component;
25 import org.opendaylight.controller.clustering.services.CacheConfigException;
26 import org.opendaylight.controller.clustering.services.CacheExistException;
27 import org.opendaylight.controller.clustering.services.CacheListenerAddException;
28 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
29 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
30 import org.opendaylight.controller.clustering.services.IClusterServices;
31 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
32 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
33 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import com.google.common.base.Preconditions;
38 import com.google.common.collect.ImmutableSet;
39
40 public class RoutingTableImpl<I, R> implements RoutingTable<I, R>, ICacheUpdateAware<I, R> {
41
42     private final Logger log = LoggerFactory.getLogger(RoutingTableImpl.class);
43
44     private IClusterGlobalServices clusterGlobalServices = null;
45
46     private ConcurrentMap<I,R> globalRpcCache = null;
47     private ConcurrentMap<I, LinkedHashSet<R>> rpcCache = null;  //need routes to ordered by insert-order
48
49     public static final String GLOBALRPC_CACHE = "remoterpc_routingtable.globalrpc_cache";
50     public static final String RPC_CACHE = "remoterpc_routingtable.rpc_cache";
51
52     public RoutingTableImpl() {
53     }
54
55     @Override
56     public R getGlobalRoute(final I routeId) throws RoutingTableException, SystemException {
57         Preconditions.checkNotNull(routeId, "getGlobalRoute: routeId cannot be null!");
58         return globalRpcCache.get(routeId);
59     }
60
61     @Override
62     public void addGlobalRoute(final I routeId, final R route) throws RoutingTableException, SystemException {
63         Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!");
64         Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!");
65         try {
66
67             log.debug("addGlobalRoute: adding  a new route with id[{}] and value [{}]", routeId, route);
68             clusterGlobalServices.tbegin();
69             if (globalRpcCache.putIfAbsent(routeId, route) != null) {
70                 throw new DuplicateRouteException(" There is already existing route " + routeId);
71             }
72             clusterGlobalServices.tcommit();
73
74         } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
75             throw new RoutingTableException("Transaction error - while trying to create route id="
76                     + routeId + "with route" + route, e);
77         } catch (javax.transaction.SystemException e) {
78             throw new SystemException("System error occurred - while trying to create with value", e);
79         }
80
81     }
82
83     @Override
84     public void removeGlobalRoute(final I routeId) throws RoutingTableException, SystemException {
85         Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!");
86         try {
87             log.debug("removeGlobalRoute: removing  a new route with id [{}]", routeId);
88
89             clusterGlobalServices.tbegin();
90             globalRpcCache.remove(routeId);
91             clusterGlobalServices.tcommit();
92
93         } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
94             throw new RoutingTableException("Transaction error - while trying to remove route id="
95                     + routeId, e);
96         } catch (javax.transaction.SystemException e) {
97             throw new SystemException("System error occurred - while trying to remove with value", e);
98         }
99     }
100
101
102     @Override
103     public Set<R> getRoutes(final I routeId) {
104         Preconditions.checkNotNull(routeId, "getRoutes: routeId cannot be null!");
105         Set<R> routes = rpcCache.get(routeId);
106
107         if (routes == null) {
108             return Collections.emptySet();
109         }
110
111         return ImmutableSet.copyOf(routes);
112     }
113
114
115
116     @Override
117     public R getLastAddedRoute(final I routeId) {
118
119         Set<R> routes = getRoutes(routeId);
120
121         if (routes.isEmpty()) {
122             return null;
123         }
124
125         R route = null;
126         Iterator<R> iter = routes.iterator();
127         while (iter.hasNext()) {
128             route = iter.next();
129         }
130
131         return route;
132     }
133
134     @Override
135     public void addRoute(final I routeId, final R route)  throws RoutingTableException, SystemException {
136         Preconditions.checkNotNull(routeId, "addRoute: routeId cannot be null");
137         Preconditions.checkNotNull(route, "addRoute: route cannot be null");
138
139         try{
140             clusterGlobalServices.tbegin();
141             log.debug("addRoute: adding a route with k/v [{}/{}]", routeId, route);
142             threadSafeAdd(routeId, route);
143             clusterGlobalServices.tcommit();
144
145         } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
146             throw new RoutingTableException("Transaction error - while trying to remove route id="
147                     + routeId, e);
148         } catch (javax.transaction.SystemException e) {
149             throw new SystemException("System error occurred - while trying to remove with value", e);
150         }
151     }
152
153     @Override
154     public void addRoutes(final Set<I> routeIds, final R route) throws RoutingTableException, SystemException {
155         Preconditions.checkNotNull(routeIds, "addRoutes: routeIds must not be null");
156         for (I routeId : routeIds){
157             addRoute(routeId, route);
158         }
159     }
160
161     @Override
162     public void removeRoute(final I routeId, final R route) throws RoutingTableException, SystemException {
163         Preconditions.checkNotNull(routeId, "removeRoute: routeId cannot be null!");
164         Preconditions.checkNotNull(route, "removeRoute: route cannot be null!");
165
166         LinkedHashSet<R> routes = rpcCache.get(routeId);
167         if (routes == null) {
168             return;
169         }
170
171         try {
172             log.debug("removeRoute: removing  a new route with k/v [{}/{}]", routeId, route);
173
174             clusterGlobalServices.tbegin();
175             threadSafeRemove(routeId, route);
176             clusterGlobalServices.tcommit();
177
178         } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
179             throw new RoutingTableException("Transaction error - while trying to remove route id="
180                     + routeId, e);
181         } catch (javax.transaction.SystemException e) {
182             throw new SystemException("System error occurred - while trying to remove with value", e);
183         }
184     }
185
186     @Override
187     public void removeRoutes(final Set<I> routeIds, final R route) throws RoutingTableException, SystemException {
188         Preconditions.checkNotNull(routeIds, "removeRoutes: routeIds must not be null");
189         for (I routeId : routeIds){
190             removeRoute(routeId, route);
191         }
192     }
193
194     /**
195      * This method guarantees that no 2 thread over write each other's changes.
196      * Just so that we dont end up in infinite loop, it tries for 100 times then throw
197      */
198     private void threadSafeAdd(final I routeId, final R route) {
199
200         for (int i=0;i<100;i++){
201
202             LinkedHashSet<R> updatedRoutes = new LinkedHashSet<>();
203             updatedRoutes.add(route);
204             LinkedHashSet<R> oldRoutes = rpcCache.putIfAbsent(routeId, updatedRoutes);
205             if (oldRoutes == null) {
206                 return;
207             }
208
209             updatedRoutes = new LinkedHashSet<>(oldRoutes);
210             updatedRoutes.add(route);
211
212             if (rpcCache.replace(routeId, oldRoutes, updatedRoutes)) {
213                 return;
214             }
215         }
216         //the method did not already return means it failed to add route in 10 attempts
217         throw new IllegalStateException("Failed to add route [" + routeId + "]");
218     }
219
220     /**
221      * This method guarantees that no 2 thread over write each other's changes.
222      * Just so that we dont end up in infinite loop, it tries for 10 times then throw
223      */
224     private void threadSafeRemove(final I routeId, final R route) {
225         LinkedHashSet<R> updatedRoutes = null;
226         for (int i=0;i<10;i++){
227             LinkedHashSet<R> oldRoutes = rpcCache.get(routeId);
228
229             // if route to be deleted is the only entry in the set then remove routeId from the cache
230             if ((oldRoutes.size() == 1) && oldRoutes.contains(route)){
231                 rpcCache.remove(routeId);
232                 return;
233             }
234
235             // if there are multiple routes for this routeId, remove the route to be deleted only from the set.
236             updatedRoutes = new LinkedHashSet<>(oldRoutes);
237             updatedRoutes.remove(route);
238             if (rpcCache.replace(routeId, oldRoutes, updatedRoutes)) {
239                 return;
240             }
241
242         }
243         //the method did not already return means it failed to remove route in 10 attempts
244         throw new IllegalStateException("Failed to remove route [" + routeId + "]");
245     }
246
247
248     //    /**
249     //     * @deprecated doesn't do anything will be removed once listeners used
250     //     *             whiteboard pattern Registers listener for sending any change
251     //     *             notification
252     //     * @param listener
253     //     */
254     //    @Override
255     //    public void registerRouteChangeListener(RouteChangeListener listener) {
256     //
257     //    }
258
259     //    public void setRouteChangeListener(RouteChangeListener rcl) {
260     //        if(rcl != null){
261     //            routeChangeListeners.add(rcl);
262     //        }else{
263     //            log.warn("setRouteChangeListener called with null listener");
264     //        }
265     //    }
266     //
267     //    public void unSetRouteChangeListener(RouteChangeListener rcl) {
268     //        if(rcl != null){
269     //         routeChangeListeners.remove(rcl);
270     //        }else{
271     //            log.warn("unSetRouteChangeListener called with null listener");
272     //        }
273     //    }
274
275     /**
276      * Returning the set of route change listeners for Unit testing Note: the
277      * package scope is default
278      *
279      * @return List of registered RouteChangeListener<I,R> listeners
280      */
281     //    Set<RouteChangeListener> getRegisteredRouteChangeListeners() {
282     //        return routeChangeListeners;
283     //    }
284     public void setClusterGlobalServices(final IClusterGlobalServices clusterGlobalServices) {
285         this.clusterGlobalServices = clusterGlobalServices;
286     }
287
288     public void unsetClusterGlobalServices(final IClusterGlobalServices clusterGlobalServices) {
289         if ((clusterGlobalServices != null) && (this.clusterGlobalServices.equals(clusterGlobalServices))) {
290             this.clusterGlobalServices = null;
291         }
292     }
293
294     /**
295      * Finds OR Creates clustered cache for Global RPCs
296      *
297      * @throws CacheExistException       -- cluster global services exception when cache exist
298      * @throws CacheConfigException      -- cluster global services exception during cache config
299      * @throws CacheListenerAddException -- cluster global services exception during adding of listener
300      */
301
302     @SuppressWarnings("unchecked")
303     void findOrCreateGlobalRpcCache() throws CacheExistException, CacheConfigException,
304     CacheListenerAddException {
305         // TBD: HOW DO WE DECIDE ON PROPERTIES OF THE CACHE i.e. what duration it
306         // should be caching?
307
308         // let us check here if the cache already exists -- if so don't create
309         if (!clusterGlobalServices.existCache(GLOBALRPC_CACHE)) {
310
311             globalRpcCache = (ConcurrentMap<I,R>) clusterGlobalServices.createCache(GLOBALRPC_CACHE,
312                     EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
313             log.debug("Cache created [{}] ", GLOBALRPC_CACHE);
314
315         } else {
316             globalRpcCache = (ConcurrentMap<I,R>) clusterGlobalServices.getCache(GLOBALRPC_CACHE);
317             log.debug("Cache exists [{}] ", GLOBALRPC_CACHE);
318         }
319     }
320
321     /**
322      * Finds OR Creates clustered cache for Routed RPCs
323      *
324      * @throws CacheExistException       -- cluster global services exception when cache exist
325      * @throws CacheConfigException      -- cluster global services exception during cache config
326      * @throws CacheListenerAddException -- cluster global services exception during adding of listener
327      */
328
329     @SuppressWarnings("unchecked")
330     void findOrCreateRpcCache() throws CacheExistException, CacheConfigException,
331     CacheListenerAddException {
332         // TBD: HOW DO WE DECIDE ON PROPERTIES OF THE CACHE i.e. what duration it
333         // should be caching?
334
335         if (clusterGlobalServices.existCache(RPC_CACHE)){
336             rpcCache = (ConcurrentMap<I,LinkedHashSet<R>>) clusterGlobalServices.getCache(RPC_CACHE);
337             log.debug("Cache exists [{}] ", RPC_CACHE);
338             return;
339         }
340
341         //cache doesnt exist, create one
342         rpcCache = (ConcurrentMap<I,LinkedHashSet<R>>) clusterGlobalServices.createCache(RPC_CACHE,
343                 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
344         log.debug("Cache created [{}] ", RPC_CACHE);
345     }
346
347
348     /**
349      * Function called by the dependency manager when all the required
350      * dependencies are satisfied
351      */
352     void init(final Component c) {
353         try {
354
355             findOrCreateGlobalRpcCache();
356             findOrCreateRpcCache();
357
358         } catch (CacheExistException|CacheConfigException|CacheListenerAddException e) {
359             throw new IllegalStateException("could not construct routing table cache");
360         }
361     }
362
363     /**
364      * Useful for unit testing <note>It has package
365      * scope</note>
366      */
367     ConcurrentMap<I, R> getGlobalRpcCache() {
368         return this.globalRpcCache;
369     }
370
371     /**
372      * Useful for unit testing <note>It has package
373      * scope</note>
374      */
375     ConcurrentMap<I, LinkedHashSet<R>> getRpcCache() {
376         return this.rpcCache;
377     }
378
379     /**
380      * This is used from integration test NP rest API to check out the result of the
381      * cache population
382      * <Note> For testing purpose only-- use it wisely</Note>
383      *
384      * @return
385      */
386     public String dumpGlobalRpcCache() {
387         Set<Map.Entry<I, R>> cacheEntrySet = this.globalRpcCache.entrySet();
388         StringBuilder sb = new StringBuilder();
389         for (Map.Entry<I, R> entry : cacheEntrySet) {
390             sb.append("Key:").append(entry.getKey()).append("---->Value:")
391             .append((entry.getValue() != null) ? entry.getValue() : "null")
392             .append("\n");
393         }
394         return sb.toString();
395     }
396
397     public String dumpRpcCache() {
398         Set<Map.Entry<I, LinkedHashSet<R>>> cacheEntrySet = this.rpcCache.entrySet();
399         StringBuilder sb = new StringBuilder();
400         for (Map.Entry<I, LinkedHashSet<R>> entry : cacheEntrySet) {
401             sb.append("Key:").append(entry.getKey()).append("---->Value:")
402             .append((entry.getValue() != null) ? entry.getValue() : "null")
403             .append("\n");
404         }
405         return sb.toString();
406     }
407     /**
408      * Invoked when a new entry is available in the cache, the key is only
409      * provided, the value will come as an entryUpdate invocation
410      *
411      * @param key         Key for the entry just created
412      * @param cacheName   name of the cache for which update has been received
413      * @param originLocal true if the event is generated from this node
414      */
415     @Override
416     public void entryCreated(final I key, final String cacheName, final boolean originLocal) {
417         // TBD: do we require this.
418         if (log.isDebugEnabled()) {
419             log.debug("RoutingTableUpdates: entryCreated  routeId = " + key + " cacheName=" + cacheName);
420         }
421     }
422
423     /**
424      * Called anytime a given entry is updated
425      *
426      * @param key         Key for the entry modified
427      * @param new_value   the new value the key will have
428      * @param cacheName   name of the cache for which update has been received
429      * @param originLocal true if the event is generated from this node
430      */
431     @Override
432     public void entryUpdated(final I key, final R new_value, final String cacheName, final boolean originLocal) {
433         if (log.isDebugEnabled()) {
434             log.debug("RoutingTableUpdates: entryUpdated  routeId = " + key + ",value = " + new_value
435                     + " ,cacheName=" + cacheName + " originLocal=" + originLocal);
436         }
437         //        if (!originLocal) {
438         //            for (RouteChangeListener rcl : routeChangeListeners) {
439         //                rcl.onRouteUpdated(key, new_value);
440         //            }
441         //        }
442     }
443
444     /**
445      * Called anytime a given key is removed from the ConcurrentHashMap we are
446      * listening to.
447      *
448      * @param key         Key of the entry removed
449      * @param cacheName   name of the cache for which update has been received
450      * @param originLocal true if the event is generated from this node
451      */
452     @Override
453     public void entryDeleted(final I key, final String cacheName, final boolean originLocal) {
454         if (log.isDebugEnabled()) {
455             log.debug("RoutingTableUpdates: entryUpdated  routeId = " + key + " local = " + originLocal
456                     + " cacheName=" + cacheName + " originLocal=" + originLocal);
457         }
458         //        if (!originLocal) {
459         //            for (RouteChangeListener rcl : routeChangeListeners) {
460         //                rcl.onRouteDeleted(key);
461         //            }
462         //        }
463     }
464 }