5ae53317f084f8b0f05e0023d23c581bf0e2c03e
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / MemberNode.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.fail;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.AddressFromURIString;
16 import akka.cluster.Cluster;
17 import akka.cluster.ClusterEvent.CurrentClusterState;
18 import akka.cluster.Member;
19 import akka.cluster.MemberStatus;
20 import com.google.common.base.Optional;
21 import com.google.common.base.Preconditions;
22 import com.google.common.base.Stopwatch;
23 import com.google.common.collect.Sets;
24 import com.google.common.util.concurrent.Uninterruptibles;
25 import com.typesafe.config.ConfigFactory;
26 import java.util.List;
27 import java.util.Set;
28 import java.util.concurrent.TimeUnit;
29 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
30 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
31 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
32 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
33 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
34 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
35 import scala.concurrent.Await;
36 import scala.concurrent.Future;
37 import scala.concurrent.duration.Duration;
38
39 /**
40  * Class that represents a cluster member node for unit tests. It encapsulates an actor system with
41  * config and (optional) operational data store instances. The Builder is used to specify the setup
42  * parameters and create the data store instances. The actor system is automatically joined to address
43  * 127.0.0.1:2558 so one member must specify an akka cluster configuration with that address.
44  *
45  * @author Thomas Pantelis
46  */
47 public class MemberNode {
48     static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
49
50     private IntegrationTestKit kit;
51     private DistributedDataStore configDataStore;
52     private DistributedDataStore operDataStore;
53     private DatastoreContext.Builder datastoreContextBuilder;
54     private boolean cleanedUp;
55
56     /**
57      * Constructs a Builder.
58      *
59      * @param members the list to which the resulting MemberNode will be added. This makes it easier for
60      *                callers to cleanup instances on test completion.
61      * @return a Builder instance
62      */
63     public static Builder builder(List<MemberNode> members) {
64         return new Builder(members);
65     }
66
67     public IntegrationTestKit kit() {
68         return kit;
69     }
70
71
72     public DistributedDataStore configDataStore() {
73         return configDataStore;
74     }
75
76
77     public DistributedDataStore operDataStore() {
78         return operDataStore;
79     }
80
81     public DatastoreContext.Builder datastoreContextBuilder() {
82         return datastoreContextBuilder;
83     }
84
85     public void waitForMembersUp(String... otherMembers) {
86         Set<String> otherMembersSet = Sets.newHashSet(otherMembers);
87         Stopwatch sw = Stopwatch.createStarted();
88         while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
89             CurrentClusterState state = Cluster.get(kit.getSystem()).state();
90             for(Member m: state.getMembers()) {
91                 if(m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next()) &&
92                         otherMembersSet.isEmpty()) {
93                     return;
94                 }
95             }
96
97             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
98         }
99
100         fail("Member(s) " + otherMembersSet + " are not Up");
101     }
102
103     public void waitForMemberDown(String member) {
104         Stopwatch sw = Stopwatch.createStarted();
105         while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
106             CurrentClusterState state = Cluster.get(kit.getSystem()).state();
107             for(Member m: state.getUnreachable()) {
108                 if(member.equals(m.getRoles().iterator().next())) {
109                     return;
110                 }
111             }
112
113             for(Member m: state.getMembers()) {
114                 if(m.status() != MemberStatus.up() && member.equals(m.getRoles().iterator().next())) {
115                     return;
116                 }
117             }
118
119             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
120         }
121
122         fail("Member " + member + " is now down");
123     }
124
125     public void cleanup() {
126         if(!cleanedUp) {
127             cleanedUp = true;
128             kit.cleanup(configDataStore);
129             kit.cleanup(operDataStore);
130             kit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE);
131         }
132     }
133
134     public static void verifyRaftState(DistributedDataStore datastore, String shardName, RaftStateVerifier verifier)
135             throws Exception {
136         ActorContext actorContext = datastore.getActorContext();
137
138         Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
139         ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
140
141         AssertionError lastError = null;
142         Stopwatch sw = Stopwatch.createStarted();
143         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
144             OnDemandRaftState raftState = (OnDemandRaftState)actorContext.
145                     executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
146
147             try {
148                 verifier.verify(raftState);
149                 return;
150             } catch (AssertionError e) {
151                 lastError = e;
152                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
153             }
154         }
155
156         throw lastError;
157     }
158
159     public static void verifyRaftPeersPresent(DistributedDataStore datastore, final String shardName,
160             String... peerMemberNames) throws Exception {
161         final Set<String> peerIds = Sets.newHashSet();
162         for(String p: peerMemberNames) {
163             peerIds.add(ShardIdentifier.builder().memberName(p).shardName(shardName).
164                 type(datastore.getActorContext().getDataStoreName()).build().toString());
165         }
166
167         verifyRaftState(datastore, shardName, new RaftStateVerifier() {
168             @Override
169             public void verify(OnDemandRaftState raftState) {
170                 assertEquals("Peers for shard " + shardName, peerIds, raftState.getPeerAddresses().keySet());
171             }
172         });
173     }
174
175     public static void verifyNoShardPresent(DistributedDataStore datastore, String shardName) {
176         Stopwatch sw = Stopwatch.createStarted();
177         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
178             Optional<ActorRef> shardReply = datastore.getActorContext().findLocalShard(shardName);
179             if(!shardReply.isPresent()) {
180                 return;
181             }
182
183             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
184         }
185
186         fail("Shard " + shardName + " is present");
187     }
188
189     public static class Builder {
190         private final List<MemberNode> members;
191         private String moduleShardsConfig;
192         private String akkaConfig;
193         private String[] waitForshardLeader = new String[0];
194         private String testName;
195         private SchemaContext schemaContext;
196         private boolean createOperDatastore = true;
197         private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
198                 shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
199
200         Builder(List<MemberNode> members) {
201             this.members = members;
202         }
203
204         /**
205          * Specifies the name of the module shards config file. This is required.
206          *
207          * @return this Builder
208          */
209         public Builder moduleShardsConfig(String moduleShardsConfig) {
210             this.moduleShardsConfig = moduleShardsConfig;
211             return this;
212         }
213
214         /**
215          * Specifies the name of the akka configuration. This is required.
216          *
217          * @return this Builder
218          */
219         public Builder akkaConfig(String akkaConfig) {
220             this.akkaConfig = akkaConfig;
221             return this;
222         }
223
224         /**
225          * Specifies the name of the test that is appended to the data store names. This is required.
226          *
227          * @return this Builder
228          */
229         public Builder testName(String testName) {
230             this.testName = testName;
231             return this;
232         }
233
234         /**
235          * Specifies the optional names of the shards to initially wait for a leader to be elected.
236          *
237          * @return this Builder
238          */
239         public Builder waitForShardLeader(String... shardNames) {
240             this.waitForshardLeader = shardNames;
241             return this;
242         }
243
244         /**
245          * Specifies whether or not to create an operational data store. Defaults to true.
246          *
247          * @return this Builder
248          */
249         public Builder createOperDatastore(boolean value) {
250             this.createOperDatastore = value;
251             return this;
252         }
253
254         /**
255          * Specifies the SchemaContext for the data stores. Defaults to SchemaContextHelper.full().
256          *
257          * @return this Builder
258          */
259         public Builder schemaContext(SchemaContext schemaContext) {
260             this.schemaContext = schemaContext;
261             return this;
262         }
263
264         /**
265          * Specifies the DatastoreContext Builder. If not specified, a default instance is used.
266          *
267          * @return this Builder
268          */
269         public Builder datastoreContextBuilder(DatastoreContext.Builder builder) {
270             datastoreContextBuilder = builder;
271             return this;
272         }
273
274         public MemberNode build() {
275             Preconditions.checkNotNull(moduleShardsConfig, "moduleShardsConfig must be specified");
276             Preconditions.checkNotNull(akkaConfig, "akkaConfig must be specified");
277             Preconditions.checkNotNull(testName, "testName must be specified");
278
279             if(schemaContext == null) {
280                 schemaContext = SchemaContextHelper.full();
281             }
282
283             MemberNode node = new MemberNode();
284             node.datastoreContextBuilder = datastoreContextBuilder;
285
286             ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(akkaConfig));
287             Cluster.get(system).join(MEMBER_1_ADDRESS);
288
289             node.kit = new IntegrationTestKit(system, datastoreContextBuilder);
290
291             String memberName = new ClusterWrapperImpl(system).getCurrentMemberName();
292             node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName);
293             node.configDataStore = node.kit.setupDistributedDataStore("config_" + testName, moduleShardsConfig,
294                     true, schemaContext, waitForshardLeader);
295
296             if(createOperDatastore) {
297                 node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName);
298                 node.operDataStore = node.kit.setupDistributedDataStore("oper_" + testName, moduleShardsConfig,
299                         true, schemaContext, waitForshardLeader);
300             }
301
302             members.add(node);
303             return node;
304         }
305     }
306
307     public static interface RaftStateVerifier {
308         void verify(OnDemandRaftState raftState);
309     }
310 }