Refactor AbstractClientHandle a bit
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.AdditionalMatchers.or;
12 import static org.mockito.ArgumentMatchers.any;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.never;
15 import static org.mockito.Mockito.reset;
16 import static org.mockito.Mockito.timeout;
17 import static org.mockito.Mockito.times;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.verifyNoMoreInteractions;
20 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
21 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
22 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
23
24 import akka.actor.ActorRef;
25 import akka.actor.PoisonPill;
26 import akka.actor.Props;
27 import akka.actor.Terminated;
28 import akka.dispatch.Dispatchers;
29 import akka.testkit.TestActorRef;
30 import com.google.common.collect.ImmutableMap;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.time.Duration;
33 import java.util.ArrayList;
34 import java.util.Collections;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicLong;
40 import java.util.function.Predicate;
41 import org.junit.After;
42 import org.junit.Test;
43 import org.opendaylight.controller.cluster.access.concepts.MemberName;
44 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
45 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
46 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
47 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
48 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
49 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
50 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
51 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
52 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
53 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.LastCandidateSelectionStrategy;
54 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
55 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
57 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
58 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
59 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
60 import org.opendaylight.controller.cluster.raft.RaftState;
61 import org.opendaylight.controller.cluster.raft.TestActorFactory;
62 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
63 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
64 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
65 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
66 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
67 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
68 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
69 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
70 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
71 import org.opendaylight.yangtools.yang.common.QName;
72 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
73 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
74
75 /**
76  * Unit tests for EntityOwnershipShard.
77  *
78  * @author Thomas Pantelis
79  */
80 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
81     private static final String ENTITY_TYPE = "test type";
82     private static final YangInstanceIdentifier ENTITY_ID1 =
83             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
84     private static final YangInstanceIdentifier ENTITY_ID2 =
85             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
86     private static final YangInstanceIdentifier ENTITY_ID3 =
87             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3"));
88     private static final YangInstanceIdentifier ENTITY_ID4 =
89             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4"));
90     private static final YangInstanceIdentifier ENTITY_ID5 =
91             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5"));
92     private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
93     private static final String LOCAL_MEMBER_NAME = "local-member-1";
94     private static final String PEER_MEMBER_1_NAME = "peer-member-1";
95     private static final String PEER_MEMBER_2_NAME = "peer-member-2";
96
97     private Builder dataStoreContextBuilder = DatastoreContext.newBuilder().persistent(false);
98     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
99
100     @After
101     public void tearDown() {
102         actorFactory.close();
103     }
104
105     @Test
106     public void testOnRegisterCandidateLocal() {
107         testLog.info("testOnRegisterCandidateLocal starting");
108
109         ShardTestKit kit = new ShardTestKit(getSystem());
110
111         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
112
113         ShardTestKit.waitUntilLeader(shard);
114
115         YangInstanceIdentifier entityId = ENTITY_ID1;
116         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
117
118         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
119         kit.expectMsgClass(SuccessReply.class);
120
121         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
122         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
123
124         testLog.info("testOnRegisterCandidateLocal ending");
125     }
126
127     @Test
128     public void testOnRegisterCandidateLocalWithNoInitialLeader() {
129         testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader starting");
130
131         final ShardTestKit kit = new ShardTestKit(getSystem());
132
133         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
134
135         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
136         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
137
138         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
139                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
140         TestEntityOwnershipShard peerShard = peer.underlyingActor();
141         peerShard.startDroppingMessagesOfType(RequestVote.class);
142         peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
143
144         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(
145                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
146
147         YangInstanceIdentifier entityId = ENTITY_ID1;
148         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
149
150         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
151         kit.expectMsgClass(SuccessReply.class);
152
153         // Now allow RequestVotes to the peer so the shard becomes the leader. This should retry the commit.
154         peerShard.stopDroppingMessagesOfType(RequestVote.class);
155
156         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
157         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
158
159         testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader ending");
160     }
161
162     @Test
163     public void testOnRegisterCandidateLocalWithNoInitialConsensus() {
164         testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus starting");
165
166         final ShardTestKit kit = new ShardTestKit(getSystem());
167
168         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
169                 .shardTransactionCommitTimeoutInSeconds(1);
170
171         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
172         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
173
174         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
175                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
176         TestEntityOwnershipShard peerShard = peer.underlyingActor();
177         peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
178
179         // Drop AppendEntries so consensus isn't reached.
180         peerShard.startDroppingMessagesOfType(AppendEntries.class);
181
182         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
183                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
184
185         ShardTestKit.waitUntilLeader(leader);
186
187         YangInstanceIdentifier entityId = ENTITY_ID1;
188         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
189
190         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
191         kit.expectMsgClass(SuccessReply.class);
192
193         // Wait enough time for the commit to timeout.
194         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
195
196         // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
197         // write being applied to the state.
198         peerShard.stopDroppingMessagesOfType(AppendEntries.class);
199
200         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
201         verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
202
203         testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus ending");
204     }
205
206     @Test
207     public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
208         testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader starting");
209
210         final ShardTestKit kit = new ShardTestKit(getSystem());
211
212         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
213                 .shardIsolatedLeaderCheckIntervalInMillis(50);
214
215         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
216         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
217
218         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
219                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
220         TestEntityOwnershipShard peerShard = peer.underlyingActor();
221         peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
222
223         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
224                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME));
225
226         ShardTestKit.waitUntilLeader(leader);
227
228         // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
229         peerShard.startDroppingMessagesOfType(AppendEntries.class);
230         verifyRaftState(leader, state ->
231                 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
232
233         YangInstanceIdentifier entityId = ENTITY_ID1;
234         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
235
236         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
237         kit.expectMsgClass(SuccessReply.class);
238
239         // Resume AppendEntries - the candidate write should now be committed.
240         peerShard.stopDroppingMessagesOfType(AppendEntries.class);
241         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
242         verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
243
244         testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader ending");
245     }
246
247     @Test
248     public void testOnRegisterCandidateLocalWithRemoteLeader() {
249         testLog.info("testOnRegisterCandidateLocalWithRemoteLeader starting");
250
251         ShardTestKit kit = new ShardTestKit(getSystem());
252
253         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
254                 .shardBatchedModificationCount(5);
255
256         ShardIdentifier leaderId = newShardId(PEER_MEMBER_1_NAME);
257         ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
258         TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
259                 newShardBuilder(leaderId, peerMap(localId.toString()), PEER_MEMBER_1_NAME),
260                 actorFactory.createActor(MessageCollectorActor.props())), leaderId.toString());
261         final TestEntityOwnershipShard leaderShard = leader.underlyingActor();
262
263         TestActorRef<TestEntityOwnershipShard> local = actorFactory.createTestActor(TestEntityOwnershipShard.props(
264                 newShardBuilder(localId, peerMap(leaderId.toString()),LOCAL_MEMBER_NAME)), localId.toString());
265         local.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
266
267         local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
268         kit.expectMsgClass(SuccessReply.class);
269
270         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
271         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
272
273         // Test with initial commit timeout and subsequent retry.
274
275         local.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
276         leaderShard.startDroppingMessagesOfType(BatchedModifications.class);
277
278         local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
279         kit.expectMsgClass(SuccessReply.class);
280
281         expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class);
282
283         // Send a bunch of registration messages quickly and verify.
284
285         leaderShard.stopDroppingMessagesOfType(BatchedModifications.class);
286         clearMessages(leaderShard.collectorActor());
287
288         int max = 100;
289         List<YangInstanceIdentifier> entityIds = new ArrayList<>();
290         for (int i = 1; i <= max; i++) {
291             YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
292             entityIds.add(id);
293             local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, id)), kit.getRef());
294         }
295
296         for (int i = 0; i < max; i++) {
297             verifyCommittedEntityCandidate(local, ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
298         }
299
300         testLog.info("testOnRegisterCandidateLocalWithRemoteLeader ending");
301     }
302
303     @Test
304     public void testOnUnregisterCandidateLocal() {
305         testLog.info("testOnUnregisterCandidateLocal starting");
306
307         ShardTestKit kit = new ShardTestKit(getSystem());
308         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
309         ShardTestKit.waitUntilLeader(shard);
310
311         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
312
313         // Register
314
315         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
316         kit.expectMsgClass(SuccessReply.class);
317
318         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
319         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
320
321         // Unregister
322
323         shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
324         kit.expectMsgClass(SuccessReply.class);
325
326         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");
327
328         // Register again
329
330         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
331         kit.expectMsgClass(SuccessReply.class);
332
333         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
334         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
335
336         testLog.info("testOnUnregisterCandidateLocal ending");
337     }
338
339     @Test
340     public void testOwnershipChanges() {
341         testLog.info("testOwnershipChanges starting");
342
343         final ShardTestKit kit = new ShardTestKit(getSystem());
344
345         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
346
347         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
348         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
349         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
350
351         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
352                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
353                     peerId1.toString());
354         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
355
356         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
357                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
358                     peerId2.toString());
359         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
360
361         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
362                 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
363                     leaderId.toString());
364
365         ShardTestKit.waitUntilLeader(leader);
366
367         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
368
369         // Add a remote candidate
370
371         peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
372         kit.expectMsgClass(SuccessReply.class);
373
374         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
375
376         // Register local
377
378         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
379         kit.expectMsgClass(SuccessReply.class);
380
381         // Verify the remote candidate becomes owner
382
383         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
384         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
385
386         // Add another remote candidate and verify ownership doesn't change
387
388         peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
389         kit.expectMsgClass(SuccessReply.class);
390
391         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
392         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
393         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
394
395         // Remove the second remote candidate and verify ownership doesn't change
396
397         peer2.tell(new UnregisterCandidateLocal(entity), kit.getRef());
398         kit.expectMsgClass(SuccessReply.class);
399
400         verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
401         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
402         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
403
404         // Remove the first remote candidate and verify the local candidate becomes owner
405
406         peer1.tell(new UnregisterCandidateLocal(entity), kit.getRef());
407         kit.expectMsgClass(SuccessReply.class);
408
409         verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
410         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
411
412         // Add the second remote candidate back and verify ownership doesn't change
413
414         peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
415         kit.expectMsgClass(SuccessReply.class);
416
417         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
418         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
419         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
420
421         // Unregister the local candidate and verify the second remote candidate becomes owner
422
423         leader.tell(new UnregisterCandidateLocal(entity), kit.getRef());
424         kit.expectMsgClass(SuccessReply.class);
425
426         verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
427         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
428
429         testLog.info("testOwnershipChanges ending");
430     }
431
432     @Test
433     public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception {
434         testLog.info("testOwnerChangesOnPeerAvailabilityChanges starting");
435
436         final ShardTestKit kit = new ShardTestKit(getSystem());
437
438         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
439                 .shardIsolatedLeaderCheckIntervalInMillis(100000);
440
441         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
442         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
443         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
444
445         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
446                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
447                     peerId1.toString());
448         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
449
450         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
451                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
452                     peerId2.toString());
453         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
454
455         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
456                 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
457                     leaderId.toString());
458
459         verifyRaftState(leader, state ->
460                 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
461
462         // Send PeerDown and PeerUp with no entities
463
464         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
465         leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
466
467         // Add candidates for entity1 with the local leader as the owner
468
469         leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
470         kit.expectMsgClass(SuccessReply.class);
471         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
472
473         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
474         kit.expectMsgClass(SuccessReply.class);
475         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
476
477         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
478         kit.expectMsgClass(SuccessReply.class);
479         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
480         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
481
482         // Add candidates for entity2 with peerMember2 as the owner
483
484         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
485         kit.expectMsgClass(SuccessReply.class);
486         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
487
488         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
489         kit.expectMsgClass(SuccessReply.class);
490         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
491         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
492
493         // Add candidates for entity3 with peerMember2 as the owner.
494
495         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
496         kit.expectMsgClass(SuccessReply.class);
497         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
498
499         leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
500         kit.expectMsgClass(SuccessReply.class);
501         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
502
503         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
504         kit.expectMsgClass(SuccessReply.class);
505         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
506         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
507
508         // Add only candidate peerMember2 for entity4.
509
510         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID4)), kit.getRef());
511         kit.expectMsgClass(SuccessReply.class);
512         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
513         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
514
515         // Add only candidate peerMember1 for entity5.
516
517         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID5)), kit.getRef());
518         kit.expectMsgClass(SuccessReply.class);
519         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
520         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
521
522         // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new
523         // owner selected
524
525         kit.watch(peer2);
526         peer2.tell(PoisonPill.getInstance(), ActorRef.noSender());
527         kit.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
528         kit.unwatch(peer2);
529
530         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
531         // Send PeerDown again - should be noop
532         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
533         peer1.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
534
535         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
536         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
537         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
538         // no other candidates for entity4 so peerMember2 should remain owner.
539         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
540
541         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
542         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
543         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
544         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
545
546         // Reinstate peerMember2
547
548         peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
549                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
550                     peerId2.toString());
551         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
552         leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
553         // Send PeerUp again - should be noop
554         leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
555         peer1.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
556
557         // peerMember2's candidates should be removed on startup.
558         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
559         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
560         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
561         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
562
563         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
564         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
565         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
566         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
567
568         // Add back candidate peerMember2 for entities 1, 2, & 3.
569
570         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
571         kit.expectMsgClass(SuccessReply.class);
572         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
573         kit.expectMsgClass(SuccessReply.class);
574         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
575         kit.expectMsgClass(SuccessReply.class);
576         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
577         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
578         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
579         verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
580         verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
581         verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
582         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
583         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
584         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
585         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
586         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
587         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
588         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
589
590         // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected
591
592         kit.watch(peer1);
593         peer1.tell(PoisonPill.getInstance(), ActorRef.noSender());
594         kit.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
595         kit.unwatch(peer1);
596         leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
597
598         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
599
600         // Verify the reinstated peerMember2 is fully synced.
601
602         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
603         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
604         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
605         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
606
607         // Reinstate peerMember1 and verify no owner changes
608
609         peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(newShardBuilder(
610                 peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), peerId1.toString());
611         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
612         leader.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
613
614         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
615         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
616         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
617         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
618
619         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
620         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
621         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
622
623         verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
624         verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
625         verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
626
627         // Verify the reinstated peerMember1 is fully synced.
628
629         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
630         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
631         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
632         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
633
634         AtomicLong leaderLastApplied = new AtomicLong();
635         verifyRaftState(leader, rs -> {
636             assertEquals("LastApplied up-to-date", rs.getLastApplied(), rs.getLastIndex());
637             leaderLastApplied.set(rs.getLastApplied());
638         });
639
640         verifyRaftState(peer2, rs -> assertEquals("LastApplied", leaderLastApplied.get(), rs.getLastIndex()));
641
642         // Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for
643         // the entities (1 and 3) previously owned by the local leader member.
644
645         peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
646         peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
647         peer2.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
648
649         kit.watch(leader);
650         leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
651         kit.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
652         kit.unwatch(leader);
653         peer2.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
654         peer2.tell(TimeoutNow.INSTANCE, peer2);
655
656         verifyRaftState(peer2, state ->
657                 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
658
659         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
660         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
661         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
662         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
663
664         testLog.info("testOwnerChangesOnPeerAvailabilityChanges ending");
665     }
666
667     @Test
668     public void testLeaderIsolation() throws Exception {
669         testLog.info("testLeaderIsolation starting");
670
671         final ShardTestKit kit = new ShardTestKit(getSystem());
672
673         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
674         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
675         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
676
677         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
678                 .shardIsolatedLeaderCheckIntervalInMillis(100000);
679
680         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
681                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
682                     peerId1.toString());
683         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
684
685         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
686                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
687                     peerId2.toString());
688         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
689
690         dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
691                 .shardIsolatedLeaderCheckIntervalInMillis(500);
692
693         TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
694                 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME)),
695                     leaderId.toString());
696
697         ShardTestKit.waitUntilLeader(leader);
698
699         // Add entity1 candidates for all members with the leader as the owner
700
701         DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
702         leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
703         kit.expectMsgClass(SuccessReply.class);
704         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
705
706         peer1.tell(new RegisterCandidateLocal(entity1), kit.getRef());
707         kit.expectMsgClass(SuccessReply.class);
708         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
709
710         peer2.tell(new RegisterCandidateLocal(entity1), kit.getRef());
711         kit.expectMsgClass(SuccessReply.class);
712         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_2_NAME);
713
714         verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
715         verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
716         verifyOwner(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
717
718         // Add entity2 candidates for all members with peer1 as the owner
719
720         DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
721         peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
722         kit.expectMsgClass(SuccessReply.class);
723         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
724
725         peer2.tell(new RegisterCandidateLocal(entity2), kit.getRef());
726         kit.expectMsgClass(SuccessReply.class);
727         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_2_NAME);
728
729         leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
730         kit.expectMsgClass(SuccessReply.class);
731         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
732
733         verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
734         verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
735         verifyOwner(peer2, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
736
737         // Add entity3 candidates for all members with peer2 as the owner
738
739         DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
740         peer2.tell(new RegisterCandidateLocal(entity3), kit.getRef());
741         kit.expectMsgClass(SuccessReply.class);
742         verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
743
744         leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
745         kit.expectMsgClass(SuccessReply.class);
746         verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), LOCAL_MEMBER_NAME);
747
748         peer1.tell(new RegisterCandidateLocal(entity3), kit.getRef());
749         kit.expectMsgClass(SuccessReply.class);
750         verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_1_NAME);
751
752         verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
753         verifyOwner(peer1, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
754         verifyOwner(peer2, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
755
756         // Add listeners on all members
757
758         DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class);
759         leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
760         kit.expectMsgClass(SuccessReply.class);
761         verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
762                 ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true)),
763                 ownershipChange(entity3, false, false, true)));
764         reset(leaderListener);
765
766         DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class);
767         peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
768         kit.expectMsgClass(SuccessReply.class);
769         verify(peer1Listener, timeout(5000).times(3)).ownershipChanged(or(or(
770                 ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, true, true)),
771                 ownershipChange(entity3, false, false, true)));
772         reset(peer1Listener);
773
774         DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class);
775         peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
776         kit.expectMsgClass(SuccessReply.class);
777         verify(peer2Listener, timeout(5000).times(3)).ownershipChanged(or(or(
778                 ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, false, true)),
779                 ownershipChange(entity3, false, true, true)));
780         reset(peer2Listener);
781
782         // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
783
784         leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
785         leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
786
787         peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
788             ae -> ae.getLeaderId().equals(leaderId.toString()));
789         peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
790
791         // Make peer1 start an election and become leader by enabling the ElectionTimeout message.
792
793         peer1.underlyingActor().stopDroppingMessagesOfType(ElectionTimeout.class);
794
795         // Send PeerDown to the isolated leader so it tries to re-assign ownership for the entities owned by the
796         // isolated peers.
797
798         leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
799         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
800
801         verifyRaftState(leader, state ->
802                 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
803
804         // Expect inJeopardy notification on the isolated leader.
805
806         verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
807                 ownershipChange(entity1, true, true, true, true), ownershipChange(entity2, false, false, true, true)),
808                 ownershipChange(entity3, false, false, true, true)));
809         reset(leaderListener);
810
811         verifyRaftState(peer1, state ->
812                 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
813
814         // Send PeerDown to the new leader peer1 so it re-assigns ownership for the entities owned by the
815         // isolated leader.
816
817         peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
818
819         verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
820
821         verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
822         reset(peer1Listener);
823
824         verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
825         reset(peer2Listener);
826
827         // Remove the isolation.
828
829         leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
830         leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
831         peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
832         peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
833
834         // Previous leader should switch to Follower and send inJeopardy cleared notifications for all entities.
835
836         verifyRaftState(leader, state ->
837                 assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState()));
838
839         verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
840                 ownershipChange(entity1, true, true, true), ownershipChange(entity2, false, false, true)),
841                 ownershipChange(entity3, false, false, true)));
842
843         verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
844         verify(leaderListener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
845
846         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
847         verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
848         verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
849
850         verifyNoMoreInteractions(leaderListener);
851         verifyNoMoreInteractions(peer1Listener);
852         verifyNoMoreInteractions(peer2Listener);
853
854         testLog.info("testLeaderIsolation ending");
855     }
856
857     @Test
858     public void testLeaderIsolationWithPendingCandidateAdded() throws Exception {
859         testLog.info("testLeaderIsolationWithPendingCandidateAdded starting");
860
861         final ShardTestKit kit = new ShardTestKit(getSystem());
862
863         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
864         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
865         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
866
867         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
868                 .shardIsolatedLeaderCheckIntervalInMillis(100000);
869
870         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
871                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME),
872                 actorFactory.createActor(MessageCollectorActor.props())), peerId1.toString());
873         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
874
875         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
876                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME),
877                 actorFactory.createTestActor(MessageCollectorActor.props())), peerId2.toString());
878         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
879
880         dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
881                 .shardIsolatedLeaderCheckIntervalInMillis(500);
882
883         TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
884                 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
885                 actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
886
887         ShardTestKit.waitUntilLeader(leader);
888
889         // Add listeners on all members
890
891         DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class,
892                 "DOMEntityOwnershipListener-" + LOCAL_MEMBER_NAME);
893         leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
894         kit.expectMsgClass(SuccessReply.class);
895
896         DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class,
897                 "DOMEntityOwnershipListener-" + PEER_MEMBER_1_NAME);
898         peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
899         kit.expectMsgClass(SuccessReply.class);
900
901         DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class,
902                 "DOMEntityOwnershipListener-" + PEER_MEMBER_2_NAME);
903         peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
904         kit.expectMsgClass(SuccessReply.class);
905
906         // Drop the CandidateAdded message to the leader for now.
907
908         leader.underlyingActor().startDroppingMessagesOfType(CandidateAdded.class);
909
910         // Add an entity candidates for the leader. Since we've blocked the CandidateAdded message, it won't be
911         // assigned the owner.
912
913         DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
914         leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
915         kit.expectMsgClass(SuccessReply.class);
916         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
917         verifyCommittedEntityCandidate(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
918         verifyCommittedEntityCandidate(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
919
920         DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
921         leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
922         kit.expectMsgClass(SuccessReply.class);
923         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
924         verifyCommittedEntityCandidate(peer1, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
925         verifyCommittedEntityCandidate(peer2, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
926
927         // Capture the CandidateAdded messages.
928
929         final List<CandidateAdded> candidateAdded = expectMatching(leader.underlyingActor().collectorActor(),
930                 CandidateAdded.class, 2);
931
932         // Drop AppendEntries to the followers containing a log entry, which will be for the owner writes after we
933         // forward the CandidateAdded messages to the leader. This will leave the pending owner write tx's uncommitted.
934
935         peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
936         peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
937
938         // Now forward the CandidateAdded messages to the leader and wait for it to send out the AppendEntries.
939
940         leader.underlyingActor().stopDroppingMessagesOfType(CandidateAdded.class);
941         leader.tell(candidateAdded.get(0), leader);
942         leader.tell(candidateAdded.get(1), leader);
943
944         expectMatching(peer1.underlyingActor().collectorActor(), AppendEntries.class, 2,
945             ae -> ae.getEntries().size() > 0);
946
947         // Verify no owner assigned.
948
949         verifyNoOwnerSet(leader, entity1.getType(), entity1.getIdentifier());
950         verifyNoOwnerSet(leader, entity2.getType(), entity2.getIdentifier());
951
952         // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
953
954         leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
955         leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
956
957         peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
958             ae -> ae.getLeaderId().equals(leaderId.toString()));
959         peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
960
961         // Send PeerDown to the isolated leader - should be no-op since there's no owned entities.
962
963         leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
964         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
965
966         // Verify the leader transitions to IsolatedLeader.
967
968         verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.IsolatedLeader.toString(),
969                 state.getRaftState()));
970
971         // Send PeerDown to the new leader peer1.
972
973         peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
974
975         // Make peer1 start an election and become leader by sending the TimeoutNow message.
976
977         peer1.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
978
979         // Verify the peer1 transitions to Leader.
980
981         verifyRaftState(peer1, state -> assertEquals("getRaftState", RaftState.Leader.toString(),
982                 state.getRaftState()));
983
984         verifyNoOwnerSet(peer1, entity1.getType(), entity1.getIdentifier());
985         verifyNoOwnerSet(peer2, entity1.getType(), entity2.getIdentifier());
986
987         verifyNoMoreInteractions(peer1Listener);
988         verifyNoMoreInteractions(peer2Listener);
989
990         // Add candidate peer1 candidate for entity2.
991
992         peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
993
994         verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
995         verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
996         verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, false, true));
997
998         reset(leaderListener, peer1Listener, peer2Listener);
999
1000         // Remove the isolation.
1001
1002         leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
1003         leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1004         peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1005         peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1006
1007         // Previous leader should switch to Follower.
1008
1009         verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(),
1010                 state.getRaftState()));
1011
1012         // Send PeerUp to peer1 and peer2.
1013
1014         peer1.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
1015         peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
1016
1017         // The previous leader should become the owner of entity1.
1018
1019         verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
1020
1021         // The previous leader's DOMEntityOwnershipListener should get 4 total notifications:
1022         //     - inJeopardy cleared for entity1 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
1023         //     - inJeopardy cleared for entity2 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
1024         //     - local owner granted for entity1 (wasOwner=false, isOwner=true, hasOwner=true, inJeopardy=false)
1025         //     - remote owner for entity2 (wasOwner=false, isOwner=false, hasOwner=true, inJeopardy=false)
1026         verify(leaderListener, timeout(5000).times(4)).ownershipChanged(or(
1027                 or(ownershipChange(entity1, false, false, false), ownershipChange(entity2, false, false, false)),
1028                 or(ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true))));
1029
1030         verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
1031         verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
1032
1033         // Verify entity2's owner doesn't change.
1034
1035         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1036         verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
1037
1038         verifyNoMoreInteractions(leaderListener);
1039         verifyNoMoreInteractions(peer1Listener);
1040         verifyNoMoreInteractions(peer2Listener);
1041
1042         testLog.info("testLeaderIsolationWithPendingCandidateAdded ending");
1043     }
1044
1045     @Test
1046     public void testListenerRegistration() {
1047         testLog.info("testListenerRegistration starting");
1048
1049         ShardTestKit kit = new ShardTestKit(getSystem());
1050
1051         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1052         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
1053
1054         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1055                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
1056         peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1057
1058         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1059                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
1060
1061         ShardTestKit.waitUntilLeader(leader);
1062
1063         String otherEntityType = "otherEntityType";
1064         final DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1065         final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
1066         final DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
1067         final DOMEntity entity4 = new DOMEntity(otherEntityType, ENTITY_ID3);
1068         DOMEntityOwnershipListener listener = mock(DOMEntityOwnershipListener.class);
1069
1070         // Register listener
1071
1072         leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1073         kit.expectMsgClass(SuccessReply.class);
1074
1075         // Register a couple candidates for the desired entity type and verify listener is notified.
1076
1077         leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
1078         kit.expectMsgClass(SuccessReply.class);
1079
1080         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
1081
1082         leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
1083         kit.expectMsgClass(SuccessReply.class);
1084
1085         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
1086         reset(listener);
1087
1088         // Register another candidate for another entity type and verify listener is not notified.
1089
1090         leader.tell(new RegisterCandidateLocal(entity4), kit.getRef());
1091         kit.expectMsgClass(SuccessReply.class);
1092
1093         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1094         verify(listener, never()).ownershipChanged(ownershipChange(entity4));
1095
1096         // Register remote candidate for entity1
1097
1098         peer.tell(new RegisterCandidateLocal(entity1), kit.getRef());
1099         kit.expectMsgClass(SuccessReply.class);
1100         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entity1.getIdentifier(), PEER_MEMBER_1_NAME);
1101
1102         // Unregister the local candidate for entity1 and verify listener is notified
1103
1104         leader.tell(new UnregisterCandidateLocal(entity1), kit.getRef());
1105         kit.expectMsgClass(SuccessReply.class);
1106
1107         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
1108         reset(listener);
1109
1110         // Unregister the listener, add a candidate for entity3 and verify listener isn't notified
1111
1112         leader.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1113         kit.expectMsgClass(SuccessReply.class);
1114
1115         leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
1116         kit.expectMsgClass(SuccessReply.class);
1117
1118         verifyOwner(leader, ENTITY_TYPE, entity3.getIdentifier(), LOCAL_MEMBER_NAME);
1119         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1120         verify(listener, never()).ownershipChanged(any(DOMEntityOwnershipChange.class));
1121
1122         // Re-register the listener and verify it gets notified of currently owned entities
1123
1124         reset(listener);
1125
1126         leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1127         kit.expectMsgClass(SuccessReply.class);
1128
1129         verify(listener, timeout(5000).times(2)).ownershipChanged(or(ownershipChange(entity2, false, true, true),
1130                 ownershipChange(entity3, false, true, true)));
1131         Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
1132         verify(listener, never()).ownershipChanged(ownershipChange(entity4));
1133         verify(listener, times(1)).ownershipChanged(ownershipChange(entity1));
1134
1135         testLog.info("testListenerRegistration ending");
1136     }
1137
1138     @Test
1139     public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() {
1140         testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived starting");
1141
1142         ShardTestKit kit = new ShardTestKit(getSystem());
1143         EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder()
1144                 .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
1145
1146         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1147         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
1148
1149         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1150                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
1151         peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1152
1153         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1154                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME, builder.build()),
1155                 leaderId.toString());
1156
1157         ShardTestKit.waitUntilLeader(leader);
1158
1159         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1160
1161         // Add a remote candidate
1162
1163         peer.tell(new RegisterCandidateLocal(entity), kit.getRef());
1164         kit.expectMsgClass(SuccessReply.class);
1165
1166         // Register local
1167
1168         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1169         kit.expectMsgClass(SuccessReply.class);
1170
1171         // Verify the local candidate becomes owner
1172
1173         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1174         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1175         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1176
1177         testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived ending");
1178     }
1179
1180     @Test
1181     public void testDelayedEntityOwnerSelection() {
1182         testLog.info("testDelayedEntityOwnerSelection starting");
1183
1184         final ShardTestKit kit = new ShardTestKit(getSystem());
1185         EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder()
1186                 .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
1187
1188         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
1189
1190         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1191         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
1192         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
1193
1194         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1195                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
1196                     peerId1.toString());
1197         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1198
1199         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1200                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
1201                     peerId2.toString());
1202         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1203
1204         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1205                 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME,
1206                         builder.build()), leaderId.toString());
1207
1208         ShardTestKit.waitUntilLeader(leader);
1209
1210         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1211
1212         // Add a remote candidate
1213
1214         peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
1215         kit.expectMsgClass(SuccessReply.class);
1216
1217         // Register local
1218
1219         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1220         kit.expectMsgClass(SuccessReply.class);
1221
1222         // Verify the local candidate becomes owner
1223
1224         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1225         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1226         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1227
1228         testLog.info("testDelayedEntityOwnerSelection ending");
1229     }
1230
1231     private Props newLocalShardProps() {
1232         return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.<String,String>emptyMap(), LOCAL_MEMBER_NAME);
1233     }
1234
1235     private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers,
1236             final String memberName) {
1237         return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build());
1238     }
1239
1240     private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers, final String memberName,
1241                                 final EntityOwnerSelectionStrategyConfig config) {
1242         return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props()
1243                     .withDispatcher(Dispatchers.DefaultDispatcherId());
1244     }
1245
1246     private EntityOwnershipShard.Builder newShardBuilder(final ShardIdentifier shardId, final Map<String, String> peers,
1247             final String memberName) {
1248         return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).datastoreContext(
1249                 dataStoreContextBuilder.build()).schemaContextProvider(() -> SCHEMA_CONTEXT).localMemberName(
1250                         MemberName.forName(memberName)).ownerSelectionStrategyConfig(
1251                                 EntityOwnerSelectionStrategyConfig.newBuilder().build());
1252     }
1253
1254     private Map<String, String> peerMap(final String... peerIds) {
1255         ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder();
1256         for (String peerId: peerIds) {
1257             builder.put(peerId, actorFactory.createTestActorPath(peerId)).build();
1258         }
1259
1260         return builder.build();
1261     }
1262
1263     private static class TestEntityOwnershipShard extends EntityOwnershipShard {
1264         private final ActorRef collectorActor;
1265         private final Map<Class<?>, Predicate<?>> dropMessagesOfType = new ConcurrentHashMap<>();
1266
1267         TestEntityOwnershipShard(final Builder builder, final ActorRef collectorActor) {
1268             super(builder);
1269             this.collectorActor = collectorActor;
1270         }
1271
1272         @SuppressWarnings({ "unchecked", "rawtypes" })
1273         @Override
1274         public void handleCommand(final Object message) {
1275             Predicate drop = dropMessagesOfType.get(message.getClass());
1276             if (drop == null || !drop.test(message)) {
1277                 super.handleCommand(message);
1278             }
1279
1280             if (collectorActor != null) {
1281                 collectorActor.tell(message, ActorRef.noSender());
1282             }
1283         }
1284
1285         void startDroppingMessagesOfType(final Class<?> msgClass) {
1286             dropMessagesOfType.put(msgClass, msg -> true);
1287         }
1288
1289         <T> void startDroppingMessagesOfType(final Class<T> msgClass, final Predicate<T> filter) {
1290             dropMessagesOfType.put(msgClass, filter);
1291         }
1292
1293         void stopDroppingMessagesOfType(final Class<?> msgClass) {
1294             dropMessagesOfType.remove(msgClass);
1295         }
1296
1297         ActorRef collectorActor() {
1298             return collectorActor;
1299         }
1300
1301         static Props props(final Builder builder) {
1302             return props(builder, null);
1303         }
1304
1305         static Props props(final Builder builder, final ActorRef collectorActor) {
1306             return Props.create(TestEntityOwnershipShard.class, builder, collectorActor)
1307                     .withDispatcher(Dispatchers.DefaultDispatcherId());
1308         }
1309     }
1310 }