Fix CacheUpdateAware mechanism in cluster.services-implementation
[controller.git] / opendaylight / clustering / services_implementation / src / main / java / org / opendaylight / controller / clustering / services_implementation / internal / ClusterManagerCommon.java
index 7bf495426f61cfb4d177583b157beafa873d6edf..fabf3e9f1d704413346d97b98c561e8d2adc8a16 100644 (file)
 package org.opendaylight.controller.clustering.services_implementation.internal;
 
 import java.net.InetAddress;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-
 import javax.transaction.HeuristicMixedException;
 import javax.transaction.HeuristicRollbackException;
 import javax.transaction.NotSupportedException;
@@ -22,7 +26,7 @@ import javax.transaction.RollbackException;
 import javax.transaction.SystemException;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
-
+import org.apache.felix.dm.Component;
 import org.opendaylight.controller.clustering.services.CacheConfigException;
 import org.opendaylight.controller.clustering.services.CacheExistException;
 import org.opendaylight.controller.clustering.services.CacheListenerAddException;
@@ -35,20 +39,13 @@ import org.opendaylight.controller.clustering.services.ListenRoleChangeAddExcept
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Dictionary;
-import java.util.Collections;
-import java.util.HashSet;
-import org.apache.felix.dm.Component;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 abstract public class ClusterManagerCommon implements IClusterServicesCommon {
     protected String containerName = null;
     private IClusterServices clusterService = null;
     protected static final Logger logger = LoggerFactory
             .getLogger(ClusterManagerCommon.class);
-    private Set<ICacheUpdateAware> cacheUpdateAware = Collections
-            .synchronizedSet(new HashSet<ICacheUpdateAware>());
+    private ConcurrentMap<String, GetUpdatesContainer> cacheUpdateAware =
+        new ConcurrentHashMap<String, GetUpdatesContainer>();
     private Set<ICoordinatorChangeAware> coordinatorChangeAware = Collections
             .synchronizedSet(new HashSet<ICoordinatorChangeAware>());
     private ListenCoordinatorChange coordinatorChangeListener = null;
@@ -85,15 +82,60 @@ abstract public class ClusterManagerCommon implements IClusterServicesCommon {
         }
     }
 
-    void setCacheUpdateAware(ICacheUpdateAware s) {
+    void setCacheUpdateAware(Map props, ICacheUpdateAware s) {
+        logger.trace("CacheUpdateAware being set on container:{}",
+                     this.containerName);
         if (this.cacheUpdateAware != null) {
-            this.cacheUpdateAware.add(s);
+            Set<String> caches = (Set<String>)props.get("cachenames");
+            if (caches != null) {
+                logger.trace("cachenames provided below:");
+                for (String cache : caches) {
+                    if (this.cacheUpdateAware.get(cache) != null) {
+                        logger.error("cachename:{} on container:{} has " +
+                                     "already a listener", cache,
+                                     this.containerName);
+                    } else {
+                        GetUpdatesContainer<?, ?> up =
+                            new GetUpdatesContainer(s, this.containerName,
+                                                    cache);
+                        if (up != null) {
+                            try {
+                                this.clusterService.addListener(this.containerName,
+                                                                cache, up);
+                                this.cacheUpdateAware.put(cache, up);
+                                logger.trace("cachename:{} on container:{} has " +
+                                             "been registered", cache,
+                                             this.containerName);
+                            } catch (CacheListenerAddException exc) {
+                                // Do nothing, the important is that
+                                // we don't register the listener in
+                                // the shadow, and we are not doing
+                                // that.
+                            }
+                        }
+                    }
+                }
+            }
         }
     }
 
-    void unsetCacheUpdateAware(ICacheUpdateAware s) {
+    void unsetCacheUpdateAware(Map props, ICacheUpdateAware s) {
+        logger.trace("CacheUpdateAware being unset on container:{}",
+                     this.containerName);
         if (this.cacheUpdateAware != null) {
-            this.cacheUpdateAware.remove(s);
+            Set<String> caches = (Set<String>)props.get("cachenames");
+            if (caches != null) {
+                logger.trace("cachenames provided below:");
+                GetUpdatesContainer<?, ?> up = null;
+                for (String cache : caches) {
+                    up = this.cacheUpdateAware.get(cache);
+                    if (up != null) {
+                        this.cacheUpdateAware.remove(cache);
+                        this.clusterService.removeListener(this.containerName,
+                                                           cache, up);
+                    }
+                }
+            }
         }
     }