@Override
public void onComplete(Throwable failure, Object response) {
if(failure != null) {
- LOG.error("Failed to create {} shard", ENTITY_OWNERSHIP_SHARD_NAME);
+ LOG.error("Failed to create {} shard", ENTITY_OWNERSHIP_SHARD_NAME, failure);
} else {
LOG.info("Successfully created {} shard", ENTITY_OWNERSHIP_SHARD_NAME);
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Address;
+import akka.actor.AddressFromURIString;
+import akka.cluster.Cluster;
+import akka.cluster.ClusterEvent.CurrentClusterState;
+import akka.cluster.Member;
+import akka.cluster.MemberStatus;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.ConfigFactory;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+
+/**
+ * Class that represents a cluster member node for unit tests. It encapsulates an actor system with
+ * config and (optional) operational data store instances. The Builder is used to specify the setup
+ * parameters and create the data store instances. The actor system is automatically joined to address
+ * 127.0.0.1:2558 so one member must specify an akka cluster configuration with that address.
+ *
+ * @author Thomas Pantelis
+ */
+public class MemberNode {
+ static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
+
+ private IntegrationTestKit kit;
+ private DistributedDataStore configDataStore;
+ private DistributedDataStore operDataStore;
+ private DatastoreContext.Builder datastoreContextBuilder;
+ private boolean cleanedUp;
+
+ /**
+ * Constructs a Builder.
+ *
+ * @param members the list to which the resulting MemberNode will be added. This makes it easier for
+ * callers to cleanup instances on test completion.
+ * @return a Builder instance
+ */
+ public static Builder builder(List<MemberNode> members) {
+ return new Builder(members);
+ }
+
+ public IntegrationTestKit kit() {
+ return kit;
+ }
+
+
+ public DistributedDataStore configDataStore() {
+ return configDataStore;
+ }
+
+
+ public DistributedDataStore operDataStore() {
+ return operDataStore;
+ }
+
+ public DatastoreContext.Builder datastoreContextBuilder() {
+ return datastoreContextBuilder;
+ }
+
+ public void waitForMembersUp(String... otherMembers) {
+ Set<String> otherMembersSet = Sets.newHashSet(otherMembers);
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
+ CurrentClusterState state = Cluster.get(kit.getSystem()).state();
+ for(Member m: state.getMembers()) {
+ if(m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next()) &&
+ otherMembersSet.isEmpty()) {
+ return;
+ }
+ }
+
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+
+ fail("Member(s) " + otherMembersSet + " are not Up");
+ }
+
+ public void cleanup() {
+ if(!cleanedUp) {
+ cleanedUp = true;
+ kit.cleanup(configDataStore);
+ kit.cleanup(operDataStore);
+ kit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE);
+ }
+ }
+
+ public static void verifyRaftState(DistributedDataStore datastore, String shardName, RaftStateVerifier verifier)
+ throws Exception {
+ ActorContext actorContext = datastore.getActorContext();
+
+ Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
+ ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
+
+ AssertionError lastError = null;
+ Stopwatch sw = Stopwatch.createStarted();
+ while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
+ OnDemandRaftState raftState = (OnDemandRaftState)actorContext.
+ executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
+
+ try {
+ verifier.verify(raftState);
+ return;
+ } catch (AssertionError e) {
+ lastError = e;
+ Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ throw lastError;
+ }
+
+ public static void verifyRaftPeersPresent(DistributedDataStore datastore, final String shardName,
+ String... peerMemberNames) throws Exception {
+ final Set<String> peerIds = Sets.newHashSet();
+ for(String p: peerMemberNames) {
+ peerIds.add(ShardIdentifier.builder().memberName(p).shardName(shardName).
+ type(datastore.getActorContext().getDataStoreType()).build().toString());
+ }
+
+ verifyRaftState(datastore, shardName, new RaftStateVerifier() {
+ @Override
+ public void verify(OnDemandRaftState raftState) {
+ assertTrue("Peer(s) " + peerIds + " not found for shard " + shardName,
+ raftState.getPeerAddresses().keySet().containsAll(peerIds));
+ }
+ });
+ }
+
+ public static class Builder {
+ private final List<MemberNode> members;
+ private String moduleShardsConfig;
+ private String akkaConfig;
+ private String[] waitForshardLeader = new String[0];
+ private String testName;
+ private SchemaContext schemaContext;
+ private boolean createOperDatastore = true;
+ private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
+ shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
+
+ Builder(List<MemberNode> members) {
+ this.members = members;
+ }
+
+ /**
+ * Specifies the name of the module shards config file. This is required.
+ *
+ * @return this Builder
+ */
+ public Builder moduleShardsConfig(String moduleShardsConfig) {
+ this.moduleShardsConfig = moduleShardsConfig;
+ return this;
+ }
+
+ /**
+ * Specifies the name of the akka configuration. This is required.
+ *
+ * @return this Builder
+ */
+ public Builder akkaConfig(String akkaConfig) {
+ this.akkaConfig = akkaConfig;
+ return this;
+ }
+
+ /**
+ * Specifies the name of the test that is appended to the data store names. This is required.
+ *
+ * @return this Builder
+ */
+ public Builder testName(String testName) {
+ this.testName = testName;
+ return this;
+ }
+
+ /**
+ * Specifies the optional names of the shards to initially wait for a leader to be elected.
+ *
+ * @return this Builder
+ */
+ public Builder waitForShardLeader(String... shardNames) {
+ this.waitForshardLeader = shardNames;
+ return this;
+ }
+
+ /**
+ * Specifies whether or not to create an operational data store. Defaults to true.
+ *
+ * @return this Builder
+ */
+ public Builder createOperDatastore(boolean value) {
+ this.createOperDatastore = value;
+ return this;
+ }
+
+ /**
+ * Specifies the SchemaContext for the data stores. Defaults to SchemaContextHelper.full().
+ *
+ * @return this Builder
+ */
+ public Builder schemaContext(SchemaContext schemaContext) {
+ this.schemaContext = schemaContext;
+ return this;
+ }
+
+ /**
+ * Specifies the DatastoreContext Builder. If not specified, a default instance is used.
+ *
+ * @return this Builder
+ */
+ public Builder datastoreContextBuilder(DatastoreContext.Builder builder) {
+ datastoreContextBuilder = builder;
+ return this;
+ }
+
+ public MemberNode build() {
+ Preconditions.checkNotNull(moduleShardsConfig, "moduleShardsConfig must be specified");
+ Preconditions.checkNotNull(akkaConfig, "akkaConfig must be specified");
+ Preconditions.checkNotNull(testName, "testName must be specified");
+
+ if(schemaContext == null) {
+ schemaContext = SchemaContextHelper.full();
+ }
+
+ MemberNode node = new MemberNode();
+ node.datastoreContextBuilder = datastoreContextBuilder;
+
+ ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(akkaConfig));
+ Cluster.get(system).join(MEMBER_1_ADDRESS);
+
+ node.kit = new IntegrationTestKit(system, datastoreContextBuilder);
+
+ String memberName = new ClusterWrapperImpl(system).getCurrentMemberName();
+ node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName);
+ node.configDataStore = node.kit.setupDistributedDataStore("config_" + testName, moduleShardsConfig,
+ true, schemaContext, waitForshardLeader);
+
+ if(createOperDatastore) {
+ node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName);
+ node.operDataStore = node.kit.setupDistributedDataStore("oper_" + testName, moduleShardsConfig,
+ true, schemaContext, waitForshardLeader);
+ }
+
+ members.add(node);
+ return node;
+ }
+ }
+
+ public static interface RaftStateVerifier {
+ void verify(OnDemandRaftState raftState);
+ }
+}
\ No newline at end of file
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftPeersPresent;
+import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Address;
-import akka.actor.AddressFromURIString;
import akka.actor.PoisonPill;
import akka.cluster.Cluster;
-import akka.cluster.ClusterEvent.CurrentClusterState;
-import akka.cluster.Member;
-import akka.cluster.MemberStatus;
-import akka.testkit.JavaTestKit;
import com.google.common.base.Optional;
-import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.ConfigFactory;
import java.io.File;
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.SerializationUtils;
import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
-import org.opendaylight.controller.cluster.datastore.ClusterWrapperImpl;
-import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
-import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
+import org.opendaylight.controller.cluster.datastore.MemberNode;
+import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.yang.common.RpcError;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
/**
* Unit tests for ClusterAdminRpcService.
* @author Thomas Pantelis
*/
public class ClusterAdminRpcServiceTest {
- private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
-
private final List<MemberNode> memberNodes = new ArrayList<>();
+ @Before
+ public void setUp() {
+ InMemoryJournal.clear();
+ InMemorySnapshotStore.clear();
+ }
+
@After
public void tearDown() {
for(MemberNode m: memberNodes) {
String fileName = "target/testBackupDatastore";
new File(fileName).delete();
- ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore, node.operDataStore);
+ ClusterAdminRpcService service = new ClusterAdminRpcService(node.configDataStore(), node.operDataStore());
RpcResult<Void> rpcResult = service .backupDatastore(new BackupDatastoreInputBuilder().
setFilePath(fileName).build()).get(5, TimeUnit.SECONDS);
ImmutableMap<String, DatastoreSnapshot> map = ImmutableMap.of(snapshots.get(0).getType(), snapshots.get(0),
snapshots.get(1).getType(), snapshots.get(1));
- verifyDatastoreSnapshot(node.configDataStore.getActorContext().getDataStoreType(),
- map.get(node.configDataStore.getActorContext().getDataStoreType()), "cars", "people");
+ verifyDatastoreSnapshot(node.configDataStore().getActorContext().getDataStoreType(),
+ map.get(node.configDataStore().getActorContext().getDataStoreType()), "cars", "people");
} finally {
new File(fileName).delete();
}
// Test failure by killing a shard.
- node.configDataStore.getActorContext().getShardManager().tell(node.datastoreContextBuilder.
+ node.configDataStore().getActorContext().getShardManager().tell(node.datastoreContextBuilder().
shardInitializationTimeout(200, TimeUnit.MILLISECONDS).build(), ActorRef.noSender());
- ActorRef carsShardActor = node.configDataStore.getActorContext().findLocalShard("cars").get();
- node.kit.watch(carsShardActor);
+ ActorRef carsShardActor = node.configDataStore().getActorContext().findLocalShard("cars").get();
+ node.kit().watch(carsShardActor);
carsShardActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- node.kit.expectTerminated(carsShardActor);
+ node.kit().expectTerminated(carsShardActor);
rpcResult = service.backupDatastore(new BackupDatastoreInputBuilder().setFilePath(fileName).build()).
get(5, TimeUnit.SECONDS);
testAddShardReplica(newReplicaNode3, "cars", "member-1", "member-2");
- verifyRaftPeersPresent(newReplicaNode2.configDataStore, "cars", "member-1", "member-3");
- verifyRaftPeersPresent(newReplicaNode2.operDataStore, "cars", "member-1", "member-3");
+ verifyRaftPeersPresent(newReplicaNode2.configDataStore(), "cars", "member-1", "member-3");
+ verifyRaftPeersPresent(newReplicaNode2.operDataStore(), "cars", "member-1", "member-3");
// Write data to member-2's config datastore and read/verify via member-3
- NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore,
- newReplicaNode3.configDataStore);
+ NormalizedNode<?, ?> configCarsNode = writeCarsNodeAndVerify(newReplicaNode2.configDataStore(),
+ newReplicaNode3.configDataStore());
// Write data to member-3's oper datastore and read/verify via member-2
- writeCarsNodeAndVerify(newReplicaNode3.operDataStore, newReplicaNode2.operDataStore);
+ writeCarsNodeAndVerify(newReplicaNode3.operDataStore(), newReplicaNode2.operDataStore());
// Verify all data has been replicated. We expect 3 log entries and thus last applied index of 2 -
// 2 ServerConfigurationPayload entries and the transaction payload entry.
}
};
- verifyRaftState(leaderNode1.configDataStore, "cars", verifier);
- verifyRaftState(leaderNode1.operDataStore, "cars", verifier);
+ verifyRaftState(leaderNode1.configDataStore(), "cars", verifier);
+ verifyRaftState(leaderNode1.operDataStore(), "cars", verifier);
- verifyRaftState(newReplicaNode2.configDataStore, "cars", verifier);
- verifyRaftState(newReplicaNode2.operDataStore, "cars", verifier);
+ verifyRaftState(newReplicaNode2.configDataStore(), "cars", verifier);
+ verifyRaftState(newReplicaNode2.operDataStore(), "cars", verifier);
- verifyRaftState(newReplicaNode3.configDataStore, "cars", verifier);
- verifyRaftState(newReplicaNode3.operDataStore, "cars", verifier);
+ verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
+ verifyRaftState(newReplicaNode3.operDataStore(), "cars", verifier);
// Restart member-3 and verify the cars config shard is re-instated.
- Cluster.get(leaderNode1.kit.getSystem()).down(Cluster.get(newReplicaNode3.kit.getSystem()).selfAddress());
+ Cluster.get(leaderNode1.kit().getSystem()).down(Cluster.get(newReplicaNode3.kit().getSystem()).selfAddress());
newReplicaNode3.cleanup();
newReplicaNode3 = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
moduleShardsConfig(moduleShardsConfig).createOperDatastore(false).build();
- verifyRaftState(newReplicaNode3.configDataStore, "cars", verifier);
- readCarsNodeAndVerify(newReplicaNode3.configDataStore, configCarsNode);
+ verifyRaftState(newReplicaNode3.configDataStore(), "cars", verifier);
+ readCarsNodeAndVerify(newReplicaNode3.configDataStore(), configCarsNode);
}
private NormalizedNode<?, ?> writeCarsNodeAndVerify(DistributedDataStore writeToStore,
throws Exception {
memberNode.waitForMembersUp(peerMemberNames);
- ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore,
- memberNode.operDataStore);
+ ClusterAdminRpcService service = new ClusterAdminRpcService(memberNode.configDataStore(),
+ memberNode.operDataStore());
RpcResult<Void> rpcResult = service.addShardReplica(new AddShardReplicaInputBuilder().setShardName(shardName).
build()).get(10, TimeUnit.SECONDS);
checkSuccessfulRpcResult(rpcResult);
- verifyRaftPeersPresent(memberNode.configDataStore, shardName, peerMemberNames);
- verifyRaftPeersPresent(memberNode.operDataStore, shardName, peerMemberNames);
+ verifyRaftPeersPresent(memberNode.configDataStore(), shardName, peerMemberNames);
+ verifyRaftPeersPresent(memberNode.operDataStore(), shardName, peerMemberNames);
service.close();
}
- private void verifyRaftPeersPresent(DistributedDataStore datastore, final String shardName, String... peerMemberNames)
- throws Exception {
- final Set<String> peerIds = Sets.newHashSet();
- for(String p: peerMemberNames) {
- peerIds.add(ShardIdentifier.builder().memberName(p).shardName(shardName).
- type(datastore.getActorContext().getDataStoreType()).build().toString());
- }
-
- verifyRaftState(datastore, shardName, new RaftStateVerifier() {
- @Override
- public void verify(OnDemandRaftState raftState) {
- assertTrue("Peer(s) " + peerIds + " not found for shard " + shardName,
- raftState.getPeerAddresses().keySet().containsAll(peerIds));
- }
- });
- }
-
- private void verifyRaftState(DistributedDataStore datastore, String shardName, RaftStateVerifier verifier)
- throws Exception {
- ActorContext actorContext = datastore.getActorContext();
-
- Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
- ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
-
- AssertionError lastError = null;
- Stopwatch sw = Stopwatch.createStarted();
- while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
- OnDemandRaftState raftState = (OnDemandRaftState)actorContext.
- executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
-
- try {
- verifier.verify(raftState);
- return;
- } catch (AssertionError e) {
- lastError = e;
- Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
- }
- }
-
- throw lastError;
- }
-
private void checkSuccessfulRpcResult(RpcResult<Void> rpcResult) {
if(!rpcResult.isSuccessful()) {
if(rpcResult.getErrors().size() > 0) {
public void testConvertMembersToNonvotingForAllShards() {
// TODO implement
}
-
- private static class MemberNode {
- IntegrationTestKit kit;
- DistributedDataStore configDataStore;
- DistributedDataStore operDataStore;
- final DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
- shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
- boolean cleanedUp;
-
- static Builder builder(List<MemberNode> members) {
- return new Builder(members);
- }
-
- void waitForMembersUp(String... otherMembers) {
- Set<String> otherMembersSet = Sets.newHashSet(otherMembers);
- Stopwatch sw = Stopwatch.createStarted();
- while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
- CurrentClusterState state = Cluster.get(kit.getSystem()).state();
- for(Member m: state.getMembers()) {
- if(m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next()) &&
- otherMembersSet.isEmpty()) {
- return;
- }
- }
-
- Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
- }
-
- fail("Member(s) " + otherMembersSet + " are not Up");
- }
-
- void cleanup() {
- if(!cleanedUp) {
- cleanedUp = true;
- kit.cleanup(configDataStore);
- kit.cleanup(operDataStore);
- JavaTestKit.shutdownActorSystem(kit.getSystem());
- }
- }
-
- static class Builder {
- List<MemberNode> members;
- String moduleShardsConfig;
- String akkaConfig;
- String[] waitForshardLeader = new String[0];
- String testName;
- boolean createOperDatastore = true;
-
- Builder(List<MemberNode> members) {
- this.members = members;
- }
-
- Builder moduleShardsConfig(String moduleShardsConfig) {
- this.moduleShardsConfig = moduleShardsConfig;
- return this;
- }
-
- Builder akkaConfig(String akkaConfig) {
- this.akkaConfig = akkaConfig;
- return this;
- }
-
- Builder testName(String testName) {
- this.testName = testName;
- return this;
- }
-
- Builder waitForShardLeader(String... shardNames) {
- this.waitForshardLeader = shardNames;
- return this;
- }
-
- Builder createOperDatastore(boolean value) {
- this.createOperDatastore = value;
- return this;
- }
-
- MemberNode build() {
- MemberNode node = new MemberNode();
- ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(akkaConfig));
- Cluster.get(system).join(MEMBER_1_ADDRESS);
-
- node.kit = new IntegrationTestKit(system, node.datastoreContextBuilder);
-
- String memberName = new ClusterWrapperImpl(system).getCurrentMemberName();
- node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName);
- node.configDataStore = node.kit.setupDistributedDataStore("config_" + testName, moduleShardsConfig,
- true, waitForshardLeader);
-
- if(createOperDatastore) {
- node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName);
- node.operDataStore = node.kit.setupDistributedDataStore("oper_" + testName, moduleShardsConfig,
- true, waitForshardLeader);
- }
-
- members.add(node);
- return node;
- }
- }
- }
-
- private static interface RaftStateVerifier {
- void verify(OnDemandRaftState raftState);
- }
}
import static org.opendaylight.controller.cluster.datastore.entityownership.DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityPath;
-import akka.actor.ActorSystem;
-import akka.actor.Address;
-import akka.actor.AddressFromURIString;
import akka.actor.Status.Failure;
import akka.actor.Status.Success;
-import akka.cluster.Cluster;
import akka.testkit.JavaTestKit;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.ConfigFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.mockito.MockitoAnnotations;
import org.opendaylight.controller.cluster.datastore.DatastoreContext;
import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
-import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
+import org.opendaylight.controller.cluster.datastore.MemberNode;
import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
+import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
+import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.controller.md.sal.common.api.clustering.CandidateAlreadyRegisteredException;
import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidateRegistration;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipService;
import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipState;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
* @author Thomas Pantelis
*/
public class DistributedEntityOwnershipIntegrationTest {
- private static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
private static final String MODULE_SHARDS_CONFIG = "module-shards-default.conf";
private static final String MODULE_SHARDS_MEMBER_1_CONFIG = "module-shards-default-member-1.conf";
private static final String ENTITY_TYPE1 = "entityType1";
private static final Entity ENTITY4 = new Entity(ENTITY_TYPE1, "entity4");
private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
- private ActorSystem leaderSystem;
- private ActorSystem follower1System;
- private ActorSystem follower2System;
-
private final DatastoreContext.Builder leaderDatastoreContextBuilder =
- DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
+ DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5).
+ shardIsolatedLeaderCheckIntervalInMillis(1000000);
private final DatastoreContext.Builder followerDatastoreContextBuilder =
DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10000);
- private DistributedDataStore leaderDistributedDataStore;
- private DistributedDataStore follower1DistributedDataStore;
- private DistributedDataStore follower2DistributedDataStore;
- private DistributedEntityOwnershipService leaderEntityOwnershipService;
- private DistributedEntityOwnershipService follower1EntityOwnershipService;
- private DistributedEntityOwnershipService follower2EntityOwnershipService;
- private IntegrationTestKit leaderTestKit;
- private IntegrationTestKit follower1TestKit;
- private IntegrationTestKit follower2TestKit;
+ private final List<MemberNode> memberNodes = new ArrayList<>();
@Mock
private EntityOwnershipListener leaderMockListener;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
+ InMemoryJournal.clear();
+ InMemorySnapshotStore.clear();
}
@After
public void tearDown() {
- if(leaderSystem != null) {
- JavaTestKit.shutdownActorSystem(leaderSystem);
- }
-
- if(follower1System != null) {
- JavaTestKit.shutdownActorSystem(follower1System);
- }
-
- if(follower2System != null) {
- JavaTestKit.shutdownActorSystem(follower2System);
+ for(MemberNode m: memberNodes) {
+ m.cleanup();
}
}
- private void startAllSystems() {
- startLeaderSystem();
- startFollower1System();
- startFollower2System();
- }
-
- private void startFollower2System() {
- follower2System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member3"));
- Cluster.get(follower2System).join(MEMBER_1_ADDRESS);
- }
-
- private void startFollower1System() {
- follower1System = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member2"));
- Cluster.get(follower1System).join(MEMBER_1_ADDRESS);
+ private DistributedEntityOwnershipService newOwnershipService(DistributedDataStore datastore) {
+ DistributedEntityOwnershipService service = new DistributedEntityOwnershipService(datastore,
+ EntityOwnerSelectionStrategyConfig.newBuilder().build());
+ service.start();
+ return service;
}
- private void startLeaderSystem() {
- leaderSystem = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig("Member1"));
- Cluster.get(leaderSystem).join(MEMBER_1_ADDRESS);
- }
+ @Test
+ public void testFunctionalityWithThreeNodes() throws Exception {
+ String name = "test";
+ MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+ datastoreContextBuilder(leaderDatastoreContextBuilder).build();
- private void initDatastores(String type) {
- initLeaderDatastore(type, MODULE_SHARDS_CONFIG);
+ MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name ).
+ moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+ datastoreContextBuilder(followerDatastoreContextBuilder).build();
- initFollower1Datastore(type, MODULE_SHARDS_CONFIG);
+ MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name ).
+ moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+ datastoreContextBuilder(followerDatastoreContextBuilder).build();
- follower2TestKit = new IntegrationTestKit(follower2System, followerDatastoreContextBuilder);
- follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(
- type, MODULE_SHARDS_CONFIG, false, SCHEMA_CONTEXT);
+ DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
leaderDistributedDataStore.waitTillReady();
- follower1DistributedDataStore.waitTillReady();
- follower2DistributedDataStore.waitTillReady();
-
- startLeaderService();
-
- startFollower1Service();
-
- follower2EntityOwnershipService = new DistributedEntityOwnershipService(follower2DistributedDataStore,
- EntityOwnerSelectionStrategyConfig.newBuilder().build());
- follower2EntityOwnershipService.start();
-
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
- }
-
- private void startFollower1Service() {
- follower1EntityOwnershipService = new DistributedEntityOwnershipService(follower1DistributedDataStore,
- EntityOwnerSelectionStrategyConfig.newBuilder().build());
- follower1EntityOwnershipService.start();
- }
-
- private void startLeaderService() {
- leaderEntityOwnershipService = new DistributedEntityOwnershipService(leaderDistributedDataStore,
- EntityOwnerSelectionStrategyConfig.newBuilder().build());
- leaderEntityOwnershipService.start();
- }
+ follower1Node.configDataStore().waitTillReady();
+ follower2Node.configDataStore().waitTillReady();
- private void initFollower1Datastore(String type, String moduleConfig) {
- follower1TestKit = new IntegrationTestKit(follower1System, followerDatastoreContextBuilder);
- follower1DistributedDataStore = follower1TestKit.setupDistributedDataStore(
- type, moduleConfig, false, SCHEMA_CONTEXT);
- }
+ EntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
+ EntityOwnershipService follower1EntityOwnershipService = newOwnershipService(follower1Node.configDataStore());
+ EntityOwnershipService follower2EntityOwnershipService = newOwnershipService(follower2Node.configDataStore());
- private void initLeaderDatastore(String type, String moduleConfig) {
- leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
- leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(
- type, moduleConfig, false, SCHEMA_CONTEXT);
- }
-
- @Test
- public void test() throws Exception {
- startAllSystems();
- initDatastores("test");
+ leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
leaderEntityOwnershipService.registerListener(ENTITY_TYPE2, leaderMockListener2);
// Register follower1 candidate for entity2 and verify it becomes owner
- follower1EntityOwnershipService.registerCandidate(ENTITY2);
+ EntityOwnershipCandidateRegistration follower1Entity2Reg = follower1EntityOwnershipService.registerCandidate(ENTITY2);
verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, true, true));
verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, false, true));
reset(leaderMockListener, follower1MockListener);
// Unregister follower1 candidate for entity2 and verify follower2 becomes owner
- follower1EntityOwnershipService.unregisterCandidate(ENTITY2);
+ follower1Entity2Reg.close();
verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-3");
verifyOwner(leaderDistributedDataStore, ENTITY2, "member-3");
verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, true, false, true));
// Shutdown follower2 and verify it's owned entities (entity 2 & 4) get re-assigned
reset(leaderMockListener, follower1MockListener);
- JavaTestKit.shutdownActorSystem(follower2System);
+ follower2Node.cleanup();
verify(follower1MockListener, timeout(15000).times(2)).ownershipChanged(or(ownershipChange(ENTITY4, false, true, true),
ownershipChange(ENTITY2, false, false, false)));
// Register leader candidate for entity2 and verify it becomes owner
- leaderEntityOwnershipService.registerCandidate(ENTITY2);
+ EntityOwnershipCandidateRegistration leaderEntity2Reg = leaderEntityOwnershipService.registerCandidate(ENTITY2);
verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, true, true));
verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");
// Unregister leader candidate for entity2 and verify the owner is cleared
- leaderEntityOwnershipService.unregisterCandidate(ENTITY2);
+ leaderEntity2Reg.close();
verifyOwner(leaderDistributedDataStore, ENTITY2, "");
verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, true, false, false));
verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, false, false));
*/
@Test
public void testCloseCandidateRegistrationInQuickSuccession() throws CandidateAlreadyRegisteredException {
- startAllSystems();
- initDatastores("testCloseCandidateRegistrationInQuickSuccession");
+ String name = "testCloseCandidateRegistrationInQuickSuccession";
+ MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+ datastoreContextBuilder(leaderDatastoreContextBuilder).build();
+
+ MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name ).
+ moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+ datastoreContextBuilder(followerDatastoreContextBuilder).build();
+
+ MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name ).
+ moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+ datastoreContextBuilder(followerDatastoreContextBuilder).build();
+
+ DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+
+ leaderDistributedDataStore.waitTillReady();
+ follower1Node.configDataStore().waitTillReady();
+ follower2Node.configDataStore().waitTillReady();
+
+ EntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
+ EntityOwnershipService follower1EntityOwnershipService = newOwnershipService(follower1Node.configDataStore());
+ EntityOwnershipService follower2EntityOwnershipService = newOwnershipService(follower2Node.configDataStore());
+
+ leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
follower1EntityOwnershipService.registerListener(ENTITY_TYPE1, follower1MockListener);
*/
@Test
public void testEntityOwnershipShardBootstrapping() throws Throwable {
- startLeaderSystem();
- startFollower1System();
- String type = "testEntityOwnershipShardBootstrapping";
- initLeaderDatastore(type, MODULE_SHARDS_MEMBER_1_CONFIG);
- initFollower1Datastore(type, MODULE_SHARDS_MEMBER_1_CONFIG);
+ String name = "testEntityOwnershipShardBootstrapping";
+ String moduleShardsConfig = MODULE_SHARDS_MEMBER_1_CONFIG;
+ MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+ datastoreContextBuilder(leaderDatastoreContextBuilder).build();
- leaderDistributedDataStore.waitTillReady();
+ DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+ EntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
+
+ leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
+
+ MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name ).
+ moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+ datastoreContextBuilder(followerDatastoreContextBuilder).build();
+
+ DistributedDataStore follower1DistributedDataStore = follower1Node.configDataStore();
follower1DistributedDataStore.waitTillReady();
- startLeaderService();
- startFollower1Service();
+ leaderNode.waitForMembersUp("member-2");
+ follower1Node.waitForMembersUp("member-1");
- leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
+ EntityOwnershipService follower1EntityOwnershipService = newOwnershipService(follower1DistributedDataStore);
leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
// Register a candidate for follower1 - should get queued since follower1 has no leader
- follower1EntityOwnershipService.registerCandidate(ENTITY1);
+ EntityOwnershipCandidateRegistration candidateReg = follower1EntityOwnershipService.registerCandidate(ENTITY1);
verify(leaderMockListener, timeout(300).never()).ownershipChanged(ownershipChange(ENTITY1));
// Add replica in follower1
AddShardReplica addReplica = new AddShardReplica(ENTITY_OWNERSHIP_SHARD_NAME);
- follower1DistributedDataStore.getActorContext().getShardManager().tell(addReplica , follower1TestKit.getRef());
- Object reply = follower1TestKit.expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), Success.class, Failure.class);
+ follower1DistributedDataStore.getActorContext().getShardManager().tell(addReplica , follower1Node.kit().getRef());
+ Object reply = follower1Node.kit().expectMsgAnyClassOf(JavaTestKit.duration("5 sec"), Success.class, Failure.class);
if(reply instanceof Failure) {
- throw ((Failure)reply).cause();
+ throw new AssertionError("AddShardReplica failed", ((Failure)reply).cause());
}
// The queued candidate registration should proceed
- verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1));
-
+ verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
}
- private static void verifyGetOwnershipState(DistributedEntityOwnershipService service, Entity entity,
+ private static void verifyGetOwnershipState(EntityOwnershipService service, Entity entity,
boolean isOwner, boolean hasOwner) {
Optional<EntityOwnershipState> state = service.getOwnershipState(entity);
assertEquals("getOwnershipState present", true, state.isPresent());