Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / MemberNode.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static java.util.Objects.requireNonNull;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.fail;
13
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSystem;
16 import akka.actor.AddressFromURIString;
17 import akka.cluster.Cluster;
18 import akka.cluster.ClusterEvent.CurrentClusterState;
19 import akka.cluster.Member;
20 import akka.cluster.MemberStatus;
21 import com.google.common.base.Stopwatch;
22 import com.google.common.util.concurrent.Uninterruptibles;
23 import com.typesafe.config.Config;
24 import com.typesafe.config.ConfigFactory;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Optional;
28 import java.util.Set;
29 import java.util.concurrent.TimeUnit;
30 import org.opendaylight.controller.cluster.access.concepts.MemberName;
31 import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
32 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
33 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
34 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
35 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
36 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
37 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
38 import org.slf4j.LoggerFactory;
39 import scala.concurrent.Await;
40 import scala.concurrent.Future;
41 import scala.concurrent.duration.FiniteDuration;
42
43 /**
44  * Class that represents a cluster member node for unit tests. It encapsulates an actor system with
45  * config and (optional) operational data store instances. The Builder is used to specify the setup
46  * parameters and create the data store instances. The actor system is automatically joined to address
47  * 127.0.0.1:2558 so one member must specify an akka cluster configuration with that address.
48  *
49  * @author Thomas Pantelis
50  */
51 public class MemberNode {
52     private static final String MEMBER_1_ADDRESS = "akka://cluster-test@127.0.0.1:2558";
53
54     private IntegrationTestKit kit;
55     private ClientBackedDataStore configDataStore;
56     private ClientBackedDataStore operDataStore;
57     private DatastoreContext.Builder datastoreContextBuilder;
58     private boolean cleanedUp;
59
60     /**
61      * Constructs a Builder.
62      *
63      * @param members the list to which the resulting MemberNode will be added. This makes it easier for
64      *                callers to cleanup instances on test completion.
65      * @return a Builder instance
66      */
67     public static Builder builder(final List<MemberNode> members) {
68         return new Builder(members);
69     }
70
71     public IntegrationTestKit kit() {
72         return kit;
73     }
74
75
76     public ClientBackedDataStore configDataStore() {
77         return configDataStore;
78     }
79
80
81     public ClientBackedDataStore operDataStore() {
82         return operDataStore;
83     }
84
85     public DatastoreContext.Builder datastoreContextBuilder() {
86         return datastoreContextBuilder;
87     }
88
89     public void waitForMembersUp(final String... otherMembers) {
90         kit.waitForMembersUp(otherMembers);
91     }
92
93     public void waitForMemberDown(final String member) {
94         Stopwatch sw = Stopwatch.createStarted();
95         while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
96             CurrentClusterState state = Cluster.get(kit.getSystem()).state();
97
98             for (Member m : state.getUnreachable()) {
99                 if (member.equals(m.getRoles().iterator().next())) {
100                     return;
101                 }
102             }
103
104             for (Member m : state.getMembers()) {
105                 if (m.status() != MemberStatus.up() && member.equals(m.getRoles().iterator().next())) {
106                     return;
107                 }
108             }
109
110             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
111         }
112
113         fail("Member " + member + " is now down");
114     }
115
116     @SuppressWarnings("checkstyle:IllegalCatch")
117     public void cleanup() {
118         if (!cleanedUp) {
119             cleanedUp = true;
120             if (configDataStore != null) {
121                 configDataStore.close();
122             }
123             if (operDataStore != null) {
124                 operDataStore.close();
125             }
126
127             try {
128                 IntegrationTestKit.shutdownActorSystem(kit.getSystem(), true);
129             } catch (RuntimeException e) {
130                 LoggerFactory.getLogger(MemberNode.class).warn("Failed to shutdown actor system", e);
131             }
132         }
133     }
134
135     public static void verifyRaftState(final ClientBackedDataStore datastore, final String shardName,
136             final RaftStateVerifier verifier) throws Exception {
137         ActorUtils actorUtils = datastore.getActorUtils();
138
139         Future<ActorRef> future = actorUtils.findLocalShardAsync(shardName);
140         ActorRef shardActor = Await.result(future, FiniteDuration.create(10, TimeUnit.SECONDS));
141
142         AssertionError lastError = null;
143         Stopwatch sw = Stopwatch.createStarted();
144         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
145             OnDemandRaftState raftState = (OnDemandRaftState)actorUtils
146                     .executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
147
148             try {
149                 verifier.verify(raftState);
150                 return;
151             } catch (AssertionError e) {
152                 lastError = e;
153                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
154             }
155         }
156
157         throw lastError;
158     }
159
160     public static void verifyRaftPeersPresent(final ClientBackedDataStore datastore, final String shardName,
161             final String... peerMemberNames) throws Exception {
162         final Set<String> peerIds = new HashSet<>();
163         for (String p: peerMemberNames) {
164             peerIds.add(ShardIdentifier.create(shardName, MemberName.forName(p),
165                 datastore.getActorUtils().getDataStoreName()).toString());
166         }
167
168         verifyRaftState(datastore, shardName, raftState -> assertEquals("Peers for shard " + shardName, peerIds,
169             raftState.getPeerAddresses().keySet()));
170     }
171
172     public static void verifyNoShardPresent(final ClientBackedDataStore datastore, final String shardName) {
173         Stopwatch sw = Stopwatch.createStarted();
174         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
175             Optional<ActorRef> shardReply = datastore.getActorUtils().findLocalShard(shardName);
176             if (!shardReply.isPresent()) {
177                 return;
178             }
179
180             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
181         }
182
183         fail("Shard " + shardName + " is present");
184     }
185
186     public static class Builder {
187         private final List<MemberNode> members;
188         private String moduleShardsConfig;
189         private String akkaConfig;
190         private boolean useAkkaArtery = true;
191         private String[] waitForshardLeader = new String[0];
192         private String testName;
193         private EffectiveModelContext schemaContext;
194         private boolean createOperDatastore = true;
195         private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
196                 .shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
197
198         Builder(final List<MemberNode> members) {
199             this.members = members;
200         }
201
202         /**
203          * Specifies the name of the module shards config file. This is required.
204          *
205          * @return this Builder
206          */
207         public Builder moduleShardsConfig(final String newModuleShardsConfig) {
208             moduleShardsConfig = newModuleShardsConfig;
209             return this;
210         }
211
212         /**
213          * Specifies the name of the akka configuration. This is required.
214          *
215          * @return this Builder
216          */
217         public Builder akkaConfig(final String newAkkaConfig) {
218             akkaConfig = newAkkaConfig;
219             return this;
220         }
221
222         /**
223          * Specifies whether or not to use akka artery for remoting. Default is true.
224          *
225          * @return this Builder
226          */
227         public Builder useAkkaArtery(final boolean newUseAkkaArtery) {
228             useAkkaArtery = newUseAkkaArtery;
229             return this;
230         }
231
232         /**
233          * Specifies the name of the test that is appended to the data store names. This is required.
234          *
235          * @return this Builder
236          */
237         public Builder testName(final String newTestName) {
238             testName = newTestName;
239             return this;
240         }
241
242         /**
243          * Specifies the optional names of the shards to initially wait for a leader to be elected.
244          *
245          * @return this Builder
246          */
247         public Builder waitForShardLeader(final String... shardNames) {
248             waitForshardLeader = shardNames;
249             return this;
250         }
251
252         /**
253          * Specifies whether or not to create an operational data store. Defaults to true.
254          *
255          * @return this Builder
256          */
257         public Builder createOperDatastore(final boolean value) {
258             createOperDatastore = value;
259             return this;
260         }
261
262         /**
263          * Specifies the SchemaContext for the data stores. Defaults to SchemaContextHelper.full().
264          *
265          * @return this Builder
266          */
267         public Builder schemaContext(final EffectiveModelContext newSchemaContext) {
268             schemaContext = newSchemaContext;
269             return this;
270         }
271
272         /**
273          * Specifies the DatastoreContext Builder. If not specified, a default instance is used.
274          *
275          * @return this Builder
276          */
277         public Builder datastoreContextBuilder(final DatastoreContext.Builder builder) {
278             datastoreContextBuilder = builder;
279             return this;
280         }
281
282         public MemberNode build() throws Exception {
283             requireNonNull(moduleShardsConfig, "moduleShardsConfig must be specified");
284             requireNonNull(akkaConfig, "akkaConfig must be specified");
285             requireNonNull(testName, "testName must be specified");
286
287             if (schemaContext == null) {
288                 schemaContext = SchemaContextHelper.full();
289             }
290
291             MemberNode node = new MemberNode();
292             node.datastoreContextBuilder = datastoreContextBuilder;
293
294             Config baseConfig = ConfigFactory.load();
295             Config config;
296             if (useAkkaArtery) {
297                 config = baseConfig.getConfig(akkaConfig);
298             } else {
299                 config = baseConfig.getConfig(akkaConfig + "-without-artery")
300                         .withFallback(baseConfig.getConfig(akkaConfig));
301             }
302
303             ActorSystem system = ActorSystem.create("cluster-test", config);
304             String member1Address = useAkkaArtery ? MEMBER_1_ADDRESS : MEMBER_1_ADDRESS.replace("akka", "akka.tcp");
305             Cluster.get(system).join(AddressFromURIString.parse(member1Address));
306
307             node.kit = new IntegrationTestKit(system, datastoreContextBuilder);
308
309             String memberName = new ClusterWrapperImpl(system).getCurrentMemberName().getName();
310             node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName);
311             node.configDataStore = node.kit.setupDataStore(ClientBackedDataStore.class, "config_" + testName,
312                 moduleShardsConfig, true, schemaContext, waitForshardLeader);
313
314             if (createOperDatastore) {
315                 node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName);
316                 node.operDataStore = node.kit.setupDataStore(ClientBackedDataStore.class,
317                         "oper_" + testName, moduleShardsConfig, true, schemaContext, waitForshardLeader);
318             }
319
320             members.add(node);
321             return node;
322         }
323     }
324
325     public interface RaftStateVerifier {
326         void verify(OnDemandRaftState raftState);
327     }
328 }