*/
package org.opendaylight.controller.cluster.datastore;
+import static java.util.Objects.requireNonNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
-import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
-import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
private static final String MEMBER_1_ADDRESS = "akka://cluster-test@127.0.0.1:2558";
private IntegrationTestKit kit;
- private AbstractDataStore configDataStore;
- private AbstractDataStore operDataStore;
+ private ClientBackedDataStore configDataStore;
+ private ClientBackedDataStore operDataStore;
private DatastoreContext.Builder datastoreContextBuilder;
private boolean cleanedUp;
}
- public AbstractDataStore configDataStore() {
+ public ClientBackedDataStore configDataStore() {
return configDataStore;
}
- public AbstractDataStore operDataStore() {
+ public ClientBackedDataStore operDataStore() {
return operDataStore;
}
Stopwatch sw = Stopwatch.createStarted();
while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
CurrentClusterState state = Cluster.get(kit.getSystem()).state();
+
for (Member m : state.getUnreachable()) {
if (member.equals(m.getRoles().iterator().next())) {
return;
}
try {
- IntegrationTestKit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE);
+ IntegrationTestKit.shutdownActorSystem(kit.getSystem(), true);
} catch (RuntimeException e) {
LoggerFactory.getLogger(MemberNode.class).warn("Failed to shutdown actor system", e);
}
}
}
- public static void verifyRaftState(final AbstractDataStore datastore, final String shardName,
+ public static void verifyRaftState(final ClientBackedDataStore datastore, final String shardName,
final RaftStateVerifier verifier) throws Exception {
ActorUtils actorUtils = datastore.getActorUtils();
throw lastError;
}
- public static void verifyRaftPeersPresent(final AbstractDataStore datastore, final String shardName,
+ public static void verifyRaftPeersPresent(final ClientBackedDataStore datastore, final String shardName,
final String... peerMemberNames) throws Exception {
- final Set<String> peerIds = Sets.newHashSet();
+ final Set<String> peerIds = new HashSet<>();
for (String p: peerMemberNames) {
peerIds.add(ShardIdentifier.create(shardName, MemberName.forName(p),
datastore.getActorUtils().getDataStoreName()).toString());
raftState.getPeerAddresses().keySet()));
}
- public static void verifyNoShardPresent(final AbstractDataStore datastore, final String shardName) {
+ public static void verifyNoShardPresent(final ClientBackedDataStore datastore, final String shardName) {
Stopwatch sw = Stopwatch.createStarted();
while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
Optional<ActorRef> shardReply = datastore.getActorUtils().findLocalShard(shardName);
private boolean useAkkaArtery = true;
private String[] waitForshardLeader = new String[0];
private String testName;
- private SchemaContext schemaContext;
+ private EffectiveModelContext schemaContext;
private boolean createOperDatastore = true;
private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
.shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
* @return this Builder
*/
public Builder moduleShardsConfig(final String newModuleShardsConfig) {
- this.moduleShardsConfig = newModuleShardsConfig;
+ moduleShardsConfig = newModuleShardsConfig;
return this;
}
* @return this Builder
*/
public Builder akkaConfig(final String newAkkaConfig) {
- this.akkaConfig = newAkkaConfig;
+ akkaConfig = newAkkaConfig;
return this;
}
* @return this Builder
*/
public Builder useAkkaArtery(final boolean newUseAkkaArtery) {
- this.useAkkaArtery = newUseAkkaArtery;
+ useAkkaArtery = newUseAkkaArtery;
return this;
}
* @return this Builder
*/
public Builder testName(final String newTestName) {
- this.testName = newTestName;
+ testName = newTestName;
return this;
}
* @return this Builder
*/
public Builder waitForShardLeader(final String... shardNames) {
- this.waitForshardLeader = shardNames;
+ waitForshardLeader = shardNames;
return this;
}
* @return this Builder
*/
public Builder createOperDatastore(final boolean value) {
- this.createOperDatastore = value;
+ createOperDatastore = value;
return this;
}
*
* @return this Builder
*/
- public Builder schemaContext(final SchemaContext newSchemaContext) {
- this.schemaContext = newSchemaContext;
+ public Builder schemaContext(final EffectiveModelContext newSchemaContext) {
+ schemaContext = newSchemaContext;
return this;
}
}
public MemberNode build() throws Exception {
- Preconditions.checkNotNull(moduleShardsConfig, "moduleShardsConfig must be specified");
- Preconditions.checkNotNull(akkaConfig, "akkaConfig must be specified");
- Preconditions.checkNotNull(testName, "testName must be specified");
+ requireNonNull(moduleShardsConfig, "moduleShardsConfig must be specified");
+ requireNonNull(akkaConfig, "akkaConfig must be specified");
+ requireNonNull(testName, "testName must be specified");
if (schemaContext == null) {
schemaContext = SchemaContextHelper.full();
String memberName = new ClusterWrapperImpl(system).getCurrentMemberName().getName();
node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName);
- node.configDataStore = node.kit.setupAbstractDataStore(DistributedDataStore.class,
- "config_" + testName, moduleShardsConfig, true, schemaContext, waitForshardLeader);
+ node.configDataStore = node.kit.setupDataStore(ClientBackedDataStore.class, "config_" + testName,
+ moduleShardsConfig, true, schemaContext, waitForshardLeader);
if (createOperDatastore) {
node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName);
- node.operDataStore = node.kit.setupAbstractDataStore(DistributedDataStore.class,
+ node.operDataStore = node.kit.setupDataStore(ClientBackedDataStore.class,
"oper_" + testName, moduleShardsConfig, true, schemaContext, waitForshardLeader);
}