Prevent non-voting member from becoming entity owner
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / entityownership / DistributedEntityOwnershipIntegrationTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNull;
12 import static org.junit.Assert.assertThat;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.AdditionalMatchers.or;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.never;
17 import static org.mockito.Mockito.reset;
18 import static org.mockito.Mockito.timeout;
19 import static org.mockito.Mockito.verify;
20 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
21 import static org.opendaylight.controller.cluster.datastore.entityownership.AbstractEntityOwnershipTest.ownershipChange;
22 import static org.opendaylight.controller.cluster.datastore.entityownership.DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME;
23 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
24 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityPath;
25
26 import akka.actor.ActorRef;
27 import akka.actor.Status.Failure;
28 import akka.actor.Status.Success;
29 import akka.cluster.Cluster;
30 import akka.pattern.Patterns;
31 import akka.testkit.JavaTestKit;
32 import akka.util.Timeout;
33 import com.google.common.base.Optional;
34 import com.google.common.base.Stopwatch;
35 import com.google.common.collect.ImmutableMap;
36 import com.google.common.collect.Iterables;
37 import com.google.common.collect.Lists;
38 import com.google.common.util.concurrent.Uninterruptibles;
39 import java.util.ArrayList;
40 import java.util.Arrays;
41 import java.util.Collection;
42 import java.util.List;
43 import java.util.concurrent.TimeUnit;
44 import org.junit.After;
45 import org.junit.Before;
46 import org.junit.Test;
47 import org.mockito.ArgumentCaptor;
48 import org.mockito.Mock;
49 import org.mockito.Mockito;
50 import org.mockito.MockitoAnnotations;
51 import org.mockito.exceptions.base.MockitoException;
52 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
53 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
54 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
55 import org.opendaylight.controller.cluster.datastore.MemberNode;
56 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
57 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
58 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
59 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
60 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
61 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
62 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
63 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
64 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
65 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistration;
66 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
67 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
68 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
69 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
70 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
71 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
72 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
73 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
74 import scala.concurrent.Await;
75 import scala.concurrent.Future;
76 import scala.concurrent.duration.FiniteDuration;
77
78 /**
79  * End-to-end integration tests for the entity ownership functionality.
80  *
81  * @author Thomas Pantelis
82  */
83 public class DistributedEntityOwnershipIntegrationTest {
84     private static final String MODULE_SHARDS_CONFIG = "module-shards-default.conf";
85     private static final String MODULE_SHARDS_5_NODE_CONFIG = "module-shards-default-5-node.conf";
86     private static final String MODULE_SHARDS_MEMBER_1_CONFIG = "module-shards-default-member-1.conf";
87     private static final String ENTITY_TYPE1 = "entityType1";
88     private static final String ENTITY_TYPE2 = "entityType2";
89     private static final DOMEntity ENTITY1 = new DOMEntity(ENTITY_TYPE1, "entity1");
90     private static final DOMEntity ENTITY1_2 = new DOMEntity(ENTITY_TYPE2, "entity1");
91     private static final DOMEntity ENTITY2 = new DOMEntity(ENTITY_TYPE1, "entity2");
92     private static final DOMEntity ENTITY3 = new DOMEntity(ENTITY_TYPE1, "entity3");
93     private static final DOMEntity ENTITY4 = new DOMEntity(ENTITY_TYPE1, "entity4");
94     private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
95
96     private final DatastoreContext.Builder leaderDatastoreContextBuilder =
97             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
98                     .shardIsolatedLeaderCheckIntervalInMillis(1000000);
99
100     private final DatastoreContext.Builder followerDatastoreContextBuilder =
101             DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10000);
102
103     private final List<MemberNode> memberNodes = new ArrayList<>();
104
105     @Mock
106     private DOMEntityOwnershipListener leaderMockListener;
107
108     @Mock
109     private DOMEntityOwnershipListener leaderMockListener2;
110
111     @Mock
112     private DOMEntityOwnershipListener follower1MockListener;
113
114     @Mock
115     private DOMEntityOwnershipListener follower2MockListener;
116
117     @Before
118     public void setUp() {
119         MockitoAnnotations.initMocks(this);
120         InMemoryJournal.clear();
121         InMemorySnapshotStore.clear();
122     }
123
124     @After
125     public void tearDown() {
126         for (MemberNode m : Lists.reverse(memberNodes)) {
127             m.cleanup();
128         }
129         memberNodes.clear();
130     }
131
132     private static DistributedEntityOwnershipService newOwnershipService(final AbstractDataStore datastore) {
133         return DistributedEntityOwnershipService.start(datastore.getActorContext(),
134                 EntityOwnerSelectionStrategyConfig.newBuilder().build());
135     }
136
137     @Test
138     public void testFunctionalityWithThreeNodes() throws Exception {
139         String name = "testFunctionalityWithThreeNodes";
140         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
141                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
142                 .datastoreContextBuilder(leaderDatastoreContextBuilder).build();
143
144         MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
145                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
146                 .datastoreContextBuilder(followerDatastoreContextBuilder).build();
147
148         MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
149                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
150                 .datastoreContextBuilder(followerDatastoreContextBuilder).build();
151
152         AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
153
154         leaderDistributedDataStore.waitTillReady();
155         follower1Node.configDataStore().waitTillReady();
156         follower2Node.configDataStore().waitTillReady();
157
158         final DOMEntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
159         final DOMEntityOwnershipService follower1EntityOwnershipService =
160                 newOwnershipService(follower1Node.configDataStore());
161         final DOMEntityOwnershipService follower2EntityOwnershipService =
162                 newOwnershipService(follower2Node.configDataStore());
163
164         leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
165
166         leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
167         leaderEntityOwnershipService.registerListener(ENTITY_TYPE2, leaderMockListener2);
168         follower1EntityOwnershipService.registerListener(ENTITY_TYPE1, follower1MockListener);
169
170         // Register leader candidate for entity1 and verify it becomes owner
171
172         leaderEntityOwnershipService.registerCandidate(ENTITY1);
173         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, true, true));
174         verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
175         reset(leaderMockListener, follower1MockListener);
176
177         verifyGetOwnershipState(leaderEntityOwnershipService, ENTITY1, EntityOwnershipState.IS_OWNER);
178         verifyGetOwnershipState(follower1EntityOwnershipService, ENTITY1, EntityOwnershipState.OWNED_BY_OTHER);
179
180         // Register leader candidate for entity1_2 (same id, different type) and verify it becomes owner
181
182         leaderEntityOwnershipService.registerCandidate(ENTITY1_2);
183         verify(leaderMockListener2, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1_2, false, true, true));
184         Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
185         verify(leaderMockListener, never()).ownershipChanged(ownershipChange(ENTITY1_2));
186         reset(leaderMockListener2);
187
188         // Register follower1 candidate for entity1 and verify it gets added but doesn't become owner
189
190         follower1EntityOwnershipService.registerCandidate(ENTITY1);
191         verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-1", "member-2");
192         verifyOwner(leaderDistributedDataStore, ENTITY1, "member-1");
193         verifyOwner(follower2Node.configDataStore(), ENTITY1, "member-1");
194         Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
195         verify(leaderMockListener, never()).ownershipChanged(ownershipChange(ENTITY1));
196         verify(follower1MockListener, never()).ownershipChanged(ownershipChange(ENTITY1));
197
198         // Register follower1 candidate for entity2 and verify it becomes owner
199
200         final DOMEntityOwnershipCandidateRegistration follower1Entity2Reg =
201                 follower1EntityOwnershipService.registerCandidate(ENTITY2);
202         verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, true, true));
203         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, false, true));
204         verifyOwner(follower2Node.configDataStore(), ENTITY2, "member-2");
205         reset(leaderMockListener, follower1MockListener);
206
207         // Register follower2 candidate for entity2 and verify it gets added but doesn't become owner
208
209         follower2EntityOwnershipService.registerListener(ENTITY_TYPE1, follower2MockListener);
210         verify(follower2MockListener, timeout(5000).times(2)).ownershipChanged(or(
211                 ownershipChange(ENTITY1, false, false, true), ownershipChange(ENTITY2, false, false, true)));
212
213         follower2EntityOwnershipService.registerCandidate(ENTITY2);
214         verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-2", "member-3");
215         verifyOwner(leaderDistributedDataStore, ENTITY2, "member-2");
216
217         // Unregister follower1 candidate for entity2 and verify follower2 becomes owner
218
219         follower1Entity2Reg.close();
220         verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-3");
221         verifyOwner(leaderDistributedDataStore, ENTITY2, "member-3");
222         verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, true, false, true));
223         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, false, true));
224
225         // Depending on timing, follower2MockListener could get ownershipChanged with "false, false, true" if
226         // if the original ownership change with "member-2 is replicated to follower2 after the listener is
227         // registered.
228         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
229         verify(follower2MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, true, true));
230
231         // Register follower1 candidate for entity3 and verify it becomes owner
232
233         follower1EntityOwnershipService.registerCandidate(ENTITY3);
234         verifyOwner(leaderDistributedDataStore, ENTITY3, "member-2");
235         verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY3, false, true, true));
236         verify(follower2MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY3, false, false, true));
237         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY3, false, false, true));
238
239         // Register follower2 candidate for entity4 and verify it becomes owner
240
241         follower2EntityOwnershipService.registerCandidate(ENTITY4);
242         verifyOwner(leaderDistributedDataStore, ENTITY4, "member-3");
243         verify(follower2MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY4, false, true, true));
244         verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY4, false, false, true));
245         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY4, false, false, true));
246         reset(follower1MockListener, follower2MockListener);
247
248         // Register follower1 candidate for entity4 and verify it gets added but doesn't become owner
249
250         follower1EntityOwnershipService.registerCandidate(ENTITY4);
251         verifyCandidates(leaderDistributedDataStore, ENTITY4, "member-3", "member-2");
252         verifyOwner(leaderDistributedDataStore, ENTITY4, "member-3");
253
254         // Shutdown follower2 and verify it's owned entities (entity 4) get re-assigned
255
256         reset(leaderMockListener, follower1MockListener);
257         follower2Node.cleanup();
258
259         verify(follower1MockListener, timeout(15000)).ownershipChanged(ownershipChange(ENTITY4, false, true, true));
260         verify(leaderMockListener, timeout(15000)).ownershipChanged(ownershipChange(ENTITY4, false, false, true));
261
262         // Register leader candidate for entity2 and verify it becomes owner
263
264         DOMEntityOwnershipCandidateRegistration leaderEntity2Reg =
265                 leaderEntityOwnershipService.registerCandidate(ENTITY2);
266         verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");
267         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, true, true));
268
269         // Unregister leader candidate for entity2 and verify the owner is cleared
270
271         leaderEntity2Reg.close();
272         verifyOwner(leaderDistributedDataStore, ENTITY2, "");
273         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, true, false, false));
274         verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, false, false));
275     }
276
277     @Test
278     public void testLeaderEntityOwnersReassignedAfterShutdown() throws Exception {
279         followerDatastoreContextBuilder.shardElectionTimeoutFactor(5)
280                     .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
281
282         String name = "testLeaderEntityOwnersReassignedAfterShutdown";
283         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
284                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
285                 .datastoreContextBuilder(leaderDatastoreContextBuilder).build();
286
287         MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
288                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
289                 .datastoreContextBuilder(followerDatastoreContextBuilder).build();
290
291         MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
292                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
293                 .datastoreContextBuilder(followerDatastoreContextBuilder).build();
294
295         AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
296
297         leaderDistributedDataStore.waitTillReady();
298         follower1Node.configDataStore().waitTillReady();
299         follower2Node.configDataStore().waitTillReady();
300
301         follower1Node.waitForMembersUp("member-1", "member-3");
302
303         final DOMEntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
304         final DOMEntityOwnershipService follower1EntityOwnershipService =
305                 newOwnershipService(follower1Node.configDataStore());
306         final DOMEntityOwnershipService follower2EntityOwnershipService =
307                 newOwnershipService(follower2Node.configDataStore());
308
309         leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
310
311         // Register follower1 candidate for entity1 and verify it becomes owner
312
313         follower1EntityOwnershipService.registerCandidate(ENTITY1);
314         verifyOwner(leaderDistributedDataStore, ENTITY1, "member-2");
315
316         // Register leader candidate for entity1
317
318         leaderEntityOwnershipService.registerCandidate(ENTITY1);
319         verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-2", "member-1");
320         verifyOwner(leaderDistributedDataStore, ENTITY1, "member-2");
321
322         // Register leader candidate for entity2 and verify it becomes owner
323
324         leaderEntityOwnershipService.registerCandidate(ENTITY2);
325         verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");
326
327         // Register follower2 candidate for entity2
328
329         follower2EntityOwnershipService.registerCandidate(ENTITY2);
330         verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-1", "member-3");
331         verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");
332
333         // Re-enable elections on all remaining followers so one becomes the new leader
334
335         ActorRef follower1Shard = IntegrationTestKit.findLocalShard(follower1Node.configDataStore().getActorContext(),
336                 ENTITY_OWNERSHIP_SHARD_NAME);
337         follower1Shard.tell(DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build())
338                 .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
339
340         ActorRef follower2Shard = IntegrationTestKit.findLocalShard(follower2Node.configDataStore().getActorContext(),
341                 ENTITY_OWNERSHIP_SHARD_NAME);
342         follower2Shard.tell(DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build())
343                 .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
344
345         // Shutdown the leader and verify its removed from the candidate list
346
347         leaderNode.cleanup();
348         follower1Node.waitForMemberDown("member-1");
349         follower2Node.waitForMemberDown("member-1");
350
351         // Verify the prior leader's entity owners are re-assigned.
352
353         verifyCandidates(follower1Node.configDataStore(), ENTITY1, "member-2", "member-1");
354         verifyCandidates(follower1Node.configDataStore(), ENTITY2, "member-1", "member-3");
355         verifyOwner(follower1Node.configDataStore(), ENTITY1, "member-2");
356         verifyOwner(follower1Node.configDataStore(), ENTITY2, "member-3");
357     }
358
359     @Test
360     public void testLeaderAndFollowerEntityOwnersReassignedAfterShutdown() throws Exception {
361         followerDatastoreContextBuilder.shardElectionTimeoutFactor(5)
362                 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
363
364         String name = "testLeaderAndFollowerEntityOwnersReassignedAfterShutdown";
365         final MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1")
366                 .useAkkaArtery(false).testName(name)
367                 .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT)
368                 .createOperDatastore(false).datastoreContextBuilder(leaderDatastoreContextBuilder).build();
369
370         final MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2")
371                 .useAkkaArtery(false).testName(name)
372                 .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT)
373                 .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
374
375         final MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3")
376                 .useAkkaArtery(false).testName(name)
377                 .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT)
378                 .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
379
380         final MemberNode follower3Node = MemberNode.builder(memberNodes).akkaConfig("Member4")
381                 .useAkkaArtery(false).testName(name)
382                 .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT)
383                 .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
384
385         final MemberNode follower4Node = MemberNode.builder(memberNodes).akkaConfig("Member5")
386                 .useAkkaArtery(false).testName(name)
387                 .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT)
388                 .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
389
390         AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
391
392         leaderDistributedDataStore.waitTillReady();
393         follower1Node.configDataStore().waitTillReady();
394         follower2Node.configDataStore().waitTillReady();
395         follower3Node.configDataStore().waitTillReady();
396         follower4Node.configDataStore().waitTillReady();
397
398         leaderNode.waitForMembersUp("member-2", "member-3", "member-4", "member-5");
399         follower1Node.waitForMembersUp("member-1", "member-3", "member-4", "member-5");
400
401         final DOMEntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
402         final DOMEntityOwnershipService follower1EntityOwnershipService =
403                 newOwnershipService(follower1Node.configDataStore());
404         final DOMEntityOwnershipService follower2EntityOwnershipService =
405                 newOwnershipService(follower2Node.configDataStore());
406         final DOMEntityOwnershipService follower3EntityOwnershipService =
407                 newOwnershipService(follower3Node.configDataStore());
408         newOwnershipService(follower4Node.configDataStore());
409
410         leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
411
412         // Register follower1 candidate for entity1 and verify it becomes owner
413
414         follower1EntityOwnershipService.registerCandidate(ENTITY1);
415         verifyOwner(leaderDistributedDataStore, ENTITY1, "member-2");
416
417         // Register leader candidate for entity1
418
419         leaderEntityOwnershipService.registerCandidate(ENTITY1);
420         verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-2", "member-1");
421         verifyOwner(leaderDistributedDataStore, ENTITY1, "member-2");
422
423         // Register leader candidate for entity2 and verify it becomes owner
424
425         leaderEntityOwnershipService.registerCandidate(ENTITY2);
426         verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");
427
428         // Register follower2 candidate for entity2
429
430         follower2EntityOwnershipService.registerCandidate(ENTITY2);
431         verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-1", "member-3");
432         verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");
433
434         // Register follower3 as a candidate for entity2 as well
435
436         follower3EntityOwnershipService.registerCandidate(ENTITY2);
437         verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-1", "member-3", "member-4");
438         verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");
439
440         // Re-enable elections on all remaining followers so one becomes the new leader
441
442         ActorRef follower1Shard = IntegrationTestKit.findLocalShard(follower1Node.configDataStore().getActorContext(),
443                 ENTITY_OWNERSHIP_SHARD_NAME);
444         follower1Shard.tell(DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build())
445                 .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
446
447         ActorRef follower2Shard = IntegrationTestKit.findLocalShard(follower2Node.configDataStore().getActorContext(),
448                 ENTITY_OWNERSHIP_SHARD_NAME);
449         follower2Shard.tell(DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build())
450                 .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
451
452         ActorRef follower4Shard = IntegrationTestKit.findLocalShard(follower4Node.configDataStore().getActorContext(),
453                 ENTITY_OWNERSHIP_SHARD_NAME);
454         follower4Shard.tell(DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build())
455                 .customRaftPolicyImplementation(null).build(), ActorRef.noSender());
456
457         // Shutdown the leader and follower3
458
459         leaderNode.cleanup();
460         follower3Node.cleanup();
461
462         follower1Node.waitForMemberDown("member-1");
463         follower1Node.waitForMemberDown("member-4");
464         follower2Node.waitForMemberDown("member-1");
465         follower2Node.waitForMemberDown("member-4");
466         follower4Node.waitForMemberDown("member-1");
467         follower4Node.waitForMemberDown("member-4");
468
469         // Verify the prior leader's and follower3 entity owners are re-assigned.
470
471         verifyCandidates(follower1Node.configDataStore(), ENTITY1, "member-2", "member-1");
472         verifyCandidates(follower1Node.configDataStore(), ENTITY2, "member-1", "member-3", "member-4");
473         verifyOwner(follower1Node.configDataStore(), ENTITY1, "member-2");
474         verifyOwner(follower1Node.configDataStore(), ENTITY2, "member-3");
475     }
476
477     /**
478      * Reproduces bug <a href="https://bugs.opendaylight.org/show_bug.cgi?id=4554">4554</a>.
479      */
480     @Test
481     public void testCloseCandidateRegistrationInQuickSuccession() throws Exception {
482         String name = "testCloseCandidateRegistrationInQuickSuccession";
483         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
484                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
485                 .datastoreContextBuilder(leaderDatastoreContextBuilder).build();
486
487         MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
488                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
489                 .datastoreContextBuilder(followerDatastoreContextBuilder).build();
490
491         MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
492                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
493                 .datastoreContextBuilder(followerDatastoreContextBuilder).build();
494
495         AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
496
497         leaderDistributedDataStore.waitTillReady();
498         follower1Node.configDataStore().waitTillReady();
499         follower2Node.configDataStore().waitTillReady();
500
501         final DOMEntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
502         final DOMEntityOwnershipService follower1EntityOwnershipService =
503                 newOwnershipService(follower1Node.configDataStore());
504         final DOMEntityOwnershipService follower2EntityOwnershipService =
505                 newOwnershipService(follower2Node.configDataStore());
506
507         leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
508
509         leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
510         follower1EntityOwnershipService.registerListener(ENTITY_TYPE1, follower1MockListener);
511         follower2EntityOwnershipService.registerListener(ENTITY_TYPE1, follower2MockListener);
512
513         final DOMEntityOwnershipCandidateRegistration candidate1 =
514                 leaderEntityOwnershipService.registerCandidate(ENTITY1);
515         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, true, true));
516
517         final DOMEntityOwnershipCandidateRegistration candidate2 =
518                 follower1EntityOwnershipService.registerCandidate(ENTITY1);
519         verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
520
521         final DOMEntityOwnershipCandidateRegistration candidate3 =
522                 follower2EntityOwnershipService.registerCandidate(ENTITY1);
523         verify(follower2MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
524
525         Mockito.reset(leaderMockListener, follower1MockListener, follower2MockListener);
526
527         ArgumentCaptor<DOMEntityOwnershipChange> leaderChangeCaptor =
528                 ArgumentCaptor.forClass(DOMEntityOwnershipChange.class);
529         ArgumentCaptor<DOMEntityOwnershipChange> follower1ChangeCaptor =
530                 ArgumentCaptor.forClass(DOMEntityOwnershipChange.class);
531         ArgumentCaptor<DOMEntityOwnershipChange> follower2ChangeCaptor =
532                 ArgumentCaptor.forClass(DOMEntityOwnershipChange.class);
533         doNothing().when(leaderMockListener).ownershipChanged(leaderChangeCaptor.capture());
534         doNothing().when(follower1MockListener).ownershipChanged(follower1ChangeCaptor.capture());
535         doNothing().when(follower2MockListener).ownershipChanged(follower2ChangeCaptor.capture());
536
537         candidate1.close();
538         candidate2.close();
539         candidate3.close();
540
541         boolean passed = false;
542         for (int i = 0; i < 100; i++) {
543             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
544             final Optional<EntityOwnershipState> leaderState = leaderEntityOwnershipService.getOwnershipState(ENTITY1);
545             final Optional<EntityOwnershipState> follower1State =
546                     follower1EntityOwnershipService.getOwnershipState(ENTITY1);
547             final Optional<EntityOwnershipState> follower2State =
548                     follower2EntityOwnershipService.getOwnershipState(ENTITY1);
549             final Optional<DOMEntityOwnershipChange> leaderChange = getValueSafely(leaderChangeCaptor);
550             final Optional<DOMEntityOwnershipChange> follower1Change = getValueSafely(follower1ChangeCaptor);
551             final Optional<DOMEntityOwnershipChange> follower2Change = getValueSafely(follower2ChangeCaptor);
552             if (!leaderState.isPresent() || leaderState.get() == EntityOwnershipState.NO_OWNER
553                     && follower1State.isPresent() && follower1State.get() == EntityOwnershipState.NO_OWNER
554                     && follower2State.isPresent() && follower2State.get() == EntityOwnershipState.NO_OWNER
555                     && leaderChange.isPresent() && !leaderChange.get().getState().hasOwner()
556                     && follower1Change.isPresent() && !follower1Change.get().getState().hasOwner()
557                     && follower2Change.isPresent() && !follower2Change.get().getState().hasOwner()) {
558                 passed = true;
559                 break;
560             }
561         }
562
563         assertTrue("No ownership change message was sent with hasOwner=false", passed);
564     }
565
566     private static Optional<DOMEntityOwnershipChange> getValueSafely(ArgumentCaptor<DOMEntityOwnershipChange> captor) {
            // Non-throwing accessor for a captor that may not have captured anything yet, so callers can poll it.
            // Returns the value of captor.getValue() wrapped in an Optional (absent if that value is null), or
            // Optional.absent() when getValue() throws a MockitoException.
567         try {
568             return Optional.fromNullable(captor.getValue());
569         } catch (MockitoException e) {
570             // No value was captured
571             return Optional.absent();
572         }
573     }
574
575     /**
576      * Tests bootstrapping the entity-ownership shard when there's no shards initially configured for local
577      * member. The entity-ownership shard is initially created as inactive (ie remains a follower), requiring
578      * an AddShardReplica request to join it to an existing leader.
         *
         * Steps exercised below:
         *  1. Start member-1 and wait until it leads the entity-ownership shard.
         *  2. Start member-2 with the same module-shards config and register a candidate on it; the leader's
         *     listener must not be notified yet.
         *  3. Send AddShardReplica to member-2's shard manager; the leader's listener must then see the change.
         *  4. Close the candidate, restart member-2, register again and verify member-2's raft state.
579      */
580     @Test
581     public void testEntityOwnershipShardBootstrapping() throws Exception {
582         String name = "testEntityOwnershipShardBootstrapping";
            // Both nodes are built with this same config.
            // NOTE(review): presumably it lists only member-1 as a shard replica - confirm against the constant.
583         String moduleShardsConfig = MODULE_SHARDS_MEMBER_1_CONFIG;
584         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
585                 .moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
586                 .datastoreContextBuilder(leaderDatastoreContextBuilder).build();
587
588         AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
589         final DOMEntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
590
591         leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
592
593         MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
594                 .moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
595                 .datastoreContextBuilder(followerDatastoreContextBuilder).build();
596
597         AbstractDataStore follower1DistributedDataStore = follower1Node.configDataStore();
598         follower1DistributedDataStore.waitTillReady();
599
            // Each node waits until it sees the other one as up before the ownership service is created.
600         leaderNode.waitForMembersUp("member-2");
601         follower1Node.waitForMembersUp("member-1");
602
603         DOMEntityOwnershipService follower1EntityOwnershipService = newOwnershipService(follower1DistributedDataStore);
604
605         leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
606
607         // Register a candidate for follower1 - should get queued since follower1 has no leader
608         final DOMEntityOwnershipCandidateRegistration candidateReg =
609                 follower1EntityOwnershipService.registerCandidate(ENTITY1);
            // Give an (unexpected) notification time to arrive before asserting that none was delivered.
610         Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
611         verify(leaderMockListener, never()).ownershipChanged(ownershipChange(ENTITY1));
612
613         // Add replica in follower1
614         AddShardReplica addReplica = new AddShardReplica(ENTITY_OWNERSHIP_SHARD_NAME);
615         follower1DistributedDataStore.getActorContext().getShardManager().tell(addReplica,
616                 follower1Node.kit().getRef());
            // The reply is expected within 5 seconds as either Success or Failure; a Failure's cause is
            // rethrown wrapped in an AssertionError so the test fails with the underlying reason.
617         Object reply = follower1Node.kit().expectMsgAnyClassOf(JavaTestKit.duration("5 sec"),
618                 Success.class, Failure.class);
619         if (reply instanceof Failure) {
620             throw new AssertionError("AddShardReplica failed", ((Failure)reply).cause());
621         }
622
623         // The queued candidate registration should proceed
            // NOTE(review): the three booleans passed to ownershipChange() are presumably
            // (wasOwner, isOwner, hasOwner) - confirm in AbstractEntityOwnershipTest.
624         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
625         reset(leaderMockListener);
626
            // Closing the only candidate should produce a change whose last flag is false.
627         candidateReg.close();
628         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, false));
629         reset(leaderMockListener);
630
631         // Restart follower1 and verify the entity ownership shard is re-instated by registering.
            // The leader's Cluster is told to down follower1's address before the node is cleaned up.
