Bug 6540: EOS - Prune pending owner change commits on leader change
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.AdditionalMatchers.or;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.never;
15 import static org.mockito.Mockito.reset;
16 import static org.mockito.Mockito.timeout;
17 import static org.mockito.Mockito.times;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.verifyNoMoreInteractions;
20 import akka.actor.ActorRef;
21 import akka.actor.PoisonPill;
22 import akka.actor.Props;
23 import akka.actor.Terminated;
24 import akka.dispatch.Dispatchers;
25 import akka.testkit.JavaTestKit;
26 import akka.testkit.TestActorRef;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.util.concurrent.Uninterruptibles;
29 import java.util.ArrayList;
30 import java.util.Collections;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.TimeUnit;
35 import java.util.function.Predicate;
36 import org.junit.After;
37 import org.junit.Test;
38 import org.opendaylight.controller.cluster.access.concepts.MemberName;
39 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
40 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
41 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
42 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
43 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
44 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
45 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
46 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
47 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.LastCandidateSelectionStrategy;
48 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
49 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
50 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
51 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
52 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
53 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
54 import org.opendaylight.controller.cluster.raft.RaftState;
55 import org.opendaylight.controller.cluster.raft.TestActorFactory;
56 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
57 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
58 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
59 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
60 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
61 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
62 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
63 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
64 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
65 import org.opendaylight.yangtools.yang.common.QName;
66 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
67 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
68
69 /**
70  * Unit tests for EntityOwnershipShard.
71  *
72  * @author Thomas Pantelis
73  */
74 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
75     private static final String ENTITY_TYPE = "test type";
       // ENTITY_TYPE above is the type passed to every DOMEntity created by the tests visible in this file.
       // The five identifiers below share the same namespace and revision and differ only in the QName
       // local name (entity1 .. entity5).
76     private static final YangInstanceIdentifier ENTITY_ID1 =
77             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
78     private static final YangInstanceIdentifier ENTITY_ID2 =
79             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
80     private static final YangInstanceIdentifier ENTITY_ID3 =
81             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3"));
82     private static final YangInstanceIdentifier ENTITY_ID4 =
83             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4"));
84     private static final YangInstanceIdentifier ENTITY_ID5 =
85             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5"));
       // NOTE(review): SCHEMA_CONTEXT is not referenced by any test visible in this view; presumably it is
       // used by helpers further down the file - confirm.
86     private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
       // Member names handed to newShardId(...) and to the shard builders/props in the tests below.
87     private static final String LOCAL_MEMBER_NAME = "local-member-1";
88     private static final String PEER_MEMBER_1_NAME = "peer-member-1";
89     private static final String PEER_MEMBER_2_NAME = "peer-member-2";
90
       // Starts from a non-persistent datastore context; individual tests tune heartbeat, election,
       // isolated-leader and commit-timeout settings on this builder before creating their shards.
91     private Builder dataStoreContextBuilder = DatastoreContext.newBuilder().persistent(false);
       // Factory used by every test to create its actors; closed in tearDown().
92     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
93
94     @After
95     public void tearDown() {
           // JUnit runs this after each test method. It closes the actor factory; presumably that stops
           // the actors the test created - TestActorFactory is defined outside this view.
96         actorFactory.close();
97     }
98
99     @Test
100     public void testOnRegisterCandidateLocal() throws Exception {
            // Scenario: register a local candidate for ENTITY_ID1 on a shard that is already leader, then
            // check that the candidate is committed and that the local member is recorded as owner.
            // NOTE(review): newLocalShardProps(), verifyCommittedEntityCandidate() and verifyOwner() are
            // defined outside this view; the descriptions here follow their names and arguments.
101         testLog.info("testOnRegisterCandidateLocal starting");
102
103         ShardTestKit kit = new ShardTestKit(getSystem());
104
105         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
106
107         ShardTestKit.waitUntilLeader(shard);
108
109         YangInstanceIdentifier entityId = ENTITY_ID1;
110         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
111
            // The kit is passed as sender, so the SuccessReply is delivered back to the kit.
112         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
113         kit.expectMsgClass(SuccessReply.class);
114
115         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
116         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
117
118         testLog.info("testOnRegisterCandidateLocal ending");
119     }
120
121     @Test
122     public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
            // Scenario: a candidate is registered on a two-member shard while no leader exists. The
            // registration is acknowledged immediately and is expected to be committed once the local
            // shard is able to win an election.
