/* * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.mdsal.singleton.dom.impl; import static com.google.common.base.Verify.verify; import static com.google.common.base.Verify.verifyNotNull; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.base.Verify; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.CheckForNull; import javax.annotation.Nonnull; import org.opendaylight.mdsal.eos.common.api.CandidateAlreadyRegisteredException; import org.opendaylight.mdsal.eos.common.api.GenericEntity; import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipChange; import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListener; import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipListenerRegistration; import org.opendaylight.mdsal.eos.common.api.GenericEntityOwnershipService; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonService; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider; import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceRegistration; import org.opendaylight.yangtools.concepts.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Abstract class {@link AbstractClusterSingletonServiceProviderImpl} represents implementations of * {@link 
ClusterSingletonServiceProvider} and it implements {@link GenericEntityOwnershipListener} * for providing OwnershipChange for all registered {@link ClusterSingletonServiceGroup} entity * candidate. * * @param
the instance identifier path type
* @param , E extends GenericEntity ,
C extends GenericEntityOwnershipChange ,
G extends GenericEntityOwnershipListener ,
S extends GenericEntityOwnershipService ,
R extends GenericEntityOwnershipListenerRegistration >
implements ClusterSingletonServiceProvider, GenericEntityOwnershipListener {
// Class-wide SLF4J logger.
private static final Logger LOG = LoggerFactory.getLogger(AbstractClusterSingletonServiceProviderImpl.class);
// Entity type name for service entities; package-private (no access modifier) so tests can reference it.
// NOTE(review): presumably passed as the entityType argument of createEntity/registerListener - confirm.
@VisibleForTesting
static final String SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.ServiceEntityType";
// Second entity type name; judging by its value it is used for the asynchronous service-close entities.
// NOTE(review): its use sites are not visible in this part of the file - confirm.
@VisibleForTesting
static final String CLOSE_SERVICE_ENTITY_TYPE = "org.opendaylight.mdsal.AsyncServiceCloseEntityType";
// Entity ownership service, typed by the class's S type parameter.
// NOTE(review): the constructor that assigns this final field is not visible here.
private final S entityOwnershipService;
private final Map serviceGroup;
ClusterSingletonServiceGroup existing = serviceGroupMap.get(serviceIdentifier);
if (existing == null) {
serviceGroup = createGroup(serviceIdentifier, new ArrayList<>(1));
serviceGroupMap.put(serviceIdentifier, serviceGroup);
try {
initializeOrRemoveGroup(serviceGroup);
} catch (CandidateAlreadyRegisteredException e) {
throw new IllegalArgumentException("Service group already registered", e);
}
} else {
serviceGroup = existing;
}
serviceGroup.registerService(service);
return new AbstractClusterSingletonServiceRegistration(service) {
@Override
protected void removeRegistration() {
// We need to bounce the unregistration through an ordered lock in order not to deal with asynchronous
// shutdown of the group and user registering it again.
AbstractClusterSingletonServiceProviderImpl.this.removeRegistration(serviceIdentifier, service);
}
};
}
// NOTE(review): this declaration looks damaged. The signature declares a ClusterSingletonServiceGroup return
// type and a serviceIdentifier parameter, yet the body never returns a value, never uses serviceIdentifier and
// calls initialize()/getIdentifier() on a parameter declared as List. The body is what the callers of
// initializeOrRemoveGroup(...) elsewhere in this file would expect of that method - restore both methods from
// the upstream source before relying on this code.
private ClusterSingletonServiceGroup createGroup(final String serviceIdentifier,
final List group)
throws CandidateAlreadyRegisteredException {
try {
// Attempt to bring the group up.
group.initialize();
} catch (CandidateAlreadyRegisteredException e) {
// Initialization failed: drop the group from the map and propagate the failure to the caller.
// Assuming serviceGroupMap is a java.util.Map, the two-argument remove only removes the entry while it
// still maps the identifier to this very group.
serviceGroupMap.remove(group.getIdentifier(), group);
throw e;
}
}
/**
 * Unregisters a service from the group stored under {@code serviceIdentifier}.
 *
 * <p>If the group's {@code unregisterService} returns {@code false} nothing else happens. Otherwise the group
 * is closed, a {@link PlaceholderGroup} wrapping the old group and its close future is swapped into
 * {@code serviceGroupMap}, and {@link #finishShutdown} is scheduled to run once the close future completes.
 *
 * @param serviceIdentifier key under which the group is looked up in {@code serviceGroupMap}; the lookup is
 *                          wrapped in {@code verifyNotNull}, so a group is expected to be present
 * @param service the service to remove from the group
 */