632         Cluster.get(leaderNode.kit().getSystem()).down(Cluster.get(follower1Node.kit().getSystem()).selfAddress());
633         follower1Node.cleanup();
634
635         follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
636                 .moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
637                 .datastoreContextBuilder(followerDatastoreContextBuilder).build();
638         follower1EntityOwnershipService = newOwnershipService(follower1Node.configDataStore());
639
            // No AddShardReplica is sent this time. A longer (20 s) timeout is used here than for the earlier
            // verifications - presumably to allow the restarted node to rejoin.
640         follower1EntityOwnershipService.registerCandidate(ENTITY1);
641         verify(leaderMockListener, timeout(20000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
642
            // The restarted follower's shard must report no custom raft policy and exactly one peer, whose id
            // contains "member-1".
643         verifyRaftState(follower1Node.configDataStore(), ENTITY_OWNERSHIP_SHARD_NAME, raftState -> {
644             assertNull("Custom RaftPolicy class name", raftState.getCustomRaftPolicyClassName());
645             assertEquals("Peer count", 1, raftState.getPeerAddresses().keySet().size());
646             assertThat("Peer Id", Iterables.<String>getLast(raftState.getPeerAddresses().keySet()),
647                     org.hamcrest.CoreMatchers.containsString("member-1"));
648         });
649     }
650
651     @Test
652     public void testOwnerSelectedOnRapidUnregisteringAndRegisteringOfCandidates() throws Exception {
            // Scenario: member-1 becomes owner of ENTITY1, then its candidate is closed and member-2 registers
            // immediately afterwards, with no wait between the two calls. Expected end state, as read from the
            // leader's datastore: member-2 is the only candidate and is the owner.
653         String name = "testOwnerSelectedOnRapidUnregisteringAndRegisteringOfCandidates";
654         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
655                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
656                 .datastoreContextBuilder(leaderDatastoreContextBuilder).build();
657
658         MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
659                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
660                 .datastoreContextBuilder(followerDatastoreContextBuilder).build();
661
662         MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
663                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
664                 .datastoreContextBuilder(followerDatastoreContextBuilder).build();
665
666         AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
667
668         leaderDistributedDataStore.waitTillReady();
669         follower1Node.configDataStore().waitTillReady();
670         follower2Node.configDataStore().waitTillReady();
671
672         final DOMEntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
673         final DOMEntityOwnershipService follower1EntityOwnershipService =
674                 newOwnershipService(follower1Node.configDataStore());
            // A service is also created for member-3, but its reference is not used further by this test.
675         newOwnershipService(follower2Node.configDataStore());
676
677         leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
678
679         // Register leader candidate for entity1 and verify it becomes owner
680
681         DOMEntityOwnershipCandidateRegistration leaderEntity1Reg =
682                 leaderEntityOwnershipService.registerCandidate(ENTITY1);
683
684         verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-1");
685         verifyOwner(leaderDistributedDataStore, ENTITY1, "member-1");
686
            // Unregister first, then register the other member right away - the order is what this test is about.
687         leaderEntity1Reg.close();
688         follower1EntityOwnershipService.registerCandidate(ENTITY1);
689
690         verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-2");
691         verifyOwner(leaderDistributedDataStore, ENTITY1, "member-2");
692     }
693
694     @Test
695     public void testOwnerSelectedOnRapidRegisteringAndUnregisteringOfCandidates() throws Exception {
            // Scenario: member-1 becomes owner of ENTITY1, then member-2 registers as a candidate and member-1's
            // candidate is closed immediately afterwards, with no wait between the two calls. Expected end
            // state, as read from the leader's datastore: member-2 is the only candidate and is the owner.
696         String name = "testOwnerSelectedOnRapidRegisteringAndUnregisteringOfCandidates";
697         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
698                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
699                 .datastoreContextBuilder(leaderDatastoreContextBuilder).build();
700
701         MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
702                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
703                 .datastoreContextBuilder(followerDatastoreContextBuilder).build();
704
705         MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
706                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
707                 .datastoreContextBuilder(followerDatastoreContextBuilder).build();
708
709         AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
710
711         leaderDistributedDataStore.waitTillReady();
712         follower1Node.configDataStore().waitTillReady();
713         follower2Node.configDataStore().waitTillReady();
714
715         final DOMEntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
716         final DOMEntityOwnershipService follower1EntityOwnershipService =
717                 newOwnershipService(follower1Node.configDataStore());
            // A service is also created for member-3, but its reference is not used further by this test.
718         newOwnershipService(follower2Node.configDataStore());
719
720         leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
721
722         // Register leader candidate for entity1 and verify it becomes owner
723
724         final DOMEntityOwnershipCandidateRegistration leaderEntity1Reg =
725                 leaderEntityOwnershipService.registerCandidate(ENTITY1);
726
727         verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-1");
728         verifyOwner(leaderDistributedDataStore, ENTITY1, "member-1");
729
            // Register the other member first, then unregister the current owner right away - the order is
            // what this test is about.
730         follower1EntityOwnershipService.registerCandidate(ENTITY1);
731         leaderEntity1Reg.close();
732
733         verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-2");
734         verifyOwner(leaderDistributedDataStore, ENTITY1, "member-2");
735     }
736
737     @Test
738     public void testEntityOwnershipWithNonVotingMembers() throws Exception {
            // Scenario: in a five-node setup, members 4 and 5 are made non-voting for the entity-ownership
            // shard and must not become entity owners even when they are the only candidates. After the
            // voting states are flipped (3 non-voting, 4 and 5 voting), members 4 and 5 must become owners.

            // Applied to followerDatastoreContextBuilder, which is declared outside this method, before any
            // follower node is built from it.
            // NOTE(review): DisableElectionsRaftPolicy presumably keeps followers from starting elections so
            // that member-1 remains leader - confirm against the policy class.
739         followerDatastoreContextBuilder.shardElectionTimeoutFactor(5)
740                 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
741
742         String name = "testEntityOwnershipWithNonVotingMembers";
743         final MemberNode member1LeaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1")
744                 .useAkkaArtery(false).testName(name)
745                 .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT)
746                 .createOperDatastore(false).datastoreContextBuilder(leaderDatastoreContextBuilder).build();
747
748         final MemberNode member2FollowerNode = MemberNode.builder(memberNodes).akkaConfig("Member2")
749                 .useAkkaArtery(false).testName(name)
750                 .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT)
751                 .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
752
753         final MemberNode member3FollowerNode = MemberNode.builder(memberNodes).akkaConfig("Member3")
754                 .useAkkaArtery(false).testName(name)
755                 .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT)
756                 .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
757
758         final MemberNode member4FollowerNode = MemberNode.builder(memberNodes).akkaConfig("Member4")
759                 .useAkkaArtery(false).testName(name)
760                 .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT)
761                 .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
762
763         final MemberNode member5FollowerNode = MemberNode.builder(memberNodes).akkaConfig("Member5")
764                 .useAkkaArtery(false).testName(name)
765                 .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT)
766                 .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
767
768         AbstractDataStore leaderDistributedDataStore = member1LeaderNode.configDataStore();
769
770         leaderDistributedDataStore.waitTillReady();
771         member2FollowerNode.configDataStore().waitTillReady();
772         member3FollowerNode.configDataStore().waitTillReady();
773         member4FollowerNode.configDataStore().waitTillReady();
774         member5FollowerNode.configDataStore().waitTillReady();
775
776         member1LeaderNode.waitForMembersUp("member-2", "member-3", "member-4", "member-5");
777
            // Ownership services are created for members 3, 4 and 5 and (reference discarded) for member 1;
            // none is created in this test for member 2.