123         testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader starting");
124
125         ShardTestKit kit = new ShardTestKit(getSystem());
126
            // 100 ms heartbeat with an election timeout factor of 2 (presumably to keep elections fast).
127         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
128
129         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
130         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
131
            // The peer drops RequestVote, so the local shard cannot obtain its vote yet, and drops
            // ElectionTimeout - presumably so the peer never starts an election of its own
            // (TestEntityOwnershipShard is defined outside this view).
132         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
133                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
134         TestEntityOwnershipShard peerShard = peer.underlyingActor();
135         peerShard.startDroppingMessagesOfType(RequestVote.class);
136         peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
137
138         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(
139                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
140
141         YangInstanceIdentifier entityId = ENTITY_ID1;
142         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
143
            // Note there is no waitUntilLeader() call before this registration, unlike the other tests.
144         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
145         kit.expectMsgClass(SuccessReply.class);
146
147         // Now allow RequestVotes to the peer so the shard becomes the leader. This should retry the commit.
148         peerShard.stopDroppingMessagesOfType(RequestVote.class);
149
150         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
151         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
152
153         testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader ending");
154     }
155
156     @Test
157     public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
            // Scenario: the local shard is leader but its only peer drops AppendEntries, so the candidate
            // write cannot be replicated. After the commit timeout has elapsed the peer resumes and the
            // candidate and owner are expected to appear on the leader.
158         testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus starting");
159
160         ShardTestKit kit = new ShardTestKit(getSystem());
161
            // The 1 second transaction commit timeout configured here matches the 1 second sleep below.
162         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
163                 shardTransactionCommitTimeoutInSeconds(1);
164
165         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
166         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
167
168         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
169                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
170         TestEntityOwnershipShard peerShard = peer.underlyingActor();
            // Presumably keeps the peer from starting its own election - confirm against
            // TestEntityOwnershipShard, which is outside this view.
171         peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
172
173         // Drop AppendEntries so consensus isn't reached.
174         peerShard.startDroppingMessagesOfType(AppendEntries.class);
175
176         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
177                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
178
179         ShardTestKit.waitUntilLeader(leader);
180
181         YangInstanceIdentifier entityId = ENTITY_ID1;
182         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
183
            // The SuccessReply is expected even though the write cannot reach consensus at this point.
184         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
185         kit.expectMsgClass(SuccessReply.class);
186
187         // Wait enough time for the commit to timeout.
188         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
189
190         // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
191         // write being applied to the state.
192         peerShard.stopDroppingMessagesOfType(AppendEntries.class);
193
194         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
195         verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
196
197         testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus ending");
198     }
199
200     @Test
201     public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
            // Scenario: the local shard becomes leader, then its only peer stops accepting AppendEntries
            // until the shard reports RaftState.IsolatedLeader. A candidate registered in that state is
            // acknowledged, and is expected to be committed after the peer resumes.
202         testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader starting");
203
204         ShardTestKit kit = new ShardTestKit(getSystem());
205
            // The isolated-leader check interval is set to 50 ms, shorter than the 100 ms heartbeat.
206         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
207                 shardIsolatedLeaderCheckIntervalInMillis(50);
208
209         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
210         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
211
212         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
213                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
214         TestEntityOwnershipShard peerShard = peer.underlyingActor();
215         peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
216
            // NOTE(review): unlike the neighbouring tests, no actor name (leaderId.toString()) is passed
            // to createTestActor here - confirm whether that is intentional.