void removeRegistration(final String serviceIdentifier, final ClusterSingletonService service) {
final PlaceholderGroup placeHolder;
// NOTE(review): the type below has a stray '>' - its generic argument appears to have been lost; restore it
// from the return type of closeClusterSingletonGroup().
final ListenableFuture> future;
// Lookup, close and map replacement happen under the provider's monitor, the same monitor finishShutdown
// synchronizes on.
synchronized (this) {
final ClusterSingletonServiceGroup lookup = verifyNotNull(serviceGroupMap.get(serviceIdentifier));
if (!lookup.unregisterService(service)) {
return;
}
// Close the group and replace it with a placeholder
LOG.debug("Closing service group {}", serviceIdentifier);
future = lookup.closeClusterSingletonGroup();
placeHolder = new PlaceholderGroup<>(lookup, future);
// NOTE(review): the replace is keyed on the identifier taken from the service, while the lookup above is
// keyed on serviceIdentifier; presumably the two are always equal - confirm, or use one key for both.
final String identifier = service.getIdentifier().getValue();
// The three-argument replace only succeeds while the map still holds 'lookup'; verify() asserts that.
verify(serviceGroupMap.replace(identifier, lookup, placeHolder));
LOG.debug("Replaced group {} with {}", serviceIdentifier, placeHolder);
}
// Registered outside the synchronized block; with a direct executor the listener runs on whichever thread
// completes the future, or immediately on this thread if the future is already complete.
future.addListener(() -> finishShutdown(placeHolder), MoreExecutors.directExecutor());
}
/**
 * Finishes the shutdown of a group that was swapped out for a placeholder: a successor group is created for
 * the placeholder's identifier, installed in {@code serviceGroupMap} in place of the placeholder, announced to
 * the placeholder through {@code setSuccessor}, and finally initialized.
 *
 * <p>Scheduled by {@code removeRegistration} as a listener on the closed group's future. A
 * {@link CandidateAlreadyRegisteredException} during initialization is logged and not rethrown.
 *
 * @param placeHolder the placeholder currently stored in the map for the group being shut down
 */
synchronized void finishShutdown(final PlaceholderGroup placeHolder) {
final String identifier = placeHolder.getIdentifier();
LOG.debug("Service group {} closed", identifier);
// NOTE(review): this line looks damaged - 'services' is not declared anywhere in this method and the result
// of createGroup is typed as List although it is used as a group below. Statements that obtained the
// services (presumably from the placeholder) appear to be missing - restore from the upstream source.
final List group = createGroup(identifier, services);
// The placeholder must still be the mapped value; the three-argument replace fails otherwise.
Verify.verify(serviceGroupMap.replace(identifier, placeHolder, group));
placeHolder.setSuccessor(group);
LOG.debug("Service group upgraded from {} to {}", placeHolder, group);
try {
initializeOrRemoveGroup(group);
} catch (CandidateAlreadyRegisteredException e) {
// Deliberately not propagated: the failure is only logged.
LOG.error("Failed to register delayed group {}, it will remain inoperational", identifier, e);
}
}
/**
 * Closes this provider: releases the service entity listener registration and requests closing of every
 * group held in {@code serviceGroupMap}.
 *
 * <p>NOTE(review): the body below is damaged. The loop header over the groups is incomplete, the list
 * {@code listGroupCloseListFuture} is never declared here, and the second half of the body (from the
 * {@code serviceHolder} lookup on) refers to {@code serviceIdentifier} and {@code ownershipChange}, which are
 * not in scope in a no-argument method - it appears to belong to {@code ownershipChanged}. Restore both
 * methods from the upstream source.
 */