778         final DOMEntityOwnershipService member3EntityOwnershipService =
779                 newOwnershipService(member3FollowerNode.configDataStore());
780         final DOMEntityOwnershipService member4EntityOwnershipService =
781                 newOwnershipService(member4FollowerNode.configDataStore());
782         final DOMEntityOwnershipService member5EntityOwnershipService =
783                 newOwnershipService(member5FollowerNode.configDataStore());
784
785         newOwnershipService(member1LeaderNode.configDataStore());
786         member1LeaderNode.kit().waitUntilLeader(member1LeaderNode.configDataStore().getActorContext(),
787                 ENTITY_OWNERSHIP_SHARD_NAME);
788
789         // Make member4 and member5 non-voting
790
            // The request is sent to the leader's shard manager; both the ask and the await use a 10 s limit.
            // A Throwable reply is treated as failure, and a successful reply is expected to be null.
791         Future<Object> future = Patterns.ask(leaderDistributedDataStore.getActorContext().getShardManager(),
792                 new ChangeShardMembersVotingStatus(ENTITY_OWNERSHIP_SHARD_NAME,
793                         ImmutableMap.of("member-4", false, "member-5", false)), new Timeout(10, TimeUnit.SECONDS));
794         Object response = Await.result(future, FiniteDuration.apply(10, TimeUnit.SECONDS));
795         if (response instanceof Throwable) {
796             throw new AssertionError("ChangeShardMembersVotingStatus failed", (Throwable)response);
797         }
798
799         assertNull("Expected null Success response. Actual " + response, response);
800
801         // Register member4 candidate for entity1 - it should not become owner since it's non-voting
802
803         member4EntityOwnershipService.registerCandidate(ENTITY1);
804         verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-4");
805
806         // Register member5 candidate for entity2 - it should not become owner since it's non-voting
807
808         member5EntityOwnershipService.registerCandidate(ENTITY2);
809         verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-5");
810
            // Wait briefly so that an (incorrect) owner assignment would have time to show up, then expect the
            // owner to be the empty string for both entities, i.e. no owner was selected.