217         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
218                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME));
219
220         ShardTestKit.waitUntilLeader(leader);
221
222         // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
223         peerShard.startDroppingMessagesOfType(AppendEntries.class);
224         verifyRaftState(leader, state ->
225                 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
226
227         YangInstanceIdentifier entityId = ENTITY_ID1;
228         DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
229
230         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
231         kit.expectMsgClass(SuccessReply.class);
232
233         // Resume AppendEntries - the candidate write should now be committed.
234         peerShard.stopDroppingMessagesOfType(AppendEntries.class);
235         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
236         verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
237
238         testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader ending");
239     }
240
241     @Test
242     public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
            // Scenario: candidates are registered on the local shard while the peer-member-1 shard is
            // expected to be the leader. Three phases: a plain registration, a registration whose
            // BatchedModifications are dropped by the leader, and a burst of 100 registrations.
243         testLog.info("testOnRegisterCandidateLocalWithRemoteLeader starting");
244
245         ShardTestKit kit = new ShardTestKit(getSystem());
246
247         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
248                 shardBatchedModificationCount(5);
249
            // The leader shard is created with a MessageCollectorActor, which is read back below through
            // leaderShard.collectorActor().
250         ShardIdentifier leaderId = newShardId(PEER_MEMBER_1_NAME);
251         ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
252         TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
253                 newShardBuilder(leaderId, peerMap(localId.toString()), PEER_MEMBER_1_NAME),
254                 actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
255         TestEntityOwnershipShard leaderShard = leader.underlyingActor();
256
            // The local shard drops ElectionTimeout - presumably so that it stays a follower and the
            // other shard becomes leader. There is no explicit waitUntilLeader() in this test.
257         TestActorRef<TestEntityOwnershipShard> local = actorFactory.createTestActor(TestEntityOwnershipShard.props(
258                 newShardBuilder(localId, peerMap(leaderId.toString()),LOCAL_MEMBER_NAME)), localId.toString());
259         local.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
260
            // Phase 1: register on the local shard and verify candidate and owner on the leader shard.
261         local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
262         kit.expectMsgClass(SuccessReply.class);
263
264         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
265         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
266
267         // Test with initial commit timeout and subsequent retry.
268
            // A rebuilt DatastoreContext with a 1 second commit timeout is sent to the local shard as a
            // message, and the leader starts dropping BatchedModifications.
269         local.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
270         leaderShard.startDroppingMessagesOfType(BatchedModifications.class);
271
272         local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
273         kit.expectMsgClass(SuccessReply.class);
274
            // Only checks that a BatchedModifications message reached the leader shard collector.
            // NOTE(review): the eventual commit of ENTITY_ID2 after the retry is not asserted in this
            // method.
275         MessageCollectorActor.expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class);
276
277         // Send a bunch of registration messages quickly and verify.
278
279         leaderShard.stopDroppingMessagesOfType(BatchedModifications.class);
280         MessageCollectorActor.clearMessages(leaderShard.collectorActor());
281
            // The replies for these 100 registrations are addressed to the kit but are never consumed
            // with expectMsgClass; only the committed candidates are verified, and on the local shard
            // rather than on the leader.
