Bug 4105: Implement RegisterCandidate in EntityOwnershipShard 93/26793/1
authorTom Pantelis <tpanteli@brocade.com>
Sun, 9 Aug 2015 04:22:02 +0000 (00:22 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 10 Sep 2015 18:50:59 +0000 (14:50 -0400)
Change-Id: Idab615399d81a8451e22bfabd30aed9a98e4b037
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
14 files changed:
opendaylight/md-sal/sal-common-api/pom.xml
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/Entity.java
opendaylight/md-sal/sal-common-api/src/test/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardPropsCreator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/entity-owners.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/SchemaContextHelper.java

index 9b30c72..438067b 100644 (file)
       <groupId>org.opendaylight.yangtools</groupId>
       <artifactId>yang-data-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
index 3b9f8a4..93d0012 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.md.sal.common.api.clustering;
 
 import com.google.common.base.Preconditions;
+import java.io.Serializable;
 import javax.annotation.Nonnull;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 
@@ -31,7 +32,8 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
  * referenced by the YangInstanceIdentifier if the inventory node stored in the data store.
  * </p>
  */
-public final class Entity {
+public final class Entity implements Serializable {
+    private static final long serialVersionUID = 1L;
 
     private final String type;
     private final YangInstanceIdentifier id;
@@ -90,9 +92,7 @@ public final class Entity {
 
     @Override
     public int hashCode() {
-        int result = type != null ? type.hashCode() : 0;
-        result = 31 * result + (id != null ? id.hashCode() : 0);
-        return result;
+        return 31 * type.hashCode() + id.hashCode();
     }
 
     @Override
diff --git a/opendaylight/md-sal/sal-common-api/src/test/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityTest.java b/opendaylight/md-sal/sal-common-api/src/test/java/org/opendaylight/controller/md/sal/common/api/clustering/EntityTest.java
new file mode 100644 (file)
index 0000000..8636de2
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.api.clustering;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import org.apache.commons.lang3.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Unit tests for Entity.
+ *
+ * @author Thomas Pantelis
+ */
+public class EntityTest {
+    static String ENTITY_TYPE1 = "type1";
+    static String ENTITY_TYPE2 = "type2";
+    static final QName QNAME1 = QName.create("test", "2015-08-14", "1");
+    static final QName QNAME2 = QName.create("test", "2015-08-14", "2");
+    static final YangInstanceIdentifier YANGID1 = YangInstanceIdentifier.of(QNAME1);
+    static final YangInstanceIdentifier YANGID2 = YangInstanceIdentifier.of(QNAME2);
+
+    @Test
+    public void testHashCode() {
+        Entity entity1 = new Entity(ENTITY_TYPE1, YANGID1);
+
+        assertEquals("hashCode", entity1.hashCode(), new Entity(ENTITY_TYPE1, YANGID1).hashCode());
+        assertNotEquals("hashCode", entity1.hashCode(), new Entity(ENTITY_TYPE2, YANGID2).hashCode());
+    }
+
+    @Test
+    public void testEquals() {
+        Entity entity1 = new Entity(ENTITY_TYPE1, YANGID1);
+
+        assertEquals("Same", true, entity1.equals(entity1));
+        assertEquals("Same", true, entity1.equals(new Entity(ENTITY_TYPE1, YANGID1)));
+        assertEquals("Different entity type", false, entity1.equals(new Entity(ENTITY_TYPE2, YANGID1)));
+        assertEquals("Different yang ID", false, entity1.equals(new Entity(ENTITY_TYPE1, YANGID2)));
+        assertEquals("Different Object", false, entity1.equals(new Object()));
+        assertEquals("Equals null", false, entity1.equals(null));
+    }
+
+    @Test
+    public void testSerialization() {
+        Entity entity = new Entity(ENTITY_TYPE1, YANGID1);
+
+        Entity clone = SerializationUtils.clone(entity);
+
+        assertEquals("getType", entity.getType(), clone.getType());
+        assertEquals("getId", entity.getId(), clone.getId());
+    }
+}
index 7b34f5d..6aee29d 100644 (file)
@@ -408,6 +408,16 @@ public class Shard extends RaftActor {
         getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf());
     }
 
+    protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
+        try {
+            commitCoordinator.handleBatchedModifications(batched, sender, this);
+        } catch (Exception e) {
+            LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
+                    batched.getTransactionID(), e);
+            sender.tell(new akka.actor.Status.Failure(e), getSelf());
+        }
+    }
+
     private void handleBatchedModifications(BatchedModifications batched) {
         // This message is sent to prepare the modifications transaction directly on the Shard as an
         // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
@@ -424,13 +434,7 @@ public class Shard extends RaftActor {
         if(isLeader()) {
             failIfIsolatedLeader(getSender());
 
-            try {
-                commitCoordinator.handleBatchedModifications(batched, getSender(), this);
-            } catch (Exception e) {
-                LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
-                        batched.getTransactionID(), e);
-                getSender().tell(new akka.actor.Status.Failure(e), getSelf());
-            }
+            handleBatchedModificationsLocal(batched, getSender());
         } else {
             ActorSelection leader = getLeader();
             if(leader != null) {
@@ -446,7 +450,7 @@ public class Shard extends RaftActor {
     }
 
     private boolean failIfIsolatedLeader(ActorRef sender) {
-        if(getRaftState() == RaftState.IsolatedLeader) {
+        if(isIsolatedLeader()) {
             sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
                     "Shard %s was the leader but has lost contact with all of its followers. Either all" +
                     " other follower nodes are down or this node is isolated by a network partition.",
@@ -457,6 +461,10 @@ public class Shard extends RaftActor {
         return false;
     }
 
+    protected boolean isIsolatedLeader() {
+        return getRaftState() == RaftState.IsolatedLeader;
+    }
+
     private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
         if (isLeader()) {
             failIfIsolatedLeader(getSender());
@@ -684,6 +692,10 @@ public class Shard extends RaftActor {
         return commitCoordinator;
     }
 
+    protected DatastoreContext getDatastoreContext() {
+        return datastoreContext;
+    }
+
     protected abstract static class AbstractShardCreator implements Creator<Shard> {
         private static final long serialVersionUID = 1L;
 
index 13ecfff..f51f579 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore.entityownership;
 import akka.actor.ActorRef;
 import akka.dispatch.OnComplete;
 import akka.util.Timeout;
+import com.google.common.annotations.VisibleForTesting;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
@@ -138,6 +139,11 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
     }
 
     protected EntityOwnershipShardPropsCreator newShardPropsCreator() {
-        return new EntityOwnershipShardPropsCreator();
+        return new EntityOwnershipShardPropsCreator(datastore.getActorContext().getCurrentMemberName());
+    }
+
+    @VisibleForTesting
+    ActorRef getLocalEntityOwnershipShard() {
+        return localEntityOwnershipShard;
     }
 }
index 52f85f4..20a6902 100644 (file)
@@ -7,30 +7,65 @@
  */
 package org.opendaylight.controller.cluster.datastore.entityownership;
 
+import akka.actor.ActorSelection;
 import akka.actor.Props;
+import akka.dispatch.OnComplete;
+import akka.pattern.AskTimeoutException;
+import akka.pattern.Patterns;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.Shard;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.EntityOwners;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Future;
 
 /**
  * Special Shard for EntityOwnership.
  *
  * @author Thomas Pantelis
  */
-public class EntityOwnershipShard extends Shard {
+class EntityOwnershipShard extends Shard {
+    static final YangInstanceIdentifier ENTITY_OWNERS_PATH = YangInstanceIdentifier.of(EntityOwners.QNAME);
+    static final  QName ENTITY_QNAME = org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.
+            md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.Entity.QNAME;
+    static final QName CANDIDATE_NAME = QName.create(Candidate.QNAME, "name");
+    static final QName ENTITY_ID = QName.create(ENTITY_QNAME, "id");
+    static final QName ENTITY_TYPE = QName.create(EntityType.QNAME, "type");
+
+    private int transactionIDCounter = 0;
+    private final String localMemberName;
+    private final List<BatchedModifications> retryModifications = new ArrayList<>();
 
     private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
         return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
     }
 
     protected EntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
-            DatastoreContext datastoreContext, SchemaContext schemaContext) {
+            DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName) {
         super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
+        this.localMemberName = localMemberName;
     }
 
     @Override
@@ -50,31 +85,127 @@ public class EntityOwnershipShard extends Shard {
     }
 
     private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
-        // TODO - implement
+        LOG.debug("onRegisterCandidateLocal: {}", registerCandidate);
+
+        // TODO - add the listener locally.
+
+        BatchedModifications modifications = new BatchedModifications(
+                TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
+                DataStoreVersions.CURRENT_VERSION, "");
+        modifications.setDoCommitOnReady(true);
+        modifications.setReady(true);
+        modifications.setTotalMessagesSent(1);
+
+        NormalizedNode<?, ?> entityOwners = createEntityOwnersWithCandidate(registerCandidate.getEntity(), localMemberName);
+        modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners));
+
+        tryCommitModifications(modifications);
+
         getSender().tell(SuccessReply.INSTANCE, getSelf());
     }
 
+    private NormalizedNode<?, ?> createEntityOwnersWithCandidate(Entity entity, String memberName) {
+        MapNode candidateNode = ImmutableNodes.mapNodeBuilder(Candidate.QNAME).addChild(
+                        ImmutableNodes.mapEntry(Candidate.QNAME, CANDIDATE_NAME, memberName)).build();
+
+        MapEntryNode entityNode = ImmutableNodes.mapEntryBuilder(ENTITY_QNAME, ENTITY_ID, entity.getId()).
+                addChild(candidateNode).build();
+
+        MapEntryNode entityTypeNode = ImmutableNodes.mapEntryBuilder(EntityType.QNAME, ENTITY_TYPE, entity.getType()).
+                addChild(ImmutableNodes.mapNodeBuilder(ENTITY_QNAME).addChild(entityNode).build()).build();
+
+        return ImmutableContainerNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(EntityOwners.QNAME)).
+                addChild(ImmutableNodes.mapNodeBuilder(EntityType.QNAME).addChild(entityTypeNode).build()).build();
+    }
+
+    private void tryCommitModifications(final BatchedModifications modifications) {
+        if(isLeader()) {
+            if(isIsolatedLeader()) {
+                LOG.debug("Leader is isolated - adding BatchedModifications {} for retry", modifications.getTransactionID());
+
+                retryModifications.add(modifications);
+            } else {
+                LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
+
+                // Note that it's possible the commit won't get consensus and will timeout and not be applied
+                // to the state. However we don't need to retry it in that case b/c it will be committed to
+                // the journal first and, once a majority of followers come back on line and it is replicated,
+                // it will be applied at that point.
+                handleBatchedModificationsLocal(modifications, self());
+            }
+        } else {
+            final ActorSelection leader = getLeader();
+            if (leader != null) {
+                LOG.debug("Sending BatchedModifications {} to leader {}", modifications.getTransactionID(), leader);
+
+                Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
+                        getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
+                future.onComplete(new OnComplete<Object>() {
+                    @Override
+                    public void onComplete(Throwable failure, Object response) {
+                        if(failure != null) {
+                            if(failure instanceof AskTimeoutException) {
+                                LOG.debug("BatchedModifications {} to leader {} timed out - retrying",
+                                        modifications.getTransactionID(), leader);
+                                tryCommitModifications(modifications);
+                            } else {
+                                LOG.error("BatchedModifications {} to leader {} failed",
+                                        modifications.getTransactionID(), leader, failure);
+                            }
+                        } else {
+                            LOG.debug("BatchedModifications {} to leader {} succeeded",
+                                    modifications.getTransactionID(), leader);
+                        }
+                    }
+                }, getContext().dispatcher());
+            } else {
+                LOG.debug("No leader - adding BatchedModifications {} for retry", modifications.getTransactionID());
+
+                retryModifications.add(modifications);
+            }
+        }
+    }
+
+    @Override
+    protected void onStateChanged() {
+        super.onStateChanged();
+
+        if(!retryModifications.isEmpty() && getLeader() != null && !isIsolatedLeader()) {
+            LOG.debug("# BatchedModifications to retry {}", retryModifications.size());
+
+            List<BatchedModifications> retryModificationsCopy = new ArrayList<>(retryModifications);
+            retryModifications.clear();
+            for(BatchedModifications mods: retryModificationsCopy) {
+                tryCommitModifications(mods);
+            }
+        }
+    }
+
     private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
         // TODO - implement
         getSender().tell(SuccessReply.INSTANCE, getSelf());
     }
 
     public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
-            final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
-        return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext));
+            final DatastoreContext datastoreContext, final SchemaContext schemaContext, final String localMemberName) {
+        return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext, localMemberName));
     }
 
     private static class Creator extends AbstractShardCreator {
         private static final long serialVersionUID = 1L;
 
+        private final String localMemberName;
+
         Creator(final ShardIdentifier name, final Map<String, String> peerAddresses,
-                final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
+                final DatastoreContext datastoreContext, final SchemaContext schemaContext,
+                final String localMemberName) {
             super(name, peerAddresses, datastoreContext, schemaContext);
+            this.localMemberName = localMemberName;
         }
 
         @Override
         public Shard create() throws Exception {
-            return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext);
+            return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext, localMemberName);
         }
     }
 }