811         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
812         verifyOwner(leaderDistributedDataStore, ENTITY1, "");
813         verifyOwner(leaderDistributedDataStore, ENTITY2, "");
814
815         // Register member3 candidate for entity1 - it should become owner since it's voting
816
817         member3EntityOwnershipService.registerCandidate(ENTITY1);
            // Candidates are compared in order: member-4 registered first, member-3 second.
818         verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-4", "member-3");
819         verifyOwner(leaderDistributedDataStore, ENTITY1, "member-3");
820
821         // Switch member4 and member5 back to voting and member3 non-voting. This should result in member4 and member5
822         // to become entity owners.
823
824         future = Patterns.ask(leaderDistributedDataStore.getActorContext().getShardManager(),
825                 new ChangeShardMembersVotingStatus(ENTITY_OWNERSHIP_SHARD_NAME,
826                         ImmutableMap.of("member-3", false, "member-4", true, "member-5", true)),
827                 new Timeout(10, TimeUnit.SECONDS));
828         response = Await.result(future, FiniteDuration.apply(10, TimeUnit.SECONDS));
829         if (response instanceof Throwable) {
830             throw new AssertionError("ChangeShardMembersVotingStatus failed", (Throwable)response);
831         }
832
833         assertNull("Expected null Success response. Actual " + response, response);
834
            // ENTITY1 ownership moves from member-3 (now non-voting) to member-4; ENTITY2, which had no owner,
            // goes to its only candidate member-5.
