1. Fixed not sending of events to the node that created the routing table entry
[controller.git] / opendaylight / md-sal / zeromq-routingtable / implementation / src / main / java / org / opendaylight / controller / sal / connector / remoterpc / impl / RoutingTableImpl.java
index 558c8a80d32b59aaf092a4505e680c9eabb14190..4e1dfb00588a3e2ae566a744ea1f05eb51ca4bf1 100644 (file)
@@ -10,12 +10,16 @@ package org.opendaylight.controller.sal.connector.remoterpc.impl;
 
 import com.google.common.base.Preconditions;
 import org.apache.felix.dm.Component;
-import org.opendaylight.controller.clustering.services.*;
+import org.opendaylight.controller.clustering.services.CacheConfigException;
+import org.opendaylight.controller.clustering.services.CacheExistException;
+import org.opendaylight.controller.clustering.services.CacheListenerAddException;
+import org.opendaylight.controller.clustering.services.ICacheUpdateAware;
+import org.opendaylight.controller.clustering.services.IClusterGlobalServices;
+import org.opendaylight.controller.clustering.services.IClusterServices;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RouteChangeListener;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTable;
 import org.opendaylight.controller.sal.connector.remoterpc.api.RoutingTableException;
 import org.opendaylight.controller.sal.connector.remoterpc.api.SystemException;
-import org.osgi.framework.ServiceRegistration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -23,243 +27,298 @@ import javax.transaction.HeuristicMixedException;
 import javax.transaction.HeuristicRollbackException;
 import javax.transaction.NotSupportedException;
 import javax.transaction.RollbackException;
-import java.util.*;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
 /**
  * @author: syedbahm
  */