282         int max = 100;
283         List<YangInstanceIdentifier> entityIds = new ArrayList<>();
284         for(int i = 1; i <= max; i++) {
285             YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
286             entityIds.add(id);
287             local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, id)), kit.getRef());
288         }
289
290         for(int i = 0; i < max; i++) {
291             verifyCommittedEntityCandidate(local, ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
292         }
293
294         testLog.info("testOnRegisterCandidateLocalWithRemoteLeader ending");
295     }
296
297     @Test
298     public void testOnUnregisterCandidateLocal() throws Exception {
            // Scenario: register, unregister and re-register the same local candidate for ENTITY_ID1 on
            // one shard, checking the recorded owner after each step.
            // NOTE(review): newLocalShardProps() and the verify* helpers are defined outside this view.
299         testLog.info("testOnUnregisterCandidateLocal starting");
300
301         ShardTestKit kit = new ShardTestKit(getSystem());
302         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
303         ShardTestKit.waitUntilLeader(shard);
304
305         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
306
307         // Register
308
309         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
310         kit.expectMsgClass(SuccessReply.class);
311
312         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
313         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
314
315         // Unregister
316
317         shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
318         kit.expectMsgClass(SuccessReply.class);
319
            // With the only candidate gone, the expected owner value is the empty string.
320         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");
321
322         // Register again
323
324         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
325         kit.expectMsgClass(SuccessReply.class);
326
327         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
328         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
329
330         testLog.info("testOnUnregisterCandidateLocal ending");
331     }
332
333     @Test
334     public void testOwnershipChanges() throws Exception {
            // Scenario: three shards (local leader plus two peers) register and unregister candidates for
            // the same entity in turn. After each step the test checks, on the leader, which candidates
            // exist and which member is owner. The expectations encode that an existing owner is kept
            // while it remains a candidate, and is replaced only when it unregisters.
335         testLog.info("testOwnershipChanges starting");
336
337         ShardTestKit kit = new ShardTestKit(getSystem());
338
339         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
340
341         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
342         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
343         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
344
            // Both peers drop ElectionTimeout - presumably so that only the local shard can become
            // leader (TestEntityOwnershipShard is defined outside this view).
345         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
346                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
347                     peerId1.toString());
348         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
349
350         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
351                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
352                     peerId2.toString());
353         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
354
355         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
356                 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
357                     leaderId.toString());
358
359         ShardTestKit.waitUntilLeader(leader);
360
361         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
362
363         // Add a remote candidate
364
365         peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
366         kit.expectMsgClass(SuccessReply.class);
367
368         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
369
370         // Register local
371
372         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
373         kit.expectMsgClass(SuccessReply.class);
374
375         // Verify the local candidate is committed and the first remote candidate (peer1) is the owner
376
377         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
378         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
379
380         // Add another remote candidate and verify ownership doesn't change
381
382         peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
383         kit.expectMsgClass(SuccessReply.class);
384
385         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
            // The 500 ms pauses in this test precede each check that the owner has NOT changed -
            // presumably to give an unwanted owner change time to show up before asserting.
