Bug 6540: EOS - handle edge case with pruning pending owner change commits
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
11 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
13 import static org.junit.Assert.assertEquals;
14 import static org.mockito.AdditionalMatchers.or;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.reset;
19 import static org.mockito.Mockito.timeout;
20 import static org.mockito.Mockito.times;
21 import static org.mockito.Mockito.verify;
22 import static org.mockito.Mockito.verifyNoMoreInteractions;
23 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
24 import akka.actor.ActorRef;
25 import akka.actor.PoisonPill;
26 import akka.actor.Props;
27 import akka.actor.Terminated;
28 import akka.dispatch.Dispatchers;
29 import akka.testkit.JavaTestKit;
30 import akka.testkit.TestActorRef;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.util.concurrent.Uninterruptibles;
33 import java.util.ArrayList;
34 import java.util.Collections;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.TimeUnit;
39 import java.util.function.Predicate;
40 import org.junit.After;
41 import org.junit.Test;
42 import org.opendaylight.controller.cluster.access.concepts.MemberName;
43 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
44 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
45 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
46 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
47 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
48 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
49 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
50 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
51 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
52 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.LastCandidateSelectionStrategy;
53 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
54 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
55 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
57 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
58 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
59 import org.opendaylight.controller.cluster.raft.RaftState;
60 import org.opendaylight.controller.cluster.raft.TestActorFactory;
61 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
62 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
63 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
64 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
65 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
66 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
67 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
68 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
69 import org.opendaylight.yangtools.yang.common.QName;
70 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
71 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
72
73 /**
74  * Unit tests for EntityOwnershipShard.
75  *
76  * @author Thomas Pantelis
77  */
78 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
79     private static final String ENTITY_TYPE = "test type";
80     private static final YangInstanceIdentifier ENTITY_ID1 =
81             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
82     private static final YangInstanceIdentifier ENTITY_ID2 =
83             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
84     private static final YangInstanceIdentifier ENTITY_ID3 =
85             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3"));
86     private static final YangInstanceIdentifier ENTITY_ID4 =
87             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4"));
88     private static final YangInstanceIdentifier ENTITY_ID5 =
89             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5"));
90     private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
91     private static final String LOCAL_MEMBER_NAME = "local-member-1";
92     private static final String PEER_MEMBER_1_NAME = "peer-member-1";
93     private static final String PEER_MEMBER_2_NAME = "peer-member-2";
94
95     private Builder dataStoreContextBuilder = DatastoreContext.newBuilder().persistent(false);
96     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
97
98     @After
99     public void tearDown() {
100         actorFactory.close();
101     }
102
103     @Test
104     public void testOnRegisterCandidateLocal() throws Exception {
105         testLog.info("testOnRegisterCandidateLocal starting");
106
107         ShardTestKit kit = new ShardTestKit(getSystem());
108
109         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
110
111         ShardTestKit.waitUntilLeader(shard);
112
113         YangInstanceIdentifier entityId = ENTITY_ID1;
114         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
115
116         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
117         kit.expectMsgClass(SuccessReply.class);
118
119         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
120         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
121
122         testLog.info("testOnRegisterCandidateLocal ending");
123     }
124
125     @Test
126     public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
127         testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader starting");
128
129         ShardTestKit kit = new ShardTestKit(getSystem());
130
131         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
132
133         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
134         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
135
136         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
137                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
138         TestEntityOwnershipShard peerShard = peer.underlyingActor();
139         peerShard.startDroppingMessagesOfType(RequestVote.class);
140         peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
141
142         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(
143                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
144
145         YangInstanceIdentifier entityId = ENTITY_ID1;
146         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
147
148         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
149         kit.expectMsgClass(SuccessReply.class);
150
151         // Now allow RequestVotes to the peer so the shard becomes the leader. This should retry the commit.
152         peerShard.stopDroppingMessagesOfType(RequestVote.class);
153
154         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
155         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
156
157         testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader ending");
158     }
159
160     @Test
161     public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
162         testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus starting");
163
164         ShardTestKit kit = new ShardTestKit(getSystem());
165
166         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
167                 shardTransactionCommitTimeoutInSeconds(1);
168
169         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
170         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
171
172         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
173                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
174         TestEntityOwnershipShard peerShard = peer.underlyingActor();
175         peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
176
177         // Drop AppendEntries so consensus isn't reached.
178         peerShard.startDroppingMessagesOfType(AppendEntries.class);
179
180         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
181                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
182
183         ShardTestKit.waitUntilLeader(leader);
184
185         YangInstanceIdentifier entityId = ENTITY_ID1;
186         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
187
188         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
189         kit.expectMsgClass(SuccessReply.class);
190
191         // Wait enough time for the commit to timeout.
192         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
193
194         // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
195         // write being applied to the state.
196         peerShard.stopDroppingMessagesOfType(AppendEntries.class);
197
198         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
199         verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
200
201         testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus ending");
202     }
203
204     @Test
205     public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
206         testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader starting");
207
208         ShardTestKit kit = new ShardTestKit(getSystem());
209
210         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
211                 shardIsolatedLeaderCheckIntervalInMillis(50);
212
213         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
214         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
215
216         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
217                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
218         TestEntityOwnershipShard peerShard = peer.underlyingActor();
219         peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
220
221         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
222                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME));
223
224         ShardTestKit.waitUntilLeader(leader);
225
226         // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
227         peerShard.startDroppingMessagesOfType(AppendEntries.class);
228         verifyRaftState(leader, state ->
229                 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
230
231         YangInstanceIdentifier entityId = ENTITY_ID1;
232         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
233
234         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
235         kit.expectMsgClass(SuccessReply.class);
236
237         // Resume AppendEntries - the candidate write should now be committed.
238         peerShard.stopDroppingMessagesOfType(AppendEntries.class);
239         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
240         verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
241
242         testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader ending");
243     }
244
245     @Test
246     public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
247         testLog.info("testOnRegisterCandidateLocalWithRemoteLeader starting");
248
249         ShardTestKit kit = new ShardTestKit(getSystem());
250
251         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
252                 shardBatchedModificationCount(5);
253
254         ShardIdentifier leaderId = newShardId(PEER_MEMBER_1_NAME);
255         ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
256         TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
257                 newShardBuilder(leaderId, peerMap(localId.toString()), PEER_MEMBER_1_NAME),
258                 actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
259         TestEntityOwnershipShard leaderShard = leader.underlyingActor();
260
261         TestActorRef<TestEntityOwnershipShard> local = actorFactory.createTestActor(TestEntityOwnershipShard.props(
262                 newShardBuilder(localId, peerMap(leaderId.toString()),LOCAL_MEMBER_NAME)), localId.toString());
263         local.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
264
265         local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
266         kit.expectMsgClass(SuccessReply.class);
267
268         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
269         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
270
271         // Test with initial commit timeout and subsequent retry.
272
273         local.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
274         leaderShard.startDroppingMessagesOfType(BatchedModifications.class);
275
276         local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
277         kit.expectMsgClass(SuccessReply.class);
278
279         expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class);
280
281         // Send a bunch of registration messages quickly and verify.
282
283         leaderShard.stopDroppingMessagesOfType(BatchedModifications.class);
284         clearMessages(leaderShard.collectorActor());
285
286         int max = 100;
287         List<YangInstanceIdentifier> entityIds = new ArrayList<>();
288         for(int i = 1; i <= max; i++) {
289             YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
290             entityIds.add(id);
291             local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, id)), kit.getRef());
292         }
293
294         for(int i = 0; i < max; i++) {
295             verifyCommittedEntityCandidate(local, ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
296         }
297
298         testLog.info("testOnRegisterCandidateLocalWithRemoteLeader ending");
299     }
300
301     @Test
302     public void testOnUnregisterCandidateLocal() throws Exception {
303         testLog.info("testOnUnregisterCandidateLocal starting");
304
305         ShardTestKit kit = new ShardTestKit(getSystem());
306         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
307         ShardTestKit.waitUntilLeader(shard);
308
309         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
310
311         // Register
312
313         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
314         kit.expectMsgClass(SuccessReply.class);
315
316         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
317         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
318
319         // Unregister
320
321         shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
322         kit.expectMsgClass(SuccessReply.class);
323
324         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");
325
326         // Register again
327
328         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
329         kit.expectMsgClass(SuccessReply.class);
330
331         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
332         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
333
334         testLog.info("testOnUnregisterCandidateLocal ending");
335     }
336
337     @Test
338     public void testOwnershipChanges() throws Exception {
339         testLog.info("testOwnershipChanges starting");
340
341         ShardTestKit kit = new ShardTestKit(getSystem());
342
343         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
344
345         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
346         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
347         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
348
349         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
350                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
351                     peerId1.toString());
352         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
353
354         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
355                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
356                     peerId2.toString());
357         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
358
359         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
360                 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
361                     leaderId.toString());
362
363         ShardTestKit.waitUntilLeader(leader);
364
365         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
366
367         // Add a remote candidate
368
369         peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
370         kit.expectMsgClass(SuccessReply.class);
371
372         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
373
374         // Register local
375
376         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
377         kit.expectMsgClass(SuccessReply.class);
378
379         // Verify the remote candidate becomes owner
380
381         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
382         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
383
384         // Add another remote candidate and verify ownership doesn't change
385
386         peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
387         kit.expectMsgClass(SuccessReply.class);
388
389         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
390         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
391         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
392
393         // Remove the second remote candidate and verify ownership doesn't change
394
395         peer2.tell(new UnregisterCandidateLocal(entity), kit.getRef());
396         kit.expectMsgClass(SuccessReply.class);
397
398         verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
399         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
400         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
401
402         // Remove the first remote candidate and verify the local candidate becomes owner
403
404         peer1.tell(new UnregisterCandidateLocal(entity), kit.getRef());
405         kit.expectMsgClass(SuccessReply.class);
406
407         verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
408         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
409
410         // Add the second remote candidate back and verify ownership doesn't change
411
412         peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
413         kit.expectMsgClass(SuccessReply.class);
414
415         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
416         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
417         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
418
419         // Unregister the local candidate and verify the second remote candidate becomes owner
420
421         leader.tell(new UnregisterCandidateLocal(entity), kit.getRef());
422         kit.expectMsgClass(SuccessReply.class);
423
424         verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
425         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
426
427         testLog.info("testOwnershipChanges ending");
428     }
429
430     @Test
431     public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception {
432         testLog.info("testOwnerChangesOnPeerAvailabilityChanges starting");
433
434         ShardTestKit kit = new ShardTestKit(getSystem());
435
436         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4).
437                 shardIsolatedLeaderCheckIntervalInMillis(100000);
438
439         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
440         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
441         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
442
443         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
444                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
445                     peerId1.toString());
446         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
447
448         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
449                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
450                     peerId2.toString());
451         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
452
453         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
454                 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
455                     leaderId.toString());
456
457         verifyRaftState(leader, state ->
458                 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
459
460         // Send PeerDown and PeerUp with no entities
461
462         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
463         leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
464
465         // Add candidates for entity1 with the local leader as the owner
466
467         leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
468         kit.expectMsgClass(SuccessReply.class);
469         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
470
471         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
472         kit.expectMsgClass(SuccessReply.class);
473         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
474
475         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
476         kit.expectMsgClass(SuccessReply.class);
477         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
478         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
479
480         // Add candidates for entity2 with peerMember2 as the owner
481
482         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
483         kit.expectMsgClass(SuccessReply.class);
484         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
485
486         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
487         kit.expectMsgClass(SuccessReply.class);
488         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
489         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
490
491         // Add candidates for entity3 with peerMember2 as the owner.
492
493         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
494         kit.expectMsgClass(SuccessReply.class);
495         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
496
497         leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
498         kit.expectMsgClass(SuccessReply.class);
499         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
500
501         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
502         kit.expectMsgClass(SuccessReply.class);
503         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
504         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
505
506         // Add only candidate peerMember2 for entity4.
507
508         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID4)), kit.getRef());
509         kit.expectMsgClass(SuccessReply.class);
510         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
511         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
512
513         // Add only candidate peerMember1 for entity5.
514
515         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID5)), kit.getRef());
516         kit.expectMsgClass(SuccessReply.class);
517         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
518         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
519
520         // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new
521         // owner selected
522
523         kit.watch(peer2);
524         peer2.tell(PoisonPill.getInstance(), ActorRef.noSender());
525         kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
526         kit.unwatch(peer2);
527
528         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
529         // Send PeerDown again - should be noop
530         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
531         peer1.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
532
533         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
534         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
535         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
536         // no other candidates for entity4 so peerMember2 should remain owner.
537         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
538
539         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
540         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
541         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
542         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
543
544         // Reinstate peerMember2
545
546         peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
547                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
548                     peerId2.toString());
549         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
550         leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
551         // Send PeerUp again - should be noop
552         leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
553         peer1.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
554
555         // peerMember2's candidates should be removed on startup.
556         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
557         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
558         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
559         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
560
561         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
562         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
563         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
564         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
565
566         // Add back candidate peerMember2 for entities 1, 2, & 3.
567
568         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
569         kit.expectMsgClass(SuccessReply.class);
570         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
571         kit.expectMsgClass(SuccessReply.class);
572         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
573         kit.expectMsgClass(SuccessReply.class);
574         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
575         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
576         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
577         verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
578         verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
579         verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
580         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
581         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
582         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
583         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
584         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
585         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
586         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
587
588         // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected
589
590         kit.watch(peer1);
591         peer1.tell(PoisonPill.getInstance(), ActorRef.noSender());
592         kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
593         kit.unwatch(peer1);
594         leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
595
596         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
597
598         // Verify the reinstated peerMember2 is fully synced.
599
600         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
601         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
602         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
603         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
604
605         // Reinstate peerMember1 and verify no owner changes
606
607         peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(newShardBuilder(
608                 peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), peerId1.toString());
609         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
610         leader.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
611
612         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
613         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
614         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
615         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
616
617         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
618         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
619         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
620
621         verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
622         verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
623         verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
624
625         // Verify the reinstated peerMember1 is fully synced.
626
627         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
628         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
629         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
630         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
631
632         // Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for
633         // the entities (1 and 3) previously owned by the local leader member.
634
635         peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
636         peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
637         peer2.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
638
639         kit.watch(leader);
640         leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
641         kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
642         kit.unwatch(leader);
643         peer2.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
644         peer2.tell(TimeoutNow.INSTANCE, peer2);
645
646         verifyRaftState(peer2, state ->
647                 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
648
649         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
650         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
651         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
652         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
653
654         testLog.info("testOwnerChangesOnPeerAvailabilityChanges ending");
655     }
656
657     @Test
658     public void testLeaderIsolation() throws Exception {
659         testLog.info("testLeaderIsolation starting");
660
661         ShardTestKit kit = new ShardTestKit(getSystem());
662
663         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
664         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
665         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
666
667         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4).
668             shardIsolatedLeaderCheckIntervalInMillis(100000);
669
670         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
671                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
672                     peerId1.toString());
673         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
674
675         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
676                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
677                     peerId2.toString());
678         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
679
680         dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
681                 shardIsolatedLeaderCheckIntervalInMillis(500);
682
683         TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
684                 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME)),
685                     leaderId.toString());
686
687         ShardTestKit.waitUntilLeader(leader);
688
689         // Add entity1 candidates for all members with the leader as the owner
690
691         DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
692         leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
693         kit.expectMsgClass(SuccessReply.class);
694         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
695
696         peer1.tell(new RegisterCandidateLocal(entity1), kit.getRef());
697         kit.expectMsgClass(SuccessReply.class);
698         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
699
700         peer2.tell(new RegisterCandidateLocal(entity1), kit.getRef());
701         kit.expectMsgClass(SuccessReply.class);
702         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_2_NAME);
703
704         verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
705         verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
706         verifyOwner(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
707
708         // Add entity2 candidates for all members with peer1 as the owner
709
710         DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
711         peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
712         kit.expectMsgClass(SuccessReply.class);
713         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
714
715         peer2.tell(new RegisterCandidateLocal(entity2), kit.getRef());
716         kit.expectMsgClass(SuccessReply.class);
717         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_2_NAME);
718
719         leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
720         kit.expectMsgClass(SuccessReply.class);
721         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
722
723         verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
724         verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
725         verifyOwner(peer2, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
726
727         // Add entity3 candidates for all members with peer2 as the owner
728
729         DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
730         peer2.tell(new RegisterCandidateLocal(entity3), kit.getRef());
731         kit.expectMsgClass(SuccessReply.class);
732         verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
733
734         leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
735         kit.expectMsgClass(SuccessReply.class);
736         verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), LOCAL_MEMBER_NAME);
737
738         peer1.tell(new RegisterCandidateLocal(entity3), kit.getRef());
739         kit.expectMsgClass(SuccessReply.class);
740         verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_1_NAME);
741
742         verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
743         verifyOwner(peer1, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
744         verifyOwner(peer2, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
745
746         // Add listeners on all members
747
748         DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class);
749         leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
750         kit.expectMsgClass(SuccessReply.class);
751         verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, true, true),
752                 ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, false, true)));
753         reset(leaderListener);
754
755         DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class);
756         peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
757         kit.expectMsgClass(SuccessReply.class);
758         verify(peer1Listener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, false, true),
759                 ownershipChange(entity2, false, true, true)), ownershipChange(entity3, false, false, true)));
760         reset(peer1Listener);
761
762         DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class);
763         peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
764         kit.expectMsgClass(SuccessReply.class);
765         verify(peer2Listener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, false, true),
766                 ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, true, true)));
767         reset(peer2Listener);
768
769         // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
770
771         leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
772         leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
773
774         peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
775                 ae -> ae.getLeaderId().equals(leaderId.toString()));
776         peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
777
778         // Make peer1 start an election and become leader by enabling the ElectionTimeout message.
779
780         peer1.underlyingActor().stopDroppingMessagesOfType(ElectionTimeout.class);
781
782         // Send PeerDown to the isolated leader so it tries to re-assign ownership for the entities owned by the
783         // isolated peers.
784
785         leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
786         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
787
788         verifyRaftState(leader, state ->
789                 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
790
791         // Expect inJeopardy notification on the isolated leader.
792
793         verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, true, true, true, true),
794                 ownershipChange(entity2, false, false, true, true)), ownershipChange(entity3, false, false, true, true)));
795         reset(leaderListener);
796
797         verifyRaftState(peer1, state ->
798                 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
799
800         // Send PeerDown to the new leader peer1 so it re-assigns ownership for the entities owned by the
801         // isolated leader.
802
803         peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
804
805         verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
806
807         verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
808         reset(peer1Listener);
809
810         verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
811         reset(peer2Listener);
812
813         // Remove the isolation.
814
815         leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
816         leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
817         peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
818         peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
819
820         // Previous leader should switch to Follower and send inJeopardy cleared notifications for all entities.
821
822         verifyRaftState(leader, state ->
823                 assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState()));
824
825         verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, true, true, true),
826                 ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, false, true)));
827
828         verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
829         verify(leaderListener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
830
831         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
832         verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
833         verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
834
835         verifyNoMoreInteractions(leaderListener);
836         verifyNoMoreInteractions(peer1Listener);
837         verifyNoMoreInteractions(peer2Listener);
838
839         testLog.info("testLeaderIsolation ending");
840     }
841
842     @Test
843     public void testLeaderIsolationWithPendingCandidateAdded() throws Exception {
844         testLog.info("testLeaderIsolationWithPendingCandidateAdded starting");
845
846         ShardTestKit kit = new ShardTestKit(getSystem());
847
848         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
849         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
850         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
851
852         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4).
853             shardIsolatedLeaderCheckIntervalInMillis(100000);
854
855         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
856                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME),
857                 actorFactory.createTestActor(MessageCollectorActor.props())), peerId1.toString());
858         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
859
860         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
861                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME),
862                 actorFactory.createTestActor(MessageCollectorActor.props())), peerId2.toString());
863         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
864
865         dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
866                 shardIsolatedLeaderCheckIntervalInMillis(500);
867
868         TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
869                 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
870                 actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
871
872         ShardTestKit.waitUntilLeader(leader);
873
874         // Add listeners on all members
875
876         DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class,
877                 "DOMEntityOwnershipListener-" + LOCAL_MEMBER_NAME);
878         leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
879         kit.expectMsgClass(SuccessReply.class);
880
881         DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class,
882                 "DOMEntityOwnershipListener-" + PEER_MEMBER_1_NAME);
883         peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
884         kit.expectMsgClass(SuccessReply.class);
885
886         DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class,
887                 "DOMEntityOwnershipListener-" + PEER_MEMBER_2_NAME);
888         peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
889         kit.expectMsgClass(SuccessReply.class);
890
891         // Drop the CandidateAdded message to the leader for now.
892
893         leader.underlyingActor().startDroppingMessagesOfType(CandidateAdded.class);
894
895         // Add an entity candidates for the leader. Since we've blocked the CandidateAdded message, it won't be
896         // assigned the owner.
897
898         DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
899         leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
900         kit.expectMsgClass(SuccessReply.class);
901         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
902         verifyCommittedEntityCandidate(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
903         verifyCommittedEntityCandidate(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
904
905         DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
906         leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
907         kit.expectMsgClass(SuccessReply.class);
908         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
909         verifyCommittedEntityCandidate(peer1, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
910         verifyCommittedEntityCandidate(peer2, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
911
912         // Capture the CandidateAdded messages.
913
914         List<CandidateAdded> candidateAdded = expectMatching(leader.underlyingActor().collectorActor(), CandidateAdded.class, 2);
915
916         // Drop AppendEntries to the followers containing a log entry, which will be for the owner writes after we
917         // forward the CandidateAdded messages to the leader. This will leave the pending owner write tx's uncommitted.
918
919         peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
920         peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
921
922         // Now forward the CandidateAdded messages to the leader and wait for it to send out the AppendEntries.
923
924         leader.underlyingActor().stopDroppingMessagesOfType(CandidateAdded.class);
925         leader.tell(candidateAdded.get(0), leader);
926         leader.tell(candidateAdded.get(1), leader);
927
928         expectMatching(peer1.underlyingActor().collectorActor(), AppendEntries.class, 2, ae -> ae.getEntries().size() > 0);
929
930         // Verify no owner assigned.
931
932         verifyNoOwnerSet(leader, entity1.getType(), entity1.getIdentifier());
933         verifyNoOwnerSet(leader, entity2.getType(), entity2.getIdentifier());
934
935         // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
936
937         leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
938         leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
939
940         peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
941                 ae -> ae.getLeaderId().equals(leaderId.toString()));
942         peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
943
944         // Send PeerDown to the isolated leader - should be no-op since there's no owned entities.
945
946         leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
947         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
948
949         // Verify the leader transitions to IsolatedLeader.
950
951         verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
952
953         // Send PeerDown to the new leader peer1.
954
955         peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
956
957         // Make peer1 start an election and become leader by sending the TimeoutNow message.
958
959         peer1.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
960
961         // Verify the peer1 transitions to Leader.
962
963         verifyRaftState(peer1, state -> assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
964
965         verifyNoOwnerSet(peer1, entity1.getType(), entity1.getIdentifier());
966         verifyNoOwnerSet(peer2, entity1.getType(), entity2.getIdentifier());
967
968         verifyNoMoreInteractions(peer1Listener);
969         verifyNoMoreInteractions(peer2Listener);
970
971         // Add candidate peer1 candidate for entity2.
972
973         peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
974
975         verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
976         verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
977         verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, false, true));
978
979         reset(leaderListener, peer1Listener, peer2Listener);
980
981         // Remove the isolation.
982
983         leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
984         leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
985         peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
986         peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
987
988         // Previous leader should switch to Follower.
989
990         verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState()));
991
992         // Send PeerUp to peer1 and peer2.
993
994         peer1.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
995         peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
996
997         // The previous leader should become the owner of entity1.
998
999         verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
1000
1001         // The previous leader's DOMEntityOwnershipListener should get 4 total notifications:
1002         //     - inJeopardy cleared for entity1 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
1003         //     - inJeopardy cleared for entity2 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
1004         //     - local owner granted for entity1 (wasOwner=false, isOwner=true, hasOwner=true, inJeopardy=false)
1005         //     - remote owner for entity2 (wasOwner=false, isOwner=false, hasOwner=true, inJeopardy=false)
1006         verify(leaderListener, timeout(5000).times(4)).ownershipChanged(or(or(ownershipChange(entity1, false, false, false),
1007                 ownershipChange(entity2, false, false, false)), or(ownershipChange(entity1, false, true, true),
1008                         ownershipChange(entity2, false, false, true))));
1009
1010         verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
1011         verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
1012
1013         // Verify entity2's owner doesn't change.
1014
1015         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1016         verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
1017
1018         verifyNoMoreInteractions(leaderListener);
1019         verifyNoMoreInteractions(peer1Listener);
1020         verifyNoMoreInteractions(peer2Listener);
1021
1022         testLog.info("testLeaderIsolationWithPendingCandidateAdded ending");
1023     }
1024
1025     @Test
1026     public void testListenerRegistration() throws Exception {
1027         testLog.info("testListenerRegistration starting");
1028
1029         ShardTestKit kit = new ShardTestKit(getSystem());
1030
1031         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1032         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
1033
1034         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1035                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
1036         peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1037
1038         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1039                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
1040
1041         ShardTestKit.waitUntilLeader(leader);
1042
1043         String otherEntityType = "otherEntityType";
1044         DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1045         DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
1046         DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
1047         DOMEntity entity4 = new DOMEntity(otherEntityType, ENTITY_ID3);
1048         DOMEntityOwnershipListener listener = mock(DOMEntityOwnershipListener.class);
1049
1050         // Register listener
1051
1052         leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1053         kit.expectMsgClass(SuccessReply.class);
1054
1055         // Register a couple candidates for the desired entity type and verify listener is notified.
1056
1057         leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
1058         kit.expectMsgClass(SuccessReply.class);
1059
1060         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
1061
1062         leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
1063         kit.expectMsgClass(SuccessReply.class);
1064
1065         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
1066         reset(listener);
1067
1068         // Register another candidate for another entity type and verify listener is not notified.
1069
1070         leader.tell(new RegisterCandidateLocal(entity4), kit.getRef());
1071         kit.expectMsgClass(SuccessReply.class);
1072
1073         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1074         verify(listener, never()).ownershipChanged(ownershipChange(entity4));
1075
1076         // Register remote candidate for entity1
1077
1078         peer.tell(new RegisterCandidateLocal(entity1), kit.getRef());
1079         kit.expectMsgClass(SuccessReply.class);
1080         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entity1.getIdentifier(), PEER_MEMBER_1_NAME);
1081
1082         // Unregister the local candidate for entity1 and verify listener is notified
1083
1084         leader.tell(new UnregisterCandidateLocal(entity1), kit.getRef());
1085         kit.expectMsgClass(SuccessReply.class);
1086
1087         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
1088         reset(listener);
1089
1090         // Unregister the listener, add a candidate for entity3 and verify listener isn't notified
1091
1092         leader.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1093         kit.expectMsgClass(SuccessReply.class);
1094
1095         leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
1096         kit.expectMsgClass(SuccessReply.class);
1097
1098         verifyOwner(leader, ENTITY_TYPE, entity3.getIdentifier(), LOCAL_MEMBER_NAME);
1099         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1100         verify(listener, never()).ownershipChanged(any(DOMEntityOwnershipChange.class));
1101
1102         // Re-register the listener and verify it gets notified of currently owned entities
1103
1104         reset(listener);
1105
1106         leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1107         kit.expectMsgClass(SuccessReply.class);
1108
1109         verify(listener, timeout(5000).times(2)).ownershipChanged(or(ownershipChange(entity2, false, true, true),
1110                 ownershipChange(entity3, false, true, true)));
1111         Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
1112         verify(listener, never()).ownershipChanged(ownershipChange(entity4));
1113         verify(listener, times(1)).ownershipChanged(ownershipChange(entity1));
1114
1115         testLog.info("testListenerRegistration ending");
1116     }
1117
1118     @Test
1119     public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() throws Exception {
1120         testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived starting");
1121
1122         ShardTestKit kit = new ShardTestKit(getSystem());
1123         EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder().
1124                 addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
1125
1126         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1127         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
1128
1129         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1130                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
1131         peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1132
1133         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1134                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME, builder.build()), leaderId.toString());
1135
1136         ShardTestKit.waitUntilLeader(leader);
1137
1138         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1139
1140         // Add a remote candidate
1141
1142         peer.tell(new RegisterCandidateLocal(entity), kit.getRef());
1143         kit.expectMsgClass(SuccessReply.class);
1144
1145         // Register local
1146
1147         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1148         kit.expectMsgClass(SuccessReply.class);
1149
1150         // Verify the local candidate becomes owner
1151
1152         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1153         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1154         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1155
1156         testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived ending");
1157     }
1158
1159     @Test
1160     public void testDelayedEntityOwnerSelection() throws Exception {
1161         testLog.info("testDelayedEntityOwnerSelection starting");
1162
1163         ShardTestKit kit = new ShardTestKit(getSystem());
1164         EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder().
1165                 addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
1166
1167         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
1168
1169         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1170         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
1171         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
1172
1173         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1174                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
1175                     peerId1.toString());
1176         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1177
1178         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1179                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
1180                     peerId2.toString());
1181         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1182
1183         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1184                 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME, builder.build()),
1185                     leaderId.toString());
1186
1187         ShardTestKit.waitUntilLeader(leader);
1188
1189         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1190
1191         // Add a remote candidate
1192
1193         peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
1194         kit.expectMsgClass(SuccessReply.class);
1195
1196         // Register local
1197
1198         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1199         kit.expectMsgClass(SuccessReply.class);
1200
1201         // Verify the local candidate becomes owner
1202
1203         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1204         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1205         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1206
1207         testLog.info("testDelayedEntityOwnerSelection ending");
1208     }
1209
1210     private Props newLocalShardProps() {
1211         return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.<String,String>emptyMap(), LOCAL_MEMBER_NAME);
1212     }
1213
1214     private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName) {
1215         return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build());
1216     }
1217
1218     private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName,
1219                                 EntityOwnerSelectionStrategyConfig config) {
1220         return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props()
1221                     .withDispatcher(Dispatchers.DefaultDispatcherId());
1222     }
1223
1224     private EntityOwnershipShard.Builder newShardBuilder(ShardIdentifier shardId, Map<String,String> peers,
1225             String memberName) {
1226         return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).datastoreContext(
1227                 dataStoreContextBuilder.build()).schemaContext(SCHEMA_CONTEXT).localMemberName(
1228                         MemberName.forName(memberName)).ownerSelectionStrategyConfig(
1229                                 EntityOwnerSelectionStrategyConfig.newBuilder().build());
1230     }
1231
1232     private Map<String, String> peerMap(String... peerIds) {
1233         ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder();
1234         for(String peerId: peerIds) {
1235             builder.put(peerId, actorFactory.createTestActorPath(peerId)).build();
1236         }
1237
1238         return builder.build();
1239     }
1240
1241     private static class TestEntityOwnershipShard extends EntityOwnershipShard {
1242         private final TestActorRef<MessageCollectorActor> collectorActor;
1243         private final Map<Class<?>, Predicate<?>> dropMessagesOfType = new ConcurrentHashMap<>();
1244
1245         TestEntityOwnershipShard(Builder builder, TestActorRef<MessageCollectorActor> collectorActor) {
1246             super(builder);
1247             this.collectorActor = collectorActor;
1248         }
1249
1250         @SuppressWarnings({ "unchecked", "rawtypes" })
1251         @Override
1252         public void handleCommand(Object message) {
1253             Predicate drop = dropMessagesOfType.get(message.getClass());
1254             if(drop == null || !drop.test(message)) {
1255                 super.handleCommand(message);
1256             }
1257
1258             if(collectorActor != null) {
1259                 collectorActor.tell(message, ActorRef.noSender());
1260             }
1261         }
1262
1263         void startDroppingMessagesOfType(Class<?> msgClass) {
1264             dropMessagesOfType.put(msgClass, msg -> true);
1265         }
1266
1267         <T> void startDroppingMessagesOfType(Class<T> msgClass, Predicate<T> filter) {
1268             dropMessagesOfType.put(msgClass, filter);
1269         }
1270
1271         void stopDroppingMessagesOfType(Class<?> msgClass) {
1272             dropMessagesOfType.remove(msgClass);
1273         }
1274
1275         TestActorRef<MessageCollectorActor> collectorActor() {
1276             return collectorActor;
1277         }
1278
1279         static Props props(Builder builder) {
1280             return props(builder, null);
1281         }
1282
1283         static Props props(Builder builder, TestActorRef<MessageCollectorActor> collectorActor) {
1284             return Props.create(TestEntityOwnershipShard.class, builder, collectorActor).
1285                     withDispatcher(Dispatchers.DefaultDispatcherId());
1286         }
1287     }
1288 }