import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.config.Configuration;
import org.opendaylight.controller.cluster.datastore.config.ModuleShardConfiguration;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
import org.opendaylight.controller.cluster.datastore.shardstrategy.ModuleShardStrategy;
import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
* @author Thomas Pantelis
*/
public class DistributedEntityOwnershipService implements EntityOwnershipService, AutoCloseable {
* @author Thomas Pantelis
*/
public class DistributedEntityOwnershipService implements EntityOwnershipService, AutoCloseable {
private volatile ActorRef localEntityOwnershipShard;
private volatile DataTree localEntityOwnershipShardDataTree;
private volatile ActorRef localEntityOwnershipShard;
private volatile DataTree localEntityOwnershipShardDataTree;
- public DistributedEntityOwnershipService(DistributedDataStore datastore, EntityOwnerSelectionStrategyConfig strategyConfig) {
- this.datastore = Preconditions.checkNotNull(datastore);
- this.strategyConfig = Preconditions.checkNotNull(strategyConfig);
+ private DistributedEntityOwnershipService(final ActorContext context) {
+ this.context = Preconditions.checkNotNull(context);
- public void start() {
- ActorRef shardManagerActor = datastore.getActorContext().getShardManager();
+ public static DistributedEntityOwnershipService start(final ActorContext context,
+ final EntityOwnerSelectionStrategyConfig strategyConfig) {
+ ActorRef shardManagerActor = context.getShardManager();
Collection<String> entityOwnersMemberNames = configuration.getUniqueMemberNamesForAllShards();
CreateShard createShard = new CreateShard(new ModuleShardConfiguration(EntityOwners.QNAME.getNamespace(),
"entity-owners", ENTITY_OWNERSHIP_SHARD_NAME, ModuleShardStrategy.NAME, entityOwnersMemberNames),
Collection<String> entityOwnersMemberNames = configuration.getUniqueMemberNamesForAllShards();
CreateShard createShard = new CreateShard(new ModuleShardConfiguration(EntityOwners.QNAME.getNamespace(),
"entity-owners", ENTITY_OWNERSHIP_SHARD_NAME, ModuleShardStrategy.NAME, entityOwnersMemberNames),
createShard, MESSAGE_TIMEOUT);
createFuture.onComplete(new OnComplete<Object>() {
@Override
createShard, MESSAGE_TIMEOUT);
createFuture.onComplete(new OnComplete<Object>() {
@Override
LOG.error("Failed to create {} shard", ENTITY_OWNERSHIP_SHARD_NAME, failure);
} else {
LOG.info("Successfully created {} shard", ENTITY_OWNERSHIP_SHARD_NAME);
}
}
LOG.error("Failed to create {} shard", ENTITY_OWNERSHIP_SHARD_NAME, failure);
} else {
LOG.info("Successfully created {} shard", ENTITY_OWNERSHIP_SHARD_NAME);
}
}
- Future<Object> future = datastore.getActorContext().executeOperationAsync(shardActor, message, MESSAGE_TIMEOUT);
+ Future<Object> future = context.executeOperationAsync(shardActor, message, MESSAGE_TIMEOUT);
if(failure != null) {
LOG.debug("Error sending message {} to {}", message, shardActor, failure);
} else {
LOG.debug("{} message to {} succeeded", message, shardActor, failure);
}
}
if(failure != null) {
LOG.debug("Error sending message {} to {}", message, shardActor, failure);
} else {
LOG.debug("{} message to {} succeeded", message, shardActor, failure);
}
}
throws CandidateAlreadyRegisteredException {
Preconditions.checkNotNull(entity, "entity cannot be null");
throws CandidateAlreadyRegisteredException {
Preconditions.checkNotNull(entity, "entity cannot be null");
LOG.debug("Unregistering candidate for {}", entity);
executeLocalEntityOwnershipShardOperation(new UnregisterCandidateLocal(entity));
LOG.debug("Unregistering candidate for {}", entity);
executeLocalEntityOwnershipShardOperation(new UnregisterCandidateLocal(entity));
Preconditions.checkNotNull(entityType, "entityType cannot be null");
Preconditions.checkNotNull(listener, "listener cannot be null");
Preconditions.checkNotNull(entityType, "entityType cannot be null");
Preconditions.checkNotNull(listener, "listener cannot be null");
Preconditions.checkNotNull(forEntity, "forEntity cannot be null");
DataTree dataTree = getLocalEntityOwnershipShardDataTree();
Preconditions.checkNotNull(forEntity, "forEntity cannot be null");
DataTree dataTree = getLocalEntityOwnershipShardDataTree();
Optional<DataContainerChild<? extends PathArgument, ?>> ownerLeaf = entity.getChild(ENTITY_OWNER_NODE_ID);
String owner = ownerLeaf.isPresent() ? ownerLeaf.get().getValue().toString() : null;
boolean hasOwner = !Strings.isNullOrEmpty(owner);
Optional<DataContainerChild<? extends PathArgument, ?>> ownerLeaf = entity.getChild(ENTITY_OWNER_NODE_ID);
String owner = ownerLeaf.isPresent() ? ownerLeaf.get().getValue().toString() : null;
boolean hasOwner = !Strings.isNullOrEmpty(owner);
- private DataTree getLocalEntityOwnershipShardDataTree() {
- if(localEntityOwnershipShardDataTree == null) {
+ @VisibleForTesting
+ DataTree getLocalEntityOwnershipShardDataTree() {
+ if (localEntityOwnershipShardDataTree == null) {
LOG.debug("Unregistering listener {} for entity type {}", listener, entityType);
executeLocalEntityOwnershipShardOperation(new UnregisterListenerLocal(listener, entityType));
LOG.debug("Unregistering listener {} for entity type {}", listener, entityType);
executeLocalEntityOwnershipShardOperation(new UnregisterListenerLocal(listener, entityType));
- protected EntityOwnershipShard.Builder newShardBuilder() {
- return EntityOwnershipShard.newBuilder().localMemberName(datastore.getActorContext().getCurrentMemberName())
- .ownerSelectionStrategyConfig(this.strategyConfig);
+ private static EntityOwnershipShard.Builder newShardBuilder(final ActorContext context,
+ final EntityOwnerSelectionStrategyConfig strategyConfig) {
+ return EntityOwnershipShard.newBuilder().localMemberName(context.getCurrentMemberName())
+ .ownerSelectionStrategyConfig(strategyConfig);