Fix intermittent PreLeaderScenarioTest failure
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.AdditionalMatchers.or;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.never;
15 import static org.mockito.Mockito.reset;
16 import static org.mockito.Mockito.timeout;
17 import static org.mockito.Mockito.times;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.verifyNoMoreInteractions;
20 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
21 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
22 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
23
24 import akka.actor.ActorRef;
25 import akka.actor.PoisonPill;
26 import akka.actor.Props;
27 import akka.actor.Terminated;
28 import akka.dispatch.Dispatchers;
29 import akka.testkit.JavaTestKit;
30 import akka.testkit.TestActorRef;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.util.concurrent.Uninterruptibles;
33 import java.util.ArrayList;
34 import java.util.Collections;
35 import java.util.List;
36 import java.util.Map;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicLong;
40 import java.util.function.Predicate;
41 import org.junit.After;
42 import org.junit.Test;
43 import org.opendaylight.controller.cluster.access.concepts.MemberName;
44 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
45 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
46 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
47 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
48 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
49 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
50 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
51 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
52 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
53 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.LastCandidateSelectionStrategy;
54 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
55 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
57 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
58 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
59 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
60 import org.opendaylight.controller.cluster.raft.RaftState;
61 import org.opendaylight.controller.cluster.raft.TestActorFactory;
62 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
63 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
64 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
65 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
66 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
67 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
68 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
69 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
70 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
71 import org.opendaylight.yangtools.yang.common.QName;
72 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
73 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
74
75 /**
76  * Unit tests for EntityOwnershipShard.
77  *
78  * @author Thomas Pantelis
79  */
80 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
81     private static final String ENTITY_TYPE = "test type";
82     private static final YangInstanceIdentifier ENTITY_ID1 =
83             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
84     private static final YangInstanceIdentifier ENTITY_ID2 =
85             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
86     private static final YangInstanceIdentifier ENTITY_ID3 =
87             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3"));
88     private static final YangInstanceIdentifier ENTITY_ID4 =
89             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4"));
90     private static final YangInstanceIdentifier ENTITY_ID5 =
91             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5"));
92     private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
93     private static final String LOCAL_MEMBER_NAME = "local-member-1";
94     private static final String PEER_MEMBER_1_NAME = "peer-member-1";
95     private static final String PEER_MEMBER_2_NAME = "peer-member-2";
96
97     private Builder dataStoreContextBuilder = DatastoreContext.newBuilder().persistent(false);
98     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
99
100     @After
101     public void tearDown() {
102         actorFactory.close();
103     }
104
105     @Test
106     public void testOnRegisterCandidateLocal() throws Exception {
107         testLog.info("testOnRegisterCandidateLocal starting");
108
109         ShardTestKit kit = new ShardTestKit(getSystem());
110
111         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
112
113         ShardTestKit.waitUntilLeader(shard);
114
115         YangInstanceIdentifier entityId = ENTITY_ID1;
116         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
117
118         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
119         kit.expectMsgClass(SuccessReply.class);
120
121         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
122         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
123
124         testLog.info("testOnRegisterCandidateLocal ending");
125     }
126
127     @Test
128     public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
129         testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader starting");
130
131         final ShardTestKit kit = new ShardTestKit(getSystem());
132
133         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
134
135         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
136         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
137
138         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
139                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
140         TestEntityOwnershipShard peerShard = peer.underlyingActor();
141         peerShard.startDroppingMessagesOfType(RequestVote.class);
142         peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
143
144         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(
145                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
146
147         YangInstanceIdentifier entityId = ENTITY_ID1;
148         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
149
150         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
151         kit.expectMsgClass(SuccessReply.class);
152
153         // Now allow RequestVotes to the peer so the shard becomes the leader. This should retry the commit.
154         peerShard.stopDroppingMessagesOfType(RequestVote.class);
155
156         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
157         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
158
159         testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader ending");
160     }
161
162     @Test
163     public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
164         testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus starting");
165
166         final ShardTestKit kit = new ShardTestKit(getSystem());
167
168         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
169                 .shardTransactionCommitTimeoutInSeconds(1);
170
171         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
172         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
173
174         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
175                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
176         TestEntityOwnershipShard peerShard = peer.underlyingActor();
177         peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
178
179         // Drop AppendEntries so consensus isn't reached.
180         peerShard.startDroppingMessagesOfType(AppendEntries.class);
181
182         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
183                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
184
185         ShardTestKit.waitUntilLeader(leader);
186
187         YangInstanceIdentifier entityId = ENTITY_ID1;
188         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
189
190         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
191         kit.expectMsgClass(SuccessReply.class);
192
193         // Wait enough time for the commit to timeout.
194         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
195
196         // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
197         // write being applied to the state.
198         peerShard.stopDroppingMessagesOfType(AppendEntries.class);
199
200         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
201         verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
202
203         testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus ending");
204     }
205
206     @Test
207     public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
208         testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader starting");
209
210         final ShardTestKit kit = new ShardTestKit(getSystem());
211
212         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
213                 .shardIsolatedLeaderCheckIntervalInMillis(50);
214
215         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
216         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
217
218         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
219                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
220         TestEntityOwnershipShard peerShard = peer.underlyingActor();
221         peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
222
223         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
224                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME));
225
226         ShardTestKit.waitUntilLeader(leader);
227
228         // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
229         peerShard.startDroppingMessagesOfType(AppendEntries.class);
230         verifyRaftState(leader, state ->
231                 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
232
233         YangInstanceIdentifier entityId = ENTITY_ID1;
234         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
235
236         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
237         kit.expectMsgClass(SuccessReply.class);
238
239         // Resume AppendEntries - the candidate write should now be committed.
240         peerShard.stopDroppingMessagesOfType(AppendEntries.class);
241         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
242         verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
243
244         testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader ending");
245     }
246
247     @Test
248     public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
249         testLog.info("testOnRegisterCandidateLocalWithRemoteLeader starting");
250
251         ShardTestKit kit = new ShardTestKit(getSystem());
252
253         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
254                 .shardBatchedModificationCount(5);
255
256         ShardIdentifier leaderId = newShardId(PEER_MEMBER_1_NAME);
257         ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
258         TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
259                 newShardBuilder(leaderId, peerMap(localId.toString()), PEER_MEMBER_1_NAME),
260                 actorFactory.createActor(MessageCollectorActor.props())), leaderId.toString());
261         final TestEntityOwnershipShard leaderShard = leader.underlyingActor();
262
263         TestActorRef<TestEntityOwnershipShard> local = actorFactory.createTestActor(TestEntityOwnershipShard.props(
264                 newShardBuilder(localId, peerMap(leaderId.toString()),LOCAL_MEMBER_NAME)), localId.toString());
265         local.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
266
267         local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
268         kit.expectMsgClass(SuccessReply.class);
269
270         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
271         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
272
273         // Test with initial commit timeout and subsequent retry.
274
275         local.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
276         leaderShard.startDroppingMessagesOfType(BatchedModifications.class);
277
278         local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
279         kit.expectMsgClass(SuccessReply.class);
280
281         expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class);
282
283         // Send a bunch of registration messages quickly and verify.
284
285         leaderShard.stopDroppingMessagesOfType(BatchedModifications.class);
286         clearMessages(leaderShard.collectorActor());
287
288         int max = 100;
289         List<YangInstanceIdentifier> entityIds = new ArrayList<>();
290         for (int i = 1; i <= max; i++) {
291             YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
292             entityIds.add(id);
293             local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, id)), kit.getRef());
294         }
295
296         for (int i = 0; i < max; i++) {
297             verifyCommittedEntityCandidate(local, ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
298         }
299
300         testLog.info("testOnRegisterCandidateLocalWithRemoteLeader ending");
301     }
302
303     @Test
304     public void testOnUnregisterCandidateLocal() throws Exception {
305         testLog.info("testOnUnregisterCandidateLocal starting");
306
307         ShardTestKit kit = new ShardTestKit(getSystem());
308         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
309         ShardTestKit.waitUntilLeader(shard);
310
311         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
312
313         // Register
314
315         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
316         kit.expectMsgClass(SuccessReply.class);
317
318         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
319         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
320
321         // Unregister
322
323         shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
324         kit.expectMsgClass(SuccessReply.class);
325
326         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");
327
328         // Register again
329
330         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
331         kit.expectMsgClass(SuccessReply.class);
332
333         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
334         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
335
336         testLog.info("testOnUnregisterCandidateLocal ending");
337     }
338
339     @Test
340     public void testOwnershipChanges() throws Exception {
341         testLog.info("testOwnershipChanges starting");
342
343         final ShardTestKit kit = new ShardTestKit(getSystem());
344
345         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
346
347         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
348         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
349         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
350
351         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
352                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
353                     peerId1.toString());
354         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
355
356         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
357                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
358                     peerId2.toString());
359         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
360
361         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
362                 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
363                     leaderId.toString());
364
365         ShardTestKit.waitUntilLeader(leader);
366
367         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
368
369         // Add a remote candidate
370
371         peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
372         kit.expectMsgClass(SuccessReply.class);
373
374         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
375
376         // Register local
377
378         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
379         kit.expectMsgClass(SuccessReply.class);
380
381         // Verify the remote candidate becomes owner
382
383         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
384         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
385
386         // Add another remote candidate and verify ownership doesn't change
387
388         peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
389         kit.expectMsgClass(SuccessReply.class);
390
391         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
392         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
393         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
394
395         // Remove the second remote candidate and verify ownership doesn't change
396
397         peer2.tell(new UnregisterCandidateLocal(entity), kit.getRef());
398         kit.expectMsgClass(SuccessReply.class);
399
400         verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
401         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
402         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
403
404         // Remove the first remote candidate and verify the local candidate becomes owner
405
406         peer1.tell(new UnregisterCandidateLocal(entity), kit.getRef());
407         kit.expectMsgClass(SuccessReply.class);
408
409         verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
410         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
411
412         // Add the second remote candidate back and verify ownership doesn't change
413
414         peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
415         kit.expectMsgClass(SuccessReply.class);
416
417         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
418         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
419         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
420
421         // Unregister the local candidate and verify the second remote candidate becomes owner
422
423         leader.tell(new UnregisterCandidateLocal(entity), kit.getRef());
424         kit.expectMsgClass(SuccessReply.class);
425
426         verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
427         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
428
429         testLog.info("testOwnershipChanges ending");
430     }
431
432     @Test
433     public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception {
434         testLog.info("testOwnerChangesOnPeerAvailabilityChanges starting");
435
436         final ShardTestKit kit = new ShardTestKit(getSystem());
437
438         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
439                 .shardIsolatedLeaderCheckIntervalInMillis(100000);
440
441         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
442         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
443         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
444
445         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
446                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
447                     peerId1.toString());
448         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
449
450         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
451                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
452                     peerId2.toString());
453         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
454
455         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
456                 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
457                     leaderId.toString());
458
459         verifyRaftState(leader, state ->
460                 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
461
462         // Send PeerDown and PeerUp with no entities
463
464         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
465         leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
466
467         // Add candidates for entity1 with the local leader as the owner
468
469         leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
470         kit.expectMsgClass(SuccessReply.class);
471         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
472
473         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
474         kit.expectMsgClass(SuccessReply.class);
475         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
476
477         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
478         kit.expectMsgClass(SuccessReply.class);
479         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
480         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
481
482         // Add candidates for entity2 with peerMember2 as the owner
483
484         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
485         kit.expectMsgClass(SuccessReply.class);
486         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
487
488         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
489         kit.expectMsgClass(SuccessReply.class);
490         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
491         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
492
493         // Add candidates for entity3 with peerMember2 as the owner.
494
495         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
496         kit.expectMsgClass(SuccessReply.class);
497         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
498
499         leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
500         kit.expectMsgClass(SuccessReply.class);
501         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
502
503         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
504         kit.expectMsgClass(SuccessReply.class);
505         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
506         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
507
508         // Add only candidate peerMember2 for entity4.
509
510         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID4)), kit.getRef());
511         kit.expectMsgClass(SuccessReply.class);
512         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
513         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
514
515         // Add only candidate peerMember1 for entity5.
516
517         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID5)), kit.getRef());
518         kit.expectMsgClass(SuccessReply.class);
519         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
520         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
521
522         // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new
523         // owner selected
524
525         kit.watch(peer2);
526         peer2.tell(PoisonPill.getInstance(), ActorRef.noSender());
527         kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
528         kit.unwatch(peer2);
529
530         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
531         // Send PeerDown again - should be noop
532         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
533         peer1.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
534
535         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
536         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
537         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
538         // no other candidates for entity4 so peerMember2 should remain owner.
539         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
540
541         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
542         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
543         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
544         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
545
546         // Reinstate peerMember2
547
548         peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
549                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
550                     peerId2.toString());
551         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
552         leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
553         // Send PeerUp again - should be noop
554         leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
555         peer1.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
556
557         // peerMember2's candidates should be removed on startup.
558         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
559         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
560         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
561         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
562
563         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
564         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
565         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
566         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
567
568         // Add back candidate peerMember2 for entities 1, 2, & 3.
569
570         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
571         kit.expectMsgClass(SuccessReply.class);
572         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
573         kit.expectMsgClass(SuccessReply.class);
574         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
575         kit.expectMsgClass(SuccessReply.class);
576         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
577         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
578         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
579         verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
580         verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
581         verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
582         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
583         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
584         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
585         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
586         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
587         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
588         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
589
590         // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected
591
592         kit.watch(peer1);
593         peer1.tell(PoisonPill.getInstance(), ActorRef.noSender());
594         kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
595         kit.unwatch(peer1);
596         leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
597
598         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
599
600         // Verify the reinstated peerMember2 is fully synced.
601
602         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
603         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
604         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
605         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
606
607         // Reinstate peerMember1 and verify no owner changes
608
609         peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(newShardBuilder(
610                 peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), peerId1.toString());
611         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
612         leader.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
613
614         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
615         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
616         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
617         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
618
619         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
620         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
621         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
622
623         verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
624         verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
625         verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
626
627         // Verify the reinstated peerMember1 is fully synced.
628
629         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
630         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
631         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
632         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
633
634         AtomicLong leaderLastApplied = new AtomicLong();
635         verifyRaftState(leader, rs -> {
636             assertEquals("LastApplied up-to-date", rs.getLastApplied(), rs.getLastIndex());
637             leaderLastApplied.set(rs.getLastApplied());
638         });
639
640         verifyRaftState(peer2, rs -> {
641             assertEquals("LastApplied", leaderLastApplied.get(), rs.getLastIndex());
642         });
643
644         // Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for
645         // the entities (1 and 3) previously owned by the local leader member.
646
647         peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
648         peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
649         peer2.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
650
651         kit.watch(leader);
652         leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
653         kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
654         kit.unwatch(leader);
655         peer2.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
656         peer2.tell(TimeoutNow.INSTANCE, peer2);
657
658         verifyRaftState(peer2, state ->
659                 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
660
661         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
662         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
663         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
664         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
665
666         testLog.info("testOwnerChangesOnPeerAvailabilityChanges ending");
667     }
668
669     @Test
670     public void testLeaderIsolation() throws Exception {
671         testLog.info("testLeaderIsolation starting");
672
673         final ShardTestKit kit = new ShardTestKit(getSystem());
674
675         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
676         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
677         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
678
679         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
680                 .shardIsolatedLeaderCheckIntervalInMillis(100000);
681
682         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
683                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
684                     peerId1.toString());
685         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
686
687         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
688                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
689                     peerId2.toString());
690         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
691
692         dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
693                 .shardIsolatedLeaderCheckIntervalInMillis(500);
694
695         TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
696                 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME)),
697                     leaderId.toString());
698
699         ShardTestKit.waitUntilLeader(leader);
700
701         // Add entity1 candidates for all members with the leader as the owner
702
703         DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
704         leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
705         kit.expectMsgClass(SuccessReply.class);
706         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
707
708         peer1.tell(new RegisterCandidateLocal(entity1), kit.getRef());
709         kit.expectMsgClass(SuccessReply.class);
710         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
711
712         peer2.tell(new RegisterCandidateLocal(entity1), kit.getRef());
713         kit.expectMsgClass(SuccessReply.class);
714         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_2_NAME);
715
716         verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
717         verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
718         verifyOwner(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
719
720         // Add entity2 candidates for all members with peer1 as the owner
721
722         DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
723         peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
724         kit.expectMsgClass(SuccessReply.class);
725         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
726
727         peer2.tell(new RegisterCandidateLocal(entity2), kit.getRef());
728         kit.expectMsgClass(SuccessReply.class);
729         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_2_NAME);
730
731         leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
732         kit.expectMsgClass(SuccessReply.class);
733         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
734
735         verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
736         verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
737         verifyOwner(peer2, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
738
739         // Add entity3 candidates for all members with peer2 as the owner
740
741         DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
742         peer2.tell(new RegisterCandidateLocal(entity3), kit.getRef());
743         kit.expectMsgClass(SuccessReply.class);
744         verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
745
746         leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
747         kit.expectMsgClass(SuccessReply.class);
748         verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), LOCAL_MEMBER_NAME);
749
750         peer1.tell(new RegisterCandidateLocal(entity3), kit.getRef());
751         kit.expectMsgClass(SuccessReply.class);
752         verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_1_NAME);
753
754         verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
755         verifyOwner(peer1, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
756         verifyOwner(peer2, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
757
758         // Add listeners on all members
759
760         DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class);
761         leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
762         kit.expectMsgClass(SuccessReply.class);
763         verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
764                 ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true)),
765                 ownershipChange(entity3, false, false, true)));
766         reset(leaderListener);
767
768         DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class);
769         peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
770         kit.expectMsgClass(SuccessReply.class);
771         verify(peer1Listener, timeout(5000).times(3)).ownershipChanged(or(or(
772                 ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, true, true)),
773                 ownershipChange(entity3, false, false, true)));
774         reset(peer1Listener);
775
776         DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class);
777         peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
778         kit.expectMsgClass(SuccessReply.class);
779         verify(peer2Listener, timeout(5000).times(3)).ownershipChanged(or(or(
780                 ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, false, true)),
781                 ownershipChange(entity3, false, true, true)));
782         reset(peer2Listener);
783
784         // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
785
786         leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
787         leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
788
789         peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
790             ae -> ae.getLeaderId().equals(leaderId.toString()));
791         peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
792
793         // Make peer1 start an election and become leader by enabling the ElectionTimeout message.
794
795         peer1.underlyingActor().stopDroppingMessagesOfType(ElectionTimeout.class);
796
797         // Send PeerDown to the isolated leader so it tries to re-assign ownership for the entities owned by the
798         // isolated peers.
799
800         leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
801         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
802
803         verifyRaftState(leader, state ->
804                 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
805
806         // Expect inJeopardy notification on the isolated leader.
807
808         verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
809                 ownershipChange(entity1, true, true, true, true), ownershipChange(entity2, false, false, true, true)),
810                 ownershipChange(entity3, false, false, true, true)));
811         reset(leaderListener);
812
813         verifyRaftState(peer1, state ->
814                 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
815
816         // Send PeerDown to the new leader peer1 so it re-assigns ownership for the entities owned by the
817         // isolated leader.
818
819         peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
820
821         verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
822
823         verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
824         reset(peer1Listener);
825
826         verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
827         reset(peer2Listener);
828
829         // Remove the isolation.
830
831         leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
832         leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
833         peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
834         peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
835
836         // Previous leader should switch to Follower and send inJeopardy cleared notifications for all entities.
837
838         verifyRaftState(leader, state ->
839                 assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState()));
840
841         verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
842                 ownershipChange(entity1, true, true, true), ownershipChange(entity2, false, false, true)),
843                 ownershipChange(entity3, false, false, true)));
844
845         verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
846         verify(leaderListener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
847
848         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
849         verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
850         verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
851
852         verifyNoMoreInteractions(leaderListener);
853         verifyNoMoreInteractions(peer1Listener);
854         verifyNoMoreInteractions(peer2Listener);
855
856         testLog.info("testLeaderIsolation ending");
857     }
858
859     @Test
860     public void testLeaderIsolationWithPendingCandidateAdded() throws Exception {
861         testLog.info("testLeaderIsolationWithPendingCandidateAdded starting");
862
863         final ShardTestKit kit = new ShardTestKit(getSystem());
864
865         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
866         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
867         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
868
869         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
870                 .shardIsolatedLeaderCheckIntervalInMillis(100000);
871
872         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
873                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME),
874                 actorFactory.createActor(MessageCollectorActor.props())), peerId1.toString());
875         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
876
877         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
878                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME),
879                 actorFactory.createTestActor(MessageCollectorActor.props())), peerId2.toString());
880         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
881
882         dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
883                 .shardIsolatedLeaderCheckIntervalInMillis(500);
884
885         TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
886                 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
887                 actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
888
889         ShardTestKit.waitUntilLeader(leader);
890
891         // Add listeners on all members
892
893         DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class,
894                 "DOMEntityOwnershipListener-" + LOCAL_MEMBER_NAME);
895         leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
896         kit.expectMsgClass(SuccessReply.class);
897
898         DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class,
899                 "DOMEntityOwnershipListener-" + PEER_MEMBER_1_NAME);
900         peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
901         kit.expectMsgClass(SuccessReply.class);
902
903         DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class,
904                 "DOMEntityOwnershipListener-" + PEER_MEMBER_2_NAME);
905         peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
906         kit.expectMsgClass(SuccessReply.class);
907
908         // Drop the CandidateAdded message to the leader for now.
909
910         leader.underlyingActor().startDroppingMessagesOfType(CandidateAdded.class);
911
912         // Add an entity candidates for the leader. Since we've blocked the CandidateAdded message, it won't be
913         // assigned the owner.
914
915         DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
916         leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
917         kit.expectMsgClass(SuccessReply.class);
918         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
919         verifyCommittedEntityCandidate(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
920         verifyCommittedEntityCandidate(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
921
922         DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
923         leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
924         kit.expectMsgClass(SuccessReply.class);
925         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
926         verifyCommittedEntityCandidate(peer1, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
927         verifyCommittedEntityCandidate(peer2, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
928
929         // Capture the CandidateAdded messages.
930
931         final List<CandidateAdded> candidateAdded = expectMatching(leader.underlyingActor().collectorActor(),
932                 CandidateAdded.class, 2);
933
934         // Drop AppendEntries to the followers containing a log entry, which will be for the owner writes after we
935         // forward the CandidateAdded messages to the leader. This will leave the pending owner write tx's uncommitted.
936
937         peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
938         peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
939
940         // Now forward the CandidateAdded messages to the leader and wait for it to send out the AppendEntries.
941
942         leader.underlyingActor().stopDroppingMessagesOfType(CandidateAdded.class);
943         leader.tell(candidateAdded.get(0), leader);
944         leader.tell(candidateAdded.get(1), leader);
945
946         expectMatching(peer1.underlyingActor().collectorActor(), AppendEntries.class, 2,
947             ae -> ae.getEntries().size() > 0);
948
949         // Verify no owner assigned.
950
951         verifyNoOwnerSet(leader, entity1.getType(), entity1.getIdentifier());
952         verifyNoOwnerSet(leader, entity2.getType(), entity2.getIdentifier());
953
954         // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
955
956         leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
957         leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
958
959         peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
960             ae -> ae.getLeaderId().equals(leaderId.toString()));
961         peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
962
963         // Send PeerDown to the isolated leader - should be no-op since there's no owned entities.
964
965         leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
966         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
967
968         // Verify the leader transitions to IsolatedLeader.
969
970         verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.IsolatedLeader.toString(),
971                 state.getRaftState()));
972
973         // Send PeerDown to the new leader peer1.
974
975         peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
976
977         // Make peer1 start an election and become leader by sending the TimeoutNow message.
978
979         peer1.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
980
981         // Verify the peer1 transitions to Leader.
982
983         verifyRaftState(peer1, state -> assertEquals("getRaftState", RaftState.Leader.toString(),
984                 state.getRaftState()));
985
986         verifyNoOwnerSet(peer1, entity1.getType(), entity1.getIdentifier());
987         verifyNoOwnerSet(peer2, entity1.getType(), entity2.getIdentifier());
988
989         verifyNoMoreInteractions(peer1Listener);
990         verifyNoMoreInteractions(peer2Listener);
991
992         // Add candidate peer1 candidate for entity2.
993
994         peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
995
996         verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
997         verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
998         verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, false, true));
999
1000         reset(leaderListener, peer1Listener, peer2Listener);
1001
1002         // Remove the isolation.
1003
1004         leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
1005         leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1006         peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1007         peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1008
1009         // Previous leader should switch to Follower.
1010
1011         verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(),
1012                 state.getRaftState()));
1013
1014         // Send PeerUp to peer1 and peer2.
1015
1016         peer1.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
1017         peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
1018
1019         // The previous leader should become the owner of entity1.
1020
1021         verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
1022
1023         // The previous leader's DOMEntityOwnershipListener should get 4 total notifications:
1024         //     - inJeopardy cleared for entity1 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
1025         //     - inJeopardy cleared for entity2 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
1026         //     - local owner granted for entity1 (wasOwner=false, isOwner=true, hasOwner=true, inJeopardy=false)
1027         //     - remote owner for entity2 (wasOwner=false, isOwner=false, hasOwner=true, inJeopardy=false)
1028         verify(leaderListener, timeout(5000).times(4)).ownershipChanged(or(
1029                 or(ownershipChange(entity1, false, false, false), ownershipChange(entity2, false, false, false)),
1030                 or(ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true))));
1031
1032         verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
1033         verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
1034
1035         // Verify entity2's owner doesn't change.
1036
1037         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1038         verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
1039
1040         verifyNoMoreInteractions(leaderListener);
1041         verifyNoMoreInteractions(peer1Listener);
1042         verifyNoMoreInteractions(peer2Listener);
1043
1044         testLog.info("testLeaderIsolationWithPendingCandidateAdded ending");
1045     }
1046
1047     @Test
1048     public void testListenerRegistration() throws Exception {
1049         testLog.info("testListenerRegistration starting");
1050
1051         ShardTestKit kit = new ShardTestKit(getSystem());
1052
1053         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1054         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
1055
1056         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1057                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
1058         peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1059
1060         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1061                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
1062
1063         ShardTestKit.waitUntilLeader(leader);
1064
1065         String otherEntityType = "otherEntityType";
1066         final DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1067         final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
1068         final DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
1069         final DOMEntity entity4 = new DOMEntity(otherEntityType, ENTITY_ID3);
1070         DOMEntityOwnershipListener listener = mock(DOMEntityOwnershipListener.class);
1071
1072         // Register listener
1073
1074         leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1075         kit.expectMsgClass(SuccessReply.class);
1076
1077         // Register a couple candidates for the desired entity type and verify listener is notified.
1078
1079         leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
1080         kit.expectMsgClass(SuccessReply.class);
1081
1082         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
1083
1084         leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
1085         kit.expectMsgClass(SuccessReply.class);
1086
1087         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
1088         reset(listener);
1089
1090         // Register another candidate for another entity type and verify listener is not notified.
1091
1092         leader.tell(new RegisterCandidateLocal(entity4), kit.getRef());
1093         kit.expectMsgClass(SuccessReply.class);
1094
1095         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1096         verify(listener, never()).ownershipChanged(ownershipChange(entity4));
1097
1098         // Register remote candidate for entity1
1099
1100         peer.tell(new RegisterCandidateLocal(entity1), kit.getRef());
1101         kit.expectMsgClass(SuccessReply.class);
1102         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entity1.getIdentifier(), PEER_MEMBER_1_NAME);
1103
1104         // Unregister the local candidate for entity1 and verify listener is notified
1105
1106         leader.tell(new UnregisterCandidateLocal(entity1), kit.getRef());
1107         kit.expectMsgClass(SuccessReply.class);
1108
1109         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
1110         reset(listener);
1111
1112         // Unregister the listener, add a candidate for entity3 and verify listener isn't notified
1113
1114         leader.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1115         kit.expectMsgClass(SuccessReply.class);
1116
1117         leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
1118         kit.expectMsgClass(SuccessReply.class);
1119
1120         verifyOwner(leader, ENTITY_TYPE, entity3.getIdentifier(), LOCAL_MEMBER_NAME);
1121         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1122         verify(listener, never()).ownershipChanged(any(DOMEntityOwnershipChange.class));
1123
1124         // Re-register the listener and verify it gets notified of currently owned entities
1125
1126         reset(listener);
1127
1128         leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1129         kit.expectMsgClass(SuccessReply.class);
1130
1131         verify(listener, timeout(5000).times(2)).ownershipChanged(or(ownershipChange(entity2, false, true, true),
1132                 ownershipChange(entity3, false, true, true)));
1133         Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
1134         verify(listener, never()).ownershipChanged(ownershipChange(entity4));
1135         verify(listener, times(1)).ownershipChanged(ownershipChange(entity1));
1136
1137         testLog.info("testListenerRegistration ending");
1138     }
1139
1140     @Test
1141     public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() throws Exception {
1142         testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived starting");
1143
1144         ShardTestKit kit = new ShardTestKit(getSystem());
1145         EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder()
1146                 .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
1147
1148         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1149         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
1150
1151         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1152                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
1153         peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1154
1155         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1156                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME, builder.build()),
1157                 leaderId.toString());
1158
1159         ShardTestKit.waitUntilLeader(leader);
1160
1161         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1162
1163         // Add a remote candidate
1164
1165         peer.tell(new RegisterCandidateLocal(entity), kit.getRef());
1166         kit.expectMsgClass(SuccessReply.class);
1167
1168         // Register local
1169
1170         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1171         kit.expectMsgClass(SuccessReply.class);
1172
1173         // Verify the local candidate becomes owner
1174
1175         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1176         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1177         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1178
1179         testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived ending");
1180     }
1181
1182     @Test
1183     public void testDelayedEntityOwnerSelection() throws Exception {
1184         testLog.info("testDelayedEntityOwnerSelection starting");
1185
1186         final ShardTestKit kit = new ShardTestKit(getSystem());
1187         EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder()
1188                 .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
1189
1190         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
1191
1192         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1193         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
1194         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
1195
1196         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1197                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
1198                     peerId1.toString());
1199         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1200
1201         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1202                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
1203                     peerId2.toString());
1204         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1205
1206         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1207                 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME,
1208                         builder.build()), leaderId.toString());
1209
1210         ShardTestKit.waitUntilLeader(leader);
1211
1212         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1213
1214         // Add a remote candidate
1215
1216         peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
1217         kit.expectMsgClass(SuccessReply.class);
1218
1219         // Register local
1220
1221         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1222         kit.expectMsgClass(SuccessReply.class);
1223
1224         // Verify the local candidate becomes owner
1225
1226         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1227         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1228         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1229
1230         testLog.info("testDelayedEntityOwnerSelection ending");
1231     }
1232
1233     private Props newLocalShardProps() {
1234         return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.<String,String>emptyMap(), LOCAL_MEMBER_NAME);
1235     }
1236
1237     private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers,
1238             final String memberName) {
1239         return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build());
1240     }
1241
1242     private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers, final String memberName,
1243                                 final EntityOwnerSelectionStrategyConfig config) {
1244         return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props()
1245                     .withDispatcher(Dispatchers.DefaultDispatcherId());
1246     }
1247
1248     private EntityOwnershipShard.Builder newShardBuilder(final ShardIdentifier shardId, final Map<String, String> peers,
1249             final String memberName) {
1250         return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).datastoreContext(
1251                 dataStoreContextBuilder.build()).schemaContextProvider(() -> SCHEMA_CONTEXT).localMemberName(
1252                         MemberName.forName(memberName)).ownerSelectionStrategyConfig(
1253                                 EntityOwnerSelectionStrategyConfig.newBuilder().build());
1254     }
1255
1256     private Map<String, String> peerMap(final String... peerIds) {
1257         ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder();
1258         for (String peerId: peerIds) {
1259             builder.put(peerId, actorFactory.createTestActorPath(peerId)).build();
1260         }
1261
1262         return builder.build();
1263     }
1264
1265     private static class TestEntityOwnershipShard extends EntityOwnershipShard {
1266         private final ActorRef collectorActor;
1267         private final Map<Class<?>, Predicate<?>> dropMessagesOfType = new ConcurrentHashMap<>();
1268
1269         TestEntityOwnershipShard(final Builder builder, final ActorRef collectorActor) {
1270             super(builder);
1271             this.collectorActor = collectorActor;
1272         }
1273
1274         @SuppressWarnings({ "unchecked", "rawtypes" })
1275         @Override
1276         public void handleCommand(final Object message) {
1277             Predicate drop = dropMessagesOfType.get(message.getClass());
1278             if (drop == null || !drop.test(message)) {
1279                 super.handleCommand(message);
1280             }
1281
1282             if (collectorActor != null) {
1283                 collectorActor.tell(message, ActorRef.noSender());
1284             }
1285         }
1286
1287         void startDroppingMessagesOfType(final Class<?> msgClass) {
1288             dropMessagesOfType.put(msgClass, msg -> true);
1289         }
1290
1291         <T> void startDroppingMessagesOfType(final Class<T> msgClass, final Predicate<T> filter) {
1292             dropMessagesOfType.put(msgClass, filter);
1293         }
1294
1295         void stopDroppingMessagesOfType(final Class<?> msgClass) {
1296             dropMessagesOfType.remove(msgClass);
1297         }
1298
1299         ActorRef collectorActor() {
1300             return collectorActor;
1301         }
1302
1303         static Props props(final Builder builder) {
1304             return props(builder, null);
1305         }
1306
1307         static Props props(final Builder builder, final ActorRef collectorActor) {
1308             return Props.create(TestEntityOwnershipShard.class, builder, collectorActor)
1309                     .withDispatcher(Dispatchers.DefaultDispatcherId());
1310         }
1311     }
1312 }