index e28db39..a00f514 100644 (file)
@@ -20,10 +20,15 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  * @author Thomas Pantelis
  */
 class EntityOwnershipShardPropsCreator implements ShardPropsCreator {
+    private final String localMemberName;
+
+    EntityOwnershipShardPropsCreator(String localMemberName) {
+        this.localMemberName = localMemberName;
+    }
 
     @Override
     public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
             DatastoreContext datastoreContext, SchemaContext schemaContext) {
-        return EntityOwnershipShard.props(shardId, peerAddresses, datastoreContext, schemaContext);
+        return EntityOwnershipShard.props(shardId, peerAddresses, datastoreContext, schemaContext, localMemberName);
     }
 }
index 4006719..24fe7a3 100644 (file)
@@ -49,13 +49,17 @@ module entity-owners {
             list entity {
                 key id;
                 leaf id {
-                    type string;
+                    type instance-identifier;
                 }
 
+                leaf owner {
+                    type string;
+                }
+                
                 // A list of all the candidates that would like to own the entity
                 list candidate {
-                    key id;
-                    leaf id {
+                    key name;
+                    leaf name {
                         type string;
                     }
                 }
index 3453aac..97c9e92 100644 (file)
@@ -13,7 +13,6 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
-
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
@@ -242,7 +241,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{
         return cohort;
     }
 
-    public static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
+    public static NormalizedNode<?,?> readStore(final TestActorRef<? extends Shard> shard, final YangInstanceIdentifier id)
             throws ExecutionException, InterruptedException {
         return readStore(shard.underlyingActor().getDataStore().getDataTree(), id);
     }
index 7887290..281a190 100644 (file)
@@ -25,11 +25,11 @@ import scala.concurrent.duration.FiniteDuration;
 
 public class ShardTestKit extends JavaTestKit {
 
-    protected ShardTestKit(ActorSystem actorSystem) {
+    public ShardTestKit(ActorSystem actorSystem) {
         super(actorSystem);
     }
 
-    protected void waitForLogMessage(final Class<?> logLevel, ActorRef subject, String logMessage){
+    public void waitForLogMessage(final Class<?> logLevel, ActorRef subject, String logMessage){
         // Wait for a specific log message to show up
         final boolean result =
             new JavaTestKit.EventFilter<Boolean>(logLevel
@@ -46,7 +46,7 @@ public class ShardTestKit extends JavaTestKit {
 
     }
 
-    protected void waitUntilLeader(ActorRef shard) {
+    public void waitUntilLeader(ActorRef shard) {
         FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
         for(int i = 0; i < 20 * 5; i++) {
             Future<Object> future = Patterns.ask(shard, new FindLeader(), new Timeout(duration));
@@ -68,7 +68,7 @@ public class ShardTestKit extends JavaTestKit {
         Assert.fail("Leader not found for shard " + shard.path());
     }
 
-    protected void waitUntilNoLeader(ActorRef shard) {
+    public void waitUntilNoLeader(ActorRef shard) {
         FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
         for(int i = 0; i < 20 * 5; i++) {
             Future<Object> future = Patterns.ask(shard, new FindLeader(), new Timeout(duration));
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/AbstractEntityOwnershipTest.java
new file mode 100644 (file)
index 0000000..1f074ec
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import com.google.common.base.Optional;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.EntityOwners;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * Abstract base class providing utility methods.
+ *
+ * @author Thomas Pantelis
+ */
+public class AbstractEntityOwnershipTest extends AbstractActorTest {
+    protected void verifyEntityCandidate(NormalizedNode<?, ?> node, String entityType,
+            YangInstanceIdentifier entityId, String candidateName) {
+        try {
+            assertNotNull("Missing " + EntityOwners.QNAME.toString(), node);
+            assertTrue(node instanceof ContainerNode);
+
+            ContainerNode entityOwnersNode = (ContainerNode) node;
+
+            MapEntryNode entityTypeEntry = getMapEntryNodeChild(entityOwnersNode, EntityType.QNAME,
+                    EntityOwnershipShard.ENTITY_TYPE, entityType);
+
+            MapEntryNode entityEntry = getMapEntryNodeChild(entityTypeEntry, EntityOwnershipShard.ENTITY_QNAME,
+                    EntityOwnershipShard.ENTITY_ID, entityId);
+
+            getMapEntryNodeChild(entityEntry, Candidate.QNAME, EntityOwnershipShard.CANDIDATE_NAME, candidateName);
+        } catch(AssertionError e) {
+            throw new AssertionError("Verification of enitity candidate failed - returned data was: " + node, e);
+        }
+    }
+
+    protected MapEntryNode getMapEntryNodeChild(DataContainerNode<? extends PathArgument> parent, QName childMap,
+            QName child, Object key) {
+        Optional<DataContainerChild<? extends PathArgument, ?>> childNode =
+                parent.getChild(new NodeIdentifier(childMap));
+        assertEquals("Missing " + childMap.toString(), true, childNode.isPresent());
+
+        MapNode entityTypeMapNode = (MapNode) childNode.get();
+        Optional<MapEntryNode> entityTypeEntry = entityTypeMapNode.getChild(new NodeIdentifierWithPredicates(
+                childMap, child, key));
+        assertEquals("Missing " + childMap.toString() + " entry for " + key, true, entityTypeEntry.isPresent());
+        return entityTypeEntry.get();
+    }
+}
index 860d3dc..c2d997f 100644 (file)
@@ -16,6 +16,9 @@ import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.Collections;
 import java.util.List;
@@ -26,21 +29,24 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.yangtools.yang.common.QName;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
@@ -51,7 +57,7 @@ import scala.concurrent.duration.Duration;
  *
  * @author Thomas Pantelis
  */
-public class DistributedEntityOwnershipServiceTest extends AbstractActorTest {
+public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnershipTest {
     static String ENTITY_TYPE = "test";
     static String ENTITY_TYPE2 = "test2";
     static int ID_COUNTER = 1;
@@ -64,10 +70,31 @@ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest {
     public void setUp() {
         DatastoreContext datastoreContext = DatastoreContext.newBuilder().dataStoreType(dataStoreType).
                 shardInitializationTimeout(10, TimeUnit.SECONDS).build();
-        dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(),
-                new MockConfiguration(Collections.<String, List<String>>emptyMap()), datastoreContext );
 
-        dataStore.onGlobalContextUpdated(TestModel.createTestContext());
+        // FIXME - remove this MockConfiguration and use the production ConfigurationImpl class when the
+        // DistributedEntityOwnershipService is changed to setup the ShardStrategy for the entity-owners module.
+        MockConfiguration configuration = new MockConfiguration(Collections.<String, List<String>>emptyMap()) {
+            @Override
+            public Optional<String> getModuleNameFromNameSpace(String nameSpace) {
+                return Optional.of("entity-owners");
+            }
+
+            @Override
+            public Map<String, ShardStrategy> getModuleNameToShardStrategyMap() {
+                return ImmutableMap.<String, ShardStrategy>builder().put("entity-owners", new ShardStrategy() {
+                    @Override
+                    public String findShard(YangInstanceIdentifier path) {
+                        return DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME;
+                    }
+                }).build();
+            }
+        };
+
+        dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(), configuration, datastoreContext );
+
+        dataStore.onGlobalContextUpdated(SchemaContextHelper.entityOwners());
+
+        ShardStrategyFactory.setConfiguration(configuration);
     }
 
     @After
@@ -102,13 +129,16 @@ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest {
 
         shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
 
-        Entity entity = new Entity(ENTITY_TYPE, YangInstanceIdentifier.of(QNAME));
+        YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME);
+        Entity entity = new Entity(ENTITY_TYPE, entityId);
         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
 
         EntityOwnershipCandidateRegistration reg = service.registerCandidate(entity, candidate);
 
         verifyEntityOwnershipCandidateRegistration(entity, reg);
         verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
+        verifyEntityCandidate(readEntityOwners(service.getLocalEntityOwnershipShard()), ENTITY_TYPE, entityId,
+                dataStore.getActorContext().getCurrentMemberName());
 
         // Register the same entity - should throw exception
 
@@ -124,13 +154,15 @@ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest {
 
         // Register a different entity - should succeed
 
-        Entity entity2 = new Entity(ENTITY_TYPE2, YangInstanceIdentifier.of(QNAME));
+        Entity entity2 = new Entity(ENTITY_TYPE2, entityId);
         shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
 
         EntityOwnershipCandidateRegistration reg2 = service.registerCandidate(entity2, candidate);
 
         verifyEntityOwnershipCandidateRegistration(entity2, reg2);
         verifyRegisterCandidateLocal(shardPropsCreator, entity2, candidate);
+        verifyEntityCandidate(readEntityOwners(service.getLocalEntityOwnershipShard()), ENTITY_TYPE2, entityId,
+                dataStore.getActorContext().getCurrentMemberName());
 
         service.close();
     }
@@ -179,6 +211,22 @@ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest {
     public void testRegisterListener() {
     }
 
+    private NormalizedNode<?, ?> readEntityOwners(ActorRef shard) throws Exception {
+        Stopwatch sw = Stopwatch.createStarted();
+        while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
+            DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
+            Optional<NormalizedNode<?, ?>> optional = readTx.read(EntityOwnershipShard.ENTITY_OWNERS_PATH).
+                    checkedGet(5, TimeUnit.SECONDS);
+            if(optional.isPresent()) {
+                return optional.get();
+            }
+
+            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+        }
+
+        return null;
+    }
+
     private void verifyRegisterCandidateLocal(final TestShardPropsCreator shardPropsCreator, Entity entity,
             EntityOwnershipCandidate candidate) {
         RegisterCandidateLocal regCandidate = shardPropsCreator.waitForShardMessage();
@@ -192,6 +240,10 @@ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest {
     }
 
     static class TestShardPropsCreator extends EntityOwnershipShardPropsCreator {
+        TestShardPropsCreator() {
+            super("member-1");
+        }
+
         private final AtomicReference<CountDownLatch> messageReceived = new AtomicReference<>();
         private final AtomicReference<Object> receivedMessage = new AtomicReference<>();
         private final AtomicReference<Class<?>> messageClass = new AtomicReference<>();
@@ -200,7 +252,7 @@ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest {
         public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
                 DatastoreContext datastoreContext, SchemaContext schemaContext) {
             return Props.create(TestEntityOwnershipShard.class, shardId, peerAddresses, datastoreContext,
-                    schemaContext, messageClass, messageReceived, receivedMessage);
+                    schemaContext, "member-1", messageClass, messageReceived, receivedMessage);
         }
 
         @SuppressWarnings("unchecked")
@@ -224,9 +276,10 @@ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest {
         private final AtomicReference<Class<?>> messageClass;
 
         protected TestEntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
-                DatastoreContext datastoreContext, SchemaContext schemaContext, AtomicReference<Class<?>> messageClass,
-                AtomicReference<CountDownLatch> messageReceived, AtomicReference<Object> receivedMessage) {
-            super(name, peerAddresses, datastoreContext, schemaContext);
+                DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName,
+                AtomicReference<Class<?>> messageClass, AtomicReference<CountDownLatch> messageReceived,
+                AtomicReference<Object> receivedMessage) {
+            super(name, peerAddresses, datastoreContext, schemaContext, localMemberName);
             this.messageClass = messageClass;
             this.messageReceived = messageReceived;
             this.receivedMessage = receivedMessage;
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardTest.java
new file mode 100644 (file)
index 0000000..7a251b9
--- /dev/null
@@ -0,0 +1,324 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.dispatch.Dispatchers;
+import akka.testkit.TestActorRef;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.After;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.controller.cluster.datastore.ShardTestKit;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+/**
+ * Unit tests for EntityOwnershipShard.
+ *
+ * @author Thomas Pantelis
+ */
+public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
+    private static final String ENTITY_TYPE = "test type";
+    private static final YangInstanceIdentifier ENTITY_ID1 =
+            YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
+    private static final YangInstanceIdentifier ENTITY_ID2 =
+            YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
+    private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
+    private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
+    private static final String LOCAL_MEMBER_NAME = "member-1";
+
+    private final ShardIdentifier shardID = ShardIdentifier.builder().memberName(LOCAL_MEMBER_NAME)
+            .shardName("entity-ownership").type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
+
+    private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder();
+    private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+
+    @After
+    public void tearDown() {
+        actorFactory.close();
+    }
+
+    @Test
+    public void testOnRegisterCandidateLocal() throws Exception {
+        ShardTestKit kit = new ShardTestKit(getSystem());
+
+        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
+
+        kit.waitUntilLeader(shard);
+
+        YangInstanceIdentifier entityId = ENTITY_ID1;
+        Entity entity = new Entity(ENTITY_TYPE, entityId);
+        EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+        shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+    }
+
+    @Test
+    public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
+        ShardTestKit kit = new ShardTestKit(getSystem());
+
+        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
+
+        String peerId = actorFactory.generateActorId("follower");
+        TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
+                withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
+
+        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
+                ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
+                withDispatcher(Dispatchers.DefaultDispatcherId()));
+
+        YangInstanceIdentifier entityId = ENTITY_ID1;
+        Entity entity = new Entity(ENTITY_TYPE, entityId);
+        EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+        shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        // Now grant the vote so the shard becomes the leader. This should retry the commit.
+        peer.underlyingActor().grantVote = true;
+
+        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+    }
+
+    @Test
+    public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
+        ShardTestKit kit = new ShardTestKit(getSystem());
+
+        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
+                shardTransactionCommitTimeoutInSeconds(1);
+
+        String peerId = actorFactory.generateActorId("follower");
+        TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
+                withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
+
+        MockFollower follower = peer.underlyingActor();
+        follower.grantVote = true;
+
+        // Drop AppendEntries so consensus isn't reached.
+        follower.dropAppendEntries = true;
+
+        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
+                ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
+                withDispatcher(Dispatchers.DefaultDispatcherId()));
+
+        kit.waitUntilLeader(shard);
+
+        YangInstanceIdentifier entityId = ENTITY_ID1;
+        Entity entity = new Entity(ENTITY_TYPE, entityId);
+        EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+        shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        // Wait enough time for the commit to timeout.
+        Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+        // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
+        // write being applied to the state.
+        follower.dropAppendEntries = false;
+        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+    }
+
+    @Test
+    public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
+        ShardTestKit kit = new ShardTestKit(getSystem());
+
+        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
+                shardIsolatedLeaderCheckIntervalInMillis(50);
+
+        String peerId = actorFactory.generateActorId("follower");
+        TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
+                withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
+
+        MockFollower follower = peer.underlyingActor();
+        follower.grantVote = true;
+
+        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
+                ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
+                withDispatcher(Dispatchers.DefaultDispatcherId()));
+
+        kit.waitUntilLeader(shard);
+
+        // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
+        follower.dropAppendEntries = true;
+        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+
+        YangInstanceIdentifier entityId = ENTITY_ID1;
+        Entity entity = new Entity(ENTITY_TYPE, entityId);
+        EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+        shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        // Resume AppendEntries - the candidate write should now be committed.
+        follower.dropAppendEntries = false;
+        verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+    }
+
+    @Test
+    public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
+        ShardTestKit kit = new ShardTestKit(getSystem());
+
+        dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100);
+
+        String peerId = actorFactory.generateActorId("leader");
+        TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
+                withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
+
+        TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
+                ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
+                withDispatcher(Dispatchers.DefaultDispatcherId()));
+
+        shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), -1L, -1L,
+                DataStoreVersions.CURRENT_VERSION), peer);
+
+        EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+        shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        MockLeader leader = peer.underlyingActor();
+        assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
+                leader.modificationsReceived, 5, TimeUnit.SECONDS));
+        verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+
+        leader.modificationsReceived = new CountDownLatch(2);
+        leader.sendReply = false;
+
+        shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
+
+        shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
+        kit.expectMsgClass(SuccessReply.class);
+
+        assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
+                leader.modificationsReceived, 5, TimeUnit.SECONDS));
+        verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID2, LOCAL_MEMBER_NAME);
+    }
+
+    private void verifyCommittedEntityCandidate(TestActorRef<EntityOwnershipShard> shard, String entityType,
+            YangInstanceIdentifier entityId, String candidateName) throws Exception {
+        verifyEntityCandidate(readEntityOwners(shard), entityType, entityId, candidateName);
+    }
+
+    private void verifyBatchedEntityCandidate(BatchedModifications mods, String entityType,
+            YangInstanceIdentifier entityId, String candidateName) throws Exception {
+        assertEquals("BatchedModifications size", 1, mods.getModifications().size());
+        assertEquals("Modification type", MergeModification.class, mods.getModifications().get(0).getClass());
+        verifyEntityCandidate(((MergeModification)mods.getModifications().get(0)).getData(), entityType,
+                entityId, candidateName);
+    }
+
+    private NormalizedNode<?, ?> readEntityOwners(TestActorRef<EntityOwnershipShard> shard) throws Exception {
+        Stopwatch sw = Stopwatch.createStarted();
+        while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
+            NormalizedNode<?, ?> node = AbstractShardTest.readStore(shard, EntityOwnershipShard.ENTITY_OWNERS_PATH);
+            if(node != null) {
+                return node;
+            }
+
+            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+        }
+
+        return null;
+    }
+
+    private Props newShardProps() {
+        return newShardProps(Collections.<String,String>emptyMap());
+    }
+
+    private Props newShardProps(Map<String,String> peers) {
+        return EntityOwnershipShard.props(shardID, peers, dataStoreContextBuilder.build(), SCHEMA_CONTEXT,
+                LOCAL_MEMBER_NAME);
+    }
+
+    public static class MockFollower extends UntypedActor {
+        volatile boolean grantVote;
+        volatile boolean dropAppendEntries;
+        private final String myId;
+
+        public MockFollower(String myId) {
+            this.myId = myId;
+        }
+
+        @Override
+        public void onReceive(Object message) {
+            if(message instanceof RequestVote) {
+                if(grantVote) {
+                    getSender().tell(new RequestVoteReply(((RequestVote)message).getTerm(), true), getSelf());
+                }
+            } else if(message instanceof AppendEntries) {
+                if(!dropAppendEntries) {
+                    AppendEntries req = (AppendEntries) message;
+                    long lastIndex = req.getLeaderCommit();
+                    if (req.getEntries().size() > 0) {
+                        for(ReplicatedLogEntry entry : req.getEntries()) {
+                            lastIndex = entry.getIndex();
+                        }
+                    }
+
+                    getSender().tell(new AppendEntriesReply(myId, req.getTerm(), true, lastIndex, req.getTerm(),
+                            DataStoreVersions.CURRENT_VERSION), getSelf());
+                }
+            }
+        }
+    }
+
+    public static class MockLeader extends UntypedActor {
+        volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
+        volatile BatchedModifications receivedModifications;
+        volatile boolean sendReply = true;
+
+        @Override
+        public void onReceive(Object message) {
+            if(message instanceof BatchedModifications) {
+                receivedModifications = (BatchedModifications) message;
+                modificationsReceived.countDown();
+                if(sendReply) {
+                    getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
+                } else {
+                    sendReply = true;
+                }
+            }
+        }
+    }
+}
index d09e4b9..85e9893 100644 (file)
@@ -8,12 +8,17 @@
 
 package org.opendaylight.controller.md.cluster.datastore.model;
 
+import com.google.common.io.Resources;
+import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import org.opendaylight.yangtools.yang.model.api.Module;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.parser.api.YangSyntaxErrorException;
 import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
 
 public class SchemaContextHelper {
@@ -42,4 +47,13 @@ public class SchemaContextHelper {
         return parser.resolveSchemaContext(modules);
     }
 
+    public static SchemaContext entityOwners() {
+        YangParserImpl parser = new YangParserImpl();
+        try {
+            File file = new File("src/main/yang/entity-owners.yang");
+            return parser.parseSources(Arrays.asList(Resources.asByteSource(file.toURI().toURL())));
+        } catch (IOException | YangSyntaxErrorException e) {
+            throw new ExceptionInInitializerError(e);
+        }
+    }
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.