Choose owner when all candidate registrations received.
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.AdditionalMatchers.or;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.never;
15 import static org.mockito.Mockito.reset;
16 import static org.mockito.Mockito.timeout;
17 import static org.mockito.Mockito.verify;
18 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
19 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
20 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
21 import akka.actor.ActorRef;
22 import akka.actor.PoisonPill;
23 import akka.actor.Props;
24 import akka.actor.Terminated;
25 import akka.actor.UntypedActor;
26 import akka.dispatch.Dispatchers;
27 import akka.testkit.JavaTestKit;
28 import akka.testkit.TestActorRef;
29 import com.google.common.base.Function;
30 import com.google.common.collect.ImmutableMap;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.util.ArrayList;
33 import java.util.Collections;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import org.junit.After;
40 import org.junit.Test;
41 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
42 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
43 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
44 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
45 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
46 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
47 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
48 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
49 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
50 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
51 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
52 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.LastCandidateSelectionStrategy;
53 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
54 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
55 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
57 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
58 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
59 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
60 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
61 import org.opendaylight.controller.cluster.datastore.modification.Modification;
62 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
63 import org.opendaylight.controller.cluster.raft.TestActorFactory;
64 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
65 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
66 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
67 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
68 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
69 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
70 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
71 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
72 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
73 import org.opendaylight.yangtools.yang.common.QName;
74 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
75 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
76 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
77
78 /**
79  * Unit tests for EntityOwnershipShard.
80  *
81  * @author Thomas Pantelis
82  */
83 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
84     private static final String ENTITY_TYPE = "test type";
85     private static final YangInstanceIdentifier ENTITY_ID1 =
86             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
87     private static final YangInstanceIdentifier ENTITY_ID2 =
88             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
89     private static final YangInstanceIdentifier ENTITY_ID3 =
90             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3"));
91     private static final YangInstanceIdentifier ENTITY_ID4 =
92             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4"));
93     private static final YangInstanceIdentifier ENTITY_ID5 =
94             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5"));
95     private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
96     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
97     private static final String LOCAL_MEMBER_NAME = "member-1";
98
99     private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder();
100     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
101
102     @After
103     public void tearDown() {
104         actorFactory.close();
105     }
106
107     @Test
108     public void testOnRegisterCandidateLocal() throws Exception {
109         ShardTestKit kit = new ShardTestKit(getSystem());
110
111         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
112
113         kit.waitUntilLeader(shard);
114
115         YangInstanceIdentifier entityId = ENTITY_ID1;
116         Entity entity = new Entity(ENTITY_TYPE, entityId);
117
118         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
119         kit.expectMsgClass(SuccessReply.class);
120
121         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
122         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
123     }
124
125     @Test
126     public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
127         ShardTestKit kit = new ShardTestKit(getSystem());
128
129         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
130
131         String peerId = newShardId("follower").toString();
132         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId, false).
133                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
134
135         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
136                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
137                 withDispatcher(Dispatchers.DefaultDispatcherId()));
138
139         YangInstanceIdentifier entityId = ENTITY_ID1;
140         Entity entity = new Entity(ENTITY_TYPE, entityId);
141
142         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
143         kit.expectMsgClass(SuccessReply.class);
144
145         // Now grant the vote so the shard becomes the leader. This should retry the commit.
146         peer.underlyingActor().grantVote = true;
147
148         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
149         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
150     }
151
152     @Test
153     public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
154         ShardTestKit kit = new ShardTestKit(getSystem());
155
156         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
157                 shardTransactionCommitTimeoutInSeconds(1);
158
159         String peerId = newShardId("follower").toString();
160         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
161                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
162
163         MockFollower follower = peer.underlyingActor();
164
165         // Drop AppendEntries so consensus isn't reached.
166         follower.dropAppendEntries = true;
167
168         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
169                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
170                 withDispatcher(Dispatchers.DefaultDispatcherId()));
171
172         kit.waitUntilLeader(shard);
173
174         YangInstanceIdentifier entityId = ENTITY_ID1;
175         Entity entity = new Entity(ENTITY_TYPE, entityId);
176
177         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
178         kit.expectMsgClass(SuccessReply.class);
179
180         // Wait enough time for the commit to timeout.
181         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
182
183         // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
184         // write being applied to the state.
185         follower.dropAppendEntries = false;
186
187         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
188         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
189     }
190
191     @Test
192     public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
193         ShardTestKit kit = new ShardTestKit(getSystem());
194
195         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
196                 shardIsolatedLeaderCheckIntervalInMillis(50);
197
198         String peerId = newShardId("follower").toString();
199         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
200                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
201
202         MockFollower follower = peer.underlyingActor();
203
204         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
205                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
206                 withDispatcher(Dispatchers.DefaultDispatcherId()));
207
208         kit.waitUntilLeader(shard);
209
210         // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
211         follower.dropAppendEntries = true;
212         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
213
214         YangInstanceIdentifier entityId = ENTITY_ID1;
215         Entity entity = new Entity(ENTITY_TYPE, entityId);
216
217         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
218         kit.expectMsgClass(SuccessReply.class);
219
220         // Resume AppendEntries - the candidate write should now be committed.
221         follower.dropAppendEntries = false;
222         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
223         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
224     }
225
226     @Test
227     public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
228         ShardTestKit kit = new ShardTestKit(getSystem());
229
230         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
231                 shardBatchedModificationCount(5);
232
233         String peerId = newShardId("leader").toString();
234         TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
235                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
236
237         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(Props.create(
238                 TestEntityOwnershipShard.class, newShardId(LOCAL_MEMBER_NAME),
239                         ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build(),
240                         dataStoreContextBuilder.build()).withDispatcher(Dispatchers.DefaultDispatcherId()));
241
242         shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), -1L, -1L,
243                 DataStoreVersions.CURRENT_VERSION), peer);
244
245         shard.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
246         kit.expectMsgClass(SuccessReply.class);
247
248         MockLeader leader = peer.underlyingActor();
249         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
250                 leader.modificationsReceived, 5, TimeUnit.SECONDS));
251         verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1,
252                 LOCAL_MEMBER_NAME);
253
254         // Test with initial commit timeout and subsequent retry.
255
256         leader.modificationsReceived = new CountDownLatch(1);
257         leader.sendReply = false;
258
259         shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
260
261         shard.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
262         kit.expectMsgClass(SuccessReply.class);
263
264         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
265                 leader.modificationsReceived, 5, TimeUnit.SECONDS));
266         verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID2,
267                 LOCAL_MEMBER_NAME);
268
269         // Send a bunch of registration messages quickly and verify.
270
271         int max = 100;
272         leader.delay = 4;
273         leader.modificationsReceived = new CountDownLatch(max);
274         List<YangInstanceIdentifier> entityIds = new ArrayList<>();
275         for(int i = 1; i <= max; i++) {
276             YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
277             entityIds.add(id);
278             shard.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, id)), kit.getRef());
279         }
280
281         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
282                 leader.modificationsReceived, 10, TimeUnit.SECONDS));
283
284         // Sleep a little to ensure no additional BatchedModifications are received.
285
286         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
287
288         List<Modification> receivedMods = leader.getAndClearReceivedModifications();
289         for(int i = 0; i < max; i++) {
290             verifyBatchedEntityCandidate(receivedMods.get(i), ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
291         }
292
293         assertEquals("# modifications received", max, receivedMods.size());
294     }
295
296     @Test
297     public void testOnUnregisterCandidateLocal() throws Exception {
298         ShardTestKit kit = new ShardTestKit(getSystem());
299         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
300         kit.waitUntilLeader(shard);
301
302         Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
303
304         // Register
305
306         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
307         kit.expectMsgClass(SuccessReply.class);
308
309         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
310         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
311
312         // Unregister
313
314         shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
315         kit.expectMsgClass(SuccessReply.class);
316
317         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");
318
319         // Register again
320
321         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
322         kit.expectMsgClass(SuccessReply.class);
323
324         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
325         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
326     }
327
328     @Test
329     public void testOwnershipChanges() throws Exception {
330         ShardTestKit kit = new ShardTestKit(getSystem());
331         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
332         kit.waitUntilLeader(shard);
333
334         Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
335         ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
336
337         // Add a remote candidate
338
339         String remoteMemberName1 = "remoteMember1";
340         writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
341
342         // Register local
343
344         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
345         kit.expectMsgClass(SuccessReply.class);
346
347         // Verify the remote candidate becomes owner
348
349         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
350         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
351         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
352
353         // Add another remote candidate and verify ownership doesn't change
354
355         String remoteMemberName2 = "remoteMember2";
356         writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
357
358         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
359         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
360         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
361
362         // Remove the second remote candidate and verify ownership doesn't change
363
364         deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
365
366         verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
367         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
368         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
369
370         // Remove the first remote candidate and verify the local candidate becomes owner
371
372         deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
373
374         verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
375         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
376
377         // Add the second remote candidate back and verify ownership doesn't change
378
379         writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
380
381         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
382         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
383         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
384
385         // Unregister the local candidate and verify the second remote candidate becomes owner
386
387         shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
388         kit.expectMsgClass(SuccessReply.class);
389
390         verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
391         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
392     }
393
394     @Test
395     public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception {
396         ShardTestKit kit = new ShardTestKit(getSystem());
397
398         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(500).shardElectionTimeoutFactor(10000);
399
400         String peerMemberName1 = "peerMember1";
401         String peerMemberName2 = "peerMember2";
402
403         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
404         ShardIdentifier peerId1 = newShardId(peerMemberName1);
405         ShardIdentifier peerId2 = newShardId(peerMemberName2);
406
407         TestActorRef<EntityOwnershipShard> peer1 = actorFactory.createTestActor(newShardProps(peerId1,
408                 ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId2.toString(), "").build(),
409                         peerMemberName1, EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString());
410
411         TestActorRef<EntityOwnershipShard> peer2 = actorFactory.createTestActor(newShardProps(peerId2,
412                 ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId1.toString(), "").build(),
413                         peerMemberName2, EntityOwnerSelectionStrategyConfig.newBuilder().build()). withDispatcher(Dispatchers.DefaultDispatcherId()), peerId2.toString());
414
415         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(newShardProps(leaderId,
416                 ImmutableMap.<String, String>builder().put(peerId1.toString(), peer1.path().toString()).
417                         put(peerId2.toString(), peer2.path().toString()).build(), LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build()).
418                 withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString());
419         leader.tell(new ElectionTimeout(), leader);
420
421         kit.waitUntilLeader(leader);
422
423         // Send PeerDown and PeerUp with no entities
424
425         leader.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
426         leader.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
427
428         // Add candidates for entity1 with the local leader as the owner
429
430         leader.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
431         kit.expectMsgClass(SuccessReply.class);
432         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
433
434         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName2), kit);
435         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
436
437         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName1), kit);
438         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName1);
439         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
440
441         // Add candidates for entity2 with peerMember2 as the owner
442
443         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName2), kit);
444         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
445
446         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName1), kit);
447         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
448         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
449
450         // Add candidates for entity3 with peerMember2 as the owner.
451
452         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName2), kit);
453         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
454
455         leader.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
456         kit.expectMsgClass(SuccessReply.class);
457         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
458
459         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName1), kit);
460         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName1);
461         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
462
463         // Add only candidate peerMember2 for entity4.
464
465         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID4, peerMemberName2), kit);
466         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
467         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
468
469         // Add only candidate peerMember1 for entity5.
470
471         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID5, peerMemberName1), kit);
472         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, peerMemberName1);
473         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, peerMemberName1);
474
475         // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new
476         // owner selected
477
478         kit.watch(peer2);
479         peer2.tell(PoisonPill.getInstance(), ActorRef.noSender());
480         kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
481         kit.unwatch(peer2);
482
483         leader.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
484         // Send PeerDown again - should be noop
485         leader.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
486         peer1.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
487
488         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, ""); // no other candidates so should clear
489         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
490         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
491         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
492
493         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
494         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
495         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
496         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
497
498         // Reinstate peerMember2 - no owners should change
499
500         peer2 = actorFactory.createTestActor(newShardProps(peerId2,
501                 ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId1.toString(), "").build(),
502                         peerMemberName2, EntityOwnerSelectionStrategyConfig.newBuilder().build()). withDispatcher(Dispatchers.DefaultDispatcherId()), peerId2.toString());
503         leader.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
504         // Send PeerUp again - should be noop
505         leader.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
506         peer1.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
507
508         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
509         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
510         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
511         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
512
513         // Add back candidate peerMember2 for entities 1, 2, & 3.
514
515         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName2), kit);
516         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName2), kit);
517         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName2), kit);
518         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
519         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
520         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
521         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
522         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
523         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
524
525         // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected
526
527         peer1.tell(PoisonPill.getInstance(), ActorRef.noSender());
528         leader.tell(new PeerDown(peerMemberName1, peerId1.toString()), ActorRef.noSender());
529
530         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
531
532         // Verify the reinstated peerMember2 is fully synced.
533
534         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
535         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
536         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
537         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
538
539         // Reinstate peerMember1 and verify no owner changes
540
541         peer1 = actorFactory.createTestActor(newShardProps(peerId1,
542                 ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId2.toString(), "").build(),
543                         peerMemberName1, EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString());
544         leader.tell(new PeerUp(peerMemberName1, peerId1.toString()), ActorRef.noSender());
545
546         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
547         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
548         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
549         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
550
551         // Verify the reinstated peerMember1 is fully synced.
552
553         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
554         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
555         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
556         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
557
558         // Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for
559         // the entities (1 and 3) previously owned by the local leader member.
560
561         peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
562         peer2.tell(new PeerUp(LOCAL_MEMBER_NAME, leaderId.toString()), ActorRef.noSender());
563         peer2.tell(new PeerUp(peerMemberName1, peerId1.toString()), ActorRef.noSender());
564
565         leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
566         peer2.tell(new PeerDown(LOCAL_MEMBER_NAME, leaderId.toString()), ActorRef.noSender());
567         peer2.tell(new ElectionTimeout(), peer2);
568
569         kit.waitUntilLeader(peer2);
570
571         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
572         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
573         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
574         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
575     }
576
577     @Test
578     public void testLocalCandidateRemovedWithCandidateRegistered() throws Exception {
579         ShardTestKit kit = new ShardTestKit(getSystem());
580
581         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10000);
582         ShardIdentifier leaderId = newShardId("leader");
583         ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
584
585         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(Props.create(
586                 TestEntityOwnershipShard.class, localId,
587                 ImmutableMap.<String, String>builder().put(leaderId.toString(), "".toString()).build(),
588                 dataStoreContextBuilder.build()).withDispatcher(Dispatchers.DefaultDispatcherId()));
589
590         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(newShardProps(leaderId,
591                 ImmutableMap.<String, String>builder().put(localId.toString(), shard.path().toString()).build(),
592                     LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString());
593         leader.tell(new ElectionTimeout(), leader);
594
595         kit.waitUntilLeader(leader);
596
597         shard.tell(new PeerAddressResolved(leaderId.toString(), leader.path().toString()), ActorRef.noSender());
598
599         Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
600         EntityOwnershipListener listener = mock(EntityOwnershipListener.class);
601
602         shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
603         kit.expectMsgClass(SuccessReply.class);
604
605         // Register local candidate
606
607         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
608         kit.expectMsgClass(SuccessReply.class);
609         verifyCommittedEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
610         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity, false, true, true));
611         reset(listener);
612
613         // Simulate a replicated commit from the leader to remove the local candidate that would occur after a
614         // network partition is healed.
615
616         leader.tell(new PeerDown(LOCAL_MEMBER_NAME, localId.toString()), ActorRef.noSender());
617
618         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity, true, false, false));
619
620         // Since the the shard has a local candidate registered, it should re-add its candidate to the entity.
621
622         verifyCommittedEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
623         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity, false, true, true));
624
625         // Unregister the local candidate and verify it's removed and no re-added.
626
627         shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
628         kit.expectMsgClass(SuccessReply.class);
629
630         verifyNoEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
631         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
632         verifyNoEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
633     }
634
635     @Test
636     public void testListenerRegistration() throws Exception {
637         ShardTestKit kit = new ShardTestKit(getSystem());
638         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
639         kit.waitUntilLeader(shard);
640         ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
641
642         String otherEntityType = "otherEntityType";
643         Entity entity1 = new Entity(ENTITY_TYPE, ENTITY_ID1);
644         Entity entity2 = new Entity(ENTITY_TYPE, ENTITY_ID2);
645         Entity entity3 = new Entity(ENTITY_TYPE, ENTITY_ID3);
646         Entity entity4 = new Entity(otherEntityType, ENTITY_ID3);
647         EntityOwnershipListener listener = mock(EntityOwnershipListener.class);
648
649         // Register listener
650
651         shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
652         kit.expectMsgClass(SuccessReply.class);
653
654         // Register a couple candidates for the desired entity type and verify listener is notified.
655
656         shard.tell(new RegisterCandidateLocal(entity1), kit.getRef());
657         kit.expectMsgClass(SuccessReply.class);
658
659         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
660
661         shard.tell(new RegisterCandidateLocal(entity2), kit.getRef());
662         kit.expectMsgClass(SuccessReply.class);
663
664         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
665         reset(listener);
666
667         // Register another candidate for another entity type and verify listener is not notified.
668
669         shard.tell(new RegisterCandidateLocal(entity4), kit.getRef());
670         kit.expectMsgClass(SuccessReply.class);
671
672         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
673         verify(listener, never()).ownershipChanged(ownershipChange(entity4));
674
675         // Register remote candidate for entity1
676
677         String remoteMemberName = "remoteMember";
678         writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, entity1.getId(), remoteMemberName),
679                 shardDataTree);
680         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entity1.getId(), remoteMemberName);
681
682         // Unregister the local candidate for entity1 and verify listener is notified
683
684         shard.tell(new UnregisterCandidateLocal(entity1), kit.getRef());
685         kit.expectMsgClass(SuccessReply.class);
686
687         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
688         reset(listener);
689
690         // Unregister the listener, add a candidate for entity3 and verify listener isn't notified
691
692         shard.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
693         kit.expectMsgClass(SuccessReply.class);
694
695         shard.tell(new RegisterCandidateLocal(entity3), kit.getRef());
696         kit.expectMsgClass(SuccessReply.class);
697
698         verifyOwner(shard, ENTITY_TYPE, entity3.getId(), LOCAL_MEMBER_NAME);
699         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
700         verify(listener, never()).ownershipChanged(any(EntityOwnershipChange.class));
701
702         // Re-register the listener and verify it gets notified of currently owned entities
703
704         reset(listener);
705
706         shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
707         kit.expectMsgClass(SuccessReply.class);
708
709         verify(listener, timeout(5000).times(2)).ownershipChanged(or(ownershipChange(entity2, false, true, true),
710                 ownershipChange(entity3, false, true, true)));
711         Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
712         verify(listener, never()).ownershipChanged(ownershipChange(entity4));
713         verify(listener, never()).ownershipChanged(ownershipChange(entity1));
714     }
715
716     private static void commitModification(TestActorRef<EntityOwnershipShard> shard, NormalizedNode<?, ?> node,
717             JavaTestKit sender) {
718         BatchedModifications modifications = newBatchedModifications();
719         modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, node));
720
721         shard.tell(modifications, sender.getRef());
722         sender.expectMsgClass(CommitTransactionReply.class);
723     }
724
725     private static BatchedModifications newBatchedModifications() {
726         BatchedModifications modifications = new BatchedModifications("tnx", DataStoreVersions.CURRENT_VERSION, "");
727         modifications.setDoCommitOnReady(true);
728         modifications.setReady(true);
729         modifications.setTotalMessagesSent(1);
730         return modifications;
731     }
732
733     private void verifyEntityCandidateRemoved(final TestActorRef<EntityOwnershipShard> shard, String entityType,
734             YangInstanceIdentifier entityId, String candidateName) {
735         verifyNodeRemoved(candidatePath(entityType, entityId, candidateName),
736                 new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
737                     @Override
738                     public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
739                         try {
740                             return AbstractShardTest.readStore(shard, path);
741                         } catch(Exception e) {
742                             throw new AssertionError("Failed to read " + path, e);
743                         }
744                 }
745         });
746     }
747
748     private void verifyCommittedEntityCandidate(final TestActorRef<EntityOwnershipShard> shard, String entityType,
749             YangInstanceIdentifier entityId, String candidateName) {
750         verifyEntityCandidate(entityType, entityId, candidateName, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
751             @Override
752             public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
753                 try {
754                     return AbstractShardTest.readStore(shard, path);
755                 } catch(Exception e) {
756                     throw new AssertionError("Failed to read " + path, e);
757                 }
758             }
759         });
760     }
761
762     private void verifyNoEntityCandidate(final TestActorRef<EntityOwnershipShard> shard, String entityType,
763             YangInstanceIdentifier entityId, String candidateName) {
764         verifyEntityCandidate(entityType, entityId, candidateName, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
765             @Override
766             public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
767                 try {
768                     return AbstractShardTest.readStore(shard, path);
769                 } catch(Exception e) {
770                     throw new AssertionError("Failed to read " + path, e);
771                 }
772             }
773         }, false);
774     }
775
776     private void verifyBatchedEntityCandidate(List<Modification> mods, String entityType,
777             YangInstanceIdentifier entityId, String candidateName) throws Exception {
778         assertEquals("BatchedModifications size", 1, mods.size());
779         verifyBatchedEntityCandidate(mods.get(0), entityType, entityId, candidateName);
780     }
781
782     private void verifyBatchedEntityCandidate(Modification mod, String entityType,
783             YangInstanceIdentifier entityId, String candidateName) throws Exception {
784         assertEquals("Modification type", MergeModification.class, mod.getClass());
785         verifyEntityCandidate(((MergeModification)mod).getData(), entityType,
786                 entityId, candidateName, true);
787     }
788
789     private static void verifyOwner(final TestActorRef<EntityOwnershipShard> shard, String entityType,
790             YangInstanceIdentifier entityId, String localMemberName) {
791         verifyOwner(localMemberName, entityType, entityId, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
792             @Override
793             public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
794                 try {
795                     return AbstractShardTest.readStore(shard, path);
796                 } catch(Exception e) {
797                     return null;
798                 }
799             }
800         });
801     }
802
803     private Props newShardProps() {
804         return newShardProps(Collections.<String,String>emptyMap());
805     }
806
807     private Props newShardProps(EntityOwnerSelectionStrategyConfig strategyConfig) {
808         return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.<String,String>emptyMap(),
809                 LOCAL_MEMBER_NAME, strategyConfig);
810     }
811
812     private Props newShardProps(EntityOwnerSelectionStrategyConfig strategyConfig, Map<String, String> peers) {
813         return newShardProps(newShardId(LOCAL_MEMBER_NAME), peers, LOCAL_MEMBER_NAME, strategyConfig);
814     }
815
816
817     private Props newShardProps(Map<String,String> peers) {
818         return newShardProps(newShardId(LOCAL_MEMBER_NAME), peers, LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build());
819     }
820
821     private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName,
822                                 EntityOwnerSelectionStrategyConfig config) {
823         return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).
824                 datastoreContext(dataStoreContextBuilder.build()).schemaContext(SCHEMA_CONTEXT).
825                 localMemberName(memberName).ownerSelectionStrategyConfig(config).props().withDispatcher(Dispatchers.DefaultDispatcherId());
826     }
827
828     private static ShardIdentifier newShardId(String memberName) {
829         return ShardIdentifier.builder().memberName(memberName).shardName("entity-ownership").
830                 type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
831     }
832
833     public static class TestEntityOwnershipShard extends EntityOwnershipShard {
834
835         TestEntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
836                 DatastoreContext datastoreContext) {
837             super(newBuilder().id(name).peerAddresses(peerAddresses).datastoreContext(datastoreContext).
838                     schemaContext(SCHEMA_CONTEXT).localMemberName(LOCAL_MEMBER_NAME));
839         }
840
841         @Override
842         public void onReceiveCommand(Object message) throws Exception {
843             if(!(message instanceof ElectionTimeout)) {
844                 super.onReceiveCommand(message);
845             }
846         }
847
848
849     }
850
851     public static class MockFollower extends UntypedActor {
852         volatile boolean grantVote;
853         volatile boolean dropAppendEntries;
854         private final String myId;
855
856         public MockFollower(String myId) {
857             this(myId, true);
858         }
859
860         public MockFollower(String myId, boolean grantVote) {
861             this.myId = myId;
862             this.grantVote = grantVote;
863         }
864
865         @Override
866         public void onReceive(Object message) {
867             if(message instanceof RequestVote) {
868                 if(grantVote) {
869                     getSender().tell(new RequestVoteReply(((RequestVote)message).getTerm(), true), getSelf());
870                 }
871             } else if(message instanceof AppendEntries) {
872                 if(!dropAppendEntries) {
873                     AppendEntries req = (AppendEntries) message;
874                     long lastIndex = req.getLeaderCommit();
875                     if (req.getEntries().size() > 0) {
876                         for(ReplicatedLogEntry entry : req.getEntries()) {
877                             lastIndex = entry.getIndex();
878                         }
879                     }
880
881                     getSender().tell(new AppendEntriesReply(myId, req.getTerm(), true, lastIndex, req.getTerm(),
882                             DataStoreVersions.CURRENT_VERSION), getSelf());
883                 }
884             }
885         }
886     }
887
888
889     @Test
890     public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() throws Exception {
891         ShardTestKit kit = new ShardTestKit(getSystem());
892         EntityOwnerSelectionStrategyConfig.Builder builder
893                 = EntityOwnerSelectionStrategyConfig.newBuilder().addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
894
895         String peerId = newShardId("follower").toString();
896         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId, false).
897                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
898
899         peer.underlyingActor().grantVote = true;
900
901         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(builder.build(),
902                 ImmutableMap.of(peerId.toString(), peer.path().toString())));
903         kit.waitUntilLeader(shard);
904
905         Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
906         ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
907
908         // Add a remote candidate
909
910         String remoteMemberName1 = "follower";
911         writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
912
913         // Register local
914
915         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
916         kit.expectMsgClass(SuccessReply.class);
917
918         // Verify the local candidate becomes owner
919
920         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
921         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
922         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
923     }
924
925     @Test
926     public void testDelayedEntityOwnerSelection() throws Exception {
927         ShardTestKit kit = new ShardTestKit(getSystem());
928         EntityOwnerSelectionStrategyConfig.Builder builder
929                 = EntityOwnerSelectionStrategyConfig.newBuilder().addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
930
931         String follower1Id = newShardId("follower1").toString();
932         TestActorRef<MockFollower> follower1 = actorFactory.createTestActor(Props.create(MockFollower.class, follower1Id, false).
933                 withDispatcher(Dispatchers.DefaultDispatcherId()), follower1Id);
934
935         follower1.underlyingActor().grantVote = true;
936
937         String follower2Id = newShardId("follower").toString();
938         TestActorRef<MockFollower> follower2 = actorFactory.createTestActor(Props.create(MockFollower.class, follower2Id, false).
939                 withDispatcher(Dispatchers.DefaultDispatcherId()), follower2Id);
940
941         follower2.underlyingActor().grantVote = true;
942
943
944         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(builder.build(),
945                 ImmutableMap.of(follower1Id.toString(), follower2.path().toString(), follower2Id.toString(), follower2.path().toString())));
946         kit.waitUntilLeader(shard);
947
948         Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
949         ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
950
951         // Add a remote candidate
952
953         String remoteMemberName1 = "follower";
954         writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
955
956         // Register local
957
958         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
959         kit.expectMsgClass(SuccessReply.class);
960
961         // Verify the local candidate becomes owner
962
963         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
964         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
965         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
966     }
967
968     public static class MockLeader extends UntypedActor {
969         volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
970         List<Modification> receivedModifications = new ArrayList<>();
971         volatile boolean sendReply = true;
972         volatile long delay;
973
974         @Override
975         public void onReceive(Object message) {
976             if(message instanceof BatchedModifications) {
977                 if(delay > 0) {
978                     Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
979                 }
980
981                 if(sendReply) {
982                     BatchedModifications mods = (BatchedModifications) message;
983                     synchronized (receivedModifications) {
984                         for(int i = 0; i < mods.getModifications().size(); i++) {
985                             receivedModifications.add(mods.getModifications().get(i));
986                             modificationsReceived.countDown();
987                         }
988                     }
989
990                     getSender().tell(CommitTransactionReply.instance(DataStoreVersions.CURRENT_VERSION).
991                             toSerializable(), getSelf());
992                 } else {
993                     sendReply = true;
994                 }
995             }
996         }
997
998         List<Modification> getAndClearReceivedModifications() {
999             synchronized (receivedModifications) {
1000                 List<Modification> ret = new ArrayList<>(receivedModifications);
1001                 receivedModifications.clear();
1002                 return ret;
1003             }
1004         }
1005     }
1006 }