<groupId>org.opendaylight.yangtools</groupId>
<artifactId>yang-data-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
package org.opendaylight.controller.md.sal.common.api.clustering;
import com.google.common.base.Preconditions;
+import java.io.Serializable;
import javax.annotation.Nonnull;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
* referenced by the YangInstanceIdentifier if the inventory node stored in the data store.
* </p>
*/
-public final class Entity {
+public final class Entity implements Serializable {
+ private static final long serialVersionUID = 1L;
private final String type;
private final YangInstanceIdentifier id;
@Override
public int hashCode() {
- int result = type != null ? type.hashCode() : 0;
- result = 31 * result + (id != null ? id.hashCode() : 0);
- return result;
+ return 31 * type.hashCode() + id.hashCode();
}
@Override
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.md.sal.common.api.clustering;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import org.apache.commons.lang3.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Unit tests for Entity.
+ *
+ * @author Thomas Pantelis
+ */
+public class EntityTest {
+ static String ENTITY_TYPE1 = "type1";
+ static String ENTITY_TYPE2 = "type2";
+ static final QName QNAME1 = QName.create("test", "2015-08-14", "1");
+ static final QName QNAME2 = QName.create("test", "2015-08-14", "2");
+ static final YangInstanceIdentifier YANGID1 = YangInstanceIdentifier.of(QNAME1);
+ static final YangInstanceIdentifier YANGID2 = YangInstanceIdentifier.of(QNAME2);
+
+ @Test
+ public void testHashCode() {
+ Entity entity1 = new Entity(ENTITY_TYPE1, YANGID1);
+
+ assertEquals("hashCode", entity1.hashCode(), new Entity(ENTITY_TYPE1, YANGID1).hashCode());
+ assertNotEquals("hashCode", entity1.hashCode(), new Entity(ENTITY_TYPE2, YANGID2).hashCode());
+ }
+
+ @Test
+ public void testEquals() {
+ Entity entity1 = new Entity(ENTITY_TYPE1, YANGID1);
+
+ assertEquals("Same", true, entity1.equals(entity1));
+ assertEquals("Same", true, entity1.equals(new Entity(ENTITY_TYPE1, YANGID1)));
+ assertEquals("Different entity type", false, entity1.equals(new Entity(ENTITY_TYPE2, YANGID1)));
+ assertEquals("Different yang ID", false, entity1.equals(new Entity(ENTITY_TYPE1, YANGID2)));
+ assertEquals("Different Object", false, entity1.equals(new Object()));
+ assertEquals("Equals null", false, entity1.equals(null));
+ }
+
+ @Test
+ public void testSerialization() {
+ Entity entity = new Entity(ENTITY_TYPE1, YANGID1);
+
+ Entity clone = SerializationUtils.clone(entity);
+
+ assertEquals("getType", entity.getType(), clone.getType());
+ assertEquals("getId", entity.getId(), clone.getId());
+ }
+}
getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(errMessage, persistenceId())), getSelf());
}
+ protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
+ try {
+ commitCoordinator.handleBatchedModifications(batched, sender, this);
+ } catch (Exception e) {
+ LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
+ batched.getTransactionID(), e);
+ sender.tell(new akka.actor.Status.Failure(e), getSelf());
+ }
+ }
+
private void handleBatchedModifications(BatchedModifications batched) {
// This message is sent to prepare the modifications transaction directly on the Shard as an
// optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
if(isLeader()) {
failIfIsolatedLeader(getSender());
- try {
- commitCoordinator.handleBatchedModifications(batched, getSender(), this);
- } catch (Exception e) {
- LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
- batched.getTransactionID(), e);
- getSender().tell(new akka.actor.Status.Failure(e), getSelf());
- }
+ handleBatchedModificationsLocal(batched, getSender());
} else {
ActorSelection leader = getLeader();
if(leader != null) {
}
private boolean failIfIsolatedLeader(ActorRef sender) {
- if(getRaftState() == RaftState.IsolatedLeader) {
+ if(isIsolatedLeader()) {
sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
"Shard %s was the leader but has lost contact with all of its followers. Either all" +
" other follower nodes are down or this node is isolated by a network partition.",
return false;
}
+ protected boolean isIsolatedLeader() {
+ return getRaftState() == RaftState.IsolatedLeader;
+ }
+
private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
if (isLeader()) {
failIfIsolatedLeader(getSender());
return commitCoordinator;
}
+ protected DatastoreContext getDatastoreContext() {
+ return datastoreContext;
+ }
+
protected abstract static class AbstractShardCreator implements Creator<Shard> {
private static final long serialVersionUID = 1L;
import akka.actor.ActorRef;
import akka.dispatch.OnComplete;
import akka.util.Timeout;
+import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
}
protected EntityOwnershipShardPropsCreator newShardPropsCreator() {
- return new EntityOwnershipShardPropsCreator();
+ return new EntityOwnershipShardPropsCreator(datastore.getActorContext().getCurrentMemberName());
+ }
+
+ @VisibleForTesting
+ ActorRef getLocalEntityOwnershipShard() {
+ return localEntityOwnershipShard;
}
}
*/
package org.opendaylight.controller.cluster.datastore.entityownership;
+import akka.actor.ActorSelection;
import akka.actor.Props;
+import akka.dispatch.OnComplete;
+import akka.pattern.AskTimeoutException;
+import akka.pattern.Patterns;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.Shard;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.identifiers.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.EntityOwners;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Future;
/**
* Special Shard for EntityOwnership.
*
* @author Thomas Pantelis
*/
-public class EntityOwnershipShard extends Shard {
+class EntityOwnershipShard extends Shard {
+ static final YangInstanceIdentifier ENTITY_OWNERS_PATH = YangInstanceIdentifier.of(EntityOwners.QNAME);
+ static final QName ENTITY_QNAME = org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.
+ md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.Entity.QNAME;
+ static final QName CANDIDATE_NAME = QName.create(Candidate.QNAME, "name");
+ static final QName ENTITY_ID = QName.create(ENTITY_QNAME, "id");
+ static final QName ENTITY_TYPE = QName.create(EntityType.QNAME, "type");
+
+ private int transactionIDCounter = 0;
+ private final String localMemberName;
+ private final List<BatchedModifications> retryModifications = new ArrayList<>();
private static DatastoreContext noPersistenceDatastoreContext(DatastoreContext datastoreContext) {
return DatastoreContext.newBuilderFrom(datastoreContext).persistent(false).build();
}
protected EntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
- DatastoreContext datastoreContext, SchemaContext schemaContext) {
+ DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName) {
super(name, peerAddresses, noPersistenceDatastoreContext(datastoreContext), schemaContext);
+ this.localMemberName = localMemberName;
}
@Override
}
private void onRegisterCandidateLocal(RegisterCandidateLocal registerCandidate) {
- // TODO - implement
+ LOG.debug("onRegisterCandidateLocal: {}", registerCandidate);
+
+ // TODO - add the listener locally.
+
+ BatchedModifications modifications = new BatchedModifications(
+ TransactionIdentifier.create(localMemberName, ++transactionIDCounter).toString(),
+ DataStoreVersions.CURRENT_VERSION, "");
+ modifications.setDoCommitOnReady(true);
+ modifications.setReady(true);
+ modifications.setTotalMessagesSent(1);
+
+ NormalizedNode<?, ?> entityOwners = createEntityOwnersWithCandidate(registerCandidate.getEntity(), localMemberName);
+ modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, entityOwners));
+
+ tryCommitModifications(modifications);
+
getSender().tell(SuccessReply.INSTANCE, getSelf());
}
+ private NormalizedNode<?, ?> createEntityOwnersWithCandidate(Entity entity, String memberName) {
+ MapNode candidateNode = ImmutableNodes.mapNodeBuilder(Candidate.QNAME).addChild(
+ ImmutableNodes.mapEntry(Candidate.QNAME, CANDIDATE_NAME, memberName)).build();
+
+ MapEntryNode entityNode = ImmutableNodes.mapEntryBuilder(ENTITY_QNAME, ENTITY_ID, entity.getId()).
+ addChild(candidateNode).build();
+
+ MapEntryNode entityTypeNode = ImmutableNodes.mapEntryBuilder(EntityType.QNAME, ENTITY_TYPE, entity.getType()).
+ addChild(ImmutableNodes.mapNodeBuilder(ENTITY_QNAME).addChild(entityNode).build()).build();
+
+ return ImmutableContainerNodeBuilder.create().withNodeIdentifier(new NodeIdentifier(EntityOwners.QNAME)).
+ addChild(ImmutableNodes.mapNodeBuilder(EntityType.QNAME).addChild(entityTypeNode).build()).build();
+ }
+
+ private void tryCommitModifications(final BatchedModifications modifications) {
+ if(isLeader()) {
+ if(isIsolatedLeader()) {
+ LOG.debug("Leader is isolated - adding BatchedModifications {} for retry", modifications.getTransactionID());
+
+ retryModifications.add(modifications);
+ } else {
+ LOG.debug("Committing BatchedModifications {} locally", modifications.getTransactionID());
+
+ // Note that it's possible the commit won't get consensus and will timeout and not be applied
+ // to the state. However we don't need to retry it in that case b/c it will be committed to
+ // the journal first and, once a majority of followers come back on line and it is replicated,
+ // it will be applied at that point.
+ handleBatchedModificationsLocal(modifications, self());
+ }
+ } else {
+ final ActorSelection leader = getLeader();
+ if (leader != null) {
+ LOG.debug("Sending BatchedModifications {} to leader {}", modifications.getTransactionID(), leader);
+
+ Future<Object> future = Patterns.ask(leader, modifications, TimeUnit.SECONDS.toMillis(
+ getDatastoreContext().getShardTransactionCommitTimeoutInSeconds()));
+ future.onComplete(new OnComplete<Object>() {
+ @Override
+ public void onComplete(Throwable failure, Object response) {
+ if(failure != null) {
+ if(failure instanceof AskTimeoutException) {
+ LOG.debug("BatchedModifications {} to leader {} timed out - retrying",
+ modifications.getTransactionID(), leader);
+ tryCommitModifications(modifications);
+ } else {
+ LOG.error("BatchedModifications {} to leader {} failed",
+ modifications.getTransactionID(), leader, failure);
+ }
+ } else {
+ LOG.debug("BatchedModifications {} to leader {} succeeded",
+ modifications.getTransactionID(), leader);
+ }
+ }
+ }, getContext().dispatcher());
+ } else {
+ LOG.debug("No leader - adding BatchedModifications {} for retry", modifications.getTransactionID());
+
+ retryModifications.add(modifications);
+ }
+ }
+ }
+
+ @Override
+ protected void onStateChanged() {
+ super.onStateChanged();
+
+ if(!retryModifications.isEmpty() && getLeader() != null && !isIsolatedLeader()) {
+ LOG.debug("# BatchedModifications to retry {}", retryModifications.size());
+
+ List<BatchedModifications> retryModificationsCopy = new ArrayList<>(retryModifications);
+ retryModifications.clear();
+ for(BatchedModifications mods: retryModificationsCopy) {
+ tryCommitModifications(mods);
+ }
+ }
+ }
+
private void onUnregisterCandidateLocal(UnregisterCandidateLocal unregisterCandidate) {
// TODO - implement
getSender().tell(SuccessReply.INSTANCE, getSelf());
}
public static Props props(final ShardIdentifier name, final Map<String, String> peerAddresses,
- final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext));
+ final DatastoreContext datastoreContext, final SchemaContext schemaContext, final String localMemberName) {
+ return Props.create(new Creator(name, peerAddresses, datastoreContext, schemaContext, localMemberName));
}
private static class Creator extends AbstractShardCreator {
private static final long serialVersionUID = 1L;
+ private final String localMemberName;
+
Creator(final ShardIdentifier name, final Map<String, String> peerAddresses,
- final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
+ final DatastoreContext datastoreContext, final SchemaContext schemaContext,
+ final String localMemberName) {
super(name, peerAddresses, datastoreContext, schemaContext);
+ this.localMemberName = localMemberName;
}
@Override
public Shard create() throws Exception {
- return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext);
+ return new EntityOwnershipShard(name, peerAddresses, datastoreContext, schemaContext, localMemberName);
}
}
}
* @author Thomas Pantelis
*/
class EntityOwnershipShardPropsCreator implements ShardPropsCreator {
+ private final String localMemberName;
+
+ EntityOwnershipShardPropsCreator(String localMemberName) {
+ this.localMemberName = localMemberName;
+ }
@Override
public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
DatastoreContext datastoreContext, SchemaContext schemaContext) {
- return EntityOwnershipShard.props(shardId, peerAddresses, datastoreContext, schemaContext);
+ return EntityOwnershipShard.props(shardId, peerAddresses, datastoreContext, schemaContext, localMemberName);
}
}
list entity {
key id;
leaf id {
- type string;
+ type instance-identifier;
}
+ leaf owner {
+ type string;
+ }
+
// A list of all the candidates that would like to own the entity
list candidate {
- key id;
- leaf id {
+ key name;
+ leaf name {
type string;
}
}
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
-
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
return cohort;
}
- public static NormalizedNode<?,?> readStore(final TestActorRef<Shard> shard, final YangInstanceIdentifier id)
+ public static NormalizedNode<?,?> readStore(final TestActorRef<? extends Shard> shard, final YangInstanceIdentifier id)
throws ExecutionException, InterruptedException {
return readStore(shard.underlyingActor().getDataStore().getDataTree(), id);
}
public class ShardTestKit extends JavaTestKit {
- protected ShardTestKit(ActorSystem actorSystem) {
+ public ShardTestKit(ActorSystem actorSystem) {
super(actorSystem);
}
- protected void waitForLogMessage(final Class<?> logLevel, ActorRef subject, String logMessage){
+ public void waitForLogMessage(final Class<?> logLevel, ActorRef subject, String logMessage){
// Wait for a specific log message to show up
final boolean result =
new JavaTestKit.EventFilter<Boolean>(logLevel
}
- protected void waitUntilLeader(ActorRef shard) {
+ public void waitUntilLeader(ActorRef shard) {
FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
for(int i = 0; i < 20 * 5; i++) {
Future<Object> future = Patterns.ask(shard, new FindLeader(), new Timeout(duration));
Assert.fail("Leader not found for shard " + shard.path());
}
- protected void waitUntilNoLeader(ActorRef shard) {
+ public void waitUntilNoLeader(ActorRef shard) {
FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
for(int i = 0; i < 20 * 5; i++) {
Future<Object> future = Patterns.ask(shard, new FindLeader(), new Timeout(duration));
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import com.google.common.base.Optional;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.EntityOwners;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.EntityType;
+import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifierWithPredicates;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
+import org.opendaylight.yangtools.yang.data.api.schema.DataContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * Abstract base class providing utility methods.
+ *
+ * @author Thomas Pantelis
+ */
+public class AbstractEntityOwnershipTest extends AbstractActorTest {
+ protected void verifyEntityCandidate(NormalizedNode<?, ?> node, String entityType,
+ YangInstanceIdentifier entityId, String candidateName) {
+ try {
+ assertNotNull("Missing " + EntityOwners.QNAME.toString(), node);
+ assertTrue(node instanceof ContainerNode);
+
+ ContainerNode entityOwnersNode = (ContainerNode) node;
+
+ MapEntryNode entityTypeEntry = getMapEntryNodeChild(entityOwnersNode, EntityType.QNAME,
+ EntityOwnershipShard.ENTITY_TYPE, entityType);
+
+ MapEntryNode entityEntry = getMapEntryNodeChild(entityTypeEntry, EntityOwnershipShard.ENTITY_QNAME,
+ EntityOwnershipShard.ENTITY_ID, entityId);
+
+ getMapEntryNodeChild(entityEntry, Candidate.QNAME, EntityOwnershipShard.CANDIDATE_NAME, candidateName);
+ } catch(AssertionError e) {
+ throw new AssertionError("Verification of enitity candidate failed - returned data was: " + node, e);
+ }
+ }
+
+ protected MapEntryNode getMapEntryNodeChild(DataContainerNode<? extends PathArgument> parent, QName childMap,
+ QName child, Object key) {
+ Optional<DataContainerChild<? extends PathArgument, ?>> childNode =
+ parent.getChild(new NodeIdentifier(childMap));
+ assertEquals("Missing " + childMap.toString(), true, childNode.isPresent());
+
+ MapNode entityTypeMapNode = (MapNode) childNode.get();
+ Optional<MapEntryNode> entityTypeEntry = entityTypeMapNode.getChild(new NodeIdentifierWithPredicates(
+ childMap, child, key));
+ assertEquals("Missing " + childMap.toString() + " entry for " + key, true, entityTypeEntry.isPresent());
+ return entityTypeEntry.get();
+ }
+}
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.actor.Props;
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Collections;
import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategy;
+import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
import org.opendaylight.controller.cluster.datastore.utils.MockClusterWrapper;
import org.opendaylight.controller.cluster.datastore.utils.MockConfiguration;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.yangtools.yang.common.QName;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.Await;
import scala.concurrent.Future;
*
* @author Thomas Pantelis
*/
-public class DistributedEntityOwnershipServiceTest extends AbstractActorTest {
+public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnershipTest {
static String ENTITY_TYPE = "test";
static String ENTITY_TYPE2 = "test2";
static int ID_COUNTER = 1;
public void setUp() {
DatastoreContext datastoreContext = DatastoreContext.newBuilder().dataStoreType(dataStoreType).
shardInitializationTimeout(10, TimeUnit.SECONDS).build();
- dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(),
- new MockConfiguration(Collections.<String, List<String>>emptyMap()), datastoreContext );
- dataStore.onGlobalContextUpdated(TestModel.createTestContext());
+ // FIXME - remove this MockConfiguration and use the production ConfigurationImpl class when the
+ // DistributedEntityOwnershipService is changed to setup the ShardStrategy for the entity-owners module.
+ MockConfiguration configuration = new MockConfiguration(Collections.<String, List<String>>emptyMap()) {
+ @Override
+ public Optional<String> getModuleNameFromNameSpace(String nameSpace) {
+ return Optional.of("entity-owners");
+ }
+
+ @Override
+ public Map<String, ShardStrategy> getModuleNameToShardStrategyMap() {
+ return ImmutableMap.<String, ShardStrategy>builder().put("entity-owners", new ShardStrategy() {
+ @Override
+ public String findShard(YangInstanceIdentifier path) {
+ return DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME;
+ }
+ }).build();
+ }
+ };
+
+ dataStore = new DistributedDataStore(getSystem(), new MockClusterWrapper(), configuration, datastoreContext );
+
+ dataStore.onGlobalContextUpdated(SchemaContextHelper.entityOwners());
+
+ ShardStrategyFactory.setConfiguration(configuration);
}
@After
shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
- Entity entity = new Entity(ENTITY_TYPE, YangInstanceIdentifier.of(QNAME));
+ YangInstanceIdentifier entityId = YangInstanceIdentifier.of(QNAME);
+ Entity entity = new Entity(ENTITY_TYPE, entityId);
EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
EntityOwnershipCandidateRegistration reg = service.registerCandidate(entity, candidate);
verifyEntityOwnershipCandidateRegistration(entity, reg);
verifyRegisterCandidateLocal(shardPropsCreator, entity, candidate);
+ verifyEntityCandidate(readEntityOwners(service.getLocalEntityOwnershipShard()), ENTITY_TYPE, entityId,
+ dataStore.getActorContext().getCurrentMemberName());
// Register the same entity - should throw exception
// Register a different entity - should succeed
- Entity entity2 = new Entity(ENTITY_TYPE2, YangInstanceIdentifier.of(QNAME));
+ Entity entity2 = new Entity(ENTITY_TYPE2, entityId);
shardPropsCreator.expectShardMessage(RegisterCandidateLocal.class);
EntityOwnershipCandidateRegistration reg2 = service.registerCandidate(entity2, candidate);
verifyEntityOwnershipCandidateRegistration(entity2, reg2);
verifyRegisterCandidateLocal(shardPropsCreator, entity2, candidate);
+ verifyEntityCandidate(readEntityOwners(service.getLocalEntityOwnershipShard()), ENTITY_TYPE2, entityId,
+ dataStore.getActorContext().getCurrentMemberName());
service.close();
}
public void testRegisterListener() {
}
+ private NormalizedNode<?, ?> readEntityOwners(ActorRef shard) throws Exception {
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
+ DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
+ Optional<NormalizedNode<?, ?>> optional = readTx.read(EntityOwnershipShard.ENTITY_OWNERS_PATH).
+ checkedGet(5, TimeUnit.SECONDS);
+ if(optional.isPresent()) {
+ return optional.get();
+ }
+
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+
+ return null;
+ }
+
private void verifyRegisterCandidateLocal(final TestShardPropsCreator shardPropsCreator, Entity entity,
EntityOwnershipCandidate candidate) {
RegisterCandidateLocal regCandidate = shardPropsCreator.waitForShardMessage();
}
static class TestShardPropsCreator extends EntityOwnershipShardPropsCreator {
+ TestShardPropsCreator() {
+ super("member-1");
+ }
+
private final AtomicReference<CountDownLatch> messageReceived = new AtomicReference<>();
private final AtomicReference<Object> receivedMessage = new AtomicReference<>();
private final AtomicReference<Class<?>> messageClass = new AtomicReference<>();
public Props newProps(ShardIdentifier shardId, Map<String, String> peerAddresses,
DatastoreContext datastoreContext, SchemaContext schemaContext) {
return Props.create(TestEntityOwnershipShard.class, shardId, peerAddresses, datastoreContext,
- schemaContext, messageClass, messageReceived, receivedMessage);
+ schemaContext, "member-1", messageClass, messageReceived, receivedMessage);
}
@SuppressWarnings("unchecked")
private final AtomicReference<Class<?>> messageClass;
protected TestEntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
- DatastoreContext datastoreContext, SchemaContext schemaContext, AtomicReference<Class<?>> messageClass,
- AtomicReference<CountDownLatch> messageReceived, AtomicReference<Object> receivedMessage) {
- super(name, peerAddresses, datastoreContext, schemaContext);
+ DatastoreContext datastoreContext, SchemaContext schemaContext, String localMemberName,
+ AtomicReference<Class<?>> messageClass, AtomicReference<CountDownLatch> messageReceived,
+ AtomicReference<Object> receivedMessage) {
+ super(name, peerAddresses, datastoreContext, schemaContext, localMemberName);
this.messageClass = messageClass;
this.messageReceived = messageReceived;
this.receivedMessage = receivedMessage;
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.dispatch.Dispatchers;
+import akka.testkit.TestActorRef;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.After;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext;
+import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
+import org.opendaylight.controller.cluster.datastore.ShardTestKit;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
+import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
+import org.opendaylight.controller.cluster.raft.messages.RequestVote;
+import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
+/**
+ * Unit tests for EntityOwnershipShard.
+ *
+ * @author Thomas Pantelis
+ */
+public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
+ private static final String ENTITY_TYPE = "test type";
+ private static final YangInstanceIdentifier ENTITY_ID1 =
+ YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
+ private static final YangInstanceIdentifier ENTITY_ID2 =
+ YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
+ private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
+ private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
+ private static final String LOCAL_MEMBER_NAME = "member-1";
+
+ private final ShardIdentifier shardID = ShardIdentifier.builder().memberName(LOCAL_MEMBER_NAME)
+ .shardName("entity-ownership").type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
+
+ private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder();
+ private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+
+ @After
+ public void tearDown() {
+ actorFactory.close();
+ }
+
+ @Test
+ public void testOnRegisterCandidateLocal() throws Exception {
+ ShardTestKit kit = new ShardTestKit(getSystem());
+
+ TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
+
+ kit.waitUntilLeader(shard);
+
+ YangInstanceIdentifier entityId = ENTITY_ID1;
+ Entity entity = new Entity(ENTITY_TYPE, entityId);
+ EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+ shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+ }
+
+ @Test
+ public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
+ ShardTestKit kit = new ShardTestKit(getSystem());
+
+ dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
+
+ String peerId = actorFactory.generateActorId("follower");
+ TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
+ withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
+
+ TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
+ ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
+ withDispatcher(Dispatchers.DefaultDispatcherId()));
+
+ YangInstanceIdentifier entityId = ENTITY_ID1;
+ Entity entity = new Entity(ENTITY_TYPE, entityId);
+ EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+ shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ // Now grant the vote so the shard becomes the leader. This should retry the commit.
+ peer.underlyingActor().grantVote = true;
+
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+ }
+
+ @Test
+ public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
+ ShardTestKit kit = new ShardTestKit(getSystem());
+
+ dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
+ shardTransactionCommitTimeoutInSeconds(1);
+
+ String peerId = actorFactory.generateActorId("follower");
+ TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
+ withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
+
+ MockFollower follower = peer.underlyingActor();
+ follower.grantVote = true;
+
+ // Drop AppendEntries so consensus isn't reached.
+ follower.dropAppendEntries = true;
+
+ TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
+ ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
+ withDispatcher(Dispatchers.DefaultDispatcherId()));
+
+ kit.waitUntilLeader(shard);
+
+ YangInstanceIdentifier entityId = ENTITY_ID1;
+ Entity entity = new Entity(ENTITY_TYPE, entityId);
+ EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+ shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ // Wait enough time for the commit to timeout.
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+ // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
+ // write being applied to the state.
+ follower.dropAppendEntries = false;
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+ }
+
+ @Test
+ public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
+ ShardTestKit kit = new ShardTestKit(getSystem());
+
+ dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
+ shardIsolatedLeaderCheckIntervalInMillis(50);
+
+ String peerId = actorFactory.generateActorId("follower");
+ TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
+ withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
+
+ MockFollower follower = peer.underlyingActor();
+ follower.grantVote = true;
+
+ TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
+ ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
+ withDispatcher(Dispatchers.DefaultDispatcherId()));
+
+ kit.waitUntilLeader(shard);
+
+ // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
+ follower.dropAppendEntries = true;
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+
+ YangInstanceIdentifier entityId = ENTITY_ID1;
+ Entity entity = new Entity(ENTITY_TYPE, entityId);
+ EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+ shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ // Resume AppendEntries - the candidate write should now be committed.
+ follower.dropAppendEntries = false;
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+ }
+
+ @Test
+ public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
+ ShardTestKit kit = new ShardTestKit(getSystem());
+
+ dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100);
+
+ String peerId = actorFactory.generateActorId("leader");
+ TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
+ withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
+
+ TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
+ ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
+ withDispatcher(Dispatchers.DefaultDispatcherId()));
+
+ shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), -1L, -1L,
+ DataStoreVersions.CURRENT_VERSION), peer);
+
+ EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+ shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ MockLeader leader = peer.underlyingActor();
+ assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
+ leader.modificationsReceived, 5, TimeUnit.SECONDS));
+ verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+
+ leader.modificationsReceived = new CountDownLatch(2);
+ leader.sendReply = false;
+
+ shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
+
+ shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
+ leader.modificationsReceived, 5, TimeUnit.SECONDS));
+ verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID2, LOCAL_MEMBER_NAME);
+ }
+
+ private void verifyCommittedEntityCandidate(TestActorRef<EntityOwnershipShard> shard, String entityType,
+ YangInstanceIdentifier entityId, String candidateName) throws Exception {
+ verifyEntityCandidate(readEntityOwners(shard), entityType, entityId, candidateName);
+ }
+
+ private void verifyBatchedEntityCandidate(BatchedModifications mods, String entityType,
+ YangInstanceIdentifier entityId, String candidateName) throws Exception {
+ assertEquals("BatchedModifications size", 1, mods.getModifications().size());
+ assertEquals("Modification type", MergeModification.class, mods.getModifications().get(0).getClass());
+ verifyEntityCandidate(((MergeModification)mods.getModifications().get(0)).getData(), entityType,
+ entityId, candidateName);
+ }
+
+ private NormalizedNode<?, ?> readEntityOwners(TestActorRef<EntityOwnershipShard> shard) throws Exception {
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
+ NormalizedNode<?, ?> node = AbstractShardTest.readStore(shard, EntityOwnershipShard.ENTITY_OWNERS_PATH);
+ if(node != null) {
+ return node;
+ }
+
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+
+ return null;
+ }
+
+ private Props newShardProps() {
+ return newShardProps(Collections.<String,String>emptyMap());
+ }
+
+ private Props newShardProps(Map<String,String> peers) {
+ return EntityOwnershipShard.props(shardID, peers, dataStoreContextBuilder.build(), SCHEMA_CONTEXT,
+ LOCAL_MEMBER_NAME);
+ }
+
+ public static class MockFollower extends UntypedActor {
+ volatile boolean grantVote;
+ volatile boolean dropAppendEntries;
+ private final String myId;
+
+ public MockFollower(String myId) {
+ this.myId = myId;
+ }
+
+ @Override
+ public void onReceive(Object message) {
+ if(message instanceof RequestVote) {
+ if(grantVote) {
+ getSender().tell(new RequestVoteReply(((RequestVote)message).getTerm(), true), getSelf());
+ }
+ } else if(message instanceof AppendEntries) {
+ if(!dropAppendEntries) {
+ AppendEntries req = (AppendEntries) message;
+ long lastIndex = req.getLeaderCommit();
+ if (req.getEntries().size() > 0) {
+ for(ReplicatedLogEntry entry : req.getEntries()) {
+ lastIndex = entry.getIndex();
+ }
+ }
+
+ getSender().tell(new AppendEntriesReply(myId, req.getTerm(), true, lastIndex, req.getTerm(),
+ DataStoreVersions.CURRENT_VERSION), getSelf());
+ }
+ }
+ }
+ }
+
+ public static class MockLeader extends UntypedActor {
+ volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
+ volatile BatchedModifications receivedModifications;
+ volatile boolean sendReply = true;
+
+ @Override
+ public void onReceive(Object message) {
+ if(message instanceof BatchedModifications) {
+ receivedModifications = (BatchedModifications) message;
+ modificationsReceived.countDown();
+ if(sendReply) {
+ getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
+ } else {
+ sendReply = true;
+ }
+ }
+ }
+ }
+}
package org.opendaylight.controller.md.cluster.datastore.model;
+import com.google.common.io.Resources;
+import java.io.File;
+import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.opendaylight.yangtools.yang.model.api.Module;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.parser.api.YangSyntaxErrorException;
import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl;
public class SchemaContextHelper {
return parser.resolveSchemaContext(modules);
}
+ public static SchemaContext entityOwners() {
+ YangParserImpl parser = new YangParserImpl();
+ try {
+ File file = new File("src/main/yang/entity-owners.yang");
+ return parser.parseSources(Arrays.asList(Resources.asByteSource(file.toURI().toURL())));
+ } catch (IOException | YangSyntaxErrorException e) {
+ throw new ExceptionInInitializerError(e);
+ }
+ }
}