import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.pattern.Patterns;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.SelectOwner;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategy;
+import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyWrapper;
import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.FirstCandidateSelectionStrategy;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
* @author Thomas Pantelis
*/
class EntityOwnershipShard extends Shard {
-
- private static final EntityOwnerSelectionStrategy DEFAULT_ENTITY_OWNER_SELECTION_STRATEGY
- = FirstCandidateSelectionStrategy.INSTANCE;
-
private final String localMemberName;
private final EntityOwnershipShardCommitCoordinator commitCoordinator;
private final EntityOwnershipListenerSupport listenerSupport;
private final Set<String> downPeerMemberNames = new HashSet<>();
private final Map<String, String> peerIdToMemberNames = new HashMap<>();
- private final Map<String, EntityOwnerSelectionStrategy> ownerSelectionStrategies = new HashMap<>();
+ private final Map<String, EntityOwnerSelectionStrategyWrapper> ownerSelectionStrategies = new HashMap<>();
+ private final EntityOwnerSelectionStrategyWrapper defaultEntityOwnerSelectionStrategy;
private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
this.localMemberName = builder.localMemberName;
this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(builder.localMemberName, LOG);
this.listenerSupport = new EntityOwnershipListenerSupport(getContext(), persistenceId());
+ this.defaultEntityOwnerSelectionStrategy =
+ createEntityOwnerSelectionStrategyWrapper(FirstCandidateSelectionStrategy.INSTANCE);
for(String peerId: getRaftActorContext().getPeerIds()) {
ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(peerId).build();
@Override
public void onReceiveCommand(final Object message) throws Exception {
if(message instanceof RegisterCandidateLocal) {
- onRegisterCandidateLocal((RegisterCandidateLocal)message);
+ onRegisterCandidateLocal((RegisterCandidateLocal) message);
} else if(message instanceof UnregisterCandidateLocal) {
onUnregisterCandidateLocal((UnregisterCandidateLocal)message);
} else if(message instanceof CandidateAdded){
onPeerDown((PeerDown) message);
} else if(message instanceof PeerUp) {
onPeerUp((PeerUp) message);
- } if(message instanceof RegisterListenerLocal) {
+ } else if(message instanceof RegisterListenerLocal) {
onRegisterListenerLocal((RegisterListenerLocal)message);
- } if(message instanceof UnregisterListenerLocal) {
- onUnregisterListenerLocal((UnregisterListenerLocal)message);
+ } else if(message instanceof UnregisterListenerLocal) {
+ onUnregisterListenerLocal((UnregisterListenerLocal) message);
+ } else if(message instanceof SelectOwner) {
+ onSelectOwner((SelectOwner) message);
} else if(!commitCoordinator.handleMessage(message, this)) {
super.onReceiveCommand(message);
}
}
+ private void onSelectOwner(SelectOwner selectOwner) {
+ String currentOwner = getCurrentOwner(selectOwner.getEntityPath());
+ if(Strings.isNullOrEmpty(currentOwner)) {
+ writeNewOwner(selectOwner.getEntityPath(), newOwner(selectOwner.getAllCandidates(),
+ selectOwner.getOwnerSelectionStrategy()));
+ }
+ }
+
private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
LOG.debug("{}: onRegisterCandidateLocal: {}", persistenceId(), registerCandidate);
if(isLeader()) {
String currentOwner = getCurrentOwner(message.getEntityPath());
if(message.getRemovedCandidate().equals(currentOwner)){
- writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates(),
- getEntityOwnerElectionStrategy(message.getEntityPath())));
+ writeNewOwner(message.getEntityPath(),
+ newOwner(message.getRemainingCandidates(), getEntityOwnerElectionStrategyWrapper(message.getEntityPath())));
}
} else {
// We're not the leader. If the removed candidate is our local member then check if we actually
}
}
- private EntityOwnerSelectionStrategy getEntityOwnerElectionStrategy(YangInstanceIdentifier entityPath) {
+ private EntityOwnerSelectionStrategyWrapper getEntityOwnerElectionStrategyWrapper(YangInstanceIdentifier entityPath) {
String entityType = EntityOwnersModel.entityTypeFromEntityPath(entityPath);
- EntityOwnerSelectionStrategy entityOwnerSelectionStrategy = ownerSelectionStrategies.get(entityType);
+ EntityOwnerSelectionStrategyWrapper entityOwnerSelectionStrategy = ownerSelectionStrategies.get(entityType);
if(entityOwnerSelectionStrategy == null){
- entityOwnerSelectionStrategy = DEFAULT_ENTITY_OWNER_SELECTION_STRATEGY;
+ entityOwnerSelectionStrategy = defaultEntityOwnerSelectionStrategy;
ownerSelectionStrategies.put(entityType, entityOwnerSelectionStrategy);
}
String currentOwner = getCurrentOwner(message.getEntityPath());
if(Strings.isNullOrEmpty(currentOwner)){
- EntityOwnerSelectionStrategy entityOwnerSelectionStrategy
- = getEntityOwnerElectionStrategy(message.getEntityPath());
- if(entityOwnerSelectionStrategy.selectionDelayInMillis() == 0L) {
- writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates(),
- entityOwnerSelectionStrategy));
+ EntityOwnerSelectionStrategyWrapper strategy = getEntityOwnerElectionStrategyWrapper(message.getEntityPath());
+ if(strategy.selectionDelayInMillis() == 0L) {
+ writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates(), strategy));
} else {
- throw new UnsupportedOperationException("Delayed selection not implemented yet");
+ strategy.scheduleOwnerSelection(message.getEntityPath(), message.getAllCandidates());
}
}
}
node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()).
node(ENTITY_OWNER_NODE_ID).build();
- Object newOwner = newOwner(getCandidateNames(entityNode), getEntityOwnerElectionStrategy(entityPath));
+ Object newOwner = newOwner(getCandidateNames(entityNode), getEntityOwnerElectionStrategyWrapper(entityPath));
LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner);
searchForEntities(new EntityWalker() {
@Override
public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) {
- if(hasCandidate(entityNode, owner)) {
+ if (hasCandidate(entityNode, owner)) {
YangInstanceIdentifier entityId =
- (YangInstanceIdentifier)entityNode.getIdentifier().getKeyValues().get(ENTITY_ID_QNAME);
+ (YangInstanceIdentifier) entityNode.getIdentifier().getKeyValues().get(ENTITY_ID_QNAME);
YangInstanceIdentifier candidatePath = candidatePath(
entityTypeNode.getIdentifier().getKeyValues().get(ENTITY_TYPE_QNAME).toString(),
entityId, owner);
void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode);
}
+ private EntityOwnerSelectionStrategyWrapper createEntityOwnerSelectionStrategyWrapper(EntityOwnerSelectionStrategy entityOwnerSelectionStrategy){
+ return new EntityOwnerSelectionStrategyWrapper(context().system().scheduler(), self(),
+ context().system().dispatcher(), entityOwnerSelectionStrategy);
+ }
+
+ @VisibleForTesting
+ void addEntityOwnerSelectionStrategy(String entityType, Class<? extends EntityOwnerSelectionStrategy> ownerSelectionStrategyClass){
+ try {
+ EntityOwnerSelectionStrategyWrapper strategy =
+ createEntityOwnerSelectionStrategyWrapper(ownerSelectionStrategyClass.newInstance());
+ ownerSelectionStrategies.put(entityType, strategy);
+ } catch (InstantiationException | IllegalAccessException e) {
+ LOG.error("Exception occurred when adding election strategy", e);
+ }
+ }
+
public static Builder newBuilder() {
return new Builder();
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.entityownership.messages;
+
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategy;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Message sent when a new owner needs to be selected
+ */
+public class SelectOwner {
+ private final YangInstanceIdentifier entityPath;
+ private final Collection<String> allCandidates;
+ private final EntityOwnerSelectionStrategy ownerSelectionStrategy;
+
+ public SelectOwner(YangInstanceIdentifier entityPath, Collection<String> allCandidates,
+ EntityOwnerSelectionStrategy ownerSelectionStrategy) {
+
+ this.entityPath = Preconditions.checkNotNull(entityPath, "entityPath should not be null");
+ this.allCandidates = Preconditions.checkNotNull(allCandidates, "allCandidates should not be null");
+ this.ownerSelectionStrategy = Preconditions.checkNotNull(ownerSelectionStrategy,
+ "ownerSelectionStrategy should not be null");
+ }
+
+ public YangInstanceIdentifier getEntityPath() {
+ return entityPath;
+ }
+
+ public Collection<String> getAllCandidates() {
+ return allCandidates;
+ }
+
+ public EntityOwnerSelectionStrategy getOwnerSelectionStrategy() {
+ return ownerSelectionStrategy;
+ }
+
+ @Override
+ public String toString() {
+ return "SelectOwner{" +
+ "entityPath=" + entityPath +
+ ", allCandidates=" + allCandidates +
+ ", ownerSelectionStrategy=" + ownerSelectionStrategy +
+ '}';
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy;
+
+import akka.actor.ActorRef;
+import akka.actor.Cancellable;
+import akka.actor.Scheduler;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.SelectOwner;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import scala.concurrent.ExecutionContextExecutor;
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * The EntityOwnerSelectionStrategyWrapper is an EntityOwnerSelectionStrategy decorator that adds the ability to
+ * schedule an owner selection job.
+ */
+public class EntityOwnerSelectionStrategyWrapper implements EntityOwnerSelectionStrategy {
+ private final Scheduler scheduler;
+ private final ActorRef shard;
+ private final ExecutionContextExecutor dispatcher;
+ private final EntityOwnerSelectionStrategy strategy;
+
+ private Cancellable lastScheduledTask;
+
+ public EntityOwnerSelectionStrategyWrapper(Scheduler scheduler,
+ ActorRef shard,
+ ExecutionContextExecutor dispatcher,
+ EntityOwnerSelectionStrategy strategy) {
+ this.scheduler = Preconditions.checkNotNull(scheduler);
+ this.shard = Preconditions.checkNotNull(shard);
+ this.dispatcher = Preconditions.checkNotNull(dispatcher);
+ this.strategy = Preconditions.checkNotNull(strategy);
+ }
+
+ /**
+ * Schedule a new owner selection job. Cancelling any outstanding job if it has not been cancelled.
+ *
+ * @param entityPath
+ * @param allCandidates
+ */
+ public void scheduleOwnerSelection(YangInstanceIdentifier entityPath, Collection<String> allCandidates){
+ if(lastScheduledTask != null && !lastScheduledTask.isCancelled()){
+ lastScheduledTask.cancel();
+ }
+ lastScheduledTask = scheduler.scheduleOnce(
+ FiniteDuration.apply(strategy.selectionDelayInMillis(), TimeUnit.MILLISECONDS)
+ , shard, new SelectOwner(entityPath, allCandidates, strategy)
+ , dispatcher, shard);
+ }
+
+ @Override
+ public long selectionDelayInMillis(){
+ return strategy.selectionDelayInMillis();
+ }
+
+ @Override
+ public String newOwner(Collection<String> viableCandidates){
+ return strategy.newOwner(viableCandidates);
+ }
+}
\ No newline at end of file
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.LastCandidateSelectionStrategy;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
}
}
+
+ @Test
+ public void testDelayedEntityOwnerSelection() throws Exception {
+ ShardTestKit kit = new ShardTestKit(getSystem());
+ TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
+ shard.underlyingActor().addEntityOwnerSelectionStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class);
+ kit.waitUntilLeader(shard);
+
+ Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
+ ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
+
+ // Add a remote candidate
+
+ String remoteMemberName1 = "remoteMember1";
+ writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
+
+
+ // Register local
+
+ shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ // Verify the local candidate becomes owner
+
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ }
+
public static class MockLeader extends UntypedActor {
volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
List<Modification> receivedModifications = new ArrayList<>();