Migrate Assert.assertThat()
[controller.git] / opendaylight / md-sal / sal-distributed-eos / src / test / java / org / opendaylight / controller / cluster / entityownership / DistributedEntityOwnershipIntegrationTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.entityownership;
9
10 import static org.hamcrest.MatcherAssert.assertThat;
11 import static org.junit.Assert.assertEquals;
12 import static org.junit.Assert.assertNull;
13 import static org.junit.Assert.assertTrue;
14 import static org.mockito.AdditionalMatchers.or;
15 import static org.mockito.Mockito.doNothing;
16 import static org.mockito.Mockito.never;
17 import static org.mockito.Mockito.reset;
18 import static org.mockito.Mockito.timeout;
19 import static org.mockito.Mockito.verify;
20 import static org.opendaylight.controller.cluster.datastore.MemberNode.verifyRaftState;
21 import static org.opendaylight.controller.cluster.entityownership.AbstractEntityOwnershipTest.ownershipChange;
22 import static org.opendaylight.controller.cluster.entityownership.DistributedEntityOwnershipService.ENTITY_OWNERSHIP_SHARD_NAME;
23 import static org.opendaylight.controller.cluster.entityownership.EntityOwnersModel.CANDIDATE_NAME_NODE_ID;
24 import static org.opendaylight.controller.cluster.entityownership.EntityOwnersModel.entityPath;
25
26 import akka.actor.ActorRef;
27 import akka.actor.Status.Failure;
28 import akka.actor.Status.Success;
29 import akka.cluster.Cluster;
30 import akka.pattern.Patterns;
31 import akka.util.Timeout;
32 import com.google.common.base.Stopwatch;
33 import com.google.common.collect.ImmutableMap;
34 import com.google.common.collect.Iterables;
35 import com.google.common.collect.Lists;
36 import com.google.common.util.concurrent.Uninterruptibles;
37 import java.util.ArrayList;
38 import java.util.Arrays;
39 import java.util.Collection;
40 import java.util.List;
41 import java.util.Optional;
42 import java.util.concurrent.TimeUnit;
43 import org.junit.After;
44 import org.junit.Before;
45 import org.junit.Test;
46 import org.mockito.ArgumentCaptor;
47 import org.mockito.Mock;
48 import org.mockito.Mockito;
49 import org.mockito.MockitoAnnotations;
50 import org.mockito.exceptions.base.MockitoException;
51 import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
52 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
53 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
54 import org.opendaylight.controller.cluster.datastore.MemberNode;
55 import org.opendaylight.controller.cluster.datastore.messages.AddShardReplica;
56 import org.opendaylight.controller.cluster.datastore.messages.ChangeShardMembersVotingStatus;
57 import org.opendaylight.controller.cluster.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
58 import org.opendaylight.controller.cluster.raft.policy.DisableElectionsRaftPolicy;
59 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
60 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
61 import org.opendaylight.mdsal.eos.common.api.EntityOwnershipState;
62 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
63 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipCandidateRegistration;
64 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
65 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
66 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipService;
67 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.clustering.entity.owners.rev150804.entity.owners.entity.type.entity.Candidate;
68 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
69 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
70 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
71 import scala.concurrent.Await;
72 import scala.concurrent.Future;
73 import scala.concurrent.duration.FiniteDuration;
74
75 /**
76  * End-to-end integration tests for the entity ownership functionality.
77  *
78  * @author Thomas Pantelis
79  */
80 public class DistributedEntityOwnershipIntegrationTest {
    // Module-shards configuration resource names handed to MemberNode.builder(...).moduleShardsConfig(...).
    private static final String MODULE_SHARDS_CONFIG = "module-shards-default.conf";
    private static final String MODULE_SHARDS_5_NODE_CONFIG = "module-shards-default-5-node.conf";
    private static final String MODULE_SHARDS_MEMBER_1_CONFIG = "module-shards-default-member-1.conf";
    // Entity types and entities shared by the tests. ENTITY1 and ENTITY1_2 deliberately use the same
    // id ("entity1") under two different entity types.
    private static final String ENTITY_TYPE1 = "entityType1";
    private static final String ENTITY_TYPE2 = "entityType2";
    private static final DOMEntity ENTITY1 = new DOMEntity(ENTITY_TYPE1, "entity1");
    private static final DOMEntity ENTITY1_2 = new DOMEntity(ENTITY_TYPE2, "entity1");
    private static final DOMEntity ENTITY2 = new DOMEntity(ENTITY_TYPE1, "entity2");
    private static final DOMEntity ENTITY3 = new DOMEntity(ENTITY_TYPE1, "entity3");
    private static final DOMEntity ENTITY4 = new DOMEntity(ENTITY_TYPE1, "entity4");
    // Datastore settings for the member each test expects to be shard leader: 100 ms heartbeat and an
    // election timeout factor of 5. NOTE(review): the very large isolated-leader check interval
    // presumably keeps that check from firing during a test - confirm.
    private final DatastoreContext.Builder leaderDatastoreContextBuilder =
            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5)
                    .shardIsolatedLeaderCheckIntervalInMillis(1000000);

    // Datastore settings for the follower members: same heartbeat, but an election timeout factor of
    // 10000 (presumably so a follower never starts an election by itself - confirm). The shutdown
    // tests lower the factor to 5 on this builder before creating their nodes.
    private final DatastoreContext.Builder followerDatastoreContextBuilder =
            DatastoreContext.newBuilder().shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10000);

    // Passed to every MemberNode.builder(...) call; tearDown() cleans up whatever it contains.
    private final List<MemberNode> memberNodes = new ArrayList<>();

    // Listener mocks, initialized in setUp() through MockitoAnnotations.initMocks(this).
    @Mock
    private DOMEntityOwnershipListener leaderMockListener;

    @Mock
    private DOMEntityOwnershipListener leaderMockListener2;

    @Mock
    private DOMEntityOwnershipListener follower1MockListener;

    @Mock
    private DOMEntityOwnershipListener follower2MockListener;
