*/
package org.opendaylight.controller.sal.connector.remoterpc.api;
-import java.util.Map;
import java.util.Set;
public interface RoutingTable<I,R> {
-
-
/**
- * Adds a network address for the route. If address for route
- * exists, appends the address to the list
+ * Adds a network address for the route. If the route already exists,
+ * it throws <code>DuplicateRouteException</code>.
+ * This method would be used when registering a global service.
+ *
*
* @param routeId route identifier
* @param route network address
- * @throws RoutingTableException for any logical exception
+ * @throws DuplicateRouteException
+ * @throws RoutingTableException
+ */
+ public void addGlobalRoute(I routeId, R route) throws RoutingTableException, SystemException;
+
+ /**
+ * Remove the route.
+ * This method would be used when registering a global service.
+ * @param routeId
+ * @throws RoutingTableException
* @throws SystemException
*/
- public void addRoute(I routeId, R route) throws RoutingTableException,SystemException;
+ public void removeGlobalRoute(I routeId) throws RoutingTableException, SystemException;
- /**
+ /**
* Adds a network address for the route. If the route already exists,
* it throws <code>DuplicateRouteException</code>.
* This method would be used when registering a global service.
* @throws DuplicateRouteException
* @throws RoutingTableException
*/
- public void addGlobalRoute(I routeId, R route) throws RoutingTableException, SystemException;
-
+ public R getGlobalRoute(I routeId) throws RoutingTableException, SystemException;
+ /**
+ * Adds a network address for the route. If address for route
+ * exists, appends the address to the list
+ *
+ * @param routeId route identifier
+ * @param route network address
+ * @throws RoutingTableException for any logical exception
+ * @throws SystemException
+ */
+ public void addRoute(I routeId, R route) throws RoutingTableException,SystemException;
/**
* @param routeId
* @param route
*/
- public void removeRoute(I routeId, R route);
+ public void removeRoute(I routeId, R route) throws RoutingTableException,SystemException;
+ /**
+ * Adds address for a set of route identifiers. If address for route
+ * exists, appends the address to the set.
+ *
+ * @param routeIds a set of routeIds
+ * @param route network address
+ * @throws RoutingTableException for any logical exception
+ * @throws SystemException
+ */
+ public void addRoutes(Set<I> routeIds, R route) throws RoutingTableException,SystemException;
- /**
- * Remove the route.
- * This method would be used when registering a global service.
- * @param routeId
- * @throws RoutingTableException
- * @throws SystemException
- */
- public void removeGlobalRoute(I routeId) throws RoutingTableException, SystemException;
+ /**
+ * Removes address for a set of route identifiers.
+ *
+ * @param routeIds a set of routeIds
+ * @param route network address
+ * @throws RoutingTableException for any logical exception
+ * @throws SystemException
+ */
+ public void removeRoutes(Set<I> routeIds, R route) throws RoutingTableException,SystemException;
/**
* Returns a set of network addresses associated with this route
*/
public Set<R> getRoutes(I routeId);
- /**
- * Returns all network addresses stored in the table
- * @return
- */
- public Set<Map.Entry> getAllRoutes();
/**
- * Returns only one address from the list of network addresses
- * associated with the route. The algorithm to determine that
- * one address is upto the implementer
+ * Returns the last inserted address from the list of network addresses
+ * associated with the route.
* @param routeId
* @return
*/
- public R getARoute(I routeId);
-
- /**
- *
- * This will be removed after listeners
- * have made change on their end to use whiteboard pattern
- * @deprecated
- */
-
- public void registerRouteChangeListener(RouteChangeListener listener);
+ public R getLastAddedRoute(I routeId);
public class DuplicateRouteException extends RoutingTableException {
public DuplicateRouteException(String message) {
if (imp.equals(RoutingTableImpl.class)) {
Dictionary<String, Set<String>> props = new Hashtable<String, Set<String>>();
Set<String> propSet = new HashSet<String>();
- propSet.add(RoutingTableImpl.ROUTING_TABLE_GLOBAL_CACHE);
+ propSet.add(RoutingTableImpl.GLOBALRPC_CACHE);
+ propSet.add(RoutingTableImpl.RPC_CACHE);
props.put(CACHE_UPDATE_AWARE_REGISTRY_KEY, propSet);
c.setInterface(new String[] { RoutingTable.class.getName(),ICacheUpdateAware.class.getName() }, props);
package org.opendaylight.controller.sal.connector.remoterpc.impl;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
import org.apache.felix.dm.Component;
-import org.opendaylight.controller.clustering.services.CacheConfigException;
-import org.opendaylight.controller.clustering.services.CacheExistException;
-import org.opendaylight.controller.clustering.services.CacheListenerAddException;
-import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
-import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
-import org.opendaylight.controller.clustering.services.IClusterServices;
-import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
+import org.opendaylight.controller.clustering.services.*;
import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import java.util.concurrent.ConcurrentMap;
-/**
- * @author: syedbahm
- */
public class RoutingTableImpl<I, R> implements RoutingTable<I, R>, ICacheUpdateAware<I, R> {
- public static final String ROUTING_TABLE_GLOBAL_CACHE = "routing_table_global_cache";
- private Logger log = LoggerFactory.getLogger(RoutingTableImpl.class);
+ private Logger log = LoggerFactory.getLogger(RoutingTableImpl.class);
- private IClusterGlobalServices clusterGlobalServices = null;
- private RoutingTableImpl routingTableInstance = null;
- private ConcurrentMap routingTableCache = null;
- private Set<RouteChangeListener> routeChangeListeners = Collections
- .synchronizedSet(new HashSet<RouteChangeListener>());
+ private IClusterGlobalServices clusterGlobalServices = null;
- public RoutingTableImpl() {
- }
+ private ConcurrentMap<I,R> globalRpcCache = null;
+ private ConcurrentMap<I, LinkedHashSet<R>> rpcCache = null; //need routes to ordered by insert-order
- @Override
- public void addRoute(I routeId, R route) throws RoutingTableException {
- throw new UnsupportedOperationException(" Not implemented yet!");
- }
+ public static final String GLOBALRPC_CACHE = "remoterpc_routingtable.globalrpc_cache";
+ public static final String RPC_CACHE = "remoterpc_routingtable.rpc_cache";
- @Override
- public void addGlobalRoute(I routeId, R route) throws RoutingTableException, SystemException {
- Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!");
- Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!");
- try {
-
- Set<R> existingRoute = null;
- // ok does the global route is already registered ?
- if ((existingRoute = getRoutes(routeId)) == null) {
-
- if (log.isDebugEnabled()) {
- log.debug("addGlobalRoute: adding a new route with id" + routeId + " and value = "
- + route);
- }
- // lets start a transaction
- clusterGlobalServices.tbegin();
-
- routingTableCache.put(routeId, route);
- clusterGlobalServices.tcommit();
- } else {
- throw new DuplicateRouteException(" There is already existing route " + existingRoute);
- }
-
- } catch (NotSupportedException|HeuristicRollbackException|RollbackException|HeuristicMixedException e) {
- throw new RoutingTableException("Transaction error - while trying to create route id="
- + routeId + "with route" + route, e);
- } catch (javax.transaction.SystemException e) {
- throw new SystemException("System error occurred - while trying to create with value", e);
- }
+ public RoutingTableImpl() {
+ }
- }
+ @Override
+ public R getGlobalRoute(I routeId) throws RoutingTableException, SystemException {
+ Preconditions.checkNotNull(routeId, "getGlobalRoute: routeId cannot be null!");
+ return globalRpcCache.get(routeId);
+ }
- @Override
- public void removeRoute(I routeId, R route) {
- throw new UnsupportedOperationException("Not implemented yet!");
+ @Override
+ public void addGlobalRoute(I routeId, R route) throws RoutingTableException, SystemException {
+ Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!");
+ Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!");
+ try {
+
+ log.debug("addGlobalRoute: adding a new route with id[{}] and value [{}]", routeId, route);
+ clusterGlobalServices.tbegin();
+ if (globalRpcCache.putIfAbsent(routeId, route) != null) {
+ throw new DuplicateRouteException(" There is already existing route " + routeId);
+ }
+ clusterGlobalServices.tcommit();
+
+ } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
+ throw new RoutingTableException("Transaction error - while trying to create route id="
+ + routeId + "with route" + route, e);
+ } catch (javax.transaction.SystemException e) {
+ throw new SystemException("System error occurred - while trying to create with value", e);
}
- @Override
- public void removeGlobalRoute(I routeId) throws RoutingTableException, SystemException {
- Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!");
- try {
- if (log.isDebugEnabled()) {
- log.debug("removeGlobalRoute: removing a new route with id" + routeId);
- }
- // lets start a transaction
- clusterGlobalServices.tbegin();
-
- routingTableCache.remove(routeId);
- clusterGlobalServices.tcommit();
-
- } catch (NotSupportedException|HeuristicRollbackException|RollbackException|HeuristicMixedException e) {
- throw new RoutingTableException("Transaction error - while trying to remove route id="
- + routeId, e);
- } catch (javax.transaction.SystemException e) {
- throw new SystemException("System error occurred - while trying to remove with value", e);
- }
+ }
+
+ @Override
+ public void removeGlobalRoute(I routeId) throws RoutingTableException, SystemException {
+ Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!");
+ try {
+ log.debug("removeGlobalRoute: removing a new route with id [{}]", routeId);
+
+ clusterGlobalServices.tbegin();
+ globalRpcCache.remove(routeId);
+ clusterGlobalServices.tcommit();
+
+ } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
+ throw new RoutingTableException("Transaction error - while trying to remove route id="
+ + routeId, e);
+ } catch (javax.transaction.SystemException e) {
+ throw new SystemException("System error occurred - while trying to remove with value", e);
}
+ }
+
- @Override
- public Set<R> getRoutes(I routeId) {
+ @Override
+ public Set<R> getRoutes(I routeId) {
+ Preconditions.checkNotNull(routeId, "getRoutes: routeId cannot be null!");
+ Set<R> routes = rpcCache.get(routeId);
- // Note: currently works for global routes only wherein there is just single
- // route
- Preconditions.checkNotNull(routeId, "getARoute: routeId cannot be null!");
- R route = (R)routingTableCache.get(routeId);
- Set<R>routes = null;
- if(route !=null){
- routes = new HashSet<R>();
- routes.add(route);
- }
+ if (routes == null) return Collections.emptySet();
+
+ return ImmutableSet.copyOf(routes);
+ }
- return routes;
- }
+
+
+ public R getLastAddedRoute(I routeId) {
+
+ Set<R> routes = getRoutes(routeId);
+
+ if (routes.isEmpty()) return null;
+
+ R route = null;
+ Iterator<R> iter = routes.iterator();
+ while (iter.hasNext())
+ route = iter.next();
+
+ return route;
+ }
@Override
- public Set<Map.Entry> getAllRoutes() {
- return routingTableCache.entrySet();
+ public void addRoute(I routeId, R route) throws RoutingTableException, SystemException {
+ Preconditions.checkNotNull(routeId, "addRoute: routeId cannot be null");
+ Preconditions.checkNotNull(route, "addRoute: route cannot be null");
+
+ try{
+ clusterGlobalServices.tbegin();
+ log.debug("addRoute: adding a route with k/v [{}/{}]", routeId, route);
+ threadSafeAdd(routeId, route);
+ clusterGlobalServices.tcommit();
+
+ } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
+ throw new RoutingTableException("Transaction error - while trying to remove route id="
+ + routeId, e);
+ } catch (javax.transaction.SystemException e) {
+ throw new SystemException("System error occurred - while trying to remove with value", e);
+ }
}
@Override
- public R getARoute(I routeId) {
- throw new UnsupportedOperationException("Not implemented yet!");
+ public void addRoutes(Set<I> routeIds, R route) throws RoutingTableException, SystemException {
+ Preconditions.checkNotNull(routeIds, "addRoutes: routeIds must not be null");
+ for (I routeId : routeIds){
+ addRoute(routeId, route);
}
+ }
- /**
- * @deprecated doesn't do anything will be removed once listeners used
- * whiteboard pattern Registers listener for sending any change
- * notification
- * @param listener
- */
- @Override
- public void registerRouteChangeListener(RouteChangeListener listener) {
+ @Override
+ public void removeRoute(I routeId, R route) throws RoutingTableException, SystemException {
+ Preconditions.checkNotNull(routeId, "removeRoute: routeId cannot be null!");
+ Preconditions.checkNotNull(route, "removeRoute: route cannot be null!");
- }
+ LinkedHashSet<R> routes = rpcCache.get(routeId);
+ if (routes == null) return;
- public void setRouteChangeListener(RouteChangeListener rcl) {
- if(rcl != null){
- routeChangeListeners.add(rcl);
- }else{
- log.warn("setRouteChangeListener called with null listener");
- }
- }
+ try {
+ log.debug("removeRoute: removing a new route with k/v [{}/{}]", routeId, route);
- public void unSetRouteChangeListener(RouteChangeListener rcl) {
- if(rcl != null){
- routeChangeListeners.remove(rcl);
- }else{
- log.warn("unSetRouteChangeListener called with null listener");
- }
+ clusterGlobalServices.tbegin();
+ threadSafeRemove(routeId, route);
+ clusterGlobalServices.tcommit();
+
+ } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
+ throw new RoutingTableException("Transaction error - while trying to remove route id="
+ + routeId, e);
+ } catch (javax.transaction.SystemException e) {
+ throw new SystemException("System error occurred - while trying to remove with value", e);
}
+ }
- /**
- * Returning the set of route change listeners for Unit testing Note: the
- * package scope is default
- *
- * @return List of registered RouteChangeListener<I,R> listeners
- */
- Set<RouteChangeListener> getRegisteredRouteChangeListeners() {
- return routeChangeListeners;
+ @Override
+ public void removeRoutes(Set<I> routeIds, R route) throws RoutingTableException, SystemException {
+ Preconditions.checkNotNull(routeIds, "removeRoutes: routeIds must not be null");
+ for (I routeId : routeIds){
+ removeRoute(routeId, route);
}
+ }
- public void setClusterGlobalServices(IClusterGlobalServices clusterGlobalServices) {
- this.clusterGlobalServices = clusterGlobalServices;
+ /**
+ * This method guarantees that no 2 thread over write each other's changes.
+ * Just so that we dont end up in infinite loop, it tries for 100 times then throw
+ */
+ private void threadSafeAdd(I routeId, R route) {
+
+ for (int i=0;i<100;i++){
+
+ LinkedHashSet<R> updatedRoutes = new LinkedHashSet<>();
+ updatedRoutes.add(route);
+ LinkedHashSet<R> oldRoutes = rpcCache.putIfAbsent(routeId, updatedRoutes);
+ if (oldRoutes == null) return;
+
+ updatedRoutes = new LinkedHashSet<>(oldRoutes);
+ updatedRoutes.add(route);
+
+ if (rpcCache.replace(routeId, oldRoutes, updatedRoutes)) return;
}
+ //the method did not already return means it failed to add route in 10 attempts
+ throw new IllegalStateException("Failed to add route [" + routeId + "]");
+ }
+
+ /**
+ * This method guarantees that no 2 thread over write each other's changes.
+ * Just so that we dont end up in infinite loop, it tries for 10 times then throw
+ */
+ private void threadSafeRemove(I routeId, R route) {
+ LinkedHashSet<R> updatedRoutes = null;
+ for (int i=0;i<10;i++){
+ LinkedHashSet<R> oldRoutes = rpcCache.get(routeId);
+
+ // if route to be deleted is the only entry in the set then remove routeId from the cache
+ if ((oldRoutes.size() == 1) && oldRoutes.contains(route)){
+ rpcCache.remove(routeId);
+ return;
+ }
+
+ // if there are multiple routes for this routeId, remove the route to be deleted only from the set.
+ updatedRoutes = new LinkedHashSet<>(oldRoutes);
+ updatedRoutes.remove(route);
+ if (rpcCache.replace(routeId, oldRoutes, updatedRoutes)) return;
- public void unsetClusterGlobalServices(IClusterGlobalServices clusterGlobalServices) {
- if((clusterGlobalServices != null ) && (this.clusterGlobalServices.equals(clusterGlobalServices))){
- this.clusterGlobalServices = null;
- }
}
+ //the method did not already return means it failed to remove route in 10 attempts
+ throw new IllegalStateException("Failed to remove route [" + routeId + "]");
+ }
+
- /**
- * Creates the Routing Table clustered global services cache
- *
- * @throws CacheExistException
- * -- cluster global services exception when cache exist
- * @throws CacheConfigException
- * -- cluster global services exception during cache config
- * @throws CacheListenerAddException
- * -- cluster global services exception during adding of listener
- */
-
- void createRoutingTableCache() throws CacheExistException, CacheConfigException,
- CacheListenerAddException {
- // TBD: HOW DO WE DECIDE ON PROPERTIES OF THE CACHE i.e. what duration it
- // should be caching?
-
- // let us check here if the cache already exists -- if so don't create
- if (!clusterGlobalServices.existCache(ROUTING_TABLE_GLOBAL_CACHE)) {
-
- if (log.isDebugEnabled()) {
- log.debug("createRoutingTableCache: creating a new routing table cache "
- + ROUTING_TABLE_GLOBAL_CACHE);
- }
- routingTableCache = clusterGlobalServices.createCache(ROUTING_TABLE_GLOBAL_CACHE,
- EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
- } else {
- if (log.isDebugEnabled()) {
- log.debug("createRoutingTableCache: found existing routing table cache "
- + ROUTING_TABLE_GLOBAL_CACHE);
- }
- routingTableCache = clusterGlobalServices.getCache(ROUTING_TABLE_GLOBAL_CACHE);
- }
+// /**
+// * @deprecated doesn't do anything will be removed once listeners used
+// * whiteboard pattern Registers listener for sending any change
+// * notification
+// * @param listener
+// */
+// @Override
+// public void registerRouteChangeListener(RouteChangeListener listener) {
+//
+// }
+
+// public void setRouteChangeListener(RouteChangeListener rcl) {
+// if(rcl != null){
+// routeChangeListeners.add(rcl);
+// }else{
+// log.warn("setRouteChangeListener called with null listener");
+// }
+// }
+//
+// public void unSetRouteChangeListener(RouteChangeListener rcl) {
+// if(rcl != null){
+// routeChangeListeners.remove(rcl);
+// }else{
+// log.warn("unSetRouteChangeListener called with null listener");
+// }
+// }
+
+ /**
+ * Returning the set of route change listeners for Unit testing Note: the
+ * package scope is default
+ *
+ * @return List of registered RouteChangeListener<I,R> listeners
+ */
+// Set<RouteChangeListener> getRegisteredRouteChangeListeners() {
+// return routeChangeListeners;
+// }
+ public void setClusterGlobalServices(IClusterGlobalServices clusterGlobalServices) {
+ this.clusterGlobalServices = clusterGlobalServices;
+ }
+
+ public void unsetClusterGlobalServices(IClusterGlobalServices clusterGlobalServices) {
+ if ((clusterGlobalServices != null) && (this.clusterGlobalServices.equals(clusterGlobalServices))) {
+ this.clusterGlobalServices = null;
+ }
+ }
+ /**
+ * Finds OR Creates clustered cache for Global RPCs
+ *
+ * @throws CacheExistException -- cluster global services exception when cache exist
+ * @throws CacheConfigException -- cluster global services exception during cache config
+ * @throws CacheListenerAddException -- cluster global services exception during adding of listener
+ */
+
+ void findOrCreateGlobalRpcCache() throws CacheExistException, CacheConfigException,
+ CacheListenerAddException {
+ // TBD: HOW DO WE DECIDE ON PROPERTIES OF THE CACHE i.e. what duration it
+ // should be caching?
+
+ // let us check here if the cache already exists -- if so don't create
+ if (!clusterGlobalServices.existCache(GLOBALRPC_CACHE)) {
+
+ globalRpcCache = (ConcurrentMap<I,R>) clusterGlobalServices.createCache(GLOBALRPC_CACHE,
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+ log.debug("Cache created [{}] ", GLOBALRPC_CACHE);
+
+ } else {
+ globalRpcCache = (ConcurrentMap<I,R>) clusterGlobalServices.getCache(GLOBALRPC_CACHE);
+ log.debug("Cache exists [{}] ", GLOBALRPC_CACHE);
}
+ }
- /**
- * Function called by the dependency manager when all the required
- * dependencies are satisfied
- *
- */
- void init(Component c) {
- try {
-
- createRoutingTableCache();
- } catch (CacheExistException e) {
- throw new IllegalStateException("could not construct routing table cache");
- } catch (CacheConfigException e) {
- throw new IllegalStateException("could not construct routing table cache");
- } catch (CacheListenerAddException e) {
- throw new IllegalStateException("could not construct routing table cache");
- }
+ /**
+ * Finds OR Creates clustered cache for Routed RPCs
+ *
+ * @throws CacheExistException -- cluster global services exception when cache exist
+ * @throws CacheConfigException -- cluster global services exception during cache config
+ * @throws CacheListenerAddException -- cluster global services exception during adding of listener
+ */
+
+ void findOrCreateRpcCache() throws CacheExistException, CacheConfigException,
+ CacheListenerAddException {
+ // TBD: HOW DO WE DECIDE ON PROPERTIES OF THE CACHE i.e. what duration it
+ // should be caching?
+
+ if (clusterGlobalServices.existCache(RPC_CACHE)){
+ rpcCache = (ConcurrentMap<I,LinkedHashSet<R>>) clusterGlobalServices.getCache(RPC_CACHE);
+ log.debug("Cache exists [{}] ", RPC_CACHE);
+ return;
}
- /**
- * Get routing table method is useful for unit testing <note>It has package
- * scope</note>
- */
- ConcurrentMap getRoutingTableCache() {
- return this.routingTableCache;
+ //cache doesnt exist, create one
+ rpcCache = (ConcurrentMap<I,LinkedHashSet<R>>) clusterGlobalServices.createCache(RPC_CACHE,
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+ log.debug("Cache created [{}] ", RPC_CACHE);
+ }
+
+
+ /**
+ * Function called by the dependency manager when all the required
+ * dependencies are satisfied
+ */
+ void init(Component c) {
+ try {
+
+ findOrCreateGlobalRpcCache();
+ findOrCreateRpcCache();
+
+ } catch (CacheExistException|CacheConfigException|CacheListenerAddException e) {
+ throw new IllegalStateException("could not construct routing table cache");
}
+ }
+
+ /**
+ * Useful for unit testing <note>It has package
+ * scope</note>
+ */
+ ConcurrentMap getGlobalRpcCache() {
+ return this.globalRpcCache;
+ }
+
+ /**
+ * Useful for unit testing <note>It has package
+ * scope</note>
+ */
+ ConcurrentMap getRpcCache() {
+ return this.rpcCache;
+ }
- /**
- * This is used from integration test NP rest API to check out the result of the
- * cache population
- * <Note> For testing purpose only-- use it wisely</Note>
- * @return
- */
- public String dumpRoutingTableCache(){
- Set<Map.Entry<I, R>> cacheEntrySet = this.routingTableCache.entrySet();
- StringBuilder sb = new StringBuilder();
- for(Map.Entry<I,R> entry:cacheEntrySet){
- sb.append("Key:").append(entry.getKey()).append("---->Value:")
- .append((entry.getValue() != null)?entry.getValue():"null")
- .append("\n");
- }
- return sb.toString();
+ /**
+ * This is used from integration test NP rest API to check out the result of the
+ * cache population
+ * <Note> For testing purpose only-- use it wisely</Note>
+ *
+ * @return
+ */
+ public String dumpGlobalRpcCache() {
+ Set<Map.Entry<I, R>> cacheEntrySet = this.globalRpcCache.entrySet();
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<I, R> entry : cacheEntrySet) {
+ sb.append("Key:").append(entry.getKey()).append("---->Value:")
+ .append((entry.getValue() != null) ? entry.getValue() : "null")
+ .append("\n");
}
+ return sb.toString();
+ }
- /**
- * Invoked when a new entry is available in the cache, the key is only
- * provided, the value will come as an entryUpdate invocation
- *
- * @param key
- * Key for the entry just created
- * @param cacheName
- * name of the cache for which update has been received
- * @param originLocal
- * true if the event is generated from this node
- */
- @Override
- public void entryCreated(I key, String cacheName, boolean originLocal) {
- // TBD: do we require this.
- if (log.isDebugEnabled()) {
- log.debug("RoutingTableUpdates: entryCreated routeId = " + key + " cacheName=" + cacheName);
- }
+ public String dumpRpcCache() {
+ Set<Map.Entry<I, LinkedHashSet<R>>> cacheEntrySet = this.rpcCache.entrySet();
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<I, LinkedHashSet<R>> entry : cacheEntrySet) {
+ sb.append("Key:").append(entry.getKey()).append("---->Value:")
+ .append((entry.getValue() != null) ? entry.getValue() : "null")
+ .append("\n");
}
+ return sb.toString();
+ }
+ /**
+ * Invoked when a new entry is available in the cache, the key is only
+ * provided, the value will come as an entryUpdate invocation
+ *
+ * @param key Key for the entry just created
+ * @param cacheName name of the cache for which update has been received
+ * @param originLocal true if the event is generated from this node
+ */
+ @Override
+ public void entryCreated(I key, String cacheName, boolean originLocal) {
+ // TBD: do we require this.
+ if (log.isDebugEnabled()) {
+ log.debug("RoutingTableUpdates: entryCreated routeId = " + key + " cacheName=" + cacheName);
+ }
+ }
- /**
- * Called anytime a given entry is updated
- *
- * @param key
- * Key for the entry modified
- * @param new_value
- * the new value the key will have
- * @param cacheName
- * name of the cache for which update has been received
- * @param originLocal
- * true if the event is generated from this node
- */
- @Override
- public void entryUpdated(I key, R new_value, String cacheName, boolean originLocal) {
- if (log.isDebugEnabled()) {
- log.debug("RoutingTableUpdates: entryUpdated routeId = " + key + ",value = " + new_value
- + " ,cacheName=" + cacheName + " originLocal="+originLocal);
- }
- if (!originLocal) {
- for (RouteChangeListener rcl : routeChangeListeners) {
- rcl.onRouteUpdated(key, new_value);
- }
- }
+ /**
+ * Called anytime a given entry is updated
+ *
+ * @param key Key for the entry modified
+ * @param new_value the new value the key will have
+ * @param cacheName name of the cache for which update has been received
+ * @param originLocal true if the event is generated from this node
+ */
+ @Override
+ public void entryUpdated(I key, R new_value, String cacheName, boolean originLocal) {
+ if (log.isDebugEnabled()) {
+ log.debug("RoutingTableUpdates: entryUpdated routeId = " + key + ",value = " + new_value
+ + " ,cacheName=" + cacheName + " originLocal=" + originLocal);
}
+// if (!originLocal) {
+// for (RouteChangeListener rcl : routeChangeListeners) {
+// rcl.onRouteUpdated(key, new_value);
+// }
+// }
+ }
- /**
- * Called anytime a given key is removed from the ConcurrentHashMap we are
- * listening to.
- *
- * @param key
- * Key of the entry removed
- * @param cacheName
- * name of the cache for which update has been received
- * @param originLocal
- * true if the event is generated from this node
- */
- @Override
- public void entryDeleted(I key, String cacheName, boolean originLocal) {
- if (log.isDebugEnabled()) {
- log.debug("RoutingTableUpdates: entryUpdated routeId = " + key + " local = " + originLocal
- + " cacheName=" + cacheName + " originLocal="+originLocal);
- }
- if (!originLocal) {
- for (RouteChangeListener rcl : routeChangeListeners) {
- rcl.onRouteDeleted(key);
- }
- }
+ /**
+ * Called anytime a given key is removed from the ConcurrentHashMap we are
+ * listening to.
+ *
+ * @param key Key of the entry removed
+ * @param cacheName name of the cache for which update has been received
+ * @param originLocal true if the event is generated from this node
+ */
+ @Override
+ public void entryDeleted(I key, String cacheName, boolean originLocal) {
+ if (log.isDebugEnabled()) {
+ log.debug("RoutingTableUpdates: entryUpdated routeId = " + key + " local = " + originLocal
+ + " cacheName=" + cacheName + " originLocal=" + originLocal);
}
+// if (!originLocal) {
+// for (RouteChangeListener rcl : routeChangeListeners) {
+// rcl.onRouteDeleted(key);
+// }
+// }
+ }
}
\ No newline at end of file
import junit.framework.Assert;
import org.apache.felix.dm.Component;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
import org.opendaylight.controller.clustering.services.IClusterServices;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
+import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import java.net.URI;
import java.util.EnumSet;
import java.util.HashSet;
-import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
-/**
- * @author: syedbahm
- */
public class RoutingTableImplTest {
- private IClusterGlobalServices ics = mock(IClusterGlobalServices.class);
- private RoutingTableImpl rti = new RoutingTableImpl();
+ private final URI namespace = URI.create("http://cisco.com/example");
+ private final QName QNAME = new QName(namespace, "global");
- private final URI namespace = URI.create("http://cisco.com/example");
- private final QName QNAME = new QName(namespace,"global");
+ private IClusterGlobalServices clusterService;
+ private RoutingTableImpl<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String> routingTable;
+ ConcurrentMap mockGlobalRpcCache;
+ ConcurrentMap mockRpcCache;
- ConcurrentMap concurrentMapMock = mock(ConcurrentMap.class);
+ @Before
+ public void setUp() throws Exception{
+ clusterService = mock(IClusterGlobalServices.class);
+ routingTable = new RoutingTableImpl<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String>();
+ mockGlobalRpcCache = new ConcurrentHashMap<>();
+ mockRpcCache = new ConcurrentHashMap<>();
+ createRoutingTableCache();
+ }
+ @After
+ public void tearDown(){
+ reset(clusterService);
+ mockGlobalRpcCache = null;
+ mockRpcCache = null;
+ }
- @Test
- public void testAddGlobalRoute() throws Exception {
- ConcurrentMap concurrentMap = createRoutingTableCache();
+ @Test
+ public void addGlobalRoute_ValidArguments_ShouldAdd() throws Exception {
- Assert.assertNotNull(concurrentMap);
- RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier = mock(RpcRouter.RouteIdentifier.class);
- InstanceIdentifier identifier = mock(InstanceIdentifier.class);
- when(routeIdentifier.getType()).thenReturn(QNAME);
- when(routeIdentifier.getRoute()).thenReturn(identifier);
+ Assert.assertNotNull(mockGlobalRpcCache);
+ RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier = getRouteIdentifier();
- rti.addGlobalRoute(routeIdentifier, "172.27.12.1:5000");
+ final String expectedRoute = "172.27.12.1:5000";
+ routingTable.addGlobalRoute(routeIdentifier, expectedRoute);
- Set<String> globalService = new HashSet<String>();
- globalService.add("172.27.12.1:5000");
+ ConcurrentMap latestCache = routingTable.getGlobalRpcCache();
+ Assert.assertEquals(mockGlobalRpcCache, latestCache);
+ Assert.assertEquals(expectedRoute, latestCache.get(routeIdentifier));
+ }
- when(concurrentMap.get(routeIdentifier)).thenReturn(globalService);
- ConcurrentMap latestCache = rti.getRoutingTableCache();
+ @Test (expected = RoutingTable.DuplicateRouteException.class)
+ public void addGlobalRoute_DuplicateRoute_ShouldThrow() throws Exception{
- Assert.assertEquals(concurrentMap,latestCache);
+ Assert.assertNotNull(mockGlobalRpcCache);
- Set<String> servicesGlobal = (Set<String>)latestCache.get(routeIdentifier);
- Assert.assertEquals(servicesGlobal.size(),1);
+ RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier = getRouteIdentifier();
+ routingTable.addGlobalRoute(routeIdentifier, new String());
+ routingTable.addGlobalRoute(routeIdentifier, new String());
+ }
- Assert.assertEquals(servicesGlobal.iterator().next(),"172.27.12.1:5000");
+ @Test
+ public void getGlobalRoute_ExistingRouteId_ShouldReturnRoute() throws Exception {
- }
+ Assert.assertNotNull(mockGlobalRpcCache);
+ RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier = getRouteIdentifier();
+ String expectedRoute = "172.27.12.1:5000";
- @Test
- public void testGetRoutes() throws Exception {
- ConcurrentMap concurrentMap = createRoutingTableCache();
+ routingTable.addGlobalRoute(routeIdentifier, expectedRoute);
- Assert.assertNotNull(concurrentMap);
- RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier = mock(RpcRouter.RouteIdentifier.class);
- InstanceIdentifier identifier = mock(InstanceIdentifier.class);
- when(routeIdentifier.getContext()).thenReturn(QNAME);
- when(routeIdentifier.getRoute()).thenReturn(identifier);
+ String actualRoute = (String) routingTable.getGlobalRoute(routeIdentifier);
+ Assert.assertEquals(expectedRoute, actualRoute);
+ }
- rti.addGlobalRoute(routeIdentifier, "172.27.12.1:5000");
+ @Test
+ public void getGlobalRoute_NonExistentRouteId_ShouldReturnNull() throws Exception {
- String globalService = "172.27.12.1:5000";
+ Assert.assertNotNull(mockGlobalRpcCache);
+ RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier = getRouteIdentifier();
- when(concurrentMap.get(routeIdentifier)).thenReturn(globalService);
- ConcurrentMap latestCache = rti.getRoutingTableCache();
+ String actualRoute = (String) routingTable.getGlobalRoute(routeIdentifier);
+ Assert.assertNull(actualRoute);
+ }
- Assert.assertEquals(concurrentMap,latestCache);
+ @Test
+ public void removeGlobalRoute_ExistingRouteId_ShouldRemove() throws Exception {
- Set<String> servicesGlobal = rti.getRoutes(routeIdentifier);
+ Assert.assertNotNull(mockGlobalRpcCache);
+ RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier = getRouteIdentifier();
+ ConcurrentMap cache = routingTable.getGlobalRpcCache();
+ Assert.assertTrue(cache.size() == 0);
+ routingTable.addGlobalRoute(routeIdentifier, "172.27.12.1:5000");
+ Assert.assertTrue(cache.size() == 1);
- Assert.assertEquals(servicesGlobal.size(),1);
- Iterator<String> iterator = servicesGlobal.iterator();
- while(iterator.hasNext()){
- Assert.assertEquals(iterator.next(),"172.27.12.1:5000");
- }
+ routingTable.removeGlobalRoute(routeIdentifier);
+ Assert.assertTrue(cache.size() == 0);
+ }
- }
- @Test
- public void testRegisterRouteChangeListener() throws Exception {
- Assert.assertEquals(rti.getRegisteredRouteChangeListeners().size(),0);
- rti.registerRouteChangeListener(new RouteChangeListenerImpl());
+ @Test
+ public void removeGlobalRoute_NonExistentRouteId_ShouldDoNothing() throws Exception {
- Assert.assertEquals(rti.getRegisteredRouteChangeListeners().size(),0); //old should not work
- //what about the new approach - using whiteboard pattern
- rti.setRouteChangeListener(new RouteChangeListenerImpl());
+ Assert.assertNotNull(mockGlobalRpcCache);
+ RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier = getRouteIdentifier();
- Assert.assertEquals(rti.getRegisteredRouteChangeListeners().size(),1); //should not work
+ ConcurrentMap cache = routingTable.getGlobalRpcCache();
+ Assert.assertTrue(cache.size() == 0);
+ routingTable.removeGlobalRoute(routeIdentifier);
+ Assert.assertTrue(cache.size() == 0);
- }
- @Test
- public void testRemoveGlobalRoute()throws Exception {
+ }
- ConcurrentMap concurrentMap = createRoutingTableCache();
+ @Test
+ public void addRoute_ForNewRouteId_ShouldAddRoute() throws Exception {
+ Assert.assertTrue(mockRpcCache.size() == 0);
- Assert.assertNotNull(concurrentMap);
- RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier = mock(RpcRouter.RouteIdentifier.class);
- InstanceIdentifier identifier = mock(InstanceIdentifier.class);
- when(routeIdentifier.getContext()).thenReturn(QNAME);
- when(routeIdentifier.getRoute()).thenReturn(identifier);
+ RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeId = getRouteIdentifier();
- rti.addGlobalRoute(routeIdentifier, "172.27.12.1:5000");
+ routingTable.addRoute(routeId, new String());
+ Assert.assertTrue(mockRpcCache.size() == 1);
- String globalService = "172.27.12.1:5000";
+ Set<String> routes = routingTable.getRoutes(routeId);
+ Assert.assertEquals(1, routes.size());
+ }
- when(concurrentMap.get(routeIdentifier)).thenReturn(globalService);
- ConcurrentMap latestCache = rti.getRoutingTableCache();
+ @Test
+ public void addRoute_ForExistingRouteId_ShouldAppendRoute() throws Exception {
- Assert.assertEquals(concurrentMap,latestCache);
+ Assert.assertTrue(mockRpcCache.size() == 0);
- Set<String> servicesGlobal = rti.getRoutes(routeIdentifier);
+ RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeId = getRouteIdentifier();
+ String route_1 = "10.0.0.1:5955";
+ String route_2 = "10.0.0.2:5955";
- Assert.assertEquals(servicesGlobal.size(),1);
+ routingTable.addRoute(routeId, route_1);
+ routingTable.addRoute(routeId, route_2);
- Assert.assertEquals(servicesGlobal.iterator().next(),"172.27.12.1:5000");
+ Assert.assertTrue(mockRpcCache.size() == 1);
- rti.removeGlobalRoute(routeIdentifier);
+ Set<String> routes = routingTable.getRoutes(routeId);
+ Assert.assertEquals(2, routes.size());
+ Assert.assertTrue(routes.contains(route_1));
+ Assert.assertTrue(routes.contains(route_2));
+ }
- Assert.assertNotNull(rti.getRoutes(routeIdentifier));
+ @Test
+ public void addRoute_UsingMultipleThreads_ShouldNotOverwrite(){
+ ExecutorService threadPool = Executors.newCachedThreadPool();
+ int numOfRoutesToAdd = 100;
+ String routePrefix_1 = "10.0.0.1:555";
+ RpcRouter.RouteIdentifier routeId = getRouteIdentifier();
+ threadPool.submit(addRoutes(numOfRoutesToAdd, routePrefix_1, routeId));
+ String routePrefix_2 = "10.0.0.1:556";
+ threadPool.submit(addRoutes(numOfRoutesToAdd, routePrefix_2, routeId));
+ // wait for all tasks to complete; timeout in 10 sec
+ threadPool.shutdown();
+ try {
+ threadPool.awaitTermination(10, TimeUnit.SECONDS); //
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
- private ConcurrentMap createRoutingTableCache() throws Exception {
+ Assert.assertEquals(2*numOfRoutesToAdd, routingTable.getRoutes(routeId).size());
+ }
- //here init
- Component c = mock(Component.class);
+ @Test(expected = NullPointerException.class)
+ public void addRoute_NullRouteId_shouldThrowNpe() throws Exception {
- when(ics.existCache(
- RoutingTableImpl.ROUTING_TABLE_GLOBAL_CACHE)).thenReturn(false);
+ routingTable.addRoute(null, new String());
+ }
- when(ics.createCache(RoutingTableImpl.ROUTING_TABLE_GLOBAL_CACHE, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL))).thenReturn(concurrentMapMock);
- rti.setClusterGlobalServices(this.ics);
- rti.init(c);
+ @Test(expected = NullPointerException.class)
+ public void addRoute_NullRoute_shouldThrowNpe() throws Exception{
- Assert.assertEquals(concurrentMapMock,rti.getRoutingTableCache() );
- return concurrentMapMock;
+ routingTable.addRoute(getRouteIdentifier(), null);
+ }
- }
+ @Test (expected = UnsupportedOperationException.class)
+ public void getRoutes_Call_ShouldReturnImmutableCopy() throws Exception{
+ Assert.assertNotNull(routingTable);
+ RpcRouter.RouteIdentifier routeId = getRouteIdentifier();
+ routingTable.addRoute(routeId, new String());
+ Set<String> routes = routingTable.getRoutes(routeId); //returns Immutable Set
- @Test
- public void testCreateRoutingTableCacheReturnExistingCache() throws Exception {
- ConcurrentMap concurrentMap = createRoutingTableCache();
+ routes.add(new String()); //can not be modified; should throw
+ }
- //OK here we should try creating again the cache but this time it should return the existing one
- when(ics.existCache(
- RoutingTableImpl.ROUTING_TABLE_GLOBAL_CACHE)).thenReturn(true);
+ @Test
+ public void getRoutes_With2RoutesFor1RouteId_ShouldReturnASetWithSize2() throws Exception{
+ Assert.assertNotNull(routingTable);
+ RpcRouter.RouteIdentifier routeId = getRouteIdentifier();
+ routingTable.addRoute(routeId, "10.0.0.1:5555");
+ routingTable.addRoute(routeId, "10.0.0.2:5555");
- when(ics.getCache(
- RoutingTableImpl.ROUTING_TABLE_GLOBAL_CACHE)).thenReturn(concurrentMap);
+ Set<String> routes = routingTable.getRoutes(routeId); //returns Immutable Set
+ Assert.assertEquals(2, routes.size());
+ }
- //here init
- Component c = mock(Component.class);
+ @Test
+ public void getLastAddedRoute_WhenMultipleRoutesExists_ShouldReturnLatestRoute()
+ throws Exception {
- rti.init(c);
+ Assert.assertNotNull(routingTable);
+ RpcRouter.RouteIdentifier routeId = getRouteIdentifier();
+ String route_1 = "10.0.0.1:5555";
+ String route_2 = "10.0.0.2:5555";
+ routingTable.addRoute(routeId, route_1);
+ routingTable.addRoute(routeId, route_2);
- Assert.assertEquals(concurrentMap,rti.getRoutingTableCache());
+ Assert.assertEquals(route_2, routingTable.getLastAddedRoute(routeId));
+ }
+ @Test
+ public void removeRoute_WhenMultipleRoutesExist_RemovesGivenRoute() throws Exception{
+ Assert.assertNotNull(routingTable);
+ RpcRouter.RouteIdentifier routeId = getRouteIdentifier();
+ String route_1 = "10.0.0.1:5555";
+ String route_2 = "10.0.0.2:5555";
+ routingTable.addRoute(routeId, route_1);
+ routingTable.addRoute(routeId, route_2);
+ Assert.assertEquals(2, routingTable.getRoutes(routeId).size());
+ routingTable.removeRoute(routeId, route_1);
+ Assert.assertEquals(1, routingTable.getRoutes(routeId).size());
- }
+ }
- private class RouteChangeListenerImpl<I,R> implements RouteChangeListener<I,R>{
+ @Test
+ public void removeRoute_WhenOnlyOneRouteExists_RemovesRouteId() throws Exception{
+ Assert.assertNotNull(routingTable);
+ RpcRouter.RouteIdentifier routeId = getRouteIdentifier();
+ String route_1 = "10.0.0.1:5555";
- @Override
- public void onRouteUpdated(I key, R new_value) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
+ routingTable.addRoute(routeId, route_1);
+ Assert.assertEquals(1, routingTable.getRoutes(routeId).size());
- @Override
- public void onRouteDeleted(I key) {
- //To change body of implemented methods use File | Settings | File Templates.
- }
- }
+ routingTable.removeRoute(routeId, route_1);
+ ConcurrentMap cache = routingTable.getRpcCache();
+ Assert.assertFalse(cache.containsKey(routeId));
+
+ }
+ /*
+ * Private helper methods
+ */
+ private void createRoutingTableCache() throws Exception {
+
+ //here init
+ Component c = mock(Component.class);
+
+ when(clusterService.existCache(
+ RoutingTableImpl.GLOBALRPC_CACHE)).thenReturn(false);
+
+ when(clusterService.createCache(RoutingTableImpl.GLOBALRPC_CACHE,
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL))).
+ thenReturn(mockGlobalRpcCache);
+
+ when(clusterService.existCache(
+ RoutingTableImpl.RPC_CACHE)).thenReturn(false);
+
+ when(clusterService.createCache(RoutingTableImpl.RPC_CACHE,
+ EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL))).
+ thenReturn(mockRpcCache);
+
+ doNothing().when(clusterService).tbegin();
+ doNothing().when(clusterService).tcommit();
+
+ routingTable.setClusterGlobalServices(this.clusterService);
+ routingTable.init(c);
+
+ Assert.assertEquals(mockGlobalRpcCache, routingTable.getGlobalRpcCache());
+ Assert.assertEquals(mockRpcCache, routingTable.getRpcCache());
+ }
+
+ private RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> getRouteIdentifier(){
+ RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier = mock(RpcRouter.RouteIdentifier.class);
+ InstanceIdentifier identifier = mock(InstanceIdentifier.class);
+ when(routeIdentifier.getType()).thenReturn(QNAME);
+ when(routeIdentifier.getRoute()).thenReturn(identifier);
+
+ return routeIdentifier;
+ }
+
+ private Runnable addRoutes(final int numRoutes, final String routePrefix, final RpcRouter.RouteIdentifier routeId){
+ return new Runnable() {
+ @Override
+ public void run() {
+ for (int i=0;i<numRoutes;i++){
+ String route = routePrefix + i;
+ try {
+ routingTable.addRoute(routeId, route);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ };
+ }
}
--- /dev/null
+package org.opendaylight.controller.sal.core.api;
+
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+public interface RoutedRpcDefaultImplementation {
+
+ public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input);
+
+}
ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener);
RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation);
+
+ /**
+ * Sets this RoutedRpc Implementation as a delegate rpc provider and will be asked to invoke rpc if the
+ * current provider can't service the rpc request
+ *
+ * @param defaultImplementation
+ * Provider's implementation of RPC functionality
+ */
+ public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultImplementation);
}
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future
-import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener
import org.opendaylight.controller.sal.core.api.Broker
import org.opendaylight.controller.sal.core.api.Consumer
import org.opendaylight.controller.sal.core.api.Provider
-import org.opendaylight.controller.sal.core.api.RpcImplementation
-import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry
-import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
-import org.opendaylight.controller.sal.core.api.RpcRoutingContext
-import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter
import org.opendaylight.yangtools.yang.common.QName
import org.opendaylight.yangtools.yang.common.RpcResult
import org.opendaylight.yangtools.yang.data.api.CompositeNode
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
import org.osgi.framework.BundleContext
import org.slf4j.LoggerFactory
+import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry
+import org.opendaylight.controller.sal.core.api.RpcImplementation
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener
+import org.opendaylight.controller.sal.core.api.RpcRoutingContext
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
+import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation
public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
private static val log = LoggerFactory.getLogger(BrokerImpl);
override addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
router.addRoutedRpcImplementation(rpcType,implementation);
}
-
+
+ override setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultImplementation) {
+ router.setRoutedRpcDefaultDelegate(defaultImplementation);
+ }
+
override addRpcRegistrationListener(RpcRegistrationListener listener) {
return router.addRpcRegistrationListener(listener);
}
import org.opendaylight.controller.sal.common.DataStoreIdentifier;
import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
}
@Override
+ public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultImplementation) {
+ rpcs.setRoutedRpcDefaultDelegate(defaultImplementation);
+ }
+
+ @Override
public RpcRegistration addRpcImplementation(QName rpcType, RpcImplementation implementation)
throws IllegalArgumentException {
return rpcs.addRpcImplementation(rpcType, implementation);
import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
-public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String> {
+public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class);
private final ConcurrentMap<QName, RpcImplementation> implementations = new ConcurrentHashMap<>();
private RpcImplementation defaultImplementation;
private SchemaContextProvider schemaProvider;
+ private RoutedRpcDefaultImplementation defaultDelegate;
public SchemaAwareRpcBroker(String identifier, SchemaContextProvider schemaProvider) {
super();
this.schemaProvider = schemaProvider;
}
+ public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() {
+ return defaultDelegate;
+ }
+
@Override
+ public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) {
+ this.defaultDelegate = defaultDelegate;
+ }
+
+ @Override
public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
checkArgument(rpcType != null, "RPC Type should not be null");
checkArgument(implementation != null, "RPC Implementatoin should not be null");
return ret;
}
+ @Override
+ public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
+ checkState(defaultDelegate != null);
+ return defaultDelegate.invokeRpc(rpc, identifier, input);
+ }
+
private static abstract class RoutingStrategy implements Identifiable<QName> {
private final QName identifier;
SimpleNode<?> routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf());
checkArgument(routeContainer != null, "Leaf %s must be set with value", strategy.getLeaf());
Object route = routeContainer.getValue();
+ checkArgument(route instanceof InstanceIdentifier);
RpcImplementation potential = null;
if (route != null) {
RoutedRpcRegImpl potentialReg = implementations.get(route);
}
}
if (potential == null) {
- potential = defaultDelegate;
+ return router.invokeRpc(rpc, (InstanceIdentifier) route, input);
}
checkState(potential != null, "No implementation is available for rpc:%s path:%s", rpc, route);
return potential.invokeRpc(rpc, input);
package org.opendaylight.controller.sal.dom.broker.osgi;
import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
-import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
-import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
+import org.opendaylight.controller.sal.core.api.*;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcResult;
return getDelegate().addRoutedRpcImplementation(rpcType, implementation);
}
- @Override
+ @Override
+ public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultImplementation) {
+ getDelegate().setRoutedRpcDefaultDelegate(defaultImplementation);
+ }
+
+ @Override
public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(L listener) {
return getDelegate().registerRouteChangeListener(listener);
}
import org.opendaylight.controller.sal.connector.remoterpc.*;
import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
import org.osgi.framework.BundleContext;
/**
ClientImpl clientImpl = new ClientImpl();
- RoutingTableProvider provider = new RoutingTableProvider(bundleContext,serverImpl);
+ RoutingTableProvider provider = new RoutingTableProvider(bundleContext);//,serverImpl);
- RemoteRpcProvider facade = new RemoteRpcProvider(serverImpl, clientImpl);
-
- facade.setRoutingTableProvider(provider );
-
- broker.registerProvider(facade, bundleContext);
- return facade;
- }
- public void setBundleContext(BundleContext bundleContext) {
- this.bundleContext = bundleContext;
- }
+ facade.setRoutingTableProvider(provider );
+ facade.setContext(bundleContext);
+ facade.setRpcProvisionRegistry((RpcProvisionRegistry) broker);
+
+ broker.registerProvider(facade, bundleContext);
+ return facade;
+ }
+
+ public void setBundleContext(BundleContext bundleContext) {
+ this.bundleContext = bundleContext;
+ }
}
import org.opendaylight.controller.sal.common.util.Rpcs;
import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
+import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
this.routingTableProvider = routingTableProvider;
}
- @Override
- public Set<QName> getSupportedRpcs(){
- //TODO: Find the entries from routing table
- return Collections.emptySet();
- }
-
@Override
public void start() {/*NOOPS*/}
* @param input payload for the remote service
* @return
*/
- @Override
public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+ routeId.setType(rpc);
+
+ String address = lookupRemoteAddressForGlobalRpc(routeId);
+ return sendMessage(input, routeId, address);
+ }
+
+ /**
+ * Finds remote server that can execute this routed rpc and sends a message to it
+ * requesting execution.
+ * The call blocks until a response from remote server is received. Its upto
+ * the client of this API to implement a timeout functionality.
+ *
+ * @param rpc
+ * rpc to be called
+ * @param identifier
+ * instance identifier on which rpc is to be executed
+ * @param input
+ * payload
+ * @return
+ */
+ public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
RouteIdentifierImpl routeId = new RouteIdentifierImpl();
routeId.setType(rpc);
+ routeId.setRoute(identifier);
+
+ String address = lookupRemoteAddressForRpc(routeId);
- String address = lookupRemoteAddress(routeId);
+ return sendMessage(input, routeId, address);
+ }
+ private RpcResult<CompositeNode> sendMessage(CompositeNode input, RouteIdentifierImpl routeId, String address) {
Message request = new Message.MessageBuilder()
.type(Message.MessageType.REQUEST)
.sender(Context.getInstance().getLocalUri())
collectErrors(e, errors);
return Rpcs.getRpcResult(false, null, errors);
}
-
}
/**
* @param routeId route identifier
* @return remote network address
*/
- private String lookupRemoteAddress(RpcRouter.RouteIdentifier routeId){
+ private String lookupRemoteAddressForGlobalRpc(RpcRouter.RouteIdentifier routeId){
checkNotNull(routeId, "route must not be null");
- Optional<RoutingTable<String, String>> routingTable = routingTableProvider.getRoutingTable();
+ Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> routingTable = routingTableProvider.getRoutingTable();
checkNotNull(routingTable.isPresent(), "Routing table is null");
- Set<String> addresses = routingTable.get().getRoutes(routeId.toString());
- checkNotNull(addresses, "Address not found for route [%s]", routeId);
- checkState(addresses.size() == 1,
- "Multiple remote addresses found for route [%s], \nonly 1 expected", routeId); //its a global service.
+ String address = null;
+ try {
+ address = routingTable.get().getGlobalRoute(routeId);
+ } catch (RoutingTableException|SystemException e) {
+ _logger.error("Exception caught while looking up remote address " + e);
+ }
+ checkState(address != null, "Address not found for route [%s]", routeId);
+
+ return address;
+ }
+
+ /**
+ * Find address for the given route identifier in routing table
+ * @param routeId route identifier
+ * @return remote network address
+ */
+ private String lookupRemoteAddressForRpc(RpcRouter.RouteIdentifier routeId){
+ checkNotNull(routeId, "route must not be null");
+
+ Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> routingTable = routingTableProvider.getRoutingTable();
+ checkNotNull(routingTable.isPresent(), "Routing table is null");
- String address = addresses.iterator().next();
- checkNotNull(address, "Address not found for route [%s]", routeId);
+ String address = routingTable.get().getLastAddedRoute(routeId);
+ checkState(address != null, "Address not found for route [%s]", routeId);
return address;
}
package org.opendaylight.controller.sal.connector.remoterpc;
import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
-public interface RemoteRpcClient extends RpcImplementation,AutoCloseable{
-
+public interface RemoteRpcClient extends AutoCloseable{
void setRoutingTableProvider(RoutingTableProvider provider);
package org.opendaylight.controller.sal.connector.remoterpc;
+import com.google.common.base.Optional;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
+import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.osgi.framework.BundleContext;
+import org.osgi.util.tracker.ServiceTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
-public class RemoteRpcProvider implements
- RemoteRpcServer,
- RemoteRpcClient,
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class RemoteRpcProvider implements
+ RpcImplementation,
+ RoutedRpcDefaultImplementation,
+ AutoCloseable,
Provider {
- private final ServerImpl server;
- private final ClientImpl client;
- private RoutingTableProvider provider;
+ private Logger _logger = LoggerFactory.getLogger(RemoteRpcProvider.class);
- @Override
- public void setRoutingTableProvider(RoutingTableProvider provider) {
- this.provider = provider;
- server.setRoutingTableProvider(provider);
- client.setRoutingTableProvider(provider);
+ private final ServerImpl server;
+ private final ClientImpl client;
+ private RoutingTableProvider routingTableProvider;
+ private final RpcListener listener = new RpcListener();
+ private final RoutedRpcListener routeChangeListener = new RoutedRpcListener();
+ private ProviderSession brokerSession;
+ private RpcProvisionRegistry rpcProvisionRegistry;
+ private BundleContext context;
+ private ServiceTracker clusterTracker;
+
+ public RemoteRpcProvider(ServerImpl server, ClientImpl client) {
+ this.server = server;
+ this.client = client;
+ }
+
+ public void setRoutingTableProvider(RoutingTableProvider provider) {
+ this.routingTableProvider = provider;
+ client.setRoutingTableProvider(provider);
+ }
+
+ public void setContext(BundleContext context){
+ this.context = context;
+ }
+
+ public void setRpcProvisionRegistry(RpcProvisionRegistry rpcProvisionRegistry){
+ this.rpcProvisionRegistry = rpcProvisionRegistry;
+ }
+
+ @Override
+ public void onSessionInitiated(ProviderSession session) {
+ brokerSession = session;
+ server.setBrokerSession(session);
+ start();
+ }
+
+ @Override
+ public Set<QName> getSupportedRpcs() {
+ //TODO: Ask Tony if we need to get this from routing table
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Collection<ProviderFunctionality> getProviderFunctionality() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+ return client.invokeRpc(rpc, input);
+ }
+
+ @Override
+ public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
+ return client.invokeRpc(rpc, identifier, input);
+ }
+
+ public void start() {
+ server.start();
+ client.start();
+ brokerSession.addRpcRegistrationListener(listener);
+ rpcProvisionRegistry.setRoutedRpcDefaultDelegate(this);
+ rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
+
+ announceSupportedRpcs();
+ announceSupportedRoutedRpcs();
+ }
+
+ @Override
+ public void close() throws Exception {
+ unregisterSupportedRpcs();
+ unregisterSupportedRoutedRpcs();
+ server.close();
+ client.close();
+ }
+
+ public void stop() {
+ server.stop();
+ client.stop();
+ }
+
+ /**
+ * Add all the locally registered RPCs in the clustered routing table
+ */
+ private void announceSupportedRpcs(){
+ Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+ for (QName rpc : currentlySupported) {
+ listener.onRpcImplementationAdded(rpc);
}
-
- @Override
- public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
- return client.invokeRpc(rpc, input);
+ }
+
+ /**
+ * Add all the locally registered Routed RPCs in the clustered routing table
+ */
+ private void announceSupportedRoutedRpcs(){
+
+ //TODO: announce all routed RPCs as well
+
+ }
+
+ /**
+ * Un-Register all the supported RPCs from clustered routing table
+ */
+ private void unregisterSupportedRpcs(){
+ Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+ //TODO: remove all routed RPCs as well
+ for (QName rpc : currentlySupported) {
+ listener.onRpcImplementationRemoved(rpc);
}
-
+ }
+
+ /**
+ * Un-Register all the locally supported Routed RPCs from clustered routing table
+ */
+ private void unregisterSupportedRoutedRpcs(){
+
+ //TODO: remove all routed RPCs as well
+
+ }
+
+ private RoutingTable<RpcRouter.RouteIdentifier, String> getRoutingTable(){
+ Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> routingTable =
+ routingTableProvider.getRoutingTable();
+
+ checkNotNull(routingTable.isPresent(), "Routing table is null");
+
+ return routingTable.get();
+ }
+
+ /**
+ * Listener for rpc registrations in broker
+ */
+ private class RpcListener implements RpcRegistrationListener {
+
@Override
- public Set<QName> getSupportedRpcs() {
- return client.getSupportedRpcs();
- }
-
-
- public RemoteRpcProvider(ServerImpl server, ClientImpl client) {
- this.server = server;
- this.client = client;
- }
-
- public void setBrokerSession(ProviderSession session) {
- server.setBrokerSession(session);
- }
-// public void setServerPool(ExecutorService serverPool) {
-// server.setServerPool(serverPool);
-// }
- public void start() {
- //when listener was being invoked and addRPCImplementation was being
- //called the client was null.
- server.setClient(client);
- server.start();
- client.start();
+ public void onRpcImplementationAdded(QName rpc) {
+
+ _logger.debug("Adding registration for [{}]", rpc);
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+ routeId.setType(rpc);
+ RoutingTable<RpcRouter.RouteIdentifier, String> routingTable = getRoutingTable();
+ try {
+ routingTable.addGlobalRoute(routeId, server.getServerAddress());
+ _logger.debug("Route added [{}-{}]", routeId, server.getServerAddress());
+
+ } catch (RoutingTableException | SystemException e) {
+ //TODO: This can be thrown when route already exists in the table. Broker
+ //needs to handle this.
+ _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
+
+ }
}
-
@Override
- public Collection<ProviderFunctionality> getProviderFunctionality() {
- // TODO Auto-generated method stub
- return null;
+ public void onRpcImplementationRemoved(QName rpc) {
+
+ _logger.debug("Removing registration for [{}]", rpc);
+ RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+ routeId.setType(rpc);
+
+ RoutingTable<RpcRouter.RouteIdentifier, String> routingTable = getRoutingTable();
+
+ try {
+ routingTable.removeGlobalRoute(routeId);
+ } catch (RoutingTableException | SystemException e) {
+ _logger.error("Route delete failed {}", e);
+ }
}
-
-
+ }
+
+ /**
+ * Listener for Routed Rpc registrations in broker
+ */
+ private class RoutedRpcListener
+ implements RouteChangeListener<RpcRoutingContext, InstanceIdentifier> {
+
+ /**
+ *
+ * @param routeChange
+ */
@Override
- public void onSessionInitiated(ProviderSession session) {
- server.setBrokerSession(session);
- start();
+ public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {
+ Map<RpcRoutingContext, Set<InstanceIdentifier>> announcements = routeChange.getAnnouncements();
+ announce(getRouteIdentifiers(announcements));
+
+ Map<RpcRoutingContext, Set<InstanceIdentifier>> removals = routeChange.getRemovals();
+ remove(getRouteIdentifiers(removals));
+ }
+
+ /**
+ *
+ * @param announcements
+ */
+ private void announce(Set<RpcRouter.RouteIdentifier> announcements) {
+ _logger.debug("Announcing [{}]", announcements);
+ RoutingTable<RpcRouter.RouteIdentifier, String> routingTable = getRoutingTable();
+ try {
+ routingTable.addRoutes(announcements, server.getServerAddress());
+ } catch (RoutingTableException | SystemException e) {
+ _logger.error("Route announcement failed {}", e);
+ }
}
-
-
- public void close() throws Exception {
- server.close();
- client.close();
+
+ /**
+ *
+ * @param removals
+ */
+ private void remove(Set<RpcRouter.RouteIdentifier> removals){
+ _logger.debug("Removing [{}]", removals);
+ RoutingTable<RpcRouter.RouteIdentifier, String> routingTable = getRoutingTable();
+ try {
+ routingTable.removeRoutes(removals, server.getServerAddress());
+ } catch (RoutingTableException | SystemException e) {
+ _logger.error("Route removal failed {}", e);
+ }
}
- @Override
- public void stop() {
- server.stop();
- client.stop();
+ /**
+ *
+ * @param changes
+ * @return
+ */
+ private Set<RpcRouter.RouteIdentifier> getRouteIdentifiers(Map<RpcRoutingContext, Set<InstanceIdentifier>> changes) {
+ RouteIdentifierImpl routeId = null;
+ Set<RpcRouter.RouteIdentifier> routeIdSet = new HashSet<RpcRouter.RouteIdentifier>();
+
+ for (RpcRoutingContext context : changes.keySet()){
+ routeId = new RouteIdentifierImpl();
+ routeId.setType(context.getRpc());
+ routeId.setContext(context.getContext());
+
+ for (InstanceIdentifier instanceId : changes.get(context)){
+ routeId.setRoute(instanceId);
+ routeIdSet.add(routeId);
+ }
+ }
+ return routeIdSet;
}
+
+
+
+ }
+
}
package org.opendaylight.controller.sal.connector.remoterpc;
import com.google.common.base.Optional;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
import org.opendaylight.controller.sal.connector.remoterpc.impl.RoutingTableImpl;
import org.osgi.framework.BundleContext;
import org.osgi.util.tracker.ServiceTracker;
private RoutingTableImpl routingTableImpl = null;
- final private RouteChangeListener routeChangeListener;
+ //final private RouteChangeListener routeChangeListener;
- public RoutingTableProvider(BundleContext ctx,RouteChangeListener rcl) {
+ public RoutingTableProvider(BundleContext ctx){//,RouteChangeListener rcl) {
@SuppressWarnings("rawtypes")
ServiceTracker<RoutingTable, RoutingTable> rawTracker = new ServiceTracker<>(ctx, RoutingTable.class, null);
tracker = rawTracker;
tracker.open();
- routeChangeListener = rcl;
+ //routeChangeListener = rcl;
}
- public Optional<RoutingTable<String, String>> getRoutingTable() {
+ public Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> getRoutingTable() {
@SuppressWarnings("unchecked")
- RoutingTable<String,String> tracked = tracker.getService();
+ RoutingTable<RpcRouter.RouteIdentifier,String> tracked = tracker.getService();
if(tracked instanceof RoutingTableImpl){
if(routingTableImpl != tracked){
routingTableImpl= (RoutingTableImpl)tracked;
- routingTableImpl.setRouteChangeListener(routeChangeListener);
+ //routingTableImpl.setRouteChangeListener(routeChangeListener);
}
}
import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
import org.opendaylight.yangtools.yang.common.QName;
import static com.google.common.base.Preconditions.checkState;
/**
- * ZeroMq based implementation of RpcRouter. It implements RouteChangeListener of RoutingTable
- * so that it gets route change notifications from routing table.
+ * ZeroMq based implementation of RpcRouter.
*/
-public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String, String> {
+public class ServerImpl implements RemoteRpcServer {
private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
private ProviderSession brokerSession;
private ZMQ.Context context;
- private final RpcListener listener = new RpcListener();
-
private final String HANDLER_INPROC_ADDRESS = "inproc://rpc-request-handler";
private final int HANDLER_WORKER_COUNT = 2;
private final int HWM = 200;//high water mark on sockets
private String serverAddress;
private int port;
- private ClientImpl client;
-
- private RoutingTableProvider routingTableProvider;
-
public static enum State {
STARTING, STARTED, STOPPED;
}
this.port = port;
}
- public RoutingTableProvider getRoutingTableProvider() {
- return routingTableProvider;
- }
-
- public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
- this.routingTableProvider = routingTableProvider;
- }
-
- public ClientImpl getClient(){
- return this.client;
- }
-
- public void setClient(ClientImpl client) {
- this.client = client;
- }
-
public State getStatus() {
return this.status;
}
remoteServices = new HashSet<QName>();//
serverPool = Executors.newSingleThreadExecutor();//main server thread
serverPool.execute(receive()); // Start listening rpc requests
- brokerSession.addRpcRegistrationListener(listener);
-
- announceLocalRpcs();
-
- registerRemoteRpcs();
status = State.STARTED;
_logger.info("Remote RPC Server started [{}]", getServerAddress());
if (State.STOPPED == this.getStatus()) return; //do nothing
- unregisterLocalRpcs();
-
if (serverPool != null)
serverPool.shutdown();
/**
* Closes ZMQ Context. It tries to gracefully terminate the context. If
- * termination takes more than a second, its forcefully shutdown.
+ * termination takes more than 5 seconds, its forcefully shutdown.
*/
private void closeZmqContext() {
ExecutorService exec = Executors.newSingleThreadExecutor();
};
}
- /**
- * Register the remote RPCs from the routing table into broker
- */
- private void registerRemoteRpcs(){
- Optional<RoutingTable<String, String>> routingTableOptional = routingTableProvider.getRoutingTable();
-
- Preconditions.checkState(routingTableOptional.isPresent(), "Routing table is absent");
-
- Set<Map.Entry> remoteRoutes =
- routingTableProvider.getRoutingTable().get().getAllRoutes();
-
- //filter out all entries that contains local address
- //we dont want to register local RPCs as remote
- Predicate<Map.Entry> notLocalAddressFilter = new Predicate<Map.Entry>(){
- public boolean apply(Map.Entry remoteRoute){
- return !getServerAddress().equalsIgnoreCase((String)remoteRoute.getValue());
- }
- };
-
- //filter the entries created by current node
- Set<Map.Entry> filteredRemoteRoutes = Sets.filter(remoteRoutes, notLocalAddressFilter);
-
- for (Map.Entry route : filteredRemoteRoutes){
- onRouteUpdated((String) route.getKey(), "");//value is not needed by broker
- }
- }
-
- /**
- * Un-Register the local RPCs from the routing table
- */
- private void unregisterLocalRpcs(){
- Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
- for (QName rpc : currentlySupported) {
- listener.onRpcImplementationRemoved(rpc);
- }
- }
-
- /**
- * Publish all the locally registered RPCs in the routing table
- */
- private void announceLocalRpcs(){
- Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
- for (QName rpc : currentlySupported) {
- listener.onRpcImplementationAdded(rpc);
- }
- }
-
- /**
- * @param key
- * @param value
- */
- @Override
- public void onRouteUpdated(String key, String value) {
- RouteIdentifierImpl rId = new RouteIdentifierImpl();
- try {
- _logger.debug("Updating key/value {}-{}", key, value);
- brokerSession.addRpcImplementation(
- (QName) rId.fromString(key).getType(), client);
-
- //TODO: Check with Tony for routed rpc
- //brokerSession.addRoutedRpcImplementation((QName) rId.fromString(key).getRoute(), client);
- } catch (Exception e) {
- _logger.info("Route update failed {}", e);
- }
- }
-
- /**
- * @param key
- */
- @Override
- public void onRouteDeleted(String key) {
- //TODO: Broker session needs to be updated to support this
- throw new UnsupportedOperationException();
- }
-
/**
* Finds IPv4 address of the local VM
* TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
return null;
}
- /**
- * Listener for rpc registrations
- */
- private class RpcListener implements RpcRegistrationListener {
-
- @Override
- public void onRpcImplementationAdded(QName name) {
-
- //if the service name exists in the set, this notice
- //has bounced back from the broker. It should be ignored
- if (remoteServices.contains(name))
- return;
-
- _logger.debug("Adding registration for [{}]", name);
- RouteIdentifierImpl routeId = new RouteIdentifierImpl();
- routeId.setType(name);
-
- RoutingTable<String, String> routingTable = getRoutingTable();
-
- try {
- routingTable.addGlobalRoute(routeId.toString(), getServerAddress());
- _logger.debug("Route added [{}-{}]", name, getServerAddress());
-
- } catch (RoutingTableException | SystemException e) {
- //TODO: This can be thrown when route already exists in the table. Broker
- //needs to handle this.
- _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
-
- }
- }
-
- @Override
- public void onRpcImplementationRemoved(QName name) {
-
- _logger.debug("Removing registration for [{}]", name);
- RouteIdentifierImpl routeId = new RouteIdentifierImpl();
- routeId.setType(name);
-
- RoutingTable<String, String> routingTable = getRoutingTable();
-
- try {
- routingTable.removeGlobalRoute(routeId.toString());
- } catch (RoutingTableException | SystemException e) {
- _logger.error("Route delete failed {}", e);
- }
- }
-
- private RoutingTable<String, String> getRoutingTable(){
- Optional<RoutingTable<String, String>> routingTable =
- routingTableProvider.getRoutingTable();
-
- checkNotNull(routingTable.isPresent(), "Routing table is null");
-
- return routingTable.get();
- }
- }
-
- /*
- * Listener for Route changes in broker. Broker notifies this listener in the event
- * of any change (add/delete). Listener then updates the routing table.
- */
- private class BrokerRouteChangeListener
- implements org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener<RpcRoutingContext, InstanceIdentifier>{
-
- @Override
- public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {
-
- }
- }
-
}
public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>,Serializable {
- transient ObjectMapper mapper = new ObjectMapper();
-
private QName context;
private QName type;
private InstanceIdentifier route;
public void setRoute(InstanceIdentifier route) {
this.route = route;
}
-
- @Override
- public String toString() {
- try {
- return mapper.writeValueAsString(this);
- } catch (Throwable e) {
- //do nothing
- }
-
- return super.toString();
- }
-
- public RpcRouter.RouteIdentifier fromString(String input)
- throws Exception {
-
- JsonNode root = mapper.readTree(input);
- this.context = parseQName(root.get("context"));
- this.type = parseQName(root.get("type"));
-
- return this;
- }
-
- private QName parseQName(JsonNode node){
- if (node == null) return null;
-
- String namespace = (node.get("namespace") != null) ?
- node.get("namespace").asText() : "";
-
- String localName = (node.get("localName") != null) ?
- node.get("localName").asText() : "";
-
- URI uri = URI.create(namespace);
- return new QName(uri, localName);
- }
}
identity remote-zeromq-rpc-server {
base config:module-type;
- config:provided-service remote-rpc-server;
- config:provided-service remote-rpc-client;
config:java-name-prefix ZeroMQServer;
}
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
//mock routing table
routingTableProvider = mock(RoutingTableProvider.class);
- RoutingTable<String, String> mockRoutingTable = new MockRoutingTable<String, String>();
- Optional<RoutingTable<String, String>> optionalRoutingTable = Optional.fromNullable(mockRoutingTable);
+ RoutingTable<RpcRouter.RouteIdentifier, String> mockRoutingTable = new MockRoutingTable<String, String>();
+ Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> optionalRoutingTable = Optional.fromNullable(mockRoutingTable);
when(routingTableProvider.getRoutingTable()).thenReturn(optionalRoutingTable);
//mock ClientRequestHandler
}
- @Test
+ //@Test
public void invokeRpc_NormalCall_ShouldReturnSuccess() throws Exception {
when(mockHandler.handle(any(Message.class))).
Assert.assertNull(result.getResult());
}
- @Test
+ //@Test
public void invokeRpc_HandlerThrowsException_ShouldReturnError() throws Exception {
when(mockHandler.handle(any(Message.class))).
}
+ @Override
+ public void addRoutes(Set set, Object o) throws RoutingTableException, SystemException {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ @Override
+ public void removeRoutes(Set set, Object o) throws RoutingTableException, SystemException {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
@Override
public void removeGlobalRoute(Object o) throws RoutingTableException, SystemException {
}
+ @Override
+ public Object getGlobalRoute(Object o) throws RoutingTableException, SystemException {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
@Override
public Set getRoutes(Object o) {
Set<String> routes = new HashSet<String>();
}
@Override
- public Set<Map.Entry> getAllRoutes() {
- return Collections.emptySet();
+ public Object getLastAddedRoute(Object o) {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
}
- @Override
- public Object getARoute(Object o) {
- return null;
- }
+// @Override
+// public Set<Map.Entry> getAllRoutes() {
+// return Collections.emptySet();
+// }
- @Override
- public void registerRouteChangeListener(RouteChangeListener routeChangeListener) {
-
- }
+// @Override
+// public Object getARoute(Object o) {
+// return null;
+// }
}
--- /dev/null
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RemoteRpcProviderTest {
+ @Before
+ public void setUp() throws Exception {
+
+ }
+
+ @After
+ public void tearDown() throws Exception {
+
+ }
+
+ @Test
+ public void testSetRoutingTableProvider() throws Exception {
+
+ }
+
+ @Test
+ public void testOnSessionInitiated() throws Exception {
+
+ }
+
+ @Test
+ public void testGetSupportedRpcs() throws Exception {
+
+ }
+
+ @Test
+ public void testGetProviderFunctionality() throws Exception {
+
+ }
+
+ @Test
+ public void testInvokeRpc() throws Exception {
+
+ }
+
+ @Test
+ public void testInvokeRoutedRpc() throws Exception {
+
+ }
+
+ @Test
+ public void testStart() throws Exception {
+
+ }
+
+ @Test
+ public void testClose() throws Exception {
+
+ }
+
+ @Test
+ public void testStop() throws Exception {
+
+ }
+}
+++ /dev/null
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others. All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.sal.connector.remoterpc;
-
-import java.net.URI;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import org.junit.Assert;
-import org.junit.Test;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RouteIdentifierImplTest {
-
- Logger _logger = LoggerFactory.getLogger(RouteIdentifierImplTest.class);
-
- private final URI namespace = URI.create("http://cisco.com/example");
- private final QName QNAME = new QName(namespace, "heartbeat");
-
- @Test
- public void testToString() throws Exception {
- RouteIdentifierImpl rId = new RouteIdentifierImpl();
- rId.setType(QNAME);
-
- _logger.debug(rId.toString());
-
- Assert.assertTrue(true);
-
- }
-
- @Test
- public void testFromString() throws Exception {
- RouteIdentifierImpl rId = new RouteIdentifierImpl();
- rId.setType(QNAME);
-
- String s = rId.toString();
- _logger.debug("serialized route: {}", s);
-
- RpcRouter.RouteIdentifier ref = new RouteIdentifierImpl().fromString(s);
- _logger.debug("deserialized route: {}", ref);
-
- Assert.assertTrue(true);
- }
-
- @Test(expected = JsonParseException.class)
- public void testFromInvalidString() throws Exception {
- String invalidInput = "aklhdgadfa;;;;;;;]]]]=]ag" ;
- RouteIdentifierImpl rId = new RouteIdentifierImpl();
- rId.fromString(invalidInput);
-
- _logger.debug("" + rId);
- Assert.assertTrue(true);
- }
-}
import com.google.common.base.Optional;
import junit.framework.Assert;
import org.junit.*;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
import org.opendaylight.controller.sal.connector.remoterpc.utils.MessagingUtil;
import org.opendaylight.controller.sal.core.api.Broker;
server = new ServerImpl(port);
server.setBrokerSession(brokerSession);
- server.setRoutingTableProvider(routingTableProvider);
- RoutingTable<String, String> mockRoutingTable = new MockRoutingTable<String, String>();
- Optional<RoutingTable<String, String>> optionalRoutingTable = Optional.fromNullable(mockRoutingTable);
+ RoutingTable<RpcRouter.RouteIdentifier, String> mockRoutingTable = new MockRoutingTable<String, String>();
+ Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> optionalRoutingTable = Optional.fromNullable(mockRoutingTable);
when(routingTableProvider.getRoutingTable()).thenReturn(optionalRoutingTable);
when(brokerSession.addRpcRegistrationListener(listener)).thenReturn(null);
Assert.assertEquals(ServerImpl.State.STOPPED, server.getStatus());
}
- @Test
- public void getRoutingTableProvider_Call_ShouldReturnRoutingTable() throws Exception {
- Assert.assertNotNull(server.getRoutingTableProvider());
- }
-
@Test
public void getBrokerSession_Call_ShouldReturnBrokerSession() throws Exception {
Optional<Broker.ProviderSession> mayBeBroker = server.getBrokerSession();
</Export-Package>
<Import-Package>
com.sun.jersey.spi.container.servlet,
- org.codehaus.jackson.annotate,
+ !org.codehaus.jackson.annotate,
javax.ws.rs,
javax.ws.rs.core,
javax.xml.bind,
javax.xml.bind.annotation,
org.slf4j,
org.apache.catalina.filters,
- org.codehaus.jackson.jaxrs,
+ !org.codehaus.jackson.jaxrs,
org.opendaylight.controller.sample.zeromq.provider,
org.opendaylight.controller.sample.zeromq.consumer,
org.opendaylight.controller.sal.utils,