Bug 4105: Implement DistributedEntityOwnershipService#registerCandidate 91/26791/1
authorTom Pantelis <tpanteli@brocade.com>
Sat, 8 Aug 2015 08:20:11 +0000 (04:20 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 10 Sep 2015 18:50:05 +0000 (14:50 -0400)
Added a RegisterCandidateLocal message and implemented registerCandidate
to send the message to the local EntityOwnershipShard.

Change-Id: If941401d00912ce34f74e54188af0430a5ec6fcc
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/CandidateAlreadyRegisteredException.java
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/clustering/AbstractEntityOwnershipCandidateRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/clustering/AbstractEntityOwnershipListenerRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipCandidateRegistration.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardPropsCreator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/RegisterCandidateLocal.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/SuccessReply.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java

index f509b4002b95e092dfb4808d5921f7d6539dafe8..7e35bc9439a134780c9b8e8a617c07eb0aa5166a 100644 (file)
@@ -16,25 +16,18 @@ import javax.annotation.Nonnull;
  * duplicate registration or two different components within the same process trying to register a Candidate.
  */
 public class CandidateAlreadyRegisteredException extends Exception {
+    private static final long serialVersionUID = 1L;
+
     private final Entity entity;
     private final EntityOwnershipCandidate registeredCandidate;
 
     public CandidateAlreadyRegisteredException(@Nonnull Entity entity,
-                                               @Nonnull EntityOwnershipCandidate registeredCandidate,
-                                               String message) {
-        super(message);
-        this.entity = Preconditions.checkNotNull(entity, "entity should not be null");
-        this.registeredCandidate = Preconditions.checkNotNull(registeredCandidate,
-                "registeredCandidate should not be null");
-    }
-
-    public CandidateAlreadyRegisteredException(@Nonnull Entity entity,
-                                               @Nonnull EntityOwnershipCandidate registeredCandidate,
-                                               String message, Throwable throwable) {
-        super(message, throwable);
-        this.entity = Preconditions.checkNotNull(entity, "entity should not be null");
-        this.registeredCandidate = Preconditions.checkNotNull(registeredCandidate,
-                "registeredCandidate should not be null");
+                                               @Nonnull EntityOwnershipCandidate registeredCandidate) {
+        super(String.format("Candidate %s has already been registered for %s",
+                Preconditions.checkNotNull(registeredCandidate, "registeredCandidate should not be null"),
+                Preconditions.checkNotNull(entity, "entity should not be null")));
+        this.entity = entity;
+        this.registeredCandidate = registeredCandidate;
     }
 
     /**
diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/clustering/AbstractEntityOwnershipCandidateRegistration.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/clustering/AbstractEntityOwnershipCandidateRegistration.java
new file mode 100644 (file)
index 0000000..e118458
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.impl.clustering;
+
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
+
+/**
+ * Abstract base class for an EntityOwnershipCandidateRegistration.
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class AbstractEntityOwnershipCandidateRegistration
+        extends AbstractEntityOwnershipListenerRegistration<EntityOwnershipCandidate>
+        implements EntityOwnershipCandidateRegistration {
+
+    protected AbstractEntityOwnershipCandidateRegistration(EntityOwnershipCandidate candidate, Entity entity) {
+        super(candidate, entity);
+    }
+}
diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/clustering/AbstractEntityOwnershipListenerRegistration.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/clustering/AbstractEntityOwnershipListenerRegistration.java
new file mode 100644 (file)
index 0000000..881d662
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.impl.clustering;
+
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
+
+/**
+ * Abstract base class for an EntityOwnershipListenerRegistration.
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class AbstractEntityOwnershipListenerRegistration<T extends EntityOwnershipListener>
+        implements EntityOwnershipListenerRegistration {
+    private final T listener;
+    private final Entity entity;
+
+    protected AbstractEntityOwnershipListenerRegistration(T listener, Entity entity) {
+        this.listener = listener;
+        this.entity = entity;
+    }
+
+    @Override
+    public T getInstance() {
+        return listener;
+    }
+
+    @Override
+    public Entity getEntity() {
+        return entity;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipCandidateRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipCandidateRegistration.java
new file mode 100644 (file)
index 0000000..1089ec2
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
+import org.opendaylight.controller.md.sal.common.impl.clustering.AbstractEntityOwnershipCandidateRegistration;
+
+/**
+ * Implementation of EntityOwnershipCandidateRegistration.
+ *
+ * @author Thomas Pantelis
+ */
+class DistributedEntityOwnershipCandidateRegistration extends AbstractEntityOwnershipCandidateRegistration {
+
+    DistributedEntityOwnershipCandidateRegistration(EntityOwnershipCandidate candidate, Entity entity) {
+        super(candidate, entity);
+    }
+
+    @Override
+    public void close() {
+        // TODO - need to send unregister message.
+    }
+}
index ca315435cf9035abb3d68e956cc81323d3eda8bf..90720eead6f180b4037356e2723a4f58910723a5 100644 (file)
@@ -10,8 +10,11 @@ package org.opendaylight.controller.cluster.datastore.entityownership;
 import akka.actor.ActorRef;
 import akka.dispatch.OnComplete;
 import akka.util.Timeout;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
 import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
 import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
@@ -35,6 +38,8 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
     private static final Timeout MESSAGE_TIMEOUT = new Timeout(1, TimeUnit.MINUTES);
 
     private final DistributedDataStore datastore;
+    private final ConcurrentMap<Entity, EntityOwnershipCandidate> registeredEntities = new ConcurrentHashMap<>();
+    private volatile ActorRef localEntityOwnershipShard;
 
     public DistributedEntityOwnershipService(DistributedDataStore datastore) {
         this.datastore = datastore;
@@ -45,7 +50,7 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
 
         CreateShard createShard = new CreateShard(ENTITY_OWNERSHIP_SHARD_NAME,
                 datastore.getActorContext().getConfiguration().getUniqueMemberNamesForAllShards(),
-                new EntityOwnershipShardPropsCreator(), null);
+                newShardPropsCreator(), null);
 
         Future<Object> createFuture = datastore.getActorContext().executeOperationAsync(shardManagerActor,
                 createShard, MESSAGE_TIMEOUT);
@@ -62,11 +67,56 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
         }, datastore.getActorContext().getClientDispatcher());
     }
 
