2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.sal.connector.remoterpc.impl;
11 import java.util.Collections;
12 import java.util.EnumSet;
13 import java.util.Iterator;
14 import java.util.LinkedHashSet;
17 import java.util.concurrent.ConcurrentMap;
19 import javax.transaction.HeuristicMixedException;
20 import javax.transaction.HeuristicRollbackException;
21 import javax.transaction.NotSupportedException;
22 import javax.transaction.RollbackException;
24 import org.apache.felix.dm.Component;
25 import org.opendaylight.controller.clustering.services.CacheConfigException;
26 import org.opendaylight.controller.clustering.services.CacheExistException;
27 import org.opendaylight.controller.clustering.services.CacheListenerAddException;
28 import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
29 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
30 import org.opendaylight.controller.clustering.services.IClusterServices;
31 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
32 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
33 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
37 import com.google.common.base.Preconditions;
38 import com.google.common.collect.ImmutableSet;
40 public class RoutingTableImpl<I, R> implements RoutingTable<I, R>, ICacheUpdateAware<I, R> {
42 private final Logger log = LoggerFactory.getLogger(RoutingTableImpl.class);
44 private IClusterGlobalServices clusterGlobalServices = null;
46 private ConcurrentMap<I,R> globalRpcCache = null;
47 private ConcurrentMap<I, LinkedHashSet<R>> rpcCache = null; //need routes to ordered by insert-order
49 public static final String GLOBALRPC_CACHE = "remoterpc_routingtable.globalrpc_cache";
50 public static final String RPC_CACHE = "remoterpc_routingtable.rpc_cache";
52 public RoutingTableImpl() {
56 public R getGlobalRoute(final I routeId) throws RoutingTableException, SystemException {
57 Preconditions.checkNotNull(routeId, "getGlobalRoute: routeId cannot be null!");
58 return globalRpcCache.get(routeId);
62 public void addGlobalRoute(final I routeId, final R route) throws RoutingTableException, SystemException {
63 Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!");
64 Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!");
67 log.debug("addGlobalRoute: adding a new route with id[{}] and value [{}]", routeId, route);
68 clusterGlobalServices.tbegin();
69 if (globalRpcCache.putIfAbsent(routeId, route) != null) {
70 throw new DuplicateRouteException(" There is already existing route " + routeId);
72 clusterGlobalServices.tcommit();
74 } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
75 throw new RoutingTableException("Transaction error - while trying to create route id="
76 + routeId + "with route" + route, e);
77 } catch (javax.transaction.SystemException e) {
78 throw new SystemException("System error occurred - while trying to create with value", e);
84 public void removeGlobalRoute(final I routeId) throws RoutingTableException, SystemException {
85 Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!");
87 log.debug("removeGlobalRoute: removing a new route with id [{}]", routeId);
89 clusterGlobalServices.tbegin();
90 globalRpcCache.remove(routeId);
91 clusterGlobalServices.tcommit();
93 } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
94 throw new RoutingTableException("Transaction error - while trying to remove route id="
96 } catch (javax.transaction.SystemException e) {
97 throw new SystemException("System error occurred - while trying to remove with value", e);
103 public Set<R> getRoutes(final I routeId) {
104 Preconditions.checkNotNull(routeId, "getRoutes: routeId cannot be null!");
105 Set<R> routes = rpcCache.get(routeId);
107 if (routes == null) {
108 return Collections.emptySet();
111 return ImmutableSet.copyOf(routes);
117 public R getLastAddedRoute(final I routeId) {
119 Set<R> routes = getRoutes(routeId);
121 if (routes.isEmpty()) {
126 Iterator<R> iter = routes.iterator();
127 while (iter.hasNext()) {
135 public void addRoute(final I routeId, final R route) throws RoutingTableException, SystemException {
136 Preconditions.checkNotNull(routeId, "addRoute: routeId cannot be null");
137 Preconditions.checkNotNull(route, "addRoute: route cannot be null");
140 clusterGlobalServices.tbegin();
141 log.debug("addRoute: adding a route with k/v [{}/{}]", routeId, route);
142 threadSafeAdd(routeId, route);
143 clusterGlobalServices.tcommit();
145 } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
146 throw new RoutingTableException("Transaction error - while trying to remove route id="
148 } catch (javax.transaction.SystemException e) {
149 throw new SystemException("System error occurred - while trying to remove with value", e);
154 public void addRoutes(final Set<I> routeIds, final R route) throws RoutingTableException, SystemException {
155 Preconditions.checkNotNull(routeIds, "addRoutes: routeIds must not be null");
156 for (I routeId : routeIds){
157 addRoute(routeId, route);
162 public void removeRoute(final I routeId, final R route) throws RoutingTableException, SystemException {
163 Preconditions.checkNotNull(routeId, "removeRoute: routeId cannot be null!");
164 Preconditions.checkNotNull(route, "removeRoute: route cannot be null!");
166 LinkedHashSet<R> routes = rpcCache.get(routeId);
167 if (routes == null) {
172 log.debug("removeRoute: removing a new route with k/v [{}/{}]", routeId, route);
174 clusterGlobalServices.tbegin();
175 threadSafeRemove(routeId, route);
176 clusterGlobalServices.tcommit();
178 } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
179 throw new RoutingTableException("Transaction error - while trying to remove route id="
181 } catch (javax.transaction.SystemException e) {
182 throw new SystemException("System error occurred - while trying to remove with value", e);
187 public void removeRoutes(final Set<I> routeIds, final R route) throws RoutingTableException, SystemException {
188 Preconditions.checkNotNull(routeIds, "removeRoutes: routeIds must not be null");
189 for (I routeId : routeIds){
190 removeRoute(routeId, route);
195 * This method guarantees that no 2 thread over write each other's changes.
196 * Just so that we dont end up in infinite loop, it tries for 100 times then throw
198 private void threadSafeAdd(final I routeId, final R route) {
200 for (int i=0;i<100;i++){
202 LinkedHashSet<R> updatedRoutes = new LinkedHashSet<>();
203 updatedRoutes.add(route);
204 LinkedHashSet<R> oldRoutes = rpcCache.putIfAbsent(routeId, updatedRoutes);
205 if (oldRoutes == null) {
209 updatedRoutes = new LinkedHashSet<>(oldRoutes);
210 updatedRoutes.add(route);
212 if (rpcCache.replace(routeId, oldRoutes, updatedRoutes)) {
216 //the method did not already return means it failed to add route in 10 attempts
217 throw new IllegalStateException("Failed to add route [" + routeId + "]");
221 * This method guarantees that no 2 thread over write each other's changes.
222 * Just so that we dont end up in infinite loop, it tries for 10 times then throw
224 private void threadSafeRemove(final I routeId, final R route) {
225 LinkedHashSet<R> updatedRoutes = null;
226 for (int i=0;i<10;i++){
227 LinkedHashSet<R> oldRoutes = rpcCache.get(routeId);
229 // if route to be deleted is the only entry in the set then remove routeId from the cache
230 if ((oldRoutes.size() == 1) && oldRoutes.contains(route)){
231 rpcCache.remove(routeId);
235 // if there are multiple routes for this routeId, remove the route to be deleted only from the set.
236 updatedRoutes = new LinkedHashSet<>(oldRoutes);
237 updatedRoutes.remove(route);
238 if (rpcCache.replace(routeId, oldRoutes, updatedRoutes)) {
243 //the method did not already return means it failed to remove route in 10 attempts
244 throw new IllegalStateException("Failed to remove route [" + routeId + "]");
249 // * @deprecated doesn't do anything will be removed once listeners used
250 // * whiteboard pattern Registers listener for sending any change
255 // public void registerRouteChangeListener(RouteChangeListener listener) {
259 // public void setRouteChangeListener(RouteChangeListener rcl) {
261 // routeChangeListeners.add(rcl);
263 // log.warn("setRouteChangeListener called with null listener");
267 // public void unSetRouteChangeListener(RouteChangeListener rcl) {
269 // routeChangeListeners.remove(rcl);
271 // log.warn("unSetRouteChangeListener called with null listener");
276 * Returning the set of route change listeners for Unit testing Note: the
277 * package scope is default
279 * @return List of registered RouteChangeListener<I,R> listeners
281 // Set<RouteChangeListener> getRegisteredRouteChangeListeners() {
282 // return routeChangeListeners;
284 public void setClusterGlobalServices(final IClusterGlobalServices clusterGlobalServices) {
285 this.clusterGlobalServices = clusterGlobalServices;
288 public void unsetClusterGlobalServices(final IClusterGlobalServices clusterGlobalServices) {
289 if ((clusterGlobalServices != null) && (this.clusterGlobalServices.equals(clusterGlobalServices))) {
290 this.clusterGlobalServices = null;
295 * Finds OR Creates clustered cache for Global RPCs
297 * @throws CacheExistException -- cluster global services exception when cache exist
298 * @throws CacheConfigException -- cluster global services exception during cache config
299 * @throws CacheListenerAddException -- cluster global services exception during adding of listener
302 @SuppressWarnings("unchecked")
303 void findOrCreateGlobalRpcCache() throws CacheExistException, CacheConfigException,
304 CacheListenerAddException {
305 // TBD: HOW DO WE DECIDE ON PROPERTIES OF THE CACHE i.e. what duration it
306 // should be caching?
308 // let us check here if the cache already exists -- if so don't create
309 if (!clusterGlobalServices.existCache(GLOBALRPC_CACHE)) {
311 globalRpcCache = (ConcurrentMap<I,R>) clusterGlobalServices.createCache(GLOBALRPC_CACHE,
312 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
313 log.debug("Cache created [{}] ", GLOBALRPC_CACHE);
316 globalRpcCache = (ConcurrentMap<I,R>) clusterGlobalServices.getCache(GLOBALRPC_CACHE);
317 log.debug("Cache exists [{}] ", GLOBALRPC_CACHE);
322 * Finds OR Creates clustered cache for Routed RPCs
324 * @throws CacheExistException -- cluster global services exception when cache exist
325 * @throws CacheConfigException -- cluster global services exception during cache config
326 * @throws CacheListenerAddException -- cluster global services exception during adding of listener
329 @SuppressWarnings("unchecked")
330 void findOrCreateRpcCache() throws CacheExistException, CacheConfigException,
331 CacheListenerAddException {
332 // TBD: HOW DO WE DECIDE ON PROPERTIES OF THE CACHE i.e. what duration it
333 // should be caching?
335 if (clusterGlobalServices.existCache(RPC_CACHE)){
336 rpcCache = (ConcurrentMap<I,LinkedHashSet<R>>) clusterGlobalServices.getCache(RPC_CACHE);
337 log.debug("Cache exists [{}] ", RPC_CACHE);
341 //cache doesnt exist, create one
342 rpcCache = (ConcurrentMap<I,LinkedHashSet<R>>) clusterGlobalServices.createCache(RPC_CACHE,
343 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
344 log.debug("Cache created [{}] ", RPC_CACHE);
349 * Function called by the dependency manager when all the required
350 * dependencies are satisfied
352 void init(final Component c) {
355 findOrCreateGlobalRpcCache();
356 findOrCreateRpcCache();
358 } catch (CacheExistException|CacheConfigException|CacheListenerAddException e) {
359 throw new IllegalStateException("could not construct routing table cache");
364 * Useful for unit testing <note>It has package
367 ConcurrentMap<I, R> getGlobalRpcCache() {
368 return this.globalRpcCache;
372 * Useful for unit testing <note>It has package
375 ConcurrentMap<I, LinkedHashSet<R>> getRpcCache() {
376 return this.rpcCache;
380 * This is used from integration test NP rest API to check out the result of the
382 * <Note> For testing purpose only-- use it wisely</Note>
386 public String dumpGlobalRpcCache() {
387 Set<Map.Entry<I, R>> cacheEntrySet = this.globalRpcCache.entrySet();
388 StringBuilder sb = new StringBuilder();
389 for (Map.Entry<I, R> entry : cacheEntrySet) {
390 sb.append("Key:").append(entry.getKey()).append("---->Value:")
391 .append((entry.getValue() != null) ? entry.getValue() : "null")
394 return sb.toString();
397 public String dumpRpcCache() {
398 Set<Map.Entry<I, LinkedHashSet<R>>> cacheEntrySet = this.rpcCache.entrySet();
399 StringBuilder sb = new StringBuilder();
400 for (Map.Entry<I, LinkedHashSet<R>> entry : cacheEntrySet) {
401 sb.append("Key:").append(entry.getKey()).append("---->Value:")
402 .append((entry.getValue() != null) ? entry.getValue() : "null")
405 return sb.toString();
408 * Invoked when a new entry is available in the cache, the key is only
409 * provided, the value will come as an entryUpdate invocation
411 * @param key Key for the entry just created
412 * @param cacheName name of the cache for which update has been received
413 * @param originLocal true if the event is generated from this node
416 public void entryCreated(final I key, final String cacheName, final boolean originLocal) {
417 // TBD: do we require this.
418 if (log.isDebugEnabled()) {
419 log.debug("RoutingTableUpdates: entryCreated routeId = " + key + " cacheName=" + cacheName);
424 * Called anytime a given entry is updated
426 * @param key Key for the entry modified
427 * @param new_value the new value the key will have
428 * @param cacheName name of the cache for which update has been received
429 * @param originLocal true if the event is generated from this node
432 public void entryUpdated(final I key, final R new_value, final String cacheName, final boolean originLocal) {
433 if (log.isDebugEnabled()) {
434 log.debug("RoutingTableUpdates: entryUpdated routeId = " + key + ",value = " + new_value
435 + " ,cacheName=" + cacheName + " originLocal=" + originLocal);
437 // if (!originLocal) {
438 // for (RouteChangeListener rcl : routeChangeListeners) {
439 // rcl.onRouteUpdated(key, new_value);
445 * Called anytime a given key is removed from the ConcurrentHashMap we are
448 * @param key Key of the entry removed
449 * @param cacheName name of the cache for which update has been received
450 * @param originLocal true if the event is generated from this node
453 public void entryDeleted(final I key, final String cacheName, final boolean originLocal) {
454 if (log.isDebugEnabled()) {
455 log.debug("RoutingTableUpdates: entryUpdated routeId = " + key + " local = " + originLocal
456 + " cacheName=" + cacheName + " originLocal=" + originLocal);
458 // if (!originLocal) {
459 // for (RouteChangeListener rcl : routeChangeListeners) {
460 // rcl.onRouteDeleted(key);