From 1280c1ce40241738edd5ebae40bf5dfddec68198 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Sat, 8 Aug 2015 06:57:03 -0400 Subject: [PATCH] Bug 4105: Implement candidate registration close Added an UnregisterCandidateLocal message which is sent when a DistributedEntityOwnershipCandidateRegistration is closed. Change-Id: I6336e1b83a7764bfb4abc2fc37e196175c008dc3 Signed-off-by: Tom Pantelis --- ...dEntityOwnershipCandidateRegistration.java | 7 ++- .../DistributedEntityOwnershipService.java | 14 ++++- .../entityownership/EntityOwnershipShard.java | 9 +++ .../messages/UnregisterCandidateLocal.java | 34 +++++++++++ ...DistributedEntityOwnershipServiceTest.java | 59 +++++++++++++++++-- 5 files changed, 113 insertions(+), 10 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/UnregisterCandidateLocal.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipCandidateRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipCandidateRegistration.java index 1089ec2803..70c454e3af 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipCandidateRegistration.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipCandidateRegistration.java @@ -17,13 +17,16 @@ import org.opendaylight.controller.md.sal.common.impl.clustering.AbstractEntityO * @author Thomas Pantelis */ class DistributedEntityOwnershipCandidateRegistration extends AbstractEntityOwnershipCandidateRegistration { + private final DistributedEntityOwnershipService service; - DistributedEntityOwnershipCandidateRegistration(EntityOwnershipCandidate candidate, Entity entity) { + DistributedEntityOwnershipCandidateRegistration(EntityOwnershipCandidate candidate, Entity entity, + DistributedEntityOwnershipService service) { super(candidate, entity); + this.service = service; } @Override public void close() { - // TODO - need to send unregister message. + service.unregisterCandidate(getEntity()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java index 90720eead6..13ecfffefe 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipService.java @@ -15,6 +15,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal; import org.opendaylight.controller.cluster.datastore.messages.CreateShard; import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException; import org.opendaylight.controller.md.sal.common.api.clustering.Entity; @@ -106,17 +107,24 @@ public class DistributedEntityOwnershipService implements EntityOwnershipService public EntityOwnershipCandidateRegistration registerCandidate(Entity entity, EntityOwnershipCandidate candidate) throws CandidateAlreadyRegisteredException { - EntityOwnershipCandidate currentCandidate = registeredEntities.putIfAbsent(entity,candidate); + EntityOwnershipCandidate currentCandidate = registeredEntities.putIfAbsent(entity, candidate); if(currentCandidate != null) { throw new CandidateAlreadyRegisteredException(entity, currentCandidate); } RegisterCandidateLocal registerCandidate = new RegisterCandidateLocal(candidate, entity); - LOG.debug("Registering candidate with message: " + registerCandidate); + LOG.debug("Registering candidate with message: {}", registerCandidate); executeLocalEntityOwnershipShardOperation(registerCandidate); - return new DistributedEntityOwnershipCandidateRegistration(candidate, entity); + return new DistributedEntityOwnershipCandidateRegistration(candidate, entity, this); + } + + void unregisterCandidate(Entity entity) { + LOG.debug("Unregistering candidate for {}", entity); + + executeLocalEntityOwnershipShardOperation(new UnregisterCandidateLocal(entity)); + registeredEntities.remove(entity); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java index 230b597f2d..52f85f481e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipShard.java @@ -12,6 +12,7 @@ import java.util.Map; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.messages.SuccessReply; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -41,12 +42,20 @@ public class EntityOwnershipShard extends Shard { public void onReceiveCommand(final Object message) throws Exception { if(message instanceof RegisterCandidateLocal) { onRegisterCandidateLocal((RegisterCandidateLocal)message); + } else if(message instanceof UnregisterCandidateLocal) { + onUnregisterCandidateLocal((UnregisterCandidateLocal)message); } else { super.onReceiveCommand(message); } } private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) { + // TODO - implement + getSender().tell(SuccessReply.INSTANCE, getSelf()); + } + + private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) { + // TODO - implement getSender().tell(SuccessReply.INSTANCE, getSelf()); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/UnregisterCandidateLocal.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/UnregisterCandidateLocal.java new file mode 100644 index 0000000000..d99ec7b154 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/UnregisterCandidateLocal.java @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.entityownership.messages; + +import org.opendaylight.controller.md.sal.common.api.clustering.Entity; + +/** + * Message sent to the local EntityOwnershipShard to unregister a candidate. + * + * @author Thomas Pantelis + */ +public class UnregisterCandidateLocal { + private final Entity entity; + + public UnregisterCandidateLocal(Entity entity) { + this.entity = entity; + } + + public Entity getEntity() { + return entity; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("UnregisterCandidateLocal [entity=").append(entity).append("]"); + return builder.toString(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java index ee5b1c5a2d..860d3dc809 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java @@ -10,9 +10,11 @@ package org.opendaylight.controller.cluster.datastore.entityownership; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import akka.actor.ActorRef; +import akka.actor.PoisonPill; import akka.actor.Props; import com.google.common.util.concurrent.Uninterruptibles; import java.util.Collections; @@ -21,12 +23,14 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.opendaylight.controller.cluster.datastore.AbstractActorTest; import org.opendaylight.controller.cluster.datastore.DatastoreContext; import org.opendaylight.controller.cluster.datastore.DistributedDataStore; import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal; import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier; import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper; import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration; @@ -57,7 +61,7 @@ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest { private DistributedDataStore dataStore; @Before - public void setUp() throws Exception { + public void setUp() { DatastoreContext datastoreContext = DatastoreContext.newBuilder().dataStoreType(dataStoreType). shardInitializationTimeout(10, TimeUnit.SECONDS).build(); dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(), @@ -66,6 +70,11 @@ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest { dataStore.onGlobalContextUpdated(TestModel.createTestContext()); } + @After + public void tearDown() { + dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + @Test public void testEntityOwnershipShardCreated() throws Exception { DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore); @@ -101,7 +110,7 @@ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest { verifyEntityOwnershipCandidateRegistration(entity, reg); verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate); - // Test same entity - should throw exception + // Register the same entity - should throw exception EntityOwnershipCandidate candidate2 = mock(EntityOwnershipCandidate.class); try { @@ -113,7 +122,7 @@ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest { assertEquals("getEntity", entity, e.getEntity()); } - // Test different entity + // Register a different entity - should succeed Entity entity2 = new Entity(ENTITY_TYPE2, YangInstanceIdentifier.of(QNAME)); shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class); @@ -126,6 +135,46 @@ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest { service.close(); } + @Test + public void testCloseCandidateRegistration() throws Exception { + final TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator(); + DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore) { + @Override + protected EntityOwnershipShardPropsCreator newShardPropsCreator() { + return shardPropsCreator; + } + }; + + service.start(); + + shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class); + + Entity entity = new Entity(ENTITY_TYPE, YangInstanceIdentifier.of(QNAME)); + EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class); + + EntityOwnershipCandidateRegistration reg = service.registerCandidate(entity, candidate); + + verifyEntityOwnershipCandidateRegistration(entity, reg); + verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate); + + shardPropsCreator.expectShardMessage(UnregisterCandidateLocal.class); + + reg.close(); + + UnregisterCandidateLocal unregCandidate = shardPropsCreator.waitForShardMessage(); + assertEquals("getEntity", entity, unregCandidate.getEntity()); + + // Re-register - should succeed. + + shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class); + + service.registerCandidate(entity, candidate); + + verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate); + + service.close(); + } + @Test public void testRegisterListener() { } @@ -156,8 +205,8 @@ public class DistributedEntityOwnershipServiceTest extends AbstractActorTest { @SuppressWarnings("unchecked") T waitForShardMessage() { - assertEquals("Message received", true, Uninterruptibles.awaitUninterruptibly( - messageReceived.get(), 5, TimeUnit.SECONDS)); + assertTrue("Message " + messageClass.get().getSimpleName() + " was not received", + Uninterruptibles.awaitUninterruptibly(messageReceived.get(), 5, TimeUnit.SECONDS)); assertEquals("Message type", messageClass.get(), receivedMessage.get().getClass()); return (T) receivedMessage.get(); } -- 2.36.6