+    private void executeEntityOwnershipShardOperation(final ActorRef shardActor, final Object message) {
+        Future<Object> future = datastore.getActorContext().executeOperationAsync(shardActor, message, MESSAGE_TIMEOUT);
+        future.onComplete(new OnComplete<Object>() {
+            @Override
+            public void onComplete(Throwable failure, Object response) {
+                if(failure != null) {
+                    LOG.debug("Error sending message {} to {}", message, shardActor, failure);
+                    // TODO - queue for retry
+                } else {
+                    LOG.debug("{} message to {} succeeded", message, shardActor, failure);
+                }
+            }
+        }, datastore.getActorContext().getClientDispatcher());
+    }
+
+    private void executeLocalEntityOwnershipShardOperation(final Object message) {
+        if(localEntityOwnershipShard == null) {
+            Future<ActorRef> future = datastore.getActorContext().findLocalShardAsync(ENTITY_OWNERSHIP_SHARD_NAME);
+            future.onComplete(new OnComplete<ActorRef>() {
+                @Override
+                public void onComplete(Throwable failure, ActorRef shardActor) {
+                    if(failure != null) {
+                        LOG.error("Failed to find local {} shard", ENTITY_OWNERSHIP_SHARD_NAME, failure);
+                    } else {
+                        localEntityOwnershipShard = shardActor;
+                        executeEntityOwnershipShardOperation(localEntityOwnershipShard, message);
+                    }
+                }
+            }, datastore.getActorContext().getClientDispatcher());
+
+        } else {
+            executeEntityOwnershipShardOperation(localEntityOwnershipShard, message);
+        }
+    }
+
     @Override
     public EntityOwnershipCandidateRegistration registerCandidate(Entity entity, EntityOwnershipCandidate candidate)
             throws CandidateAlreadyRegisteredException {
-        // TODO Auto-generated method stub
-        return null;
+
+        EntityOwnershipCandidate currentCandidate = registeredEntities.putIfAbsent(entity,candidate);
+        if(currentCandidate != null) {
+            throw new CandidateAlreadyRegisteredException(entity, currentCandidate);
+        }
+
+        RegisterCandidateLocal registerCandidate = new RegisterCandidateLocal(candidate, entity);
+
+        LOG.debug("Registering candidate with message: " + registerCandidate);
+
+        executeLocalEntityOwnershipShardOperation(registerCandidate);
+        return new DistributedEntityOwnershipCandidateRegistration(candidate, entity);
     }
 
     @Override
@@ -78,4 +128,8 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService
     @Override
     public void close() {
     }
+
+    protected EntityOwnershipShardPropsCreator newShardPropsCreator() {
+        return new EntityOwnershipShardPropsCreator();
+    }
 }