835         verifyOwner(leaderDistributedDataStore, ENTITY1, "member-4");
836         verifyOwner(leaderDistributedDataStore, ENTITY2, "member-5");
837     }
838
839     private static void verifyGetOwnershipState(final DOMEntityOwnershipService service, final DOMEntity entity,
840             final EntityOwnershipState expState) {
            // Asserts that the service reports an ownership state for the entity and that it equals expState.
            // This is a single immediate check with no retry. Presence is asserted first, so a missing state
            // fails with a descriptive message before state.get() is reached.
841         Optional<EntityOwnershipState> state = service.getOwnershipState(entity);
842         assertEquals("getOwnershipState present", true, state.isPresent());
843         assertEquals("EntityOwnershipState", expState, state.get());
844     }
845
846     private static void verifyCandidates(final AbstractDataStore dataStore, final DOMEntity entity,
847             final String... expCandidates) throws Exception {
            // Polls the datastore until the entity's candidate list equals expCandidates, in that order.
            // A new attempt is started while at most 10 seconds have elapsed, with a 300 ms pause after each
            // failed attempt. If no attempt succeeds, the AssertionError from the last attempt is thrown.
            // Only AssertionError triggers a retry: an exception from the read itself (it waits up to 5 seconds
            // per attempt) is outside the try block and propagates to the caller immediately.
848         AssertionError lastError = null;
849         Stopwatch sw = Stopwatch.createStarted();
850         while (sw.elapsed(TimeUnit.MILLISECONDS) <= 10000) {
                // A fresh read-only transaction is opened for every attempt, reading the Candidate node under
                // the entity's path.
851             Optional<NormalizedNode<?, ?>> possible = dataStore.newReadOnlyTransaction()
852                     .read(entityPath(entity.getType(), entity.getIdentifier()).node(Candidate.QNAME))
853                     .get(5, TimeUnit.SECONDS);
854             try {
855                 assertEquals("Candidates not found for " + entity, true, possible.isPresent());
                    // Collect the candidate names in the order the map node returns its entries.
856                 Collection<String> actual = new ArrayList<>();
857                 for (MapEntryNode candidate: ((MapNode)possible.get()).getValue()) {
858                     actual.add(candidate.getChild(CANDIDATE_NAME_NODE_ID).get().getValue().toString());
859                 }
860
                    // List equality, so both the names and their order must match.
861                 assertEquals("Candidates for " + entity, Arrays.asList(expCandidates), actual);
862                 return;
863             } catch (AssertionError e) {
                    // Remember the failure and retry after a short pause.
864                 lastError = e;
865                 Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
866             }
867         }
868
869         throw lastError;
870     }
871
872     @SuppressWarnings("checkstyle:IllegalCatch")
873     private static void verifyOwner(final AbstractDataStore dataStore, final DOMEntity entity,
874             final String expOwner) {
            // Delegates to AbstractEntityOwnershipTest.verifyOwner, passing the expected owner, the entity's
            // type and identifier, and a reader function that looks a path up in the given datastore.
            // The reader opens a new read-only transaction per call and waits up to 5 seconds for the result.
            // Any exception (including a failed or empty read) is swallowed and reported as null.
            // NOTE(review): presumably the delegate treats null as "not there yet" and retries - confirm in
            // AbstractEntityOwnershipTest.
875         AbstractEntityOwnershipTest.verifyOwner(expOwner, entity.getType(), entity.getIdentifier(), path -> {
876             try {
877                 return dataStore.newReadOnlyTransaction().read(path).get(5, TimeUnit.SECONDS).get();
878             } catch (Exception e) {
879                 return null;
880             }
881         });
882     }
883 }