* duplicate registration or two different components within the same process trying to register a Candidate.
*/
public class CandidateAlreadyRegisteredException extends Exception {
+ private static final long serialVersionUID = 1L;
+
private final Entity entity;
private final EntityOwnershipCandidate registeredCandidate;
public CandidateAlreadyRegisteredException(@Nonnull Entity entity,
- @Nonnull EntityOwnershipCandidate registeredCandidate,
- String message) {
- super(message);
- this.entity = Preconditions.checkNotNull(entity, "entity should not be null");
- this.registeredCandidate = Preconditions.checkNotNull(registeredCandidate,
- "registeredCandidate should not be null");
- }
-
- public CandidateAlreadyRegisteredException(@Nonnull Entity entity,
- @Nonnull EntityOwnershipCandidate registeredCandidate,
- String message, Throwable throwable) {
- super(message, throwable);
- this.entity = Preconditions.checkNotNull(entity, "entity should not be null");
- this.registeredCandidate = Preconditions.checkNotNull(registeredCandidate,
- "registeredCandidate should not be null");
+ @Nonnull EntityOwnershipCandidate registeredCandidate) {
+ super(String.format("Candidate %s has already been registered for %s",
+ Preconditions.checkNotNull(registeredCandidate, "registeredCandidate should not be null"),
+ Preconditions.checkNotNull(entity, "entity should not be null")));
+ this.entity = entity;
+ this.registeredCandidate = registeredCandidate;
}
/**
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.impl.clustering;
+
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
+
+/**
+ * Abstract base class for an EntityOwnershipCandidateRegistration.
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class AbstractEntityOwnershipCandidateRegistration
+ extends AbstractEntityOwnershipListenerRegistration<EntityOwnershipCandidate>
+ implements EntityOwnershipCandidateRegistration {
+
+ protected AbstractEntityOwnershipCandidateRegistration(EntityOwnershipCandidate candidate, Entity entity) {
+ super(candidate, entity);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.impl.clustering;
+
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListenerRegistration;
+
+/**
+ * Abstract base class for an EntityOwnershipListenerRegistration.
+ *
+ * @author Thomas Pantelis
+ */
+public abstract class AbstractEntityOwnershipListenerRegistration<T extends EntityOwnershipListener>
+ implements EntityOwnershipListenerRegistration {
+ private final T listener;
+ private final Entity entity;
+
+ protected AbstractEntityOwnershipListenerRegistration(T listener, Entity entity) {
+ this.listener = listener;
+ this.entity = entity;
+ }
+
+ @Override
+ public T getInstance() {
+ return listener;
+ }
+
+ @Override
+ public Entity getEntity() {
+ return entity;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
+import org.opendaylight.controller.md.sal.common.impl.clustering.AbstractEntityOwnershipCandidateRegistration;
+
+/**
+ * Implementation of EntityOwnershipCandidateRegistration.
+ *
+ * @author Thomas Pantelis
+ */
+class DistributedEntityOwnershipCandidateRegistration extends AbstractEntityOwnershipCandidateRegistration {
+
+ DistributedEntityOwnershipCandidateRegistration(EntityOwnershipCandidate candidate, Entity entity) {
+ super(candidate, entity);
+ }
+
+ @Override
+ public void close() {
+ // TODO - need to send unregister message.
+ }
+}
import akka.actor.ActorRef;
import akka.dispatch.OnComplete;
import akka.util.Timeout;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.messages.CreateShard;
import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
private static final Timeout MESSAGE_TIMEOUT = new Timeout(1, TimeUnit.MINUTES);
private final DistributedDataStore datastore;
+ private final ConcurrentMap<Entity, EntityOwnershipCandidate> registeredEntities = new ConcurrentHashMap<>();
+ private volatile ActorRef localEntityOwnershipShard;
public DistributedEntityOwnershipService(DistributedDataStore datastore) {
this.datastore = datastore;
CreateShard createShard = new CreateShard(ENTITY_OWNERSHIP_SHARD_NAME,
datastore.getActorContext().getConfiguration().getUniqueMemberNamesForAllShards(),
- new EntityOwnershipShardPropsCreator(), null);
+ newShardPropsCreator(), null);
Future<Object> createFuture = datastore.getActorContext().executeOperationAsync(shardManagerActor,
createShard, MESSAGE_TIMEOUT);
}, datastore.getActorContext().getClientDispatcher());
}
+ private void executeEntityOwnershipShardOperation(final ActorRef shardActor, final Object message) {
+ Future<Object> future = datastore.getActorContext().executeOperationAsync(shardActor, message, MESSAGE_TIMEOUT);
+ future.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if(failure != null) {
+ LOG.debug("Error sending message {} to {}", message, shardActor, failure);
+ // TODO - queue for retry
+ } else {
+ LOG.debug("{} message to {} succeeded", message, shardActor, failure);
+ }
+ }
+ }, datastore.getActorContext().getClientDispatcher());
+ }
+
+ private void executeLocalEntityOwnershipShardOperation(final Object message) {
+ if(localEntityOwnershipShard == null) {
+ Future<ActorRef> future = datastore.getActorContext().findLocalShardAsync(ENTITY_OWNERSHIP_SHARD_NAME);
+ future.onComplete(new OnComplete<ActorRef>() {
+ @Override
+ public void onComplete(Throwable failure, ActorRef shardActor) {
+ if(failure != null) {
+ LOG.error("Failed to find local {} shard", ENTITY_OWNERSHIP_SHARD_NAME, failure);
+ } else {
+ localEntityOwnershipShard = shardActor;
+ executeEntityOwnershipShardOperation(localEntityOwnershipShard, message);
+ }
+ }
+ }, datastore.getActorContext().getClientDispatcher());
+
+ } else {
+ executeEntityOwnershipShardOperation(localEntityOwnershipShard, message);
+ }
+ }
+
@Override
public EntityOwnershipCandidateRegistration registerCandidate(Entity entity, EntityOwnershipCandidate candidate)
throws CandidateAlreadyRegisteredException {
- // TODO Auto-generated method stub
- return null;
+
+ EntityOwnershipCandidate currentCandidate = registeredEntities.putIfAbsent(entity,candidate);
+ if(currentCandidate != null) {
+ throw new CandidateAlreadyRegisteredException(entity, currentCandidate);
+ }
+
+ RegisterCandidateLocal registerCandidate = new RegisterCandidateLocal(candidate, entity);
+
+ LOG.debug("Registering candidate with message: " + registerCandidate);
+
+ executeLocalEntityOwnershipShardOperation(registerCandidate);
+ return new DistributedEntityOwnershipCandidateRegistration(candidate, entity);
}
@Override
@Override
public void close() {
}
+
+ protected EntityOwnershipShardPropsCreator newShardPropsCreator() {
+ return new EntityOwnershipShardPropsCreator();
+ }
}
import java.util.Map;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.Shard;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
super.onDatastoreContext(noPersistenceDatastoreContext(context));
}
+ @Override
+ public void onReceiveCommand(final Object message) throws Exception {
+ if(message instanceof RegisterCandidateLocal) {
+ onRegisterCandidateLocal((RegisterCandidateLocal)message);
+ } else {
+ super.onReceiveCommand(message);
+ }
+ }
+
+ private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
+ getSender().tell(SuccessReply.INSTANCE, getSelf());
+ }
+
public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext));
*
* @author Thomas Pantelis
*/
-public class EntityOwnershipShardPropsCreator implements ShardPropsCreator {
+class EntityOwnershipShardPropsCreator implements ShardPropsCreator {
@Override
public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership.messages;
+
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
+
+/**
+ * Message sent to the local EntityOwnershipShard to register a candidate.
+ *
+ * @author Thomas Pantelis
+ */
+public class RegisterCandidateLocal {
+ private final EntityOwnershipCandidate candidate;
+ private final Entity entity;
+
+ public RegisterCandidateLocal(EntityOwnershipCandidate candidate, Entity entity) {
+ this.candidate = candidate;
+ this.entity = entity;
+ }
+
+ public EntityOwnershipCandidate getCandidate() {
+ return candidate;
+ }
+
+ public Entity getEntity() {
+ return entity;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("RegisterCandidateLocal [candidate=").append(candidate.getClass()).append(", entity=")
+ .append(entity).append("]");
+ return builder.toString();
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.Serializable;
+
+/**
+ * A reply message indicating success.
+ *
+ * @author Thomas Pantelis
+ */
+public class SuccessReply implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ public static SuccessReply INSTANCE = new SuccessReply();
+}
*/
package org.opendaylight.controller.cluster.datastore.entityownership;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.Test;
import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
* @author Thomas Pantelis
*/
public class DistributedEntityOwnershipServiceTest extends AbstractActorTest {
- private static int ID_COUNTER = 1;
+ static String ENTITY_TYPE = "test";
+ static String ENTITY_TYPE2 = "test2";
+ static int ID_COUNTER = 1;
+ static final QName QNAME = QName.create("test", "2015-08-11", "foo");
private final String dataStoreType = "config" + ID_COUNTER++;
- private DistributedEntityOwnershipService service;
private DistributedDataStore dataStore;
@Before
new MockConfiguration(Collections.<String, List<String>>emptyMap()), datastoreContext );
dataStore.onGlobalContextUpdated(TestModel.createTestContext());
-
- service = new DistributedEntityOwnershipService(dataStore);
}
@Test
public void testEntityOwnershipShardCreated() throws Exception {
+ DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore);
service.start();
Future<ActorRef> future = dataStore.getActorContext().findLocalShardAsync(
DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME);
ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
assertNotNull(DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME + " not found", shardActor);
+
+ service.close();
}
@Test
- public void testRegisterCandidate() {
+ public void testRegisterCandidate() throws Exception {
+ final TestShardPropsCreator shardPropsCreator = new TestShardPropsCreator();
+ DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(dataStore) {
+ @Override
+ protected EntityOwnershipShardPropsCreator newShardPropsCreator() {
+ return shardPropsCreator;
+ }
+ };
+
+ service.start();
+
+ shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
+
+ Entity entity = new Entity(ENTITY_TYPE, YangInstanceIdentifier.of(QNAME));
+ EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+ EntityOwnershipCandidateRegistration reg = service.registerCandidate(entity, candidate);
+
+ verifyEntityOwnershipCandidateRegistration(entity, reg);
+ verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
+
+ // Test same entity - should throw exception
+
+ EntityOwnershipCandidate candidate2 = mock(EntityOwnershipCandidate.class);
+ try {
+ service.registerCandidate(entity, candidate2);
+ fail("Expected CandidateAlreadyRegisteredException");
+ } catch(CandidateAlreadyRegisteredException e) {
+ // expected
+ assertSame("getCandidate", candidate, e.getRegisteredCandidate());
+ assertEquals("getEntity", entity, e.getEntity());
+ }
+
+ // Test different entity
+
+ Entity entity2 = new Entity(ENTITY_TYPE2, YangInstanceIdentifier.of(QNAME));
+ shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
+
+ EntityOwnershipCandidateRegistration reg2 = service.registerCandidate(entity2, candidate);
+
+ verifyEntityOwnershipCandidateRegistration(entity2, reg2);
+ verifyRegisterCandidateLocal(shardPropsCreator, entity2, candidate);
+
+ service.close();
}
@Test
public void testRegisterListener() {
}
+
+ private void verifyRegisterCandidateLocal(final TestShardPropsCreator shardPropsCreator, Entity entity,
+ EntityOwnershipCandidate candidate) {
+ RegisterCandidateLocal regCandidate = shardPropsCreator.waitForShardMessage();
+ assertSame("getCandidate", candidate, regCandidate.getCandidate());
+ assertEquals("getEntity", entity, regCandidate.getEntity());
+ }
+
+ private void verifyEntityOwnershipCandidateRegistration(Entity entity, EntityOwnershipCandidateRegistration reg) {
+ assertNotNull("EntityOwnershipCandidateRegistration null", reg);
+ assertEquals("getEntity", entity, reg.getEntity());
+ }
+
+ static class TestShardPropsCreator extends EntityOwnershipShardPropsCreator {
+ private final AtomicReference<CountDownLatch> messageReceived = new AtomicReference<>();
+ private final AtomicReference<Object> receivedMessage = new AtomicReference<>();
+ private final AtomicReference<Class<?>> messageClass = new AtomicReference<>();
+
+ @Override
+ public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
+ DatastoreContext datastoreContext, SchemaContext schemaContext) {
+ return Props.create(TestEntityOwnershipShard.class, shardId, peerAddresses, datastoreContext,
+ schemaContext, messageClass, messageReceived, receivedMessage);
+ }
+
+ @SuppressWarnings("unchecked")
+ <T> T waitForShardMessage() {
+ assertEquals("Message received", true, Uninterruptibles.awaitUninterruptibly(
+ messageReceived.get(), 5, TimeUnit.SECONDS));
+ assertEquals("Message type", messageClass.get(), receivedMessage.get().getClass());
+ return (T) receivedMessage.get();
+ }
+
+ void expectShardMessage(Class<?> ofType) {
+ messageReceived.set(new CountDownLatch(1));
+ receivedMessage.set(null);
+ messageClass.set(ofType);
+ }
+ }
+
+ static class TestEntityOwnershipShard extends EntityOwnershipShard {
+ private final AtomicReference<CountDownLatch> messageReceived;
+ private final AtomicReference<Object> receivedMessage;
+ private final AtomicReference<Class<?>> messageClass;
+
+ protected TestEntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
+ DatastoreContext datastoreContext, SchemaContext schemaContext, AtomicReference<Class<?>> messageClass,
+ AtomicReference<CountDownLatch> messageReceived, AtomicReference<Object> receivedMessage) {
+ super(name, peerAddresses, datastoreContext, schemaContext);
+ this.messageClass = messageClass;
+ this.messageReceived = messageReceived;
+ this.receivedMessage = receivedMessage;
+ }
+
+ @Override
+ public void onReceiveCommand(final Object message) throws Exception {
+ try {
+ super.onReceiveCommand(message);
+ } finally {
+ Class<?> expMsgClass = messageClass.get();
+ if(expMsgClass != null && expMsgClass.equals(message.getClass())) {
+ receivedMessage.set(message);
+ messageReceived.get().countDown();
+ }
+ }
+ }
+ }
}