From: Tom Pantelis Date: Sun, 9 Aug 2015 04:22:02 +0000 (-0400) Subject: Bug 4105: Implement RegisterCandidate in EntityOwnershipShard X-Git-Tag: release/beryllium~311 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=a507f19b518f36065c74f4f88c9327ede28ff640 Bug 4105: Implement RegisterCandidate in EntityOwnershipShard Change-Id: Idab615399d81a8451e22bfabd30aed9a98e4b037 Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-common-api/pom.xml b/opendaylight/md-sal/sal-common-api/pom.xml index 9b30c72d4c..438067b4d3 100644 --- a/opendaylight/md-sal/sal-common-api/pom.xml +++ b/opendaylight/md-sal/sal-common-api/pom.xml @@ -35,6 +35,16 @@ org.opendaylight.yangtools yang-data-api + + org.apache.commons + commons-lang3 + test + + + junit + junit + test + diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/Entity.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/Entity.java index 3b9f8a425e..93d001216a 100644 --- a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/Entity.java +++ b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/Entity.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.md.sal.common.api.clustering; import com.google.common.base.Preconditions; +import java.io.Serializable; import javax.annotation.Nonnull; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -31,7 +32,8 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; * referenced by the YangInstanceIdentifier if the inventory node stored in the data store. *

