2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.fail;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.Address;
16 import akka.actor.AddressFromURIString;
17 import akka.cluster.Cluster;
18 import akka.cluster.ClusterEvent.CurrentClusterState;
19 import akka.cluster.Member;
20 import akka.cluster.MemberStatus;
21 import com.google.common.base.Optional;
22 import com.google.common.base.Preconditions;
23 import com.google.common.base.Stopwatch;
24 import com.google.common.collect.Sets;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import com.typesafe.config.ConfigFactory;
27 import java.util.List;
29 import java.util.concurrent.TimeUnit;
30 import org.opendaylight.controller.cluster.access.concepts.MemberName;
31 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
32 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
33 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
34 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
35 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
36 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
37 import scala.concurrent.Await;
38 import scala.concurrent.Future;
39 import scala.concurrent.duration.Duration;
42 * Class that represents a cluster member node for unit tests. It encapsulates an actor system with
43 * config and (optional) operational data store instances. The Builder is used to specify the setup
44 * parameters and create the data store instances. The actor system is automatically joined to address
45 * 127.0.0.1:2558 so one member must specify an akka cluster configuration with that address.
47 * @author Thomas Pantelis
49 public class MemberNode {
50 static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
52 private IntegrationTestKit kit;
53 private DistributedDataStore configDataStore;
54 private DistributedDataStore operDataStore;
55 private DatastoreContext.Builder datastoreContextBuilder;
56 private boolean cleanedUp;
59 * Constructs a Builder.
61 * @param members the list to which the resulting MemberNode will be added. This makes it easier for
62 * callers to cleanup instances on test completion.
63 * @return a Builder instance
65 public static Builder builder(List<MemberNode> members) {
66 return new Builder(members);
69 public IntegrationTestKit kit() {
74 public DistributedDataStore configDataStore() {
75 return configDataStore;
79 public DistributedDataStore operDataStore() {
83 public DatastoreContext.Builder datastoreContextBuilder() {
84 return datastoreContextBuilder;
87 public void waitForMembersUp(String... otherMembers) {
88 kit.waitForMembersUp(otherMembers);
91 public void waitForMemberDown(String member) {
92 Stopwatch sw = Stopwatch.createStarted();
93 while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
94 CurrentClusterState state = Cluster.get(kit.getSystem()).state();
95 for (Member m : state.getUnreachable()) {
96 if (member.equals(m.getRoles().iterator().next())) {
101 for (Member m : state.getMembers()) {
102 if (m.status() != MemberStatus.up() && member.equals(m.getRoles().iterator().next())) {
107 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
110 fail("Member " + member + " is now down");
113 public void cleanup() {
116 if (configDataStore != null) {
117 configDataStore.close();
119 if (operDataStore != null) {
120 operDataStore.close();
123 IntegrationTestKit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE);
127 public static void verifyRaftState(DistributedDataStore datastore, String shardName, RaftStateVerifier verifier)
129 ActorContext actorContext = datastore.getActorContext();
131 Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
132 ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
134 AssertionError lastError = null;
135 Stopwatch sw = Stopwatch.createStarted();
136 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
137 OnDemandRaftState raftState = (OnDemandRaftState)actorContext
138 .executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
141 verifier.verify(raftState);
143 } catch (AssertionError e) {
145 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
152 public static void verifyRaftPeersPresent(DistributedDataStore datastore, final String shardName,
153 String... peerMemberNames) throws Exception {
154 final Set<String> peerIds = Sets.newHashSet();
155 for (String p: peerMemberNames) {
156 peerIds.add(ShardIdentifier.create(shardName, MemberName.forName(p),
157 datastore.getActorContext().getDataStoreName()).toString());
160 verifyRaftState(datastore, shardName, raftState -> assertEquals("Peers for shard " + shardName, peerIds,
161 raftState.getPeerAddresses().keySet()));
164 public static void verifyNoShardPresent(DistributedDataStore datastore, String shardName) {
165 Stopwatch sw = Stopwatch.createStarted();
166 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
167 Optional<ActorRef> shardReply = datastore.getActorContext().findLocalShard(shardName);
168 if (!shardReply.isPresent()) {
172 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
175 fail("Shard " + shardName + " is present");
178 public static class Builder {
179 private final List<MemberNode> members;
180 private String moduleShardsConfig;
181 private String akkaConfig;
182 private String[] waitForshardLeader = new String[0];
183 private String testName;
184 private SchemaContext schemaContext;
185 private boolean createOperDatastore = true;
186 private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
187 .shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
189 Builder(List<MemberNode> members) {
190 this.members = members;
194 * Specifies the name of the module shards config file. This is required.
196 * @return this Builder
198 public Builder moduleShardsConfig(String newModuleShardsConfig) {
199 this.moduleShardsConfig = newModuleShardsConfig;
204 * Specifies the name of the akka configuration. This is required.
206 * @return this Builder
208 public Builder akkaConfig(String newAkkaConfig) {
209 this.akkaConfig = newAkkaConfig;
214 * Specifies the name of the test that is appended to the data store names. This is required.
216 * @return this Builder
218 public Builder testName(String newTestName) {
219 this.testName = newTestName;
224 * Specifies the optional names of the shards to initially wait for a leader to be elected.
226 * @return this Builder
228 public Builder waitForShardLeader(String... shardNames) {
229 this.waitForshardLeader = shardNames;
234 * Specifies whether or not to create an operational data store. Defaults to true.
236 * @return this Builder
238 public Builder createOperDatastore(boolean value) {
239 this.createOperDatastore = value;
244 * Specifies the SchemaContext for the data stores. Defaults to SchemaContextHelper.full().
246 * @return this Builder
248 public Builder schemaContext(SchemaContext newSchemaContext) {
249 this.schemaContext = newSchemaContext;
254 * Specifies the DatastoreContext Builder. If not specified, a default instance is used.
256 * @return this Builder
258 public Builder datastoreContextBuilder(DatastoreContext.Builder builder) {
259 datastoreContextBuilder = builder;
263 public MemberNode build() {
264 Preconditions.checkNotNull(moduleShardsConfig, "moduleShardsConfig must be specified");
265 Preconditions.checkNotNull(akkaConfig, "akkaConfig must be specified");
266 Preconditions.checkNotNull(testName, "testName must be specified");
268 if (schemaContext == null) {
269 schemaContext = SchemaContextHelper.full();
272 MemberNode node = new MemberNode();
273 node.datastoreContextBuilder = datastoreContextBuilder;
275 ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(akkaConfig));
276 Cluster.get(system).join(MEMBER_1_ADDRESS);
278 node.kit = new IntegrationTestKit(system, datastoreContextBuilder);
280 String memberName = new ClusterWrapperImpl(system).getCurrentMemberName().getName();
281 node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName);
282 node.configDataStore = node.kit.setupDistributedDataStore("config_" + testName, moduleShardsConfig,
283 true, schemaContext, waitForshardLeader);
285 if (createOperDatastore) {
286 node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName);
287 node.operDataStore = node.kit.setupDistributedDataStore("oper_" + testName, moduleShardsConfig,
288 true, schemaContext, waitForshardLeader);
296 public interface RaftStateVerifier {
297 void verify(OnDemandRaftState raftState);