111
112     @Before
113     public void setUp() {
114         MockitoAnnotations.initMocks(this);
115         InMemoryJournal.clear();
116         InMemorySnapshotStore.clear();
117     }
118
119     @After
120     public void tearDown() {
121         for (MemberNode m : Lists.reverse(memberNodes)) {
122             m.cleanup();
123         }
124         memberNodes.clear();
125     }
126
127     private static DistributedEntityOwnershipService newOwnershipService(final AbstractDataStore datastore) {
128         return DistributedEntityOwnershipService.start(datastore.getActorUtils(),
129                 EntityOwnerSelectionStrategyConfig.newBuilder().build());
130     }
131
    /**
     * Walks a three-member cluster (Member1 as entity-ownership shard leader, Member2 and Member3 as
     * followers) through candidate registration, candidate unregistration and a follower shutdown. After
     * each step it checks the owner/candidates stored in the datastore and the ownership changes delivered
     * to the mock listeners.
     *
     * <p>NOTE(review): the three boolean arguments of {@code ownershipChange(...)} are presumably
     * (wasOwner, isOwner, hasOwner) - confirm against {@code AbstractEntityOwnershipTest}.
     */
    @Test
    public void testFunctionalityWithThreeNodes() throws Exception {
        String name = "testFunctionalityWithThreeNodes";
        // Build the three members; only Member1 uses the leader datastore settings.
        MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
                .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
                .createOperDatastore(false).datastoreContextBuilder(leaderDatastoreContextBuilder).build();

        MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
                .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
                .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();

        MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
                .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
                .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();

        AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();

        // Block until every member's config datastore reports ready.
        leaderDistributedDataStore.waitTillReady();
        follower1Node.configDataStore().waitTillReady();
        follower2Node.configDataStore().waitTillReady();

        // One ownership service per member, each backed by that member's config datastore.
        final DOMEntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
        final DOMEntityOwnershipService follower1EntityOwnershipService =
                newOwnershipService(follower1Node.configDataStore());
        final DOMEntityOwnershipService follower2EntityOwnershipService =
                newOwnershipService(follower2Node.configDataStore());

        // Do not proceed until Member1 leads the entity-ownership shard.
        leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorUtils(), ENTITY_OWNERSHIP_SHARD_NAME);

        // follower2's listener is intentionally registered later, after owners already exist.
        leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
        leaderEntityOwnershipService.registerListener(ENTITY_TYPE2, leaderMockListener2);
        follower1EntityOwnershipService.registerListener(ENTITY_TYPE1, follower1MockListener);

        // Register leader candidate for entity1 and verify it becomes owner

        leaderEntityOwnershipService.registerCandidate(ENTITY1);
        verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, true, true));
        verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
        reset(leaderMockListener, follower1MockListener);

        verifyGetOwnershipState(leaderEntityOwnershipService, ENTITY1, EntityOwnershipState.IS_OWNER);
        verifyGetOwnershipState(follower1EntityOwnershipService, ENTITY1, EntityOwnershipState.OWNED_BY_OTHER);

        // Register leader candidate for entity1_2 (same id, different type) and verify it becomes owner

        leaderEntityOwnershipService.registerCandidate(ENTITY1_2);
        verify(leaderMockListener2, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1_2, false, true, true));
        // Give a stray notification time to arrive before asserting the ENTITY_TYPE1 listener saw none.
        Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
        verify(leaderMockListener, never()).ownershipChanged(ownershipChange(ENTITY1_2));
        reset(leaderMockListener2);

        // Register follower1 candidate for entity1 and verify it gets added but doesn't become owner

        follower1EntityOwnershipService.registerCandidate(ENTITY1);
        verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-1", "member-2");
        verifyOwner(leaderDistributedDataStore, ENTITY1, "member-1");
        verifyOwner(follower2Node.configDataStore(), ENTITY1, "member-1");
        // Same pattern as above: wait briefly, then assert no ownership change was delivered.
        Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
        verify(leaderMockListener, never()).ownershipChanged(ownershipChange(ENTITY1));
        verify(follower1MockListener, never()).ownershipChanged(ownershipChange(ENTITY1));

        // Register follower1 candidate for entity2 and verify it becomes owner

        final DOMEntityOwnershipCandidateRegistration follower1Entity2Reg =
                follower1EntityOwnershipService.registerCandidate(ENTITY2);
        verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, true, true));
        verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, false, true));
        verifyOwner(follower2Node.configDataStore(), ENTITY2, "member-2");
        reset(leaderMockListener, follower1MockListener);

        // Register follower2 candidate for entity2 and verify it gets added but doesn't become owner

        // The late-registered listener is expected to be told about the two existing owners
        // (entity1 and entity2), hence exactly two matching notifications.
        follower2EntityOwnershipService.registerListener(ENTITY_TYPE1, follower2MockListener);
        verify(follower2MockListener, timeout(5000).times(2)).ownershipChanged(or(
                ownershipChange(ENTITY1, false, false, true), ownershipChange(ENTITY2, false, false, true)));

        follower2EntityOwnershipService.registerCandidate(ENTITY2);
        verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-2", "member-3");
        verifyOwner(leaderDistributedDataStore, ENTITY2, "member-2");

        // Unregister follower1 candidate for entity2 and verify follower2 becomes owner

        follower1Entity2Reg.close();
        verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-3");
        verifyOwner(leaderDistributedDataStore, ENTITY2, "member-3");
        verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, true, false, true));
        verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, false, true));

        // Depending on timing, follower2MockListener could get ownershipChanged with "false, false, true"
        // if the original ownership change with "member-2" is replicated to follower2 after the listener
        // is registered.
        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
        verify(follower2MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, true, true));

        // Register follower1 candidate for entity3 and verify it becomes owner

        follower1EntityOwnershipService.registerCandidate(ENTITY3);
        verifyOwner(leaderDistributedDataStore, ENTITY3, "member-2");
        verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY3, false, true, true));
        verify(follower2MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY3, false, false, true));
        verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY3, false, false, true));

        // Register follower2 candidate for entity4 and verify it becomes owner

        follower2EntityOwnershipService.registerCandidate(ENTITY4);
        verifyOwner(leaderDistributedDataStore, ENTITY4, "member-3");
        verify(follower2MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY4, false, true, true));
        verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY4, false, false, true));
        verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY4, false, false, true));
        reset(follower1MockListener, follower2MockListener);

        // Register follower1 candidate for entity4 and verify it gets added but doesn't become owner

        follower1EntityOwnershipService.registerCandidate(ENTITY4);
        verifyCandidates(leaderDistributedDataStore, ENTITY4, "member-3", "member-2");
        verifyOwner(leaderDistributedDataStore, ENTITY4, "member-3");

        // Shutdown follower2 and verify its owned entities (entity 4) get re-assigned

        reset(leaderMockListener, follower1MockListener);
        follower2Node.cleanup();

        // Re-assignment after a member shutdown gets a longer (15 s) verification timeout.
        verify(follower1MockListener, timeout(15000)).ownershipChanged(ownershipChange(ENTITY4, false, true, true));
        verify(leaderMockListener, timeout(15000)).ownershipChanged(ownershipChange(ENTITY4, false, false, true));

        // Register leader candidate for entity2 and verify it becomes owner

        DOMEntityOwnershipCandidateRegistration leaderEntity2Reg =
                leaderEntityOwnershipService.registerCandidate(ENTITY2);
        verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");
        verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, true, true));

        // Unregister leader candidate for entity2 and verify the owner is cleared

        leaderEntity2Reg.close();
        // A cleared owner is checked as the empty string.
        verifyOwner(leaderDistributedDataStore, ENTITY2, "");
        verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, true, false, false));
        verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY2, false, false, false));
    }
