package org.opendaylight.controller.clustering.services_implementation.internal;
import java.net.InetAddress;
+import java.util.Collections;
+import java.util.Dictionary;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
+import org.apache.felix.dm.Component;
import org.opendaylight.controller.clustering.services.CacheConfigException;
import org.opendaylight.controller.clustering.services.CacheExistException;
import org.opendaylight.controller.clustering.services.CacheListenerAddException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Dictionary;
-import java.util.Collections;
-import java.util.HashSet;
-import org.apache.felix.dm.Component;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-abstract public class ClusterManagerCommon implements IClusterServicesCommon {
+public abstract class ClusterManagerCommon implements IClusterServicesCommon {
protected String containerName = null;
- private IClusterServices clusterService = null;
+ protected IClusterServices clusterService = null;
protected static final Logger logger = LoggerFactory
.getLogger(ClusterManagerCommon.class);
- private Set<ICacheUpdateAware> cacheUpdateAware = Collections
- .synchronizedSet(new HashSet<ICacheUpdateAware>());
+ private ConcurrentMap<String, GetUpdatesContainer> cacheUpdateAware =
+ new ConcurrentHashMap<String, GetUpdatesContainer>();
private Set<ICoordinatorChangeAware> coordinatorChangeAware = Collections
.synchronizedSet(new HashSet<ICoordinatorChangeAware>());
private ListenCoordinatorChange coordinatorChangeListener = null;
* export the interface ICoordinatorChangeAware
*/
class ListenCoordinatorChange implements IListenRoleChange {
+ @Override
public void newActiveAvailable() {
if (coordinatorChangeAware != null) {
// Make sure to look the set while walking it
}
}
- void setCacheUpdateAware(ICacheUpdateAware s) {
+ void setCacheUpdateAware(Map props, ICacheUpdateAware s) {
+ logger.trace("CacheUpdateAware being set on container:{}",
+ this.containerName);
if (this.cacheUpdateAware != null) {
- this.cacheUpdateAware.add(s);
+ Set<String> caches = (Set<String>)props.get("cachenames");
+ if (caches != null) {
+ logger.trace("cachenames provided below:");
+ for (String cache : caches) {
+ if (this.cacheUpdateAware.get(cache) != null) {
+ logger.error("cachename:{} on container:{} has already a listener", cache, this.containerName);
+ } else {
+ GetUpdatesContainer<?, ?> up = new GetUpdatesContainer(s, this.containerName, cache);
+ if (up != null) {
+ try {
+ this.clusterService.addListener(this.containerName,
+ cache, up);
+ this.cacheUpdateAware.put(cache, up);
+ logger.trace("cachename:{} on container:{} has " +
+ "been registered", cache,
+ this.containerName);
+ } catch (CacheListenerAddException exc) {
+ logger.debug("Cache {} didn't exist when {} tried to register to its updates", cache, s);
+ // Do nothing, the important is that
+ // we don't register the listener in
+ // the shadow, and we are not doing
+ // that.
+ }
+ }
+ }
+ }
+ }
}
}
- void unsetCacheUpdateAware(ICacheUpdateAware s) {
+ void unsetCacheUpdateAware(Map props, ICacheUpdateAware s) {
+ logger.trace("CacheUpdateAware being unset on container:{}",
+ this.containerName);
if (this.cacheUpdateAware != null) {
- this.cacheUpdateAware.remove(s);
+ Set<String> caches = (Set<String>)props.get("cachenames");
+ if (caches != null) {
+ logger.trace("cachenames provided below:");
+ GetUpdatesContainer<?, ?> up = null;
+ for (String cache : caches) {
+ up = this.cacheUpdateAware.get(cache);
+ if (up != null) {
+ this.cacheUpdateAware.remove(cache);
+ this.clusterService.removeListener(this.containerName,
+ cache, up);
+ }
+ }
+ }
}
}
}
}
+ @Override
+ public void tbegin(long timeout, TimeUnit unit) throws NotSupportedException, SystemException {
+ if (this.clusterService != null) {
+ this.clusterService.tbegin(timeout, unit);
+ } else {
+ throw new IllegalStateException();
+ }
+ }
+
@Override
public void tcommit() throws RollbackException, HeuristicMixedException,
HeuristicRollbackException, java.lang.SecurityException,