X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fclustering%2Fservices_implementation%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fclustering%2Fservices_implementation%2Finternal%2FClusterManagerCommon.java;h=06e5bc5b611c0480de04ff4bff814095804048bf;hb=5aa8f995feede44d69bc26e70a67e6c44b01c758;hp=2afbabe87fd35bbaa232bff7fb57919fb7cde032;hpb=42210c03b0a4c54706320ba9f55794c0abd4d201;p=controller.git diff --git a/opendaylight/clustering/services_implementation/src/main/java/org/opendaylight/controller/clustering/services_implementation/internal/ClusterManagerCommon.java b/opendaylight/clustering/services_implementation/src/main/java/org/opendaylight/controller/clustering/services_implementation/internal/ClusterManagerCommon.java index 2afbabe87f..06e5bc5b61 100644 --- a/opendaylight/clustering/services_implementation/src/main/java/org/opendaylight/controller/clustering/services_implementation/internal/ClusterManagerCommon.java +++ b/opendaylight/clustering/services_implementation/src/main/java/org/opendaylight/controller/clustering/services_implementation/internal/ClusterManagerCommon.java @@ -10,10 +10,16 @@ package org.opendaylight.controller.clustering.services_implementation.internal; import java.net.InetAddress; +import java.util.Collections; +import java.util.Dictionary; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import javax.transaction.HeuristicMixedException; import javax.transaction.HeuristicRollbackException; @@ -21,8 +27,8 @@ import javax.transaction.NotSupportedException; import javax.transaction.RollbackException; import javax.transaction.SystemException; import javax.transaction.Transaction; -import javax.transaction.TransactionManager; +import org.apache.felix.dm.Component; import org.opendaylight.controller.clustering.services.CacheConfigException; import org.opendaylight.controller.clustering.services.CacheExistException; import org.opendaylight.controller.clustering.services.CacheListenerAddException; @@ -35,20 +41,13 @@ import org.opendaylight.controller.clustering.services.ListenRoleChangeAddExcept import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Dictionary; -import java.util.Collections; -import java.util.HashSet; -import org.apache.felix.dm.Component; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -abstract public class ClusterManagerCommon implements IClusterServicesCommon { +public abstract class ClusterManagerCommon implements IClusterServicesCommon { protected String containerName = null; - private IClusterServices clusterService = null; + protected IClusterServices clusterService = null; protected static final Logger logger = LoggerFactory .getLogger(ClusterManagerCommon.class); - private Set cacheUpdateAware = Collections - .synchronizedSet(new HashSet()); + private ConcurrentMap cacheUpdateAware = + new ConcurrentHashMap(); private Set coordinatorChangeAware = Collections .synchronizedSet(new HashSet()); private ListenCoordinatorChange coordinatorChangeListener = null; @@ -59,6 +58,7 @@ abstract public class ClusterManagerCommon implements IClusterServicesCommon { * export the interface ICoordinatorChangeAware */ class ListenCoordinatorChange implements IListenRoleChange { + @Override public void newActiveAvailable() { if (coordinatorChangeAware != null) { // Make sure to look the set while walking it @@ -85,15 +85,57 @@ abstract public class ClusterManagerCommon implements IClusterServicesCommon { } } - void setCacheUpdateAware(ICacheUpdateAware s) { + void setCacheUpdateAware(Map props, ICacheUpdateAware s) { + logger.trace("CacheUpdateAware being set on container:{}", + this.containerName); if (this.cacheUpdateAware != null) { - this.cacheUpdateAware.add(s); + Set caches = (Set)props.get("cachenames"); + if (caches != null) { + logger.trace("cachenames provided below:"); + for (String cache : caches) { + if (this.cacheUpdateAware.get(cache) != null) { + logger.error("cachename:{} on container:{} has already a listener", cache, this.containerName); + } else { + GetUpdatesContainer up = new GetUpdatesContainer(s, this.containerName, cache); + if (up != null) { + try { + this.clusterService.addListener(this.containerName, + cache, up); + this.cacheUpdateAware.put(cache, up); + logger.trace("cachename:{} on container:{} has " + + "been registered", cache, + this.containerName); + } catch (CacheListenerAddException exc) { + logger.debug("Cache {} didn't exist when {} tried to register to its updates", cache, s); + // Do nothing, the important is that + // we don't register the listener in + // the shadow, and we are not doing + // that. + } + } + } + } + } } } - void unsetCacheUpdateAware(ICacheUpdateAware s) { + void unsetCacheUpdateAware(Map props, ICacheUpdateAware s) { + logger.trace("CacheUpdateAware being unset on container:{}", + this.containerName); if (this.cacheUpdateAware != null) { - this.cacheUpdateAware.remove(s); + Set caches = (Set)props.get("cachenames"); + if (caches != null) { + logger.trace("cachenames provided below:"); + GetUpdatesContainer up = null; + for (String cache : caches) { + up = this.cacheUpdateAware.get(cache); + if (up != null) { + this.cacheUpdateAware.remove(cache); + this.clusterService.removeListener(this.containerName, + cache, up); + } + } + } } } @@ -116,7 +158,7 @@ abstract public class ClusterManagerCommon implements IClusterServicesCommon { Dictionary props = c.getServiceProperties(); if (props != null) { this.containerName = (String) props.get("containerName"); - logger.debug("Running containerName:" + this.containerName); + logger.debug("Running containerName: {}", this.containerName); } else { // In the Global instance case the containerName is empty this.containerName = ""; @@ -214,6 +256,15 @@ abstract public class ClusterManagerCommon implements IClusterServicesCommon { } } + @Override + public void tbegin(long timeout, TimeUnit unit) throws NotSupportedException, SystemException { + if (this.clusterService != null) { + this.clusterService.tbegin(timeout, unit); + } else { + throw new IllegalStateException(); + } + } + @Override public void tcommit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, java.lang.SecurityException,