271
    /**
     * Verifies that, in a three-member cluster, an entity owned by the shard leader (entity2, owned by
     * member-1) is re-assigned to the other registered candidate (member-3) once the leader is shut down,
     * while entity1 remains owned by member-2. The candidate lists are still expected to contain member-1
     * afterwards; only ownership is expected to move.
     */
    @Test
    public void testLeaderEntityOwnersReassignedAfterShutdown() throws Exception {
        // Followers start with elections disabled via DisableElectionsRaftPolicy and a lowered election
        // timeout factor; the custom policy is cleared again further down, just before the leader is stopped.
        followerDatastoreContextBuilder.shardElectionTimeoutFactor(5)
                    .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());

        String name = "testLeaderEntityOwnersReassignedAfterShutdown";
        MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
                .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
                .createOperDatastore(false).datastoreContextBuilder(leaderDatastoreContextBuilder).build();

        MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
                .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
                .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();

        MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
                .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
                .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();

        AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();

        // Block until every member's config datastore reports ready.
        leaderDistributedDataStore.waitTillReady();
        follower1Node.configDataStore().waitTillReady();
        follower2Node.configDataStore().waitTillReady();

        // follower1 must see both peers up before any candidates are registered.
        follower1Node.waitForMembersUp("member-1", "member-3");

        final DOMEntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
        final DOMEntityOwnershipService follower1EntityOwnershipService =
                newOwnershipService(follower1Node.configDataStore());
        final DOMEntityOwnershipService follower2EntityOwnershipService =
                newOwnershipService(follower2Node.configDataStore());

        // Do not proceed until Member1 leads the entity-ownership shard.
        leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorUtils(), ENTITY_OWNERSHIP_SHARD_NAME);

        // Register follower1 candidate for entity1 and verify it becomes owner

        follower1EntityOwnershipService.registerCandidate(ENTITY1);
        verifyOwner(leaderDistributedDataStore, ENTITY1, "member-2");

        // Register leader candidate for entity1

        leaderEntityOwnershipService.registerCandidate(ENTITY1);
        verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-2", "member-1");
        verifyOwner(leaderDistributedDataStore, ENTITY1, "member-2");

        // Register leader candidate for entity2 and verify it becomes owner

        leaderEntityOwnershipService.registerCandidate(ENTITY2);
        verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");

        // Register follower2 candidate for entity2

        follower2EntityOwnershipService.registerCandidate(ENTITY2);
        verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-1", "member-3");
        verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");

        // Re-enable elections on all remaining followers so one becomes the new leader

        // Each follower shard actor is sent a new DatastoreContext with the custom Raft policy cleared.
        ActorRef follower1Shard = IntegrationTestKit.findLocalShard(follower1Node.configDataStore().getActorUtils(),
                ENTITY_OWNERSHIP_SHARD_NAME);
        follower1Shard.tell(DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build())
                .customRaftPolicyImplementation(null).build(), ActorRef.noSender());

        ActorRef follower2Shard = IntegrationTestKit.findLocalShard(follower2Node.configDataStore().getActorUtils(),
                ENTITY_OWNERSHIP_SHARD_NAME);
        follower2Shard.tell(DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build())
                .customRaftPolicyImplementation(null).build(), ActorRef.noSender());

        // Shutdown the leader and wait until both followers see it as down

        leaderNode.cleanup();
        follower1Node.waitForMemberDown("member-1");
        follower2Node.waitForMemberDown("member-1");

        // Verify the prior leader's entity owners are re-assigned.

        // member-1 is still expected in both candidate lists; only the owner of entity2 changes.
        verifyCandidates(follower1Node.configDataStore(), ENTITY1, "member-2", "member-1");
        verifyCandidates(follower1Node.configDataStore(), ENTITY2, "member-1", "member-3");
        verifyOwner(follower1Node.configDataStore(), ENTITY1, "member-2");
        verifyOwner(follower1Node.configDataStore(), ENTITY2, "member-3");
    }
