Add UnsignedLongBitmap
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / MemberNode.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static java.util.Objects.requireNonNull;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.fail;
13
14 import akka.actor.ActorRef;
15 import akka.actor.ActorSystem;
16 import akka.actor.AddressFromURIString;
17 import akka.cluster.Cluster;
18 import akka.cluster.ClusterEvent.CurrentClusterState;
19 import akka.cluster.Member;
20 import akka.cluster.MemberStatus;
21 import com.google.common.base.Stopwatch;
22 import com.google.common.util.concurrent.Uninterruptibles;
23 import com.typesafe.config.Config;
24 import com.typesafe.config.ConfigFactory;
25 import java.util.HashSet;
26 import java.util.List;
27 import java.util.Optional;
28 import java.util.Set;
29 import java.util.concurrent.TimeUnit;
30 import org.opendaylight.controller.cluster.access.concepts.MemberName;
31 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
32 import org.opendaylight.controller.cluster.datastore.utils.ActorUtils;
33 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
34 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
35 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
36 import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
37 import org.slf4j.LoggerFactory;
38 import scala.concurrent.Await;
39 import scala.concurrent.Future;
40 import scala.concurrent.duration.FiniteDuration;
41
42 /**
43  * Class that represents a cluster member node for unit tests. It encapsulates an actor system with
44  * config and (optional) operational data store instances. The Builder is used to specify the setup
45  * parameters and create the data store instances. The actor system is automatically joined to address
46  * 127.0.0.1:2558 so one member must specify an akka cluster configuration with that address.
47  *
48  * @author Thomas Pantelis
49  */
50 public class MemberNode {
51     private static final String MEMBER_1_ADDRESS = "akka://cluster-test@127.0.0.1:2558";
52
53     private IntegrationTestKit kit;
54     private AbstractDataStore configDataStore;
55     private AbstractDataStore operDataStore;
56     private DatastoreContext.Builder datastoreContextBuilder;
57     private boolean cleanedUp;
58
59     /**
60      * Constructs a Builder.
61      *
62      * @param members the list to which the resulting MemberNode will be added. This makes it easier for
63      *                callers to cleanup instances on test completion.
64      * @return a Builder instance
65      */
66     public static Builder builder(final List<MemberNode> members) {
67         return new Builder(members);
68     }
69
70     public IntegrationTestKit kit() {
71         return kit;
72     }
73
74
75     public AbstractDataStore configDataStore() {
76         return configDataStore;
77     }
78
79
80     public AbstractDataStore operDataStore() {
81         return operDataStore;
82     }
83
84     public DatastoreContext.Builder datastoreContextBuilder() {
85         return datastoreContextBuilder;
86     }
87
88     public void waitForMembersUp(final String... otherMembers) {
89         kit.waitForMembersUp(otherMembers);
90     }
91
92     public void waitForMemberDown(final String member) {
93         Stopwatch sw = Stopwatch.createStarted();
94         while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
95             CurrentClusterState state = Cluster.get(kit.getSystem()).state();
96
97             for (Member m : state.getUnreachable()) {
98                 if (member.equals(m.getRoles().iterator().next())) {
99                     return;
100                 }
101             }
102
103             for (Member m : state.getMembers()) {
104                 if (m.status() != MemberStatus.up() && member.equals(m.getRoles().iterator().next())) {
105                     return;
106                 }
107             }
108
109             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
110         }
111
112         fail("Member " + member + " is now down");
113     }
114
115     @SuppressWarnings("checkstyle:IllegalCatch")
116     public void cleanup() {
117         if (!cleanedUp) {
118             cleanedUp = true;
119             if (configDataStore != null) {
120                 configDataStore.close();
121             }
122             if (operDataStore != null) {
123                 operDataStore.close();
124             }
125
126             try {
127                 IntegrationTestKit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE);
128             } catch (RuntimeException e) {
129                 LoggerFactory.getLogger(MemberNode.class).warn("Failed to shutdown actor system", e);
130             }
131         }
132     }
133
134     public static void verifyRaftState(final AbstractDataStore datastore, final String shardName,
135             final RaftStateVerifier verifier) throws Exception {
136         ActorUtils actorUtils = datastore.getActorUtils();
137
138         Future<ActorRef> future = actorUtils.findLocalShardAsync(shardName);
139         ActorRef shardActor = Await.result(future, FiniteDuration.create(10, TimeUnit.SECONDS));
140
141         AssertionError lastError = null;
142         Stopwatch sw = Stopwatch.createStarted();
143         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
144             OnDemandRaftState raftState = (OnDemandRaftState)actorUtils
145                     .executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
146
147             try {
148                 verifier.verify(raftState);
149                 return;
150             } catch (AssertionError e) {
151                 lastError = e;
152                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
153             }
154         }
155
156         throw lastError;
157     }
158
159     public static void verifyRaftPeersPresent(final AbstractDataStore datastore, final String shardName,
160             final String... peerMemberNames) throws Exception {
161         final Set<String> peerIds = new HashSet<>();
162         for (String p: peerMemberNames) {
163             peerIds.add(ShardIdentifier.create(shardName, MemberName.forName(p),
164                 datastore.getActorUtils().getDataStoreName()).toString());
165         }
166
167         verifyRaftState(datastore, shardName, raftState -> assertEquals("Peers for shard " + shardName, peerIds,
168             raftState.getPeerAddresses().keySet()));
169     }
170
171     public static void verifyNoShardPresent(final AbstractDataStore datastore, final String shardName) {
172         Stopwatch sw = Stopwatch.createStarted();
173         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
174             Optional<ActorRef> shardReply = datastore.getActorUtils().findLocalShard(shardName);
175             if (!shardReply.isPresent()) {
176                 return;
177             }
178
179             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
180         }
181
182         fail("Shard " + shardName + " is present");
183     }
184
185     public static class Builder {
186         private final List<MemberNode> members;
187         private String moduleShardsConfig;
188         private String akkaConfig;
189         private boolean useAkkaArtery = true;
190         private String[] waitForshardLeader = new String[0];
191         private String testName;
192         private EffectiveModelContext schemaContext;
193         private boolean createOperDatastore = true;
194         private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
195                 .shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
196
197         Builder(final List<MemberNode> members) {
198             this.members = members;
199         }
200
201         /**
202          * Specifies the name of the module shards config file. This is required.
203          *
204          * @return this Builder
205          */
206         public Builder moduleShardsConfig(final String newModuleShardsConfig) {
207             this.moduleShardsConfig = newModuleShardsConfig;
208             return this;
209         }
210
211         /**
212          * Specifies the name of the akka configuration. This is required.
213          *
214          * @return this Builder
215          */
216         public Builder akkaConfig(final String newAkkaConfig) {
217             this.akkaConfig = newAkkaConfig;
218             return this;
219         }
220
221         /**
222          * Specifies whether or not to use akka artery for remoting. Default is true.
223          *
224          * @return this Builder
225          */
226         public Builder useAkkaArtery(final boolean newUseAkkaArtery) {
227             this.useAkkaArtery = newUseAkkaArtery;
228             return this;
229         }
230
231         /**
232          * Specifies the name of the test that is appended to the data store names. This is required.
233          *
234          * @return this Builder
235          */
236         public Builder testName(final String newTestName) {
237             this.testName = newTestName;
238             return this;
239         }
240
241         /**
242          * Specifies the optional names of the shards to initially wait for a leader to be elected.
243          *
244          * @return this Builder
245          */
246         public Builder waitForShardLeader(final String... shardNames) {
247             this.waitForshardLeader = shardNames;
248             return this;
249         }
250
251         /**
252          * Specifies whether or not to create an operational data store. Defaults to true.
253          *
254          * @return this Builder
255          */
256         public Builder createOperDatastore(final boolean value) {
257             this.createOperDatastore = value;
258             return this;
259         }
260
261         /**
262          * Specifies the SchemaContext for the data stores. Defaults to SchemaContextHelper.full().
263          *
264          * @return this Builder
265          */
266         public Builder schemaContext(final EffectiveModelContext newSchemaContext) {
267             this.schemaContext = newSchemaContext;
268             return this;
269         }
270
271         /**
272          * Specifies the DatastoreContext Builder. If not specified, a default instance is used.
273          *
274          * @return this Builder
275          */
276         public Builder datastoreContextBuilder(final DatastoreContext.Builder builder) {
277             datastoreContextBuilder = builder;
278             return this;
279         }
280
281         public MemberNode build() throws Exception {
282             requireNonNull(moduleShardsConfig, "moduleShardsConfig must be specified");
283             requireNonNull(akkaConfig, "akkaConfig must be specified");
284             requireNonNull(testName, "testName must be specified");
285
286             if (schemaContext == null) {
287                 schemaContext = SchemaContextHelper.full();
288             }
289
290             MemberNode node = new MemberNode();
291             node.datastoreContextBuilder = datastoreContextBuilder;
292
293             Config baseConfig = ConfigFactory.load();
294             Config config;
295             if (useAkkaArtery) {
296                 config = baseConfig.getConfig(akkaConfig);
297             } else {
298                 config = baseConfig.getConfig(akkaConfig + "-without-artery")
299                         .withFallback(baseConfig.getConfig(akkaConfig));
300             }
301
302             ActorSystem system = ActorSystem.create("cluster-test", config);
303             String member1Address = useAkkaArtery ? MEMBER_1_ADDRESS : MEMBER_1_ADDRESS.replace("akka", "akka.tcp");
304             Cluster.get(system).join(AddressFromURIString.parse(member1Address));
305
306             node.kit = new IntegrationTestKit(system, datastoreContextBuilder);
307
308             String memberName = new ClusterWrapperImpl(system).getCurrentMemberName().getName();
309             node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName);
310             node.configDataStore = node.kit.setupAbstractDataStore(DistributedDataStore.class,
311                     "config_" + testName, moduleShardsConfig, true, schemaContext, waitForshardLeader);
312
313             if (createOperDatastore) {
314                 node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName);
315                 node.operDataStore = node.kit.setupAbstractDataStore(DistributedDataStore.class,
316                         "oper_" + testName, moduleShardsConfig, true, schemaContext, waitForshardLeader);
317             }
318
319             members.add(node);
320             return node;
321         }
322     }
323
324     public interface RaftStateVerifier {
325         void verify(OnDemandRaftState raftState);
326     }
327 }