import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
public EntityOwnershipCandidateRegistration registerCandidate(Entity entity, EntityOwnershipCandidate candidate)
throws CandidateAlreadyRegisteredException {
- EntityOwnershipCandidate currentCandidate = registeredEntities.putIfAbsent(entity,candidate);
+ EntityOwnershipCandidate currentCandidate = registeredEntities.putIfAbsent(entity, candidate);
if(currentCandidate != null) {
throw new CandidateAlreadyRegisteredException(entity, currentCandidate);
}
RegisterCandidateLocal registerCandidate = new RegisterCandidateLocal(candidate, entity);
- LOG.debug("Registering candidate with message: " + registerCandidate);
+ LOG.debug("Registering candidate with message: {}", registerCandidate);
executeLocalEntityOwnershipShardOperation(registerCandidate);
- return new DistributedEntityOwnershipCandidateRegistration(candidate, entity);
+ return new DistributedEntityOwnershipCandidateRegistration(candidate, entity, this);
+ }
+
+ void unregisterCandidate(Entity entity) {
+ LOG.debug("Unregistering candidate for {}", entity);
+
+ executeLocalEntityOwnershipShardOperation(new UnregisterCandidateLocal(entity));
+ registeredEntities.remove(entity);
}
@Override
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
public void onReceiveCommand(final Object message) throws Exception {
if(message instanceof RegisterCandidateLocal) {
onRegisterCandidateLocal((RegisterCandidateLocal)message);
+ } else if(message instanceof UnregisterCandidateLocal) {
+ onUnregisterCandidateLocal((UnregisterCandidateLocal)message);
} else {
super.onReceiveCommand(message);
}
}
private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
+ // TODO - implement
+ getSender().tell(SuccessReply.INSTANCE, getSelf());
+ }
+
+ private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
+ // TODO - implement
getSender().tell(SuccessReply.INSTANCE, getSelf());
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership.messages;
+
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+
+/**
+ * Message sent to the local EntityOwnershipShard to unregister a candidate.
+ *
+ * @author Thomas Pantelis
+ */
+public class UnregisterCandidateLocal {
+ private final Entity entity;
+
+ public UnregisterCandidateLocal(Entity entity) {
+ this.entity = entity;
+ }
+
+ public Entity getEntity() {
+ return entity;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("UnregisterCandidateLocal [entity=").append(entity).append("]");
+ return builder.toString();
+ }
+}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
import akka.actor.Props;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
private DistributedDataStore dataStore;
@Before
- public void setUp() throws Exception {
+ public void setUp() {
DatastoreContext datastoreContext = DatastoreContext.newBuilder().dataStoreType(dataStoreType).
shardInitializationTimeout(10, TimeUnit.SECONDS).build();
dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(),
dataStore.onGlobalContextUpdated(TestModel.createTestContext());
}
+ @After
+ public void tearDown() {
+ dataStore.getActorContext().getShardManager().tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+
@Test
public void testEntityOwnershipShardCreated() throws Exception {
DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore);
verifyEntityOwnershipCandidateRegistration(entity, reg);
verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
- // Test same entity - should throw exception
+ // Register the same entity - should throw exception
EntityOwnershipCandidate candidate2 = mock(EntityOwnershipCandidate.class);
try {
assertEquals("getEntity", entity, e.getEntity());
}
- // Test different entity
+ // Register a different entity - should succeed
Entity entity2 = new Entity(ENTITY_TYPE2, YangInstanceIdentifier.of(QNAME));
shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
service.close();
}
+ @Test
+ public void testCloseCandidateRegistration() throws Exception {
+ final TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
+ DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore) {
+ @Override
+ protected EntityOwnershipShardPropsCreator newShardPropsCreator() {
+ return shardPropsCreator;
+ }
+ };
+
+ service.start();
+
+ shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
+
+ Entity entity = new Entity(ENTITY_TYPE, YangInstanceIdentifier.of(QNAME));
+ EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+ EntityOwnershipCandidateRegistration reg = service.registerCandidate(entity, candidate);
+
+ verifyEntityOwnershipCandidateRegistration(entity, reg);
+ verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
+
+ shardPropsCreator.expectShardMessage(UnregisterCandidateLocal.class);
+
+ reg.close();
+
+ UnregisterCandidateLocal unregCandidate = shardPropsCreator.waitForShardMessage();
+ assertEquals("getEntity", entity, unregCandidate.getEntity());
+
+ // Re-register - should succeed.
+
+ shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
+
+ service.registerCandidate(entity, candidate);
+
+ verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
+
+ service.close();
+ }
+
@Test
public void testRegisterListener() {
}
@SuppressWarnings("unchecked")
<T> T waitForShardMessage() {
- assertEquals("Message received", true, Uninterruptibles.awaitUninterruptibly(
- messageReceived.get(), 5, TimeUnit.SECONDS));
+ assertTrue("Message " + messageClass.get().getSimpleName() + " was not received",
+ Uninterruptibles.awaitUninterruptibly(messageReceived.get(), 5, TimeUnit.SECONDS));
assertEquals("Message type", messageClass.get(), receivedMessage.get().getClass());
return (T) receivedMessage.get();
}