353
    /**
     * Five-member variant of the leader-shutdown scenario: both the shard leader (member-1) and one
     * follower that is also a candidate (member-4) are shut down. Entity2, previously owned by member-1,
     * is expected to be re-assigned to member-3, while entity1 remains owned by member-2. The candidate
     * lists are still expected to contain the stopped members afterwards.
     */
    @Test
    public void testLeaderAndFollowerEntityOwnersReassignedAfterShutdown() throws Exception {
        // Followers start with elections disabled via DisableElectionsRaftPolicy and a lowered election
        // timeout factor; the custom policy is cleared again on the surviving followers further down.
        followerDatastoreContextBuilder.shardElectionTimeoutFactor(5)
                .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());

        String name = "testLeaderAndFollowerEntityOwnersReassignedAfterShutdown";
        // All five members use the 5-node module-shards configuration and are built with Artery disabled.
        final MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1")
                .useAkkaArtery(false).testName(name)
                .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
                .createOperDatastore(false).datastoreContextBuilder(leaderDatastoreContextBuilder).build();

        final MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2")
                .useAkkaArtery(false).testName(name)
                .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
                .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();

        final MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3")
                .useAkkaArtery(false).testName(name)
                .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
                .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();

        final MemberNode follower3Node = MemberNode.builder(memberNodes).akkaConfig("Member4")
                .useAkkaArtery(false).testName(name)
                .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
                .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();

        final MemberNode follower4Node = MemberNode.builder(memberNodes).akkaConfig("Member5")
                .useAkkaArtery(false).testName(name)
                .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
                .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();

        AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();

        // Block until every member's config datastore reports ready.
        leaderDistributedDataStore.waitTillReady();
        follower1Node.configDataStore().waitTillReady();
        follower2Node.configDataStore().waitTillReady();
        follower3Node.configDataStore().waitTillReady();
        follower4Node.configDataStore().waitTillReady();

        // The leader and follower1 must each see all four peers up before candidates are registered.
        leaderNode.waitForMembersUp("member-2", "member-3", "member-4", "member-5");
        follower1Node.waitForMembersUp("member-1", "member-3", "member-4", "member-5");

        final DOMEntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
        final DOMEntityOwnershipService follower1EntityOwnershipService =
                newOwnershipService(follower1Node.configDataStore());
        final DOMEntityOwnershipService follower2EntityOwnershipService =
                newOwnershipService(follower2Node.configDataStore());
        final DOMEntityOwnershipService follower3EntityOwnershipService =
                newOwnershipService(follower3Node.configDataStore());
        // follower4 gets a running service too, but it never registers a candidate in this test,
        // so the reference is not kept.
        newOwnershipService(follower4Node.configDataStore());

        // Do not proceed until Member1 leads the entity-ownership shard.
        leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorUtils(), ENTITY_OWNERSHIP_SHARD_NAME);

        // Register follower1 candidate for entity1 and verify it becomes owner

        follower1EntityOwnershipService.registerCandidate(ENTITY1);
        verifyOwner(leaderDistributedDataStore, ENTITY1, "member-2");

        // Register leader candidate for entity1

        leaderEntityOwnershipService.registerCandidate(ENTITY1);
        verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-2", "member-1");
        verifyOwner(leaderDistributedDataStore, ENTITY1, "member-2");

        // Register leader candidate for entity2 and verify it becomes owner

        leaderEntityOwnershipService.registerCandidate(ENTITY2);
        verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");

        // Register follower2 candidate for entity2

        follower2EntityOwnershipService.registerCandidate(ENTITY2);
        verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-1", "member-3");
        verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");

        // Register follower3 as a candidate for entity2 as well

        follower3EntityOwnershipService.registerCandidate(ENTITY2);
        verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-1", "member-3", "member-4");
        verifyOwner(leaderDistributedDataStore, ENTITY2, "member-1");

        // Re-enable elections on all remaining followers so one becomes the new leader

        // follower3 is skipped here because it is shut down below together with the leader.
        ActorRef follower1Shard = IntegrationTestKit.findLocalShard(follower1Node.configDataStore().getActorUtils(),
                ENTITY_OWNERSHIP_SHARD_NAME);
        follower1Shard.tell(DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build())
                .customRaftPolicyImplementation(null).build(), ActorRef.noSender());

        ActorRef follower2Shard = IntegrationTestKit.findLocalShard(follower2Node.configDataStore().getActorUtils(),
                ENTITY_OWNERSHIP_SHARD_NAME);
        follower2Shard.tell(DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build())
                .customRaftPolicyImplementation(null).build(), ActorRef.noSender());

        ActorRef follower4Shard = IntegrationTestKit.findLocalShard(follower4Node.configDataStore().getActorUtils(),
                ENTITY_OWNERSHIP_SHARD_NAME);
        follower4Shard.tell(DatastoreContext.newBuilderFrom(followerDatastoreContextBuilder.build())
                .customRaftPolicyImplementation(null).build(), ActorRef.noSender());

        // Shutdown the leader and follower3

        leaderNode.cleanup();
        follower3Node.cleanup();

        // Every surviving member must observe both stopped members as down before the final checks.
        follower1Node.waitForMemberDown("member-1");
        follower1Node.waitForMemberDown("member-4");
        follower2Node.waitForMemberDown("member-1");
        follower2Node.waitForMemberDown("member-4");
        follower4Node.waitForMemberDown("member-1");
        follower4Node.waitForMemberDown("member-4");

        // Verify the prior leader's and follower3 entity owners are re-assigned.

        // The stopped members are still expected in the candidate lists; only the owner of entity2 changes.
        verifyCandidates(follower1Node.configDataStore(), ENTITY1, "member-2", "member-1");
        verifyCandidates(follower1Node.configDataStore(), ENTITY2, "member-1", "member-3", "member-4");
        verifyOwner(follower1Node.configDataStore(), ENTITY1, "member-2");
        verifyOwner(follower1Node.configDataStore(), ENTITY2, "member-3");
    }
