Migrate ActorUtils to java.util.Optional
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / MemberNode.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.fail;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.AddressFromURIString;
16 import akka.cluster.Cluster;
17 import akka.cluster.ClusterEvent.CurrentClusterState;
18 import akka.cluster.Member;
19 import akka.cluster.MemberStatus;
20 import com.google.common.base.Preconditions;
21 import com.google.common.base.Stopwatch;
22 import com.google.common.collect.Sets;
23 import com.google.common.util.concurrent.Uninterruptibles;
24 import com.typesafe.config.Config;
25 import com.typesafe.config.ConfigFactory;
26 import java.util.List;
27 import java.util.Optional;
28 import java.util.Set;
29 import java.util.concurrent.TimeUnit;
30 import org.opendaylight.controller.cluster.access.concepts.MemberName;
31 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
32 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
33 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
34 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
35 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
36 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
37 import org.slf4j.LoggerFactory;
38 import scala.concurrent.Await;
39 import scala.concurrent.Future;
40 import scala.concurrent.duration.FiniteDuration;
41
42 /**
43  * Class that represents a cluster member node for unit tests. It encapsulates an actor system with
44  * config and (optional) operational data store instances. The Builder is used to specify the setup
45  * parameters and create the data store instances. The actor system is automatically joined to address
46  * 127.0.0.1:2558 so one member must specify an akka cluster configuration with that address.
47  *
48  * @author Thomas Pantelis
49  */
50 public class MemberNode {
51     private static final String MEMBER_1_ADDRESS = "akka://cluster-test@127.0.0.1:2558";
52
53     private IntegrationTestKit kit;
54     private AbstractDataStore configDataStore;
55     private AbstractDataStore operDataStore;
56     private DatastoreContext.Builder datastoreContextBuilder;
57     private boolean cleanedUp;
58
59     /**
60      * Constructs a Builder.
61      *
62      * @param members the list to which the resulting MemberNode will be added. This makes it easier for
63      *                callers to cleanup instances on test completion.
64      * @return a Builder instance
65      */
66     public static Builder builder(final List<MemberNode> members) {
67         return new Builder(members);
68     }
69
70     public IntegrationTestKit kit() {
71         return kit;
72     }
73
74
75     public AbstractDataStore configDataStore() {
76         return configDataStore;
77     }
78
79
80     public AbstractDataStore operDataStore() {
81         return operDataStore;
82     }
83
84     public DatastoreContext.Builder datastoreContextBuilder() {
85         return datastoreContextBuilder;
86     }
87
88     public void waitForMembersUp(final String... otherMembers) {
89         kit.waitForMembersUp(otherMembers);
90     }
91
92     public void waitForMemberDown(final String member) {
93         Stopwatch sw = Stopwatch.createStarted();
94         while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
95             CurrentClusterState state = Cluster.get(kit.getSystem()).state();
96             for (Member m : state.getUnreachable()) {
97                 if (member.equals(m.getRoles().iterator().next())) {
98                     return;
99                 }
100             }
101
102             for (Member m : state.getMembers()) {
103                 if (m.status() != MemberStatus.up() && member.equals(m.getRoles().iterator().next())) {
104                     return;
105                 }
106             }
107
108             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
109         }
110
111         fail("Member " + member + " is now down");
112     }
113
114     @SuppressWarnings("checkstyle:IllegalCatch")
115     public void cleanup() {
116         if (!cleanedUp) {
117             cleanedUp = true;
118             if (configDataStore != null) {
119                 configDataStore.close();
120             }
121             if (operDataStore != null) {
122                 operDataStore.close();
123             }
124
125             try {
126                 IntegrationTestKit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE);
127             } catch (RuntimeException e) {
128                 LoggerFactory.getLogger(MemberNode.class).warn("Failed to shutdown actor system", e);
129             }
130         }
131     }
132
133     public static void verifyRaftState(final AbstractDataStore datastore, final String shardName,
134             final RaftStateVerifier verifier) throws Exception {
135         ActorUtils actorUtils = datastore.getActorUtils();
136
137         Future<ActorRef> future = actorUtils.findLocalShardAsync(shardName);
138         ActorRef shardActor = Await.result(future, FiniteDuration.create(10, TimeUnit.SECONDS));
139
140         AssertionError lastError = null;
141         Stopwatch sw = Stopwatch.createStarted();
142         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
143             OnDemandRaftState raftState = (OnDemandRaftState)actorUtils
144                     .executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
145
146             try {
147                 verifier.verify(raftState);
148                 return;
149             } catch (AssertionError e) {
150                 lastError = e;
151                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
152             }
153         }
154
155         throw lastError;
156     }
157
158     public static void verifyRaftPeersPresent(final AbstractDataStore datastore, final String shardName,
159             final String... peerMemberNames) throws Exception {
160         final Set<String> peerIds = Sets.newHashSet();
161         for (String p: peerMemberNames) {
162             peerIds.add(ShardIdentifier.create(shardName, MemberName.forName(p),
163                 datastore.getActorUtils().getDataStoreName()).toString());
164         }
165
166         verifyRaftState(datastore, shardName, raftState -> assertEquals("Peers for shard " + shardName, peerIds,
167             raftState.getPeerAddresses().keySet()));
168     }
169
170     public static void verifyNoShardPresent(final AbstractDataStore datastore, final String shardName) {
171         Stopwatch sw = Stopwatch.createStarted();
172         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
173             Optional<ActorRef> shardReply = datastore.getActorUtils().findLocalShard(shardName);
174             if (!shardReply.isPresent()) {
175                 return;
176             }
177
178             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
179         }
180
181         fail("Shard " + shardName + " is present");
182     }
183
184     public static class Builder {
185         private final List<MemberNode> members;
186         private String moduleShardsConfig;
187         private String akkaConfig;
188         private boolean useAkkaArtery = true;
189         private String[] waitForshardLeader = new String[0];
190         private String testName;
191         private SchemaContext schemaContext;
192         private boolean createOperDatastore = true;
193         private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
194                 .shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
195
196         Builder(final List<MemberNode> members) {
197             this.members = members;
198         }
199
200         /**
201          * Specifies the name of the module shards config file. This is required.
202          *
203          * @return this Builder
204          */
205         public Builder moduleShardsConfig(final String newModuleShardsConfig) {
206             this.moduleShardsConfig = newModuleShardsConfig;
207             return this;
208         }
209
210         /**
211          * Specifies the name of the akka configuration. This is required.
212          *
213          * @return this Builder
214          */
215         public Builder akkaConfig(final String newAkkaConfig) {
216             this.akkaConfig = newAkkaConfig;
217             return this;
218         }
219
220         /**
221          * Specifies whether or not to use akka artery for remoting. Default is true.
222          *
223          * @return this Builder
224          */
225         public Builder useAkkaArtery(final boolean newUseAkkaArtery) {
226             this.useAkkaArtery = newUseAkkaArtery;
227             return this;
228         }
229
230         /**
231          * Specifies the name of the test that is appended to the data store names. This is required.
232          *
233          * @return this Builder
234          */
235         public Builder testName(final String newTestName) {
236             this.testName = newTestName;
237             return this;
238         }
239
240         /**
241          * Specifies the optional names of the shards to initially wait for a leader to be elected.
242          *
243          * @return this Builder
244          */
245         public Builder waitForShardLeader(final String... shardNames) {
246             this.waitForshardLeader = shardNames;
247             return this;
248         }
249
250         /**
251          * Specifies whether or not to create an operational data store. Defaults to true.
252          *
253          * @return this Builder
254          */
255         public Builder createOperDatastore(final boolean value) {
256             this.createOperDatastore = value;
257             return this;
258         }
259
260         /**
261          * Specifies the SchemaContext for the data stores. Defaults to SchemaContextHelper.full().
262          *
263          * @return this Builder
264          */
265         public Builder schemaContext(final SchemaContext newSchemaContext) {
266             this.schemaContext = newSchemaContext;
267             return this;
268         }
269
270         /**
271          * Specifies the DatastoreContext Builder. If not specified, a default instance is used.
272          *
273          * @return this Builder
274          */
275         public Builder datastoreContextBuilder(final DatastoreContext.Builder builder) {
276             datastoreContextBuilder = builder;
277             return this;
278         }
279
280         public MemberNode build() throws Exception {
281             Preconditions.checkNotNull(moduleShardsConfig, "moduleShardsConfig must be specified");
282             Preconditions.checkNotNull(akkaConfig, "akkaConfig must be specified");
283             Preconditions.checkNotNull(testName, "testName must be specified");
284
285             if (schemaContext == null) {
286                 schemaContext = SchemaContextHelper.full();
287             }
288
289             MemberNode node = new MemberNode();
290             node.datastoreContextBuilder = datastoreContextBuilder;
291
292             Config baseConfig = ConfigFactory.load();
293             Config config;
294             if (useAkkaArtery) {
295                 config = baseConfig.getConfig(akkaConfig);
296             } else {
297                 config = baseConfig.getConfig(akkaConfig + "-without-artery")
298                         .withFallback(baseConfig.getConfig(akkaConfig));
299             }
300
301             ActorSystem system = ActorSystem.create("cluster-test", config);
302             String member1Address = useAkkaArtery ? MEMBER_1_ADDRESS : MEMBER_1_ADDRESS.replace("akka", "akka.tcp");
303             Cluster.get(system).join(AddressFromURIString.parse(member1Address));
304
305             node.kit = new IntegrationTestKit(system, datastoreContextBuilder);
306
307             String memberName = new ClusterWrapperImpl(system).getCurrentMemberName().getName();
308             node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName);
309             node.configDataStore = node.kit.setupAbstractDataStore(DistributedDataStore.class,
310                     "config_" + testName, moduleShardsConfig, true, schemaContext, waitForshardLeader);
311
312             if (createOperDatastore) {
313                 node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName);
314                 node.operDataStore = node.kit.setupAbstractDataStore(DistributedDataStore.class,
315                         "oper_" + testName, moduleShardsConfig, true, schemaContext, waitForshardLeader);
316             }
317
318             members.add(node);
319             return node;
320         }
321     }
322
323     public interface RaftStateVerifier {
324         void verify(OnDemandRaftState raftState);
325     }
326 }