Remove unused exceptions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.AdditionalMatchers.or;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.never;
15 import static org.mockito.Mockito.reset;
16 import static org.mockito.Mockito.timeout;
17 import static org.mockito.Mockito.times;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.verifyNoMoreInteractions;
20 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
21 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
22 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
23
24 import akka.actor.ActorRef;
25 import akka.actor.PoisonPill;
26 import akka.actor.Props;
27 import akka.actor.Terminated;
28 import akka.dispatch.Dispatchers;
29 import akka.testkit.TestActorRef;
30 import com.google.common.collect.ImmutableMap;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.util.ArrayList;
33 import java.util.Collections;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicLong;
39 import java.util.function.Predicate;
40 import org.junit.After;
41 import org.junit.Test;
42 import org.opendaylight.controller.cluster.access.concepts.MemberName;
43 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
44 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
45 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
46 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
47 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
48 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
49 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
50 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
51 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
52 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.LastCandidateSelectionStrategy;
53 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
54 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
55 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
57 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
58 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
59 import org.opendaylight.controller.cluster.raft.RaftState;
60 import org.opendaylight.controller.cluster.raft.TestActorFactory;
61 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
62 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
63 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
64 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
65 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
66 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
67 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
68 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
69 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
70 import org.opendaylight.yangtools.yang.common.QName;
71 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
72 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
73
74 /**
75  * Unit tests for EntityOwnershipShard.
76  *
77  * @author Thomas Pantelis
78  */
79 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
80     private static final String ENTITY_TYPE = "test type";
81     private static final YangInstanceIdentifier ENTITY_ID1 =
82             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
83     private static final YangInstanceIdentifier ENTITY_ID2 =
84             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
85     private static final YangInstanceIdentifier ENTITY_ID3 =
86             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3"));
87     private static final YangInstanceIdentifier ENTITY_ID4 =
88             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4"));
89     private static final YangInstanceIdentifier ENTITY_ID5 =
90             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5"));
91     private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
92     private static final String LOCAL_MEMBER_NAME = "local-member-1";
93     private static final String PEER_MEMBER_1_NAME = "peer-member-1";
94     private static final String PEER_MEMBER_2_NAME = "peer-member-2";
95
96     private Builder dataStoreContextBuilder = DatastoreContext.newBuilder().persistent(false);
97     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
98
99     @After
100     public void tearDown() {
101         actorFactory.close();
102     }
103
104     @Test
105     public void testOnRegisterCandidateLocal() {
106         testLog.info("testOnRegisterCandidateLocal starting");
107
108         ShardTestKit kit = new ShardTestKit(getSystem());
109
110         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
111
112         ShardTestKit.waitUntilLeader(shard);
113
114         YangInstanceIdentifier entityId = ENTITY_ID1;
115         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
116
117         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
118         kit.expectMsgClass(SuccessReply.class);
119
120         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
121         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
122
123         testLog.info("testOnRegisterCandidateLocal ending");
124     }
125
126     @Test
127     public void testOnRegisterCandidateLocalWithNoInitialLeader() {
128         testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader starting");
129
130         final ShardTestKit kit = new ShardTestKit(getSystem());
131
132         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
133
134         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
135         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
136
137         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
138                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
139         TestEntityOwnershipShard peerShard = peer.underlyingActor();
140         peerShard.startDroppingMessagesOfType(RequestVote.class);
141         peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
142
143         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(
144                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
145
146         YangInstanceIdentifier entityId = ENTITY_ID1;
147         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
148
149         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
150         kit.expectMsgClass(SuccessReply.class);
151
152         // Now allow RequestVotes to the peer so the shard becomes the leader. This should retry the commit.
153         peerShard.stopDroppingMessagesOfType(RequestVote.class);
154
155         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
156         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
157
158         testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader ending");
159     }
160
161     @Test
162     public void testOnRegisterCandidateLocalWithNoInitialConsensus() {
163         testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus starting");
164
165         final ShardTestKit kit = new ShardTestKit(getSystem());
166
167         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
168                 .shardTransactionCommitTimeoutInSeconds(1);
169
170         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
171         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
172
173         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
174                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
175         TestEntityOwnershipShard peerShard = peer.underlyingActor();
176         peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
177
178         // Drop AppendEntries so consensus isn't reached.
179         peerShard.startDroppingMessagesOfType(AppendEntries.class);
180
181         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
182                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
183
184         ShardTestKit.waitUntilLeader(leader);
185
186         YangInstanceIdentifier entityId = ENTITY_ID1;
187         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
188
189         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
190         kit.expectMsgClass(SuccessReply.class);
191
192         // Wait enough time for the commit to timeout.
193         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
194
195         // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
196         // write being applied to the state.
197         peerShard.stopDroppingMessagesOfType(AppendEntries.class);
198
199         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
200         verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
201
202         testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus ending");
203     }
204
205     @Test
206     public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
207         testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader starting");
208
209         final ShardTestKit kit = new ShardTestKit(getSystem());
210
211         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
212                 .shardIsolatedLeaderCheckIntervalInMillis(50);
213
214         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
215         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
216
217         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
218                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
219         TestEntityOwnershipShard peerShard = peer.underlyingActor();
220         peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
221
222         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
223                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME));
224
225         ShardTestKit.waitUntilLeader(leader);
226
227         // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
228         peerShard.startDroppingMessagesOfType(AppendEntries.class);
229         verifyRaftState(leader, state ->
230                 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
231
232         YangInstanceIdentifier entityId = ENTITY_ID1;
233         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
234
235         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
236         kit.expectMsgClass(SuccessReply.class);
237
238         // Resume AppendEntries - the candidate write should now be committed.
239         peerShard.stopDroppingMessagesOfType(AppendEntries.class);
240         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
241         verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
242
243         testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader ending");
244     }
245
246     @Test
247     public void testOnRegisterCandidateLocalWithRemoteLeader() {
248         testLog.info("testOnRegisterCandidateLocalWithRemoteLeader starting");
249
250         ShardTestKit kit = new ShardTestKit(getSystem());
251
252         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
253                 .shardBatchedModificationCount(5);
254
255         ShardIdentifier leaderId = newShardId(PEER_MEMBER_1_NAME);
256         ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
257         TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
258                 newShardBuilder(leaderId, peerMap(localId.toString()), PEER_MEMBER_1_NAME),
259                 actorFactory.createActor(MessageCollectorActor.props())), leaderId.toString());
260         final TestEntityOwnershipShard leaderShard = leader.underlyingActor();
261
262         TestActorRef<TestEntityOwnershipShard> local = actorFactory.createTestActor(TestEntityOwnershipShard.props(
263                 newShardBuilder(localId, peerMap(leaderId.toString()),LOCAL_MEMBER_NAME)), localId.toString());
264         local.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
265
266         local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
267         kit.expectMsgClass(SuccessReply.class);
268
269         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
270         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
271
272         // Test with initial commit timeout and subsequent retry.
273
274         local.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
275         leaderShard.startDroppingMessagesOfType(BatchedModifications.class);
276
277         local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
278         kit.expectMsgClass(SuccessReply.class);
279
280         expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class);
281
282         // Send a bunch of registration messages quickly and verify.
283
284         leaderShard.stopDroppingMessagesOfType(BatchedModifications.class);
285         clearMessages(leaderShard.collectorActor());
286
287         int max = 100;
288         List<YangInstanceIdentifier> entityIds = new ArrayList<>();
289         for (int i = 1; i <= max; i++) {
290             YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
291             entityIds.add(id);
292             local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, id)), kit.getRef());
293         }
294
295         for (int i = 0; i < max; i++) {
296             verifyCommittedEntityCandidate(local, ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
297         }
298
299         testLog.info("testOnRegisterCandidateLocalWithRemoteLeader ending");
300     }
301
302     @Test
303     public void testOnUnregisterCandidateLocal() {
304         testLog.info("testOnUnregisterCandidateLocal starting");
305
306         ShardTestKit kit = new ShardTestKit(getSystem());
307         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
308         ShardTestKit.waitUntilLeader(shard);
309
310         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
311
312         // Register
313
314         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
315         kit.expectMsgClass(SuccessReply.class);
316
317         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
318         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
319
320         // Unregister
321
322         shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
323         kit.expectMsgClass(SuccessReply.class);
324
325         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");
326
327         // Register again
328
329         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
330         kit.expectMsgClass(SuccessReply.class);
331
332         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
333         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
334
335         testLog.info("testOnUnregisterCandidateLocal ending");
336     }
337
338     @Test
339     public void testOwnershipChanges() {
340         testLog.info("testOwnershipChanges starting");
341
342         final ShardTestKit kit = new ShardTestKit(getSystem());
343
344         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
345
346         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
347         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
348         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
349
350         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
351                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
352                     peerId1.toString());
353         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
354
355         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
356                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
357                     peerId2.toString());
358         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
359
360         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
361                 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
362                     leaderId.toString());
363
364         ShardTestKit.waitUntilLeader(leader);
365
366         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
367
368         // Add a remote candidate
369
370         peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
371         kit.expectMsgClass(SuccessReply.class);
372
373         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
374
375         // Register local
376
377         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
378         kit.expectMsgClass(SuccessReply.class);
379
380         // Verify the remote candidate becomes owner
381
382         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
383         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
384
385         // Add another remote candidate and verify ownership doesn't change
386
387         peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
388         kit.expectMsgClass(SuccessReply.class);
389
390         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
391         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
392         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
393
394         // Remove the second remote candidate and verify ownership doesn't change
395
396         peer2.tell(new UnregisterCandidateLocal(entity), kit.getRef());
397         kit.expectMsgClass(SuccessReply.class);
398
399         verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
400         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
401         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
402
403         // Remove the first remote candidate and verify the local candidate becomes owner
404
405         peer1.tell(new UnregisterCandidateLocal(entity), kit.getRef());
406         kit.expectMsgClass(SuccessReply.class);
407
408         verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
409         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
410
411         // Add the second remote candidate back and verify ownership doesn't change
412
413         peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
414         kit.expectMsgClass(SuccessReply.class);
415
416         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
417         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
418         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
419
420         // Unregister the local candidate and verify the second remote candidate becomes owner
421
422         leader.tell(new UnregisterCandidateLocal(entity), kit.getRef());
423         kit.expectMsgClass(SuccessReply.class);
424
425         verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
426         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
427
428         testLog.info("testOwnershipChanges ending");
429     }
430
431     @Test
432     public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception {
433         testLog.info("testOwnerChangesOnPeerAvailabilityChanges starting");
434
435         final ShardTestKit kit = new ShardTestKit(getSystem());
436
437         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
438                 .shardIsolatedLeaderCheckIntervalInMillis(100000);
439
440         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
441         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
442         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
443
444         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
445                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
446                     peerId1.toString());
447         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
448
449         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
450                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
451                     peerId2.toString());
452         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
453
454         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
455                 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
456                     leaderId.toString());
457
458         verifyRaftState(leader, state ->
459                 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
460
461         // Send PeerDown and PeerUp with no entities
462
463         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
464         leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
465
466         // Add candidates for entity1 with the local leader as the owner
467
468         leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
469         kit.expectMsgClass(SuccessReply.class);
470         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
471
472         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
473         kit.expectMsgClass(SuccessReply.class);
474         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
475
476         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
477         kit.expectMsgClass(SuccessReply.class);
478         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
479         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
480
481         // Add candidates for entity2 with peerMember2 as the owner
482
483         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
484         kit.expectMsgClass(SuccessReply.class);
485         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
486
487         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
488         kit.expectMsgClass(SuccessReply.class);
489         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
490         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
491
492         // Add candidates for entity3 with peerMember2 as the owner.
493
494         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
495         kit.expectMsgClass(SuccessReply.class);
496         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
497
498         leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
499         kit.expectMsgClass(SuccessReply.class);
500         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
501
502         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
503         kit.expectMsgClass(SuccessReply.class);
504         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
505         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
506
507         // Add only candidate peerMember2 for entity4.
508
509         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID4)), kit.getRef());
510         kit.expectMsgClass(SuccessReply.class);
511         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
512         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
513
514         // Add only candidate peerMember1 for entity5.
515
516         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID5)), kit.getRef());
517         kit.expectMsgClass(SuccessReply.class);
518         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
519         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
520
521         // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new
522         // owner selected
523
524         kit.watch(peer2);
525         peer2.tell(PoisonPill.getInstance(), ActorRef.noSender());
526         kit.expectMsgClass(kit.duration("5 seconds"), Terminated.class);
527         kit.unwatch(peer2);
528
529         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
530         // Send PeerDown again - should be noop
531         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
532         peer1.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
533
534         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
535         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
536         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
537         // no other candidates for entity4 so peerMember2 should remain owner.
538         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
539
540         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
541         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
542         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
543         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
544
545         // Reinstate peerMember2
546
547         peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
548                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
549                     peerId2.toString());
550         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
551         leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
552         // Send PeerUp again - should be noop
553         leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
554         peer1.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
555
556         // peerMember2's candidates should be removed on startup.
557         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
558         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
559         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
560         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
561
562         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
563         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
564         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
565         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
566
567         // Add back candidate peerMember2 for entities 1, 2, & 3.
568
569         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
570         kit.expectMsgClass(SuccessReply.class);
571         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
572         kit.expectMsgClass(SuccessReply.class);
573         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
574         kit.expectMsgClass(SuccessReply.class);
575         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
576         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
577         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
578         verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
579         verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
580         verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
581         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
582         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
583         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
584         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
585         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
586         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
587         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
588
589         // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected
590
591         kit.watch(peer1);
592         peer1.tell(PoisonPill.getInstance(), ActorRef.noSender());
593         kit.expectMsgClass(kit.duration("5 seconds"), Terminated.class);
594         kit.unwatch(peer1);
595         leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
596
597         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
598
599         // Verify the reinstated peerMember2 is fully synced.
600
601         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
602         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
603         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
604         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
605
606         // Reinstate peerMember1 and verify no owner changes
607
608         peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(newShardBuilder(
609                 peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), peerId1.toString());
610         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
611         leader.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
612
613         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
614         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
615         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
616         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
617
618         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
619         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
620         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
621
622         verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
623         verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
624         verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
625
626         // Verify the reinstated peerMember1 is fully synced.
627
628         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
629         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
630         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
631         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
632
633         AtomicLong leaderLastApplied = new AtomicLong();
634         verifyRaftState(leader, rs -> {
635             assertEquals("LastApplied up-to-date", rs.getLastApplied(), rs.getLastIndex());
636             leaderLastApplied.set(rs.getLastApplied());
637         });
638
639         verifyRaftState(peer2, rs -> {
640             assertEquals("LastApplied", leaderLastApplied.get(), rs.getLastIndex());
641         });
642
643         // Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for
644         // the entities (1 and 3) previously owned by the local leader member.
645
646         peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
647         peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
648         peer2.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
649
650         kit.watch(leader);
651         leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
652         kit.expectMsgClass(kit.duration("5 seconds"), Terminated.class);
653         kit.unwatch(leader);
654         peer2.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
655         peer2.tell(TimeoutNow.INSTANCE, peer2);
656
657         verifyRaftState(peer2, state ->
658                 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
659
660         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
661         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
662         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
663         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
664
665         testLog.info("testOwnerChangesOnPeerAvailabilityChanges ending");
666     }
667
668     @Test
669     public void testLeaderIsolation() throws Exception {
670         testLog.info("testLeaderIsolation starting");
671
672         final ShardTestKit kit = new ShardTestKit(getSystem());
673
674         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
675         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
676         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
677
678         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
679                 .shardIsolatedLeaderCheckIntervalInMillis(100000);
680
681         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
682                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
683                     peerId1.toString());
684         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
685
686         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
687                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
688                     peerId2.toString());
689         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
690
691         dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
692                 .shardIsolatedLeaderCheckIntervalInMillis(500);
693
694         TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
695                 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME)),
696                     leaderId.toString());
697
698         ShardTestKit.waitUntilLeader(leader);
699
700         // Add entity1 candidates for all members with the leader as the owner
701
702         DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
703         leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
704         kit.expectMsgClass(SuccessReply.class);
705         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
706
707         peer1.tell(new RegisterCandidateLocal(entity1), kit.getRef());
708         kit.expectMsgClass(SuccessReply.class);
709         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
710
711         peer2.tell(new RegisterCandidateLocal(entity1), kit.getRef());
712         kit.expectMsgClass(SuccessReply.class);
713         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_2_NAME);
714
715         verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
716         verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
717         verifyOwner(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
718
719         // Add entity2 candidates for all members with peer1 as the owner
720
721         DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
722         peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
723         kit.expectMsgClass(SuccessReply.class);
724         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
725
726         peer2.tell(new RegisterCandidateLocal(entity2), kit.getRef());
727         kit.expectMsgClass(SuccessReply.class);
728         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_2_NAME);
729
730         leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
731         kit.expectMsgClass(SuccessReply.class);
732         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
733
734         verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
735         verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
736         verifyOwner(peer2, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
737
738         // Add entity3 candidates for all members with peer2 as the owner
739
740         DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
741         peer2.tell(new RegisterCandidateLocal(entity3), kit.getRef());
742         kit.expectMsgClass(SuccessReply.class);
743         verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
744
745         leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
746         kit.expectMsgClass(SuccessReply.class);
747         verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), LOCAL_MEMBER_NAME);
748
749         peer1.tell(new RegisterCandidateLocal(entity3), kit.getRef());
750         kit.expectMsgClass(SuccessReply.class);
751         verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_1_NAME);
752
753         verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
754         verifyOwner(peer1, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
755         verifyOwner(peer2, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
756
757         // Add listeners on all members
758
759         DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class);
760         leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
761         kit.expectMsgClass(SuccessReply.class);
762         verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
763                 ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true)),
764                 ownershipChange(entity3, false, false, true)));
765         reset(leaderListener);
766
767         DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class);
768         peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
769         kit.expectMsgClass(SuccessReply.class);
770         verify(peer1Listener, timeout(5000).times(3)).ownershipChanged(or(or(
771                 ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, true, true)),
772                 ownershipChange(entity3, false, false, true)));
773         reset(peer1Listener);
774
775         DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class);
776         peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
777         kit.expectMsgClass(SuccessReply.class);
778         verify(peer2Listener, timeout(5000).times(3)).ownershipChanged(or(or(
779                 ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, false, true)),
780                 ownershipChange(entity3, false, true, true)));
781         reset(peer2Listener);
782
783         // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
784
785         leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
786         leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
787
788         peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
789             ae -> ae.getLeaderId().equals(leaderId.toString()));
790         peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
791
792         // Make peer1 start an election and become leader by enabling the ElectionTimeout message.
793
794         peer1.underlyingActor().stopDroppingMessagesOfType(ElectionTimeout.class);
795
796         // Send PeerDown to the isolated leader so it tries to re-assign ownership for the entities owned by the
797         // isolated peers.
798
799         leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
800         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
801
802         verifyRaftState(leader, state ->
803                 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
804
805         // Expect inJeopardy notification on the isolated leader.
806
807         verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
808                 ownershipChange(entity1, true, true, true, true), ownershipChange(entity2, false, false, true, true)),
809                 ownershipChange(entity3, false, false, true, true)));
810         reset(leaderListener);
811
812         verifyRaftState(peer1, state ->
813                 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
814
815         // Send PeerDown to the new leader peer1 so it re-assigns ownership for the entities owned by the
816         // isolated leader.
817
818         peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
819
820         verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
821
822         verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
823         reset(peer1Listener);
824
825         verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
826         reset(peer2Listener);
827
828         // Remove the isolation.
829
830         leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
831         leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
832         peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
833         peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
834
835         // Previous leader should switch to Follower and send inJeopardy cleared notifications for all entities.
836
837         verifyRaftState(leader, state ->
838                 assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState()));
839
840         verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
841                 ownershipChange(entity1, true, true, true), ownershipChange(entity2, false, false, true)),
842                 ownershipChange(entity3, false, false, true)));
843
844         verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
845         verify(leaderListener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
846
847         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
848         verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
849         verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
850
851         verifyNoMoreInteractions(leaderListener);
852         verifyNoMoreInteractions(peer1Listener);
853         verifyNoMoreInteractions(peer2Listener);
854
855         testLog.info("testLeaderIsolation ending");
856     }
857
858     @Test
859     public void testLeaderIsolationWithPendingCandidateAdded() throws Exception {
860         testLog.info("testLeaderIsolationWithPendingCandidateAdded starting");
861
862         final ShardTestKit kit = new ShardTestKit(getSystem());
863
864         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
865         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
866         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
867
868         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
869                 .shardIsolatedLeaderCheckIntervalInMillis(100000);
870
871         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
872                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME),
873                 actorFactory.createActor(MessageCollectorActor.props())), peerId1.toString());
874         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
875
876         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
877                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME),
878                 actorFactory.createTestActor(MessageCollectorActor.props())), peerId2.toString());
879         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
880
881         dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
882                 .shardIsolatedLeaderCheckIntervalInMillis(500);
883
884         TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
885                 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
886                 actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
887
888         ShardTestKit.waitUntilLeader(leader);
889
890         // Add listeners on all members
891
892         DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class,
893                 "DOMEntityOwnershipListener-" + LOCAL_MEMBER_NAME);
894         leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
895         kit.expectMsgClass(SuccessReply.class);
896
897         DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class,
898                 "DOMEntityOwnershipListener-" + PEER_MEMBER_1_NAME);
899         peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
900         kit.expectMsgClass(SuccessReply.class);
901
902         DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class,
903                 "DOMEntityOwnershipListener-" + PEER_MEMBER_2_NAME);
904         peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
905         kit.expectMsgClass(SuccessReply.class);
906
907         // Drop the CandidateAdded message to the leader for now.
908
909         leader.underlyingActor().startDroppingMessagesOfType(CandidateAdded.class);
910
911         // Add an entity candidates for the leader. Since we've blocked the CandidateAdded message, it won't be
912         // assigned the owner.
913
914         DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
915         leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
916         kit.expectMsgClass(SuccessReply.class);
917         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
918         verifyCommittedEntityCandidate(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
919         verifyCommittedEntityCandidate(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
920
921         DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
922         leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
923         kit.expectMsgClass(SuccessReply.class);
924         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
925         verifyCommittedEntityCandidate(peer1, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
926         verifyCommittedEntityCandidate(peer2, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
927
928         // Capture the CandidateAdded messages.
929
930         final List<CandidateAdded> candidateAdded = expectMatching(leader.underlyingActor().collectorActor(),
931                 CandidateAdded.class, 2);
932
933         // Drop AppendEntries to the followers containing a log entry, which will be for the owner writes after we
934         // forward the CandidateAdded messages to the leader. This will leave the pending owner write tx's uncommitted.
935
936         peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
937         peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
938
939         // Now forward the CandidateAdded messages to the leader and wait for it to send out the AppendEntries.
940
941         leader.underlyingActor().stopDroppingMessagesOfType(CandidateAdded.class);
942         leader.tell(candidateAdded.get(0), leader);
943         leader.tell(candidateAdded.get(1), leader);
944
945         expectMatching(peer1.underlyingActor().collectorActor(), AppendEntries.class, 2,
946             ae -> ae.getEntries().size() > 0);
947
948         // Verify no owner assigned.
949
950         verifyNoOwnerSet(leader, entity1.getType(), entity1.getIdentifier());
951         verifyNoOwnerSet(leader, entity2.getType(), entity2.getIdentifier());
952
953         // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
954
955         leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
956         leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
957
958         peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
959             ae -> ae.getLeaderId().equals(leaderId.toString()));
960         peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
961
962         // Send PeerDown to the isolated leader - should be no-op since there's no owned entities.
963
964         leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
965         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
966
967         // Verify the leader transitions to IsolatedLeader.
968
969         verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.IsolatedLeader.toString(),
970                 state.getRaftState()));
971
972         // Send PeerDown to the new leader peer1.
973
974         peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
975
976         // Make peer1 start an election and become leader by sending the TimeoutNow message.
977
978         peer1.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
979
980         // Verify the peer1 transitions to Leader.
981
982         verifyRaftState(peer1, state -> assertEquals("getRaftState", RaftState.Leader.toString(),
983                 state.getRaftState()));
984
985         verifyNoOwnerSet(peer1, entity1.getType(), entity1.getIdentifier());
986         verifyNoOwnerSet(peer2, entity1.getType(), entity2.getIdentifier());
987
988         verifyNoMoreInteractions(peer1Listener);
989         verifyNoMoreInteractions(peer2Listener);
990
991         // Add candidate peer1 candidate for entity2.
992
993         peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
994
995         verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
996         verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
997         verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, false, true));
998
999         reset(leaderListener, peer1Listener, peer2Listener);
1000
1001         // Remove the isolation.
1002
1003         leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
1004         leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1005         peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1006         peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1007
1008         // Previous leader should switch to Follower.
1009
1010         verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(),
1011                 state.getRaftState()));
1012
1013         // Send PeerUp to peer1 and peer2.
1014
1015         peer1.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
1016         peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
1017
1018         // The previous leader should become the owner of entity1.
1019
1020         verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
1021
1022         // The previous leader's DOMEntityOwnershipListener should get 4 total notifications:
1023         //     - inJeopardy cleared for entity1 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
1024         //     - inJeopardy cleared for entity2 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
1025         //     - local owner granted for entity1 (wasOwner=false, isOwner=true, hasOwner=true, inJeopardy=false)
1026         //     - remote owner for entity2 (wasOwner=false, isOwner=false, hasOwner=true, inJeopardy=false)
1027         verify(leaderListener, timeout(5000).times(4)).ownershipChanged(or(
1028                 or(ownershipChange(entity1, false, false, false), ownershipChange(entity2, false, false, false)),
1029                 or(ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true))));
1030
1031         verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
1032         verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
1033
1034         // Verify entity2's owner doesn't change.
1035
1036         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1037         verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
1038
1039         verifyNoMoreInteractions(leaderListener);
1040         verifyNoMoreInteractions(peer1Listener);
1041         verifyNoMoreInteractions(peer2Listener);
1042
1043         testLog.info("testLeaderIsolationWithPendingCandidateAdded ending");
1044     }
1045
1046     @Test
1047     public void testListenerRegistration() {
1048         testLog.info("testListenerRegistration starting");
1049
1050         ShardTestKit kit = new ShardTestKit(getSystem());
1051
1052         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1053         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
1054
1055         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1056                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
1057         peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1058
1059         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1060                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
1061
1062         ShardTestKit.waitUntilLeader(leader);
1063
1064         String otherEntityType = "otherEntityType";
1065         final DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1066         final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
1067         final DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
1068         final DOMEntity entity4 = new DOMEntity(otherEntityType, ENTITY_ID3);
1069         DOMEntityOwnershipListener listener = mock(DOMEntityOwnershipListener.class);
1070
1071         // Register listener
1072
1073         leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1074         kit.expectMsgClass(SuccessReply.class);
1075
1076         // Register a couple candidates for the desired entity type and verify listener is notified.
1077
1078         leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
1079         kit.expectMsgClass(SuccessReply.class);
1080
1081         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
1082
1083         leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
1084         kit.expectMsgClass(SuccessReply.class);
1085
1086         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
1087         reset(listener);
1088
1089         // Register another candidate for another entity type and verify listener is not notified.
1090
1091         leader.tell(new RegisterCandidateLocal(entity4), kit.getRef());
1092         kit.expectMsgClass(SuccessReply.class);
1093
1094         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1095         verify(listener, never()).ownershipChanged(ownershipChange(entity4));
1096
1097         // Register remote candidate for entity1
1098
1099         peer.tell(new RegisterCandidateLocal(entity1), kit.getRef());
1100         kit.expectMsgClass(SuccessReply.class);
1101         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entity1.getIdentifier(), PEER_MEMBER_1_NAME);
1102
1103         // Unregister the local candidate for entity1 and verify listener is notified
1104
1105         leader.tell(new UnregisterCandidateLocal(entity1), kit.getRef());
1106         kit.expectMsgClass(SuccessReply.class);
1107
1108         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
1109         reset(listener);
1110
1111         // Unregister the listener, add a candidate for entity3 and verify listener isn't notified
1112
1113         leader.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1114         kit.expectMsgClass(SuccessReply.class);
1115
1116         leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
1117         kit.expectMsgClass(SuccessReply.class);
1118
1119         verifyOwner(leader, ENTITY_TYPE, entity3.getIdentifier(), LOCAL_MEMBER_NAME);
1120         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1121         verify(listener, never()).ownershipChanged(any(DOMEntityOwnershipChange.class));
1122
1123         // Re-register the listener and verify it gets notified of currently owned entities
1124
1125         reset(listener);
1126
1127         leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1128         kit.expectMsgClass(SuccessReply.class);
1129
1130         verify(listener, timeout(5000).times(2)).ownershipChanged(or(ownershipChange(entity2, false, true, true),
1131                 ownershipChange(entity3, false, true, true)));
1132         Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
1133         verify(listener, never()).ownershipChanged(ownershipChange(entity4));
1134         verify(listener, times(1)).ownershipChanged(ownershipChange(entity1));
1135
1136         testLog.info("testListenerRegistration ending");
1137     }
1138
1139     @Test
1140     public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() {
1141         testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived starting");
1142
1143         ShardTestKit kit = new ShardTestKit(getSystem());
1144         EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder()
1145                 .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
1146
1147         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1148         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
1149
1150         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1151                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
1152         peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1153
1154         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1155                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME, builder.build()),
1156                 leaderId.toString());
1157
1158         ShardTestKit.waitUntilLeader(leader);
1159
1160         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1161
1162         // Add a remote candidate
1163
1164         peer.tell(new RegisterCandidateLocal(entity), kit.getRef());
1165         kit.expectMsgClass(SuccessReply.class);
1166
1167         // Register local
1168
1169         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1170         kit.expectMsgClass(SuccessReply.class);
1171
1172         // Verify the local candidate becomes owner
1173
1174         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1175         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1176         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1177
1178         testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived ending");
1179     }
1180
1181     @Test
1182     public void testDelayedEntityOwnerSelection() {
1183         testLog.info("testDelayedEntityOwnerSelection starting");
1184
1185         final ShardTestKit kit = new ShardTestKit(getSystem());
1186         EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder()
1187                 .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
1188
1189         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
1190
1191         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1192         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
1193         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
1194
1195         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1196                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
1197                     peerId1.toString());
1198         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1199
1200         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1201                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
1202                     peerId2.toString());
1203         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1204
1205         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1206                 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME,
1207                         builder.build()), leaderId.toString());
1208
1209         ShardTestKit.waitUntilLeader(leader);
1210
1211         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1212
1213         // Add a remote candidate
1214
1215         peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
1216         kit.expectMsgClass(SuccessReply.class);
1217
1218         // Register local
1219
1220         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1221         kit.expectMsgClass(SuccessReply.class);
1222
1223         // Verify the local candidate becomes owner
1224
1225         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1226         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1227         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1228
1229         testLog.info("testDelayedEntityOwnerSelection ending");
1230     }
1231
1232     private Props newLocalShardProps() {
1233         return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.<String,String>emptyMap(), LOCAL_MEMBER_NAME);
1234     }
1235
1236     private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers,
1237             final String memberName) {
1238         return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build());
1239     }
1240
1241     private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers, final String memberName,
1242                                 final EntityOwnerSelectionStrategyConfig config) {
1243         return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props()
1244                     .withDispatcher(Dispatchers.DefaultDispatcherId());
1245     }
1246
1247     private EntityOwnershipShard.Builder newShardBuilder(final ShardIdentifier shardId, final Map<String, String> peers,
1248             final String memberName) {
1249         return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).datastoreContext(
1250                 dataStoreContextBuilder.build()).schemaContextProvider(() -> SCHEMA_CONTEXT).localMemberName(
1251                         MemberName.forName(memberName)).ownerSelectionStrategyConfig(
1252                                 EntityOwnerSelectionStrategyConfig.newBuilder().build());
1253     }
1254
1255     private Map<String, String> peerMap(final String... peerIds) {
1256         ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder();
1257         for (String peerId: peerIds) {
1258             builder.put(peerId, actorFactory.createTestActorPath(peerId)).build();
1259         }
1260
1261         return builder.build();
1262     }
1263
1264     private static class TestEntityOwnershipShard extends EntityOwnershipShard {
1265         private final ActorRef collectorActor;
1266         private final Map<Class<?>, Predicate<?>> dropMessagesOfType = new ConcurrentHashMap<>();
1267
1268         TestEntityOwnershipShard(final Builder builder, final ActorRef collectorActor) {
1269             super(builder);
1270             this.collectorActor = collectorActor;
1271         }
1272
1273         @SuppressWarnings({ "unchecked", "rawtypes" })
1274         @Override
1275         public void handleCommand(final Object message) {
1276             Predicate drop = dropMessagesOfType.get(message.getClass());
1277             if (drop == null || !drop.test(message)) {
1278                 super.handleCommand(message);
1279             }
1280
1281             if (collectorActor != null) {
1282                 collectorActor.tell(message, ActorRef.noSender());
1283             }
1284         }
1285
1286         void startDroppingMessagesOfType(final Class<?> msgClass) {
1287             dropMessagesOfType.put(msgClass, msg -> true);
1288         }
1289
1290         <T> void startDroppingMessagesOfType(final Class<T> msgClass, final Predicate<T> filter) {
1291             dropMessagesOfType.put(msgClass, filter);
1292         }
1293
1294         void stopDroppingMessagesOfType(final Class<?> msgClass) {
1295             dropMessagesOfType.remove(msgClass);
1296         }
1297
1298         ActorRef collectorActor() {
1299             return collectorActor;
1300         }
1301
1302         static Props props(final Builder builder) {
1303             return props(builder, null);
1304         }
1305
1306         static Props props(final Builder builder, final ActorRef collectorActor) {
1307             return Props.create(TestEntityOwnershipShard.class, builder, collectorActor)
1308                     .withDispatcher(Dispatchers.DefaultDispatcherId());
1309         }
1310     }
1311 }