package org.opendaylight.controller.sal.connector.remoterpc.impl;
-import com.google.common.base.Preconditions;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.NotSupportedException;
+import javax.transaction.RollbackException;
+
import org.apache.felix.dm.Component;
import org.opendaylight.controller.clustering.services.CacheConfigException;
import org.opendaylight.controller.clustering.services.CacheExistException;
import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
import org.opendaylight.controller.clustering.services.IClusterServices;
-import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.transaction.HeuristicMixedException;
-import javax.transaction.HeuristicRollbackException;
-import javax.transaction.NotSupportedException;
-import javax.transaction.RollbackException;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
-/**
- * @author: syedbahm
- */
public class RoutingTableImpl<I, R> implements RoutingTable<I, R>, ICacheUpdateAware<I, R> {
- public static final String ROUTING_TABLE_GLOBAL_CACHE = "routing_table_global_cache";
- private Logger log = LoggerFactory.getLogger(RoutingTableImpl.class);
+ private final Logger log = LoggerFactory.getLogger(RoutingTableImpl.class);
private IClusterGlobalServices clusterGlobalServices = null;
- private RoutingTableImpl routingTableInstance = null;
- private ConcurrentMap routingTableCache = null;
- private Set<RouteChangeListener> routeChangeListeners = Collections
- .synchronizedSet(new HashSet<RouteChangeListener>());
+
+ private ConcurrentMap<I,R> globalRpcCache = null;
+ private ConcurrentMap<I, LinkedHashSet<R>> rpcCache = null; //need routes to ordered by insert-order
+
+ public static final String GLOBALRPC_CACHE = "remoterpc_routingtable.globalrpc_cache";
+ public static final String RPC_CACHE = "remoterpc_routingtable.rpc_cache";
public RoutingTableImpl() {
}
@Override
- public void addRoute(I routeId, R route) throws RoutingTableException {
- throw new UnsupportedOperationException(" Not implemented yet!");
+ public R getGlobalRoute(final I routeId) throws RoutingTableException, SystemException {
+ Preconditions.checkNotNull(routeId, "getGlobalRoute: routeId cannot be null!");
+ return globalRpcCache.get(routeId);
}
@Override
- public void addGlobalRoute(I routeId, R route) throws RoutingTableException, SystemException {
+ public void addGlobalRoute(final I routeId, final R route) throws RoutingTableException, SystemException {
Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!");
Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!");
try {
- Set<R> existingRoute = null;
- // ok does the global route is already registered ?
- if ((existingRoute = getRoutes(routeId)) == null) {
-
- if (log.isDebugEnabled()) {
- log.debug("addGlobalRoute: adding a new route with id" + routeId + " and value = "
- + route);
- }
- // lets start a transaction
- clusterGlobalServices.tbegin();
-
- routingTableCache.put(routeId, route);
- clusterGlobalServices.tcommit();
- } else {
- throw new DuplicateRouteException(" There is already existing route " + existingRoute);
+ log.debug("addGlobalRoute: adding a new route with id[{}] and value [{}]", routeId, route);
+ clusterGlobalServices.tbegin();
+ if (globalRpcCache.putIfAbsent(routeId, route) != null) {
+ throw new DuplicateRouteException(" There is already existing route " + routeId);
}
+ clusterGlobalServices.tcommit();
- } catch (NotSupportedException|HeuristicRollbackException|RollbackException|HeuristicMixedException e) {
+ } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
throw new RoutingTableException("Transaction error - while trying to create route id="
+ routeId + "with route" + route, e);
} catch (javax.transaction.SystemException e) {
}
@Override
- public void removeRoute(I routeId, R route) {
- throw new UnsupportedOperationException("Not implemented yet!");
- }
-
- @Override
- public void removeGlobalRoute(I routeId) throws RoutingTableException, SystemException {
+ public void removeGlobalRoute(final I routeId) throws RoutingTableException, SystemException {
Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!");
try {
- if (log.isDebugEnabled()) {
- log.debug("removeGlobalRoute: removing a new route with id" + routeId);
- }
- // lets start a transaction
- clusterGlobalServices.tbegin();
+ log.debug("removeGlobalRoute: removing a new route with id [{}]", routeId);
- routingTableCache.remove(routeId);
+ clusterGlobalServices.tbegin();
+ globalRpcCache.remove(routeId);
clusterGlobalServices.tcommit();
- } catch (NotSupportedException|HeuristicRollbackException|RollbackException|HeuristicMixedException e) {
+ } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
throw new RoutingTableException("Transaction error - while trying to remove route id="
+ routeId, e);
} catch (javax.transaction.SystemException e) {
}
}
+
@Override
- public Set<R> getRoutes(I routeId) {
-
- // Note: currently works for global routes only wherein there is just single
- // route
- Preconditions.checkNotNull(routeId, "getARoute: routeId cannot be null!");
- R route = (R)routingTableCache.get(routeId);
- Set<R>routes = null;
- if(route !=null){
- routes = new HashSet<R>();
- routes.add(route);
+ public Set<R> getRoutes(final I routeId) {
+ Preconditions.checkNotNull(routeId, "getRoutes: routeId cannot be null!");
+ Set<R> routes = rpcCache.get(routeId);
+
+ if (routes == null) {
+ return Collections.emptySet();
}
- return routes;
+ return ImmutableSet.copyOf(routes);
}
- @Override
- public Set<Map.Entry> getAllRoutes() {
- return routingTableCache.entrySet();
- }
- @Override
- public R getARoute(I routeId) {
- throw new UnsupportedOperationException("Not implemented yet!");
+
+ @Override
+ public R getLastAddedRoute(final I routeId) {
+
+ Set<R> routes = getRoutes(routeId);
+
+ if (routes.isEmpty()) {
+ return null;
+ }
+
+ R route = null;
+ Iterator<R> iter = routes.iterator();
+ while (iter.hasNext()) {
+ route = iter.next();
+ }
+
+ return route;
}
- /**
- * @deprecated doesn't do anything will be removed once listeners used
- * whiteboard pattern Registers listener for sending any change
- * notification
- * @param listener
- */
@Override
- public void registerRouteChangeListener(RouteChangeListener listener) {
+ public void addRoute(final I routeId, final R route) throws RoutingTableException, SystemException {
+ Preconditions.checkNotNull(routeId, "addRoute: routeId cannot be null");
+ Preconditions.checkNotNull(route, "addRoute: route cannot be null");
+
+ try{
+ clusterGlobalServices.tbegin();
+ log.debug("addRoute: adding a route with k/v [{}/{}]", routeId, route);
+ threadSafeAdd(routeId, route);
+ clusterGlobalServices.tcommit();
+
+ } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
+ throw new RoutingTableException("Transaction error - while trying to remove route id="
+ + routeId, e);
+ } catch (javax.transaction.SystemException e) {
+ throw new SystemException("System error occurred - while trying to remove with value", e);
+ }
+ }
+
+ @Override
+ public void addRoutes(final Set<I> routeIds, final R route) throws RoutingTableException, SystemException {
+ Preconditions.checkNotNull(routeIds, "addRoutes: routeIds must not be null");
+ for (I routeId : routeIds){
+ addRoute(routeId, route);
+ }
+ }
+
+ @Override
+ public void removeRoute(final I routeId, final R route) throws RoutingTableException, SystemException {
+ Preconditions.checkNotNull(routeId, "removeRoute: routeId cannot be null!");
+ Preconditions.checkNotNull(route, "removeRoute: route cannot be null!");
+
+ LinkedHashSet<R> routes = rpcCache.get(routeId);
+ if (routes == null) {
+ return;
+ }
+ try {
+ log.debug("removeRoute: removing a new route with k/v [{}/{}]", routeId, route);
+
+ clusterGlobalServices.tbegin();
+ threadSafeRemove(routeId, route);
+ clusterGlobalServices.tcommit();
+
+ } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
+ throw new RoutingTableException("Transaction error - while trying to remove route id="
+ + routeId, e);
+ } catch (javax.transaction.SystemException e) {
+ throw new SystemException("System error occurred - while trying to remove with value", e);
+ }
+ }
+
+ @Override
+ public void removeRoutes(final Set<I> routeIds, final R route) throws RoutingTableException, SystemException {
+ Preconditions.checkNotNull(routeIds, "removeRoutes: routeIds must not be null");
+ for (I routeId : routeIds){
+ removeRoute(routeId, route);
+ }
}
- public void setRouteChangeListener(RouteChangeListener rcl) {
- if(rcl != null){
- routeChangeListeners.add(rcl);
- }else{
- log.warn("setRouteChangeListener called with null listener");
+ /**
+ * This method guarantees that no 2 thread over write each other's changes.
+ * Just so that we dont end up in infinite loop, it tries for 100 times then throw
+ */
+ private void threadSafeAdd(final I routeId, final R route) {
+
+ for (int i=0;i<100;i++){
+
+ LinkedHashSet<R> updatedRoutes = new LinkedHashSet<>();
+ updatedRoutes.add(route);
+ LinkedHashSet<R> oldRoutes = rpcCache.putIfAbsent(routeId, updatedRoutes);
+ if (oldRoutes == null) {
+ return;
+ }
+
+ updatedRoutes = new LinkedHashSet<>(oldRoutes);
+ updatedRoutes.add(route);
+
+ if (rpcCache.replace(routeId, oldRoutes, updatedRoutes)) {
+ return;
+ }
}
+ //the method did not already return means it failed to add route in 10 attempts
+ throw new IllegalStateException("Failed to add route [" + routeId + "]");
}
- public void unSetRouteChangeListener(RouteChangeListener rcl) {
- if(rcl != null){
- routeChangeListeners.remove(rcl);
- }else{
- log.warn("unSetRouteChangeListener called with null listener");
+ /**
+ * This method guarantees that no 2 thread over write each other's changes.
+ * Just so that we dont end up in infinite loop, it tries for 10 times then throw
+ */
+ private void threadSafeRemove(final I routeId, final R route) {
+ LinkedHashSet<R> updatedRoutes = null;
+ for (int i=0;i<10;i++){
+ LinkedHashSet<R> oldRoutes = rpcCache.get(routeId);
+
+ // if route to be deleted is the only entry in the set then remove routeId from the cache
+ if ((oldRoutes.size() == 1) && oldRoutes.contains(route)){
+ rpcCache.remove(routeId);
+ return;
+ }
+
+ // if there are multiple routes for this routeId, remove the route to be deleted only from the set.
+ updatedRoutes = new LinkedHashSet<>(oldRoutes);
+ updatedRoutes.remove(route);
+ if (rpcCache.replace(routeId, oldRoutes, updatedRoutes)) {
+ return;
+ }
+
}
+ //the method did not already return means it failed to remove route in 10 attempts
+ throw new IllegalStateException("Failed to remove route [" + routeId + "]");
}
+
+ // /**
+ // * @deprecated doesn't do anything will be removed once listeners used
+ // * whiteboard pattern Registers listener for sending any change
+ // * notification
+ // * @param listener
+ // */
+ // @Override
+ // public void registerRouteChangeListener(RouteChangeListener listener) {
+ //
+ // }
+
+ // public void setRouteChangeListener(RouteChangeListener rcl) {
+ // if(rcl != null){
+ // routeChangeListeners.add(rcl);
+ // }else{
+ // log.warn("setRouteChangeListener called with null listener");
+ // }
+ // }
+ //
+ // public void unSetRouteChangeListener(RouteChangeListener rcl) {
+ // if(rcl != null){
+ // routeChangeListeners.remove(rcl);
+ // }else{
+ // log.warn("unSetRouteChangeListener called with null listener");
+ // }
+ // }
+
/**
* Returning the set of route change listeners for Unit testing Note: the
* package scope is default
*
* @return List of registered RouteChangeListener<I,R> listeners
*/
- Set<RouteChangeListener> getRegisteredRouteChangeListeners() {
- return routeChangeListeners;
- }
-
- public void setClusterGlobalServices(IClusterGlobalServices clusterGlobalServices) {
+ // Set<RouteChangeListener> getRegisteredRouteChangeListeners() {
+ // return routeChangeListeners;
+ // }
+ public void setClusterGlobalServices(final IClusterGlobalServices clusterGlobalServices) {
this.clusterGlobalServices = clusterGlobalServices;
}
- public void unsetClusterGlobalServices(IClusterGlobalServices clusterGlobalServices) {
- if((clusterGlobalServices != null ) && (this.clusterGlobalServices.equals(clusterGlobalServices))){
+ public void unsetClusterGlobalServices(final IClusterGlobalServices clusterGlobalServices) {
+ if ((clusterGlobalServices != null) && (this.clusterGlobalServices.equals(clusterGlobalServices))) {
this.clusterGlobalServices = null;
}
}
/**
- * Creates the Routing Table clustered global services cache
+ * Finds OR Creates clustered cache for Global RPCs
*
- * @throws CacheExistException
- * -- cluster global services exception when cache exist
- * @throws CacheConfigException
- * -- cluster global services exception during cache config
- * @throws CacheListenerAddException
- * -- cluster global services exception during adding of listener
+ * @throws CacheExistException -- cluster global services exception when cache exist
+ * @throws CacheConfigException -- cluster global services exception during cache config
+ * @throws CacheListenerAddException -- cluster global services exception during adding of listener
*/
- void createRoutingTableCache() throws CacheExistException, CacheConfigException,
- CacheListenerAddException {
+ @SuppressWarnings("unchecked")
+ void findOrCreateGlobalRpcCache() throws CacheExistException, CacheConfigException,
+ CacheListenerAddException {
// TBD: HOW DO WE DECIDE ON PROPERTIES OF THE CACHE i.e. what duration it
// should be caching?
// let us check here if the cache already exists -- if so don't create
- if (!clusterGlobalServices.existCache(ROUTING_TABLE_GLOBAL_CACHE)) {
+ if (!clusterGlobalServices.existCache(GLOBALRPC_CACHE)) {
- if (log.isDebugEnabled()) {
- log.debug("createRoutingTableCache: creating a new routing table cache "
- + ROUTING_TABLE_GLOBAL_CACHE);
- }
- routingTableCache = clusterGlobalServices.createCache(ROUTING_TABLE_GLOBAL_CACHE,
+ globalRpcCache = (ConcurrentMap<I,R>) clusterGlobalServices.createCache(GLOBALRPC_CACHE,
EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+ log.debug("Cache created [{}] ", GLOBALRPC_CACHE);
+
} else {
- if (log.isDebugEnabled()) {
- log.debug("createRoutingTableCache: found existing routing table cache "
- + ROUTING_TABLE_GLOBAL_CACHE);
- }
- routingTableCache = clusterGlobalServices.getCache(ROUTING_TABLE_GLOBAL_CACHE);
+ globalRpcCache = (ConcurrentMap<I,R>) clusterGlobalServices.getCache(GLOBALRPC_CACHE);
+ log.debug("Cache exists [{}] ", GLOBALRPC_CACHE);
+ }
+ }
+
+ /**
+ * Finds OR Creates clustered cache for Routed RPCs
+ *
+ * @throws CacheExistException -- cluster global services exception when cache exist
+ * @throws CacheConfigException -- cluster global services exception during cache config
+ * @throws CacheListenerAddException -- cluster global services exception during adding of listener
+ */
+
+ @SuppressWarnings("unchecked")
+ void findOrCreateRpcCache() throws CacheExistException, CacheConfigException,
+ CacheListenerAddException {
+ // TBD: HOW DO WE DECIDE ON PROPERTIES OF THE CACHE i.e. what duration it
+ // should be caching?
+
+ if (clusterGlobalServices.existCache(RPC_CACHE)){
+ rpcCache = (ConcurrentMap<I,LinkedHashSet<R>>) clusterGlobalServices.getCache(RPC_CACHE);
+ log.debug("Cache exists [{}] ", RPC_CACHE);
+ return;
}
+ //cache doesnt exist, create one
+ rpcCache = (ConcurrentMap<I,LinkedHashSet<R>>) clusterGlobalServices.createCache(RPC_CACHE,
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+ log.debug("Cache created [{}] ", RPC_CACHE);
}
+
/**
* Function called by the dependency manager when all the required
* dependencies are satisfied
- *
*/
- void init(Component c) {
+ void init(final Component c) {
try {
- createRoutingTableCache();
- } catch (CacheExistException e) {
- throw new IllegalStateException("could not construct routing table cache");
- } catch (CacheConfigException e) {
- throw new IllegalStateException("could not construct routing table cache");
- } catch (CacheListenerAddException e) {
+ findOrCreateGlobalRpcCache();
+ findOrCreateRpcCache();
+
+ } catch (CacheExistException|CacheConfigException|CacheListenerAddException e) {
throw new IllegalStateException("could not construct routing table cache");
}
}
/**
- * Get routing table method is useful for unit testing <note>It has package
+ * Useful for unit testing <note>It has package
+ * scope</note>
+ */
+ ConcurrentMap<I, R> getGlobalRpcCache() {
+ return this.globalRpcCache;
+ }
+
+ /**
+ * Useful for unit testing <note>It has package
* scope</note>
*/
- ConcurrentMap getRoutingTableCache() {
- return this.routingTableCache;
+ ConcurrentMap<I, LinkedHashSet<R>> getRpcCache() {
+ return this.rpcCache;
}
/**
* This is used from integration test NP rest API to check out the result of the
* cache population
* <Note> For testing purpose only-- use it wisely</Note>
+ *
* @return
*/
- public String dumpRoutingTableCache(){
- Set<Map.Entry<I, R>> cacheEntrySet = this.routingTableCache.entrySet();
- StringBuilder sb = new StringBuilder();
- for(Map.Entry<I,R> entry:cacheEntrySet){
- sb.append("Key:").append(entry.getKey()).append("---->Value:")
- .append((entry.getValue() != null)?entry.getValue():"null")
- .append("\n");
- }
- return sb.toString();
+ public String dumpGlobalRpcCache() {
+ Set<Map.Entry<I, R>> cacheEntrySet = this.globalRpcCache.entrySet();
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<I, R> entry : cacheEntrySet) {
+ sb.append("Key:").append(entry.getKey()).append("---->Value:")
+ .append((entry.getValue() != null) ? entry.getValue() : "null")
+ .append("\n");
+ }
+ return sb.toString();
}
+ public String dumpRpcCache() {
+ Set<Map.Entry<I, LinkedHashSet<R>>> cacheEntrySet = this.rpcCache.entrySet();
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<I, LinkedHashSet<R>> entry : cacheEntrySet) {
+ sb.append("Key:").append(entry.getKey()).append("---->Value:")
+ .append((entry.getValue() != null) ? entry.getValue() : "null")
+ .append("\n");
+ }
+ return sb.toString();
+ }
/**
* Invoked when a new entry is available in the cache, the key is only
* provided, the value will come as an entryUpdate invocation
*
- * @param key
- * Key for the entry just created
- * @param cacheName
- * name of the cache for which update has been received
- * @param originLocal
- * true if the event is generated from this node
+ * @param key Key for the entry just created
+ * @param cacheName name of the cache for which update has been received
+ * @param originLocal true if the event is generated from this node
*/
@Override
- public void entryCreated(I key, String cacheName, boolean originLocal) {
+ public void entryCreated(final I key, final String cacheName, final boolean originLocal) {
// TBD: do we require this.
if (log.isDebugEnabled()) {
log.debug("RoutingTableUpdates: entryCreated routeId = " + key + " cacheName=" + cacheName);
/**
* Called anytime a given entry is updated
*
- * @param key
- * Key for the entry modified
- * @param new_value
- * the new value the key will have
- * @param cacheName
- * name of the cache for which update has been received
- * @param originLocal
- * true if the event is generated from this node
+ * @param key Key for the entry modified
+ * @param new_value the new value the key will have
+ * @param cacheName name of the cache for which update has been received
+ * @param originLocal true if the event is generated from this node
*/
@Override
- public void entryUpdated(I key, R new_value, String cacheName, boolean originLocal) {
+ public void entryUpdated(final I key, final R new_value, final String cacheName, final boolean originLocal) {
if (log.isDebugEnabled()) {
log.debug("RoutingTableUpdates: entryUpdated routeId = " + key + ",value = " + new_value
- + " ,cacheName=" + cacheName + " originLocal="+originLocal);
- }
- if (!originLocal) {
- for (RouteChangeListener rcl : routeChangeListeners) {
- rcl.onRouteUpdated(key, new_value);
- }
+ + " ,cacheName=" + cacheName + " originLocal=" + originLocal);
}
+ // if (!originLocal) {
+ // for (RouteChangeListener rcl : routeChangeListeners) {
+ // rcl.onRouteUpdated(key, new_value);
+ // }
+ // }
}
/**
* Called anytime a given key is removed from the ConcurrentHashMap we are
* listening to.
*
- * @param key
- * Key of the entry removed
- * @param cacheName
- * name of the cache for which update has been received
- * @param originLocal
- * true if the event is generated from this node
+ * @param key Key of the entry removed
+ * @param cacheName name of the cache for which update has been received
+ * @param originLocal true if the event is generated from this node
*/
@Override
- public void entryDeleted(I key, String cacheName, boolean originLocal) {
+ public void entryDeleted(final I key, final String cacheName, final boolean originLocal) {
if (log.isDebugEnabled()) {
log.debug("RoutingTableUpdates: entryUpdated routeId = " + key + " local = " + originLocal
- + " cacheName=" + cacheName + " originLocal="+originLocal);
- }
- if (!originLocal) {
- for (RouteChangeListener rcl : routeChangeListeners) {
- rcl.onRouteDeleted(key);
- }
+ + " cacheName=" + cacheName + " originLocal=" + originLocal);
}
+ // if (!originLocal) {
+ // for (RouteChangeListener rcl : routeChangeListeners) {
+ // rcl.onRouteDeleted(key);
+ // }
+ // }
}
-}
\ No newline at end of file
+}