Bug 2187: Implement add-replicas-for-all-shards RPC
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / MemberNode.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore;
9
10 import static org.junit.Assert.assertTrue;
11 import static org.junit.Assert.fail;
12 import akka.actor.ActorRef;
13 import akka.actor.ActorSystem;
14 import akka.actor.Address;
15 import akka.actor.AddressFromURIString;
16 import akka.cluster.Cluster;
17 import akka.cluster.ClusterEvent.CurrentClusterState;
18 import akka.cluster.Member;
19 import akka.cluster.MemberStatus;
20 import com.google.common.base.Preconditions;
21 import com.google.common.base.Stopwatch;
22 import com.google.common.collect.Sets;
23 import com.google.common.util.concurrent.Uninterruptibles;
24 import com.typesafe.config.ConfigFactory;
25 import java.util.List;
26 import java.util.Set;
27 import java.util.concurrent.TimeUnit;
28 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
29 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
30 import org.opendaylight.controller.cluster.raft.client.messages.GetOnDemandRaftState;
31 import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
32 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
33 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
34 import scala.concurrent.Await;
35 import scala.concurrent.Future;
36 import scala.concurrent.duration.Duration;
37
38 /**
39  * Class that represents a cluster member node for unit tests. It encapsulates an actor system with
40  * config and (optional) operational data store instances. The Builder is used to specify the setup
41  * parameters and create the data store instances. The actor system is automatically joined to address
42  * 127.0.0.1:2558 so one member must specify an akka cluster configuration with that address.
43  *
44  * @author Thomas Pantelis
45  */
46 public class MemberNode {
47     static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
48
49     private IntegrationTestKit kit;
50     private DistributedDataStore configDataStore;
51     private DistributedDataStore operDataStore;
52     private DatastoreContext.Builder datastoreContextBuilder;
53     private boolean cleanedUp;
54
55     /**
56      * Constructs a Builder.
57      *
58      * @param members the list to which the resulting MemberNode will be added. This makes it easier for
59      *                callers to cleanup instances on test completion.
60      * @return a Builder instance
61      */
62     public static Builder builder(List<MemberNode> members) {
63         return new Builder(members);
64     }
65
66     public IntegrationTestKit kit() {
67         return kit;
68     }
69
70
71     public DistributedDataStore configDataStore() {
72         return configDataStore;
73     }
74
75
76     public DistributedDataStore operDataStore() {
77         return operDataStore;
78     }
79
80     public DatastoreContext.Builder datastoreContextBuilder() {
81         return datastoreContextBuilder;
82     }
83
84     public void waitForMembersUp(String... otherMembers) {
85         Set<String> otherMembersSet = Sets.newHashSet(otherMembers);
86         Stopwatch sw = Stopwatch.createStarted();
87         while(sw.elapsed(TimeUnit.SECONDS) <= 10) {
88             CurrentClusterState state = Cluster.get(kit.getSystem()).state();
89             for(Member m: state.getMembers()) {
90                 if(m.status() == MemberStatus.up() && otherMembersSet.remove(m.getRoles().iterator().next()) &&
91                         otherMembersSet.isEmpty()) {
92                     return;
93                 }
94             }
95
96             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
97         }
98
99         fail("Member(s) " + otherMembersSet + " are not Up");
100     }
101
102     public void cleanup() {
103         if(!cleanedUp) {
104             cleanedUp = true;
105             kit.cleanup(configDataStore);
106             kit.cleanup(operDataStore);
107             kit.shutdownActorSystem(kit.getSystem(), Boolean.TRUE);
108         }
109     }
110
111     public static void verifyRaftState(DistributedDataStore datastore, String shardName, RaftStateVerifier verifier)
112             throws Exception {
113         ActorContext actorContext = datastore.getActorContext();
114
115         Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
116         ActorRef shardActor = Await.result(future, Duration.create(10, TimeUnit.SECONDS));
117
118         AssertionError lastError = null;
119         Stopwatch sw = Stopwatch.createStarted();
120         while(sw.elapsed(TimeUnit.SECONDS) <= 5) {
121             OnDemandRaftState raftState = (OnDemandRaftState)actorContext.
122                     executeOperation(shardActor, GetOnDemandRaftState.INSTANCE);
123
124             try {
125                 verifier.verify(raftState);
126                 return;
127             } catch (AssertionError e) {
128                 lastError = e;
129                 Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
130             }
131         }
132
133         throw lastError;
134     }
135
136     public static void verifyRaftPeersPresent(DistributedDataStore datastore, final String shardName,
137             String... peerMemberNames) throws Exception {
138         final Set<String> peerIds = Sets.newHashSet();
139         for(String p: peerMemberNames) {
140             peerIds.add(ShardIdentifier.builder().memberName(p).shardName(shardName).
141                 type(datastore.getActorContext().getDataStoreType()).build().toString());
142         }
143
144         verifyRaftState(datastore, shardName, new RaftStateVerifier() {
145             @Override
146             public void verify(OnDemandRaftState raftState) {
147                 assertTrue(String.format("Peer(s) %s not found for shard %s. Actual: %s", peerIds, shardName,
148                         raftState.getPeerAddresses().keySet()),
149                         raftState.getPeerAddresses().keySet().containsAll(peerIds));
150             }
151         });
152     }
153
154     public static class Builder {
155         private final List<MemberNode> members;
156         private String moduleShardsConfig;
157         private String akkaConfig;
158         private String[] waitForshardLeader = new String[0];
159         private String testName;
160         private SchemaContext schemaContext;
161         private boolean createOperDatastore = true;
162         private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder().
163                 shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
164
165         Builder(List<MemberNode> members) {
166             this.members = members;
167         }
168
169         /**
170          * Specifies the name of the module shards config file. This is required.
171          *
172          * @return this Builder
173          */
174         public Builder moduleShardsConfig(String moduleShardsConfig) {
175             this.moduleShardsConfig = moduleShardsConfig;
176             return this;
177         }
178
179         /**
180          * Specifies the name of the akka configuration. This is required.
181          *
182          * @return this Builder
183          */
184         public Builder akkaConfig(String akkaConfig) {
185             this.akkaConfig = akkaConfig;
186             return this;
187         }
188
189         /**
190          * Specifies the name of the test that is appended to the data store names. This is required.
191          *
192          * @return this Builder
193          */
194         public Builder testName(String testName) {
195             this.testName = testName;
196             return this;
197         }
198
199         /**
200          * Specifies the optional names of the shards to initially wait for a leader to be elected.
201          *
202          * @return this Builder
203          */
204         public Builder waitForShardLeader(String... shardNames) {
205             this.waitForshardLeader = shardNames;
206             return this;
207         }
208
209         /**
210          * Specifies whether or not to create an operational data store. Defaults to true.
211          *
212          * @return this Builder
213          */
214         public Builder createOperDatastore(boolean value) {
215             this.createOperDatastore = value;
216             return this;
217         }
218
219         /**
220          * Specifies the SchemaContext for the data stores. Defaults to SchemaContextHelper.full().
221          *
222          * @return this Builder
223          */
224         public Builder schemaContext(SchemaContext schemaContext) {
225             this.schemaContext = schemaContext;
226             return this;
227         }
228
229         /**
230          * Specifies the DatastoreContext Builder. If not specified, a default instance is used.
231          *
232          * @return this Builder
233          */
234         public Builder datastoreContextBuilder(DatastoreContext.Builder builder) {
235             datastoreContextBuilder = builder;
236             return this;
237         }
238
239         public MemberNode build() {
240             Preconditions.checkNotNull(moduleShardsConfig, "moduleShardsConfig must be specified");
241             Preconditions.checkNotNull(akkaConfig, "akkaConfig must be specified");
242             Preconditions.checkNotNull(testName, "testName must be specified");
243
244             if(schemaContext == null) {
245                 schemaContext = SchemaContextHelper.full();
246             }
247
248             MemberNode node = new MemberNode();
249             node.datastoreContextBuilder = datastoreContextBuilder;
250
251             ActorSystem system = ActorSystem.create("cluster-test", ConfigFactory.load().getConfig(akkaConfig));
252             Cluster.get(system).join(MEMBER_1_ADDRESS);
253
254             node.kit = new IntegrationTestKit(system, datastoreContextBuilder);
255
256             String memberName = new ClusterWrapperImpl(system).getCurrentMemberName();
257             node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-config-" + memberName);
258             node.configDataStore = node.kit.setupDistributedDataStore("config_" + testName, moduleShardsConfig,
259                     true, schemaContext, waitForshardLeader);
260
261             if(createOperDatastore) {
262                 node.kit.getDatastoreContextBuilder().shardManagerPersistenceId("shard-manager-oper-" + memberName);
263                 node.operDataStore = node.kit.setupDistributedDataStore("oper_" + testName, moduleShardsConfig,
264                         true, schemaContext, waitForshardLeader);
265             }
266
267             members.add(node);
268             return node;
269         }
270     }
271
272     public static interface RaftStateVerifier {
273         void verify(OnDemandRaftState raftState);
274     }
275 }