index 2fabf0a878ecf38783eea74eb39332d8ad27e712..230b597f2d42ffde56f5f0aa8b91c9da62470dbe 100644 (file)
@@ -11,7 +11,9 @@ import akka.actor.Props;
 import java.util.Map;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.Shard;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
@@ -35,6 +37,19 @@ public class EntityOwnershipShard extends Shard {
         super.onDatastoreContext(noPersistenceDatastoreContext(context));
     }
 
+    @Override
+    public void onReceiveCommand(final Object message) throws Exception {
+        if(message instanceof RegisterCandidateLocal) {
+            onRegisterCandidateLocal((RegisterCandidateLocal)message);
+        } else {
+            super.onReceiveCommand(message);
+        }
+    }
+
+    private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
+        getSender().tell(SuccessReply.INSTANCE, getSelf());
+    }
+
     public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
             final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
         return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext));
index 2335d2948b2e07ba53c2740266d1a8c16d8275e6..e28db39456451aa0b4a8ee563c3929fb0dcf48e2 100644 (file)
@@ -19,7 +19,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  *
  * @author Thomas Pantelis
  */
-public class EntityOwnershipShardPropsCreator implements ShardPropsCreator {
+class EntityOwnershipShardPropsCreator implements ShardPropsCreator {
 
     @Override
     public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/RegisterCandidateLocal.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/RegisterCandidateLocal.java
new file mode 100644 (file)
index 0000000..6ba09a0
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership.messages;
+
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
+
+/**
+ * Message sent to the local EntityOwnershipShard to register a candidate.
+ *
+ * @author Thomas Pantelis
+ */
+public class RegisterCandidateLocal {
+    private final EntityOwnershipCandidate candidate;
+    private final Entity entity;
+
+    public RegisterCandidateLocal(EntityOwnershipCandidate candidate, Entity entity) {
+        this.candidate = candidate;
+        this.entity = entity;
+    }
+
+    public EntityOwnershipCandidate getCandidate() {
+        return candidate;
+    }
+
+    public Entity getEntity() {
+        return entity;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder builder = new StringBuilder();
+        builder.append("RegisterCandidateLocal [candidate=").append(candidate.getClass()).append(", entity=")
+                .append(entity).append("]");
+        return builder.toString();
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/SuccessReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/SuccessReply.java
new file mode 100644 (file)
index 0000000..f8ac916
--- /dev/null
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.Serializable;
+
+/**
+ * A reply message indicating success.
+ *
+ * @author Thomas Pantelis
+ */
+public class SuccessReply implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public static SuccessReply INSTANCE = new SuccessReply();
+}
index b7f75c98f3b49276be2ed4dc96bb768792627df3..ee5b1c5a2d3d1f5a896e05e56c20b18ad2a010ba 100644 (file)
@@ -7,19 +7,37 @@
  */
 package org.opendaylight.controller.cluster.datastore.entityownership;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
 import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
@@ -30,10 +48,12 @@ import scala.concurrent.duration.Duration;
  * @author Thomas Pantelis
  */
 public class DistributedEntityOwnershipServiceTest extends AbstractActorTest {
-    private static int ID_COUNTER = 1;
+    static String ENTITY_TYPE = "test";
+    static String ENTITY_TYPE2 = "test2";
+    static int ID_COUNTER = 1;
+    static final QName QNAME = QName.create("test", "2015-08-11", "foo");
 
     private final String dataStoreType = "config" + ID_COUNTER++;
-    private DistributedEntityOwnershipService service;
     private DistributedDataStore dataStore;
 
     @Before
@@ -44,25 +64,136 @@ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest {
                 new MockConfiguration(Collections.<String, List<String>>emptyMap()), datastoreContext );
 
         dataStore.onGlobalContextUpdated(TestModel.createTestContext());
-
-        service = new DistributedEntityOwnershipService(dataStore);
     }
 
     @Test
     public void testEntityOwnershipShardCreated() throws Exception {
+        DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore);
         service.start();
 
         Future<ActorRef> future = dataStore.getActorContext().findLocalShardAsync(
                 DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME);
         ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
         assertNotNull(DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME + " not found", shardActor);
+
+        service.close();
     }
 
     @Test
-    public void testRegisterCandidate() {
+    public void testRegisterCandidate() throws Exception {
+        final TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
+        DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore) {
+            @Override
+            protected EntityOwnershipShardPropsCreator newShardPropsCreator() {
+                return shardPropsCreator;
+            }
+        };
+
+        service.start();
+
+        shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
+
+        Entity entity = new Entity(ENTITY_TYPE, YangInstanceIdentifier.of(QNAME));
+        EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+        EntityOwnershipCandidateRegistration reg = service.registerCandidate(entity, candidate);
+
+        verifyEntityOwnershipCandidateRegistration(entity, reg);
+        verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
+
+        // Test same entity - should throw exception
+
+        EntityOwnershipCandidate candidate2 = mock(EntityOwnershipCandidate.class);
+        try {
+            service.registerCandidate(entity, candidate2);
+            fail("Expected CandidateAlreadyRegisteredException");
+        } catch(CandidateAlreadyRegisteredException e) {
+            // expected
+            assertSame("getCandidate", candidate, e.getRegisteredCandidate());
+            assertEquals("getEntity", entity, e.getEntity());
+        }
+
+        // Test different entity
+
+        Entity entity2 = new Entity(ENTITY_TYPE2, YangInstanceIdentifier.of(QNAME));
+        shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
+
+        EntityOwnershipCandidateRegistration reg2 = service.registerCandidate(entity2, candidate);
+
+        verifyEntityOwnershipCandidateRegistration(entity2, reg2);
+        verifyRegisterCandidateLocal(shardPropsCreator, entity2, candidate);
+
+        service.close();
     }
 
     @Test
     public void testRegisterListener() {
     }
+
+    private void verifyRegisterCandidateLocal(final TestShardPropsCreator shardPropsCreator, Entity entity,
+            EntityOwnershipCandidate candidate) {
+        RegisterCandidateLocal regCandidate = shardPropsCreator.waitForShardMessage();
+        assertSame("getCandidate", candidate, regCandidate.getCandidate());
+        assertEquals("getEntity", entity, regCandidate.getEntity());
+    }
+
+    private void verifyEntityOwnershipCandidateRegistration(Entity entity, EntityOwnershipCandidateRegistration reg) {
+        assertNotNull("EntityOwnershipCandidateRegistration null", reg);
+        assertEquals("getEntity", entity, reg.getEntity());
+    }
+
+    static class TestShardPropsCreator extends EntityOwnershipShardPropsCreator {
+        private final AtomicReference<CountDownLatch> messageReceived = new AtomicReference<>();
+        private final AtomicReference<Object> receivedMessage = new AtomicReference<>();
+        private final AtomicReference<Class<?>> messageClass = new AtomicReference<>();
+
+        @Override
+        public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
+                DatastoreContext datastoreContext, SchemaContext schemaContext) {
+            return Props.create(TestEntityOwnershipShard.class, shardId, peerAddresses, datastoreContext,
+                    schemaContext, messageClass, messageReceived, receivedMessage);
+        }
+
+        @SuppressWarnings("unchecked")
+        <T> T waitForShardMessage() {
+            assertEquals("Message received", true, Uninterruptibles.awaitUninterruptibly(
+                    messageReceived.get(), 5, TimeUnit.SECONDS));
+            assertEquals("Message type", messageClass.get(), receivedMessage.get().getClass());
+            return (T) receivedMessage.get();
+        }
+
+        void expectShardMessage(Class<?> ofType) {
+            messageReceived.set(new CountDownLatch(1));
+            receivedMessage.set(null);
+            messageClass.set(ofType);
+        }
+    }
+
+    static class TestEntityOwnershipShard extends EntityOwnershipShard {
+        private final AtomicReference<CountDownLatch> messageReceived;
+        private final AtomicReference<Object> receivedMessage;
+        private final AtomicReference<Class<?>> messageClass;
+
+        protected TestEntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
+                DatastoreContext datastoreContext, SchemaContext schemaContext, AtomicReference<Class<?>> messageClass,
+                AtomicReference<CountDownLatch> messageReceived, AtomicReference<Object> receivedMessage) {
+            super(name, peerAddresses, datastoreContext, schemaContext);
+            this.messageClass = messageClass;
+            this.messageReceived = messageReceived;
+            this.receivedMessage = receivedMessage;
+        }
+
+        @Override
+        public void onReceiveCommand(final Object message) throws Exception {
+            try {
+                super.onReceiveCommand(message);
+            } finally {
+                Class<?> expMsgClass = messageClass.get();
+                if(expMsgClass != null && expMsgClass.equals(message.getClass())) {
+                    receivedMessage.set(message);
+                    messageReceived.get().countDown();
+                }
+            }
+        }
+    }
 }