From e9cceae4b6689c7c3880b441c13d35cbe4b13ce0 Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Tue, 6 Oct 2015 15:28:27 -0700 Subject: [PATCH] Add Delayed Owner selection base on strategy Change-Id: I04fc216ffc7e5c3fd35b34b6d03a5030c359d77f Signed-off-by: Moiz Raja --- .../entityownership/EntityOwnersModel.java | 2 +- .../entityownership/EntityOwnershipShard.java | 72 +++++++++++++------ .../entityownership/messages/SelectOwner.java | 53 ++++++++++++++ .../EntityOwnerSelectionStrategyWrapper.java | 69 ++++++++++++++++++ .../EntityOwnershipShardTest.java | 30 ++++++++ .../LastCandidateSelectionStrategy.java | 26 +++++++ 6 files changed, 228 insertions(+), 24 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/SelectOwner.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/selectionstrategy/EntityOwnerSelectionStrategyWrapper.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/selectionstrategy/LastCandidateSelectionStrategy.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnersModel.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnersModel.java index 0be765000f..aa8d654b65 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnersModel.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnersModel.java @@ -114,7 +114,7 @@ final class EntityOwnersModel { static String entityTypeFromEntityPath(YangInstanceIdentifier entityPath){ YangInstanceIdentifier parent = entityPath; while(!parent.isEmpty()) { - if (ENTITY_TYPE_QNAME.equals(parent.getLastPathArgument().getNodeType())) { + if (EntityType.QNAME.equals(parent.getLastPathArgument().getNodeType())) { YangInstanceIdentifier.NodeIdentifierWithPredicates entityTypeLastPathArgument = (YangInstanceIdentifier.NodeIdentifierWithPredicates) parent.getLastPathArgument(); return (String) entityTypeLastPathArgument.getKeyValues().get(ENTITY_TYPE_QNAME); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java index e14ae7f537..334e093d7f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java @@ -26,6 +26,7 @@ import static org.opendaylight.controller.cluster.datastore.entityownership.Enti import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.pattern.Patterns; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -42,9 +43,11 @@ import org.opendaylight.controller.cluster.datastore.entityownership.messages.Ca import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateRemoved; import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal; import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.SelectOwner; import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal; import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal; import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategy; +import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyWrapper; import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.FirstCandidateSelectionStrategy; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; @@ -70,16 +73,13 @@ import scala.concurrent.Future; * @author Thomas Pantelis */ class EntityOwnershipShard extends Shard { - - private static final EntityOwnerSelectionStrategy DEFAULT_ENTITY_OWNER_SELECTION_STRATEGY - = FirstCandidateSelectionStrategy.INSTANCE; - private final String localMemberName; private final EntityOwnershipShardCommitCoordinator commitCoordinator; private final EntityOwnershipListenerSupport listenerSupport; private final Set downPeerMemberNames = new HashSet<>(); private final Map peerIdToMemberNames = new HashMap<>(); - private final Map ownerSelectionStrategies = new HashMap<>(); + private final Map ownerSelectionStrategies = new HashMap<>(); + private final EntityOwnerSelectionStrategyWrapper defaultEntityOwnerSelectionStrategy; private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) { return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build(); @@ -90,6 +90,8 @@ class EntityOwnershipShard extends Shard { this.localMemberName = builder.localMemberName; this.commitCoordinator = new EntityOwnershipShardCommitCoordinator(builder.localMemberName, LOG); this.listenerSupport = new EntityOwnershipListenerSupport(getContext(), persistenceId()); + this.defaultEntityOwnerSelectionStrategy = + createEntityOwnerSelectionStrategyWrapper(FirstCandidateSelectionStrategy.INSTANCE); for(String peerId: getRaftActorContext().getPeerIds()) { ShardIdentifier shardId = ShardIdentifier.builder().fromShardIdString(peerId).build(); @@ -113,7 +115,7 @@ class EntityOwnershipShard extends Shard { @Override public void onReceiveCommand(final Object message) throws Exception { if(message instanceof RegisterCandidateLocal) { - onRegisterCandidateLocal((RegisterCandidateLocal)message); + onRegisterCandidateLocal((RegisterCandidateLocal) message); } else if(message instanceof UnregisterCandidateLocal) { onUnregisterCandidateLocal((UnregisterCandidateLocal)message); } else if(message instanceof CandidateAdded){ @@ -124,15 +126,25 @@ class EntityOwnershipShard extends Shard { onPeerDown((PeerDown) message); } else if(message instanceof PeerUp) { onPeerUp((PeerUp) message); - } if(message instanceof RegisterListenerLocal) { + } else if(message instanceof RegisterListenerLocal) { onRegisterListenerLocal((RegisterListenerLocal)message); - } if(message instanceof UnregisterListenerLocal) { - onUnregisterListenerLocal((UnregisterListenerLocal)message); + } else if(message instanceof UnregisterListenerLocal) { + onUnregisterListenerLocal((UnregisterListenerLocal) message); + } else if(message instanceof SelectOwner) { + onSelectOwner((SelectOwner) message); } else if(!commitCoordinator.handleMessage(message, this)) { super.onReceiveCommand(message); } } + private void onSelectOwner(SelectOwner selectOwner) { + String currentOwner = getCurrentOwner(selectOwner.getEntityPath()); + if(Strings.isNullOrEmpty(currentOwner)) { + writeNewOwner(selectOwner.getEntityPath(), newOwner(selectOwner.getAllCandidates(), + selectOwner.getOwnerSelectionStrategy())); + } + } + private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) { LOG.debug("{}: onRegisterCandidateLocal: {}", persistenceId(), registerCandidate); @@ -250,8 +262,8 @@ class EntityOwnershipShard extends Shard { if(isLeader()) { String currentOwner = getCurrentOwner(message.getEntityPath()); if(message.getRemovedCandidate().equals(currentOwner)){ - writeNewOwner(message.getEntityPath(), newOwner(message.getRemainingCandidates(), - getEntityOwnerElectionStrategy(message.getEntityPath()))); + writeNewOwner(message.getEntityPath(), + newOwner(message.getRemainingCandidates(), getEntityOwnerElectionStrategyWrapper(message.getEntityPath()))); } } else { // We're not the leader. If the removed candidate is our local member then check if we actually @@ -271,12 +283,12 @@ class EntityOwnershipShard extends Shard { } } - private EntityOwnerSelectionStrategy getEntityOwnerElectionStrategy(YangInstanceIdentifier entityPath) { + private EntityOwnerSelectionStrategyWrapper getEntityOwnerElectionStrategyWrapper(YangInstanceIdentifier entityPath) { String entityType = EntityOwnersModel.entityTypeFromEntityPath(entityPath); - EntityOwnerSelectionStrategy entityOwnerSelectionStrategy = ownerSelectionStrategies.get(entityType); + EntityOwnerSelectionStrategyWrapper entityOwnerSelectionStrategy = ownerSelectionStrategies.get(entityType); if(entityOwnerSelectionStrategy == null){ - entityOwnerSelectionStrategy = DEFAULT_ENTITY_OWNER_SELECTION_STRATEGY; + entityOwnerSelectionStrategy = defaultEntityOwnerSelectionStrategy; ownerSelectionStrategies.put(entityType, entityOwnerSelectionStrategy); } @@ -296,13 +308,11 @@ class EntityOwnershipShard extends Shard { String currentOwner = getCurrentOwner(message.getEntityPath()); if(Strings.isNullOrEmpty(currentOwner)){ - EntityOwnerSelectionStrategy entityOwnerSelectionStrategy - = getEntityOwnerElectionStrategy(message.getEntityPath()); - if(entityOwnerSelectionStrategy.selectionDelayInMillis() == 0L) { - writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates(), - entityOwnerSelectionStrategy)); + EntityOwnerSelectionStrategyWrapper strategy = getEntityOwnerElectionStrategyWrapper(message.getEntityPath()); + if(strategy.selectionDelayInMillis() == 0L) { + writeNewOwner(message.getEntityPath(), newOwner(message.getAllCandidates(), strategy)); } else { - throw new UnsupportedOperationException("Delayed selection not implemented yet"); + strategy.scheduleOwnerSelection(message.getEntityPath(), message.getAllCandidates()); } } } @@ -334,7 +344,7 @@ class EntityOwnershipShard extends Shard { node(entityTypeNode.getIdentifier()).node(ENTITY_NODE_ID).node(entityNode.getIdentifier()). node(ENTITY_OWNER_NODE_ID).build(); - Object newOwner = newOwner(getCandidateNames(entityNode), getEntityOwnerElectionStrategy(entityPath)); + Object newOwner = newOwner(getCandidateNames(entityNode), getEntityOwnerElectionStrategyWrapper(entityPath)); LOG.debug("{}: Found entity {}, writing new owner {}", persistenceId(), entityPath, newOwner); @@ -351,9 +361,9 @@ class EntityOwnershipShard extends Shard { searchForEntities(new EntityWalker() { @Override public void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode) { - if(hasCandidate(entityNode, owner)) { + if (hasCandidate(entityNode, owner)) { YangInstanceIdentifier entityId = - (YangInstanceIdentifier)entityNode.getIdentifier().getKeyValues().get(ENTITY_ID_QNAME); + (YangInstanceIdentifier) entityNode.getIdentifier().getKeyValues().get(ENTITY_ID_QNAME); YangInstanceIdentifier candidatePath = candidatePath( entityTypeNode.getIdentifier().getKeyValues().get(ENTITY_TYPE_QNAME).toString(), entityId, owner); @@ -460,6 +470,22 @@ class EntityOwnershipShard extends Shard { void onEntity(MapEntryNode entityTypeNode, MapEntryNode entityNode); } + private EntityOwnerSelectionStrategyWrapper createEntityOwnerSelectionStrategyWrapper(EntityOwnerSelectionStrategy entityOwnerSelectionStrategy){ + return new EntityOwnerSelectionStrategyWrapper(context().system().scheduler(), self(), + context().system().dispatcher(), entityOwnerSelectionStrategy); + } + + @VisibleForTesting + void addEntityOwnerSelectionStrategy(String entityType, Class ownerSelectionStrategyClass){ + try { + EntityOwnerSelectionStrategyWrapper strategy = + createEntityOwnerSelectionStrategyWrapper(ownerSelectionStrategyClass.newInstance()); + ownerSelectionStrategies.put(entityType, strategy); + } catch (InstantiationException | IllegalAccessException e) { + LOG.error("Exception occurred when adding election strategy", e); + } + } + public static Builder newBuilder() { return new Builder(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/SelectOwner.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/SelectOwner.java new file mode 100644 index 0000000000..250abf6148 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/SelectOwner.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.entityownership.messages; + +import com.google.common.base.Preconditions; +import java.util.Collection; +import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategy; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + +/** + * Message sent when a new owner needs to be selected + */ +public class SelectOwner { + private final YangInstanceIdentifier entityPath; + private final Collection allCandidates; + private final EntityOwnerSelectionStrategy ownerSelectionStrategy; + + public SelectOwner(YangInstanceIdentifier entityPath, Collection allCandidates, + EntityOwnerSelectionStrategy ownerSelectionStrategy) { + + this.entityPath = Preconditions.checkNotNull(entityPath, "entityPath should not be null"); + this.allCandidates = Preconditions.checkNotNull(allCandidates, "allCandidates should not be null"); + this.ownerSelectionStrategy = Preconditions.checkNotNull(ownerSelectionStrategy, + "ownerSelectionStrategy should not be null"); + } + + public YangInstanceIdentifier getEntityPath() { + return entityPath; + } + + public Collection getAllCandidates() { + return allCandidates; + } + + public EntityOwnerSelectionStrategy getOwnerSelectionStrategy() { + return ownerSelectionStrategy; + } + + @Override + public String toString() { + return "SelectOwner{" + + "entityPath=" + entityPath + + ", allCandidates=" + allCandidates + + ", ownerSelectionStrategy=" + ownerSelectionStrategy + + '}'; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/selectionstrategy/EntityOwnerSelectionStrategyWrapper.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/selectionstrategy/EntityOwnerSelectionStrategyWrapper.java new file mode 100644 index 0000000000..da3a4e0bf7 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/selectionstrategy/EntityOwnerSelectionStrategyWrapper.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy; + +import akka.actor.ActorRef; +import akka.actor.Cancellable; +import akka.actor.Scheduler; +import com.google.common.base.Preconditions; +import java.util.Collection; +import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.SelectOwner; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import scala.concurrent.ExecutionContextExecutor; +import scala.concurrent.duration.FiniteDuration; + +/** + * The EntityOwnerSelectionStrategyWrapper is an EntityOwnerSelectionStrategy decorator that adds the ability to + * schedule an owner selection job. + */ +public class EntityOwnerSelectionStrategyWrapper implements EntityOwnerSelectionStrategy { + private final Scheduler scheduler; + private final ActorRef shard; + private final ExecutionContextExecutor dispatcher; + private final EntityOwnerSelectionStrategy strategy; + + private Cancellable lastScheduledTask; + + public EntityOwnerSelectionStrategyWrapper(Scheduler scheduler, + ActorRef shard, + ExecutionContextExecutor dispatcher, + EntityOwnerSelectionStrategy strategy) { + this.scheduler = Preconditions.checkNotNull(scheduler); + this.shard = Preconditions.checkNotNull(shard); + this.dispatcher = Preconditions.checkNotNull(dispatcher); + this.strategy = Preconditions.checkNotNull(strategy); + } + + /** + * Schedule a new owner selection job. Cancelling any outstanding job if it has not been cancelled. + * + * @param entityPath + * @param allCandidates + */ + public void scheduleOwnerSelection(YangInstanceIdentifier entityPath, Collection allCandidates){ + if(lastScheduledTask != null && !lastScheduledTask.isCancelled()){ + lastScheduledTask.cancel(); + } + lastScheduledTask = scheduler.scheduleOnce( + FiniteDuration.apply(strategy.selectionDelayInMillis(), TimeUnit.MILLISECONDS) + , shard, new SelectOwner(entityPath, allCandidates, strategy) + , dispatcher, shard); + } + + @Override + public long selectionDelayInMillis(){ + return strategy.selectionDelayInMillis(); + } + + @Override + public String newOwner(Collection viableCandidates){ + return strategy.newOwner(viableCandidates); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java index 1e21329c38..cb8dade4cb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java @@ -48,6 +48,7 @@ import org.opendaylight.controller.cluster.datastore.entityownership.messages.Re import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal; import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal; import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal; +import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.LastCandidateSelectionStrategy; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; @@ -872,6 +873,35 @@ public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { } } + + @Test + public void testDelayedEntityOwnerSelection() throws Exception { + ShardTestKit kit = new ShardTestKit(getSystem()); + TestActorRef shard = actorFactory.createTestActor(newShardProps()); + shard.underlyingActor().addEntityOwnerSelectionStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class); + kit.waitUntilLeader(shard); + + Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1); + ShardDataTree shardDataTree = shard.underlyingActor().getDataStore(); + + // Add a remote candidate + + String remoteMemberName1 = "remoteMember1"; + writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree); + + + // Register local + + shard.tell(new RegisterCandidateLocal(entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + // Verify the local candidate becomes owner + + verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1); + verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); + verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); + } + public static class MockLeader extends UntypedActor { volatile CountDownLatch modificationsReceived = new CountDownLatch(1); List receivedModifications = new ArrayList<>(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/selectionstrategy/LastCandidateSelectionStrategy.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/selectionstrategy/LastCandidateSelectionStrategy.java new file mode 100644 index 0000000000..db5d56b9dc --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/selectionstrategy/LastCandidateSelectionStrategy.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +public class LastCandidateSelectionStrategy implements EntityOwnerSelectionStrategy { + @Override + public long selectionDelayInMillis() { + return 500; + } + + @Override + public String newOwner(Collection viableCandidates) { + List candidates = new ArrayList<>(viableCandidates); + return candidates.get(candidates.size()-1); + } +} -- 2.36.6