*/
package org.opendaylight.controller.cluster.datastore;
+import static java.util.Objects.requireNonNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-import akka.actor.Address;
import akka.actor.AddressFromURIString;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
-import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import java.util.HashSet;
import java.util.List;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
+import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
/**
* Class that represents a cluster member node for unit tests. It encapsulates an actor system with
* @author Thomas Pantelis
*/
public class MemberNode {
- static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
+ private static final String MEMBER_1_ADDRESS = "akka://cluster-test@127.0.0.1:2558";
private IntegrationTestKit kit;
- private DistributedDataStore configDataStore;
- private DistributedDataStore operDataStore;
+ private AbstractDataStore configDataStore;
+ private AbstractDataStore operDataStore;
private DatastoreContext.Builder datastoreContextBuilder;
private boolean cleanedUp;
* callers to cleanup instances on test completion.
* @return a Builder instance
*/
- public static Builder builder(List<MemberNode> members) {
+ public static Builder builder(final List<MemberNode> members) {
return new Builder(members);
}
}
- public DistributedDataStore configDataStore() {
+ public AbstractDataStore configDataStore() {
return configDataStore;
}
- public DistributedDataStore operDataStore() {
+ public AbstractDataStore operDataStore() {
return operDataStore;
}
return datastoreContextBuilder;
}
- public void waitForMembersUp(String... otherMembers) {
+ public void waitForMembersUp(final String... otherMembers) {
kit.waitForMembersUp(otherMembers);
}
- public void waitForMemberDown(String member) {
+ public void waitForMemberDown(final String member) {
Stopwatch sw = Stopwatch.createStarted();
- while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
+ while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
CurrentClusterState state = Cluster.get(kit.getSystem()).state();
- for(Member m: state.getUnreachable()) {
- if(member.equals(m.getRoles().iterator().next())) {
+ for (Member m : state.getUnreachable()) {
+ if (member.equals(m.getRoles().iterator().next())) {
return;
}
}
- for(Member m: state.getMembers()) {
- if(m.status() != MemberStatus.up() && member.equals(m.getRoles().iterator().next())) {
+ for (Member m : state.getMembers()) {
+ if (m.status() != MemberStatus.up() && member.equals(m.getRoles().iterator().next())) {
return;
}
}
fail("Member " + member + " is now down");
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void cleanup() {
- if(!cleanedUp) {
+ if (!cleanedUp) {
cleanedUp = true;
- kit.cleanup(configDataStore);
- kit.cleanup(operDataStore);
- IntegrationTestKit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE);
+ if (configDataStore != null) {
+ configDataStore.close();
+ }
+ if (operDataStore != null) {
+ operDataStore.close();
+ }
+
+ try {
+ IntegrationTestKit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE);
+ } catch (RuntimeException e) {
+ LoggerFactory.getLogger(MemberNode.class).warn("Failed to shutdown actor system", e);
+ }
}
}
- public static void verifyRaftState(DistributedDataStore datastore, String shardName, RaftStateVerifier verifier)
- throws Exception {
- ActorContext actorContext = datastore.getActorContext();
+ public static void verifyRaftState(final AbstractDataStore datastore, final String shardName,
+ final RaftStateVerifier verifier) throws Exception {
+ ActorUtils actorUtils = datastore.getActorUtils();
- Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
- ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
+ Future<ActorRef> future = actorUtils.findLocalShardAsync(shardName);
+ ActorRef shardActor = Await.result(future, FiniteDuration.create(10, TimeUnit.SECONDS));
AssertionError lastError = null;
Stopwatch sw = Stopwatch.createStarted();
- while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
- OnDemandRaftState raftState = (OnDemandRaftState)actorContext.
- executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
+ while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
+ OnDemandRaftState raftState = (OnDemandRaftState)actorUtils
+ .executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
try {
verifier.verify(raftState);
throw lastError;
}
- public static void verifyRaftPeersPresent(DistributedDataStore datastore, final String shardName,
- String... peerMemberNames) throws Exception {
- final Set<String> peerIds = Sets.newHashSet();
- for(String p: peerMemberNames) {
- peerIds.add(ShardIdentifier.builder().memberName(MemberName.forName(p)).shardName(shardName).
- type(datastore.getActorContext().getDataStoreName()).build().toString());
+ public static void verifyRaftPeersPresent(final AbstractDataStore datastore, final String shardName,
+ final String... peerMemberNames) throws Exception {
+ final Set<String> peerIds = new HashSet<>();
+ for (String p: peerMemberNames) {
+ peerIds.add(ShardIdentifier.create(shardName, MemberName.forName(p),
+ datastore.getActorUtils().getDataStoreName()).toString());
}
- verifyRaftState(datastore, shardName, raftState -> assertEquals("Peers for shard " + shardName, peerIds, raftState.getPeerAddresses().keySet()));
+ verifyRaftState(datastore, shardName, raftState -> assertEquals("Peers for shard " + shardName, peerIds,
+ raftState.getPeerAddresses().keySet()));
}
- public static void verifyNoShardPresent(DistributedDataStore datastore, String shardName) {
+ public static void verifyNoShardPresent(final AbstractDataStore datastore, final String shardName) {
Stopwatch sw = Stopwatch.createStarted();
- while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
- Optional<ActorRef> shardReply = datastore.getActorContext().findLocalShard(shardName);
- if(!shardReply.isPresent()) {
+ while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
+ Optional<ActorRef> shardReply = datastore.getActorUtils().findLocalShard(shardName);
+ if (!shardReply.isPresent()) {
return;
}
private final List<MemberNode> members;
private String moduleShardsConfig;
private String akkaConfig;
+ private boolean useAkkaArtery = true;
private String[] waitForshardLeader = new String[0];
private String testName;
- private SchemaContext schemaContext;
+ private EffectiveModelContext schemaContext;
private boolean createOperDatastore = true;
- private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
- shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
+ private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
+ .shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
- Builder(List<MemberNode> members) {
+ Builder(final List<MemberNode> members) {
this.members = members;
}
*
* @return this Builder
*/
- public Builder moduleShardsConfig(String moduleShardsConfig) {
- this.moduleShardsConfig = moduleShardsConfig;
+ public Builder moduleShardsConfig(final String newModuleShardsConfig) {
+ this.moduleShardsConfig = newModuleShardsConfig;
return this;
}
*
* @return this Builder
*/
- public Builder akkaConfig(String akkaConfig) {
- this.akkaConfig = akkaConfig;
+ public Builder akkaConfig(final String newAkkaConfig) {
+ this.akkaConfig = newAkkaConfig;
+ return this;
+ }
+
+ /**
+ * Specifies whether or not to use akka artery for remoting. Default is true.
+ *
+ * @return this Builder
+ */
+ public Builder useAkkaArtery(final boolean newUseAkkaArtery) {
+ this.useAkkaArtery = newUseAkkaArtery;
return this;
}
*
* @return this Builder
*/
- public Builder testName(String testName) {
- this.testName = testName;
+ public Builder testName(final String newTestName) {
+ this.testName = newTestName;
return this;
}
*
* @return this Builder
*/
- public Builder waitForShardLeader(String... shardNames) {
+ public Builder waitForShardLeader(final String... shardNames) {
this.waitForshardLeader = shardNames;
return this;
}
*
* @return this Builder
*/
- public Builder createOperDatastore(boolean value) {
+ public Builder createOperDatastore(final boolean value) {
this.createOperDatastore = value;
return this;
}
*
* @return this Builder
*/
- public Builder schemaContext(SchemaContext schemaContext) {
- this.schemaContext = schemaContext;
+ public Builder schemaContext(final EffectiveModelContext newSchemaContext) {
+ this.schemaContext = newSchemaContext;
return this;
}
*
* @return this Builder
*/
- public Builder datastoreContextBuilder(DatastoreContext.Builder builder) {
+ public Builder datastoreContextBuilder(final DatastoreContext.Builder builder) {
datastoreContextBuilder = builder;
return this;
}
- public MemberNode build() {
- Preconditions.checkNotNull(moduleShardsConfig, "moduleShardsConfig must be specified");
- Preconditions.checkNotNull(akkaConfig, "akkaConfig must be specified");
- Preconditions.checkNotNull(testName, "testName must be specified");
+ public MemberNode build() throws Exception {
+ requireNonNull(moduleShardsConfig, "moduleShardsConfig must be specified");
+ requireNonNull(akkaConfig, "akkaConfig must be specified");
+ requireNonNull(testName, "testName must be specified");
- if(schemaContext == null) {
+ if (schemaContext == null) {
schemaContext = SchemaContextHelper.full();
}
MemberNode node = new MemberNode();
node.datastoreContextBuilder = datastoreContextBuilder;
- ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(akkaConfig));
- Cluster.get(system).join(MEMBER_1_ADDRESS);
+ Config baseConfig = ConfigFactory.load();
+ Config config;
+ if (useAkkaArtery) {
+ config = baseConfig.getConfig(akkaConfig);
+ } else {
+ config = baseConfig.getConfig(akkaConfig + "-without-artery")
+ .withFallback(baseConfig.getConfig(akkaConfig));
+ }
+
+ ActorSystem system = ActorSystem.create("cluster-test", config);
+ String member1Address = useAkkaArtery ? MEMBER_1_ADDRESS : MEMBER_1_ADDRESS.replace("akka", "akka.tcp");
+ Cluster.get(system).join(AddressFromURIString.parse(member1Address));
node.kit = new IntegrationTestKit(system, datastoreContextBuilder);
String memberName = new ClusterWrapperImpl(system).getCurrentMemberName().getName();
node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName);
- node.configDataStore = node.kit.setupDistributedDataStore("config_" + testName, moduleShardsConfig,
- true, schemaContext, waitForshardLeader);
+ node.configDataStore = node.kit.setupAbstractDataStore(DistributedDataStore.class,
+ "config_" + testName, moduleShardsConfig, true, schemaContext, waitForshardLeader);
- if(createOperDatastore) {
+ if (createOperDatastore) {
node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName);
- node.operDataStore = node.kit.setupDistributedDataStore("oper_" + testName, moduleShardsConfig,
- true, schemaContext, waitForshardLeader);
+ node.operDataStore = node.kit.setupAbstractDataStore(DistributedDataStore.class,
+ "oper_" + testName, moduleShardsConfig, true, schemaContext, waitForshardLeader);
}
members.add(node);
}
}
- public static interface RaftStateVerifier {
+ public interface RaftStateVerifier {
void verify(OnDemandRaftState raftState);
}
-}
\ No newline at end of file
+}