2 * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
9 package org.opendaylight.controller.sal.connector.remoterpc.impl;
11 import com.google.common.base.Optional;
12 import com.google.common.base.Preconditions;
13 import com.google.common.collect.ImmutableSet;
14 import org.apache.felix.dm.Component;
15 import org.opendaylight.controller.clustering.services.*;
16 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
17 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
18 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
19 import org.slf4j.Logger;
20 import org.slf4j.LoggerFactory;
22 import javax.transaction.HeuristicMixedException;
23 import javax.transaction.HeuristicRollbackException;
24 import javax.transaction.NotSupportedException;
25 import javax.transaction.RollbackException;
27 import java.util.concurrent.ConcurrentMap;
29 public class RoutingTableImpl<I, R> implements RoutingTable<I, R>, ICacheUpdateAware<I, R> {
/** Logger shared by every instance; loggers are stateless, so one per class is enough. */
private static final Logger log = LoggerFactory.getLogger(RoutingTableImpl.class);

/** Clustering services handle; assigned by setClusterGlobalServices and cleared by unsetClusterGlobalServices. */
private IClusterGlobalServices clusterGlobalServices = null;

/** Cache of global RPC routes (one route per identifier); assigned by findOrCreateGlobalRpcCache. */
private ConcurrentMap<I, R> globalRpcCache = null;

/** Cache of routed RPC routes; each value keeps its routes in insertion order. Assigned by findOrCreateRpcCache. */
private ConcurrentMap<I, LinkedHashSet<R>> rpcCache = null;

/** Name under which the global RPC cache is looked up or created. */
public static final String GLOBALRPC_CACHE = "remoterpc_routingtable.globalrpc_cache";

/** Name under which the routed RPC cache is looked up or created. */
public static final String RPC_CACHE = "remoterpc_routingtable.rpc_cache";

/**
 * Creates an empty routing table. The caches stay {@code null} until
 * {@code init} has run.
 */
public RoutingTableImpl() {
}
45 public R getGlobalRoute(I routeId) throws RoutingTableException, SystemException {
46 Preconditions.checkNotNull(routeId, "getGlobalRoute: routeId cannot be null!");
47 return globalRpcCache.get(routeId);
51 public void addGlobalRoute(I routeId, R route) throws RoutingTableException, SystemException {
52 Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!");
53 Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!");
56 log.debug("addGlobalRoute: adding a new route with id[{}] and value [{}]", routeId, route);
57 clusterGlobalServices.tbegin();
58 if (globalRpcCache.putIfAbsent(routeId, route) != null) {
59 throw new DuplicateRouteException(" There is already existing route " + routeId);
61 clusterGlobalServices.tcommit();
63 } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
64 throw new RoutingTableException("Transaction error - while trying to create route id="
65 + routeId + "with route" + route, e);
66 } catch (javax.transaction.SystemException e) {
67 throw new SystemException("System error occurred - while trying to create with value", e);
73 public void removeGlobalRoute(I routeId) throws RoutingTableException, SystemException {
74 Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!");
76 log.debug("removeGlobalRoute: removing a new route with id [{}]", routeId);
78 clusterGlobalServices.tbegin();
79 globalRpcCache.remove(routeId);
80 clusterGlobalServices.tcommit();
82 } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
83 throw new RoutingTableException("Transaction error - while trying to remove route id="
85 } catch (javax.transaction.SystemException e) {
86 throw new SystemException("System error occurred - while trying to remove with value", e);
92 public Set<R> getRoutes(I routeId) {
93 Preconditions.checkNotNull(routeId, "getRoutes: routeId cannot be null!");
94 Set<R> routes = rpcCache.get(routeId);
96 if (routes == null) return Collections.emptySet();
98 return ImmutableSet.copyOf(routes);
103 public R getLastAddedRoute(I routeId) {
105 Set<R> routes = getRoutes(routeId);
107 if (routes.isEmpty()) return null;
110 Iterator<R> iter = routes.iterator();
111 while (iter.hasNext())
118 public void addRoute(I routeId, R route) throws RoutingTableException, SystemException {
119 Preconditions.checkNotNull(routeId, "addRoute: routeId cannot be null");
120 Preconditions.checkNotNull(route, "addRoute: route cannot be null");
123 clusterGlobalServices.tbegin();
124 log.debug("addRoute: adding a route with k/v [{}/{}]", routeId, route);
125 threadSafeAdd(routeId, route);
126 clusterGlobalServices.tcommit();
128 } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
129 throw new RoutingTableException("Transaction error - while trying to remove route id="
131 } catch (javax.transaction.SystemException e) {
132 throw new SystemException("System error occurred - while trying to remove with value", e);
137 public void addRoutes(Set<I> routeIds, R route) throws RoutingTableException, SystemException {
138 Preconditions.checkNotNull(routeIds, "addRoutes: routeIds must not be null");
139 for (I routeId : routeIds){
140 addRoute(routeId, route);
145 public void removeRoute(I routeId, R route) throws RoutingTableException, SystemException {
146 Preconditions.checkNotNull(routeId, "removeRoute: routeId cannot be null!");
147 Preconditions.checkNotNull(route, "removeRoute: route cannot be null!");
149 LinkedHashSet<R> routes = rpcCache.get(routeId);
150 if (routes == null) return;
153 log.debug("removeRoute: removing a new route with k/v [{}/{}]", routeId, route);
155 clusterGlobalServices.tbegin();
156 threadSafeRemove(routeId, route);
157 clusterGlobalServices.tcommit();
159 } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
160 throw new RoutingTableException("Transaction error - while trying to remove route id="
162 } catch (javax.transaction.SystemException e) {
163 throw new SystemException("System error occurred - while trying to remove with value", e);
168 public void removeRoutes(Set<I> routeIds, R route) throws RoutingTableException, SystemException {
169 Preconditions.checkNotNull(routeIds, "removeRoutes: routeIds must not be null");
170 for (I routeId : routeIds){
171 removeRoute(routeId, route);
176 * This method guarantees that no 2 thread over write each other's changes.
177 * Just so that we dont end up in infinite loop, it tries for 100 times then throw
179 private void threadSafeAdd(I routeId, R route) {
181 for (int i=0;i<100;i++){
183 LinkedHashSet<R> updatedRoutes = new LinkedHashSet<>();
184 updatedRoutes.add(route);
185 LinkedHashSet<R> oldRoutes = rpcCache.putIfAbsent(routeId, updatedRoutes);
186 if (oldRoutes == null) return;
188 updatedRoutes = new LinkedHashSet<>(oldRoutes);
189 updatedRoutes.add(route);
191 if (rpcCache.replace(routeId, oldRoutes, updatedRoutes)) return;
193 //the method did not already return means it failed to add route in 10 attempts
194 throw new IllegalStateException("Failed to add route [" + routeId + "]");
198 * This method guarantees that no 2 thread over write each other's changes.
199 * Just so that we dont end up in infinite loop, it tries for 10 times then throw
201 private void threadSafeRemove(I routeId, R route) {
202 LinkedHashSet<R> updatedRoutes = null;
203 for (int i=0;i<10;i++){
204 LinkedHashSet<R> oldRoutes = rpcCache.get(routeId);
206 // if route to be deleted is the only entry in the set then remove routeId from the cache
207 if ((oldRoutes.size() == 1) && oldRoutes.contains(route)){
208 rpcCache.remove(routeId);
212 // if there are multiple routes for this routeId, remove the route to be deleted only from the set.
213 updatedRoutes = new LinkedHashSet<>(oldRoutes);
214 updatedRoutes.remove(route);
215 if (rpcCache.replace(routeId, oldRoutes, updatedRoutes)) return;
218 //the method did not already return means it failed to remove route in 10 attempts
219 throw new IllegalStateException("Failed to remove route [" + routeId + "]");
224 // * @deprecated doesn't do anything will be removed once listeners used
225 // * whiteboard pattern Registers listener for sending any change
230 // public void registerRouteChangeListener(RouteChangeListener listener) {
234 // public void setRouteChangeListener(RouteChangeListener rcl) {
236 // routeChangeListeners.add(rcl);
238 // log.warn("setRouteChangeListener called with null listener");
242 // public void unSetRouteChangeListener(RouteChangeListener rcl) {
244 // routeChangeListeners.remove(rcl);
246 // log.warn("unSetRouteChangeListener called with null listener");
251 * Returning the set of route change listeners for Unit testing Note: the
252 * package scope is default
254 * @return List of registered RouteChangeListener<I,R> listeners
256 // Set<RouteChangeListener> getRegisteredRouteChangeListeners() {
257 // return routeChangeListeners;
259 public void setClusterGlobalServices(IClusterGlobalServices clusterGlobalServices) {
260 this.clusterGlobalServices = clusterGlobalServices;
263 public void unsetClusterGlobalServices(IClusterGlobalServices clusterGlobalServices) {
264 if ((clusterGlobalServices != null) && (this.clusterGlobalServices.equals(clusterGlobalServices))) {
265 this.clusterGlobalServices = null;
270 * Finds OR Creates clustered cache for Global RPCs
272 * @throws CacheExistException -- cluster global services exception when cache exist
273 * @throws CacheConfigException -- cluster global services exception during cache config
274 * @throws CacheListenerAddException -- cluster global services exception during adding of listener
277 void findOrCreateGlobalRpcCache() throws CacheExistException, CacheConfigException,
278 CacheListenerAddException {
279 // TBD: HOW DO WE DECIDE ON PROPERTIES OF THE CACHE i.e. what duration it
280 // should be caching?
282 // let us check here if the cache already exists -- if so don't create
283 if (!clusterGlobalServices.existCache(GLOBALRPC_CACHE)) {
285 globalRpcCache = (ConcurrentMap<I,R>) clusterGlobalServices.createCache(GLOBALRPC_CACHE,
286 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
287 log.debug("Cache created [{}] ", GLOBALRPC_CACHE);
290 globalRpcCache = (ConcurrentMap<I,R>) clusterGlobalServices.getCache(GLOBALRPC_CACHE);
291 log.debug("Cache exists [{}] ", GLOBALRPC_CACHE);
296 * Finds OR Creates clustered cache for Routed RPCs
298 * @throws CacheExistException -- cluster global services exception when cache exist
299 * @throws CacheConfigException -- cluster global services exception during cache config
300 * @throws CacheListenerAddException -- cluster global services exception during adding of listener
303 void findOrCreateRpcCache() throws CacheExistException, CacheConfigException,
304 CacheListenerAddException {
305 // TBD: HOW DO WE DECIDE ON PROPERTIES OF THE CACHE i.e. what duration it
306 // should be caching?
308 if (clusterGlobalServices.existCache(RPC_CACHE)){
309 rpcCache = (ConcurrentMap<I,LinkedHashSet<R>>) clusterGlobalServices.getCache(RPC_CACHE);
310 log.debug("Cache exists [{}] ", RPC_CACHE);
314 //cache doesnt exist, create one
315 rpcCache = (ConcurrentMap<I,LinkedHashSet<R>>) clusterGlobalServices.createCache(RPC_CACHE,
316 EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
317 log.debug("Cache created [{}] ", RPC_CACHE);
322 * Function called by the dependency manager when all the required
323 * dependencies are satisfied
325 void init(Component c) {
328 findOrCreateGlobalRpcCache();
329 findOrCreateRpcCache();
331 } catch (CacheExistException|CacheConfigException|CacheListenerAddException e) {
332 throw new IllegalStateException("could not construct routing table cache");
337 * Useful for unit testing <note>It has package
340 ConcurrentMap getGlobalRpcCache() {
341 return this.globalRpcCache;
345 * Useful for unit testing <note>It has package
348 ConcurrentMap getRpcCache() {
349 return this.rpcCache;
353 * This is used from integration test NP rest API to check out the result of the
355 * <Note> For testing purpose only-- use it wisely</Note>
359 public String dumpGlobalRpcCache() {
360 Set<Map.Entry<I, R>> cacheEntrySet = this.globalRpcCache.entrySet();
361 StringBuilder sb = new StringBuilder();
362 for (Map.Entry<I, R> entry : cacheEntrySet) {
363 sb.append("Key:").append(entry.getKey()).append("---->Value:")
364 .append((entry.getValue() != null) ? entry.getValue() : "null")
367 return sb.toString();
370 public String dumpRpcCache() {
371 Set<Map.Entry<I, LinkedHashSet<R>>> cacheEntrySet = this.rpcCache.entrySet();
372 StringBuilder sb = new StringBuilder();
373 for (Map.Entry<I, LinkedHashSet<R>> entry : cacheEntrySet) {
374 sb.append("Key:").append(entry.getKey()).append("---->Value:")
375 .append((entry.getValue() != null) ? entry.getValue() : "null")
378 return sb.toString();
381 * Invoked when a new entry is available in the cache, the key is only
382 * provided, the value will come as an entryUpdate invocation
384 * @param key Key for the entry just created
385 * @param cacheName name of the cache for which update has been received
386 * @param originLocal true if the event is generated from this node
389 public void entryCreated(I key, String cacheName, boolean originLocal) {
390 // TBD: do we require this.
391 if (log.isDebugEnabled()) {
392 log.debug("RoutingTableUpdates: entryCreated routeId = " + key + " cacheName=" + cacheName);
397 * Called anytime a given entry is updated
399 * @param key Key for the entry modified
400 * @param new_value the new value the key will have
401 * @param cacheName name of the cache for which update has been received
402 * @param originLocal true if the event is generated from this node
405 public void entryUpdated(I key, R new_value, String cacheName, boolean originLocal) {
406 if (log.isDebugEnabled()) {
407 log.debug("RoutingTableUpdates: entryUpdated routeId = " + key + ",value = " + new_value
408 + " ,cacheName=" + cacheName + " originLocal=" + originLocal);
410 // if (!originLocal) {
411 // for (RouteChangeListener rcl : routeChangeListeners) {
412 // rcl.onRouteUpdated(key, new_value);
418 * Called anytime a given key is removed from the ConcurrentHashMap we are
421 * @param key Key of the entry removed
422 * @param cacheName name of the cache for which update has been received
423 * @param originLocal true if the event is generated from this node
426 public void entryDeleted(I key, String cacheName, boolean originLocal) {
427 if (log.isDebugEnabled()) {
428 log.debug("RoutingTableUpdates: entryUpdated routeId = " + key + " local = " + originLocal
429 + " cacheName=" + cacheName + " originLocal=" + originLocal);
431 // if (!originLocal) {
432 // for (RouteChangeListener rcl : routeChangeListeners) {
433 // rcl.onRouteDeleted(key);