import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Collections;
import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Await;
import scala.concurrent.Future;
*
* @author Thomas Pantelis
*/
-public class DistributedEntityOwnershipServiceTest extends AbstractActorTest {
+public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnershipTest {
static String ENTITY_TYPE = "test";
static String ENTITY_TYPE2 = "test2";
static int ID_COUNTER = 1;
public void setUp() {
DatastoreContext datastoreContext = DatastoreContext.newBuilder().dataStoreType(dataStoreType).
shardInitializationTimeout(10, TimeUnit.SECONDS).build();
- dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(),
- new MockConfiguration(Collections.<String, List<String>>emptyMap()), datastoreContext );
- dataStore.onGlobalContextUpdated(TestModel.createTestContext());
+ // FIXME - remove this MockConfiguration and use the production ConfigurationImpl class when the
+ // DistributedEntityOwnershipService is changed to setup the ShardStrategy for the entity-owners module.
+ MockConfiguration configuration = new MockConfiguration(Collections.<String, List<String>>emptyMap()) {
+ @Override
+ public Optional<String> getModuleNameFromNameSpace(String nameSpace) {
+ return Optional.of("entity-owners");
+ }
+
+ @Override
+ public Map<String, ShardStrategy> getModuleNameToShardStrategyMap() {
+ return ImmutableMap.<String, ShardStrategy>builder().put("entity-owners", new ShardStrategy() {
+ @Override
+ public String findShard(YangInstanceIdentifier path) {
+ return DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME;
+ }
+ }).build();
+ }
+ };
+
+ dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(), configuration, datastoreContext );
+
+ dataStore.onGlobalContextUpdated(SchemaContextHelper.entityOwners());
+
+ ShardStrategyFactory.setConfiguration(configuration);
}
@After
shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
- Entity entity = new Entity(ENTITY_TYPE, YangInstanceIdentifier.of(QNAME));
+ YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME);
+ Entity entity = new Entity(ENTITY_TYPE, entityId);
EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
EntityOwnershipCandidateRegistration reg = service.registerCandidate(entity, candidate);
verifyEntityOwnershipCandidateRegistration(entity, reg);
verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
+ verifyEntityCandidate(readEntityOwners(service.getLocalEntityOwnershipShard()), ENTITY_TYPE, entityId,
+ dataStore.getActorContext().getCurrentMemberName());
// Register the same entity - should throw exception
// Register a different entity - should succeed
- Entity entity2 = new Entity(ENTITY_TYPE2, YangInstanceIdentifier.of(QNAME));
+ Entity entity2 = new Entity(ENTITY_TYPE2, entityId);
shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
EntityOwnershipCandidateRegistration reg2 = service.registerCandidate(entity2, candidate);
verifyEntityOwnershipCandidateRegistration(entity2, reg2);
verifyRegisterCandidateLocal(shardPropsCreator, entity2, candidate);
+ verifyEntityCandidate(readEntityOwners(service.getLocalEntityOwnershipShard()), ENTITY_TYPE2, entityId,
+ dataStore.getActorContext().getCurrentMemberName());
service.close();
}
public void testRegisterListener() {
}
+ private NormalizedNode<?, ?> readEntityOwners(ActorRef shard) throws Exception {
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
+ DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
+ Optional<NormalizedNode<?, ?>> optional = readTx.read(EntityOwnershipShard.ENTITY_OWNERS_PATH).
+ checkedGet(5, TimeUnit.SECONDS);
+ if(optional.isPresent()) {
+ return optional.get();
+ }
+
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+
+ return null;
+ }
+
private void verifyRegisterCandidateLocal(final TestShardPropsCreator shardPropsCreator, Entity entity,
EntityOwnershipCandidate candidate) {
RegisterCandidateLocal regCandidate = shardPropsCreator.waitForShardMessage();
}
static class TestShardPropsCreator extends EntityOwnershipShardPropsCreator {
+ TestShardPropsCreator() {
+ super("member-1");
+ }
+
private final AtomicReference<CountDownLatch> messageReceived = new AtomicReference<>();
private final AtomicReference<Object> receivedMessage = new AtomicReference<>();
private final AtomicReference<Class<?>> messageClass = new AtomicReference<>();
public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
DatastoreContext datastoreContext, SchemaContext schemaContext) {
return Props.create(TestEntityOwnershipShard.class, shardId, peerAddresses, datastoreContext,
- schemaContext, messageClass, messageReceived, receivedMessage);
+ schemaContext, "member-1", messageClass, messageReceived, receivedMessage);
}
@SuppressWarnings("unchecked")
private final AtomicReference<Class<?>> messageClass;
protected TestEntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
- DatastoreContext datastoreContext, SchemaContext schemaContext, AtomicReference<Class<?>> messageClass,
- AtomicReference<CountDownLatch> messageReceived, AtomicReference<Object> receivedMessage) {
- super(name, peerAddresses, datastoreContext, schemaContext);
+ DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName,
+ AtomicReference<Class<?>> messageClass, AtomicReference<CountDownLatch> messageReceived,
+ AtomicReference<Object> receivedMessage) {
+ super(name, peerAddresses, datastoreContext, schemaContext, localMemberName);
this.messageClass = messageClass;
this.messageReceived = messageReceived;
this.receivedMessage = receivedMessage;