package org.opendaylight.mdsal.singleton.dom.impl;
+import static com.google.common.base.Verify.verify;
+import static com.google.common.base.Verify.verifyNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.google.common.base.Verify;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
+import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException;
import org.opendaylight.mdsal.eos.common.api.GenericEntity;
import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange;
import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener;
R extends GenericEntityOwnershipListenerRegistration<P, G>>
implements ClusterSingletonServiceProvider, GenericEntityOwnershipListener<P, C> {
- private static final Logger LOG = LoggerFactory
- .getLogger(AbstractClusterSingletonServiceProviderImpl.class.getName());
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractClusterSingletonServiceProviderImpl.class);
- private static final String SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.ServiceEntityType";
- private static final String CLOSE_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType";
+ @VisibleForTesting
+ static final String SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.ServiceEntityType";
+ @VisibleForTesting
+ static final String CLOSE_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType";
private final S entityOwnershipService;
- private final ConcurrentMap<String, ClusterSingletonServiceGroup<P, E, C>> serviceGroupMap =
- new ConcurrentHashMap<>();
+ private final Map<String, ClusterSingletonServiceGroup<P, E, C>> serviceGroupMap = new ConcurrentHashMap<>();
/* EOS Entity Listeners Registration */
private R serviceEntityListenerReg;
* This method must be called once on startup to initialize this provider.
*/
public final void initializeProvider() {
- LOG.debug("Initialization method for ClusterSingletonService Provider {}", this.getClass().getName());
+ LOG.debug("Initialization method for ClusterSingletonService Provider {}", this);
this.serviceEntityListenerReg = registerListener(SERVICE_ENTITY_TYPE, entityOwnershipService);
this.asyncCloseEntityListenerReg = registerListener(CLOSE_SERVICE_ENTITY_TYPE, entityOwnershipService);
}
@Override
- public final ClusterSingletonServiceRegistration registerClusterSingletonService(
+ public final synchronized ClusterSingletonServiceRegistration registerClusterSingletonService(
@CheckForNull final ClusterSingletonService service) {
- LOG.debug("Call registrationService {} method for ClusterSingletonService Provider {}", service,
- this.getClass().getName());
-
- Preconditions.checkArgument(service != null);
- Preconditions.checkArgument(!Strings.isNullOrEmpty(service.getIdentifier().getValue()),
- "ClusterSingletonService idetnifier can not be null. {}", service);
+ LOG.debug("Call registrationService {} method for ClusterSingletonService Provider {}", service, this);
final String serviceIdentifier = service.getIdentifier().getValue();
- ClusterSingletonServiceGroup<P, E, C> serviceGroup = serviceGroupMap.get(serviceIdentifier);
- if (serviceGroup == null) {
- final E mainEntity = createEntity(SERVICE_ENTITY_TYPE, serviceIdentifier);
- final E closeEntity = createEntity(CLOSE_SERVICE_ENTITY_TYPE, serviceIdentifier);
- serviceGroup = new ClusterSingletonServiceGroupImpl<>(serviceIdentifier,
- mainEntity, closeEntity, entityOwnershipService, serviceGroupMap);
- serviceGroupMap.put(service.getIdentifier().getValue(), serviceGroup);
- serviceGroup.initializationClusterSingletonGroup();
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(serviceIdentifier),
+ "ClusterSingletonService identifier may not be null nor empty");
+
+ final ClusterSingletonServiceGroup<P, E, C> serviceGroup;
+ ClusterSingletonServiceGroup<P, E, C> existing = serviceGroupMap.get(serviceIdentifier);
+ if (existing == null) {
+ serviceGroup = createGroup(serviceIdentifier, new ArrayList<>(1));
+ serviceGroupMap.put(serviceIdentifier, serviceGroup);
+
+ try {
+ initializeOrRemoveGroup(serviceGroup);
+ } catch (CandidateAlreadyRegisteredException e) {
+ throw new IllegalArgumentException("Service group already registered", e);
+ }
+ } else {
+ serviceGroup = existing;
+ }
+
+ serviceGroup.registerService(service);
+ return new AbstractClusterSingletonServiceRegistration(service) {
+ @Override
+ protected void removeRegistration() {
+ // We need to bounce the unregistration through a ordered lock in order not to deal with asynchronous
+ // shutdown of the group and user registering it again.
+ AbstractClusterSingletonServiceProviderImpl.this.removeRegistration(serviceIdentifier, service);
+ }
+ };
+ }
+
+ private ClusterSingletonServiceGroup<P, E, C> createGroup(final String serviceIdentifier,
+ final List<ClusterSingletonService> services) {
+ return new ClusterSingletonServiceGroupImpl<>(serviceIdentifier, entityOwnershipService,
+ createEntity(SERVICE_ENTITY_TYPE, serviceIdentifier),
+ createEntity(CLOSE_SERVICE_ENTITY_TYPE, serviceIdentifier), services);
+ }
+
+ private void initializeOrRemoveGroup(final ClusterSingletonServiceGroup<P, E, C> group)
+ throws CandidateAlreadyRegisteredException {
+ try {
+ group.initialize();
+ } catch (CandidateAlreadyRegisteredException e) {
+ serviceGroupMap.remove(group.getIdentifier(), group);
+ throw e;
+ }
+ }
+
+ void removeRegistration(final String serviceIdentifier, final ClusterSingletonService service) {
+
+ final PlaceholderGroup<P, E, C> placeHolder;
+ final ListenableFuture<?> future;
+ synchronized (this) {
+ final ClusterSingletonServiceGroup<P, E, C> lookup = verifyNotNull(serviceGroupMap.get(serviceIdentifier));
+ if (!lookup.unregisterService(service)) {
+ return;
+ }
+
+ // Close the group and replace it with a placeholder
+ LOG.debug("Closing service group {}", serviceIdentifier);
+ future = lookup.closeClusterSingletonGroup();
+ placeHolder = new PlaceholderGroup<>(lookup, future);
+
+ final String identifier = service.getIdentifier().getValue();
+ verify(serviceGroupMap.replace(identifier, lookup, placeHolder));
+ LOG.debug("Replaced group {} with {}", serviceIdentifier, placeHolder);
+ }
+
+ future.addListener(() -> finishShutdown(placeHolder), MoreExecutors.directExecutor());
+ }
+
+ synchronized void finishShutdown(final PlaceholderGroup<P, E, C> placeHolder) {
+ final String identifier = placeHolder.getIdentifier();
+ LOG.debug("Service group {} closed", identifier);
+
+ final List<ClusterSingletonService> services = placeHolder.getServices();
+ if (services.isEmpty()) {
+ // No services, we are all done
+ if (serviceGroupMap.remove(identifier, placeHolder)) {
+ LOG.debug("Service group {} removed", placeHolder);
+ } else {
+ LOG.debug("Service group {} superseded by {}", placeHolder, serviceGroupMap.get(identifier));
+ }
+ return;
+ }
+
+ // Placeholder is being retired, we are reusing its services as the seed for the group.
+ final ClusterSingletonServiceGroup<P, E, C> group = createGroup(identifier, services);
+ Verify.verify(serviceGroupMap.replace(identifier, placeHolder, group));
+ placeHolder.setSuccessor(group);
+ LOG.debug("Service group upgraded from {} to {}", placeHolder, group);
+
+ try {
+ initializeOrRemoveGroup(group);
+ } catch (CandidateAlreadyRegisteredException e) {
+ LOG.error("Failed to register delayed group {}, it will remain inoperational", identifier, e);
}
- return serviceGroup.registerService(service);
}
@Override
public final void close() {
- LOG.debug("Close method for ClusterSingletonService Provider {}", this.getClass().getName());
+ LOG.debug("Close method for ClusterSingletonService Provider {}", this);
if (serviceEntityListenerReg != null) {
serviceEntityListenerReg.close();
serviceEntityListenerReg = null;
}
- final List<ListenableFuture<List<Void>>> listGroupCloseListFuture = new ArrayList<>();
+ final List<ListenableFuture<?>> listGroupCloseListFuture = new ArrayList<>();
for (final ClusterSingletonServiceGroup<P, E, C> serviceGroup : serviceGroupMap.values()) {
listGroupCloseListFuture.add(serviceGroup.closeClusterSingletonGroup());
}
- final ListenableFuture<List<List<Void>>> finalCloseFuture = Futures.allAsList(listGroupCloseListFuture);
- Futures.addCallback(finalCloseFuture, new FutureCallback<List<List<Void>>>() {
+ final ListenableFuture<List<Object>> finalCloseFuture = Futures.allAsList(listGroupCloseListFuture);
+ Futures.addCallback(finalCloseFuture, new FutureCallback<List<?>>() {
@Override
- public void onSuccess(final List<List<Void>> result) {
- cleaningProvider(null);
+ public void onSuccess(final List<?> result) {
+ cleanup();
}
@Override
public void onFailure(final Throwable throwable) {
- cleaningProvider(throwable);
+ LOG.warn("Unexpected problem by closing ClusterSingletonServiceProvider {}",
+ AbstractClusterSingletonServiceProviderImpl.this, throwable);
+ cleanup();
}
});
}
/**
* Method is called async. from close method in end of Provider lifecycle.
- *
- * @param throwable Throwable (needs for log)
*/
- protected final void cleaningProvider(@Nullable final Throwable throwable) {
- LOG.debug("Final cleaning ClusterSingletonServiceProvider {}", this.getClass().getName());
- if (throwable != null) {
- LOG.warn("Unexpected problem by closing ClusterSingletonServiceProvider {}",
- this.getClass().getName(), throwable);
- }
+ final void cleanup() {
+ LOG.debug("Final cleaning ClusterSingletonServiceProvider {}", this);
if (asyncCloseEntityListenerReg != null) {
asyncCloseEntityListenerReg.close();
asyncCloseEntityListenerReg = null;