Fix unit test CS warnings in sal-distributed-datastore
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / MemberNode.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.fail;
12
13 import akka.actor.ActorRef;
14 import akka.actor.ActorSystem;
15 import akka.actor.Address;
16 import akka.actor.AddressFromURIString;
17 import akka.cluster.Cluster;
18 import akka.cluster.ClusterEvent.CurrentClusterState;
19 import akka.cluster.Member;
20 import akka.cluster.MemberStatus;
21 import com.google.common.base.Optional;
22 import com.google.common.base.Preconditions;
23 import com.google.common.base.Stopwatch;
24 import com.google.common.collect.Sets;
25 import com.google.common.util.concurrent.Uninterruptibles;
26 import com.typesafe.config.ConfigFactory;
27 import java.util.List;
28 import java.util.Set;
29 import java.util.concurrent.TimeUnit;
30 import org.opendaylight.controller.cluster.access.concepts.MemberName;
31 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
32 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
33 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
34 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
35 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
36 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
37 import scala.concurrent.Await;
38 import scala.concurrent.Future;
39 import scala.concurrent.duration.Duration;
40
41 /**
42  * Class that represents a cluster member node for unit tests. It encapsulates an actor system with
43  * config and (optional) operational data store instances. The Builder is used to specify the setup
44  * parameters and create the data store instances. The actor system is automatically joined to address
45  * 127.0.0.1:2558 so one member must specify an akka cluster configuration with that address.
46  *
47  * @author Thomas Pantelis
48  */
49 public class MemberNode {
50     static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
51
52     private IntegrationTestKit kit;
53     private DistributedDataStore configDataStore;
54     private DistributedDataStore operDataStore;
55     private DatastoreContext.Builder datastoreContextBuilder;
56     private boolean cleanedUp;
57
58     /**
59      * Constructs a Builder.
60      *
61      * @param members the list to which the resulting MemberNode will be added. This makes it easier for
62      *                callers to cleanup instances on test completion.
63      * @return a Builder instance
64      */
65     public static Builder builder(List<MemberNode> members) {
66         return new Builder(members);
67     }
68
69     public IntegrationTestKit kit() {
70         return kit;
71     }
72
73
74     public DistributedDataStore configDataStore() {
75         return configDataStore;
76     }
77
78
79     public DistributedDataStore operDataStore() {
80         return operDataStore;
81     }
82
83     public DatastoreContext.Builder datastoreContextBuilder() {
84         return datastoreContextBuilder;
85     }
86
87     public void waitForMembersUp(String... otherMembers) {
88         kit.waitForMembersUp(otherMembers);
89     }
90
91     public void waitForMemberDown(String member) {
92         Stopwatch sw = Stopwatch.createStarted();
93         while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
94             CurrentClusterState state = Cluster.get(kit.getSystem()).state();
95             for (Member m : state.getUnreachable()) {
96                 if (member.equals(m.getRoles().iterator().next())) {
97                     return;
98                 }
99             }
100
101             for (Member m : state.getMembers()) {
102                 if (m.status() != MemberStatus.up() && member.equals(m.getRoles().iterator().next())) {
103                     return;
104                 }
105             }
106
107             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
108         }
109
110         fail("Member " + member + " is now down");
111     }
112
113     public void cleanup() {
114         if (!cleanedUp) {
115             cleanedUp = true;
116             if (configDataStore != null) {
117                 configDataStore.close();
118             }
119             if (operDataStore != null) {
120                 operDataStore.close();
121             }
122
123             IntegrationTestKit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE);
124         }
125     }
126
127     public static void verifyRaftState(DistributedDataStore datastore, String shardName, RaftStateVerifier verifier)
128             throws Exception {
129         ActorContext actorContext = datastore.getActorContext();
130
131         Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
132         ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
133
134         AssertionError lastError = null;
135         Stopwatch sw = Stopwatch.createStarted();
136         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
137             OnDemandRaftState raftState = (OnDemandRaftState)actorContext
138                     .executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
139
140             try {
141                 verifier.verify(raftState);
142                 return;
143             } catch (AssertionError e) {
144                 lastError = e;
145                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
146             }
147         }
148
149         throw lastError;
150     }
151
152     public static void verifyRaftPeersPresent(DistributedDataStore datastore, final String shardName,
153             String... peerMemberNames) throws Exception {
154         final Set<String> peerIds = Sets.newHashSet();
155         for (String p: peerMemberNames) {
156             peerIds.add(ShardIdentifier.create(shardName, MemberName.forName(p),
157                 datastore.getActorContext().getDataStoreName()).toString());
158         }
159
160         verifyRaftState(datastore, shardName, raftState -> assertEquals("Peers for shard " + shardName, peerIds,
161             raftState.getPeerAddresses().keySet()));
162     }
163
164     public static void verifyNoShardPresent(DistributedDataStore datastore, String shardName) {
165         Stopwatch sw = Stopwatch.createStarted();
166         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
167             Optional<ActorRef> shardReply = datastore.getActorContext().findLocalShard(shardName);
168             if (!shardReply.isPresent()) {
169                 return;
170             }
171
172             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
173         }
174
175         fail("Shard " + shardName + " is present");
176     }
177
178     public static class Builder {
179         private final List<MemberNode> members;
180         private String moduleShardsConfig;
181         private String akkaConfig;
182         private String[] waitForshardLeader = new String[0];
183         private String testName;
184         private SchemaContext schemaContext;
185         private boolean createOperDatastore = true;
186         private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
187                 .shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
188
189         Builder(List<MemberNode> members) {
190             this.members = members;
191         }
192
193         /**
194          * Specifies the name of the module shards config file. This is required.
195          *
196          * @return this Builder
197          */
198         public Builder moduleShardsConfig(String newModuleShardsConfig) {
199             this.moduleShardsConfig = newModuleShardsConfig;
200             return this;
201         }
202
203         /**
204          * Specifies the name of the akka configuration. This is required.
205          *
206          * @return this Builder
207          */
208         public Builder akkaConfig(String newAkkaConfig) {
209             this.akkaConfig = newAkkaConfig;
210             return this;
211         }
212
213         /**
214          * Specifies the name of the test that is appended to the data store names. This is required.
215          *
216          * @return this Builder
217          */
218         public Builder testName(String newTestName) {
219             this.testName = newTestName;
220             return this;
221         }
222
223         /**
224          * Specifies the optional names of the shards to initially wait for a leader to be elected.
225          *
226          * @return this Builder
227          */
228         public Builder waitForShardLeader(String... shardNames) {
229             this.waitForshardLeader = shardNames;
230             return this;
231         }
232
233         /**
234          * Specifies whether or not to create an operational data store. Defaults to true.
235          *
236          * @return this Builder
237          */
238         public Builder createOperDatastore(boolean value) {
239             this.createOperDatastore = value;
240             return this;
241         }
242
243         /**
244          * Specifies the SchemaContext for the data stores. Defaults to SchemaContextHelper.full().
245          *
246          * @return this Builder
247          */
248         public Builder schemaContext(SchemaContext newSchemaContext) {
249             this.schemaContext = newSchemaContext;
250             return this;
251         }
252
253         /**
254          * Specifies the DatastoreContext Builder. If not specified, a default instance is used.
255          *
256          * @return this Builder
257          */
258         public Builder datastoreContextBuilder(DatastoreContext.Builder builder) {
259             datastoreContextBuilder = builder;
260             return this;
261         }
262
263         public MemberNode build() {
264             Preconditions.checkNotNull(moduleShardsConfig, "moduleShardsConfig must be specified");
265             Preconditions.checkNotNull(akkaConfig, "akkaConfig must be specified");
266             Preconditions.checkNotNull(testName, "testName must be specified");
267
268             if (schemaContext == null) {
269                 schemaContext = SchemaContextHelper.full();
270             }
271
272             MemberNode node = new MemberNode();
273             node.datastoreContextBuilder = datastoreContextBuilder;
274
275             ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(akkaConfig));
276             Cluster.get(system).join(MEMBER_1_ADDRESS);
277
278             node.kit = new IntegrationTestKit(system, datastoreContextBuilder);
279
280             String memberName = new ClusterWrapperImpl(system).getCurrentMemberName().getName();
281             node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName);
282             node.configDataStore = node.kit.setupDistributedDataStore("config_" + testName, moduleShardsConfig,
283                     true, schemaContext, waitForshardLeader);
284
285             if (createOperDatastore) {
286                 node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName);
287                 node.operDataStore = node.kit.setupDistributedDataStore("oper_" + testName, moduleShardsConfig,
288                         true, schemaContext, waitForshardLeader);
289             }
290
291             members.add(node);
292             return node;
293         }
294     }
295
296     public interface RaftStateVerifier {
297         void verify(OnDemandRaftState raftState);
298     }
299 }