import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
-import akka.actor.Props;
import akka.pattern.Patterns;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.util.ArrayList;
import java.util.Collection;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.SelectOwner;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategy;
+import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyWrapper;
+import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.FirstCandidateSelectionStrategy;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Future;
/**
private final EntityOwnershipListenerSupport listenerSupport;
private final Set<String> downPeerMemberNames = new HashSet<>();
private final Map<String, String> peerIdToMemberNames = new HashMap<>();
+ private final Map<String, EntityOwnerSelectionStrategyWrapper> ownerSelectionStrategies = new HashMap<>();
+ private final EntityOwnerSelectionStrategyWrapper defaultEntityOwnerSelectionStrategy;
private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
}
- protected EntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
- DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName) {
- super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
- this.localMemberName = localMemberName;
- this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(localMemberName, LOG);
+ protected EntityOwnershipShard(Builder builder) {
+ super(builder);
+ this.localMemberName = builder.localMemberName;
+ this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(builder.localMemberName, LOG);
this.listenerSupport = new EntityOwnershipListenerSupport(getContext(), persistenceId());
+ this.defaultEntityOwnerSelectionStrategy =
+ createEntityOwnerSelectionStrategyWrapper(FirstCandidateSelectionStrategy.INSTANCE);
- for(String peerId: peerAddresses.keySet()) {
+ for(String peerId: getRaftActorContext().getPeerIds()) {
ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(peerId).build();
peerIdToMemberNames.put(peerId, shardId.getMemberName());
}
@Override
public void onReceiveCommand(final Object message) throws Exception {
if(message instanceof RegisterCandidateLocal) {
- onRegisterCandidateLocal((RegisterCandidateLocal)message);
+ onRegisterCandidateLocal((RegisterCandidateLocal) message);
} else if(message instanceof UnregisterCandidateLocal) {
onUnregisterCandidateLocal((UnregisterCandidateLocal)message);
} else if(message instanceof CandidateAdded){
onPeerDown((PeerDown) message);
} else if(message instanceof PeerUp) {
onPeerUp((PeerUp) message);
- } if(message instanceof RegisterListenerLocal) {
+ } else if(message instanceof RegisterListenerLocal) {
onRegisterListenerLocal((RegisterListenerLocal)message);
- } if(message instanceof UnregisterListenerLocal) {
- onUnregisterListenerLocal((UnregisterListenerLocal)message);
+ } else if(message instanceof UnregisterListenerLocal) {
+ onUnregisterListenerLocal((UnregisterListenerLocal) message);
+ } else if(message instanceof SelectOwner) {
+ onSelectOwner((SelectOwner) message);
} else if(!commitCoordinator.handleMessage(message, this)) {
super.onReceiveCommand(message);
}
}
+ private void onSelectOwner(SelectOwner selectOwner) {
+ String currentOwner = getCurrentOwner(selectOwner.getEntityPath());
+ if(Strings.isNullOrEmpty(currentOwner)) {
+ writeNewOwner(selectOwner.getEntityPath(), newOwner(selectOwner.getAllCandidates(),
+ selectOwner.getOwnerSelectionStrategy()));
+ }
+ }
+
private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
LOG.debug("{}: onRegisterCandidateLocal: {}", persistenceId(), registerCandidate);
- listenerSupport.addEntityOwnershipListener(registerCandidate.getEntity(), registerCandidate.getCandidate());
+ listenerSupport.setHasCandidateForEntity(registerCandidate.getEntity());
NormalizedNode<?, ?> entityOwners = entityOwnersWithCandidate(registerCandidate.getEntity().getType(),
registerCandidate.getEntity().getId(), localMemberName);
LOG.debug("{}: onUnregisterCandidateLocal: {}", persistenceId(), unregisterCandidate);
Entity entity = unregisterCandidate.getEntity();
- listenerSupport.removeEntityOwnershipListener(entity, unregisterCandidate.getCandidate());
+ listenerSupport.unsetHasCandidateForEntity(entity);
YangInstanceIdentifier candidatePath = candidatePath(entity.getType(), entity.getId(), localMemberName);
commitCoordinator.commitModification(new DeleteModification(candidatePath), this);
Optional<DataContainerChild<? extends PathArgument, ?>> possibleType =
entityTypeNode.getChild(ENTITY_TYPE_NODE_ID);
String entityType = possibleType.isPresent() ? possibleType.get().getValue().toString() : null;
- if(registerListener.getEntityType().equals(entityType)) {
+ if (registerListener.getEntityType().equals(entityType)) {
Entity entity = new Entity(entityType,
(YangInstanceIdentifier) entityNode.getChild(ENTITY_ID_NODE_ID).get().getValue());
listenerSupport.notifyEntityOwnershipListener(entity, false, true, true, registerListener.getListener());
if(isLeader()) {
String currentOwner = getCurrentOwner(message.getEntityPath());
if(message.getRemovedCandidate().equals(currentOwner)){
- writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates()));
+ writeNewOwner(message.getEntityPath(),
+ newOwner(message.getRemainingCandidates(), getEntityOwnerElectionStrategyWrapper(message.getEntityPath())));
}
} else {
// We're not the leader. If the removed candidate is our local member then check if we actually
}
}
+ private EntityOwnerSelectionStrategyWrapper getEntityOwnerElectionStrategyWrapper(YangInstanceIdentifier entityPath) {
+ String entityType = EntityOwnersModel.entityTypeFromEntityPath(entityPath);
+ EntityOwnerSelectionStrategyWrapper entityOwnerSelectionStrategy = ownerSelectionStrategies.get(entityType);
+
+ if(entityOwnerSelectionStrategy == null){
+ entityOwnerSelectionStrategy = defaultEntityOwnerSelectionStrategy;
+ ownerSelectionStrategies.put(entityType, entityOwnerSelectionStrategy);
+ }
+
+ return entityOwnerSelectionStrategy;
+ }
+
private void onCandidateAdded(CandidateAdded message) {
if(!isLeader()){
return;
String currentOwner = getCurrentOwner(message.getEntityPath());
if(Strings.isNullOrEmpty(currentOwner)){
- writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates()));
+ EntityOwnerSelectionStrategyWrapper strategy = getEntityOwnerElectionStrategyWrapper(message.getEntityPath());
+ if(strategy.selectionDelayInMillis() == 0L) {
+ writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates(), strategy));
+ } else {
+ strategy.scheduleOwnerSelection(message.getEntityPath(), message.getAllCandidates());
+ }
}
}
searchForEntitiesOwnedBy(owner, new EntityWalker() {
@Override
public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
- Object newOwner = newOwner(getCandidateNames(entityNode));
+
YangInstanceIdentifier entityPath = YangInstanceIdentifier.builder(ENTITY_TYPES_PATH).
node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()).
node(ENTITY_OWNER_NODE_ID).build();
+ Object newOwner = newOwner(getCandidateNames(entityNode), getEntityOwnerElectionStrategyWrapper(entityPath));
+
LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
modifications.addModification(new WriteModification(entityPath,
searchForEntities(new EntityWalker() {
@Override
public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
- if(hasCandidate(entityNode, owner)) {
+ if (hasCandidate(entityNode, owner)) {
YangInstanceIdentifier entityId =
- (YangInstanceIdentifier)entityNode.getIdentifier().getKeyValues().get(ENTITY_ID_QNAME);
+ (YangInstanceIdentifier) entityNode.getIdentifier().getKeyValues().get(ENTITY_ID_QNAME);
YangInstanceIdentifier candidatePath = candidatePath(
entityTypeNode.getIdentifier().getKeyValues().get(ENTITY_TYPE_QNAME).toString(),
entityId, owner);
commitCoordinator.commitModifications(modifications, this);
}
- private boolean hasCandidate(MapEntryNode entity, String candidateName) {
+ private static boolean hasCandidate(MapEntryNode entity, String candidateName) {
return ((MapNode)entity.getChild(CANDIDATE_NODE_ID).get()).getChild(candidateNodeKey(candidateName)).isPresent();
}
private void searchForEntitiesOwnedBy(final String owner, final EntityWalker walker) {
- DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
- Optional<NormalizedNode<?, ?>> possibleEntityTypes = snapshot.readNode(ENTITY_TYPES_PATH);
+ Optional<NormalizedNode<?, ?>> possibleEntityTypes = getDataStore().readNode(ENTITY_TYPES_PATH);
if(!possibleEntityTypes.isPresent()) {
return;
}
}
private void searchForEntities(EntityWalker walker) {
- DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
- Optional<NormalizedNode<?, ?>> possibleEntityTypes = snapshot.readNode(ENTITY_TYPES_PATH);
+ Optional<NormalizedNode<?, ?>> possibleEntityTypes = getDataStore().readNode(ENTITY_TYPES_PATH);
if(!possibleEntityTypes.isPresent()) {
return;
}
}
}
- private Collection<String> getCandidateNames(MapEntryNode entity) {
+ private static Collection<String> getCandidateNames(MapEntryNode entity) {
Collection<MapEntryNode> candidates = ((MapNode)entity.getChild(CANDIDATE_NODE_ID).get()).getValue();
Collection<String> candidateNames = new ArrayList<>(candidates.size());
for(MapEntryNode candidate: candidates) {
ImmutableNodes.leafNode(ENTITY_OWNER_NODE_ID, newOwner)), this);
}
- private String newOwner(Collection<String> candidates) {
- for(String candidate: candidates) {
- if(!downPeerMemberNames.contains(candidate)) {
- return candidate;
- }
+ private String newOwner(Collection<String> candidates, EntityOwnerSelectionStrategy ownerSelectionStrategy) {
+ Collection<String> viableCandidates = getViableCandidates(candidates);
+ if(viableCandidates.size() == 0){
+ return "";
}
+ return ownerSelectionStrategy.newOwner(viableCandidates);
+ }
+
+ private Collection<String> getViableCandidates(Collection<String> candidates) {
+ Collection<String> viableCandidates = new ArrayList<>();
- return "";
+ for (String candidate : candidates) {
+ if (!downPeerMemberNames.contains(candidate)) {
+ viableCandidates.add(candidate);
+ }
+ }
+ return viableCandidates;
}
private String getCurrentOwner(YangInstanceIdentifier entityId) {
- DataTreeSnapshot snapshot = getDataStore().getDataTree().takeSnapshot();
- Optional<NormalizedNode<?, ?>> optionalEntityOwner = snapshot.readNode(entityId.node(ENTITY_OWNER_QNAME));
+ Optional<NormalizedNode<?, ?>> optionalEntityOwner = getDataStore().readNode(entityId.node(ENTITY_OWNER_QNAME));
if(optionalEntityOwner.isPresent()){
return optionalEntityOwner.get().getValue().toString();
}
return null;
}
- public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
- final DatastoreContext datastoreContext, final SchemaContext schemaContext, final String localMemberName) {
- return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext, localMemberName));
+ private static interface EntityWalker {
+ void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode);
+ }
+
+ private EntityOwnerSelectionStrategyWrapper createEntityOwnerSelectionStrategyWrapper(EntityOwnerSelectionStrategy entityOwnerSelectionStrategy){
+ return new EntityOwnerSelectionStrategyWrapper(context().system().scheduler(), self(),
+ context().system().dispatcher(), entityOwnerSelectionStrategy);
}
- private static class Creator extends AbstractShardCreator {
- private static final long serialVersionUID = 1L;
+ @VisibleForTesting
+ void addEntityOwnerSelectionStrategy(String entityType, Class<? extends EntityOwnerSelectionStrategy> ownerSelectionStrategyClass){
+ try {
+ EntityOwnerSelectionStrategyWrapper strategy =
+ createEntityOwnerSelectionStrategyWrapper(ownerSelectionStrategyClass.newInstance());
+ ownerSelectionStrategies.put(entityType, strategy);
+ } catch (InstantiationException | IllegalAccessException e) {
+ LOG.error("Exception occurred when adding election strategy", e);
+ }
+ }
- private final String localMemberName;
+ public static Builder newBuilder() {
+ return new Builder();
+ }
- Creator(final ShardIdentifier name, final Map<String, String> peerAddresses,
- final DatastoreContext datastoreContext, final SchemaContext schemaContext,
- final String localMemberName) {
- super(name, peerAddresses, datastoreContext, schemaContext);
+ static class Builder extends Shard.AbstractBuilder<Builder, EntityOwnershipShard> {
+ private String localMemberName;
+
+ protected Builder() {
+ super(EntityOwnershipShard.class);
+ }
+
+ Builder localMemberName(String localMemberName) {
+ checkSealed();
this.localMemberName = localMemberName;
+ return this;
}
@Override
- public Shard create() throws Exception {
- return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext, localMemberName);
+ protected void verify() {
+ super.verify();
+ Preconditions.checkNotNull(localMemberName, "localMemberName should not be null");
}
}
-
- private static interface EntityWalker {
- void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode);
- }
}