@Override
public final void close() {
LOG.debug("Close method for ClusterSingletonService Provider {}", this);
// Close the listener registration if one is held; nulling the field makes a repeated call skip this step.
if (serviceEntityListenerReg != null) {
serviceEntityListenerReg.close();
serviceEntityListenerReg = null;
}
// NOTE(review): not a valid statement - the 'for (' part of the loop and the preceding list declaration
// appear to be missing.
final List serviceGroup : serviceGroupMap.values()) {
// Collect the close future of each group.
listGroupCloseListFuture.add(serviceGroup.closeClusterSingletonGroup());
}
// NOTE(review): a ListenableFuture cannot be the result of serviceGroupMap.get(...) given the call made on
// it below; this line looks like two unrelated statements fused together.
final ListenableFuture serviceHolder = serviceGroupMap.get(serviceIdentifier);
if (serviceHolder != null) {
// Forward the ownership change to the group registered under the identifier.
serviceHolder.ownershipChanged(ownershipChange);
} else {
LOG.debug("ClusterSingletonServiceGroup was not found for serviceIdentifier {}", serviceIdentifier);
}
}
/**
 * Registers an entity ownership listener for the given entity type and returns the resulting
 * {@link GenericEntityOwnershipListenerRegistration}. Subclasses supply the concrete registration call.
 *
 * @param entityType the type of the entity to listen on
 * @param entityOwnershipServiceInst the entity ownership service (EOS) instance to register with
 * @return the listener registration produced by the entity ownership service
 */
protected abstract R registerListener(String entityType, S entityOwnershipServiceInst);
/**
 * Creates an instance of the concrete {@link GenericEntity} subtype used by this provider.
 *
 * @param entityType the type of the entity
 * @param entityIdentifier the identifier of the entity
 * @return a new entity of type {@code E} for the given type and identifier
 */
protected abstract E createEntity(String entityType, String entityIdentifier);
/**
 * Extracts the service group identifier from an entity. The result is used by {@code ownershipChanged} as
 * the key for looking up the affected group.
 *
 * @param entity the entity to read the identifier from
 * @return the service group identifier parsed from the entity's key value
 */
protected abstract String getServiceIdentifierFromEntity(E entity);
/**
 * Final teardown step of the provider. It is invoked from the completion callbacks set up by
 * {@code close()}, i.e. asynchronously at the end of the provider lifecycle, and drops whatever state is
 * still held: the asynchronous-close listener registration and all entries of the service group map.
 */
final void cleanup() {
    LOG.debug("Final cleaning ClusterSingletonServiceProvider {}", this);
    releaseAsyncCloseListener();
    serviceGroupMap.clear();
}

/**
 * Closes the asynchronous-close listener registration if one is held and forgets the reference afterwards.
 */
private void releaseAsyncCloseListener() {
    if (asyncCloseEntityListenerReg == null) {
        return;
    }
    asyncCloseEntityListenerReg.close();
    asyncCloseEntityListenerReg = null;
}
}
the GenericEntityOwnershipService type
* @param > finalCloseFuture = Futures.allAsList(listGroupCloseListFuture);
Futures.addCallback(finalCloseFuture, new FutureCallback
>() {
@Override
public void onSuccess(final List> result) {
cleanup();
}
@Override
public void onFailure(final Throwable throwable) {
LOG.warn("Unexpected problem by closing ClusterSingletonServiceProvider {}",
AbstractClusterSingletonServiceProviderImpl.this, throwable);
cleanup();
}
});
}
@Override
public final void ownershipChanged(final C ownershipChange) {
LOG.debug("Ownership change for ClusterSingletonService Provider {}", ownershipChange);
final String serviceIdentifier = getServiceIdentifierFromEntity(ownershipChange.getEntity());
final ClusterSingletonServiceGroup