386         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
387         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
388
389         // Remove the second remote candidate and verify ownership doesn't change
390
391         peer2.tell(new UnregisterCandidateLocal(entity), kit.getRef());
392         kit.expectMsgClass(SuccessReply.class);
393
394         verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
395         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
396         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
397
398         // Remove the first remote candidate and verify the local candidate becomes owner
399
400         peer1.tell(new UnregisterCandidateLocal(entity), kit.getRef());
401         kit.expectMsgClass(SuccessReply.class);
402
403         verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
404         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
405
406         // Add the second remote candidate back and verify ownership doesn't change
407
408         peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
409         kit.expectMsgClass(SuccessReply.class);
410
411         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
412         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
413         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
414
415         // Unregister the local candidate and verify the second remote candidate becomes owner
416
417         leader.tell(new UnregisterCandidateLocal(entity), kit.getRef());
418         kit.expectMsgClass(SuccessReply.class);
419
420         verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
421         verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
422
423         testLog.info("testOwnershipChanges ending");
424     }
425
426     @Test
427     public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception {
428         testLog.info("testOwnerChangesOnPeerAvailabilityChanges starting");
429
430         ShardTestKit kit = new ShardTestKit(getSystem());
431
432         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4).
433                 shardIsolatedLeaderCheckIntervalInMillis(100000);
434
435         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
436         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
437         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
438
439         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
440                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
441                     peerId1.toString());
442         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
443
444         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
445                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
446                     peerId2.toString());
447         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
448
449         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
450                 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
451                     leaderId.toString());
452
453         verifyRaftState(leader, state ->
454                 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
455
456         // Send PeerDown and PeerUp with no entities
457
458         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
459         leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
460
461         // Add candidates for entity1 with the local leader as the owner
462
463         leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
464         kit.expectMsgClass(SuccessReply.class);
465         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
466
467         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
468         kit.expectMsgClass(SuccessReply.class);
469         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
470
471         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
472         kit.expectMsgClass(SuccessReply.class);
473         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
474         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
475
476         // Add candidates for entity2 with peerMember2 as the owner
477
478         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
479         kit.expectMsgClass(SuccessReply.class);
480         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
481
482         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
483         kit.expectMsgClass(SuccessReply.class);
484         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
485         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
486
487         // Add candidates for entity3 with peerMember2 as the owner.
488
489         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
490         kit.expectMsgClass(SuccessReply.class);
491         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
492
493         leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
494         kit.expectMsgClass(SuccessReply.class);
495         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
496
497         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
498         kit.expectMsgClass(SuccessReply.class);
499         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
500         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
501
502         // Add only candidate peerMember2 for entity4.
503
504         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID4)), kit.getRef());
505         kit.expectMsgClass(SuccessReply.class);
506         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
507         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
508
509         // Add only candidate peerMember1 for entity5.
510
511         peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID5)), kit.getRef());
512         kit.expectMsgClass(SuccessReply.class);
513         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
514         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
515
516         // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new
517         // owner selected
518
519         kit.watch(peer2);
520         peer2.tell(PoisonPill.getInstance(), ActorRef.noSender());
521         kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
522         kit.unwatch(peer2);
523
524         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
525         // Send PeerDown again - should be noop
526         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
527         peer1.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
528
529         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
530         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
531         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
532         // no other candidates for entity4 so peerMember2 should remain owner.
533         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
534
535         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
536         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
537         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
538         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
539
540         // Reinstate peerMember2
541
542         peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
543                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
544                     peerId2.toString());
545         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
546         leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
547         // Send PeerUp again - should be noop
548         leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
549         peer1.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
550
551         // peerMember2's candidates should be removed on startup.
552         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
553         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
554         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
555         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
556
557         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
558         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
559         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
560         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
561
562         // Add back candidate peerMember2 for entities 1, 2, & 3.
563
564         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
565         kit.expectMsgClass(SuccessReply.class);
566         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
567         kit.expectMsgClass(SuccessReply.class);
568         peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
569         kit.expectMsgClass(SuccessReply.class);
570         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
571         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
572         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
573         verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
574         verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
575         verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
576         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
577         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
578         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
579         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
580         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
581         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
582         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
583
584         // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected
585
586         kit.watch(peer1);
587         peer1.tell(PoisonPill.getInstance(), ActorRef.noSender());
588         kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
589         kit.unwatch(peer1);
590         leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
591
592         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
593
594         // Verify the reinstated peerMember2 is fully synced.
595
596         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
597         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
598         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
599         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
600
601         // Reinstate peerMember1 and verify no owner changes
602
603         peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(newShardBuilder(
604                 peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), peerId1.toString());
605         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
606         leader.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
607
608         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
609         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
610         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
611         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
612
613         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
614         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
615         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
616
617         verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
618         verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
619         verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
620
621         // Verify the reinstated peerMember1 is fully synced.
622
623         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
624         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
625         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
626         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
627
628         // Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for
629         // the entities (1 and 3) previously owned by the local leader member.
630
631         peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
632         peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
633         peer2.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
634
635         kit.watch(leader);
636         leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
637         kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
638         kit.unwatch(leader);
639         peer2.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
640         peer2.tell(TimeoutNow.INSTANCE, peer2);
641
642         verifyRaftState(peer2, state ->
643                 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
644
645         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
646         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
647         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
648         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
649
650         testLog.info("testOwnerChangesOnPeerAvailabilityChanges ending");
651     }
652
653     @Test
654     public void testLeaderIsolation() throws Exception {
655         testLog.info("testLeaderIsolation starting");
656
657         ShardTestKit kit = new ShardTestKit(getSystem());
658
659         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
660         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
661         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
662
663         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4).
664             shardIsolatedLeaderCheckIntervalInMillis(100000);
665
666         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
667                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
668                     peerId1.toString());
669         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
670
671         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
672                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
673                     peerId2.toString());
674         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
675
676         dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
677                 shardIsolatedLeaderCheckIntervalInMillis(500);
678
679         TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
680                 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME)),
681                     leaderId.toString());
682
683         ShardTestKit.waitUntilLeader(leader);
684
685         // Add entity1 candidates for all members with the leader as the owner
686
687         DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
688         leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
689         kit.expectMsgClass(SuccessReply.class);
690         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
691
692         peer1.tell(new RegisterCandidateLocal(entity1), kit.getRef());
693         kit.expectMsgClass(SuccessReply.class);
694         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
695
696         peer2.tell(new RegisterCandidateLocal(entity1), kit.getRef());
697         kit.expectMsgClass(SuccessReply.class);
698         verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_2_NAME);
699
700         verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
701         verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
702         verifyOwner(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
703
704         // Add entity2 candidates for all members with peer1 as the owner
705
706         DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
707         peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
708         kit.expectMsgClass(SuccessReply.class);
709         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
710
711         peer2.tell(new RegisterCandidateLocal(entity2), kit.getRef());
712         kit.expectMsgClass(SuccessReply.class);
713         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_2_NAME);
714
715         leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
716         kit.expectMsgClass(SuccessReply.class);
717         verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
718
719         verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
720         verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
721         verifyOwner(peer2, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
722
723         // Add entity3 candidates for all members with peer2 as the owner
724
725         DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
726         peer2.tell(new RegisterCandidateLocal(entity3), kit.getRef());
727         kit.expectMsgClass(SuccessReply.class);
728         verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
729
730         leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
731         kit.expectMsgClass(SuccessReply.class);
732         verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), LOCAL_MEMBER_NAME);
733
734         peer1.tell(new RegisterCandidateLocal(entity3), kit.getRef());
735         kit.expectMsgClass(SuccessReply.class);
736         verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_1_NAME);
737
738         verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
739         verifyOwner(peer1, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
740         verifyOwner(peer2, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
741
742         // Add listeners on all members
743
744         DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class);
745         leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
746         kit.expectMsgClass(SuccessReply.class);
747         verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, true, true),
748                 ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, false, true)));
749         reset(leaderListener);
750
751         DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class);
752         peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
753         kit.expectMsgClass(SuccessReply.class);
754         verify(peer1Listener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, false, true),
755                 ownershipChange(entity2, false, true, true)), ownershipChange(entity3, false, false, true)));
756         reset(peer1Listener);
757
758         DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class);
759         peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
760         kit.expectMsgClass(SuccessReply.class);
761         verify(peer2Listener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, false, true),
762                 ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, true, true)));
763         reset(peer2Listener);
764
765         // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
766
767         leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
768         leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
769
770         peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
771                 ae -> ae.getLeaderId().equals(leaderId.toString()));
772         peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
773
774         // Make peer1 start an election and become leader by enabling the ElectionTimeout message.
775
776         peer1.underlyingActor().stopDroppingMessagesOfType(ElectionTimeout.class);
777
778         // Send PeerDown to the isolated leader so it tries to re-assign ownership for the entities owned by the
779         // isolated peers.
780
781         leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
782         leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
783
784         verifyRaftState(leader, state ->
785                 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
786
787         // Expect inJeopardy notification on the isolated leader.
788
789         verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, true, true, true, true),
790                 ownershipChange(entity2, false, false, true, true)), ownershipChange(entity3, false, false, true, true)));
791         reset(leaderListener);
792
793         verifyRaftState(peer1, state ->
794                 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
795
796         // Send PeerDown to the new leader peer1 so it re-assigns ownership for the entities owned by the
797         // isolated leader.
798
799         peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
800
801         verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
802
803         verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
804         reset(peer1Listener);
805
806         verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
807         reset(peer2Listener);
808
809         // Remove the isolation.
810
811         leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
812         leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
813         peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
814         peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
815
816         // Previous leader should switch to Follower and send inJeopardy cleared notifications for all entities.
817
818         verifyRaftState(leader, state ->
819                 assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState()));
820
821         verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, true, true, true),
822                 ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, false, true)));
823
824         verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
825         verify(leaderListener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
826
827         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
828         verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
829         verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
830
831         verifyNoMoreInteractions(leaderListener);
832         verifyNoMoreInteractions(peer1Listener);
833         verifyNoMoreInteractions(peer2Listener);
834
835         testLog.info("testLeaderIsolation ending");
836     }
837
838     @Test
839     public void testListenerRegistration() throws Exception {
840         testLog.info("testListenerRegistration starting");
841
842         ShardTestKit kit = new ShardTestKit(getSystem());
843
844         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
845         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
846
847         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
848                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
849         peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
850
851         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
852                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
853
854         ShardTestKit.waitUntilLeader(leader);
855
856         String otherEntityType = "otherEntityType";
857         DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
858         DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
859         DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
860         DOMEntity entity4 = new DOMEntity(otherEntityType, ENTITY_ID3);
861         DOMEntityOwnershipListener listener = mock(DOMEntityOwnershipListener.class);
862
863         // Register listener
864
865         leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
866         kit.expectMsgClass(SuccessReply.class);
867
868         // Register a couple candidates for the desired entity type and verify listener is notified.
869
870         leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
871         kit.expectMsgClass(SuccessReply.class);
872
873         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
874
875         leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
876         kit.expectMsgClass(SuccessReply.class);
877
878         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
879         reset(listener);
880
881         // Register another candidate for another entity type and verify listener is not notified.
882
883         leader.tell(new RegisterCandidateLocal(entity4), kit.getRef());
884         kit.expectMsgClass(SuccessReply.class);
885
886         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
887         verify(listener, never()).ownershipChanged(ownershipChange(entity4));
888
889         // Register remote candidate for entity1
890
891         peer.tell(new RegisterCandidateLocal(entity1), kit.getRef());
892         kit.expectMsgClass(SuccessReply.class);
893         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entity1.getIdentifier(), PEER_MEMBER_1_NAME);
894
895         // Unregister the local candidate for entity1 and verify listener is notified
896
897         leader.tell(new UnregisterCandidateLocal(entity1), kit.getRef());
898         kit.expectMsgClass(SuccessReply.class);
899
900         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
901         reset(listener);
902
903         // Unregister the listener, add a candidate for entity3 and verify listener isn't notified
904
905         leader.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
906         kit.expectMsgClass(SuccessReply.class);
907
908         leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
909         kit.expectMsgClass(SuccessReply.class);
910
911         verifyOwner(leader, ENTITY_TYPE, entity3.getIdentifier(), LOCAL_MEMBER_NAME);
912         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
913         verify(listener, never()).ownershipChanged(any(DOMEntityOwnershipChange.class));
914
915         // Re-register the listener and verify it gets notified of currently owned entities
916
917         reset(listener);
918
919         leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
920         kit.expectMsgClass(SuccessReply.class);
921
922         verify(listener, timeout(5000).times(2)).ownershipChanged(or(ownershipChange(entity2, false, true, true),
923                 ownershipChange(entity3, false, true, true)));
924         Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
925         verify(listener, never()).ownershipChanged(ownershipChange(entity4));
926         verify(listener, times(1)).ownershipChanged(ownershipChange(entity1));
927
928         testLog.info("testListenerRegistration ending");
929     }
930
931     @Test
932     public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() throws Exception {
933         testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived starting");
934
935         ShardTestKit kit = new ShardTestKit(getSystem());
936         EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder().
937                 addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
938
939         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
940         ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
941
942         TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
943                 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
944         peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
945
946         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
947                 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME, builder.build()), leaderId.toString());
948
949         ShardTestKit.waitUntilLeader(leader);
950
951         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
952
953         // Add a remote candidate
954
955         peer.tell(new RegisterCandidateLocal(entity), kit.getRef());
956         kit.expectMsgClass(SuccessReply.class);
957
958         // Register local
959
960         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
961         kit.expectMsgClass(SuccessReply.class);
962
963         // Verify the local candidate becomes owner
964
965         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
966         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
967         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
968
969         testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived ending");
970     }
971
972     @Test
973     public void testDelayedEntityOwnerSelection() throws Exception {
974         testLog.info("testDelayedEntityOwnerSelection starting");
975
976         ShardTestKit kit = new ShardTestKit(getSystem());
977         EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder().
978                 addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
979
980         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
981
982         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
983         ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
984         ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
985
986         TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
987                 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
988                     peerId1.toString());
989         peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
990
991         TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
992                 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
993                     peerId2.toString());
994         peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
995
996         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
997                 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME, builder.build()),
998                     leaderId.toString());
999
1000         ShardTestKit.waitUntilLeader(leader);
1001
1002         DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1003
1004         // Add a remote candidate
1005
1006         peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
1007         kit.expectMsgClass(SuccessReply.class);
1008
1009         // Register local
1010
1011         leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1012         kit.expectMsgClass(SuccessReply.class);
1013
1014         // Verify the local candidate becomes owner
1015
1016         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1017         verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1018         verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1019
1020         testLog.info("testDelayedEntityOwnerSelection ending");
1021     }
1022
1023     private Props newLocalShardProps() {
1024         return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.<String,String>emptyMap(), LOCAL_MEMBER_NAME);
1025     }
1026
1027     private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName) {
1028         return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build());
1029     }
1030
1031     private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName,
1032                                 EntityOwnerSelectionStrategyConfig config) {
1033         return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props()
1034                     .withDispatcher(Dispatchers.DefaultDispatcherId());
1035     }
1036
1037     private EntityOwnershipShard.Builder newShardBuilder(ShardIdentifier shardId, Map<String,String> peers,
1038             String memberName) {
1039         return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).datastoreContext(
1040                 dataStoreContextBuilder.build()).schemaContext(SCHEMA_CONTEXT).localMemberName(
1041                         MemberName.forName(memberName)).ownerSelectionStrategyConfig(
1042                                 EntityOwnerSelectionStrategyConfig.newBuilder().build());
1043     }
1044
1045     private Map<String, String> peerMap(String... peerIds) {
1046         ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder();
1047         for(String peerId: peerIds) {
1048             builder.put(peerId, actorFactory.createTestActorPath(peerId)).build();
1049         }
1050
1051         return builder.build();
1052     }
1053
1054     private static class TestEntityOwnershipShard extends EntityOwnershipShard {
1055         private final TestActorRef<MessageCollectorActor> collectorActor;
1056         private final Map<Class<?>, Predicate<?>> dropMessagesOfType = new ConcurrentHashMap<>();
1057
1058         TestEntityOwnershipShard(Builder builder, TestActorRef<MessageCollectorActor> collectorActor) {
1059             super(builder);
1060             this.collectorActor = collectorActor;
1061         }
1062
1063         @SuppressWarnings({ "unchecked", "rawtypes" })
1064         @Override
1065         public void handleCommand(Object message) {
1066             if(collectorActor != null) {
1067                 collectorActor.tell(message, ActorRef.noSender());
1068             }
1069
1070             Predicate drop = dropMessagesOfType.get(message.getClass());
1071             if(drop == null || !drop.test(message)) {
1072                 super.handleCommand(message);
1073             }
1074         }
1075
1076         void startDroppingMessagesOfType(Class<?> msgClass) {
1077             dropMessagesOfType.put(msgClass, msg -> true);
1078         }
1079
1080         <T> void startDroppingMessagesOfType(Class<T> msgClass, Predicate<T> filter) {
1081             dropMessagesOfType.put(msgClass, filter);
1082         }
1083
1084         void stopDroppingMessagesOfType(Class<?> msgClass) {
1085             dropMessagesOfType.remove(msgClass);
1086         }
1087
1088         TestActorRef<MessageCollectorActor> collectorActor() {
1089             return collectorActor;
1090         }
1091
1092         static Props props(Builder builder) {
1093             return props(builder, null);
1094         }
1095
1096         static Props props(Builder builder, TestActorRef<MessageCollectorActor> collectorActor) {
1097             return Props.create(TestEntityOwnershipShard.class, builder, collectorActor).
1098                     withDispatcher(Dispatchers.DefaultDispatcherId());
1099         }
1100     }
1101 }