Adding routed RPC support in Remote RPC Router 83/4683/8
authorAbhishek Kumar <abhishk2@cisco.com>
Fri, 24 Jan 2014 08:31:49 +0000 (00:31 -0800)
committerTony Tkacik <ttkacik@cisco.com>
Wed, 26 Feb 2014 14:14:02 +0000 (15:14 +0100)
 - Updated RoutingTable to add a new routed RPC cache
 - Updated sal-core-api to add Remote RPC Router as default delegate in SchemaAwareRpcBroker
 - Refactored RemoteRpcProvider code.
 - Added unit tests for routing table
 - Removed RouteIdentifier <-> JSON conversion. Its not used anymore
 - Rebased with master

Change-Id: Id13a0a6a9ae6ded7ea9068b7a613a9e196c89a7b
Signed-off-by: Abhishek Kumar <abhishk2@cisco.com>
24 files changed:
opendaylight/md-sal/remoterpc-routingtable/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/api/RoutingTable.java
opendaylight/md-sal/remoterpc-routingtable/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/impl/Activator.java
opendaylight/md-sal/remoterpc-routingtable/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/impl/RoutingTableImpl.java
opendaylight/md-sal/remoterpc-routingtable/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/impl/RoutingTableImplTest.java
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RoutedRpcDefaultImplementation.java [new file with mode: 0644]
opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RpcProvisionRegistry.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/BrokerImpl.xtend
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/MountPointImpl.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/impl/SchemaAwareRpcBroker.java
opendaylight/md-sal/sal-dom-broker/src/main/java/org/opendaylight/controller/sal/dom/broker/osgi/RpcProvisionRegistryProxy.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/config/yang/md/sal/remote/rpc/ZeroMQServerModule.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImpl.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcClient.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProvider.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/RoutingTableProvider.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImpl.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/java/org/opendaylight/controller/sal/connector/remoterpc/dto/RouteIdentifierImpl.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/main/yang/odl-sal-dom-rpc-remote-cfg.yang
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ClientImplTest.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/MockRoutingTable.java
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProviderTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RouteIdentifierImplTest.java [deleted file]
opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/ServerImplTest.java
opendaylight/md-sal/sal-remoterpc-connector/integrationtest/test-nb/pom.xml

index 2da031e54008cb68c5cf23cddc84b4cd50742da1..e5e314cd877d74ab7e52d505a80cf4e1ea059d44 100644 (file)
@@ -7,25 +7,33 @@
  */
 package org.opendaylight.controller.sal.connector.remoterpc.api;
 
