import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-import akka.actor.Address;
import akka.actor.AddressFromURIString;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.CurrentClusterState;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.List;
import java.util.Set;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
* @author Thomas Pantelis
*/
public class MemberNode {
- static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
+ private static final String MEMBER_1_ADDRESS = "akka://cluster-test@127.0.0.1:2558";
private IntegrationTestKit kit;
private AbstractDataStore configDataStore;
fail("Member " + member + " is now down");
}
+ @SuppressWarnings("checkstyle:IllegalCatch")
public void cleanup() {
if (!cleanedUp) {
cleanedUp = true;
operDataStore.close();
}
- IntegrationTestKit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE);
+ try {
+ IntegrationTestKit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE);
+ } catch (RuntimeException e) {
+ LoggerFactory.getLogger(MemberNode.class).warn("Failed to shutdown actor system", e);
+ }
}
}
private final List<MemberNode> members;
private String moduleShardsConfig;
private String akkaConfig;
+ private boolean useAkkaArtery = true;
private String[] waitForshardLeader = new String[0];
private String testName;
private SchemaContext schemaContext;
return this;
}
+ /**
+ * Specifies whether or not to use akka artery for remoting. Default is true.
+ *
+ * @return this Builder
+ */
+ public Builder useAkkaArtery(final boolean newUseAkkaArtery) {
+ this.useAkkaArtery = newUseAkkaArtery;
+ return this;
+ }
+
/**
* Specifies the name of the test that is appended to the data store names. This is required.
*
return this;
}
- public MemberNode build() {
+ public MemberNode build() throws Exception {
Preconditions.checkNotNull(moduleShardsConfig, "moduleShardsConfig must be specified");
Preconditions.checkNotNull(akkaConfig, "akkaConfig must be specified");
Preconditions.checkNotNull(testName, "testName must be specified");
MemberNode node = new MemberNode();
node.datastoreContextBuilder = datastoreContextBuilder;
- ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(akkaConfig));
- Cluster.get(system).join(MEMBER_1_ADDRESS);
+ Config baseConfig = ConfigFactory.load();
+ Config config;
+ if (useAkkaArtery) {
+ config = baseConfig.getConfig(akkaConfig);
+ } else {
+ config = baseConfig.getConfig(akkaConfig + "-without-artery")
+ .withFallback(baseConfig.getConfig(akkaConfig));
+ }
+
+ ActorSystem system = ActorSystem.create("cluster-test", config);
+ String member1Address = useAkkaArtery ? MEMBER_1_ADDRESS : MEMBER_1_ADDRESS.replace("akka", "akka.tcp");
+ Cluster.get(system).join(AddressFromURIString.parse(member1Address));
node.kit = new IntegrationTestKit(system, datastoreContextBuilder);
String memberName = new ClusterWrapperImpl(system).getCurrentMemberName().getName();
node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName);
- node.configDataStore = node.kit.setupDistributedDataStore("config_" + testName, moduleShardsConfig,
- true, schemaContext, waitForshardLeader);
+ node.configDataStore = node.kit.setupAbstractDataStore(DistributedDataStore.class,
+ "config_" + testName, moduleShardsConfig, true, schemaContext, waitForshardLeader);
if (createOperDatastore) {
node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName);
- node.operDataStore = node.kit.setupDistributedDataStore("oper_" + testName, moduleShardsConfig,
- true, schemaContext, waitForshardLeader);
+ node.operDataStore = node.kit.setupAbstractDataStore(DistributedDataStore.class,
+ "oper_" + testName, moduleShardsConfig, true, schemaContext, waitForshardLeader);
}
members.add(node);