-public class RoutingTableImpl<I, R> implements RoutingTable<I, R>,ICacheUpdateAware<I,R> {
+public class RoutingTableImpl<I, R> implements RoutingTable<I, R>, ICacheUpdateAware<I, R> {
     public static final String ROUTING_TABLE_GLOBAL_CACHE = "routing_table_global_cache";
 
-  private Logger log = LoggerFactory
-            .getLogger(RoutingTableImpl.class);
-
-  private IClusterGlobalServices clusterGlobalServices = null;
-  private RoutingTableImpl routingTableInstance = null;
-  private ConcurrentMap routingTableCache = null;
-  private List<RouteChangeListener>  routeChangeListeners = new ArrayList<RouteChangeListener>();
-  private ServiceRegistration cacheAwareRegistration = null;
-                                                      
- public RoutingTableImpl() {
-  }
-
-  @Override
-  public void addRoute(I routeId, R route) throws RoutingTableException {
-       throw new UnsupportedOperationException(" Not implemented yet!");
-  }
-
-  @Override
-  public void addGlobalRoute(I routeId, R route) throws RoutingTableException, SystemException {
-    Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!");
-    Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!");
-    try {
-
-      Set<R> existingRoute = null;
-      // ok does the global route is already registered ?
-      if ((existingRoute = getRoutes(routeId)) == null) {
-
-          if(log.isDebugEnabled()){
-              log.debug("addGlobalRoute: adding  a new route with id"+ routeId + " and value = "+route);
-          }
-        // lets start a transaction
-        clusterGlobalServices.tbegin();
-        Set<R> routes  = new HashSet<R>();
-        routes.add(route);
-        routingTableCache.put(routeId, routes);
-        clusterGlobalServices.tcommit();
-      } else {
-        throw new DuplicateRouteException(" There is already existing route " + existingRoute);
-      }
-
-    } catch (NotSupportedException e) {
-      throw new RoutingTableException("Transaction error - while trying to create route id=" + routeId + "with route" + route, e);
-    } catch (HeuristicRollbackException e) {
-      throw new RoutingTableException("Transaction error - while trying to create route id=" + routeId + "with route" + route, e);
-    } catch (RollbackException e) {
-      throw new RoutingTableException("Transaction error - while trying to create route id=" + routeId + "with route" + route, e);
-    } catch (HeuristicMixedException e) {
-      throw new RoutingTableException("Transaction error - while trying to create route id=" + routeId + "with route" + route, e);
-    } catch (javax.transaction.SystemException e){
-        throw new SystemException ( "System error occurred - while trying to create with value",e);
+    private Logger log = LoggerFactory.getLogger(RoutingTableImpl.class);
+
+    private IClusterGlobalServices clusterGlobalServices = null;
+    private RoutingTableImpl routingTableInstance = null;
+    private ConcurrentMap routingTableCache = null;
+    private Set<RouteChangeListener> routeChangeListeners = Collections
+            .synchronizedSet(new HashSet<RouteChangeListener>());
+
+    public RoutingTableImpl() {
     }
 
-  }
+    @Override
+    public void addRoute(I routeId, R route) throws RoutingTableException {
+        throw new UnsupportedOperationException(" Not implemented yet!");
+    }
 
-  @Override
-  public void removeRoute(I routeId, R route) {
-         throw new UnsupportedOperationException("Not implemented yet!");
-  }
     @Override
-    public void removeGlobalRoute(I routeId) {
-        routingTableCache.remove(routeId);
+    public void addGlobalRoute(I routeId, R route) throws RoutingTableException, SystemException {
+        Preconditions.checkNotNull(routeId, "addGlobalRoute: routeId cannot be null!");
+        Preconditions.checkNotNull(route, "addGlobalRoute: route cannot be null!");
+        try {
+
+            Set<R> existingRoute = null;
+            // ok does the global route is already registered ?
+            if ((existingRoute = getRoutes(routeId)) == null) {
+
+                if (log.isDebugEnabled()) {
+                    log.debug("addGlobalRoute: adding  a new route with id" + routeId + " and value = "
+                            + route);
+                }
+                // lets start a transaction
+                clusterGlobalServices.tbegin();
+                Set<R> routes = new HashSet<R>();
+                routes.add(route);
+                routingTableCache.put(routeId, routes);
+                clusterGlobalServices.tcommit();
+            } else {
+                throw new DuplicateRouteException(" There is already existing route " + existingRoute);
+            }
+
+        } catch (NotSupportedException e) {
+            throw new RoutingTableException("Transaction error - while trying to create route id="
+                    + routeId + "with route" + route, e);
+        } catch (HeuristicRollbackException e) {
+            throw new RoutingTableException("Transaction error - while trying to create route id="
+                    + routeId + "with route" + route, e);
+        } catch (RollbackException e) {
+            throw new RoutingTableException("Transaction error - while trying to create route id="
+                    + routeId + "with route" + route, e);
+        } catch (HeuristicMixedException e) {
+            throw new RoutingTableException("Transaction error - while trying to create route id="
+                    + routeId + "with route" + route, e);
+        } catch (javax.transaction.SystemException e) {
+            throw new SystemException("System error occurred - while trying to create with value", e);
+        }
+
     }
 
-  @Override
-  public Set<R> getRoutes(I routeId) {
+    @Override
+    public void removeRoute(I routeId, R route) {
+        throw new UnsupportedOperationException("Not implemented yet!");
+    }
 
-      //Note: currently works for global routes only wherein there is just single route
-      Preconditions.checkNotNull(routeId, "getARoute: routeId cannot be null!");
-      return (Set<R>) routingTableCache.get(routeId);
-  }
+    @Override
+    public void removeGlobalRoute(I routeId) throws RoutingTableException, SystemException {
+        Preconditions.checkNotNull(routeId, "removeGlobalRoute: routeId cannot be null!");
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("removeGlobalRoute: removing  a new route with id" + routeId);
+            }
+            // lets start a transaction
+            clusterGlobalServices.tbegin();
+
+            routingTableCache.remove(routeId);
+            clusterGlobalServices.tcommit();
+
+        } catch (NotSupportedException e) {
+            throw new RoutingTableException("Transaction error - while trying to remove route id="
+                    + routeId, e);
+        } catch (HeuristicRollbackException e) {
+            throw new RoutingTableException("Transaction error - while trying to remove route id="
+                    + routeId, e);
+        } catch (RollbackException e) {
+            throw new RoutingTableException("Transaction error - while trying to remove route id="
+                    + routeId, e);
+        } catch (HeuristicMixedException e) {
+            throw new RoutingTableException("Transaction error - while trying to remove route id="
+                    + routeId, e);
+        } catch (javax.transaction.SystemException e) {
+            throw new SystemException("System error occurred - while trying to remove with value", e);
+        }
+    }
 
-  @Override
-  public R getARoute(I routeId) {
-       throw new UnsupportedOperationException("Not implemented yet!");
-  }
+    @Override
+    public Set<R> getRoutes(I routeId) {
 
-  /**
-   * Registers listener for sending any change notification
-   * 
-   * @param listener
-   */
-  @Override
-  public void registerRouteChangeListener(RouteChangeListener listener) {
-      routeChangeListeners.add(listener);
-  }
+        // Note: currently works for global routes only wherein there is just single
+        // route
+        Preconditions.checkNotNull(routeId, "getARoute: routeId cannot be null!");
+        return (Set<R>) routingTableCache.get(routeId);
+    }
 
+    @Override
+    public R getARoute(I routeId) {
+        throw new UnsupportedOperationException("Not implemented yet!");
+    }
 
     /**
-     * Returning the list of route change listeners for Unit testing
-     * Note: the package scope is default
-     * @return   List of registered RouteChangeListener<I,R> listeners
+     * @deprecated doesn't do anything will be removed once listeners used
+     *             whiteboard pattern Registers listener for sending any change
+     *             notification
+     * @param listener
      */
-  List<RouteChangeListener> getRegisteredRouteChangeListeners(){
-      return routeChangeListeners;
-  }
+    @Override
+    public void registerRouteChangeListener(RouteChangeListener listener) {
 
-  public void setClusterGlobalServices(IClusterGlobalServices clusterGlobalServices) {
-    this.clusterGlobalServices = clusterGlobalServices;
-  }
+    }
 
-  public void unsetClusterGlobalServices(IClusterGlobalServices clusterGlobalServices) {
-    if(cacheAwareRegistration != null) {
-        cacheAwareRegistration.unregister();
+    public void setRouteChangeListener(RouteChangeListener rcl) {
+        if(rcl != null){
+            routeChangeListeners.add(rcl);
+        }else{
+            log.warn("setRouteChangeListener called with null listener");
+        }
+    }
+
+    public void unSetRouteChangeListener(RouteChangeListener rcl) {
+        if(rcl != null){
+         routeChangeListeners.remove(rcl);
+        }else{
+            log.warn("unSetRouteChangeListener called with null listener");
+        }
     }
-    this.clusterGlobalServices = null;
-  }
 
     /**
-     * Creates the Routing Table clustered global services cache
-     * @throws CacheExistException  -- cluster global services exception when cache exist
-     * @throws CacheConfigException -- cluster global services exception during cache config
-     * @throws CacheListenerAddException  -- cluster global services exception during adding of listener
+     * Returning the set of route change listeners for Unit testing Note: the
+     * package scope is default
+     *
+     * @return List of registered RouteChangeListener<I,R> listeners
      */
+    Set<RouteChangeListener> getRegisteredRouteChangeListeners() {
+        return routeChangeListeners;
+    }
 
-  void createRoutingTableCache() throws CacheExistException, CacheConfigException, CacheListenerAddException {
-    // TBD: HOW DO WE DECIDE ON PROPERTIES OF THE CACHE i.e. what duration it
-    // should be caching?
-
-    // let us check here if the cache already exists -- if so don't create
-    if (!clusterGlobalServices.existCache(
-        ROUTING_TABLE_GLOBAL_CACHE)) {
+    public void setClusterGlobalServices(IClusterGlobalServices clusterGlobalServices) {
+        this.clusterGlobalServices = clusterGlobalServices;
+    }
 
-        if(log.isDebugEnabled()){
-            log.debug("createRoutingTableCache: creating a new routing table cache "+ROUTING_TABLE_GLOBAL_CACHE );
-        }
-      routingTableCache = clusterGlobalServices.createCache(
-          ROUTING_TABLE_GLOBAL_CACHE, EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
-    } else {
-        if(log.isDebugEnabled()){
-            log.debug("createRoutingTableCache: found existing routing table cache "+ROUTING_TABLE_GLOBAL_CACHE );
+    public void unsetClusterGlobalServices(IClusterGlobalServices clusterGlobalServices) {
+        if((clusterGlobalServices != null ) &&  (this.clusterGlobalServices.equals(clusterGlobalServices))){
+            this.clusterGlobalServices = null;
         }
-      routingTableCache = clusterGlobalServices.getCache(
-          ROUTING_TABLE_GLOBAL_CACHE);
     }
 
-  }
-
-  /**
-   * Function called by the dependency manager when all the required
-   * dependencies are satisfied
-   * 
-   */
-  void init(Component c) {
-    try {
-
-      createRoutingTableCache();
-    } catch (CacheExistException e) {
-      throw new IllegalStateException("could not construct routing table cache");
-    } catch (CacheConfigException e) {
-      throw new IllegalStateException("could not construct routing table cache");
-    } catch (CacheListenerAddException e) {
-        throw new IllegalStateException("could not construct routing table cache");
+    /**
+     * Creates the Routing Table clustered global services cache
+     *
+     * @throws CacheExistException
+     *           -- cluster global services exception when cache exist
+     * @throws CacheConfigException
+     *           -- cluster global services exception during cache config
+     * @throws CacheListenerAddException
+     *           -- cluster global services exception during adding of listener
+     */
+
+    void createRoutingTableCache() throws CacheExistException, CacheConfigException,
+            CacheListenerAddException {
+        // TBD: HOW DO WE DECIDE ON PROPERTIES OF THE CACHE i.e. what duration it
+        // should be caching?
+
+        // let us check here if the cache already exists -- if so don't create
+        if (!clusterGlobalServices.existCache(ROUTING_TABLE_GLOBAL_CACHE)) {
+
+            if (log.isDebugEnabled()) {
+                log.debug("createRoutingTableCache: creating a new routing table cache "
+                        + ROUTING_TABLE_GLOBAL_CACHE);
+            }
+            routingTableCache = clusterGlobalServices.createCache(ROUTING_TABLE_GLOBAL_CACHE,
+                    EnumSet.of(IClusterServices.cacheMode.TRANSACTIONAL));
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("createRoutingTableCache: found existing routing table cache "
+                        + ROUTING_TABLE_GLOBAL_CACHE);
+            }
+            routingTableCache = clusterGlobalServices.getCache(ROUTING_TABLE_GLOBAL_CACHE);
+        }
+
     }
-  }
 
+    /**
+     * Function called by the dependency manager when all the required
+     * dependencies are satisfied
+     *
+     */
+    void init(Component c) {
+        try {
+
+            createRoutingTableCache();
+        } catch (CacheExistException e) {
+            throw new IllegalStateException("could not construct routing table cache");
+        } catch (CacheConfigException e) {
+            throw new IllegalStateException("could not construct routing table cache");
+        } catch (CacheListenerAddException e) {
+            throw new IllegalStateException("could not construct routing table cache");
+        }
+    }
 
     /**
-     * Get routing table method is useful for unit testing
-     * <note>It has package scope</note>
+     * Get routing table method is useful for unit testing <note>It has package
+     * scope</note>
      */
-    ConcurrentMap getRoutingTableCache(){
+    ConcurrentMap getRoutingTableCache() {
         return this.routingTableCache;
     }
 
+    /**
+     * Invoked when a new entry is available in the cache, the key is only
+     * provided, the value will come as an entryUpdate invocation
+     *
+     * @param key
+     *          Key for the entry just created
+     * @param cacheName
+     *          name of the cache for which update has been received
+     * @param originLocal
+     *          true if the event is generated from this node
+     */
+    @Override
+    public void entryCreated(I key, String cacheName, boolean originLocal) {
+        // TBD: do we require this.
+        if (log.isDebugEnabled()) {
+            log.debug("RoutingTableUpdates: entryCreated  routeId = " + key + " cacheName=" + cacheName);
+        }
+    }
 
-      /**
-       * Invoked when a new entry is available in the cache, the key is
-       * only provided, the value will come as an entryUpdate invocation
-       *
-       * @param key         Key for the entry just created
-       * @param cacheName   name of the cache for which update has been
-       *                    received
-       * @param originLocal true if the event is generated from this
-       *                    node
-       */
-      @Override
-      public void entryCreated(I key, String cacheName, boolean originLocal) {
-          //TBD: do we require this.
-          if(log.isDebugEnabled()){
-              log.debug("RoutingTableUpdates: entryCreated  routeId = "+key
-                      + " cacheName="+cacheName
-                      );
-          }
-      }
-
-      /**
-       * Called anytime a given entry is updated
-       *
-       * @param key         Key for the entry modified
-       * @param new_value   the new value the key will have
-       * @param cacheName   name of the cache for which update has been
-       *                    received
-       * @param originLocal true if the event is generated from this
-       *                    node
-       */
-      @Override
-      public void entryUpdated(I key, R new_value, String cacheName, boolean originLocal) {
-          if(log.isDebugEnabled()){
-              log.debug("RoutingTableUpdates: entryUpdated  routeId = "+key
-                      + ",value = "+ new_value
-                      + " ,cacheName="+cacheName
-                      );
-          }
-          for(RouteChangeListener rcl:routeChangeListeners){
-              rcl.onRouteUpdated(key, new_value);
-          }
-      }
-
-      /**
-       * Called anytime a given key is removed from the
-       * ConcurrentHashMap we are listening to.
-       *
-       * @param key         Key of the entry removed
-       * @param cacheName   name of the cache for which update has been
-       *                    received
-       * @param originLocal true if the event is generated from this
-       *                    node
-       */
-      @Override
-      public void entryDeleted(I key, String cacheName, boolean originLocal) {
-          if(log.isDebugEnabled()){
-              log.debug("RoutingTableUpdates: entryUpdated  routeId = "+key
-                      + " local = "+ originLocal
-                      + " cacheName="+cacheName
-                       );
-          }
-          for(RouteChangeListener rcl:routeChangeListeners){
-              rcl.onRouteDeleted(key);
-          }
-      }
-  }
\ No newline at end of file
+    /**
+     * Called anytime a given entry is updated
+     *
+     * @param key
+     *          Key for the entry modified
+     * @param new_value
+     *          the new value the key will have
+     * @param cacheName
+     *          name of the cache for which update has been received
+     * @param originLocal
+     *          true if the event is generated from this node
+     */
+    @Override
+    public void entryUpdated(I key, R new_value, String cacheName, boolean originLocal) {
+        if (log.isDebugEnabled()) {
+            log.debug("RoutingTableUpdates: entryUpdated  routeId = " + key + ",value = " + new_value
+                    + " ,cacheName=" + cacheName + " originLocal="+originLocal);
+        }
+        if (!originLocal) {
+            for (RouteChangeListener rcl : routeChangeListeners) {
+                rcl.onRouteUpdated(key, new_value);
+            }
+        }
+    }
+
+    /**
+     * Called anytime a given key is removed from the ConcurrentHashMap we are
+     * listening to.
+     *
+     * @param key
+     *          Key of the entry removed
+     * @param cacheName
+     *          name of the cache for which update has been received
+     * @param originLocal
+     *          true if the event is generated from this node
+     */
+    @Override
+    public void entryDeleted(I key, String cacheName, boolean originLocal) {
+        if (log.isDebugEnabled()) {
+            log.debug("RoutingTableUpdates: entryUpdated  routeId = " + key + " local = " + originLocal
+                    + " cacheName=" + cacheName + " originLocal="+originLocal);
+        }
+        if (!originLocal) {
+            for (RouteChangeListener rcl : routeChangeListeners) {
+                rcl.onRouteDeleted(key);
+            }
+        }
+    }
+}
\ No newline at end of file