Remove use of {String,UUID}Identifier
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / MemberNode.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.fail;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.AddressFromURIString;
16 import akka.cluster.Cluster;
17 import akka.cluster.ClusterEvent.CurrentClusterState;
18 import akka.cluster.Member;
19 import akka.cluster.MemberStatus;
20 import com.google.common.base.Optional;
21 import com.google.common.base.Preconditions;
22 import com.google.common.base.Stopwatch;
23 import com.google.common.collect.Sets;
24 import com.google.common.util.concurrent.Uninterruptibles;
25 import com.typesafe.config.ConfigFactory;
26 import java.util.List;
27 import java.util.Set;
28 import java.util.concurrent.TimeUnit;
29 import org.opendaylight.controller.cluster.access.concepts.MemberName;
30 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
31 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
32 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
33 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
34 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
35 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
36 import scala.concurrent.Await;
37 import scala.concurrent.Future;
38 import scala.concurrent.duration.Duration;
39
40 /**
41  * Class that represents a cluster member node for unit tests. It encapsulates an actor system with
42  * config and (optional) operational data store instances. The Builder is used to specify the setup
43  * parameters and create the data store instances. The actor system is automatically joined to address
44  * 127.0.0.1:2558 so one member must specify an akka cluster configuration with that address.
45  *
46  * @author Thomas Pantelis
47  */
48 public class MemberNode {
49     static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
50
51     private IntegrationTestKit kit;
52     private DistributedDataStore configDataStore;
53     private DistributedDataStore operDataStore;
54     private DatastoreContext.Builder datastoreContextBuilder;
55     private boolean cleanedUp;
56
57     /**
58      * Constructs a Builder.
59      *
60      * @param members the list to which the resulting MemberNode will be added. This makes it easier for
61      *                callers to cleanup instances on test completion.
62      * @return a Builder instance
63      */
64     public static Builder builder(List<MemberNode> members) {
65         return new Builder(members);
66     }
67
68     public IntegrationTestKit kit() {
69         return kit;
70     }
71
72
73     public DistributedDataStore configDataStore() {
74         return configDataStore;
75     }
76
77
78     public DistributedDataStore operDataStore() {
79         return operDataStore;
80     }
81
82     public DatastoreContext.Builder datastoreContextBuilder() {
83         return datastoreContextBuilder;
84     }
85
86     public void waitForMembersUp(String... otherMembers) {
87         kit.waitForMembersUp(otherMembers);
88     }
89
90     public void waitForMemberDown(String member) {
91         Stopwatch sw = Stopwatch.createStarted();
92         while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
93             CurrentClusterState state = Cluster.get(kit.getSystem()).state();
94             for(Member m: state.getUnreachable()) {
95                 if(member.equals(m.getRoles().iterator().next())) {
96                     return;
97                 }
98             }
99
100             for(Member m: state.getMembers()) {
101                 if(m.status() != MemberStatus.up() && member.equals(m.getRoles().iterator().next())) {
102                     return;
103                 }
104             }
105
106             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
107         }
108
109         fail("Member " + member + " is now down");
110     }
111
112     public void cleanup() {
113         if(!cleanedUp) {
114             cleanedUp = true;
115             if (configDataStore != null) {
116                 configDataStore.close();
117             }
118             if (operDataStore != null) {
119                 operDataStore.close();
120             }
121
122             IntegrationTestKit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE);
123         }
124     }
125
126     public static void verifyRaftState(DistributedDataStore datastore, String shardName, RaftStateVerifier verifier)
127             throws Exception {
128         ActorContext actorContext = datastore.getActorContext();
129
130         Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
131         ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
132
133         AssertionError lastError = null;
134         Stopwatch sw = Stopwatch.createStarted();
135         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
136             OnDemandRaftState raftState = (OnDemandRaftState)actorContext.
137                     executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
138
139             try {
140                 verifier.verify(raftState);
141                 return;
142             } catch (AssertionError e) {
143                 lastError = e;
144                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
145             }
146         }
147
148         throw lastError;
149     }
150
151     public static void verifyRaftPeersPresent(DistributedDataStore datastore, final String shardName,
152             String... peerMemberNames) throws Exception {
153         final Set<String> peerIds = Sets.newHashSet();
154         for(String p: peerMemberNames) {
155             peerIds.add(ShardIdentifier.create(shardName, MemberName.forName(p),
156                 datastore.getActorContext().getDataStoreName()).toString());
157         }
158
159         verifyRaftState(datastore, shardName, raftState -> assertEquals("Peers for shard " + shardName, peerIds,
160             raftState.getPeerAddresses().keySet()));
161     }
162
163     public static void verifyNoShardPresent(DistributedDataStore datastore, String shardName) {
164         Stopwatch sw = Stopwatch.createStarted();
165         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
166             Optional<ActorRef> shardReply = datastore.getActorContext().findLocalShard(shardName);
167             if(!shardReply.isPresent()) {
168                 return;
169             }
170
171             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
172         }
173
174         fail("Shard " + shardName + " is present");
175     }
176
177     public static class Builder {
178         private final List<MemberNode> members;
179         private String moduleShardsConfig;
180         private String akkaConfig;
181         private String[] waitForshardLeader = new String[0];
182         private String testName;
183         private SchemaContext schemaContext;
184         private boolean createOperDatastore = true;
185         private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
186                 shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
187
188         Builder(List<MemberNode> members) {
189             this.members = members;
190         }
191
192         /**
193          * Specifies the name of the module shards config file. This is required.
194          *
195          * @return this Builder
196          */
197         public Builder moduleShardsConfig(String moduleShardsConfig) {
198             this.moduleShardsConfig = moduleShardsConfig;
199             return this;
200         }
201
202         /**
203          * Specifies the name of the akka configuration. This is required.
204          *
205          * @return this Builder
206          */
207         public Builder akkaConfig(String akkaConfig) {
208             this.akkaConfig = akkaConfig;
209             return this;
210         }
211
212         /**
213          * Specifies the name of the test that is appended to the data store names. This is required.
214          *
215          * @return this Builder
216          */
217         public Builder testName(String testName) {
218             this.testName = testName;
219             return this;
220         }
221
222         /**
223          * Specifies the optional names of the shards to initially wait for a leader to be elected.
224          *
225          * @return this Builder
226          */
227         public Builder waitForShardLeader(String... shardNames) {
228             this.waitForshardLeader = shardNames;
229             return this;
230         }
231
232         /**
233          * Specifies whether or not to create an operational data store. Defaults to true.
234          *
235          * @return this Builder
236          */
237         public Builder createOperDatastore(boolean value) {
238             this.createOperDatastore = value;
239             return this;
240         }
241
242         /**
243          * Specifies the SchemaContext for the data stores. Defaults to SchemaContextHelper.full().
244          *
245          * @return this Builder
246          */
247         public Builder schemaContext(SchemaContext schemaContext) {
248             this.schemaContext = schemaContext;
249             return this;
250         }
251
252         /**
253          * Specifies the DatastoreContext Builder. If not specified, a default instance is used.
254          *
255          * @return this Builder
256          */
257         public Builder datastoreContextBuilder(DatastoreContext.Builder builder) {
258             datastoreContextBuilder = builder;
259             return this;
260         }
261
262         public MemberNode build() {
263             Preconditions.checkNotNull(moduleShardsConfig, "moduleShardsConfig must be specified");
264             Preconditions.checkNotNull(akkaConfig, "akkaConfig must be specified");
265             Preconditions.checkNotNull(testName, "testName must be specified");
266
267             if(schemaContext == null) {
268                 schemaContext = SchemaContextHelper.full();
269             }
270
271             MemberNode node = new MemberNode();
272             node.datastoreContextBuilder = datastoreContextBuilder;
273
274             ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(akkaConfig));
275             Cluster.get(system).join(MEMBER_1_ADDRESS);
276
277             node.kit = new IntegrationTestKit(system, datastoreContextBuilder);
278
279             String memberName = new ClusterWrapperImpl(system).getCurrentMemberName().getName();
280             node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName);
281             node.configDataStore = node.kit.setupDistributedDataStore("config_" + testName, moduleShardsConfig,
282                     true, schemaContext, waitForshardLeader);
283
284             if(createOperDatastore) {
285                 node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName);
286                 node.operDataStore = node.kit.setupDistributedDataStore("oper_" + testName, moduleShardsConfig,
287                         true, schemaContext, waitForshardLeader);
288             }
289
290             members.add(node);
291             return node;
292         }
293     }
294
295     public static interface RaftStateVerifier {
296         void verify(OnDemandRaftState raftState);
297     }
298 }