import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.SelectOwner;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategy;
+import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyWrapper;
import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.FirstCandidateSelectionStrategy;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
* @author Thomas Pantelis
*/
class EntityOwnershipShard extends Shard {
-
- private static final EntityOwnerSelectionStrategy DEFAULT_ENTITY_OWNER_SELECTION_STRATEGY
- = FirstCandidateSelectionStrategy.INSTANCE;
-
private final String localMemberName;
private final EntityOwnershipShardCommitCoordinator commitCoordinator;
private final EntityOwnershipListenerSupport listenerSupport;
private final Set<String> downPeerMemberNames = new HashSet<>();
private final Map<String, String> peerIdToMemberNames = new HashMap<>();
- private final Map<String, EntityOwnerSelectionStrategy> ownerSelectionStrategies = new HashMap<>();
+ private final Map<String, EntityOwnerSelectionStrategyWrapper> ownerSelectionStrategies = new HashMap<>();
+ private final EntityOwnerSelectionStrategyWrapper defaultEntityOwnerSelectionStrategy;
private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
this.localMemberName = builder.localMemberName;
this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(builder.localMemberName, LOG);
this.listenerSupport = new EntityOwnershipListenerSupport(getContext(), persistenceId());
+ this.defaultEntityOwnerSelectionStrategy =
+ createEntityOwnerSelectionStrategyWrapper(FirstCandidateSelectionStrategy.INSTANCE);
for(String peerId: getRaftActorContext().getPeerIds()) {
ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(peerId).build();
@Override
public void onReceiveCommand(final Object message) throws Exception {
if(message instanceof RegisterCandidateLocal) {
- onRegisterCandidateLocal((RegisterCandidateLocal)message);
+ onRegisterCandidateLocal((RegisterCandidateLocal) message);
} else if(message instanceof UnregisterCandidateLocal) {
onUnregisterCandidateLocal((UnregisterCandidateLocal)message);
} else if(message instanceof CandidateAdded){
onPeerDown((PeerDown) message);
} else if(message instanceof PeerUp) {
onPeerUp((PeerUp) message);
- } if(message instanceof RegisterListenerLocal) {
+ } else if(message instanceof RegisterListenerLocal) {
onRegisterListenerLocal((RegisterListenerLocal)message);
- } if(message instanceof UnregisterListenerLocal) {
- onUnregisterListenerLocal((UnregisterListenerLocal)message);
+ } else if(message instanceof UnregisterListenerLocal) {
+ onUnregisterListenerLocal((UnregisterListenerLocal) message);
+ } else if(message instanceof SelectOwner) {
+ onSelectOwner((SelectOwner) message);
} else if(!commitCoordinator.handleMessage(message, this)) {
super.onReceiveCommand(message);
}
}
+ private void onSelectOwner(SelectOwner selectOwner) {
+ String currentOwner = getCurrentOwner(selectOwner.getEntityPath());
+ if(Strings.isNullOrEmpty(currentOwner)) {
+ writeNewOwner(selectOwner.getEntityPath(), newOwner(selectOwner.getAllCandidates(),
+ selectOwner.getOwnerSelectionStrategy()));
+ }
+ }
+
private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
LOG.debug("{}: onRegisterCandidateLocal: {}", persistenceId(), registerCandidate);
if(isLeader()) {
String currentOwner = getCurrentOwner(message.getEntityPath());
if(message.getRemovedCandidate().equals(currentOwner)){
- writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates(),
- getEntityOwnerElectionStrategy(message.getEntityPath())));
+ writeNewOwner(message.getEntityPath(),
+ newOwner(message.getRemainingCandidates(), getEntityOwnerElectionStrategyWrapper(message.getEntityPath())));
}
} else {
// We're not the leader. If the removed candidate is our local member then check if we actually
}
}
- private EntityOwnerSelectionStrategy getEntityOwnerElectionStrategy(YangInstanceIdentifier entityPath) {
+ private EntityOwnerSelectionStrategyWrapper getEntityOwnerElectionStrategyWrapper(YangInstanceIdentifier entityPath) {
String entityType = EntityOwnersModel.entityTypeFromEntityPath(entityPath);
- EntityOwnerSelectionStrategy entityOwnerSelectionStrategy = ownerSelectionStrategies.get(entityType);
+ EntityOwnerSelectionStrategyWrapper entityOwnerSelectionStrategy = ownerSelectionStrategies.get(entityType);
if(entityOwnerSelectionStrategy == null){
- entityOwnerSelectionStrategy = DEFAULT_ENTITY_OWNER_SELECTION_STRATEGY;
+ entityOwnerSelectionStrategy = defaultEntityOwnerSelectionStrategy;
ownerSelectionStrategies.put(entityType, entityOwnerSelectionStrategy);
}
String currentOwner = getCurrentOwner(message.getEntityPath());
if(Strings.isNullOrEmpty(currentOwner)){
- EntityOwnerSelectionStrategy entityOwnerSelectionStrategy
- = getEntityOwnerElectionStrategy(message.getEntityPath());
- if(entityOwnerSelectionStrategy.selectionDelayInMillis() == 0L) {
- writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates(),
- entityOwnerSelectionStrategy));
+ EntityOwnerSelectionStrategyWrapper strategy = getEntityOwnerElectionStrategyWrapper(message.getEntityPath());
+ if(strategy.selectionDelayInMillis() == 0L) {
+ writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates(), strategy));
} else {
- throw new UnsupportedOperationException("Delayed selection not implemented yet");
+ strategy.scheduleOwnerSelection(message.getEntityPath(), message.getAllCandidates());
}
}
}
node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()).
node(ENTITY_OWNER_NODE_ID).build();
- Object newOwner = newOwner(getCandidateNames(entityNode), getEntityOwnerElectionStrategy(entityPath));
+ Object newOwner = newOwner(getCandidateNames(entityNode), getEntityOwnerElectionStrategyWrapper(entityPath));
LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
searchForEntities(new EntityWalker() {
@Override
public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
- if(hasCandidate(entityNode, owner)) {
+ if (hasCandidate(entityNode, owner)) {
YangInstanceIdentifier entityId =
- (YangInstanceIdentifier)entityNode.getIdentifier().getKeyValues().get(ENTITY_ID_QNAME);
+ (YangInstanceIdentifier) entityNode.getIdentifier().getKeyValues().get(ENTITY_ID_QNAME);
YangInstanceIdentifier candidatePath = candidatePath(
entityTypeNode.getIdentifier().getKeyValues().get(ENTITY_TYPE_QNAME).toString(),
entityId, owner);
void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode);
}
+ private EntityOwnerSelectionStrategyWrapper createEntityOwnerSelectionStrategyWrapper(EntityOwnerSelectionStrategy entityOwnerSelectionStrategy){
+ return new EntityOwnerSelectionStrategyWrapper(context().system().scheduler(), self(),
+ context().system().dispatcher(), entityOwnerSelectionStrategy);
+ }
+
+ @VisibleForTesting
+ void addEntityOwnerSelectionStrategy(String entityType, Class<? extends EntityOwnerSelectionStrategy> ownerSelectionStrategyClass){
+ try {
+ EntityOwnerSelectionStrategyWrapper strategy =
+ createEntityOwnerSelectionStrategyWrapper(ownerSelectionStrategyClass.newInstance());
+ ownerSelectionStrategies.put(entityType, strategy);
+ } catch (InstantiationException | IllegalAccessException e) {
+ LOG.error("Exception occurred when adding election strategy", e);
+ }
+ }
+
public static Builder newBuilder() {
return new Builder();
}