2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static java.util.Objects.requireNonNull;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.fail;
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSystem;
16 import akka.actor.AddressFromURIString;
17 import akka.cluster.Cluster;
18 import akka.cluster.ClusterEvent.CurrentClusterState;
19 import akka.cluster.Member;
20 import akka.cluster.MemberStatus;
21 import com.google.common.base.Stopwatch;
22 import com.google.common.util.concurrent.Uninterruptibles;
23 import com.typesafe.config.Config;
24 import com.typesafe.config.ConfigFactory;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Optional;
29 import java.util.concurrent.TimeUnit;
30 import org.opendaylight.controller.cluster.access.concepts.MemberName;
31 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
32 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
33 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
34 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
35 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
36 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
37 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
38 import org.slf4j.LoggerFactory;
39 import scala.concurrent.Await;
40 import scala.concurrent.Future;
41 import scala.concurrent.duration.FiniteDuration;
44 * Class that represents a cluster member node for unit tests. It encapsulates an actor system with
45 * config and (optional) operational data store instances. The Builder is used to specify the setup
46 * parameters and create the data store instances. The actor system is automatically joined to address
47 * 127.0.0.1:2558 so one member must specify an akka cluster configuration with that address.
49 * @author Thomas Pantelis
51 public class MemberNode {
52 private static final String MEMBER_1_ADDRESS = "akka://cluster-test@127.0.0.1:2558";
54 private IntegrationTestKit kit;
55 private ClientBackedDataStore configDataStore;
56 private ClientBackedDataStore operDataStore;
57 private DatastoreContext.Builder datastoreContextBuilder;
58 private boolean cleanedUp;
61 * Constructs a Builder.
63 * @param members the list to which the resulting MemberNode will be added. This makes it easier for
64 * callers to cleanup instances on test completion.
65 * @return a Builder instance
67 public static Builder builder(final List<MemberNode> members) {
68 return new Builder(members);
71 public IntegrationTestKit kit() {
76 public ClientBackedDataStore configDataStore() {
77 return configDataStore;
81 public ClientBackedDataStore operDataStore() {
85 public DatastoreContext.Builder datastoreContextBuilder() {
86 return datastoreContextBuilder;
89 public void waitForMembersUp(final String... otherMembers) {
90 kit.waitForMembersUp(otherMembers);
93 public void waitForMemberDown(final String member) {
94 Stopwatch sw = Stopwatch.createStarted();
95 while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
96 CurrentClusterState state = Cluster.get(kit.getSystem()).state();
98 for (Member m : state.getUnreachable()) {
99 if (member.equals(m.getRoles().iterator().next())) {
104 for (Member m : state.getMembers()) {
105 if (m.status() != MemberStatus.up() && member.equals(m.getRoles().iterator().next())) {
110 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
113 fail("Member " + member + " is now down");
116 @SuppressWarnings("checkstyle:IllegalCatch")
117 public void cleanup() {
120 if (configDataStore != null) {
121 configDataStore.close();
123 if (operDataStore != null) {
124 operDataStore.close();
128 IntegrationTestKit.shutdownActorSystem(kit.getSystem(), true);
129 } catch (RuntimeException e) {
130 LoggerFactory.getLogger(MemberNode.class).warn("Failed to shutdown actor system", e);
135 public static void verifyRaftState(final ClientBackedDataStore datastore, final String shardName,
136 final RaftStateVerifier verifier) throws Exception {
137 ActorUtils actorUtils = datastore.getActorUtils();
139 Future<ActorRef> future = actorUtils.findLocalShardAsync(shardName);
140 ActorRef shardActor = Await.result(future, FiniteDuration.create(10, TimeUnit.SECONDS));
142 AssertionError lastError = null;
143 Stopwatch sw = Stopwatch.createStarted();
144 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
145 OnDemandRaftState raftState = (OnDemandRaftState)actorUtils
146 .executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
149 verifier.verify(raftState);
151 } catch (AssertionError e) {
153 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
160 public static void verifyRaftPeersPresent(final ClientBackedDataStore datastore, final String shardName,
161 final String... peerMemberNames) throws Exception {
162 final Set<String> peerIds = new HashSet<>();
163 for (String p: peerMemberNames) {
164 peerIds.add(ShardIdentifier.create(shardName, MemberName.forName(p),
165 datastore.getActorUtils().getDataStoreName()).toString());
168 verifyRaftState(datastore, shardName, raftState -> assertEquals("Peers for shard " + shardName, peerIds,
169 raftState.getPeerAddresses().keySet()));
172 public static void verifyNoShardPresent(final ClientBackedDataStore datastore, final String shardName) {
173 Stopwatch sw = Stopwatch.createStarted();
174 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
175 Optional<ActorRef> shardReply = datastore.getActorUtils().findLocalShard(shardName);
176 if (!shardReply.isPresent()) {
180 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
183 fail("Shard " + shardName + " is present");
186 public static class Builder {
187 private final List<MemberNode> members;
188 private String moduleShardsConfig;
189 private String akkaConfig;
190 private boolean useAkkaArtery = true;
191 private String[] waitForshardLeader = new String[0];
192 private String testName;
193 private EffectiveModelContext schemaContext;
194 private boolean createOperDatastore = true;
195 private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
196 .shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
198 Builder(final List<MemberNode> members) {
199 this.members = members;
203 * Specifies the name of the module shards config file. This is required.
205 * @return this Builder
207 public Builder moduleShardsConfig(final String newModuleShardsConfig) {
208 moduleShardsConfig = newModuleShardsConfig;
213 * Specifies the name of the akka configuration. This is required.
215 * @return this Builder
217 public Builder akkaConfig(final String newAkkaConfig) {
218 akkaConfig = newAkkaConfig;
223 * Specifies whether or not to use akka artery for remoting. Default is true.
225 * @return this Builder
227 public Builder useAkkaArtery(final boolean newUseAkkaArtery) {
228 useAkkaArtery = newUseAkkaArtery;
233 * Specifies the name of the test that is appended to the data store names. This is required.
235 * @return this Builder
237 public Builder testName(final String newTestName) {
238 testName = newTestName;
243 * Specifies the optional names of the shards to initially wait for a leader to be elected.
245 * @return this Builder
247 public Builder waitForShardLeader(final String... shardNames) {
248 waitForshardLeader = shardNames;
253 * Specifies whether or not to create an operational data store. Defaults to true.
255 * @return this Builder
257 public Builder createOperDatastore(final boolean value) {
258 createOperDatastore = value;
263 * Specifies the SchemaContext for the data stores. Defaults to SchemaContextHelper.full().
265 * @return this Builder
267 public Builder schemaContext(final EffectiveModelContext newSchemaContext) {
268 schemaContext = newSchemaContext;
273 * Specifies the DatastoreContext Builder. If not specified, a default instance is used.
275 * @return this Builder
277 public Builder datastoreContextBuilder(final DatastoreContext.Builder builder) {
278 datastoreContextBuilder = builder;
282 public MemberNode build() throws Exception {
283 requireNonNull(moduleShardsConfig, "moduleShardsConfig must be specified");
284 requireNonNull(akkaConfig, "akkaConfig must be specified");
285 requireNonNull(testName, "testName must be specified");
287 if (schemaContext == null) {
288 schemaContext = SchemaContextHelper.full();
291 MemberNode node = new MemberNode();
292 node.datastoreContextBuilder = datastoreContextBuilder;
294 Config baseConfig = ConfigFactory.load();
297 config = baseConfig.getConfig(akkaConfig);
299 config = baseConfig.getConfig(akkaConfig + "-without-artery")
300 .withFallback(baseConfig.getConfig(akkaConfig));
303 ActorSystem system = ActorSystem.create("cluster-test", config);
304 String member1Address = useAkkaArtery ? MEMBER_1_ADDRESS : MEMBER_1_ADDRESS.replace("akka", "akka.tcp");
305 Cluster.get(system).join(AddressFromURIString.parse(member1Address));
307 node.kit = new IntegrationTestKit(system, datastoreContextBuilder);
309 String memberName = new ClusterWrapperImpl(system).getCurrentMemberName().getName();
310 node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName);
311 node.configDataStore = node.kit.setupDataStore(ClientBackedDataStore.class, "config_" + testName,
312 moduleShardsConfig, true, schemaContext, waitForshardLeader);
314 if (createOperDatastore) {
315 node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName);
316 node.operDataStore = node.kit.setupDataStore(ClientBackedDataStore.class,
317 "oper_" + testName, moduleShardsConfig, true, schemaContext, waitForshardLeader);
325 public interface RaftStateVerifier {
326 void verify(OnDemandRaftState raftState);