Split out sal-distributed-eos
[controller.git] / opendaylight / md-sal / sal-distributed-eos / src / test / java / org / opendaylight / controller / cluster / entityownership / EntityOwnershipShardTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.entityownership;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.AdditionalMatchers.or;
12 import static org.mockito.ArgumentMatchers.any;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.never;
15 import static org.mockito.Mockito.reset;
16 import static org.mockito.Mockito.timeout;
17 import static org.mockito.Mockito.times;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.verifyNoMoreInteractions;
20 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
21 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
22 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
23
24 import akka.actor.ActorRef;
25 import akka.actor.PoisonPill;
26 import akka.actor.Props;
27 import akka.actor.Terminated;
28 import akka.dispatch.Dispatchers;
29 import akka.testkit.TestActorRef;
30 import com.google.common.collect.ImmutableMap;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.time.Duration;
33 import java.util.ArrayList;
34 import java.util.Collections;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicLong;
40 import java.util.function.Predicate;
41 import org.junit.After;
42 import org.junit.Test;
43 import org.opendaylight.controller.cluster.access.concepts.MemberName;
44 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
45 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
46 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
47 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
48 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
49 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
50 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
51 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
52 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
53 import org.opendaylight.controller.cluster.entityownership.messages.CandidateAdded;
54 import org.opendaylight.controller.cluster.entityownership.messages.RegisterCandidateLocal;
55 import org.opendaylight.controller.cluster.entityownership.messages.RegisterListenerLocal;
56 import org.opendaylight.controller.cluster.entityownership.messages.UnregisterCandidateLocal;
57 import org.opendaylight.controller.cluster.entityownership.messages.UnregisterListenerLocal;
58 import org.opendaylight.controller.cluster.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
59 import org.opendaylight.controller.cluster.entityownership.selectionstrategy.LastCandidateSelectionStrategy;
60 import org.opendaylight.controller.cluster.raft.RaftState;
61 import org.opendaylight.controller.cluster.raft.TestActorFactory;
62 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
63 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
64 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
65 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
66 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
67 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
68 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
69 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
70 import org.opendaylight.yangtools.yang.common.QName;
71 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
72
73 /**
74  * Unit tests for EntityOwnershipShard.
75  *
76  * @author Thomas Pantelis
77  */
78 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
79     private static final String ENTITY_TYPE = "test type";
80     private static final YangInstanceIdentifier ENTITY_ID1 =
81             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
82     private static final YangInstanceIdentifier ENTITY_ID2 =
83             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
84     private static final YangInstanceIdentifier ENTITY_ID3 =
85             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3"));
86     private static final YangInstanceIdentifier ENTITY_ID4 =
87             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4"));
88     private static final YangInstanceIdentifier ENTITY_ID5 =
89             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5"));
90     private static final String LOCAL_MEMBER_NAME = "local-member-1";
91     private static final String PEER_MEMBER_1_NAME = "peer-member-1";
92     private static final String PEER_MEMBER_2_NAME = "peer-member-2";
93
94     private Builder dataStoreContextBuilder = DatastoreContext.newBuilder().persistent(false);
95     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
96
97     @After
98     public void tearDown() {
99         actorFactory.close();
100     }
101
102     @Test
103     public void testOnRegisterCandidateLocal() {
104         testLog.info("testOnRegisterCandidateLocal starting");
105
106         ShardTestKit kit = new ShardTestKit(getSystem());
107
108         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
109
110         ShardTestKit.waitUntilLeader(shard);
111
112         YangInstanceIdentifier entityId = ENTITY_ID1;
113         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
114
115         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
116         kit.expectMsgClass(SuccessReply.class);
117
118         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
119         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
120
121         testLog.info("testOnRegisterCandidateLocal ending");
122     }
123
124     @Test
125     public void testOnRegisterCandidateLocalWithNoInitialLeader() {
126         testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader starting");
127
128         final ShardTestKit kit = new ShardTestKit(getSystem());
129
130         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
131
132         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
133         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
134
135         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
136                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
137         TestEntityOwnershipShard peerShard = peer.underlyingActor();
138         peerShard.startDroppingMessagesOfType(RequestVote.class);
139         peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
140
141         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(
142                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
143
144         YangInstanceIdentifier entityId = ENTITY_ID1;
145         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
146
147         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
148         kit.expectMsgClass(SuccessReply.class);
149
150         // Now allow RequestVotes to the peer so the shard becomes the leader. This should retry the commit.
151         peerShard.stopDroppingMessagesOfType(RequestVote.class);
152
153         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
154         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
155
156         testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader ending");
157     }
158
159     @Test
160     public void testOnRegisterCandidateLocalWithNoInitialConsensus() {
161         testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus starting");
162
163         final ShardTestKit kit = new ShardTestKit(getSystem());
164
165         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
166                 .shardTransactionCommitTimeoutInSeconds(1);
167
168         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
169         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
170
171         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
172                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
173         TestEntityOwnershipShard peerShard = peer.underlyingActor();
174         peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
175
176         // Drop AppendEntries so consensus isn't reached.
177         peerShard.startDroppingMessagesOfType(AppendEntries.class);
178
179         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
180                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
181
182         ShardTestKit.waitUntilLeader(leader);
183
184         YangInstanceIdentifier entityId = ENTITY_ID1;
185         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
186
187         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
188         kit.expectMsgClass(SuccessReply.class);
189
190         // Wait enough time for the commit to timeout.
191         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
192
193         // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
194         // write being applied to the state.
195         peerShard.stopDroppingMessagesOfType(AppendEntries.class);
196
197         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
198         verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
199
200         testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus ending");
201     }
202
203     @Test
204     public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
205         testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader starting");
206
207         final ShardTestKit kit = new ShardTestKit(getSystem());
208
209         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
210                 .shardIsolatedLeaderCheckIntervalInMillis(50);
211
212         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
213         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
214
215         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
216                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
217         TestEntityOwnershipShard peerShard = peer.underlyingActor();
218         peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
219
220         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
221                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME));
222
223         ShardTestKit.waitUntilLeader(leader);
224
225         // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
226         peerShard.startDroppingMessagesOfType(AppendEntries.class);
227         verifyRaftState(leader, state ->
228                 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
229
230         YangInstanceIdentifier entityId = ENTITY_ID1;
231         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
232
233         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
234         kit.expectMsgClass(SuccessReply.class);
235
236         // Resume AppendEntries - the candidate write should now be committed.
237         peerShard.stopDroppingMessagesOfType(AppendEntries.class);
238         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
239         verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
240
241         testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader ending");
242     }
243
244     @Test
245     public void testOnRegisterCandidateLocalWithRemoteLeader() {
246         testLog.info("testOnRegisterCandidateLocalWithRemoteLeader starting");
247
248         ShardTestKit kit = new ShardTestKit(getSystem());
249
250         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
251                 .shardBatchedModificationCount(5);
252
253         ShardIdentifier leaderId = newShardId(PEER_MEMBER_1_NAME);
254         ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
255         TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
256                 newShardBuilder(leaderId, peerMap(localId.toString()), PEER_MEMBER_1_NAME),
257                 actorFactory.createActor(MessageCollectorActor.props())), leaderId.toString());
258         final TestEntityOwnershipShard leaderShard = leader.underlyingActor();
259
260         TestActorRef<TestEntityOwnershipShard> local = actorFactory.createTestActor(TestEntityOwnershipShard.props(
261                 newShardBuilder(localId, peerMap(leaderId.toString()),LOCAL_MEMBER_NAME)), localId.toString());
262         local.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
263
264         local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
265         kit.expectMsgClass(SuccessReply.class);
266
267         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
268         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
269
270         // Test with initial commit timeout and subsequent retry.
271
272         local.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
273         leaderShard.startDroppingMessagesOfType(BatchedModifications.class);
274
275         local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
276         kit.expectMsgClass(SuccessReply.class);
277
278         expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class);
279
280         // Send a bunch of registration messages quickly and verify.
281
282         leaderShard.stopDroppingMessagesOfType(BatchedModifications.class);
283         clearMessages(leaderShard.collectorActor());
284
285         int max = 100;
286         List<YangInstanceIdentifier> entityIds = new ArrayList<>();
287         for (int i = 1; i <= max; i++) {
288             YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
289             entityIds.add(id);
290             local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, id)), kit.getRef());
291         }
292
293         for (int i = 0; i < max; i++) {
294             verifyCommittedEntityCandidate(local, ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
295         }
296
297         testLog.info("testOnRegisterCandidateLocalWithRemoteLeader ending");
298     }
299
300     @Test
301     public void testOnUnregisterCandidateLocal() {
302         testLog.info("testOnUnregisterCandidateLocal starting");
303
304         ShardTestKit kit = new ShardTestKit(getSystem());
305         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
306         ShardTestKit.waitUntilLeader(shard);
307
308         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
309
310         // Register
311
312         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
313         kit.expectMsgClass(SuccessReply.class);
314
315         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
316         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
317
318         // Unregister
319
320         shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
321         kit.expectMsgClass(SuccessReply.class);
322
323         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");
324
325         // Register again
326
327         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
328         kit.expectMsgClass(SuccessReply.class);
329
330         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
331         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
332
333         testLog.info("testOnUnregisterCandidateLocal ending");
334     }
335
336     @Test
337     public void testOwnershipChanges() {
338         testLog.info("testOwnershipChanges starting");
339
340         final ShardTestKit kit = new ShardTestKit(getSystem());
341
342         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
343
344         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
345         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
346         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
347
348         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
349                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
350                     peerId1.toString());
351         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
352
353         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
354                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
355                     peerId2.toString());
356         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
357
358         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
359                 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
360                     leaderId.toString());
361
362         ShardTestKit.waitUntilLeader(leader);
363
364         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
365
366         // Add a remote candidate
367
368         peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
369         kit.expectMsgClass(SuccessReply.class);
370
371         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
372
373         // Register local
374
375         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
376         kit.expectMsgClass(SuccessReply.class);
377
378         // Verify the remote candidate becomes owner
379
380         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
381         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
382
383         // Add another remote candidate and verify ownership doesn't change
384
385         peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
386         kit.expectMsgClass(SuccessReply.class);
387
388         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
389         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
390         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
391
392         // Remove the second remote candidate and verify ownership doesn't change
393
394         peer2.tell(new UnregisterCandidateLocal(entity), kit.getRef());
395         kit.expectMsgClass(SuccessReply.class);
396
397         verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
398         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
399         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
400
401         // Remove the first remote candidate and verify the local candidate becomes owner
402
403         peer1.tell(new UnregisterCandidateLocal(entity), kit.getRef());
404         kit.expectMsgClass(SuccessReply.class);
405
406         verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
407         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
408
409         // Add the second remote candidate back and verify ownership doesn't change
410
411         peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
412         kit.expectMsgClass(SuccessReply.class);
413
414         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
415         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
416         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
417
418         // Unregister the local candidate and verify the second remote candidate becomes owner
419
420         leader.tell(new UnregisterCandidateLocal(entity), kit.getRef());
421         kit.expectMsgClass(SuccessReply.class);
422
423         verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
424         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
425
426         testLog.info("testOwnershipChanges ending");
427     }
428
429     @Test
430     public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception {
431         testLog.info("testOwnerChangesOnPeerAvailabilityChanges starting");
432
433         final ShardTestKit kit = new ShardTestKit(getSystem());
434
435         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
436                 .shardIsolatedLeaderCheckIntervalInMillis(100000);
437
438         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
439         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
440         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
441
442         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
443                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
444                     peerId1.toString());
445         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
446
447         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
448                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
449                     peerId2.toString());
450         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
451
452         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
453                 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
454                     leaderId.toString());
455
456         verifyRaftState(leader, state ->
457                 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
458
459         // Send PeerDown and PeerUp with no entities
460
461         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
462         leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
463
464         // Add candidates for entity1 with the local leader as the owner
465
466         leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
467         kit.expectMsgClass(SuccessReply.class);
468         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
469
470         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
471         kit.expectMsgClass(SuccessReply.class);
472         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
473
474         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
475         kit.expectMsgClass(SuccessReply.class);
476         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
477         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
478
479         // Add candidates for entity2 with peerMember2 as the owner
480
481         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
482         kit.expectMsgClass(SuccessReply.class);
483         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
484
485         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
486         kit.expectMsgClass(SuccessReply.class);
487         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
488         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
489
490         // Add candidates for entity3 with peerMember2 as the owner.
491
492         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
493         kit.expectMsgClass(SuccessReply.class);
494         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
495
496         leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
497         kit.expectMsgClass(SuccessReply.class);
498         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
499
500         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
501         kit.expectMsgClass(SuccessReply.class);
502         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
503         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
504
505         // Add only candidate peerMember2 for entity4.
506
507         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID4)), kit.getRef());
508         kit.expectMsgClass(SuccessReply.class);
509         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
510         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
511
512         // Add only candidate peerMember1 for entity5.
513
514         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID5)), kit.getRef());
515         kit.expectMsgClass(SuccessReply.class);
516         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
517         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
518
519         // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new
520         // owner selected
521
522         kit.watch(peer2);
523         peer2.tell(PoisonPill.getInstance(), ActorRef.noSender());
524         kit.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
525         kit.unwatch(peer2);
526
527         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
528         // Send PeerDown again - should be noop
529         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
530         peer1.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
531
532         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
533         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
534         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
535         // no other candidates for entity4 so peerMember2 should remain owner.
536         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
537
538         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
539         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
540         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
541         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
542
543         // Reinstate peerMember2
544
545         peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
546                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
547                     peerId2.toString());
548         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
549         leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
550         // Send PeerUp again - should be noop
551         leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
552         peer1.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
553
554         // peerMember2's candidates should be removed on startup.
555         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
556         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
557         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
558         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
559
560         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
561         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
562         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
563         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
564
565         // Add back candidate peerMember2 for entities 1, 2, & 3.
566
567         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
568         kit.expectMsgClass(SuccessReply.class);
569         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
570         kit.expectMsgClass(SuccessReply.class);
571         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
572         kit.expectMsgClass(SuccessReply.class);
573         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
574         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
575         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
576         verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
577         verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
578         verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
579         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
580         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
581         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
582         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
583         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
584         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
585         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
586
587         // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected
588
589         kit.watch(peer1);
590         peer1.tell(PoisonPill.getInstance(), ActorRef.noSender());
591         kit.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
592         kit.unwatch(peer1);
593         leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
594
595         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
596
597         // Verify the reinstated peerMember2 is fully synced.
598
599         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
600         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
601         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
602         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
603
604         // Reinstate peerMember1 and verify no owner changes
605
606         peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(newShardBuilder(
607                 peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), peerId1.toString());
608         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
609         leader.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
610
611         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
612         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
613         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
614         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
615
616         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
617         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
618         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
619
620         verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
621         verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
622         verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
623
624         // Verify the reinstated peerMember1 is fully synced.
625
626         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
627         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
628         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
629         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
630
631         AtomicLong leaderLastApplied = new AtomicLong();
632         verifyRaftState(leader, rs -> {
633             assertEquals("LastApplied up-to-date", rs.getLastApplied(), rs.getLastIndex());
634             leaderLastApplied.set(rs.getLastApplied());
635         });
636
637         verifyRaftState(peer2, rs -> assertEquals("LastApplied", leaderLastApplied.get(), rs.getLastIndex()));
638
639         // Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for
640         // the entities (1 and 3) previously owned by the local leader member.
641
642         peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
643         peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
644         peer2.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
645
646         kit.watch(leader);
647         leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
648         kit.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
649         kit.unwatch(leader);
650         peer2.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
651         peer2.tell(TimeoutNow.INSTANCE, peer2);
652
653         verifyRaftState(peer2, state ->
654                 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
655
656         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
657         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
658         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
659         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
660
661         testLog.info("testOwnerChangesOnPeerAvailabilityChanges ending");
662     }
663
664     @Test
665     public void testLeaderIsolation() throws Exception {
666         testLog.info("testLeaderIsolation starting");
667
668         final ShardTestKit kit = new ShardTestKit(getSystem());
669
670         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
671         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
672         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
673
674         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
675                 .shardIsolatedLeaderCheckIntervalInMillis(100000);
676
677         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
678                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
679                     peerId1.toString());
680         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
681
682         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
683                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
684                     peerId2.toString());
685         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
686
687         dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
688                 .shardIsolatedLeaderCheckIntervalInMillis(500);
689
690         TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
691                 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME)),
692                     leaderId.toString());
693
694         ShardTestKit.waitUntilLeader(leader);
695
696         // Add entity1 candidates for all members with the leader as the owner
697
698         DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
699         leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
700         kit.expectMsgClass(SuccessReply.class);
701         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
702
703         peer1.tell(new RegisterCandidateLocal(entity1), kit.getRef());
704         kit.expectMsgClass(SuccessReply.class);
705         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
706
707         peer2.tell(new RegisterCandidateLocal(entity1), kit.getRef());
708         kit.expectMsgClass(SuccessReply.class);
709         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_2_NAME);
710
711         verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
712         verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
713         verifyOwner(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
714
715         // Add entity2 candidates for all members with peer1 as the owner
716
717         DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
718         peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
719         kit.expectMsgClass(SuccessReply.class);
720         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
721
722         peer2.tell(new RegisterCandidateLocal(entity2), kit.getRef());
723         kit.expectMsgClass(SuccessReply.class);
724         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_2_NAME);
725
726         leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
727         kit.expectMsgClass(SuccessReply.class);
728         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
729
730         verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
731         verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
732         verifyOwner(peer2, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
733
734         // Add entity3 candidates for all members with peer2 as the owner
735
736         DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
737         peer2.tell(new RegisterCandidateLocal(entity3), kit.getRef());
738         kit.expectMsgClass(SuccessReply.class);
739         verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
740
741         leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
742         kit.expectMsgClass(SuccessReply.class);
743         verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), LOCAL_MEMBER_NAME);
744
745         peer1.tell(new RegisterCandidateLocal(entity3), kit.getRef());
746         kit.expectMsgClass(SuccessReply.class);
747         verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_1_NAME);
748
749         verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
750         verifyOwner(peer1, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
751         verifyOwner(peer2, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
752
753         // Add listeners on all members
754
755         DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class);
756         leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
757         kit.expectMsgClass(SuccessReply.class);
758         verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
759                 ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true)),
760                 ownershipChange(entity3, false, false, true)));
761         reset(leaderListener);
762
763         DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class);
764         peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
765         kit.expectMsgClass(SuccessReply.class);
766         verify(peer1Listener, timeout(5000).times(3)).ownershipChanged(or(or(
767                 ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, true, true)),
768                 ownershipChange(entity3, false, false, true)));
769         reset(peer1Listener);
770
771         DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class);
772         peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
773         kit.expectMsgClass(SuccessReply.class);
774         verify(peer2Listener, timeout(5000).times(3)).ownershipChanged(or(or(
775                 ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, false, true)),
776                 ownershipChange(entity3, false, true, true)));
777         reset(peer2Listener);
778
779         // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
780
781         leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
782         leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
783
784         peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
785             ae -> ae.getLeaderId().equals(leaderId.toString()));
786         peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
787
788         // Make peer1 start an election and become leader by enabling the ElectionTimeout message.
789
790         peer1.underlyingActor().stopDroppingMessagesOfType(ElectionTimeout.class);
791
792         // Send PeerDown to the isolated leader so it tries to re-assign ownership for the entities owned by the
793         // isolated peers.
794
795         leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
796         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
797
798         verifyRaftState(leader, state ->
799                 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
800
801         // Expect inJeopardy notification on the isolated leader.
802
803         verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
804                 ownershipChange(entity1, true, true, true, true), ownershipChange(entity2, false, false, true, true)),
805                 ownershipChange(entity3, false, false, true, true)));
806         reset(leaderListener);
807
808         verifyRaftState(peer1, state ->
809                 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
810
811         // Send PeerDown to the new leader peer1 so it re-assigns ownership for the entities owned by the
812         // isolated leader.
813
814         peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
815
816         verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
817
818         verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
819         reset(peer1Listener);
820
821         verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
822         reset(peer2Listener);
823
824         // Remove the isolation.
825
826         leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
827         leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
828         peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
829         peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
830
831         // Previous leader should switch to Follower and send inJeopardy cleared notifications for all entities.
832
833         verifyRaftState(leader, state ->
834                 assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState()));
835
836         verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
837                 ownershipChange(entity1, true, true, true), ownershipChange(entity2, false, false, true)),
838                 ownershipChange(entity3, false, false, true)));
839
840         verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
841         verify(leaderListener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
842
843         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
844         verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
845         verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
846
847         verifyNoMoreInteractions(leaderListener);
848         verifyNoMoreInteractions(peer1Listener);
849         verifyNoMoreInteractions(peer2Listener);
850
851         testLog.info("testLeaderIsolation ending");
852     }
853
854     @Test
855     public void testLeaderIsolationWithPendingCandidateAdded() throws Exception {
856         testLog.info("testLeaderIsolationWithPendingCandidateAdded starting");
857
858         final ShardTestKit kit = new ShardTestKit(getSystem());
859
860         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
861         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
862         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
863
864         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
865                 .shardIsolatedLeaderCheckIntervalInMillis(100000);
866
867         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
868                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME),
869                 actorFactory.createActor(MessageCollectorActor.props())), peerId1.toString());
870         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
871
872         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
873                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME),
874                 actorFactory.createTestActor(MessageCollectorActor.props())), peerId2.toString());
875         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
876
877         dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
878                 .shardIsolatedLeaderCheckIntervalInMillis(500);
879
880         TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
881                 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
882                 actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
883
884         ShardTestKit.waitUntilLeader(leader);
885
886         // Add listeners on all members
887
888         DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class,
889                 "DOMEntityOwnershipListener-" + LOCAL_MEMBER_NAME);
890         leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
891         kit.expectMsgClass(SuccessReply.class);
892
893         DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class,
894                 "DOMEntityOwnershipListener-" + PEER_MEMBER_1_NAME);
895         peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
896         kit.expectMsgClass(SuccessReply.class);
897
898         DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class,
899                 "DOMEntityOwnershipListener-" + PEER_MEMBER_2_NAME);
900         peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
901         kit.expectMsgClass(SuccessReply.class);
902
903         // Drop the CandidateAdded message to the leader for now.
904
905         leader.underlyingActor().startDroppingMessagesOfType(CandidateAdded.class);
906
907         // Add an entity candidates for the leader. Since we've blocked the CandidateAdded message, it won't be
908         // assigned the owner.
909
910         DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
911         leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
912         kit.expectMsgClass(SuccessReply.class);
913         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
914         verifyCommittedEntityCandidate(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
915         verifyCommittedEntityCandidate(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
916
917         DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
918         leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
919         kit.expectMsgClass(SuccessReply.class);
920         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
921         verifyCommittedEntityCandidate(peer1, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
922         verifyCommittedEntityCandidate(peer2, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
923
924         // Capture the CandidateAdded messages.
925
926         final List<CandidateAdded> candidateAdded = expectMatching(leader.underlyingActor().collectorActor(),
927                 CandidateAdded.class, 2);
928
929         // Drop AppendEntries to the followers containing a log entry, which will be for the owner writes after we
930         // forward the CandidateAdded messages to the leader. This will leave the pending owner write tx's uncommitted.
931
932         peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
933         peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
934
935         // Now forward the CandidateAdded messages to the leader and wait for it to send out the AppendEntries.
936
937         leader.underlyingActor().stopDroppingMessagesOfType(CandidateAdded.class);
938         leader.tell(candidateAdded.get(0), leader);
939         leader.tell(candidateAdded.get(1), leader);
940
941         expectMatching(peer1.underlyingActor().collectorActor(), AppendEntries.class, 2,
942             ae -> ae.getEntries().size() > 0);
943
944         // Verify no owner assigned.
945
946         verifyNoOwnerSet(leader, entity1.getType(), entity1.getIdentifier());
947         verifyNoOwnerSet(leader, entity2.getType(), entity2.getIdentifier());
948
949         // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
950
951         leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
952         leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
953
954         peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
955             ae -> ae.getLeaderId().equals(leaderId.toString()));
956         peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
957
958         // Send PeerDown to the isolated leader - should be no-op since there's no owned entities.
959
960         leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
961         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
962
963         // Verify the leader transitions to IsolatedLeader.
964
965         verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.IsolatedLeader.toString(),
966                 state.getRaftState()));
967
968         // Send PeerDown to the new leader peer1.
969
970         peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
971
972         // Make peer1 start an election and become leader by sending the TimeoutNow message.
973
974         peer1.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
975
976         // Verify the peer1 transitions to Leader.
977
978         verifyRaftState(peer1, state -> assertEquals("getRaftState", RaftState.Leader.toString(),
979                 state.getRaftState()));
980
981         verifyNoOwnerSet(peer1, entity1.getType(), entity1.getIdentifier());
982         verifyNoOwnerSet(peer2, entity1.getType(), entity2.getIdentifier());
983
984         verifyNoMoreInteractions(peer1Listener);
985         verifyNoMoreInteractions(peer2Listener);
986
987         // Add candidate peer1 candidate for entity2.
988
989         peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
990
991         verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
992         verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
993         verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, false, true));
994
995         reset(leaderListener, peer1Listener, peer2Listener);
996
997         // Remove the isolation.
998
999         leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
1000         leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1001         peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1002         peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1003
1004         // Previous leader should switch to Follower.
1005
1006         verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(),
1007                 state.getRaftState()));
1008
1009         // Send PeerUp to peer1 and peer2.
1010
1011         peer1.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
1012         peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
1013
1014         // The previous leader should become the owner of entity1.
1015
1016         verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
1017
1018         // The previous leader's DOMEntityOwnershipListener should get 4 total notifications:
1019         //     - inJeopardy cleared for entity1 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
1020         //     - inJeopardy cleared for entity2 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
1021         //     - local owner granted for entity1 (wasOwner=false, isOwner=true, hasOwner=true, inJeopardy=false)
1022         //     - remote owner for entity2 (wasOwner=false, isOwner=false, hasOwner=true, inJeopardy=false)
1023         verify(leaderListener, timeout(5000).times(4)).ownershipChanged(or(
1024                 or(ownershipChange(entity1, false, false, false), ownershipChange(entity2, false, false, false)),
1025                 or(ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true))));
1026
1027         verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
1028         verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
1029
1030         // Verify entity2's owner doesn't change.
1031
1032         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1033         verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
1034
1035         verifyNoMoreInteractions(leaderListener);
1036         verifyNoMoreInteractions(peer1Listener);
1037         verifyNoMoreInteractions(peer2Listener);
1038
1039         testLog.info("testLeaderIsolationWithPendingCandidateAdded ending");
1040     }
1041
1042     @Test
1043     public void testListenerRegistration() {
1044         testLog.info("testListenerRegistration starting");
1045
1046         ShardTestKit kit = new ShardTestKit(getSystem());
1047
1048         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1049         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
1050
1051         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1052                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
1053         peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1054
1055         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1056                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
1057
1058         ShardTestKit.waitUntilLeader(leader);
1059
1060         String otherEntityType = "otherEntityType";
1061         final DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1062         final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
1063         final DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
1064         final DOMEntity entity4 = new DOMEntity(otherEntityType, ENTITY_ID3);
1065         DOMEntityOwnershipListener listener = mock(DOMEntityOwnershipListener.class);
1066
1067         // Register listener
1068
1069         leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1070         kit.expectMsgClass(SuccessReply.class);
1071
1072         // Register a couple candidates for the desired entity type and verify listener is notified.
1073
1074         leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
1075         kit.expectMsgClass(SuccessReply.class);
1076
1077         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
1078
1079         leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
1080         kit.expectMsgClass(SuccessReply.class);
1081
1082         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
1083         reset(listener);
1084
1085         // Register another candidate for another entity type and verify listener is not notified.
1086
1087         leader.tell(new RegisterCandidateLocal(entity4), kit.getRef());
1088         kit.expectMsgClass(SuccessReply.class);
1089
1090         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1091         verify(listener, never()).ownershipChanged(ownershipChange(entity4));
1092
1093         // Register remote candidate for entity1
1094
1095         peer.tell(new RegisterCandidateLocal(entity1), kit.getRef());
1096         kit.expectMsgClass(SuccessReply.class);
1097         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entity1.getIdentifier(), PEER_MEMBER_1_NAME);
1098
1099         // Unregister the local candidate for entity1 and verify listener is notified
1100
1101         leader.tell(new UnregisterCandidateLocal(entity1), kit.getRef());
1102         kit.expectMsgClass(SuccessReply.class);
1103
1104         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
1105         reset(listener);
1106
1107         // Unregister the listener, add a candidate for entity3 and verify listener isn't notified
1108
1109         leader.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1110         kit.expectMsgClass(SuccessReply.class);
1111
1112         leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
1113         kit.expectMsgClass(SuccessReply.class);
1114
1115         verifyOwner(leader, ENTITY_TYPE, entity3.getIdentifier(), LOCAL_MEMBER_NAME);
1116         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1117         verify(listener, never()).ownershipChanged(any(DOMEntityOwnershipChange.class));
1118
1119         // Re-register the listener and verify it gets notified of currently owned entities
1120
1121         reset(listener);
1122
1123         leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1124         kit.expectMsgClass(SuccessReply.class);
1125
1126         verify(listener, timeout(5000).times(2)).ownershipChanged(or(ownershipChange(entity2, false, true, true),
1127                 ownershipChange(entity3, false, true, true)));
1128         Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
1129         verify(listener, never()).ownershipChanged(ownershipChange(entity4));
1130         verify(listener, times(1)).ownershipChanged(ownershipChange(entity1));
1131
1132         testLog.info("testListenerRegistration ending");
1133     }
1134
1135     @Test
1136     public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() {
1137         testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived starting");
1138
1139         ShardTestKit kit = new ShardTestKit(getSystem());
1140         EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder()
1141                 .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
1142
1143         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1144         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
1145
1146         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1147                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
1148         peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1149
1150         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1151                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME, builder.build()),
1152                 leaderId.toString());
1153
1154         ShardTestKit.waitUntilLeader(leader);
1155
1156         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1157
1158         // Add a remote candidate
1159
1160         peer.tell(new RegisterCandidateLocal(entity), kit.getRef());
1161         kit.expectMsgClass(SuccessReply.class);
1162
1163         // Register local
1164
1165         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1166         kit.expectMsgClass(SuccessReply.class);
1167
1168         // Verify the local candidate becomes owner
1169
1170         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1171         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1172         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1173
1174         testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived ending");
1175     }
1176
1177     @Test
1178     public void testDelayedEntityOwnerSelection() {
1179         testLog.info("testDelayedEntityOwnerSelection starting");
1180
1181         final ShardTestKit kit = new ShardTestKit(getSystem());
1182         EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder()
1183                 .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
1184
1185         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
1186
1187         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1188         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
1189         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
1190
1191         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1192                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
1193                     peerId1.toString());
1194         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1195
1196         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1197                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
1198                     peerId2.toString());
1199         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1200
1201         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1202                 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME,
1203                         builder.build()), leaderId.toString());
1204
1205         ShardTestKit.waitUntilLeader(leader);
1206
1207         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1208
1209         // Add a remote candidate
1210
1211         peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
1212         kit.expectMsgClass(SuccessReply.class);
1213
1214         // Register local
1215
1216         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1217         kit.expectMsgClass(SuccessReply.class);
1218
1219         // Verify the local candidate becomes owner
1220
1221         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1222         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1223         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1224
1225         testLog.info("testDelayedEntityOwnerSelection ending");
1226     }
1227
1228     private Props newLocalShardProps() {
1229         return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.<String,String>emptyMap(), LOCAL_MEMBER_NAME);
1230     }
1231
1232     private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers,
1233             final String memberName) {
1234         return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build());
1235     }
1236
1237     private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers, final String memberName,
1238                                 final EntityOwnerSelectionStrategyConfig config) {
1239         return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props()
1240                     .withDispatcher(Dispatchers.DefaultDispatcherId());
1241     }
1242
1243     private EntityOwnershipShard.Builder newShardBuilder(final ShardIdentifier shardId, final Map<String, String> peers,
1244             final String memberName) {
1245         return EntityOwnershipShard.newBuilder()
1246                 .id(shardId)
1247                 .peerAddresses(peers)
1248                 .datastoreContext(dataStoreContextBuilder.build())
1249                 .schemaContextProvider(() -> EOSTestUtils.SCHEMA_CONTEXT)
1250                 .localMemberName(MemberName.forName(memberName))
1251                 .ownerSelectionStrategyConfig(EntityOwnerSelectionStrategyConfig.newBuilder().build());
1252     }
1253
1254     private Map<String, String> peerMap(final String... peerIds) {
1255         ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder();
1256         for (String peerId: peerIds) {
1257             builder.put(peerId, actorFactory.createTestActorPath(peerId)).build();
1258         }
1259
1260         return builder.build();
1261     }
1262
1263     private static class TestEntityOwnershipShard extends EntityOwnershipShard {
1264         private final ActorRef collectorActor;
1265         private final Map<Class<?>, Predicate<?>> dropMessagesOfType = new ConcurrentHashMap<>();
1266
1267         TestEntityOwnershipShard(final Builder builder, final ActorRef collectorActor) {
1268             super(builder);
1269             this.collectorActor = collectorActor;
1270         }
1271
1272         @SuppressWarnings({ "unchecked", "rawtypes" })
1273         @Override
1274         public void handleCommand(final Object message) {
1275             Predicate drop = dropMessagesOfType.get(message.getClass());
1276             if (drop == null || !drop.test(message)) {
1277                 super.handleCommand(message);
1278             }
1279
1280             if (collectorActor != null) {
1281                 collectorActor.tell(message, ActorRef.noSender());
1282             }
1283         }
1284
1285         void startDroppingMessagesOfType(final Class<?> msgClass) {
1286             dropMessagesOfType.put(msgClass, msg -> true);
1287         }
1288
1289         <T> void startDroppingMessagesOfType(final Class<T> msgClass, final Predicate<T> filter) {
1290             dropMessagesOfType.put(msgClass, filter);
1291         }
1292
1293         void stopDroppingMessagesOfType(final Class<?> msgClass) {
1294             dropMessagesOfType.remove(msgClass);
1295         }
1296
1297         ActorRef collectorActor() {
1298             return collectorActor;
1299         }
1300
1301         static Props props(final Builder builder) {
1302             return props(builder, null);
1303         }
1304
1305         static Props props(final Builder builder, final ActorRef collectorActor) {
1306             return Props.create(TestEntityOwnershipShard.class, builder, collectorActor)
1307                     .withDispatcher(Dispatchers.DefaultDispatcherId());
1308         }
1309     }
1310 }