-import java.util.Map;
 import java.util.Set;
 
 public interface RoutingTable<I,R> {
 
-
-
   /**
-   * Adds a network address for the route. If address for route
-   * exists, appends the address to the list
+   * Adds a network address for the route. If the route already exists,
+   * it throws <code>DuplicateRouteException</code>.
+   * This method would be used when registering a global service.
+   *
    *
    * @param routeId route identifier
    * @param route network address
-   * @throws RoutingTableException for any logical exception
+   * @throws DuplicateRouteException
+   * @throws RoutingTableException
+   */
+  public void addGlobalRoute(I routeId, R route) throws  RoutingTableException, SystemException;
+
+  /**
+   * Remove the route.
+   * This method would be used when registering a global service.
+   * @param routeId
+   * @throws RoutingTableException
    * @throws SystemException
    */
-  public void addRoute(I routeId, R route) throws  RoutingTableException,SystemException;
+  public void removeGlobalRoute(I routeId) throws RoutingTableException, SystemException;
 
-    /**
+  /**
    * Adds a network address for the route. If the route already exists,
    * it throws <code>DuplicateRouteException</code>.
    * This method would be used when registering a global service.
@@ -36,9 +44,18 @@ public interface RoutingTable<I,R> {
    * @throws DuplicateRouteException
    * @throws RoutingTableException
    */
-  public void addGlobalRoute(I routeId, R route) throws  RoutingTableException, SystemException;
-
+  public R getGlobalRoute(I routeId) throws  RoutingTableException, SystemException;
 
+  /**
+   * Adds a network address for the route. If address for route
+   * exists, appends the address to the list
+   *
+   * @param routeId route identifier
+   * @param route network address
+   * @throws RoutingTableException for any logical exception
+   * @throws SystemException
+   */
+  public void addRoute(I routeId, R route) throws RoutingTableException,SystemException;
 
 
   /**
@@ -47,17 +64,28 @@ public interface RoutingTable<I,R> {
    * @param routeId
    * @param route
    */
-  public void removeRoute(I routeId, R route);
+  public void removeRoute(I routeId, R route) throws RoutingTableException,SystemException;
 
+  /**
+   * Adds address for a set of route identifiers. If address for route
+   * exists, appends the address to the set.
+   *
+   * @param routeIds a set of routeIds
+   * @param route network address
+   * @throws RoutingTableException for any logical exception
+   * @throws SystemException
+   */
+  public void addRoutes(Set<I> routeIds, R route) throws  RoutingTableException,SystemException;
 
-    /**
-     * Remove the route.
-     * This method would be used when registering a global service.
-     * @param routeId
-     * @throws RoutingTableException
-     * @throws SystemException
-     */
-    public void removeGlobalRoute(I routeId) throws RoutingTableException, SystemException;
+  /**
+   * Removes address for a set of route identifiers.
+   *
+   * @param routeIds a set of routeIds
+   * @param route network address
+   * @throws RoutingTableException for any logical exception
+   * @throws SystemException
+   */
+  public void removeRoutes(Set<I> routeIds, R route) throws  RoutingTableException,SystemException;
 
   /**
    * Returns a set of network addresses associated with this route
@@ -66,29 +94,14 @@ public interface RoutingTable<I,R> {
    */
   public Set<R> getRoutes(I routeId);
 
-  /**
-   * Returns all network addresses stored in the table
-   * @return
-   */
-  public Set<Map.Entry> getAllRoutes();
 
   /**
-   * Returns only one address from the list of network addresses
-   * associated with the route. The algorithm to determine that
-   * one address is upto the implementer
+   * Returns the last inserted address from the list of network addresses
+   * associated with the route.
    * @param routeId
    * @return
    */
-  public R getARoute(I routeId);
-
-    /**
-     *
-     * This will be removed after listeners
-     * have made change on their end to use whiteboard pattern
-     * @deprecated
-     */
-
-  public void registerRouteChangeListener(RouteChangeListener listener);
+  public R getLastAddedRoute(I routeId);
 
   public class DuplicateRouteException extends RoutingTableException {
       public DuplicateRouteException(String message) {
index 6e2d280a89f5158358c3d6757c7c1e4e2f72e64e..a826a3c1d7bfcc3bac9c43cd2ae43345bff1b21d 100644 (file)
@@ -67,7 +67,8 @@ public class Activator extends ComponentActivatorAbstractBase {
         if (imp.equals(RoutingTableImpl.class)) {
             Dictionary<String, Set<String>> props = new Hashtable<String, Set<String>>();
             Set<String> propSet = new HashSet<String>();
-            propSet.add(RoutingTableImpl.ROUTING_TABLE_GLOBAL_CACHE);
+            propSet.add(RoutingTableImpl.GLOBALRPC_CACHE);
+            propSet.add(RoutingTableImpl.RPC_CACHE);
             props.put(CACHE_UPDATE_AWARE_REGISTRY_KEY, propSet);
 
             c.setInterface(new String[] { RoutingTable.class.getName(),ICacheUpdateAware.class.getName()  }, props);
index 40c4c6b43625d9c99eecb786a1556762cbdc0085..d6b42faccf60f8351f849cdd350f02819bec0674 100644 (file)
@@ -8,15 +8,11 @@
 
 package org.opendaylight.controller.sal.connector.remoterpc.impl;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import org.apache.felix.dm.Component;
-import org.opendaylight.controller.clustering.services.CacheConfigException;
-import org.opendaylight.controller.clustering.services.CacheExistException;
-import org.opendaylight.controller.clustering.services.CacheListenerAddException;
-import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
-import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
-import org.opendaylight.controller.clustering.services.IClusterServices;
-import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
+import org.opendaylight.controller.clustering.services.*;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
@@ -27,309 +23,415 @@ import javax.transaction.HeuristicMixedException;
 import javax.transaction.HeuristicRollbackException;
 import javax.transaction.NotSupportedException;
 import javax.transaction.RollbackException;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.ConcurrentMap;
 
-/**
- * @author: syedbahm
- */
 public class RoutingTableImpl<I, R> implements RoutingTable<I, R>, ICacheUpdateAware<I, R> {
-    public static final String ROUTING_TABLE_GLOBAL_CACHE = "routing_table_global_cache";
 
-    private Logger log = LoggerFactory.getLogger(RoutingTableImpl.class);
+  private Logger log = LoggerFactory.getLogger(RoutingTableImpl.class);
 
-    private IClusterGlobalServices clusterGlobalServices = null;
-    private RoutingTableImpl routingTableInstance = null;
-    private ConcurrentMap routingTableCache = null;
-    private Set<RouteChangeListener> routeChangeListeners = Collections
-            .synchronizedSet(new HashSet<RouteChangeListener>());
+  private IClusterGlobalServices clusterGlobalServices = null;
 
-    public RoutingTableImpl() {
-    }
+  private ConcurrentMap<I,R> globalRpcCache = null;
+  private ConcurrentMap<I, LinkedHashSet<R>> rpcCache = null;  //need routes to ordered by insert-order
 
-    @Override
-    public void addRoute(I routeId, R route) throws RoutingTableException {
-        throw new UnsupportedOperationException(" Not implemented yet!");
-    }
+  public static final String GLOBALRPC_CACHE = "remoterpc_routingtable.globalrpc_cache";
+  public static final String RPC_CACHE = "remoterpc_routingtable.rpc_cache";
 
-    @Override
-    public void addGlobalRoute(I routeId, R route) throws RoutingTableException, SystemException {
-        Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!");
-        Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!");
-        try {
-
-            Set<R> existingRoute = null;
-            // ok does the global route is already registered ?
-            if ((existingRoute = getRoutes(routeId)) == null) {
-
-                if (log.isDebugEnabled()) {
-                    log.debug("addGlobalRoute: adding  a new route with id" + routeId + " and value = "
-                            + route);
-                }
-                // lets start a transaction
-                clusterGlobalServices.tbegin();
-
-                routingTableCache.put(routeId, route);
-                clusterGlobalServices.tcommit();
-            } else {
-                throw new DuplicateRouteException(" There is already existing route " + existingRoute);
-            }
-
-        } catch (NotSupportedException|HeuristicRollbackException|RollbackException|HeuristicMixedException e) {
-            throw new RoutingTableException("Transaction error - while trying to create route id="
-                    + routeId + "with route" + route, e);
-        } catch (javax.transaction.SystemException e) {
-            throw new SystemException("System error occurred - while trying to create with value", e);
-        }
+  public RoutingTableImpl() {
+  }
 
-    }
+  @Override
+  public R getGlobalRoute(I routeId) throws RoutingTableException, SystemException {
+    Preconditions.checkNotNull(routeId, "getGlobalRoute: routeId cannot be null!");
+    return globalRpcCache.get(routeId);
+  }
 
-    @Override
-    public void removeRoute(I routeId, R route) {
-        throw new UnsupportedOperationException("Not implemented yet!");
+  @Override
+  public void addGlobalRoute(I routeId, R route) throws RoutingTableException, SystemException {
+    Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!");
+    Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!");
+    try {
+
+      log.debug("addGlobalRoute: adding  a new route with id[{}] and value [{}]", routeId, route);
+      clusterGlobalServices.tbegin();
+      if (globalRpcCache.putIfAbsent(routeId, route) != null) {
+        throw new DuplicateRouteException(" There is already existing route " + routeId);
+      }
+      clusterGlobalServices.tcommit();
+
+    } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
+      throw new RoutingTableException("Transaction error - while trying to create route id="
+          + routeId + "with route" + route, e);
+    } catch (javax.transaction.SystemException e) {
+      throw new SystemException("System error occurred - while trying to create with value", e);
     }
 
-    @Override
-    public void removeGlobalRoute(I routeId) throws RoutingTableException, SystemException {
-        Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!");
-        try {
-            if (log.isDebugEnabled()) {
-                log.debug("removeGlobalRoute: removing  a new route with id" + routeId);
-            }
-            // lets start a transaction
-            clusterGlobalServices.tbegin();
-
-            routingTableCache.remove(routeId);
-            clusterGlobalServices.tcommit();
-
-        } catch (NotSupportedException|HeuristicRollbackException|RollbackException|HeuristicMixedException e) {
-            throw new RoutingTableException("Transaction error - while trying to remove route id="
-                    + routeId, e);
-        } catch (javax.transaction.SystemException e) {
-            throw new SystemException("System error occurred - while trying to remove with value", e);
-        }
+  }
+
+  @Override
+  public void removeGlobalRoute(I routeId) throws RoutingTableException, SystemException {
+    Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!");
+    try {
+      log.debug("removeGlobalRoute: removing  a new route with id [{}]", routeId);
+
+      clusterGlobalServices.tbegin();
+      globalRpcCache.remove(routeId);
+      clusterGlobalServices.tcommit();
+
+    } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
+      throw new RoutingTableException("Transaction error - while trying to remove route id="
+          + routeId, e);
+    } catch (javax.transaction.SystemException e) {
+      throw new SystemException("System error occurred - while trying to remove with value", e);
     }
+  }
+
 
-    @Override
-    public Set<R> getRoutes(I routeId) {
+  @Override
+  public Set<R> getRoutes(I routeId) {
+    Preconditions.checkNotNull(routeId, "getRoutes: routeId cannot be null!");
+    Set<R> routes = rpcCache.get(routeId);
 
-        // Note: currently works for global routes only wherein there is just single
-        // route
-        Preconditions.checkNotNull(routeId, "getARoute: routeId cannot be null!");
-        R route = (R)routingTableCache.get(routeId);
-        Set<R>routes = null;
-        if(route !=null){
-           routes = new HashSet<R>();
-           routes.add(route);
-        }
+    if (routes == null) return Collections.emptySet();
+
+    return ImmutableSet.copyOf(routes);
+  }
 
-        return routes;
-    }
+
+
+  public R getLastAddedRoute(I routeId) {
+
+    Set<R> routes = getRoutes(routeId);
+
+    if (routes.isEmpty()) return null;
+
+    R route = null;
+    Iterator<R> iter = routes.iterator();
+    while (iter.hasNext())
+      route = iter.next();
+
+    return route;
+  }
 
   @Override
-  public Set<Map.Entry> getAllRoutes() {
-    return routingTableCache.entrySet();
+  public void addRoute(I routeId, R route)  throws RoutingTableException, SystemException {
+    Preconditions.checkNotNull(routeId, "addRoute: routeId cannot be null");
+    Preconditions.checkNotNull(route, "addRoute: route cannot be null");
+
+    try{
+      clusterGlobalServices.tbegin();
+      log.debug("addRoute: adding a route with k/v [{}/{}]", routeId, route);
+      threadSafeAdd(routeId, route);
+      clusterGlobalServices.tcommit();
+
+    } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
+      throw new RoutingTableException("Transaction error - while trying to remove route id="
+          + routeId, e);
+    } catch (javax.transaction.SystemException e) {
+      throw new SystemException("System error occurred - while trying to remove with value", e);
+    }
   }
 
   @Override
-    public R getARoute(I routeId) {
-        throw new UnsupportedOperationException("Not implemented yet!");
+  public void addRoutes(Set<I> routeIds, R route) throws RoutingTableException, SystemException {
+    Preconditions.checkNotNull(routeIds, "addRoutes: routeIds must not be null");
+    for (I routeId : routeIds){
+      addRoute(routeId, route);
     }
+  }
 
-    /**
-     * @deprecated doesn't do anything will be removed once listeners used
-     *             whiteboard pattern Registers listener for sending any change
-     *             notification
-     * @param listener
-     */
-    @Override
-    public void registerRouteChangeListener(RouteChangeListener listener) {
+  @Override
+  public void removeRoute(I routeId, R route) throws RoutingTableException, SystemException {
+    Preconditions.checkNotNull(routeId, "removeRoute: routeId cannot be null!");
+    Preconditions.checkNotNull(route, "removeRoute: route cannot be null!");
 
-    }
+    LinkedHashSet<R> routes = rpcCache.get(routeId);
+    if (routes == null) return;
 
-    public void setRouteChangeListener(RouteChangeListener rcl) {
-        if(rcl != null){
-            routeChangeListeners.add(rcl);
-        }else{
-            log.warn("setRouteChangeListener called with null listener");
-        }
-    }
+    try {
+      log.debug("removeRoute: removing  a new route with k/v [{}/{}]", routeId, route);
 
-    public void unSetRouteChangeListener(RouteChangeListener rcl) {
-        if(rcl != null){
-         routeChangeListeners.remove(rcl);
-        }else{
-            log.warn("unSetRouteChangeListener called with null listener");
-        }
+      clusterGlobalServices.tbegin();
+      threadSafeRemove(routeId, route);
+      clusterGlobalServices.tcommit();
+
+    } catch (NotSupportedException | HeuristicRollbackException | RollbackException | HeuristicMixedException e) {
+      throw new RoutingTableException("Transaction error - while trying to remove route id="
+          + routeId, e);
+    } catch (javax.transaction.SystemException e) {
+      throw new SystemException("System error occurred - while trying to remove with value", e);
     }
+  }
 
-    /**
-     * Returning the set of route change listeners for Unit testing Note: the
-     * package scope is default
-     *
-     * @return List of registered RouteChangeListener<I,R> listeners
-     */
-    Set<RouteChangeListener> getRegisteredRouteChangeListeners() {
-        return routeChangeListeners;
+  @Override
+  public void removeRoutes(Set<I> routeIds, R route) throws RoutingTableException, SystemException {
+    Preconditions.checkNotNull(routeIds, "removeRoutes: routeIds must not be null");
+    for (I routeId : routeIds){
+      removeRoute(routeId, route);
     }
+  }
 
-    public void setClusterGlobalServices(IClusterGlobalServices clusterGlobalServices) {
-        this.clusterGlobalServices = clusterGlobalServices;
+  /**
+   * This method guarantees that no 2 thread over write each other's changes.
+   * Just so that we dont end up in infinite loop, it tries for 100 times then throw
+   */
+  private void threadSafeAdd(I routeId, R route) {
+
+    for (int i=0;i<100;i++){
+
+      LinkedHashSet<R> updatedRoutes = new LinkedHashSet<>();
+      updatedRoutes.add(route);
+      LinkedHashSet<R> oldRoutes = rpcCache.putIfAbsent(routeId, updatedRoutes);
+      if (oldRoutes == null) return;
+
+      updatedRoutes = new LinkedHashSet<>(oldRoutes);
+      updatedRoutes.add(route);
+
+      if (rpcCache.replace(routeId, oldRoutes, updatedRoutes)) return;
     }
+    //the method did not already return means it failed to add route in 10 attempts
+    throw new IllegalStateException("Failed to add route [" + routeId + "]");
+  }
+
+  /**
+   * This method guarantees that no 2 thread over write each other's changes.
+   * Just so that we dont end up in infinite loop, it tries for 10 times then throw
+   */
+  private void threadSafeRemove(I routeId, R route) {
+    LinkedHashSet<R> updatedRoutes = null;
+    for (int i=0;i<10;i++){
+      LinkedHashSet<R> oldRoutes = rpcCache.get(routeId);
+
+      // if route to be deleted is the only entry in the set then remove routeId from the cache
+      if ((oldRoutes.size() == 1) && oldRoutes.contains(route)){
+        rpcCache.remove(routeId);
+        return;
+      }
+
+      // if there are multiple routes for this routeId, remove the route to be deleted only from the set.
+      updatedRoutes = new LinkedHashSet<>(oldRoutes);
+      updatedRoutes.remove(route);
+      if (rpcCache.replace(routeId, oldRoutes, updatedRoutes)) return;
 
-    public void unsetClusterGlobalServices(IClusterGlobalServices clusterGlobalServices) {
-        if((clusterGlobalServices != null ) &&  (this.clusterGlobalServices.equals(clusterGlobalServices))){
-            this.clusterGlobalServices = null;
-        }
     }
+    //the method did not already return means it failed to remove route in 10 attempts
+    throw new IllegalStateException("Failed to remove route [" + routeId + "]");
+  }
+
 
-    /**
-     * Creates the Routing Table clustered global services cache
-     *
-     * @throws CacheExistException
-     *           -- cluster global services exception when cache exist
-     * @throws CacheConfigException
-     *           -- cluster global services exception during cache config
-     * @throws CacheListenerAddException
-     *           -- cluster global services exception during adding of listener
-     */
-
-    void createRoutingTableCache() throws CacheExistException, CacheConfigException,
-            CacheListenerAddException {
-        // TBD: HOW DO WE DECIDE ON PROPERTIES OF THE CACHE i.e. what duration it
-        // should be caching?
-
-        // let us check here if the cache already exists -- if so don't create
-        if (!clusterGlobalServices.existCache(ROUTING_TABLE_GLOBAL_CACHE)) {
-
-            if (log.isDebugEnabled()) {
-                log.debug("createRoutingTableCache: creating a new routing table cache "
-                        + ROUTING_TABLE_GLOBAL_CACHE);
-            }
-            routingTableCache = clusterGlobalServices.createCache(ROUTING_TABLE_GLOBAL_CACHE,
-                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug("createRoutingTableCache: found existing routing table cache "
-                        + ROUTING_TABLE_GLOBAL_CACHE);
-            }
-            routingTableCache = clusterGlobalServices.getCache(ROUTING_TABLE_GLOBAL_CACHE);
-        }
+//    /**
+//     * @deprecated doesn't do anything will be removed once listeners used
+//     *             whiteboard pattern Registers listener for sending any change
+//     *             notification
+//     * @param listener
+//     */
+//    @Override
+//    public void registerRouteChangeListener(RouteChangeListener listener) {
+//
+//    }
+
+//    public void setRouteChangeListener(RouteChangeListener rcl) {
+//        if(rcl != null){
+//            routeChangeListeners.add(rcl);
+//        }else{
+//            log.warn("setRouteChangeListener called with null listener");
+//        }
+//    }
+//
+//    public void unSetRouteChangeListener(RouteChangeListener rcl) {
+//        if(rcl != null){
+//         routeChangeListeners.remove(rcl);
+//        }else{
+//            log.warn("unSetRouteChangeListener called with null listener");
+//        }
+//    }
+
+  /**
+   * Returning the set of route change listeners for Unit testing Note: the
+   * package scope is default
+   *
+   * @return List of registered RouteChangeListener<I,R> listeners
+   */
+//    Set<RouteChangeListener> getRegisteredRouteChangeListeners() {
+//        return routeChangeListeners;
+//    }
+  public void setClusterGlobalServices(IClusterGlobalServices clusterGlobalServices) {
+    this.clusterGlobalServices = clusterGlobalServices;
+  }
+
+  public void unsetClusterGlobalServices(IClusterGlobalServices clusterGlobalServices) {
+    if ((clusterGlobalServices != null) && (this.clusterGlobalServices.equals(clusterGlobalServices))) {
+      this.clusterGlobalServices = null;
+    }
+  }
 
+  /**
+   * Finds OR Creates clustered cache for Global RPCs
+   *
+   * @throws CacheExistException       -- cluster global services exception when cache exist
+   * @throws CacheConfigException      -- cluster global services exception during cache config
+   * @throws CacheListenerAddException -- cluster global services exception during adding of listener
+   */
+
+  void findOrCreateGlobalRpcCache() throws CacheExistException, CacheConfigException,
+      CacheListenerAddException {
+    // TBD: HOW DO WE DECIDE ON PROPERTIES OF THE CACHE i.e. what duration it
+    // should be caching?
+
+    // let us check here if the cache already exists -- if so don't create
+    if (!clusterGlobalServices.existCache(GLOBALRPC_CACHE)) {
+
+      globalRpcCache = (ConcurrentMap<I,R>) clusterGlobalServices.createCache(GLOBALRPC_CACHE,
+          EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+      log.debug("Cache created [{}] ", GLOBALRPC_CACHE);
+
+    } else {
+      globalRpcCache = (ConcurrentMap<I,R>) clusterGlobalServices.getCache(GLOBALRPC_CACHE);
+      log.debug("Cache exists [{}] ", GLOBALRPC_CACHE);
     }
+  }
 
-    /**
-     * Function called by the dependency manager when all the required
-     * dependencies are satisfied
-     *
-     */
-    void init(Component c) {
-        try {
-
-            createRoutingTableCache();
-        } catch (CacheExistException e) {
-            throw new IllegalStateException("could not construct routing table cache");
-        } catch (CacheConfigException e) {
-            throw new IllegalStateException("could not construct routing table cache");
-        } catch (CacheListenerAddException e) {
-            throw new IllegalStateException("could not construct routing table cache");
-        }
+  /**
+   * Finds OR Creates clustered cache for Routed RPCs
+   *
+   * @throws CacheExistException       -- cluster global services exception when cache exist
+   * @throws CacheConfigException      -- cluster global services exception during cache config
+   * @throws CacheListenerAddException -- cluster global services exception during adding of listener
+   */
+
+  void findOrCreateRpcCache() throws CacheExistException, CacheConfigException,
+      CacheListenerAddException {
+    // TBD: HOW DO WE DECIDE ON PROPERTIES OF THE CACHE i.e. what duration it
+    // should be caching?
+
+    if (clusterGlobalServices.existCache(RPC_CACHE)){
+      rpcCache = (ConcurrentMap<I,LinkedHashSet<R>>) clusterGlobalServices.getCache(RPC_CACHE);
+      log.debug("Cache exists [{}] ", RPC_CACHE);
+      return;
     }
 
-    /**
-     * Get routing table method is useful for unit testing <note>It has package
-     * scope</note>
-     */
-    ConcurrentMap getRoutingTableCache() {
-        return this.routingTableCache;
+    //cache doesnt exist, create one
+    rpcCache = (ConcurrentMap<I,LinkedHashSet<R>>) clusterGlobalServices.createCache(RPC_CACHE,
+          EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+    log.debug("Cache created [{}] ", RPC_CACHE);
+  }
+
+
+  /**
+   * Function called by the dependency manager when all the required
+   * dependencies are satisfied
+   */
+  void init(Component c) {
+    try {
+
+      findOrCreateGlobalRpcCache();
+      findOrCreateRpcCache();
+
+    } catch (CacheExistException|CacheConfigException|CacheListenerAddException e) {
+      throw new IllegalStateException("could not construct routing table cache");
     }
+  }
+
+  /**
+   * Useful for unit testing <note>It has package
+   * scope</note>
+   */
+  ConcurrentMap getGlobalRpcCache() {
+    return this.globalRpcCache;
+  }
+
+  /**
+   * Useful for unit testing <note>It has package
+   * scope</note>
+   */
+  ConcurrentMap getRpcCache() {
+    return this.rpcCache;
+  }
 
-    /**
-     * This is used from integration test NP rest API to check out the result of the
-     * cache population
-     * <Note> For testing purpose only-- use it wisely</Note>
-     * @return
-     */
-    public String dumpRoutingTableCache(){
-       Set<Map.Entry<I, R>> cacheEntrySet = this.routingTableCache.entrySet();
-       StringBuilder sb = new StringBuilder();
-       for(Map.Entry<I,R> entry:cacheEntrySet){
-           sb.append("Key:").append(entry.getKey()).append("---->Value:")
-                   .append((entry.getValue() != null)?entry.getValue():"null")
-                   .append("\n");
-       }
-       return sb.toString();
+  /**
+   * This is used from integration test NP rest API to check out the result of the
+   * cache population
+   * <Note> For testing purpose only-- use it wisely</Note>
+   *
+   * @return
+   */
+  public String dumpGlobalRpcCache() {
+    Set<Map.Entry<I, R>> cacheEntrySet = this.globalRpcCache.entrySet();
+    StringBuilder sb = new StringBuilder();
+    for (Map.Entry<I, R> entry : cacheEntrySet) {
+      sb.append("Key:").append(entry.getKey()).append("---->Value:")
+          .append((entry.getValue() != null) ? entry.getValue() : "null")
+          .append("\n");
     }
+    return sb.toString();
+  }
 
-    /**
-     * Invoked when a new entry is available in the cache, the key is only
-     * provided, the value will come as an entryUpdate invocation
-     *
-     * @param key
-     *          Key for the entry just created
-     * @param cacheName
-     *          name of the cache for which update has been received
-     * @param originLocal
-     *          true if the event is generated from this node
-     */
-    @Override
-    public void entryCreated(I key, String cacheName, boolean originLocal) {
-        // TBD: do we require this.
-        if (log.isDebugEnabled()) {
-            log.debug("RoutingTableUpdates: entryCreated  routeId = " + key + " cacheName=" + cacheName);
-        }
+  public String dumpRpcCache() {
+    Set<Map.Entry<I, LinkedHashSet<R>>> cacheEntrySet = this.rpcCache.entrySet();
+    StringBuilder sb = new StringBuilder();
+    for (Map.Entry<I, LinkedHashSet<R>> entry : cacheEntrySet) {
+      sb.append("Key:").append(entry.getKey()).append("---->Value:")
+          .append((entry.getValue() != null) ? entry.getValue() : "null")
+          .append("\n");
     }
+    return sb.toString();
+  }
+  /**
+   * Invoked when a new entry is available in the cache, the key is only
+   * provided, the value will come as an entryUpdate invocation
+   *
+   * @param key         Key for the entry just created
+   * @param cacheName   name of the cache for which update has been received
+   * @param originLocal true if the event is generated from this node
+   */
+  @Override
+  public void entryCreated(I key, String cacheName, boolean originLocal) {
+    // TBD: do we require this.
+    if (log.isDebugEnabled()) {
+      log.debug("RoutingTableUpdates: entryCreated  routeId = " + key + " cacheName=" + cacheName);
+    }
+  }
 
-    /**
-     * Called anytime a given entry is updated
-     *
-     * @param key
-     *          Key for the entry modified
-     * @param new_value
-     *          the new value the key will have
-     * @param cacheName
-     *          name of the cache for which update has been received
-     * @param originLocal
-     *          true if the event is generated from this node
-     */
-    @Override
-    public void entryUpdated(I key, R new_value, String cacheName, boolean originLocal) {
-        if (log.isDebugEnabled()) {
-            log.debug("RoutingTableUpdates: entryUpdated  routeId = " + key + ",value = " + new_value
-                    + " ,cacheName=" + cacheName + " originLocal="+originLocal);
-        }
-        if (!originLocal) {
-            for (RouteChangeListener rcl : routeChangeListeners) {
-                rcl.onRouteUpdated(key, new_value);
-            }
-        }
+  /**
+   * Called anytime a given entry is updated
+   *
+   * @param key         Key for the entry modified
+   * @param new_value   the new value the key will have
+   * @param cacheName   name of the cache for which update has been received
+   * @param originLocal true if the event is generated from this node
+   */
+  @Override
+  public void entryUpdated(I key, R new_value, String cacheName, boolean originLocal) {
+    if (log.isDebugEnabled()) {
+      log.debug("RoutingTableUpdates: entryUpdated  routeId = " + key + ",value = " + new_value
+          + " ,cacheName=" + cacheName + " originLocal=" + originLocal);
     }
+//        if (!originLocal) {
+//            for (RouteChangeListener rcl : routeChangeListeners) {
+//                rcl.onRouteUpdated(key, new_value);
+//            }
+//        }
+  }
 
-    /**
-     * Called anytime a given key is removed from the ConcurrentHashMap we are
-     * listening to.
-     *
-     * @param key
-     *          Key of the entry removed
-     * @param cacheName
-     *          name of the cache for which update has been received
-     * @param originLocal
-     *          true if the event is generated from this node
-     */
-    @Override
-    public void entryDeleted(I key, String cacheName, boolean originLocal) {
-        if (log.isDebugEnabled()) {
-            log.debug("RoutingTableUpdates: entryUpdated  routeId = " + key + " local = " + originLocal
-                    + " cacheName=" + cacheName + " originLocal="+originLocal);
-        }
-        if (!originLocal) {
-            for (RouteChangeListener rcl : routeChangeListeners) {
-                rcl.onRouteDeleted(key);
-            }
-        }
+  /**
+   * Called anytime a given key is removed from the ConcurrentHashMap we are
+   * listening to.
+   *
+   * @param key         Key of the entry removed
+   * @param cacheName   name of the cache for which update has been received
+   * @param originLocal true if the event is generated from this node
+   */
+  @Override
+  public void entryDeleted(I key, String cacheName, boolean originLocal) {
+    if (log.isDebugEnabled()) {
+      log.debug("RoutingTableUpdates: entryUpdated  routeId = " + key + " local = " + originLocal
+          + " cacheName=" + cacheName + " originLocal=" + originLocal);
     }
+//        if (!originLocal) {
+//            for (RouteChangeListener rcl : routeChangeListeners) {
+//                rcl.onRouteDeleted(key);
+//            }
+//        }
+  }
 }
\ No newline at end of file
index 50460d4e5ec897892a9a507a1b48244098887c86..0987df595689176b2d734da6c048ef00a909b2b0 100644 (file)
@@ -10,196 +10,321 @@ package org.opendaylight.controller.sal.connector.remoterpc.impl;
 
 import junit.framework.Assert;
 import org.apache.felix.dm.Component;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
 import org.opendaylight.controller.clustering.services.IClusterServices;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
+import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 
 import java.net.URI;
 import java.util.EnumSet;
 import java.util.HashSet;
-import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.*;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
-/**
- * @author: syedbahm
- */
 public class RoutingTableImplTest {
 
-    private IClusterGlobalServices ics =  mock(IClusterGlobalServices.class);
-    private RoutingTableImpl rti = new RoutingTableImpl();
+  private final URI namespace = URI.create("http://cisco.com/example");
+  private final QName QNAME = new QName(namespace, "global");
 
-    private final URI namespace = URI.create("http://cisco.com/example");
-    private final QName QNAME = new QName(namespace,"global");
+  private IClusterGlobalServices clusterService;
+  private RoutingTableImpl<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String> routingTable;
+  ConcurrentMap mockGlobalRpcCache;
+  ConcurrentMap mockRpcCache;
 
-    ConcurrentMap concurrentMapMock = mock(ConcurrentMap.class);
+  @Before
+  public void setUp() throws Exception{
+    clusterService = mock(IClusterGlobalServices.class);
+    routingTable = new RoutingTableImpl<RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>, String>();
+    mockGlobalRpcCache = new ConcurrentHashMap<>();
+    mockRpcCache = new ConcurrentHashMap<>();
+    createRoutingTableCache();
+  }
 
+  @After
+  public void tearDown(){
+    reset(clusterService);
+    mockGlobalRpcCache = null;
+    mockRpcCache = null;
+  }
 
-    @Test
-    public void testAddGlobalRoute() throws Exception {
-        ConcurrentMap concurrentMap = createRoutingTableCache();
+  @Test
+  public void addGlobalRoute_ValidArguments_ShouldAdd() throws Exception {
 
-        Assert.assertNotNull(concurrentMap);
-        RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier =  mock(RpcRouter.RouteIdentifier.class);
-        InstanceIdentifier identifier = mock(InstanceIdentifier.class);
-        when(routeIdentifier.getType()).thenReturn(QNAME);
-        when(routeIdentifier.getRoute()).thenReturn(identifier);
+    Assert.assertNotNull(mockGlobalRpcCache);
+    RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier = getRouteIdentifier();
 
-        rti.addGlobalRoute(routeIdentifier, "172.27.12.1:5000");
+    final String expectedRoute = "172.27.12.1:5000";
+    routingTable.addGlobalRoute(routeIdentifier, expectedRoute);
 
-        Set<String> globalService = new HashSet<String>();
-        globalService.add("172.27.12.1:5000");
+    ConcurrentMap latestCache = routingTable.getGlobalRpcCache();
+    Assert.assertEquals(mockGlobalRpcCache, latestCache);
+    Assert.assertEquals(expectedRoute, latestCache.get(routeIdentifier));
+  }
 
-        when(concurrentMap.get(routeIdentifier)).thenReturn(globalService);
-        ConcurrentMap latestCache = rti.getRoutingTableCache();
+  @Test (expected = RoutingTable.DuplicateRouteException.class)
+  public void addGlobalRoute_DuplicateRoute_ShouldThrow() throws Exception{
 
-        Assert.assertEquals(concurrentMap,latestCache);
+    Assert.assertNotNull(mockGlobalRpcCache);
 
-        Set<String> servicesGlobal = (Set<String>)latestCache.get(routeIdentifier);
-        Assert.assertEquals(servicesGlobal.size(),1);
+    RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier = getRouteIdentifier();
+    routingTable.addGlobalRoute(routeIdentifier, new String());
+    routingTable.addGlobalRoute(routeIdentifier, new String());
+  }
 
-        Assert.assertEquals(servicesGlobal.iterator().next(),"172.27.12.1:5000");
+  @Test
+  public void getGlobalRoute_ExistingRouteId_ShouldReturnRoute() throws Exception {
 
-    }
+    Assert.assertNotNull(mockGlobalRpcCache);
+    RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier = getRouteIdentifier();
+    String expectedRoute = "172.27.12.1:5000";
 
-    @Test
-    public void testGetRoutes() throws Exception {
-        ConcurrentMap concurrentMap = createRoutingTableCache();
+    routingTable.addGlobalRoute(routeIdentifier, expectedRoute);
 
-        Assert.assertNotNull(concurrentMap);
-        RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier =  mock(RpcRouter.RouteIdentifier.class);
-        InstanceIdentifier identifier = mock(InstanceIdentifier.class);
-        when(routeIdentifier.getContext()).thenReturn(QNAME);
-        when(routeIdentifier.getRoute()).thenReturn(identifier);
+    String actualRoute = (String) routingTable.getGlobalRoute(routeIdentifier);
+    Assert.assertEquals(expectedRoute, actualRoute);
+  }
 
-        rti.addGlobalRoute(routeIdentifier, "172.27.12.1:5000");
+  @Test
+  public void getGlobalRoute_NonExistentRouteId_ShouldReturnNull() throws Exception {
 
-        String globalService =   "172.27.12.1:5000";
+    Assert.assertNotNull(mockGlobalRpcCache);
+    RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier = getRouteIdentifier();
 
-        when(concurrentMap.get(routeIdentifier)).thenReturn(globalService);
-        ConcurrentMap latestCache = rti.getRoutingTableCache();
+    String actualRoute = (String) routingTable.getGlobalRoute(routeIdentifier);
+    Assert.assertNull(actualRoute);
+  }
 
-        Assert.assertEquals(concurrentMap,latestCache);
+  @Test
+  public void removeGlobalRoute_ExistingRouteId_ShouldRemove() throws Exception {
 
-        Set<String> servicesGlobal =  rti.getRoutes(routeIdentifier);
+    Assert.assertNotNull(mockGlobalRpcCache);
+    RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier = getRouteIdentifier();
 
+    ConcurrentMap cache = routingTable.getGlobalRpcCache();
+    Assert.assertTrue(cache.size() == 0);
+    routingTable.addGlobalRoute(routeIdentifier, "172.27.12.1:5000");
+    Assert.assertTrue(cache.size() == 1);
 
-        Assert.assertEquals(servicesGlobal.size(),1);
-        Iterator<String> iterator = servicesGlobal.iterator();
-        while(iterator.hasNext()){
-        Assert.assertEquals(iterator.next(),"172.27.12.1:5000");
-        }
+    routingTable.removeGlobalRoute(routeIdentifier);
+    Assert.assertTrue(cache.size() == 0);
 
+  }
 
-    }
-    @Test
-    public void testRegisterRouteChangeListener() throws Exception {
-        Assert.assertEquals(rti.getRegisteredRouteChangeListeners().size(),0);
-        rti.registerRouteChangeListener(new RouteChangeListenerImpl());
+  @Test
+  public void removeGlobalRoute_NonExistentRouteId_ShouldDoNothing() throws Exception {
 
-        Assert.assertEquals(rti.getRegisteredRouteChangeListeners().size(),0); //old should not work
-        //what about the new approach - using whiteboard pattern
-        rti.setRouteChangeListener(new RouteChangeListenerImpl());
+    Assert.assertNotNull(mockGlobalRpcCache);
+    RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier = getRouteIdentifier();
 
-        Assert.assertEquals(rti.getRegisteredRouteChangeListeners().size(),1); //should not work
+    ConcurrentMap cache = routingTable.getGlobalRpcCache();
+    Assert.assertTrue(cache.size() == 0);
 
+    routingTable.removeGlobalRoute(routeIdentifier);
+    Assert.assertTrue(cache.size() == 0);
 
-    }
-    @Test
-    public void testRemoveGlobalRoute()throws Exception {
+  }
 
-        ConcurrentMap concurrentMap = createRoutingTableCache();
+  @Test
+  public void addRoute_ForNewRouteId_ShouldAddRoute() throws Exception {
+    Assert.assertTrue(mockRpcCache.size() == 0);
 
-        Assert.assertNotNull(concurrentMap);
-        RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier =  mock(RpcRouter.RouteIdentifier.class);
-        InstanceIdentifier identifier = mock(InstanceIdentifier.class);
-        when(routeIdentifier.getContext()).thenReturn(QNAME);
-        when(routeIdentifier.getRoute()).thenReturn(identifier);
+    RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeId = getRouteIdentifier();
 
-        rti.addGlobalRoute(routeIdentifier, "172.27.12.1:5000");
+    routingTable.addRoute(routeId, new String());
+    Assert.assertTrue(mockRpcCache.size() == 1);
 
-        String globalService =   "172.27.12.1:5000";
+    Set<String> routes = routingTable.getRoutes(routeId);
+    Assert.assertEquals(1, routes.size());
+  }
 
-        when(concurrentMap.get(routeIdentifier)).thenReturn(globalService);
-        ConcurrentMap latestCache = rti.getRoutingTableCache();
+  @Test
+  public void addRoute_ForExistingRouteId_ShouldAppendRoute() throws Exception {
 
-        Assert.assertEquals(concurrentMap,latestCache);
+    Assert.assertTrue(mockRpcCache.size() == 0);
 
-        Set<String> servicesGlobal =  rti.getRoutes(routeIdentifier);
+    RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeId = getRouteIdentifier();
 
+    String route_1 = "10.0.0.1:5955";
+    String route_2 = "10.0.0.2:5955";
 
-        Assert.assertEquals(servicesGlobal.size(),1);
+    routingTable.addRoute(routeId, route_1);
+    routingTable.addRoute(routeId, route_2);
 
-        Assert.assertEquals(servicesGlobal.iterator().next(),"172.27.12.1:5000");
+    Assert.assertTrue(mockRpcCache.size() == 1);
 
-        rti.removeGlobalRoute(routeIdentifier);
+    Set<String> routes = routingTable.getRoutes(routeId);
+    Assert.assertEquals(2, routes.size());
+    Assert.assertTrue(routes.contains(route_1));
+    Assert.assertTrue(routes.contains(route_2));
+  }
 
-        Assert.assertNotNull(rti.getRoutes(routeIdentifier));
+  @Test
+  public void addRoute_UsingMultipleThreads_ShouldNotOverwrite(){
+    ExecutorService threadPool = Executors.newCachedThreadPool();
 
+    int numOfRoutesToAdd = 100;
+    String routePrefix_1   = "10.0.0.1:555";
+    RpcRouter.RouteIdentifier routeId = getRouteIdentifier();
+    threadPool.submit(addRoutes(numOfRoutesToAdd, routePrefix_1, routeId));
+    String routePrefix_2   = "10.0.0.1:556";
+    threadPool.submit(addRoutes(numOfRoutesToAdd, routePrefix_2, routeId));
 
+    // wait for all tasks to complete; timeout in 10 sec
+    threadPool.shutdown();
+    try {
+      threadPool.awaitTermination(10, TimeUnit.SECONDS); //
+    } catch (InterruptedException e) {
+      e.printStackTrace();
     }
 
-    private ConcurrentMap createRoutingTableCache() throws Exception {
+    Assert.assertEquals(2*numOfRoutesToAdd, routingTable.getRoutes(routeId).size());
+  }
 
-        //here init
-        Component c = mock(Component.class);
+  @Test(expected = NullPointerException.class)
+  public void addRoute_NullRouteId_shouldThrowNpe() throws Exception {
 
-        when(ics.existCache(
-                RoutingTableImpl.ROUTING_TABLE_GLOBAL_CACHE)).thenReturn(false);
+    routingTable.addRoute(null, new String());
+  }
 
-        when(ics.createCache(RoutingTableImpl.ROUTING_TABLE_GLOBAL_CACHE, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL))).thenReturn(concurrentMapMock);
-         rti.setClusterGlobalServices(this.ics);
-        rti.init(c);
+  @Test(expected = NullPointerException.class)
+  public void addRoute_NullRoute_shouldThrowNpe() throws Exception{
 
-        Assert.assertEquals(concurrentMapMock,rti.getRoutingTableCache() );
-        return concurrentMapMock;
+    routingTable.addRoute(getRouteIdentifier(), null);
+  }
 
-    }
+  @Test (expected = UnsupportedOperationException.class)
+  public void getRoutes_Call_ShouldReturnImmutableCopy() throws Exception{
+    Assert.assertNotNull(routingTable);
+    RpcRouter.RouteIdentifier routeId = getRouteIdentifier();
+    routingTable.addRoute(routeId, new String());
 
+    Set<String> routes = routingTable.getRoutes(routeId); //returns Immutable Set
 
-    @Test
-    public void testCreateRoutingTableCacheReturnExistingCache() throws Exception {
-        ConcurrentMap concurrentMap = createRoutingTableCache();
+    routes.add(new String()); //can not be modified; should throw
+  }
 
-        //OK here we should try creating again the cache but this time it should return the existing one
-        when(ics.existCache(
-                RoutingTableImpl.ROUTING_TABLE_GLOBAL_CACHE)).thenReturn(true);
+  @Test
+  public void getRoutes_With2RoutesFor1RouteId_ShouldReturnASetWithSize2() throws Exception{
+    Assert.assertNotNull(routingTable);
+    RpcRouter.RouteIdentifier routeId = getRouteIdentifier();
+    routingTable.addRoute(routeId, "10.0.0.1:5555");
+    routingTable.addRoute(routeId, "10.0.0.2:5555");
 
-        when(ics.getCache(
-                RoutingTableImpl.ROUTING_TABLE_GLOBAL_CACHE)).thenReturn(concurrentMap);
+    Set<String> routes = routingTable.getRoutes(routeId); //returns Immutable Set
 
+    Assert.assertEquals(2, routes.size());
+  }
 
-        //here init
-        Component c = mock(Component.class);
+  @Test
+  public void getLastAddedRoute_WhenMultipleRoutesExists_ShouldReturnLatestRoute()
+    throws Exception {
 
-        rti.init(c);
+    Assert.assertNotNull(routingTable);
+    RpcRouter.RouteIdentifier routeId = getRouteIdentifier();
+    String route_1 = "10.0.0.1:5555";
+    String route_2 = "10.0.0.2:5555";
+    routingTable.addRoute(routeId, route_1);
+    routingTable.addRoute(routeId, route_2);
 
-        Assert.assertEquals(concurrentMap,rti.getRoutingTableCache());
+    Assert.assertEquals(route_2, routingTable.getLastAddedRoute(routeId));
+  }
 
+  @Test
+  public void removeRoute_WhenMultipleRoutesExist_RemovesGivenRoute() throws Exception{
+    Assert.assertNotNull(routingTable);
+    RpcRouter.RouteIdentifier routeId = getRouteIdentifier();
+    String route_1 = "10.0.0.1:5555";
+    String route_2 = "10.0.0.2:5555";
 
+    routingTable.addRoute(routeId, route_1);
+    routingTable.addRoute(routeId, route_2);
 
+    Assert.assertEquals(2, routingTable.getRoutes(routeId).size());
 
+    routingTable.removeRoute(routeId, route_1);
+    Assert.assertEquals(1, routingTable.getRoutes(routeId).size());
 
-    }
+  }
 
-    private class RouteChangeListenerImpl<I,R> implements RouteChangeListener<I,R>{
+  @Test
+  public void removeRoute_WhenOnlyOneRouteExists_RemovesRouteId() throws Exception{
+    Assert.assertNotNull(routingTable);
+    RpcRouter.RouteIdentifier routeId = getRouteIdentifier();
+    String route_1 = "10.0.0.1:5555";
 
-        @Override
-        public void onRouteUpdated(I key, R new_value) {
-            //To change body of implemented methods use File | Settings | File Templates.
-        }
+    routingTable.addRoute(routeId, route_1);
+    Assert.assertEquals(1, routingTable.getRoutes(routeId).size());
 
-        @Override
-        public void onRouteDeleted(I key) {
-            //To change body of implemented methods use File | Settings | File Templates.
-        }
-    }
+    routingTable.removeRoute(routeId, route_1);
+    ConcurrentMap cache = routingTable.getRpcCache();
+    Assert.assertFalse(cache.containsKey(routeId));
+
+  }
 
+  /*
+   * Private helper methods
+   */
+  private void createRoutingTableCache() throws Exception {
+
+    //here init
+    Component c = mock(Component.class);
+
+    when(clusterService.existCache(
+        RoutingTableImpl.GLOBALRPC_CACHE)).thenReturn(false);
+
+    when(clusterService.createCache(RoutingTableImpl.GLOBALRPC_CACHE,
+        EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL))).
+        thenReturn(mockGlobalRpcCache);
+
+    when(clusterService.existCache(
+        RoutingTableImpl.RPC_CACHE)).thenReturn(false);
+
+    when(clusterService.createCache(RoutingTableImpl.RPC_CACHE,
+        EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL))).
+        thenReturn(mockRpcCache);
+
+    doNothing().when(clusterService).tbegin();
+    doNothing().when(clusterService).tcommit();
+
+    routingTable.setClusterGlobalServices(this.clusterService);
+    routingTable.init(c);
+
+    Assert.assertEquals(mockGlobalRpcCache, routingTable.getGlobalRpcCache());
+    Assert.assertEquals(mockRpcCache, routingTable.getRpcCache());
+  }
+
+  private RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> getRouteIdentifier(){
+    RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier> routeIdentifier = mock(RpcRouter.RouteIdentifier.class);
+    InstanceIdentifier identifier = mock(InstanceIdentifier.class);
+    when(routeIdentifier.getType()).thenReturn(QNAME);
+    when(routeIdentifier.getRoute()).thenReturn(identifier);
+
+    return routeIdentifier;
+  }
+
+  private Runnable addRoutes(final int numRoutes, final String routePrefix, final RpcRouter.RouteIdentifier routeId){
+    return new Runnable() {
+      @Override
+      public void run() {
+        for (int i=0;i<numRoutes;i++){
+          String route = routePrefix + i;
+          try {
+            routingTable.addRoute(routeId, route);
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        }
+      }
+    };
+  }
 }
diff --git a/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RoutedRpcDefaultImplementation.java b/opendaylight/md-sal/sal-dom-api/src/main/java/org/opendaylight/controller/sal/core/api/RoutedRpcDefaultImplementation.java
new file mode 100644 (file)
index 0000000..c8eb7fd
--- /dev/null
@@ -0,0 +1,12 @@
+package org.opendaylight.controller.sal.core.api;
+
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.common.RpcResult;
+import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+
+public interface RoutedRpcDefaultImplementation {
+
+  public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input);
+
+}
index 8a9d1678657c6b3555ba23d4a3c536cae5d3d30d..f43dcd6b43565692136eda0eecac5354b29681ac 100644 (file)
@@ -42,4 +42,13 @@ public interface RpcProvisionRegistry extends RpcImplementation, BrokerService,
     ListenerRegistration<RpcRegistrationListener> addRpcRegistrationListener(RpcRegistrationListener listener);
 
     RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation);
+
+  /**
+   * Sets this RoutedRpc Implementation as a delegate rpc provider and will be asked to invoke rpc if the
+   * current provider can't service the rpc request
+   *
+   * @param defaultImplementation
+   *              Provider's implementation of RPC functionality
+   */
+    public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultImplementation);
 }
index 3bbdab2c0722d656aba74035186faa4bc0d843aa..64de8683d1d1a8a5c3854aa5f55e6b907852c448 100644 (file)
@@ -14,21 +14,22 @@ import java.util.concurrent.Callable
 import java.util.concurrent.ExecutorService
 import java.util.concurrent.Executors
 import java.util.concurrent.Future
-import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener
 import org.opendaylight.controller.sal.core.api.Broker
 import org.opendaylight.controller.sal.core.api.Consumer
 import org.opendaylight.controller.sal.core.api.Provider
-import org.opendaylight.controller.sal.core.api.RpcImplementation
-import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry
-import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
-import org.opendaylight.controller.sal.core.api.RpcRoutingContext
-import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter
 import org.opendaylight.yangtools.yang.common.QName
 import org.opendaylight.yangtools.yang.common.RpcResult
 import org.opendaylight.yangtools.yang.data.api.CompositeNode
-import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
 import org.osgi.framework.BundleContext
 import org.slf4j.LoggerFactory
+import org.opendaylight.controller.sal.dom.broker.spi.RpcRouter
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry
+import org.opendaylight.controller.sal.core.api.RpcImplementation
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener
+import org.opendaylight.controller.sal.core.api.RpcRoutingContext
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier
+import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation
 
 public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
     private static val log = LoggerFactory.getLogger(BrokerImpl);
@@ -122,7 +123,11 @@ public class BrokerImpl implements Broker, RpcProvisionRegistry, AutoCloseable {
     override addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
         router.addRoutedRpcImplementation(rpcType,implementation);
     }
-    
+
+    override setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultImplementation) {
+        router.setRoutedRpcDefaultDelegate(defaultImplementation);
+    }
+
     override addRpcRegistrationListener(RpcRegistrationListener listener) {
         return router.addRpcRegistrationListener(listener);
     }
index a8bdddb5108d3ab0242024b7c3db4df3a1d1694a..5d93f4ee4d162b22a35c610d4f79b35e7cf66fb6 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener
 import org.opendaylight.controller.sal.common.DataStoreIdentifier;
 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
 import org.opendaylight.controller.sal.core.api.RpcImplementation;
 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
@@ -104,6 +105,11 @@ public class MountPointImpl implements MountProvisionInstance, SchemaContextProv
     }
 
     @Override
+    public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultImplementation) {
+      rpcs.setRoutedRpcDefaultDelegate(defaultImplementation);
+    }
+
+  @Override
     public RpcRegistration addRpcImplementation(QName rpcType, RpcImplementation implementation)
             throws IllegalArgumentException {
         return rpcs.addRpcImplementation(rpcType, implementation);
index 28d5ae914fc686a8cd41032755b77db529f9a85b..22319abb17df7d1dec301105a59c50fad2cb1164 100644 (file)
@@ -19,6 +19,7 @@ import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener
 import org.opendaylight.controller.md.sal.common.impl.routing.RoutingUtils;
 import org.opendaylight.controller.sal.core.api.Broker.RoutedRpcRegistration;
 import org.opendaylight.controller.sal.core.api.Broker.RpcRegistration;
+import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
 import org.opendaylight.controller.sal.core.api.RpcImplementation;
 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
@@ -44,7 +45,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 
-public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String> {
+public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String>, RoutedRpcDefaultImplementation {
 
     private static final Logger LOG = LoggerFactory.getLogger(SchemaAwareRpcBroker.class);
 
@@ -58,6 +59,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String> {
     private final ConcurrentMap<QName, RpcImplementation> implementations = new ConcurrentHashMap<>();
     private RpcImplementation defaultImplementation;
     private SchemaContextProvider schemaProvider;
+    private RoutedRpcDefaultImplementation defaultDelegate;
 
     public SchemaAwareRpcBroker(String identifier, SchemaContextProvider schemaProvider) {
         super();
@@ -81,7 +83,16 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String> {
         this.schemaProvider = schemaProvider;
     }
 
+  public RoutedRpcDefaultImplementation getRoutedRpcDefaultDelegate() {
+    return defaultDelegate;
+  }
+
     @Override
+  public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultDelegate) {
+    this.defaultDelegate = defaultDelegate;
+  }
+
+  @Override
     public RoutedRpcRegistration addRoutedRpcImplementation(QName rpcType, RpcImplementation implementation) {
         checkArgument(rpcType != null, "RPC Type should not be null");
         checkArgument(implementation != null, "RPC Implementatoin should not be null");
@@ -221,6 +232,12 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String> {
         return ret;
     }
 
+    @Override
+    public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
+      checkState(defaultDelegate != null);
+      return defaultDelegate.invokeRpc(rpc, identifier, input);
+    }
+
     private static abstract class RoutingStrategy implements Identifiable<QName> {
 
         private final QName identifier;
@@ -304,6 +321,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String> {
             SimpleNode<?> routeContainer = inputContainer.getFirstSimpleByName(strategy.getLeaf());
             checkArgument(routeContainer != null, "Leaf %s must be set with value", strategy.getLeaf());
             Object route = routeContainer.getValue();
+            checkArgument(route instanceof InstanceIdentifier);
             RpcImplementation potential = null;
             if (route != null) {
                 RoutedRpcRegImpl potentialReg = implementations.get(route);
@@ -312,7 +330,7 @@ public class SchemaAwareRpcBroker implements RpcRouter, Identifiable<String> {
                 }
             }
             if (potential == null) {
-                potential = defaultDelegate;
+                return router.invokeRpc(rpc, (InstanceIdentifier) route, input);
             }
             checkState(potential != null, "No implementation is available for rpc:%s path:%s", rpc, route);
             return potential.invokeRpc(rpc, input);
index e218a957826f110bd36d3acb3fe059fc69b5092a..40842c004a2779ede0d231b2fef122920bc4ebc9 100644 (file)
@@ -9,11 +9,7 @@
 package org.opendaylight.controller.sal.dom.broker.osgi;
 
 import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
-import org.opendaylight.controller.sal.core.api.Broker;
-import org.opendaylight.controller.sal.core.api.RpcImplementation;
-import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
-import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
-import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
+import org.opendaylight.controller.sal.core.api.*;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcResult;
@@ -45,7 +41,12 @@ public class RpcProvisionRegistryProxy extends AbstractBrokerServiceProxy<RpcPro
         return getDelegate().addRoutedRpcImplementation(rpcType, implementation);
     }
 
-    @Override
+  @Override
+  public void setRoutedRpcDefaultDelegate(RoutedRpcDefaultImplementation defaultImplementation) {
+    getDelegate().setRoutedRpcDefaultDelegate(defaultImplementation);
+  }
+
+  @Override
     public <L extends RouteChangeListener<RpcRoutingContext, InstanceIdentifier>> ListenerRegistration<L> registerRouteChangeListener(L listener) {
         return getDelegate().registerRouteChangeListener(listener);
     }
index 95bb62f93b331257413f2273bcba32ce22fdc7e9..d874381ab374eb2d32a10e09bf959dc810080c0d 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.config.yang.md.sal.remote.rpc;
 
 import org.opendaylight.controller.sal.connector.remoterpc.*;
 import org.opendaylight.controller.sal.core.api.Broker;
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
 import org.osgi.framework.BundleContext;
 
 /**
@@ -46,17 +47,18 @@ public final class ZeroMQServerModule extends org.opendaylight.controller.config
         
         ClientImpl clientImpl = new ClientImpl();
 
-        RoutingTableProvider provider = new RoutingTableProvider(bundleContext,serverImpl);
+    RoutingTableProvider provider = new RoutingTableProvider(bundleContext);//,serverImpl);
 
-        RemoteRpcProvider facade = new RemoteRpcProvider(serverImpl, clientImpl);
-        
-        facade.setRoutingTableProvider(provider );
-        
-        broker.registerProvider(facade, bundleContext);
-        return facade;
-    }
 
-    public void setBundleContext(BundleContext bundleContext) {
-        this.bundleContext = bundleContext;
-    }
+    facade.setRoutingTableProvider(provider );
+    facade.setContext(bundleContext);
+    facade.setRpcProvisionRegistry((RpcProvisionRegistry) broker);
+
+    broker.registerProvider(facade, bundleContext);
+    return facade;
+  }
+
+  public void setBundleContext(BundleContext bundleContext) {
+    this.bundleContext = bundleContext;
+  }
 }
index 291fe0b8e73a5a52e65f1d6e9be4fcca31290a50..30e11c0806731c24ab151dd7cf585408916fd233 100644 (file)
@@ -12,6 +12,8 @@ import org.opendaylight.controller.sal.common.util.RpcErrors;
 import org.opendaylight.controller.sal.common.util.Rpcs;
 import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
+import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
 import org.opendaylight.controller.sal.connector.remoterpc.dto.Message;
 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
 import org.opendaylight.controller.sal.connector.remoterpc.util.XmlUtils;
@@ -20,14 +22,13 @@ import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcError;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.zeromq.ZMQ;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
@@ -66,12 +67,6 @@ public class ClientImpl implements RemoteRpcClient {
     this.routingTableProvider = routingTableProvider;
   }
 
- @Override
-  public Set<QName> getSupportedRpcs(){
-    //TODO: Find the entries from routing table
-    return Collections.emptySet();
-  }
-
   @Override
   public void start() {/*NOOPS*/}
 
@@ -97,14 +92,40 @@ public class ClientImpl implements RemoteRpcClient {
    * @param input payload for the remote service
    * @return
    */
-  @Override
   public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+    RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+    routeId.setType(rpc);
+
+    String address = lookupRemoteAddressForGlobalRpc(routeId);
+    return sendMessage(input, routeId, address);
+  }
+
+  /**
+   * Finds remote server that can execute this routed rpc and sends a message to it
+   * requesting execution.
+   * The call blocks until a response from remote server is received. Its upto
+   * the client of this API to implement a timeout functionality.
+   *
+   * @param rpc
+   *          rpc to be called
+   * @param identifier
+   *          instance identifier on which rpc is to be executed
+   * @param input
+   *          payload
+   * @return
+   */
+  public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
 
     RouteIdentifierImpl routeId = new RouteIdentifierImpl();
     routeId.setType(rpc);
+    routeId.setRoute(identifier);
+
+    String address = lookupRemoteAddressForRpc(routeId);
 
-    String address = lookupRemoteAddress(routeId);
+    return sendMessage(input, routeId, address);
+  }
 
+  private RpcResult<CompositeNode> sendMessage(CompositeNode input, RouteIdentifierImpl routeId, String address) {
     Message request = new Message.MessageBuilder()
         .type(Message.MessageType.REQUEST)
         .sender(Context.getInstance().getLocalUri())
@@ -128,7 +149,6 @@ public class ClientImpl implements RemoteRpcClient {
       collectErrors(e, errors);
       return Rpcs.getRpcResult(false, null, errors);
     }
-
   }
 
   /**
@@ -136,19 +156,36 @@ public class ClientImpl implements RemoteRpcClient {
    * @param  routeId route identifier
    * @return         remote network address
    */
-  private String lookupRemoteAddress(RpcRouter.RouteIdentifier routeId){
+  private String lookupRemoteAddressForGlobalRpc(RpcRouter.RouteIdentifier routeId){
     checkNotNull(routeId, "route must not be null");
 
-    Optional<RoutingTable<String, String>> routingTable = routingTableProvider.getRoutingTable();
+    Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> routingTable = routingTableProvider.getRoutingTable();
     checkNotNull(routingTable.isPresent(), "Routing table is null");
 
-    Set<String> addresses = routingTable.get().getRoutes(routeId.toString());
-    checkNotNull(addresses, "Address not found for route [%s]", routeId);
-    checkState(addresses.size() == 1,
-        "Multiple remote addresses found for route [%s], \nonly 1 expected", routeId); //its a global service.
+    String address = null;
+    try {
+      address = routingTable.get().getGlobalRoute(routeId);
+    } catch (RoutingTableException|SystemException e) {
+      _logger.error("Exception caught while looking up remote address " + e);
+    }
+    checkState(address != null, "Address not found for route [%s]", routeId);
+
+    return address;
+  }
+
+  /**
+   * Find address for the given route identifier in routing table
+   * @param  routeId route identifier
+   * @return         remote network address
+   */
+  private String lookupRemoteAddressForRpc(RpcRouter.RouteIdentifier routeId){
+    checkNotNull(routeId, "route must not be null");
+
+    Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> routingTable = routingTableProvider.getRoutingTable();
+    checkNotNull(routingTable.isPresent(), "Routing table is null");
 
-    String address = addresses.iterator().next();
-    checkNotNull(address, "Address not found for route [%s]", routeId);
+    String address = routingTable.get().getLastAddedRoute(routeId);
+    checkState(address != null, "Address not found for route [%s]", routeId);
 
     return address;
   }
index 1f78a6771a0b6deb1f4d208db11ecb2275a04a73..a564a0ad045a533818135b2921c173e684a603b1 100644 (file)
@@ -9,9 +9,9 @@
 package org.opendaylight.controller.sal.connector.remoterpc;
 
 import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
 
-public interface RemoteRpcClient extends RpcImplementation,AutoCloseable{
-
+public interface RemoteRpcClient extends AutoCloseable{
 
     void setRoutingTableProvider(RoutingTableProvider provider);
     
index bf205fc38d54a57ebc2ac351a3436f1d0215dbd9..639e31ddc3ec0e8380b5108efbe8f4afa92ec513 100644 (file)
 
 package org.opendaylight.controller.sal.connector.remoterpc;
 
+import com.google.common.base.Optional;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChange;
+import org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
+import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
 import org.opendaylight.controller.sal.core.api.Provider;
+import org.opendaylight.controller.sal.core.api.RoutedRpcDefaultImplementation;
+import org.opendaylight.controller.sal.core.api.RpcImplementation;
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
+import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
+import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.common.RpcResult;
 import org.opendaylight.yangtools.yang.data.api.CompositeNode;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.osgi.framework.BundleContext;
+import org.osgi.util.tracker.ServiceTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
-public class RemoteRpcProvider implements 
-    RemoteRpcServer,
-    RemoteRpcClient,
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class RemoteRpcProvider implements
+    RpcImplementation,
+    RoutedRpcDefaultImplementation,
+    AutoCloseable,
     Provider {
 
-    private final ServerImpl server;
-    private final ClientImpl client;
-    private RoutingTableProvider provider;
+  private Logger _logger = LoggerFactory.getLogger(RemoteRpcProvider.class);
 
-    @Override
-    public void setRoutingTableProvider(RoutingTableProvider provider) {
-        this.provider = provider;
-        server.setRoutingTableProvider(provider);
-        client.setRoutingTableProvider(provider);
+  private final ServerImpl server;
+  private final ClientImpl client;
+  private RoutingTableProvider routingTableProvider;
+  private final RpcListener listener = new RpcListener();
+  private final RoutedRpcListener routeChangeListener = new RoutedRpcListener();
+  private ProviderSession brokerSession;
+  private RpcProvisionRegistry rpcProvisionRegistry;
+  private BundleContext context;
+  private ServiceTracker clusterTracker;
+
+  public RemoteRpcProvider(ServerImpl server, ClientImpl client) {
+    this.server = server;
+    this.client = client;
+  }
+
+  public void setRoutingTableProvider(RoutingTableProvider provider) {
+    this.routingTableProvider = provider;
+    client.setRoutingTableProvider(provider);
+  }
+
+  public void setContext(BundleContext context){
+    this.context = context;
+  }
+
+  public void setRpcProvisionRegistry(RpcProvisionRegistry rpcProvisionRegistry){
+    this.rpcProvisionRegistry = rpcProvisionRegistry;
+  }
+
+  @Override
+  public void onSessionInitiated(ProviderSession session) {
+    brokerSession = session;
+    server.setBrokerSession(session);
+    start();
+  }
+
+  @Override
+  public Set<QName> getSupportedRpcs() {
+    //TODO: Ask Tony if we need to get this from routing table
+    return Collections.emptySet();
+  }
+
+  @Override
+  public Collection<ProviderFunctionality> getProviderFunctionality() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
+    return client.invokeRpc(rpc, input);
+  }
+
+  @Override
+  public RpcResult<CompositeNode> invokeRpc(QName rpc, InstanceIdentifier identifier, CompositeNode input) {
+    return client.invokeRpc(rpc, identifier, input);
+  }
+
+  public void start() {
+    server.start();
+    client.start();
+    brokerSession.addRpcRegistrationListener(listener);
+    rpcProvisionRegistry.setRoutedRpcDefaultDelegate(this);
+    rpcProvisionRegistry.registerRouteChangeListener(routeChangeListener);
+
+    announceSupportedRpcs();
+    announceSupportedRoutedRpcs();
+  }
+
+  @Override
+  public void close() throws Exception {
+    unregisterSupportedRpcs();
+    unregisterSupportedRoutedRpcs();
+    server.close();
+    client.close();
+  }
+
+  public void stop() {
+    server.stop();
+    client.stop();
+  }
+
+  /**
+   * Add all the locally registered RPCs in the clustered routing table
+   */
+  private void announceSupportedRpcs(){
+    Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+    for (QName rpc : currentlySupported) {
+      listener.onRpcImplementationAdded(rpc);
     }
-    
-    @Override
-    public RpcResult<CompositeNode> invokeRpc(QName rpc, CompositeNode input) {
-        return client.invokeRpc(rpc, input);
+  }
+
+  /**
+   * Add all the locally registered Routed RPCs in the clustered routing table
+   */
+  private void announceSupportedRoutedRpcs(){
+
+    //TODO: announce all routed RPCs as well
+
+  }
+
+  /**
+   * Un-Register all the supported RPCs from clustered routing table
+   */
+  private void unregisterSupportedRpcs(){
+    Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
+    //TODO: remove all routed RPCs as well
+    for (QName rpc : currentlySupported) {
+      listener.onRpcImplementationRemoved(rpc);
     }
-    
+  }
+
+  /**
+   * Un-Register all the locally supported Routed RPCs from clustered routing table
+   */
+  private void unregisterSupportedRoutedRpcs(){
+
+    //TODO: remove all routed RPCs as well
+
+  }
+
+  private RoutingTable<RpcRouter.RouteIdentifier, String> getRoutingTable(){
+    Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> routingTable =
+        routingTableProvider.getRoutingTable();
+
+    checkNotNull(routingTable.isPresent(), "Routing table is null");
+
+    return routingTable.get();
+  }
+
+  /**
+   * Listener for rpc registrations in broker
+   */
+  private class RpcListener implements RpcRegistrationListener {
+
     @Override
-    public Set<QName> getSupportedRpcs() {
-        return client.getSupportedRpcs();
-    }
-    
-    
-    public RemoteRpcProvider(ServerImpl server, ClientImpl client) {
-        this.server = server;
-        this.client = client;
-    }
-    
-    public void setBrokerSession(ProviderSession session) {
-        server.setBrokerSession(session);
-    }
-//    public void setServerPool(ExecutorService serverPool) {
-//        server.setServerPool(serverPool);
-//    }
-    public void start() {
-        //when listener was being invoked and addRPCImplementation was being
-        //called the client was null.
-        server.setClient(client);
-        server.start();
-        client.start();
+    public void onRpcImplementationAdded(QName rpc) {
+
+      _logger.debug("Adding registration for [{}]", rpc);
+      RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+      routeId.setType(rpc);
 
+      RoutingTable<RpcRouter.RouteIdentifier, String> routingTable = getRoutingTable();
 
+      try {
+        routingTable.addGlobalRoute(routeId, server.getServerAddress());
+        _logger.debug("Route added [{}-{}]", routeId, server.getServerAddress());
+
+      } catch (RoutingTableException | SystemException e) {
+        //TODO: This can be thrown when route already exists in the table. Broker
+        //needs to handle this.
+        _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
+
+      }
     }
 
-    
     @Override
-    public Collection<ProviderFunctionality> getProviderFunctionality() {
-        // TODO Auto-generated method stub
-        return null;
+    public void onRpcImplementationRemoved(QName rpc) {
+
+      _logger.debug("Removing registration for [{}]", rpc);
+      RouteIdentifierImpl routeId = new RouteIdentifierImpl();
+      routeId.setType(rpc);
+
+      RoutingTable<RpcRouter.RouteIdentifier, String> routingTable = getRoutingTable();
+
+      try {
+        routingTable.removeGlobalRoute(routeId);
+      } catch (RoutingTableException | SystemException e) {
+        _logger.error("Route delete failed {}", e);
+      }
     }
-    
-    
+  }
+
+  /**
+   * Listener for Routed Rpc registrations in broker
+   */
+  private class RoutedRpcListener
+      implements RouteChangeListener<RpcRoutingContext, InstanceIdentifier> {
+
+    /**
+     *
+     * @param routeChange
+     */
     @Override
-    public void onSessionInitiated(ProviderSession session) {
-        server.setBrokerSession(session);
-        start();
+    public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {
+      Map<RpcRoutingContext, Set<InstanceIdentifier>> announcements = routeChange.getAnnouncements();
+      announce(getRouteIdentifiers(announcements));
+
+      Map<RpcRoutingContext, Set<InstanceIdentifier>> removals = routeChange.getRemovals();
+      remove(getRouteIdentifiers(removals));
+    }
+
+    /**
+     *
+     * @param announcements
+     */
+    private void announce(Set<RpcRouter.RouteIdentifier> announcements) {
+      _logger.debug("Announcing [{}]", announcements);
+      RoutingTable<RpcRouter.RouteIdentifier, String> routingTable = getRoutingTable();
+      try {
+        routingTable.addRoutes(announcements, server.getServerAddress());
+      } catch (RoutingTableException | SystemException e) {
+        _logger.error("Route announcement failed {}", e);
+      }
     }
-    
-    
-    public void close() throws Exception {
-        server.close();
-        client.close();
+
+    /**
+     *
+     * @param removals
+     */
+    private void remove(Set<RpcRouter.RouteIdentifier> removals){
+      _logger.debug("Removing [{}]", removals);
+      RoutingTable<RpcRouter.RouteIdentifier, String> routingTable = getRoutingTable();
+      try {
+        routingTable.removeRoutes(removals, server.getServerAddress());
+      } catch (RoutingTableException | SystemException e) {
+        _logger.error("Route removal failed {}", e);
+      }
     }
 
-    @Override
-    public void stop() {
-        server.stop();
-        client.stop();
+    /**
+     *
+     * @param changes
+     * @return
+     */
+    private Set<RpcRouter.RouteIdentifier> getRouteIdentifiers(Map<RpcRoutingContext, Set<InstanceIdentifier>> changes) {
+      RouteIdentifierImpl routeId = null;
+      Set<RpcRouter.RouteIdentifier> routeIdSet = new HashSet<RpcRouter.RouteIdentifier>();
+
+      for (RpcRoutingContext context : changes.keySet()){
+        routeId = new RouteIdentifierImpl();
+        routeId.setType(context.getRpc());
+        routeId.setContext(context.getContext());
+
+        for (InstanceIdentifier instanceId : changes.get(context)){
+          routeId.setRoute(instanceId);
+          routeIdSet.add(routeId);
+        }
+      }
+      return routeIdSet;
     }
+
+
+
+  }
+
 }
index f62c26e0fdaad757112f4e80e860dcd97a464552..71bab288e6ce3c38c555a036cf3a6b05c35af4de 100644 (file)
@@ -9,8 +9,10 @@
 package org.opendaylight.controller.sal.connector.remoterpc;
 
 import com.google.common.base.Optional;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
+import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
 import org.opendaylight.controller.sal.connector.remoterpc.impl.RoutingTableImpl;
 import org.osgi.framework.BundleContext;
 import org.osgi.util.tracker.ServiceTracker;
@@ -22,26 +24,26 @@ public class RoutingTableProvider implements AutoCloseable {
 
     private RoutingTableImpl routingTableImpl = null;
 
-    final private RouteChangeListener routeChangeListener;
+    //final private RouteChangeListener routeChangeListener;
     
     
-    public RoutingTableProvider(BundleContext ctx,RouteChangeListener rcl) {
+    public RoutingTableProvider(BundleContext ctx){//,RouteChangeListener rcl) {
         @SuppressWarnings("rawtypes")
         ServiceTracker<RoutingTable, RoutingTable> rawTracker = new ServiceTracker<>(ctx, RoutingTable.class, null);
         tracker = rawTracker;
         tracker.open();
 
-        routeChangeListener = rcl;
+        //routeChangeListener = rcl;
     }
     
-    public Optional<RoutingTable<String, String>> getRoutingTable() {
+    public Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> getRoutingTable() {
         @SuppressWarnings("unchecked")
-        RoutingTable<String,String> tracked = tracker.getService();
+        RoutingTable<RpcRouter.RouteIdentifier,String> tracked = tracker.getService();
 
         if(tracked instanceof RoutingTableImpl){
             if(routingTableImpl != tracked){
              routingTableImpl= (RoutingTableImpl)tracked;
-             routingTableImpl.setRouteChangeListener(routeChangeListener);
+             //routingTableImpl.setRouteChangeListener(routeChangeListener);
             }
         }
 
index 5c14dd0c453ce8365e528506d743699edf418f3a..722ca0687904f2c7f2e5c42572fc2d16c762936b 100644 (file)
@@ -18,6 +18,7 @@ import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableExcep
 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
 import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
 import org.opendaylight.controller.sal.core.api.Broker.ProviderSession;
+import org.opendaylight.controller.sal.core.api.RpcProvisionRegistry;
 import org.opendaylight.controller.sal.core.api.RpcRegistrationListener;
 import org.opendaylight.controller.sal.core.api.RpcRoutingContext;
 import org.opendaylight.yangtools.yang.common.QName;
@@ -43,10 +44,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
 /**
- * ZeroMq based implementation of RpcRouter. It implements RouteChangeListener of RoutingTable
- * so that it gets route change notifications from routing table.
+ * ZeroMq based implementation of RpcRouter.
  */
-public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String, String> {
+public class ServerImpl implements RemoteRpcServer {
 
   private Logger _logger = LoggerFactory.getLogger(ServerImpl.class);
 
@@ -57,8 +57,6 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
   private ProviderSession brokerSession;
   private ZMQ.Context context;
 
-  private final RpcListener listener = new RpcListener();
-
   private final String HANDLER_INPROC_ADDRESS = "inproc://rpc-request-handler";
   private final int HANDLER_WORKER_COUNT = 2;
   private final int HWM = 200;//high water mark on sockets
@@ -67,10 +65,6 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
   private String serverAddress;
   private int port;
 
-  private ClientImpl client;
-
-  private  RoutingTableProvider routingTableProvider;
-
   public static enum State {
     STARTING, STARTED, STOPPED;
   }
@@ -79,22 +73,6 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
     this.port = port;
   }
 
-  public RoutingTableProvider getRoutingTableProvider() {
-    return routingTableProvider;
-  }
-
-  public void setRoutingTableProvider(RoutingTableProvider routingTableProvider) {
-    this.routingTableProvider = routingTableProvider;
-  }
-
-  public ClientImpl getClient(){
-    return this.client;
-  }
-
-  public void setClient(ClientImpl client) {
-    this.client = client;
-  }
-
   public State getStatus() {
     return this.status;
   }
@@ -157,11 +135,6 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
     remoteServices = new HashSet<QName>();//
     serverPool = Executors.newSingleThreadExecutor();//main server thread
     serverPool.execute(receive()); // Start listening rpc requests
-    brokerSession.addRpcRegistrationListener(listener);
-
-    announceLocalRpcs();
-
-    registerRemoteRpcs();
 
     status = State.STARTED;
     _logger.info("Remote RPC Server started [{}]", getServerAddress());
@@ -179,8 +152,6 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
 
     if (State.STOPPED == this.getStatus()) return; //do nothing
 
-    unregisterLocalRpcs();
-
     if (serverPool != null)
       serverPool.shutdown();
 
@@ -192,7 +163,7 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
 
   /**
    * Closes ZMQ Context. It tries to gracefully terminate the context. If
-   * termination takes more than a second, its forcefully shutdown.
+   * termination takes more than 5 seconds, its forcefully shutdown.
    */
   private void closeZmqContext() {
     ExecutorService exec = Executors.newSingleThreadExecutor();
@@ -269,81 +240,6 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
     };
   }
 
-  /**
-   * Register the remote RPCs from the routing table into broker
-   */
-  private void registerRemoteRpcs(){
-    Optional<RoutingTable<String, String>> routingTableOptional = routingTableProvider.getRoutingTable();
-
-    Preconditions.checkState(routingTableOptional.isPresent(), "Routing table is absent");
-
-    Set<Map.Entry> remoteRoutes =
-            routingTableProvider.getRoutingTable().get().getAllRoutes();
-
-    //filter out all entries that contains local address
-    //we dont want to register local RPCs as remote
-    Predicate<Map.Entry> notLocalAddressFilter = new Predicate<Map.Entry>(){
-      public boolean apply(Map.Entry remoteRoute){
-        return !getServerAddress().equalsIgnoreCase((String)remoteRoute.getValue());
-      }
-    };
-
-    //filter the entries created by current node
-    Set<Map.Entry> filteredRemoteRoutes = Sets.filter(remoteRoutes, notLocalAddressFilter);
-
-    for (Map.Entry route : filteredRemoteRoutes){
-      onRouteUpdated((String) route.getKey(), "");//value is not needed by broker
-    }
-  }
-
-  /**
-   * Un-Register the local RPCs from the routing table
-   */
-  private void unregisterLocalRpcs(){
-    Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
-    for (QName rpc : currentlySupported) {
-      listener.onRpcImplementationRemoved(rpc);
-    }
-  }
-
-  /**
-   * Publish all the locally registered RPCs in the routing table
-   */
-  private void announceLocalRpcs(){
-    Set<QName> currentlySupported = brokerSession.getSupportedRpcs();
-    for (QName rpc : currentlySupported) {
-      listener.onRpcImplementationAdded(rpc);
-    }
-  }
-
-  /**
-   * @param key
-   * @param value
-   */
-  @Override
-  public void onRouteUpdated(String key, String value) {
-    RouteIdentifierImpl rId = new RouteIdentifierImpl();
-    try {
-      _logger.debug("Updating key/value {}-{}", key, value);
-      brokerSession.addRpcImplementation(
-          (QName) rId.fromString(key).getType(), client);
-
-      //TODO: Check with Tony for routed rpc
-      //brokerSession.addRoutedRpcImplementation((QName) rId.fromString(key).getRoute(), client);
-    } catch (Exception e) {
-      _logger.info("Route update failed {}", e);
-    }
-  }
-
-  /**
-   * @param key
-   */
-  @Override
-  public void onRouteDeleted(String key) {
-    //TODO: Broker session needs to be updated to support this
-    throw new UnsupportedOperationException();
-  }
-
   /**
    * Finds IPv4 address of the local VM
    * TODO: This method is non-deterministic. There may be more than one IPv4 address. Cant say which
@@ -381,74 +277,4 @@ public class ServerImpl implements RemoteRpcServer, RouteChangeListener<String,
     return null;
   }
 
-  /**
-   * Listener for rpc registrations
-   */
-  private class RpcListener implements RpcRegistrationListener {
-
-    @Override
-    public void onRpcImplementationAdded(QName name) {
-
-      //if the service name exists in the set, this notice
-      //has bounced back from the broker. It should be ignored
-      if (remoteServices.contains(name))
-        return;
-
-      _logger.debug("Adding registration for [{}]", name);
-      RouteIdentifierImpl routeId = new RouteIdentifierImpl();
-      routeId.setType(name);
-
-      RoutingTable<String, String> routingTable = getRoutingTable();
-
-      try {
-        routingTable.addGlobalRoute(routeId.toString(), getServerAddress());
-        _logger.debug("Route added [{}-{}]", name, getServerAddress());
-
-      } catch (RoutingTableException | SystemException e) {
-        //TODO: This can be thrown when route already exists in the table. Broker
-        //needs to handle this.
-        _logger.error("Unhandled exception while adding global route to routing table [{}]", e);
-
-      }
-    }
-
-    @Override
-    public void onRpcImplementationRemoved(QName name) {
-
-      _logger.debug("Removing registration for [{}]", name);
-      RouteIdentifierImpl routeId = new RouteIdentifierImpl();
-      routeId.setType(name);
-
-      RoutingTable<String, String> routingTable = getRoutingTable();
-
-      try {
-        routingTable.removeGlobalRoute(routeId.toString());
-      } catch (RoutingTableException | SystemException e) {
-        _logger.error("Route delete failed {}", e);
-      }
-    }
-
-    private RoutingTable<String, String> getRoutingTable(){
-      Optional<RoutingTable<String, String>> routingTable =
-          routingTableProvider.getRoutingTable();
-
-      checkNotNull(routingTable.isPresent(), "Routing table is null");
-
-      return routingTable.get();
-    }
-  }
-
-  /*
-   * Listener for Route changes in broker. Broker notifies this listener in the event
-   * of any change (add/delete). Listener then updates the routing table.
-   */
-  private class BrokerRouteChangeListener
-      implements org.opendaylight.controller.md.sal.common.api.routing.RouteChangeListener<RpcRoutingContext, InstanceIdentifier>{
-
-    @Override
-    public void onRouteChange(RouteChange<RpcRoutingContext, InstanceIdentifier> routeChange) {
-
-    }
-  }
-
 }
index 06107a8773a6213f79df91c3a2bd637e085647ff..ec6a1a94b6ecfe2a4d58504f332145ccd5c53184 100644 (file)
@@ -18,8 +18,6 @@ import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 
 public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier<QName, QName, InstanceIdentifier>,Serializable {
 
-  transient ObjectMapper mapper = new ObjectMapper();
-
   private QName context;
   private QName type;
   private InstanceIdentifier route;
@@ -50,38 +48,4 @@ public class RouteIdentifierImpl implements RpcRouter.RouteIdentifier<QName, QNa
   public void setRoute(InstanceIdentifier route) {
     this.route = route;
   }
-
-  @Override
-  public String toString() {
-    try {
-      return mapper.writeValueAsString(this);
-    } catch (Throwable e) {
-      //do nothing
-    }
-
-    return super.toString();
-  }
-
-  public RpcRouter.RouteIdentifier fromString(String input)
-      throws Exception {
-
-    JsonNode root = mapper.readTree(input);
-    this.context  = parseQName(root.get("context"));
-    this.type     = parseQName(root.get("type"));
-
-    return this;
-  }
-
-  private QName parseQName(JsonNode node){
-    if (node == null) return null;
-
-    String namespace = (node.get("namespace") != null) ?
-                       node.get("namespace").asText()  : "";
-
-    String localName = (node.get("localName") != null) ?
-                       node.get("localName").asText() : "";
-
-    URI uri = URI.create(namespace);
-    return new QName(uri, localName);
-  }
 }
index d20efe50c154a054e46eb83aaf1b4c3d2520393c..beeb936c97200517fc911276e56a483e182c1b5f 100644 (file)
@@ -26,8 +26,6 @@ module odl-sal-dom-rpc-remote-cfg {
 
     identity remote-zeromq-rpc-server {
         base config:module-type;
-        config:provided-service remote-rpc-server;
-        config:provided-service remote-rpc-client;
         config:java-name-prefix ZeroMQServer;
     }
 
index f63584b7fc4531b11466e1a84f07cbf14b46eae7..c0aae2dfb56590a39ac1e90a5a413b29c2a87cdb 100644 (file)
@@ -13,6 +13,7 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.sal.common.util.Rpcs;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
@@ -43,8 +44,8 @@ public class ClientImplTest {
 
     //mock routing table
     routingTableProvider = mock(RoutingTableProvider.class);
-    RoutingTable<String, String> mockRoutingTable = new MockRoutingTable<String, String>();
-    Optional<RoutingTable<String, String>> optionalRoutingTable = Optional.fromNullable(mockRoutingTable);
+    RoutingTable<RpcRouter.RouteIdentifier, String> mockRoutingTable = new MockRoutingTable<String, String>();
+    Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> optionalRoutingTable = Optional.fromNullable(mockRoutingTable);
     when(routingTableProvider.getRoutingTable()).thenReturn(optionalRoutingTable);
 
     //mock ClientRequestHandler
@@ -81,7 +82,7 @@ public class ClientImplTest {
 
   }
 
-  @Test
+  //@Test
   public void invokeRpc_NormalCall_ShouldReturnSuccess() throws Exception {
 
     when(mockHandler.handle(any(Message.class))).
@@ -94,7 +95,7 @@ public class ClientImplTest {
     Assert.assertNull(result.getResult());
   }
 
-  @Test
+  //@Test
   public void invokeRpc_HandlerThrowsException_ShouldReturnError() throws Exception {
 
     when(mockHandler.handle(any(Message.class))).
index 9c74be93bd7d28e22222cffb8273c2ae5b2505f7..0fe0155bb6a90e48a24e3ce7d3ecaaee37f956f8 100644 (file)
@@ -38,11 +38,26 @@ public class MockRoutingTable<K, V> implements RoutingTable {
 
   }
 
+  @Override
+  public void addRoutes(Set set, Object o) throws RoutingTableException, SystemException {
+    //To change body of implemented methods use File | Settings | File Templates.
+  }
+
+  @Override
+  public void removeRoutes(Set set, Object o) throws RoutingTableException, SystemException {
+    //To change body of implemented methods use File | Settings | File Templates.
+  }
+
   @Override
   public void removeGlobalRoute(Object o) throws RoutingTableException, SystemException {
 
   }
 
+  @Override
+  public Object getGlobalRoute(Object o) throws RoutingTableException, SystemException {
+    return null;  //To change body of implemented methods use File | Settings | File Templates.
+  }
+
   @Override
   public Set getRoutes(Object o) {
     Set<String> routes = new HashSet<String>();
@@ -51,17 +66,17 @@ public class MockRoutingTable<K, V> implements RoutingTable {
   }
 
   @Override
-  public Set<Map.Entry> getAllRoutes() {
-    return Collections.emptySet();
+  public Object getLastAddedRoute(Object o) {
+    return null;  //To change body of implemented methods use File | Settings | File Templates.
   }
 
-  @Override
-  public Object getARoute(Object o) {
-    return null;
-  }
+//  @Override
+//  public Set<Map.Entry> getAllRoutes() {
+//    return Collections.emptySet();
+//  }
 
-  @Override
-  public void registerRouteChangeListener(RouteChangeListener routeChangeListener) {
-
-  }
+//  @Override
+//  public Object getARoute(Object o) {
+//    return null;
+//  }
 }
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProviderTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RemoteRpcProviderTest.java
new file mode 100644 (file)
index 0000000..06360aa
--- /dev/null
@@ -0,0 +1,62 @@
+package org.opendaylight.controller.sal.connector.remoterpc;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RemoteRpcProviderTest {
+  @Before
+  public void setUp() throws Exception {
+
+  }
+
+  @After
+  public void tearDown() throws Exception {
+
+  }
+
+  @Test
+  public void testSetRoutingTableProvider() throws Exception {
+
+  }
+
+  @Test
+  public void testOnSessionInitiated() throws Exception {
+
+  }
+
+  @Test
+  public void testGetSupportedRpcs() throws Exception {
+
+  }
+
+  @Test
+  public void testGetProviderFunctionality() throws Exception {
+
+  }
+
+  @Test
+  public void testInvokeRpc() throws Exception {
+
+  }
+
+  @Test
+  public void testInvokeRoutedRpc() throws Exception {
+
+  }
+
+  @Test
+  public void testStart() throws Exception {
+
+  }
+
+  @Test
+  public void testClose() throws Exception {
+
+  }
+
+  @Test
+  public void testStop() throws Exception {
+
+  }
+}
diff --git a/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RouteIdentifierImplTest.java b/opendaylight/md-sal/sal-remoterpc-connector/implementation/src/test/java/org/opendaylight/controller/sal/connector/remoterpc/RouteIdentifierImplTest.java
deleted file mode 100644 (file)
index 468d782..0000000
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright (c) 2013 Cisco Systems, Inc. and others.  All rights reserved.
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-
-package org.opendaylight.controller.sal.connector.remoterpc;
-
-import java.net.URI;
-
-import com.fasterxml.jackson.core.JsonParseException;
-import org.junit.Assert;
-import org.junit.Test;
-import org.opendaylight.controller.sal.connector.api.RpcRouter;
-import org.opendaylight.controller.sal.connector.remoterpc.dto.RouteIdentifierImpl;
-import org.opendaylight.yangtools.yang.common.QName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class RouteIdentifierImplTest {
-
-  Logger _logger = LoggerFactory.getLogger(RouteIdentifierImplTest.class);
-
-  private final URI namespace = URI.create("http://cisco.com/example");
-  private final QName QNAME = new QName(namespace, "heartbeat");
-
-  @Test
-  public void testToString() throws Exception {
-    RouteIdentifierImpl rId = new RouteIdentifierImpl();
-    rId.setType(QNAME);
-
-    _logger.debug(rId.toString());
-
-    Assert.assertTrue(true);
-
-  }
-
-  @Test
-  public void testFromString() throws Exception {
-    RouteIdentifierImpl rId = new RouteIdentifierImpl();
-    rId.setType(QNAME);
-
-    String s = rId.toString();
-    _logger.debug("serialized route: {}", s);
-
-    RpcRouter.RouteIdentifier ref = new RouteIdentifierImpl().fromString(s);
-    _logger.debug("deserialized route: {}", ref);
-
-    Assert.assertTrue(true);
-  }
-
-  @Test(expected = JsonParseException.class)
-  public void testFromInvalidString() throws Exception {
-    String invalidInput = "aklhdgadfa;;;;;;;]]]]=]ag" ;
-    RouteIdentifierImpl rId = new RouteIdentifierImpl();
-    rId.fromString(invalidInput);
-
-    _logger.debug("" + rId);
-    Assert.assertTrue(true);
-  }
-}
index 1b282f6ab5e5ead6d903c387cea22d015afba297..886ff426c7948d4a95d778d0f5717da1f0aa582e 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.sal.connector.remoterpc;
 import com.google.common.base.Optional;
 import junit.framework.Assert;
 import org.junit.*;
+import org.opendaylight.controller.sal.connector.api.RpcRouter;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
 import org.opendaylight.controller.sal.connector.remoterpc.utils.MessagingUtil;
 import org.opendaylight.controller.sal.core.api.Broker;
@@ -66,10 +67,9 @@ public class ServerImplTest {
 
     server = new ServerImpl(port);
     server.setBrokerSession(brokerSession);
-    server.setRoutingTableProvider(routingTableProvider);
 
-    RoutingTable<String, String> mockRoutingTable = new MockRoutingTable<String, String>();
-    Optional<RoutingTable<String, String>> optionalRoutingTable = Optional.fromNullable(mockRoutingTable);
+    RoutingTable<RpcRouter.RouteIdentifier, String> mockRoutingTable = new MockRoutingTable<String, String>();
+    Optional<RoutingTable<RpcRouter.RouteIdentifier, String>> optionalRoutingTable = Optional.fromNullable(mockRoutingTable);
     when(routingTableProvider.getRoutingTable()).thenReturn(optionalRoutingTable);
 
     when(brokerSession.addRpcRegistrationListener(listener)).thenReturn(null);
@@ -94,11 +94,6 @@ public class ServerImplTest {
     Assert.assertEquals(ServerImpl.State.STOPPED, server.getStatus());
   }
 
-  @Test
-  public void getRoutingTableProvider_Call_ShouldReturnRoutingTable() throws Exception {
-    Assert.assertNotNull(server.getRoutingTableProvider());
-  }
-
   @Test
   public void getBrokerSession_Call_ShouldReturnBrokerSession() throws Exception {
     Optional<Broker.ProviderSession> mayBeBroker = server.getBrokerSession();
index dc2fdbf9a05382d19307d6cccb20c918c6c1aafc..a6bbe31684008c00722482b6e4eb2078d60fa4d4 100644 (file)
             </Export-Package>
             <Import-Package>
               com.sun.jersey.spi.container.servlet,
-              org.codehaus.jackson.annotate,
+              !org.codehaus.jackson.annotate,
               javax.ws.rs,
               javax.ws.rs.core,
               javax.xml.bind,
               javax.xml.bind.annotation,
               org.slf4j,
               org.apache.catalina.filters,
-              org.codehaus.jackson.jaxrs,
+              !org.codehaus.jackson.jaxrs,
               org.opendaylight.controller.sample.zeromq.provider,
               org.opendaylight.controller.sample.zeromq.consumer,
               org.opendaylight.controller.sal.utils,