2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertTrue;
11 import static org.junit.Assert.fail;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.AddressFromURIString;
16 import akka.cluster.Cluster;
17 import akka.cluster.ClusterEvent.CurrentClusterState;
18 import akka.cluster.Member;
19 import akka.cluster.MemberStatus;
20 import com.google.common.base.Preconditions;
21 import com.google.common.base.Stopwatch;
22 import com.google.common.collect.Sets;
23 import com.google.common.util.concurrent.Uninterruptibles;
24 import com.typesafe.config.ConfigFactory;
25 import java.util.List;
27 import java.util.concurrent.TimeUnit;
28 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
29 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
30 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
31 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
32 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
33 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
34 import scala.concurrent.Await;
35 import scala.concurrent.Future;
36 import scala.concurrent.duration.Duration;
39 * Class that represents a cluster member node for unit tests. It encapsulates an actor system with
40 * config and (optional) operational data store instances. The Builder is used to specify the setup
41 * parameters and create the data store instances. The actor system is automatically joined to address
42 * 127.0.0.1:2558 so one member must specify an akka cluster configuration with that address.
44 * @author Thomas Pantelis
46 public class MemberNode {
47 static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
49 private IntegrationTestKit kit;
50 private DistributedDataStore configDataStore;
51 private DistributedDataStore operDataStore;
52 private DatastoreContext.Builder datastoreContextBuilder;
53 private boolean cleanedUp;
56 * Constructs a Builder.
58 * @param members the list to which the resulting MemberNode will be added. This makes it easier for
59 * callers to cleanup instances on test completion.
60 * @return a Builder instance
62 public static Builder builder(List<MemberNode> members) {
63 return new Builder(members);
66 public IntegrationTestKit kit() {
71 public DistributedDataStore configDataStore() {
72 return configDataStore;
76 public DistributedDataStore operDataStore() {
80 public DatastoreContext.Builder datastoreContextBuilder() {
81 return datastoreContextBuilder;
84 public void waitForMembersUp(String... otherMembers) {
85 Set<String> otherMembersSet = Sets.newHashSet(otherMembers);
86 Stopwatch sw = Stopwatch.createStarted();
87 while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
88 CurrentClusterState state = Cluster.get(kit.getSystem()).state();
89 for(Member m: state.getMembers()) {
90 if(m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next()) &&
91 otherMembersSet.isEmpty()) {
96 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
99 fail("Member(s) " + otherMembersSet + " are not Up");
102 public void cleanup() {
105 kit.cleanup(configDataStore);
106 kit.cleanup(operDataStore);
107 kit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE);
111 public static void verifyRaftState(DistributedDataStore datastore, String shardName, RaftStateVerifier verifier)
113 ActorContext actorContext = datastore.getActorContext();
115 Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
116 ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
118 AssertionError lastError = null;
119 Stopwatch sw = Stopwatch.createStarted();
120 while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
121 OnDemandRaftState raftState = (OnDemandRaftState)actorContext.
122 executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
125 verifier.verify(raftState);
127 } catch (AssertionError e) {
129 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
136 public static void verifyRaftPeersPresent(DistributedDataStore datastore, final String shardName,
137 String... peerMemberNames) throws Exception {
138 final Set<String> peerIds = Sets.newHashSet();
139 for(String p: peerMemberNames) {
140 peerIds.add(ShardIdentifier.builder().memberName(p).shardName(shardName).
141 type(datastore.getActorContext().getDataStoreType()).build().toString());
144 verifyRaftState(datastore, shardName, new RaftStateVerifier() {
146 public void verify(OnDemandRaftState raftState) {
147 assertTrue("Peer(s) " + peerIds + " not found for shard " + shardName,
148 raftState.getPeerAddresses().keySet().containsAll(peerIds));
153 public static class Builder {
154 private final List<MemberNode> members;
155 private String moduleShardsConfig;
156 private String akkaConfig;
157 private String[] waitForshardLeader = new String[0];
158 private String testName;
159 private SchemaContext schemaContext;
160 private boolean createOperDatastore = true;
161 private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
162 shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
164 Builder(List<MemberNode> members) {
165 this.members = members;
169 * Specifies the name of the module shards config file. This is required.
171 * @return this Builder
173 public Builder moduleShardsConfig(String moduleShardsConfig) {
174 this.moduleShardsConfig = moduleShardsConfig;
179 * Specifies the name of the akka configuration. This is required.
181 * @return this Builder
183 public Builder akkaConfig(String akkaConfig) {
184 this.akkaConfig = akkaConfig;
189 * Specifies the name of the test that is appended to the data store names. This is required.
191 * @return this Builder
193 public Builder testName(String testName) {
194 this.testName = testName;
199 * Specifies the optional names of the shards to initially wait for a leader to be elected.
201 * @return this Builder
203 public Builder waitForShardLeader(String... shardNames) {
204 this.waitForshardLeader = shardNames;
209 * Specifies whether or not to create an operational data store. Defaults to true.
211 * @return this Builder
213 public Builder createOperDatastore(boolean value) {
214 this.createOperDatastore = value;
219 * Specifies the SchemaContext for the data stores. Defaults to SchemaContextHelper.full().
221 * @return this Builder
223 public Builder schemaContext(SchemaContext schemaContext) {
224 this.schemaContext = schemaContext;
229 * Specifies the DatastoreContext Builder. If not specified, a default instance is used.
231 * @return this Builder
233 public Builder datastoreContextBuilder(DatastoreContext.Builder builder) {
234 datastoreContextBuilder = builder;
238 public MemberNode build() {
239 Preconditions.checkNotNull(moduleShardsConfig, "moduleShardsConfig must be specified");
240 Preconditions.checkNotNull(akkaConfig, "akkaConfig must be specified");
241 Preconditions.checkNotNull(testName, "testName must be specified");
243 if(schemaContext == null) {
244 schemaContext = SchemaContextHelper.full();
247 MemberNode node = new MemberNode();
248 node.datastoreContextBuilder = datastoreContextBuilder;
250 ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(akkaConfig));
251 Cluster.get(system).join(MEMBER_1_ADDRESS);
253 node.kit = new IntegrationTestKit(system, datastoreContextBuilder);
255 String memberName = new ClusterWrapperImpl(system).getCurrentMemberName();
256 node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName);
257 node.configDataStore = node.kit.setupDistributedDataStore("config_" + testName, moduleShardsConfig,
258 true, schemaContext, waitForshardLeader);
260 if(createOperDatastore) {
261 node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName);
262 node.operDataStore = node.kit.setupDistributedDataStore("oper_" + testName, moduleShardsConfig,
263 true, schemaContext, waitForshardLeader);
271 public static interface RaftStateVerifier {
272 void verify(OnDemandRaftState raftState);