From: Tom Pantelis Date: Sat, 8 Aug 2015 08:20:11 +0000 (-0400) Subject: Bug 4105: Implement DistributedEntityOwnershipService#registerCandidate X-Git-Tag: release/beryllium~313 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=9883165b61255af5bfcb060b76b16f69a56b1f82 Bug 4105: Implement DistributedEntityOwnershipService#registerCandidate Added a RegisterCandidateLocal message and implemented registerCandidate to send the message to the local EntityOwnershipShard. Change-Id: If941401d00912ce34f74e54188af0430a5ec6fcc Signed-off-by: Tom Pantelis --- diff --git a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/CandidateAlreadyRegisteredException.java b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/CandidateAlreadyRegisteredException.java index f509b4002b..7e35bc9439 100644 --- a/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/CandidateAlreadyRegisteredException.java +++ b/opendaylight/md-sal/sal-common-api/src/main/java/org/opendaylight/controller/md/sal/common/api/clustering/CandidateAlreadyRegisteredException.java @@ -16,25 +16,18 @@ import javax.annotation.Nonnull; * duplicate registration or two different components within the same process trying to register a Candidate. */ public class CandidateAlreadyRegisteredException extends Exception { + private static final long serialVersionUID = 1L; + private final Entity entity; private final EntityOwnershipCandidate registeredCandidate; public CandidateAlreadyRegisteredException(@Nonnull Entity entity, - @Nonnull EntityOwnershipCandidate registeredCandidate, - String message) { - super(message); - this.entity = Preconditions.checkNotNull(entity, "entity should not be null"); - this.registeredCandidate = Preconditions.checkNotNull(registeredCandidate, - "registeredCandidate should not be null"); - } - - public CandidateAlreadyRegisteredException(@Nonnull Entity entity, - @Nonnull EntityOwnershipCandidate registeredCandidate, - String message, Throwable throwable) { - super(message, throwable); - this.entity = Preconditions.checkNotNull(entity, "entity should not be null"); - this.registeredCandidate = Preconditions.checkNotNull(registeredCandidate, - "registeredCandidate should not be null"); + @Nonnull EntityOwnershipCandidate registeredCandidate) { + super(String.format("Candidate %s has already been registered for %s", + Preconditions.checkNotNull(registeredCandidate, "registeredCandidate should not be null"), + Preconditions.checkNotNull(entity, "entity should not be null"))); + this.entity = entity; + this.registeredCandidate = registeredCandidate; } /** diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/clustering/AbstractEntityOwnershipCandidateRegistration.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/clustering/AbstractEntityOwnershipCandidateRegistration.java new file mode 100644 index 0000000000..e1184583f1 --- /dev/null +++ b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/clustering/AbstractEntityOwnershipCandidateRegistration.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.common.impl.clustering; + +import org.opendaylight.controller.md.sal.common.api.clustering.Entity; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration; + +/** + * Abstract base class for an EntityOwnershipCandidateRegistration. + * + * @author Thomas Pantelis + */ +public abstract class AbstractEntityOwnershipCandidateRegistration + extends AbstractEntityOwnershipListenerRegistration + implements EntityOwnershipCandidateRegistration { + + protected AbstractEntityOwnershipCandidateRegistration(EntityOwnershipCandidate candidate, Entity entity) { + super(candidate, entity); + } +} diff --git a/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/clustering/AbstractEntityOwnershipListenerRegistration.java b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/clustering/AbstractEntityOwnershipListenerRegistration.java new file mode 100644 index 0000000000..881d6624d6 --- /dev/null +++ b/opendaylight/md-sal/sal-common-impl/src/main/java/org/opendaylight/controller/md/sal/common/impl/clustering/AbstractEntityOwnershipListenerRegistration.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.md.sal.common.impl.clustering; + +import org.opendaylight.controller.md.sal.common.api.clustering.Entity; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration; + +/** + * Abstract base class for an EntityOwnershipListenerRegistration. + * + * @author Thomas Pantelis + */ +public abstract class AbstractEntityOwnershipListenerRegistration + implements EntityOwnershipListenerRegistration { + private final T listener; + private final Entity entity; + + protected AbstractEntityOwnershipListenerRegistration(T listener, Entity entity) { + this.listener = listener; + this.entity = entity; + } + + @Override + public T getInstance() { + return listener; + } + + @Override + public Entity getEntity() { + return entity; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipCandidateRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipCandidateRegistration.java new file mode 100644 index 0000000000..1089ec2803 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipCandidateRegistration.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.entityownership; + +import org.opendaylight.controller.md.sal.common.api.clustering.Entity; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate; +import org.opendaylight.controller.md.sal.common.impl.clustering.AbstractEntityOwnershipCandidateRegistration; + +/** + * Implementation of EntityOwnershipCandidateRegistration. + * + * @author Thomas Pantelis + */ +class DistributedEntityOwnershipCandidateRegistration extends AbstractEntityOwnershipCandidateRegistration { + + DistributedEntityOwnershipCandidateRegistration(EntityOwnershipCandidate candidate, Entity entity) { + super(candidate, entity); + } + + @Override + public void close() { + // TODO - need to send unregister message. + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java index ca315435cf..90720eead6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java @@ -10,8 +10,11 @@ package org.opendaylight.controller.cluster.datastore.entityownership; import akka.actor.ActorRef; import akka.dispatch.OnComplete; import akka.util.Timeout; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException; import org.opendaylight.controller.md.sal.common.api.clustering.Entity; @@ -35,6 +38,8 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService private static final Timeout MESSAGE_TIMEOUT = new Timeout(1, TimeUnit.MINUTES); private final DistributedDataStore datastore; + private final ConcurrentMap registeredEntities = new ConcurrentHashMap<>(); + private volatile ActorRef localEntityOwnershipShard; public DistributedEntityOwnershipService(DistributedDataStore datastore) { this.datastore = datastore; @@ -45,7 +50,7 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService CreateShard createShard = new CreateShard(ENTITY_OWNERSHIP_SHARD_NAME, datastore.getActorContext().getConfiguration().getUniqueMemberNamesForAllShards(), - new EntityOwnershipShardPropsCreator(), null); + newShardPropsCreator(), null); Future createFuture = datastore.getActorContext().executeOperationAsync(shardManagerActor, createShard, MESSAGE_TIMEOUT); @@ -62,11 +67,56 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService }, datastore.getActorContext().getClientDispatcher()); } + private void executeEntityOwnershipShardOperation(final ActorRef shardActor, final Object message) { + Future future = datastore.getActorContext().executeOperationAsync(shardActor, message, MESSAGE_TIMEOUT); + future.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object response) { + if(failure != null) { + LOG.debug("Error sending message {} to {}", message, shardActor, failure); + // TODO - queue for retry + } else { + LOG.debug("{} message to {} succeeded", message, shardActor, failure); + } + } + }, datastore.getActorContext().getClientDispatcher()); + } + + private void executeLocalEntityOwnershipShardOperation(final Object message) { + if(localEntityOwnershipShard == null) { + Future future = datastore.getActorContext().findLocalShardAsync(ENTITY_OWNERSHIP_SHARD_NAME); + future.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, ActorRef shardActor) { + if(failure != null) { + LOG.error("Failed to find local {} shard", ENTITY_OWNERSHIP_SHARD_NAME, failure); + } else { + localEntityOwnershipShard = shardActor; + executeEntityOwnershipShardOperation(localEntityOwnershipShard, message); + } + } + }, datastore.getActorContext().getClientDispatcher()); + + } else { + executeEntityOwnershipShardOperation(localEntityOwnershipShard, message); + } + } + @Override public EntityOwnershipCandidateRegistration registerCandidate(Entity entity, EntityOwnershipCandidate candidate) throws CandidateAlreadyRegisteredException { - // TODO Auto-generated method stub - return null; + + EntityOwnershipCandidate currentCandidate = registeredEntities.putIfAbsent(entity,candidate); + if(currentCandidate != null) { + throw new CandidateAlreadyRegisteredException(entity, currentCandidate); + } + + RegisterCandidateLocal registerCandidate = new RegisterCandidateLocal(candidate, entity); + + LOG.debug("Registering candidate with message: " + registerCandidate); + + executeLocalEntityOwnershipShardOperation(registerCandidate); + return new DistributedEntityOwnershipCandidateRegistration(candidate, entity); } @Override @@ -78,4 +128,8 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService @Override public void close() { } + + protected EntityOwnershipShardPropsCreator newShardPropsCreator() { + return new EntityOwnershipShardPropsCreator(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java index 2fabf0a878..230b597f2d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java @@ -11,7 +11,9 @@ import akka.actor.Props; import java.util.Map; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.Shard; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; +import org.opendaylight.controller.cluster.datastore.messages.SuccessReply; import org.opendaylight.yangtools.yang.model.api.SchemaContext; /** @@ -35,6 +37,19 @@ public class EntityOwnershipShard extends Shard { super.onDatastoreContext(noPersistenceDatastoreContext(context)); } + @Override + public void onReceiveCommand(final Object message) throws Exception { + if(message instanceof RegisterCandidateLocal) { + onRegisterCandidateLocal((RegisterCandidateLocal)message); + } else { + super.onReceiveCommand(message); + } + } + + private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) { + getSender().tell(SuccessReply.INSTANCE, getSelf()); + } + public static Props props(final ShardIdentifier name, final Map peerAddresses, final DatastoreContext datastoreContext, final SchemaContext schemaContext) { return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardPropsCreator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardPropsCreator.java index 2335d2948b..e28db39456 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardPropsCreator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShardPropsCreator.java @@ -19,7 +19,7 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext; * * @author Thomas Pantelis */ -public class EntityOwnershipShardPropsCreator implements ShardPropsCreator { +class EntityOwnershipShardPropsCreator implements ShardPropsCreator { @Override public Props newProps(ShardIdentifier shardId, Map peerAddresses, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/RegisterCandidateLocal.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/RegisterCandidateLocal.java new file mode 100644 index 0000000000..6ba09a0729 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/RegisterCandidateLocal.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.entityownership.messages; + +import org.opendaylight.controller.md.sal.common.api.clustering.Entity; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate; + +/** + * Message sent to the local EntityOwnershipShard to register a candidate. + * + * @author Thomas Pantelis + */ +public class RegisterCandidateLocal { + private final EntityOwnershipCandidate candidate; + private final Entity entity; + + public RegisterCandidateLocal(EntityOwnershipCandidate candidate, Entity entity) { + this.candidate = candidate; + this.entity = entity; + } + + public EntityOwnershipCandidate getCandidate() { + return candidate; + } + + public Entity getEntity() { + return entity; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("RegisterCandidateLocal [candidate=").append(candidate.getClass()).append(", entity=") + .append(entity).append("]"); + return builder.toString(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/SuccessReply.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/SuccessReply.java new file mode 100644 index 0000000000..f8ac916c79 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/SuccessReply.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.messages; + +import java.io.Serializable; + +/** + * A reply message indicating success. + * + * @author Thomas Pantelis + */ +public class SuccessReply implements Serializable { + private static final long serialVersionUID = 1L; + + public static SuccessReply INSTANCE = new SuccessReply(); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java index b7f75c98f3..ee5b1c5a2d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java @@ -7,19 +7,37 @@ */ package org.opendaylight.controller.cluster.datastore.entityownership; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; import akka.actor.ActorRef; +import akka.actor.Props; +import com.google.common.util.concurrent.Uninterruptibles; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal; +import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException; +import org.opendaylight.controller.md.sal.common.api.clustering.Entity; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; @@ -30,10 +48,12 @@ import scala.concurrent.duration.Duration; * @author Thomas Pantelis */ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest { - private static int ID_COUNTER = 1; + static String ENTITY_TYPE = "test"; + static String ENTITY_TYPE2 = "test2"; + static int ID_COUNTER = 1; + static final QName QNAME = QName.create("test", "2015-08-11", "foo"); private final String dataStoreType = "config" + ID_COUNTER++; - private DistributedEntityOwnershipService service; private DistributedDataStore dataStore; @Before @@ -44,25 +64,136 @@ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest { new MockConfiguration(Collections.>emptyMap()), datastoreContext ); dataStore.onGlobalContextUpdated(TestModel.createTestContext()); - - service = new DistributedEntityOwnershipService(dataStore); } @Test public void testEntityOwnershipShardCreated() throws Exception { + DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore); service.start(); Future future = dataStore.getActorContext().findLocalShardAsync( DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME); ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS)); assertNotNull(DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME + " not found", shardActor); + + service.close(); } @Test - public void testRegisterCandidate() { + public void testRegisterCandidate() throws Exception { + final TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator(); + DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore) { + @Override + protected EntityOwnershipShardPropsCreator newShardPropsCreator() { + return shardPropsCreator; + } + }; + + service.start(); + + shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class); + + Entity entity = new Entity(ENTITY_TYPE, YangInstanceIdentifier.of(QNAME)); + EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class); + + EntityOwnershipCandidateRegistration reg = service.registerCandidate(entity, candidate); + + verifyEntityOwnershipCandidateRegistration(entity, reg); + verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate); + + // Test same entity - should throw exception + + EntityOwnershipCandidate candidate2 = mock(EntityOwnershipCandidate.class); + try { + service.registerCandidate(entity, candidate2); + fail("Expected CandidateAlreadyRegisteredException"); + } catch(CandidateAlreadyRegisteredException e) { + // expected + assertSame("getCandidate", candidate, e.getRegisteredCandidate()); + assertEquals("getEntity", entity, e.getEntity()); + } + + // Test different entity + + Entity entity2 = new Entity(ENTITY_TYPE2, YangInstanceIdentifier.of(QNAME)); + shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class); + + EntityOwnershipCandidateRegistration reg2 = service.registerCandidate(entity2, candidate); + + verifyEntityOwnershipCandidateRegistration(entity2, reg2); + verifyRegisterCandidateLocal(shardPropsCreator, entity2, candidate); + + service.close(); } @Test public void testRegisterListener() { } + + private void verifyRegisterCandidateLocal(final TestShardPropsCreator shardPropsCreator, Entity entity, + EntityOwnershipCandidate candidate) { + RegisterCandidateLocal regCandidate = shardPropsCreator.waitForShardMessage(); + assertSame("getCandidate", candidate, regCandidate.getCandidate()); + assertEquals("getEntity", entity, regCandidate.getEntity()); + } + + private void verifyEntityOwnershipCandidateRegistration(Entity entity, EntityOwnershipCandidateRegistration reg) { + assertNotNull("EntityOwnershipCandidateRegistration null", reg); + assertEquals("getEntity", entity, reg.getEntity()); + } + + static class TestShardPropsCreator extends EntityOwnershipShardPropsCreator { + private final AtomicReference messageReceived = new AtomicReference<>(); + private final AtomicReference receivedMessage = new AtomicReference<>(); + private final AtomicReference> messageClass = new AtomicReference<>(); + + @Override + public Props newProps(ShardIdentifier shardId, Map peerAddresses, + DatastoreContext datastoreContext, SchemaContext schemaContext) { + return Props.create(TestEntityOwnershipShard.class, shardId, peerAddresses, datastoreContext, + schemaContext, messageClass, messageReceived, receivedMessage); + } + + @SuppressWarnings("unchecked") + T waitForShardMessage() { + assertEquals("Message received", true, Uninterruptibles.awaitUninterruptibly( + messageReceived.get(), 5, TimeUnit.SECONDS)); + assertEquals("Message type", messageClass.get(), receivedMessage.get().getClass()); + return (T) receivedMessage.get(); + } + + void expectShardMessage(Class ofType) { + messageReceived.set(new CountDownLatch(1)); + receivedMessage.set(null); + messageClass.set(ofType); + } + } + + static class TestEntityOwnershipShard extends EntityOwnershipShard { + private final AtomicReference messageReceived; + private final AtomicReference receivedMessage; + private final AtomicReference> messageClass; + + protected TestEntityOwnershipShard(ShardIdentifier name, Map peerAddresses, + DatastoreContext datastoreContext, SchemaContext schemaContext, AtomicReference> messageClass, + AtomicReference messageReceived, AtomicReference receivedMessage) { + super(name, peerAddresses, datastoreContext, schemaContext); + this.messageClass = messageClass; + this.messageReceived = messageReceived; + this.receivedMessage = receivedMessage; + } + + @Override + public void onReceiveCommand(final Object message) throws Exception { + try { + super.onReceiveCommand(message); + } finally { + Class expMsgClass = messageClass.get(); + if(expMsgClass != null && expMsgClass.equals(message.getClass())) { + receivedMessage.set(message); + messageReceived.get().countDown(); + } + } + } + } }