471
472     /**
473      * Reproduces bug <a href="https://bugs.opendaylight.org/show_bug.cgi?id=4554">4554</a>.
474      */
475     @Test
476     public void testCloseCandidateRegistrationInQuickSuccession() throws Exception {
477         String name = "testCloseCandidateRegistrationInQuickSuccession";
478         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
479                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
480                 .createOperDatastore(false).datastoreContextBuilder(leaderDatastoreContextBuilder).build();
481
482         MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
483                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
484                 .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
485
486         MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
487                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
488                 .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
489
490         AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
491
492         leaderDistributedDataStore.waitTillReady();
493         follower1Node.configDataStore().waitTillReady();
494         follower2Node.configDataStore().waitTillReady();
495
496         final DOMEntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
497         final DOMEntityOwnershipService follower1EntityOwnershipService =
498                 newOwnershipService(follower1Node.configDataStore());
499         final DOMEntityOwnershipService follower2EntityOwnershipService =
500                 newOwnershipService(follower2Node.configDataStore());
501
502         leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorUtils(), ENTITY_OWNERSHIP_SHARD_NAME);
503
504         leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
505         follower1EntityOwnershipService.registerListener(ENTITY_TYPE1, follower1MockListener);
506         follower2EntityOwnershipService.registerListener(ENTITY_TYPE1, follower2MockListener);
507
508         final DOMEntityOwnershipCandidateRegistration candidate1 =
509                 leaderEntityOwnershipService.registerCandidate(ENTITY1);
510         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, true, true));
511
512         final DOMEntityOwnershipCandidateRegistration candidate2 =
513                 follower1EntityOwnershipService.registerCandidate(ENTITY1);
514         verify(follower1MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
515
516         final DOMEntityOwnershipCandidateRegistration candidate3 =
517                 follower2EntityOwnershipService.registerCandidate(ENTITY1);
518         verify(follower2MockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
519
520         Mockito.reset(leaderMockListener, follower1MockListener, follower2MockListener);
521
522         ArgumentCaptor<DOMEntityOwnershipChange> leaderChangeCaptor =
523                 ArgumentCaptor.forClass(DOMEntityOwnershipChange.class);
524         ArgumentCaptor<DOMEntityOwnershipChange> follower1ChangeCaptor =
525                 ArgumentCaptor.forClass(DOMEntityOwnershipChange.class);
526         ArgumentCaptor<DOMEntityOwnershipChange> follower2ChangeCaptor =
527                 ArgumentCaptor.forClass(DOMEntityOwnershipChange.class);
528         doNothing().when(leaderMockListener).ownershipChanged(leaderChangeCaptor.capture());
529         doNothing().when(follower1MockListener).ownershipChanged(follower1ChangeCaptor.capture());
530         doNothing().when(follower2MockListener).ownershipChanged(follower2ChangeCaptor.capture());
531
532         candidate1.close();
533         candidate2.close();
534         candidate3.close();
535
536         boolean passed = false;
537         for (int i = 0; i < 100; i++) {
538             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
539             final Optional<EntityOwnershipState> leaderState = leaderEntityOwnershipService.getOwnershipState(ENTITY1);
540             final Optional<EntityOwnershipState> follower1State =
541                     follower1EntityOwnershipService.getOwnershipState(ENTITY1);
542             final Optional<EntityOwnershipState> follower2State =
543                     follower2EntityOwnershipService.getOwnershipState(ENTITY1);
544             final Optional<DOMEntityOwnershipChange> leaderChange = getValueSafely(leaderChangeCaptor);
545             final Optional<DOMEntityOwnershipChange> follower1Change = getValueSafely(follower1ChangeCaptor);
546             final Optional<DOMEntityOwnershipChange> follower2Change = getValueSafely(follower2ChangeCaptor);
547             if (!leaderState.isPresent() || leaderState.get() == EntityOwnershipState.NO_OWNER
548                     && follower1State.isPresent() && follower1State.get() == EntityOwnershipState.NO_OWNER
549                     && follower2State.isPresent() && follower2State.get() == EntityOwnershipState.NO_OWNER
550                     && leaderChange.isPresent() && !leaderChange.get().getState().hasOwner()
551                     && follower1Change.isPresent() && !follower1Change.get().getState().hasOwner()
552                     && follower2Change.isPresent() && !follower2Change.get().getState().hasOwner()) {
553                 passed = true;
554                 break;
555             }
556         }
557
558         assertTrue("No ownership change message was sent with hasOwner=false", passed);
559     }
560
561     private static Optional<DOMEntityOwnershipChange> getValueSafely(
562             final ArgumentCaptor<DOMEntityOwnershipChange> captor) {
563         try {
564             return Optional.ofNullable(captor.getValue());
565         } catch (MockitoException e) {
566             // No value was captured
567             return Optional.empty();
568         }
569     }
570
571     /**
572      * Tests bootstrapping the entity-ownership shard when there's no shards initially configured for local
573      * member. The entity-ownership shard is initially created as inactive (ie remains a follower), requiring
574      * an AddShardReplica request to join it to an existing leader.
575      */
576     @Test
577     public void testEntityOwnershipShardBootstrapping() throws Exception {
578         String name = "testEntityOwnershipShardBootstrapping";
579         String moduleShardsConfig = MODULE_SHARDS_MEMBER_1_CONFIG;
580         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
581                 .moduleShardsConfig(moduleShardsConfig).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
582                 .createOperDatastore(false).datastoreContextBuilder(leaderDatastoreContextBuilder).build();
583
584         AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
585         final DOMEntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
586
587         leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorUtils(), ENTITY_OWNERSHIP_SHARD_NAME);
588
589         MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
590                 .moduleShardsConfig(moduleShardsConfig).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
591                 .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
592
593         AbstractDataStore follower1DistributedDataStore = follower1Node.configDataStore();
594         follower1DistributedDataStore.waitTillReady();
595
596         leaderNode.waitForMembersUp("member-2");
597         follower1Node.waitForMembersUp("member-1");
598
599         DOMEntityOwnershipService follower1EntityOwnershipService = newOwnershipService(follower1DistributedDataStore);
600
601         leaderEntityOwnershipService.registerListener(ENTITY_TYPE1, leaderMockListener);
602
603         // Register a candidate for follower1 - should get queued since follower1 has no leader
604         final DOMEntityOwnershipCandidateRegistration candidateReg =
605                 follower1EntityOwnershipService.registerCandidate(ENTITY1);
606         Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
607         verify(leaderMockListener, never()).ownershipChanged(ownershipChange(ENTITY1));
608
609         // Add replica in follower1
610         AddShardReplica addReplica = new AddShardReplica(ENTITY_OWNERSHIP_SHARD_NAME);
611         follower1DistributedDataStore.getActorUtils().getShardManager().tell(addReplica,
612                 follower1Node.kit().getRef());
613         Object reply = follower1Node.kit().expectMsgAnyClassOf(follower1Node.kit().duration("5 sec"),
614                 Success.class, Failure.class);
615         if (reply instanceof Failure) {
616             throw new AssertionError("AddShardReplica failed", ((Failure)reply).cause());
617         }
618
619         // The queued candidate registration should proceed
620         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
621         reset(leaderMockListener);
622
623         candidateReg.close();
624         verify(leaderMockListener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, false));
625         reset(leaderMockListener);
626
627         // Restart follower1 and verify the entity ownership shard is re-instated by registering.
628         Cluster.get(leaderNode.kit().getSystem()).down(Cluster.get(follower1Node.kit().getSystem()).selfAddress());
629         follower1Node.cleanup();
630
631         follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
632                 .moduleShardsConfig(moduleShardsConfig).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
633                 .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
634         follower1EntityOwnershipService = newOwnershipService(follower1Node.configDataStore());
635
636         follower1EntityOwnershipService.registerCandidate(ENTITY1);
637         verify(leaderMockListener, timeout(20000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
638
639         verifyRaftState(follower1Node.configDataStore(), ENTITY_OWNERSHIP_SHARD_NAME, raftState -> {
640             assertNull("Custom RaftPolicy class name", raftState.getCustomRaftPolicyClassName());
641             assertEquals("Peer count", 1, raftState.getPeerAddresses().keySet().size());
642             assertThat("Peer Id", Iterables.<String>getLast(raftState.getPeerAddresses().keySet()),
643                     org.hamcrest.CoreMatchers.containsString("member-1"));
644         });
645     }
646
647     @Test
648     public void testOwnerSelectedOnRapidUnregisteringAndRegisteringOfCandidates() throws Exception {
649         String name = "testOwnerSelectedOnRapidUnregisteringAndRegisteringOfCandidates";
650         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
651                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
652                 .createOperDatastore(false).datastoreContextBuilder(leaderDatastoreContextBuilder).build();
653
654         MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
655                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
656                 .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
657
658         MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
659                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
660                 .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
661
662         AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
663
664         leaderDistributedDataStore.waitTillReady();
665         follower1Node.configDataStore().waitTillReady();
666         follower2Node.configDataStore().waitTillReady();
667
668         final DOMEntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
669         final DOMEntityOwnershipService follower1EntityOwnershipService =
670                 newOwnershipService(follower1Node.configDataStore());
671         newOwnershipService(follower2Node.configDataStore());
672
673         leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorUtils(), ENTITY_OWNERSHIP_SHARD_NAME);
674
675         // Register leader candidate for entity1 and verify it becomes owner
676
677         DOMEntityOwnershipCandidateRegistration leaderEntity1Reg =
678                 leaderEntityOwnershipService.registerCandidate(ENTITY1);
679
680         verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-1");
681         verifyOwner(leaderDistributedDataStore, ENTITY1, "member-1");
682
683         leaderEntity1Reg.close();
684         follower1EntityOwnershipService.registerCandidate(ENTITY1);
685
686         verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-2");
687         verifyOwner(leaderDistributedDataStore, ENTITY1, "member-2");
688     }
689
690     @Test
691     public void testOwnerSelectedOnRapidRegisteringAndUnregisteringOfCandidates() throws Exception {
692         String name = "testOwnerSelectedOnRapidRegisteringAndUnregisteringOfCandidates";
693         MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name)
694                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
695                 .createOperDatastore(false).datastoreContextBuilder(leaderDatastoreContextBuilder).build();
696
697         MemberNode follower1Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name)
698                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
699                 .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
700
701         MemberNode follower2Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name)
702                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
703                 .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
704
705         AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
706
707         leaderDistributedDataStore.waitTillReady();
708         follower1Node.configDataStore().waitTillReady();
709         follower2Node.configDataStore().waitTillReady();
710
711         final DOMEntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
712         final DOMEntityOwnershipService follower1EntityOwnershipService =
713                 newOwnershipService(follower1Node.configDataStore());
714         newOwnershipService(follower2Node.configDataStore());
715
716         leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorUtils(), ENTITY_OWNERSHIP_SHARD_NAME);
717
718         // Register leader candidate for entity1 and verify it becomes owner
719
720         final DOMEntityOwnershipCandidateRegistration leaderEntity1Reg =
721                 leaderEntityOwnershipService.registerCandidate(ENTITY1);
722
723         verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-1");
724         verifyOwner(leaderDistributedDataStore, ENTITY1, "member-1");
725
726         follower1EntityOwnershipService.registerCandidate(ENTITY1);
727         leaderEntity1Reg.close();
728
729         verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-2");
730         verifyOwner(leaderDistributedDataStore, ENTITY1, "member-2");
731     }
732
733     @Test
734     public void testEntityOwnershipWithNonVotingMembers() throws Exception {
735         followerDatastoreContextBuilder.shardElectionTimeoutFactor(5)
736                 .customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
737
738         String name = "testEntityOwnershipWithNonVotingMembers";
739         final MemberNode member1LeaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1")
740                 .useAkkaArtery(false).testName(name)
741                 .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
742                 .createOperDatastore(false).datastoreContextBuilder(leaderDatastoreContextBuilder).build();
743
744         final MemberNode member2FollowerNode = MemberNode.builder(memberNodes).akkaConfig("Member2")
745                 .useAkkaArtery(false).testName(name)
746                 .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
747                 .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
748
749         final MemberNode member3FollowerNode = MemberNode.builder(memberNodes).akkaConfig("Member3")
750                 .useAkkaArtery(false).testName(name)
751                 .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
752                 .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
753
754         final MemberNode member4FollowerNode = MemberNode.builder(memberNodes).akkaConfig("Member4")
755                 .useAkkaArtery(false).testName(name)
756                 .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
757                 .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
758
759         final MemberNode member5FollowerNode = MemberNode.builder(memberNodes).akkaConfig("Member5")
760                 .useAkkaArtery(false).testName(name)
761                 .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(EOSTestUtils.SCHEMA_CONTEXT)
762                 .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
763
764         AbstractDataStore leaderDistributedDataStore = member1LeaderNode.configDataStore();
765
766         leaderDistributedDataStore.waitTillReady();
767         member2FollowerNode.configDataStore().waitTillReady();
768         member3FollowerNode.configDataStore().waitTillReady();
769         member4FollowerNode.configDataStore().waitTillReady();
770         member5FollowerNode.configDataStore().waitTillReady();
771
772         member1LeaderNode.waitForMembersUp("member-2", "member-3", "member-4", "member-5");
773
774         final DOMEntityOwnershipService member3EntityOwnershipService =
775                 newOwnershipService(member3FollowerNode.configDataStore());
776         final DOMEntityOwnershipService member4EntityOwnershipService =
777                 newOwnershipService(member4FollowerNode.configDataStore());
778         final DOMEntityOwnershipService member5EntityOwnershipService =
779                 newOwnershipService(member5FollowerNode.configDataStore());
780
781         newOwnershipService(member1LeaderNode.configDataStore());
782         member1LeaderNode.kit().waitUntilLeader(member1LeaderNode.configDataStore().getActorUtils(),
783                 ENTITY_OWNERSHIP_SHARD_NAME);
784
785         // Make member4 and member5 non-voting
786
787         Future<Object> future = Patterns.ask(leaderDistributedDataStore.getActorUtils().getShardManager(),
788                 new ChangeShardMembersVotingStatus(ENTITY_OWNERSHIP_SHARD_NAME,
789                         ImmutableMap.of("member-4", Boolean.FALSE, "member-5", Boolean.FALSE)),
790                 new Timeout(10, TimeUnit.SECONDS));
791         Object response = Await.result(future, FiniteDuration.apply(10, TimeUnit.SECONDS));
792         if (response instanceof Throwable) {
793             throw new AssertionError("ChangeShardMembersVotingStatus failed", (Throwable)response);
794         }
795
796         assertNull("Expected null Success response. Actual " + response, response);
797
798         // Register member4 candidate for entity1 - it should not become owner since it's non-voting
799
800         member4EntityOwnershipService.registerCandidate(ENTITY1);
801         verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-4");
802
803         // Register member5 candidate for entity2 - it should not become owner since it's non-voting
804
805         member5EntityOwnershipService.registerCandidate(ENTITY2);
806         verifyCandidates(leaderDistributedDataStore, ENTITY2, "member-5");
807
808         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
809         verifyOwner(leaderDistributedDataStore, ENTITY1, "");
810         verifyOwner(leaderDistributedDataStore, ENTITY2, "");
811
812         // Register member3 candidate for entity1 - it should become owner since it's voting
813
814         member3EntityOwnershipService.registerCandidate(ENTITY1);
815         verifyCandidates(leaderDistributedDataStore, ENTITY1, "member-4", "member-3");
816         verifyOwner(leaderDistributedDataStore, ENTITY1, "member-3");
817
818         // Switch member4 and member5 back to voting and member3 non-voting. This should result in member4 and member5
819         // to become entity owners.
820
821         future = Patterns.ask(leaderDistributedDataStore.getActorUtils().getShardManager(),
822                 new ChangeShardMembersVotingStatus(ENTITY_OWNERSHIP_SHARD_NAME,
823                         ImmutableMap.of("member-3", Boolean.FALSE, "member-4", Boolean.TRUE, "member-5", Boolean.TRUE)),
824                 new Timeout(10, TimeUnit.SECONDS));
825         response = Await.result(future, FiniteDuration.apply(10, TimeUnit.SECONDS));
826         if (response instanceof Throwable) {
827             throw new AssertionError("ChangeShardMembersVotingStatus failed", (Throwable)response);
828         }
829
830         assertNull("Expected null Success response. Actual " + response, response);
831
832         verifyOwner(leaderDistributedDataStore, ENTITY1, "member-4");
833         verifyOwner(leaderDistributedDataStore, ENTITY2, "member-5");
834     }
835
836     private static void verifyGetOwnershipState(final DOMEntityOwnershipService service, final DOMEntity entity,
837             final EntityOwnershipState expState) {
838         Optional<EntityOwnershipState> state = service.getOwnershipState(entity);
839         assertTrue("getOwnershipState present", state.isPresent());
840         assertEquals("EntityOwnershipState", expState, state.get());
841     }
842
843     private static void verifyCandidates(final AbstractDataStore dataStore, final DOMEntity entity,
844             final String... expCandidates) throws Exception {
845         AssertionError lastError = null;
846         Stopwatch sw = Stopwatch.createStarted();
847         while (sw.elapsed(TimeUnit.MILLISECONDS) <= 10000) {
848             Optional<NormalizedNode<?, ?>> possible = dataStore.newReadOnlyTransaction()
849                     .read(entityPath(entity.getType(), entity.getIdentifier()).node(Candidate.QNAME))
850                     .get(5, TimeUnit.SECONDS);
851             try {
852                 assertTrue("Candidates not found for " + entity, possible.isPresent());
853                 Collection<String> actual = new ArrayList<>();
854                 for (MapEntryNode candidate: ((MapNode)possible.get()).getValue()) {
855                     actual.add(candidate.getChild(CANDIDATE_NAME_NODE_ID).get().getValue().toString());
856                 }
857
858                 assertEquals("Candidates for " + entity, Arrays.asList(expCandidates), actual);
859                 return;
860             } catch (AssertionError e) {
861                 lastError = e;
862                 Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
863             }
864         }
865
866         throw lastError;
867     }
868
869     @SuppressWarnings("checkstyle:IllegalCatch")
870     private static void verifyOwner(final AbstractDataStore dataStore, final DOMEntity entity,
871             final String expOwner) {
872         AbstractEntityOwnershipTest.verifyOwner(expOwner, entity.getType(), entity.getIdentifier(), path -> {
873             try {
874                 return dataStore.newReadOnlyTransaction().read(path).get(5, TimeUnit.SECONDS).get();
875             } catch (Exception e) {
876                 return null;
877             }
878         });
879     }
880 }