*/ -public final class Entity { +public final class Entity implements Serializable { + private static final long serialVersionUID = 1L; private final String type; private final YangInstanceIdentifier id; @@ -90,9 +92,7 @@ public final class Entity { @Override public int hashCode() { - int result = type != null ? type.hashCode() : 0; - result = 31 * result + (id != null ? id.hashCode() : 0); - return result; + return 31 * type.hashCode() + id.hashCode(); } @Override diff --git a/opendaylight/md-sal/sal-common-api/src/test/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityTest.java b/opendaylight/md-sal/sal-common-api/src/test/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityTest.java new file mode 100644 index 0000000000..8636de2be0 --- /dev/null +++ b/opendaylight/md-sal/sal-common-api/src/test/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityTest.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.common.api.clustering; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import org.apache.commons.lang3.SerializationUtils; +import org.junit.Test; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + +/** + * Unit tests for Entity. + * + * @author Thomas Pantelis + */ +public class EntityTest { + static String ENTITY_TYPE1 = "type1"; + static String ENTITY_TYPE2 = "type2"; + static final QName QNAME1 = QName.create("test", "2015-08-14", "1"); + static final QName QNAME2 = QName.create("test", "2015-08-14", "2"); + static final YangInstanceIdentifier YANGID1 = YangInstanceIdentifier.of(QNAME1); + static final YangInstanceIdentifier YANGID2 = YangInstanceIdentifier.of(QNAME2); + + @Test + public void testHashCode() { + Entity entity1 = new Entity(ENTITY_TYPE1, YANGID1); + + assertEquals("hashCode", entity1.hashCode(), new Entity(ENTITY_TYPE1, YANGID1).hashCode()); + assertNotEquals("hashCode", entity1.hashCode(), new Entity(ENTITY_TYPE2, YANGID2).hashCode()); + } + + @Test + public void testEquals() { + Entity entity1 = new Entity(ENTITY_TYPE1, YANGID1); + + assertEquals("Same", true, entity1.equals(entity1)); + assertEquals("Same", true, entity1.equals(new Entity(ENTITY_TYPE1, YANGID1))); + assertEquals("Different entity type", false, entity1.equals(new Entity(ENTITY_TYPE2, YANGID1))); + assertEquals("Different yang ID", false, entity1.equals(new Entity(ENTITY_TYPE1, YANGID2))); + assertEquals("Different Object", false, entity1.equals(new Object())); + assertEquals("Equals null", false, entity1.equals(null)); + } + + @Test + public void testSerialization() { + Entity entity = new Entity(ENTITY_TYPE1, YANGID1); + + Entity clone = SerializationUtils.clone(entity); + + assertEquals("getType", entity.getType(), clone.getType()); + assertEquals("getId", entity.getId(), clone.getId()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 7b34f5df60..6aee29dd40 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -408,6 +408,16 @@ public class Shard extends RaftActor { getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf()); } + protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) { + try { + commitCoordinator.handleBatchedModifications(batched, sender, this); + } catch (Exception e) { + LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(), + batched.getTransactionID(), e); + sender.tell(new akka.actor.Status.Failure(e), getSelf()); + } + } + private void handleBatchedModifications(BatchedModifications batched) { // This message is sent to prepare the modifications transaction directly on the Shard as an // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last @@ -424,13 +434,7 @@ public class Shard extends RaftActor { if(isLeader()) { failIfIsolatedLeader(getSender()); - try { - commitCoordinator.handleBatchedModifications(batched, getSender(), this); - } catch (Exception e) { - LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(), - batched.getTransactionID(), e); - getSender().tell(new akka.actor.Status.Failure(e), getSelf()); - } + handleBatchedModificationsLocal(batched, getSender()); } else { ActorSelection leader = getLeader(); if(leader != null) { @@ -446,7 +450,7 @@ public class Shard extends RaftActor { } private boolean failIfIsolatedLeader(ActorRef sender) { - if(getRaftState() == RaftState.IsolatedLeader) { + if(isIsolatedLeader()) { sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format( "Shard %s was the leader but has lost contact with all of its followers. Either all" + " other follower nodes are down or this node is isolated by a network partition.", @@ -457,6 +461,10 @@ public class Shard extends RaftActor { return false; } + protected boolean isIsolatedLeader() { + return getRaftState() == RaftState.IsolatedLeader; + } + private void handleReadyLocalTransaction(final ReadyLocalTransaction message) { if (isLeader()) { failIfIsolatedLeader(getSender()); @@ -684,6 +692,10 @@ public class Shard extends RaftActor { return commitCoordinator; } + protected DatastoreContext getDatastoreContext() { + return datastoreContext; + } + protected abstract static class AbstractShardCreator implements Creator { private static final long serialVersionUID = 1L; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java index 13ecfffefe..f51f579443 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java @@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.entityownership; import akka.actor.ActorRef; import akka.dispatch.OnComplete; import akka.util.Timeout; +import com.google.common.annotations.VisibleForTesting; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -138,6 +139,11 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService } protected EntityOwnershipShardPropsCreator newShardPropsCreator() { - return new EntityOwnershipShardPropsCreator(); + return new EntityOwnershipShardPropsCreator(datastore.getActorContext().getCurrentMemberName()); + } + + @VisibleForTesting + ActorRef getLocalEntityOwnershipShard() { + return localEntityOwnershipShard; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java index 52f85f481e..20a69022db 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java @@ -7,30 +7,65 @@ */ package org.opendaylight.controller.cluster.datastore.entityownership; +import akka.actor.ActorSelection; import akka.actor.Props; +import akka.dispatch.OnComplete; +import akka.pattern.AskTimeoutException; +import akka.pattern.Patterns; +import java.util.ArrayList; +import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal; import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; import org.opendaylight.controller.cluster.datastore.messages.SuccessReply; +import org.opendaylight.controller.cluster.datastore.modification.MergeModification; +import org.opendaylight.controller.md.sal.common.api.clustering.Entity; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.EntityOwners; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import scala.concurrent.Future; /** * Special Shard for EntityOwnership. * * @author Thomas Pantelis */ -public class EntityOwnershipShard extends Shard { +class EntityOwnershipShard extends Shard { + static final YangInstanceIdentifier ENTITY_OWNERS_PATH = YangInstanceIdentifier.of(EntityOwners.QNAME); + static final QName ENTITY_QNAME = org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller. + md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.Entity.QNAME; + static final QName CANDIDATE_NAME = QName.create(Candidate.QNAME, "name"); + static final QName ENTITY_ID = QName.create(ENTITY_QNAME, "id"); + static final QName ENTITY_TYPE = QName.create(EntityType.QNAME, "type"); + + private int transactionIDCounter = 0; + private final String localMemberName; + private final List retryModifications = new ArrayList<>(); private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) { return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build(); } protected EntityOwnershipShard(ShardIdentifier name, Map peerAddresses, - DatastoreContext datastoreContext, SchemaContext schemaContext) { + DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName) { super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext); + this.localMemberName = localMemberName; } @Override @@ -50,31 +85,127 @@ public class EntityOwnershipShard extends Shard { } private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) { - // TODO - implement + LOG.debug("onRegisterCandidateLocal: {}", registerCandidate); + + // TODO - add the listener locally. + + BatchedModifications modifications = new BatchedModifications( + TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(), + DataStoreVersions.CURRENT_VERSION, ""); + modifications.setDoCommitOnReady(true); + modifications.setReady(true); + modifications.setTotalMessagesSent(1); + + NormalizedNode entityOwners = createEntityOwnersWithCandidate(registerCandidate.getEntity(), localMemberName); + modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners)); + + tryCommitModifications(modifications); + getSender().tell(SuccessReply.INSTANCE, getSelf()); } + private NormalizedNode createEntityOwnersWithCandidate(Entity entity, String memberName) { + MapNode candidateNode = ImmutableNodes.mapNodeBuilder(Candidate.QNAME).addChild( + ImmutableNodes.mapEntry(Candidate.QNAME, CANDIDATE_NAME, memberName)).build(); + + MapEntryNode entityNode = ImmutableNodes.mapEntryBuilder(ENTITY_QNAME, ENTITY_ID, entity.getId()). + addChild(candidateNode).build(); + + MapEntryNode entityTypeNode = ImmutableNodes.mapEntryBuilder(EntityType.QNAME, ENTITY_TYPE, entity.getType()). + addChild(ImmutableNodes.mapNodeBuilder(ENTITY_QNAME).addChild(entityNode).build()).build(); + + return ImmutableContainerNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(EntityOwners.QNAME)). + addChild(ImmutableNodes.mapNodeBuilder(EntityType.QNAME).addChild(entityTypeNode).build()).build(); + } + + private void tryCommitModifications(final BatchedModifications modifications) { + if(isLeader()) { + if(isIsolatedLeader()) { + LOG.debug("Leader is isolated - adding BatchedModifications {} for retry", modifications.getTransactionID()); + + retryModifications.add(modifications); + } else { + LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID()); + + // Note that it's possible the commit won't get consensus and will timeout and not be applied + // to the state. However we don't need to retry it in that case b/c it will be committed to + // the journal first and, once a majority of followers come back on line and it is replicated, + // it will be applied at that point. + handleBatchedModificationsLocal(modifications, self()); + } + } else { + final ActorSelection leader = getLeader(); + if (leader != null) { + LOG.debug("Sending BatchedModifications {} to leader {}", modifications.getTransactionID(), leader); + + Future future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis( + getDatastoreContext().getShardTransactionCommitTimeoutInSeconds())); + future.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + if(failure != null) { + if(failure instanceof AskTimeoutException) { + LOG.debug("BatchedModifications {} to leader {} timed out - retrying", + modifications.getTransactionID(), leader); + tryCommitModifications(modifications); + } else { + LOG.error("BatchedModifications {} to leader {} failed", + modifications.getTransactionID(), leader, failure); + } + } else { + LOG.debug("BatchedModifications {} to leader {} succeeded", + modifications.getTransactionID(), leader); + } + } + }, getContext().dispatcher()); + } else { + LOG.debug("No leader - adding BatchedModifications {} for retry", modifications.getTransactionID()); + + retryModifications.add(modifications); + } + } + } + + @Override + protected void onStateChanged() { + super.onStateChanged(); + + if(!retryModifications.isEmpty() && getLeader() != null && !isIsolatedLeader()) { + LOG.debug("# BatchedModifications to retry {}", retryModifications.size()); + + List retryModificationsCopy = new ArrayList<>(retryModifications); + retryModifications.clear(); + for(BatchedModifications mods: retryModificationsCopy) { + tryCommitModifications(mods); + } + } + } + private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) { // TODO - implement getSender().tell(SuccessReply.INSTANCE, getSelf()); } public static Props props(final ShardIdentifier name, final Map peerAddresses, - final DatastoreContext datastoreContext, final SchemaContext schemaContext) { - return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext)); + final DatastoreContext datastoreContext, final SchemaContext schemaContext, final String localMemberName) { + return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext, localMemberName)); } private static class Creator extends AbstractShardCreator { private static final long serialVersionUID = 1L; + private final String localMemberName; + Creator(final ShardIdentifier name, final Map peerAddresses, - final DatastoreContext datastoreContext, final SchemaContext schemaContext) { + final DatastoreContext datastoreContext, final SchemaContext schemaContext, + final String localMemberName) { super(name, peerAddresses, datastoreContext, schemaContext); + this.localMemberName = localMemberName; } @Override public Shard create() throws Exception { - return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext); + return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext, localMemberName); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardPropsCreator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardPropsCreator.java index e28db39456..a00f514d79 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardPropsCreator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardPropsCreator.java @@ -20,10 +20,15 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; * @author Thomas Pantelis */ class EntityOwnershipShardPropsCreator implements ShardPropsCreator { + private final String localMemberName; + + EntityOwnershipShardPropsCreator(String localMemberName) { + this.localMemberName = localMemberName; + } @Override public Props newProps(ShardIdentifier shardId, Map peerAddresses, DatastoreContext datastoreContext, SchemaContext schemaContext) { - return EntityOwnershipShard.props(shardId, peerAddresses, datastoreContext, schemaContext); + return EntityOwnershipShard.props(shardId, peerAddresses, datastoreContext, schemaContext, localMemberName); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/entity-owners.yang b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/entity-owners.yang index 40067198ab..24fe7a36f8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/entity-owners.yang +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/yang/entity-owners.yang @@ -49,13 +49,17 @@ module entity-owners { list entity { key id; leaf id { - type string; + type instance-identifier; } + leaf owner { + type string; + } + // A list of all the candidates that would like to own the entity list candidate { - key id; - leaf id { + key name; + leaf name { type string; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index 3453aac750..97c9e923a9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -13,7 +13,6 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; - import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; @@ -242,7 +241,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{ return cohort; } - public static NormalizedNode readStore(final TestActorRef shard, final YangInstanceIdentifier id) + public static NormalizedNode readStore(final TestActorRef shard, final YangInstanceIdentifier id) throws ExecutionException, InterruptedException { return readStore(shard.underlyingActor().getDataStore().getDataTree(), id); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java index 7887290707..281a190e94 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java @@ -25,11 +25,11 @@ import scala.concurrent.duration.FiniteDuration; public class ShardTestKit extends JavaTestKit { - protected ShardTestKit(ActorSystem actorSystem) { + public ShardTestKit(ActorSystem actorSystem) { super(actorSystem); } - protected void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){ + public void waitForLogMessage(final Class logLevel, ActorRef subject, String logMessage){ // Wait for a specific log message to show up final boolean result = new JavaTestKit.EventFilter(logLevel @@ -46,7 +46,7 @@ public class ShardTestKit extends JavaTestKit { } - protected void waitUntilLeader(ActorRef shard) { + public void waitUntilLeader(ActorRef shard) { FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS); for(int i = 0; i < 20 * 5; i++) { Future future = Patterns.ask(shard, new FindLeader(), new Timeout(duration)); @@ -68,7 +68,7 @@ public class ShardTestKit extends JavaTestKit { Assert.fail("Leader not found for shard " + shard.path()); } - protected void waitUntilNoLeader(ActorRef shard) { + public void waitUntilNoLeader(ActorRef shard) { FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS); for(int i = 0; i < 20 * 5; i++) { Future future = Patterns.ask(shard, new FindLeader(), new Timeout(duration)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java new file mode 100644 index 0000000000..1f074ecde5 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.entityownership; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import com.google.common.base.Optional; +import org.opendaylight.controller.cluster.datastore.AbstractActorTest; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.EntityOwners; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType; +import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; +import org.opendaylight.yangtools.yang.data.api.schema.DataContainerNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.MapNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +/** + * Abstract base class providing utility methods. + * + * @author Thomas Pantelis + */ +public class AbstractEntityOwnershipTest extends AbstractActorTest { + protected void verifyEntityCandidate(NormalizedNode node, String entityType, + YangInstanceIdentifier entityId, String candidateName) { + try { + assertNotNull("Missing " + EntityOwners.QNAME.toString(), node); + assertTrue(node instanceof ContainerNode); + + ContainerNode entityOwnersNode = (ContainerNode) node; + + MapEntryNode entityTypeEntry = getMapEntryNodeChild(entityOwnersNode, EntityType.QNAME, + EntityOwnershipShard.ENTITY_TYPE, entityType); + + MapEntryNode entityEntry = getMapEntryNodeChild(entityTypeEntry, EntityOwnershipShard.ENTITY_QNAME, + EntityOwnershipShard.ENTITY_ID, entityId); + + getMapEntryNodeChild(entityEntry, Candidate.QNAME, EntityOwnershipShard.CANDIDATE_NAME, candidateName); + } catch(AssertionError e) { + throw new AssertionError("Verification of enitity candidate failed - returned data was: " + node, e); + } + } + + protected MapEntryNode getMapEntryNodeChild(DataContainerNode parent, QName childMap, + QName child, Object key) { + Optional> childNode = + parent.getChild(new NodeIdentifier(childMap)); + assertEquals("Missing " + childMap.toString(), true, childNode.isPresent()); + + MapNode entityTypeMapNode = (MapNode) childNode.get(); + Optional entityTypeEntry = entityTypeMapNode.getChild(new NodeIdentifierWithPredicates( + childMap, child, key)); + assertEquals("Missing " + childMap.toString() + " entry for " + key, true, entityTypeEntry.isPresent()); + return entityTypeEntry.get(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java index 860d3dc809..c2d997f702 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java @@ -16,6 +16,9 @@ import static org.mockito.Mockito.mock; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.actor.Props; +import com.google.common.base.Optional; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Uninterruptibles; import java.util.Collections; import java.util.List; @@ -26,21 +29,24 @@ import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal; import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy; +import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; -import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException; import org.opendaylight.controller.md.sal.common.api.clustering.Entity; import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate; import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration; +import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; @@ -51,7 +57,7 @@ import scala.concurrent.duration.Duration; * * @author Thomas Pantelis */ -public class DistributedEntityOwnershipServiceTest extends AbstractActorTest { +public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnershipTest { static String ENTITY_TYPE = "test"; static String ENTITY_TYPE2 = "test2"; static int ID_COUNTER = 1; @@ -64,10 +70,31 @@ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest { public void setUp() { DatastoreContext datastoreContext = DatastoreContext.newBuilder().dataStoreType(dataStoreType). shardInitializationTimeout(10, TimeUnit.SECONDS).build(); - dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(), - new MockConfiguration(Collections.>emptyMap()), datastoreContext ); - dataStore.onGlobalContextUpdated(TestModel.createTestContext()); + // FIXME - remove this MockConfiguration and use the production ConfigurationImpl class when the + // DistributedEntityOwnershipService is changed to setup the ShardStrategy for the entity-owners module. + MockConfiguration configuration = new MockConfiguration(Collections.>emptyMap()) { + @Override + public Optional getModuleNameFromNameSpace(String nameSpace) { + return Optional.of("entity-owners"); + } + + @Override + public Map getModuleNameToShardStrategyMap() { + return ImmutableMap.builder().put("entity-owners", new ShardStrategy() { + @Override + public String findShard(YangInstanceIdentifier path) { + return DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME; + } + }).build(); + } + }; + + dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(), configuration, datastoreContext ); + + dataStore.onGlobalContextUpdated(SchemaContextHelper.entityOwners()); + + ShardStrategyFactory.setConfiguration(configuration); } @After @@ -102,13 +129,16 @@ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest { shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class); - Entity entity = new Entity(ENTITY_TYPE, YangInstanceIdentifier.of(QNAME)); + YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME); + Entity entity = new Entity(ENTITY_TYPE, entityId); EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class); EntityOwnershipCandidateRegistration reg = service.registerCandidate(entity, candidate); verifyEntityOwnershipCandidateRegistration(entity, reg); verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate); + verifyEntityCandidate(readEntityOwners(service.getLocalEntityOwnershipShard()), ENTITY_TYPE, entityId, + dataStore.getActorContext().getCurrentMemberName()); // Register the same entity - should throw exception @@ -124,13 +154,15 @@ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest { // Register a different entity - should succeed - Entity entity2 = new Entity(ENTITY_TYPE2, YangInstanceIdentifier.of(QNAME)); + Entity entity2 = new Entity(ENTITY_TYPE2, entityId); shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class); EntityOwnershipCandidateRegistration reg2 = service.registerCandidate(entity2, candidate); verifyEntityOwnershipCandidateRegistration(entity2, reg2); verifyRegisterCandidateLocal(shardPropsCreator, entity2, candidate); + verifyEntityCandidate(readEntityOwners(service.getLocalEntityOwnershipShard()), ENTITY_TYPE2, entityId, + dataStore.getActorContext().getCurrentMemberName()); service.close(); } @@ -179,6 +211,22 @@ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest { public void testRegisterListener() { } + private NormalizedNode readEntityOwners(ActorRef shard) throws Exception { + Stopwatch sw = Stopwatch.createStarted(); + while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) { + DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction(); + Optional> optional = readTx.read(EntityOwnershipShard.ENTITY_OWNERS_PATH). + checkedGet(5, TimeUnit.SECONDS); + if(optional.isPresent()) { + return optional.get(); + } + + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + + return null; + } + private void verifyRegisterCandidateLocal(final TestShardPropsCreator shardPropsCreator, Entity entity, EntityOwnershipCandidate candidate) { RegisterCandidateLocal regCandidate = shardPropsCreator.waitForShardMessage(); @@ -192,6 +240,10 @@ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest { } static class TestShardPropsCreator extends EntityOwnershipShardPropsCreator { + TestShardPropsCreator() { + super("member-1"); + } + private final AtomicReference messageReceived = new AtomicReference<>(); private final AtomicReference receivedMessage = new AtomicReference<>(); private final AtomicReference> messageClass = new AtomicReference<>(); @@ -200,7 +252,7 @@ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest { public Props newProps(ShardIdentifier shardId, Map peerAddresses, DatastoreContext datastoreContext, SchemaContext schemaContext) { return Props.create(TestEntityOwnershipShard.class, shardId, peerAddresses, datastoreContext, - schemaContext, messageClass, messageReceived, receivedMessage); + schemaContext, "member-1", messageClass, messageReceived, receivedMessage); } @SuppressWarnings("unchecked") @@ -224,9 +276,10 @@ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest { private final AtomicReference> messageClass; protected TestEntityOwnershipShard(ShardIdentifier name, Map peerAddresses, - DatastoreContext datastoreContext, SchemaContext schemaContext, AtomicReference> messageClass, - AtomicReference messageReceived, AtomicReference receivedMessage) { - super(name, peerAddresses, datastoreContext, schemaContext); + DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName, + AtomicReference> messageClass, AtomicReference messageReceived, + AtomicReference receivedMessage) { + super(name, peerAddresses, datastoreContext, schemaContext, localMemberName); this.messageClass = messageClass; this.messageReceived = messageReceived; this.receivedMessage = receivedMessage; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java new file mode 100644 index 0000000000..7a251b9784 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java @@ -0,0 +1,324 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.entityownership; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.dispatch.Dispatchers; +import akka.testkit.TestActorRef; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.Uninterruptibles; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.After; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.AbstractShardTest; +import org.opendaylight.controller.cluster.datastore.DataStoreVersions; +import org.opendaylight.controller.cluster.datastore.DatastoreContext; +import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder; +import org.opendaylight.controller.cluster.datastore.ShardTestKit; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; +import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; +import org.opendaylight.controller.cluster.datastore.messages.SuccessReply; +import org.opendaylight.controller.cluster.datastore.modification.MergeModification; +import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; +import org.opendaylight.controller.cluster.raft.TestActorFactory; +import org.opendaylight.controller.cluster.raft.messages.AppendEntries; +import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; +import org.opendaylight.controller.cluster.raft.messages.RequestVote; +import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply; +import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper; +import org.opendaylight.controller.md.sal.common.api.clustering.Entity; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; + +/** + * Unit tests for EntityOwnershipShard. + * + * @author Thomas Pantelis + */ +public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest { + private static final String ENTITY_TYPE = "test type"; + private static final YangInstanceIdentifier ENTITY_ID1 = + YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1")); + private static final YangInstanceIdentifier ENTITY_ID2 = + YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2")); + private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners(); + private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger(); + private static final String LOCAL_MEMBER_NAME = "member-1"; + + private final ShardIdentifier shardID = ShardIdentifier.builder().memberName(LOCAL_MEMBER_NAME) + .shardName("entity-ownership").type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build(); + + private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder(); + private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); + + @After + public void tearDown() { + actorFactory.close(); + } + + @Test + public void testOnRegisterCandidateLocal() throws Exception { + ShardTestKit kit = new ShardTestKit(getSystem()); + + TestActorRef shard = actorFactory.createTestActor(newShardProps()); + + kit.waitUntilLeader(shard); + + YangInstanceIdentifier entityId = ENTITY_ID1; + Entity entity = new Entity(ENTITY_TYPE, entityId); + EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class); + + shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + } + + @Test + public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception { + ShardTestKit kit = new ShardTestKit(getSystem()); + + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2); + + String peerId = actorFactory.generateActorId("follower"); + TestActorRef peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId). + withDispatcher(Dispatchers.DefaultDispatcherId()), peerId); + + TestActorRef shard = actorFactory.createTestActor(newShardProps( + ImmutableMap.builder().put(peerId, peer.path().toString()).build()). + withDispatcher(Dispatchers.DefaultDispatcherId())); + + YangInstanceIdentifier entityId = ENTITY_ID1; + Entity entity = new Entity(ENTITY_TYPE, entityId); + EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class); + + shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + // Now grant the vote so the shard becomes the leader. This should retry the commit. + peer.underlyingActor().grantVote = true; + + verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + } + + @Test + public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception { + ShardTestKit kit = new ShardTestKit(getSystem()); + + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2). + shardTransactionCommitTimeoutInSeconds(1); + + String peerId = actorFactory.generateActorId("follower"); + TestActorRef peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId). + withDispatcher(Dispatchers.DefaultDispatcherId()), peerId); + + MockFollower follower = peer.underlyingActor(); + follower.grantVote = true; + + // Drop AppendEntries so consensus isn't reached. + follower.dropAppendEntries = true; + + TestActorRef shard = actorFactory.createTestActor(newShardProps( + ImmutableMap.builder().put(peerId, peer.path().toString()).build()). + withDispatcher(Dispatchers.DefaultDispatcherId())); + + kit.waitUntilLeader(shard); + + YangInstanceIdentifier entityId = ENTITY_ID1; + Entity entity = new Entity(ENTITY_TYPE, entityId); + EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class); + + shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + // Wait enough time for the commit to timeout. + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + + // Resume AppendEntries - the follower should ack the commit which should then result in the candidate + // write being applied to the state. + follower.dropAppendEntries = false; + verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + } + + @Test + public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception { + ShardTestKit kit = new ShardTestKit(getSystem()); + + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2). + shardIsolatedLeaderCheckIntervalInMillis(50); + + String peerId = actorFactory.generateActorId("follower"); + TestActorRef peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId). + withDispatcher(Dispatchers.DefaultDispatcherId()), peerId); + + MockFollower follower = peer.underlyingActor(); + follower.grantVote = true; + + TestActorRef shard = actorFactory.createTestActor(newShardProps( + ImmutableMap.builder().put(peerId, peer.path().toString()).build()). + withDispatcher(Dispatchers.DefaultDispatcherId())); + + kit.waitUntilLeader(shard); + + // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader. + follower.dropAppendEntries = true; + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + + YangInstanceIdentifier entityId = ENTITY_ID1; + Entity entity = new Entity(ENTITY_TYPE, entityId); + EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class); + + shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + // Resume AppendEntries - the candidate write should now be committed. + follower.dropAppendEntries = false; + verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME); + } + + @Test + public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception { + ShardTestKit kit = new ShardTestKit(getSystem()); + + dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100); + + String peerId = actorFactory.generateActorId("leader"); + TestActorRef peer = actorFactory.createTestActor(Props.create(MockLeader.class). + withDispatcher(Dispatchers.DefaultDispatcherId()), peerId); + + TestActorRef shard = actorFactory.createTestActor(newShardProps( + ImmutableMap.builder().put(peerId, peer.path().toString()).build()). + withDispatcher(Dispatchers.DefaultDispatcherId())); + + shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.emptyList(), -1L, -1L, + DataStoreVersions.CURRENT_VERSION), peer); + + EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class); + + shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + MockLeader leader = peer.underlyingActor(); + assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly( + leader.modificationsReceived, 5, TimeUnit.SECONDS)); + verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME); + + leader.modificationsReceived = new CountDownLatch(2); + leader.sendReply = false; + + shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender()); + + shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef()); + kit.expectMsgClass(SuccessReply.class); + + assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly( + leader.modificationsReceived, 5, TimeUnit.SECONDS)); + verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID2, LOCAL_MEMBER_NAME); + } + + private void verifyCommittedEntityCandidate(TestActorRef shard, String entityType, + YangInstanceIdentifier entityId, String candidateName) throws Exception { + verifyEntityCandidate(readEntityOwners(shard), entityType, entityId, candidateName); + } + + private void verifyBatchedEntityCandidate(BatchedModifications mods, String entityType, + YangInstanceIdentifier entityId, String candidateName) throws Exception { + assertEquals("BatchedModifications size", 1, mods.getModifications().size()); + assertEquals("Modification type", MergeModification.class, mods.getModifications().get(0).getClass()); + verifyEntityCandidate(((MergeModification)mods.getModifications().get(0)).getData(), entityType, + entityId, candidateName); + } + + private NormalizedNode readEntityOwners(TestActorRef shard) throws Exception { + Stopwatch sw = Stopwatch.createStarted(); + while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) { + NormalizedNode node = AbstractShardTest.readStore(shard, EntityOwnershipShard.ENTITY_OWNERS_PATH); + if(node != null) { + return node; + } + + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } + + return null; + } + + private Props newShardProps() { + return newShardProps(Collections.emptyMap()); + } + + private Props newShardProps(Map peers) { + return EntityOwnershipShard.props(shardID, peers, dataStoreContextBuilder.build(), SCHEMA_CONTEXT, + LOCAL_MEMBER_NAME); + } + + public static class MockFollower extends UntypedActor { + volatile boolean grantVote; + volatile boolean dropAppendEntries; + private final String myId; + + public MockFollower(String myId) { + this.myId = myId; + } + + @Override + public void onReceive(Object message) { + if(message instanceof RequestVote) { + if(grantVote) { + getSender().tell(new RequestVoteReply(((RequestVote)message).getTerm(), true), getSelf()); + } + } else if(message instanceof AppendEntries) { + if(!dropAppendEntries) { + AppendEntries req = (AppendEntries) message; + long lastIndex = req.getLeaderCommit(); + if (req.getEntries().size() > 0) { + for(ReplicatedLogEntry entry : req.getEntries()) { + lastIndex = entry.getIndex(); + } + } + + getSender().tell(new AppendEntriesReply(myId, req.getTerm(), true, lastIndex, req.getTerm(), + DataStoreVersions.CURRENT_VERSION), getSelf()); + } + } + } + } + + public static class MockLeader extends UntypedActor { + volatile CountDownLatch modificationsReceived = new CountDownLatch(1); + volatile BatchedModifications receivedModifications; + volatile boolean sendReply = true; + + @Override + public void onReceive(Object message) { + if(message instanceof BatchedModifications) { + receivedModifications = (BatchedModifications) message; + modificationsReceived.countDown(); + if(sendReply) { + getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf()); + } else { + sendReply = true; + } + } + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/SchemaContextHelper.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/SchemaContextHelper.java index d09e4b9690..85e9893815 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/SchemaContextHelper.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/SchemaContextHelper.java @@ -8,12 +8,17 @@ package org.opendaylight.controller.md.cluster.datastore.model; +import com.google.common.io.Resources; +import java.io.File; +import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Set; import org.opendaylight.yangtools.yang.model.api.Module; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.parser.api.YangSyntaxErrorException; import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl; public class SchemaContextHelper { @@ -42,4 +47,13 @@ public class SchemaContextHelper { return parser.resolveSchemaContext(modules); } + public static SchemaContext entityOwners() { + YangParserImpl parser = new YangParserImpl(); + try { + File file = new File("src/main/yang/entity-owners.yang"); + return parser.parseSources(Arrays.asList(Resources.asByteSource(file.toURI().toURL()))); + } catch (IOException | YangSyntaxErrorException e) { + throw new ExceptionInInitializerError(e); + } + } }