2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.fail;
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.AddressFromURIString;
16 import akka.cluster.Cluster;
17 import akka.cluster.ClusterEvent.CurrentClusterState;
18 import akka.cluster.Member;
19 import akka.cluster.MemberStatus;
20 import com.google.common.base.Optional;
21 import com.google.common.base.Preconditions;
22 import com.google.common.base.Stopwatch;
23 import com.google.common.collect.Sets;
24 import com.google.common.util.concurrent.Uninterruptibles;
25 import com.typesafe.config.Config;
26 import com.typesafe.config.ConfigFactory;
27 import java.util.List;
29 import java.util.concurrent.TimeUnit;
30 import org.opendaylight.controller.cluster.access.concepts.MemberName;
31 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
32 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
33 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
34 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
35 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
36 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
37 import org.slf4j.LoggerFactory;
38 import scala.concurrent.Await;
39 import scala.concurrent.Future;
40 import scala.concurrent.duration.Duration;
43 * Class that represents a cluster member node for unit tests. It encapsulates an actor system with
44 * config and (optional) operational data store instances. The Builder is used to specify the setup
45 * parameters and create the data store instances. The actor system is automatically joined to address
46 * 127.0.0.1:2558 so one member must specify an akka cluster configuration with that address.
48 * @author Thomas Pantelis
50 public class MemberNode {
51 private static final String MEMBER_1_ADDRESS = "akka://cluster-test@127.0.0.1:2558";
53 private IntegrationTestKit kit;
54 private AbstractDataStore configDataStore;
55 private AbstractDataStore operDataStore;
56 private DatastoreContext.Builder datastoreContextBuilder;
57 private boolean cleanedUp;
60 * Constructs a Builder.
62 * @param members the list to which the resulting MemberNode will be added. This makes it easier for
63 * callers to cleanup instances on test completion.
64 * @return a Builder instance
66 public static Builder builder(final List<MemberNode> members) {
67 return new Builder(members);
70 public IntegrationTestKit kit() {
75 public AbstractDataStore configDataStore() {
76 return configDataStore;
80 public AbstractDataStore operDataStore() {
84 public DatastoreContext.Builder datastoreContextBuilder() {
85 return datastoreContextBuilder;
88 public void waitForMembersUp(final String... otherMembers) {
89 kit.waitForMembersUp(otherMembers);
92 public void waitForMemberDown(final String member) {
93 Stopwatch sw = Stopwatch.createStarted();
94 while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
95 CurrentClusterState state = Cluster.get(kit.getSystem()).state();
96 for (Member m : state.getUnreachable()) {
97 if (member.equals(m.getRoles().iterator().next())) {
102 for (Member m : state.getMembers()) {
103 if (m.status() != MemberStatus.up() && member.equals(m.getRoles().iterator().next())) {
108 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
111 fail("Member " + member + " is now down");
114 @SuppressWarnings("checkstyle:IllegalCatch")
115 public void cleanup() {
118 if (configDataStore != null) {
119 configDataStore.close();
121 if (operDataStore != null) {
122 operDataStore.close();
126 IntegrationTestKit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE);
127 } catch (RuntimeException e) {
128 LoggerFactory.getLogger(MemberNode.class).warn("Failed to shutdown actor system", e);
133 public static void verifyRaftState(final AbstractDataStore datastore, final String shardName,
134 final RaftStateVerifier verifier) throws Exception {
135 ActorContext actorContext = datastore.getActorContext();
137 Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
138 ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
140 AssertionError lastError = null;
141 Stopwatch sw = Stopwatch.createStarted();
142 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
143 OnDemandRaftState raftState = (OnDemandRaftState)actorContext
144 .executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
147 verifier.verify(raftState);
149 } catch (AssertionError e) {
151 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
158 public static void verifyRaftPeersPresent(final AbstractDataStore datastore, final String shardName,
159 final String... peerMemberNames) throws Exception {
160 final Set<String> peerIds = Sets.newHashSet();
161 for (String p: peerMemberNames) {
162 peerIds.add(ShardIdentifier.create(shardName, MemberName.forName(p),
163 datastore.getActorContext().getDataStoreName()).toString());
166 verifyRaftState(datastore, shardName, raftState -> assertEquals("Peers for shard " + shardName, peerIds,
167 raftState.getPeerAddresses().keySet()));
170 public static void verifyNoShardPresent(final AbstractDataStore datastore, final String shardName) {
171 Stopwatch sw = Stopwatch.createStarted();
172 while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
173 Optional<ActorRef> shardReply = datastore.getActorContext().findLocalShard(shardName);
174 if (!shardReply.isPresent()) {
178 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
181 fail("Shard " + shardName + " is present");
184 public static class Builder {
185 private final List<MemberNode> members;
186 private String moduleShardsConfig;
187 private String akkaConfig;
188 private boolean useAkkaArtery = true;
189 private String[] waitForshardLeader = new String[0];
190 private String testName;
191 private SchemaContext schemaContext;
192 private boolean createOperDatastore = true;
193 private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
194 .shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
196 Builder(final List<MemberNode> members) {
197 this.members = members;
201 * Specifies the name of the module shards config file. This is required.
203 * @return this Builder
205 public Builder moduleShardsConfig(final String newModuleShardsConfig) {
206 this.moduleShardsConfig = newModuleShardsConfig;
211 * Specifies the name of the akka configuration. This is required.
213 * @return this Builder
215 public Builder akkaConfig(final String newAkkaConfig) {
216 this.akkaConfig = newAkkaConfig;
221 * Specifies whether or not to use akka artery for remoting. Default is true.
223 * @return this Builder
225 public Builder useAkkaArtery(final boolean newUseAkkaArtery) {
226 this.useAkkaArtery = newUseAkkaArtery;
231 * Specifies the name of the test that is appended to the data store names. This is required.
233 * @return this Builder
235 public Builder testName(final String newTestName) {
236 this.testName = newTestName;
241 * Specifies the optional names of the shards to initially wait for a leader to be elected.
243 * @return this Builder
245 public Builder waitForShardLeader(final String... shardNames) {
246 this.waitForshardLeader = shardNames;
251 * Specifies whether or not to create an operational data store. Defaults to true.
253 * @return this Builder
255 public Builder createOperDatastore(final boolean value) {
256 this.createOperDatastore = value;
261 * Specifies the SchemaContext for the data stores. Defaults to SchemaContextHelper.full().
263 * @return this Builder
265 public Builder schemaContext(final SchemaContext newSchemaContext) {
266 this.schemaContext = newSchemaContext;
271 * Specifies the DatastoreContext Builder. If not specified, a default instance is used.
273 * @return this Builder
275 public Builder datastoreContextBuilder(final DatastoreContext.Builder builder) {
276 datastoreContextBuilder = builder;
280 public MemberNode build() throws Exception {
281 Preconditions.checkNotNull(moduleShardsConfig, "moduleShardsConfig must be specified");
282 Preconditions.checkNotNull(akkaConfig, "akkaConfig must be specified");
283 Preconditions.checkNotNull(testName, "testName must be specified");
285 if (schemaContext == null) {
286 schemaContext = SchemaContextHelper.full();
289 MemberNode node = new MemberNode();
290 node.datastoreContextBuilder = datastoreContextBuilder;
292 Config baseConfig = ConfigFactory.load();
295 config = baseConfig.getConfig(akkaConfig);
297 config = baseConfig.getConfig(akkaConfig + "-without-artery")
298 .withFallback(baseConfig.getConfig(akkaConfig));
301 ActorSystem system = ActorSystem.create("cluster-test", config);
302 String member1Address = useAkkaArtery ? MEMBER_1_ADDRESS : MEMBER_1_ADDRESS.replace("akka", "akka.tcp");
303 Cluster.get(system).join(AddressFromURIString.parse(member1Address));
305 node.kit = new IntegrationTestKit(system, datastoreContextBuilder);
307 String memberName = new ClusterWrapperImpl(system).getCurrentMemberName().getName();
308 node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName);
309 node.configDataStore = node.kit.setupAbstractDataStore(DistributedDataStore.class,
310 "config_" + testName, moduleShardsConfig, true, schemaContext, waitForshardLeader);
312 if (createOperDatastore) {
313 node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName);
314 node.operDataStore = node.kit.setupAbstractDataStore(DistributedDataStore.class,
315 "oper_" + testName, moduleShardsConfig, true, schemaContext, waitForshardLeader);
323 public interface RaftStateVerifier {
324 void verify(OnDemandRaftState raftState);