Add Delayed Owner selection based on strategy
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.AdditionalMatchers.or;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.never;
15 import static org.mockito.Mockito.reset;
16 import static org.mockito.Mockito.timeout;
17 import static org.mockito.Mockito.verify;
18 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
19 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
20 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
21 import akka.actor.ActorRef;
22 import akka.actor.PoisonPill;
23 import akka.actor.Props;
24 import akka.actor.Terminated;
25 import akka.actor.UntypedActor;
26 import akka.dispatch.Dispatchers;
27 import akka.testkit.JavaTestKit;
28 import akka.testkit.TestActorRef;
29 import com.google.common.base.Function;
30 import com.google.common.collect.ImmutableMap;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.util.ArrayList;
33 import java.util.Collections;
34 import java.util.List;
35 import java.util.Map;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import org.junit.After;
40 import org.junit.Test;
41 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
42 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
43 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
44 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
45 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
46 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
47 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
48 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
49 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
50 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
51 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.LastCandidateSelectionStrategy;
52 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
53 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
54 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
55 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
57 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
58 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
59 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
60 import org.opendaylight.controller.cluster.datastore.modification.Modification;
61 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
62 import org.opendaylight.controller.cluster.raft.TestActorFactory;
63 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
64 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
65 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
66 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
67 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
68 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
69 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
70 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
71 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
72 import org.opendaylight.yangtools.yang.common.QName;
73 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
74 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
75 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
76
77 /**
78  * Unit tests for EntityOwnershipShard.
79  *
80  * @author Thomas Pantelis
81  */
82 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
83     private static final String ENTITY_TYPE = "test type";
84     private static final YangInstanceIdentifier ENTITY_ID1 =
85             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
86     private static final YangInstanceIdentifier ENTITY_ID2 =
87             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
88     private static final YangInstanceIdentifier ENTITY_ID3 =
89             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3"));
90     private static final YangInstanceIdentifier ENTITY_ID4 =
91             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4"));
92     private static final YangInstanceIdentifier ENTITY_ID5 =
93             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5"));
94     private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
95     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
96     private static final String LOCAL_MEMBER_NAME = "member-1";
97
98     private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder();
99     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
100
101     @After
102     public void tearDown() {
103         actorFactory.close();
104     }
105
106     @Test
107     public void testOnRegisterCandidateLocal() throws Exception {
108         ShardTestKit kit = new ShardTestKit(getSystem());
109
110         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
111
112         kit.waitUntilLeader(shard);
113
114         YangInstanceIdentifier entityId = ENTITY_ID1;
115         Entity entity = new Entity(ENTITY_TYPE, entityId);
116
117         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
118         kit.expectMsgClass(SuccessReply.class);
119
120         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
121         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
122     }
123
124     @Test
125     public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
126         ShardTestKit kit = new ShardTestKit(getSystem());
127
128         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
129
130         String peerId = newShardId("follower").toString();
131         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId, false).
132                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
133
134         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
135                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
136                 withDispatcher(Dispatchers.DefaultDispatcherId()));
137
138         YangInstanceIdentifier entityId = ENTITY_ID1;
139         Entity entity = new Entity(ENTITY_TYPE, entityId);
140
141         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
142         kit.expectMsgClass(SuccessReply.class);
143
144         // Now grant the vote so the shard becomes the leader. This should retry the commit.
145         peer.underlyingActor().grantVote = true;
146
147         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
148         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
149     }
150
151     @Test
152     public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
153         ShardTestKit kit = new ShardTestKit(getSystem());
154
155         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
156                 shardTransactionCommitTimeoutInSeconds(1);
157
158         String peerId = newShardId("follower").toString();
159         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
160                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
161
162         MockFollower follower = peer.underlyingActor();
163
164         // Drop AppendEntries so consensus isn't reached.
165         follower.dropAppendEntries = true;
166
167         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
168                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
169                 withDispatcher(Dispatchers.DefaultDispatcherId()));
170
171         kit.waitUntilLeader(shard);
172
173         YangInstanceIdentifier entityId = ENTITY_ID1;
174         Entity entity = new Entity(ENTITY_TYPE, entityId);
175
176         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
177         kit.expectMsgClass(SuccessReply.class);
178
179         // Wait enough time for the commit to timeout.
180         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
181
182         // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
183         // write being applied to the state.
184         follower.dropAppendEntries = false;
185
186         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
187         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
188     }
189
190     @Test
191     public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
192         ShardTestKit kit = new ShardTestKit(getSystem());
193
194         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
195                 shardIsolatedLeaderCheckIntervalInMillis(50);
196
197         String peerId = newShardId("follower").toString();
198         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
199                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
200
201         MockFollower follower = peer.underlyingActor();
202
203         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
204                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
205                 withDispatcher(Dispatchers.DefaultDispatcherId()));
206
207         kit.waitUntilLeader(shard);
208
209         // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
210         follower.dropAppendEntries = true;
211         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
212
213         YangInstanceIdentifier entityId = ENTITY_ID1;
214         Entity entity = new Entity(ENTITY_TYPE, entityId);
215
216         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
217         kit.expectMsgClass(SuccessReply.class);
218
219         // Resume AppendEntries - the candidate write should now be committed.
220         follower.dropAppendEntries = false;
221         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
222         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
223     }
224
225     @Test
226     public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
227         ShardTestKit kit = new ShardTestKit(getSystem());
228
229         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
230                 shardBatchedModificationCount(5);
231
232         String peerId = newShardId("leader").toString();
233         TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
234                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
235
236         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(Props.create(
237                 TestEntityOwnershipShard.class, newShardId(LOCAL_MEMBER_NAME),
238                         ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build(),
239                         dataStoreContextBuilder.build()).withDispatcher(Dispatchers.DefaultDispatcherId()));
240
241         shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), -1L, -1L,
242                 DataStoreVersions.CURRENT_VERSION), peer);
243
244         shard.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
245         kit.expectMsgClass(SuccessReply.class);
246
247         MockLeader leader = peer.underlyingActor();
248         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
249                 leader.modificationsReceived, 5, TimeUnit.SECONDS));
250         verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1,
251                 LOCAL_MEMBER_NAME);
252
253         // Test with initial commit timeout and subsequent retry.
254
255         leader.modificationsReceived = new CountDownLatch(1);
256         leader.sendReply = false;
257
258         shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
259
260         shard.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
261         kit.expectMsgClass(SuccessReply.class);
262
263         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
264                 leader.modificationsReceived, 5, TimeUnit.SECONDS));
265         verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID2,
266                 LOCAL_MEMBER_NAME);
267
268         // Send a bunch of registration messages quickly and verify.
269
270         int max = 100;
271         leader.delay = 4;
272         leader.modificationsReceived = new CountDownLatch(max);
273         List<YangInstanceIdentifier> entityIds = new ArrayList<>();
274         for(int i = 1; i <= max; i++) {
275             YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
276             entityIds.add(id);
277             shard.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, id)), kit.getRef());
278         }
279
280         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
281                 leader.modificationsReceived, 10, TimeUnit.SECONDS));
282
283         // Sleep a little to ensure no additional BatchedModifications are received.
284
285         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
286
287         List<Modification> receivedMods = leader.getAndClearReceivedModifications();
288         for(int i = 0; i < max; i++) {
289             verifyBatchedEntityCandidate(receivedMods.get(i), ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
290         }
291
292         assertEquals("# modifications received", max, receivedMods.size());
293     }
294
295     @Test
296     public void testOnUnregisterCandidateLocal() throws Exception {
297         ShardTestKit kit = new ShardTestKit(getSystem());
298         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
299         kit.waitUntilLeader(shard);
300
301         Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
302
303         // Register
304
305         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
306         kit.expectMsgClass(SuccessReply.class);
307
308         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
309         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
310
311         // Unregister
312
313         shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
314         kit.expectMsgClass(SuccessReply.class);
315
316         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");
317
318         // Register again
319
320         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
321         kit.expectMsgClass(SuccessReply.class);
322
323         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
324         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
325     }
326
327     @Test
328     public void testOwnershipChanges() throws Exception {
329         ShardTestKit kit = new ShardTestKit(getSystem());
330         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
331         kit.waitUntilLeader(shard);
332
333         Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
334         ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
335
336         // Add a remote candidate
337
338         String remoteMemberName1 = "remoteMember1";
339         writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
340
341         // Register local
342
343         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
344         kit.expectMsgClass(SuccessReply.class);
345
346         // Verify the remote candidate becomes owner
347
348         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
349         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
350         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
351
352         // Add another remote candidate and verify ownership doesn't change
353
354         String remoteMemberName2 = "remoteMember2";
355         writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
356
357         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
358         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
359         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
360
361         // Remove the second remote candidate and verify ownership doesn't change
362
363         deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
364
365         verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
366         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
367         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
368
369         // Remove the first remote candidate and verify the local candidate becomes owner
370
371         deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
372
373         verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
374         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
375
376         // Add the second remote candidate back and verify ownership doesn't change
377
378         writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
379
380         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
381         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
382         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
383
384         // Unregister the local candidate and verify the second remote candidate becomes owner
385
386         shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
387         kit.expectMsgClass(SuccessReply.class);
388
389         verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
390         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
391     }
392
393     @Test
394     public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception {
395         ShardTestKit kit = new ShardTestKit(getSystem());
396
397         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(500).shardElectionTimeoutFactor(10000);
398
399         String peerMemberName1 = "peerMember1";
400         String peerMemberName2 = "peerMember2";
401
402         ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
403         ShardIdentifier peerId1 = newShardId(peerMemberName1);
404         ShardIdentifier peerId2 = newShardId(peerMemberName2);
405
406         TestActorRef<EntityOwnershipShard> peer1 = actorFactory.createTestActor(newShardProps(peerId1,
407                 ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId2.toString(), "").build(),
408                         peerMemberName1).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString());
409
410         TestActorRef<EntityOwnershipShard> peer2 = actorFactory.createTestActor(newShardProps(peerId2,
411                 ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId1.toString(), "").build(),
412                         peerMemberName2). withDispatcher(Dispatchers.DefaultDispatcherId()), peerId2.toString());
413
414         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(newShardProps(leaderId,
415                 ImmutableMap.<String, String>builder().put(peerId1.toString(), peer1.path().toString()).
416                         put(peerId2.toString(), peer2.path().toString()).build(), LOCAL_MEMBER_NAME).
417                 withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString());
418         leader.tell(new ElectionTimeout(), leader);
419
420         kit.waitUntilLeader(leader);
421
422         // Send PeerDown and PeerUp with no entities
423
424         leader.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
425         leader.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
426
427         // Add candidates for entity1 with the local leader as the owner
428
429         leader.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
430         kit.expectMsgClass(SuccessReply.class);
431         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
432
433         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName2), kit);
434         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
435
436         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName1), kit);
437         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName1);
438         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
439
440         // Add candidates for entity2 with peerMember2 as the owner
441
442         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName2), kit);
443         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
444
445         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName1), kit);
446         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
447         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
448
449         // Add candidates for entity3 with peerMember2 as the owner.
450
451         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName2), kit);
452         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
453
454         leader.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
455         kit.expectMsgClass(SuccessReply.class);
456         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
457
458         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName1), kit);
459         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName1);
460         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
461
462         // Add only candidate peerMember2 for entity4.
463
464         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID4, peerMemberName2), kit);
465         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
466         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
467
468         // Add only candidate peerMember1 for entity5.
469
470         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID5, peerMemberName1), kit);
471         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, peerMemberName1);
472         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, peerMemberName1);
473
474         // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new
475         // owner selected
476
477         kit.watch(peer2);
478         peer2.tell(PoisonPill.getInstance(), ActorRef.noSender());
479         kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
480         kit.unwatch(peer2);
481
482         leader.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
483         // Send PeerDown again - should be noop
484         leader.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
485         peer1.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
486
487         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, ""); // no other candidates so should clear
488         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
489         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
490         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
491
492         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
493         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
494         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
495         verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
496
497         // Reinstate peerMember2 - no owners should change
498
499         peer2 = actorFactory.createTestActor(newShardProps(peerId2,
500                 ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId1.toString(), "").build(),
501                         peerMemberName2). withDispatcher(Dispatchers.DefaultDispatcherId()), peerId2.toString());
502         leader.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
503         // Send PeerUp again - should be noop
504         leader.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
505         peer1.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
506
507         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
508         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
509         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
510         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
511
512         // Add back candidate peerMember2 for entities 1, 2, & 3.
513
514         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName2), kit);
515         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName2), kit);
516         commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName2), kit);
517         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
518         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
519         verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
520         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
521         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
522         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
523
524         // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected
525
526         peer1.tell(PoisonPill.getInstance(), ActorRef.noSender());
527         leader.tell(new PeerDown(peerMemberName1, peerId1.toString()), ActorRef.noSender());
528
529         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
530
531         // Verify the reinstated peerMember2 is fully synced.
532
533         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
534         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
535         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
536         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
537
538         // Reinstate peerMember1 and verify no owner changes
539
540         peer1 = actorFactory.createTestActor(newShardProps(peerId1,
541                 ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId2.toString(), "").build(),
542                         peerMemberName1).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString());
543         leader.tell(new PeerUp(peerMemberName1, peerId1.toString()), ActorRef.noSender());
544
545         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
546         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
547         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
548         verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
549
550         // Verify the reinstated peerMember1 is fully synced.
551
552         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
553         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
554         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
555         verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
556
557         // Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for
558         // the entities (1 and 3) previously owned by the local leader member.
559
560         peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
561         peer2.tell(new PeerUp(LOCAL_MEMBER_NAME, leaderId.toString()), ActorRef.noSender());
562         peer2.tell(new PeerUp(peerMemberName1, peerId1.toString()), ActorRef.noSender());
563
564         leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
565         peer2.tell(new PeerDown(LOCAL_MEMBER_NAME, leaderId.toString()), ActorRef.noSender());
566         peer2.tell(new ElectionTimeout(), peer2);
567
568         kit.waitUntilLeader(peer2);
569
570         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
571         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
572         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
573         verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
574     }
575
576     @Test
577     public void testLocalCandidateRemovedWithCandidateRegistered() throws Exception {
            // Scenario: a follower shard registers a local candidate, the leader then removes that candidate,
            // and the follower is expected to re-add it. Afterwards the candidate is unregistered locally and
            // must stay removed.
578         ShardTestKit kit = new ShardTestKit(getSystem());
579
            // NOTE(review): the large election-timeout factor presumably keeps timer-driven elections from
            // firing during the test, so that only the explicit ElectionTimeout below matters - confirm.
580         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10000);
581         ShardIdentifier leaderId = newShardId("leader");
582         ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
583
            // The shard under test is a TestEntityOwnershipShard, which ignores ElectionTimeout messages. It is
            // created with an empty address for the leader; the real address is supplied further down.
584         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(Props.create(
585                 TestEntityOwnershipShard.class, localId,
586                 ImmutableMap.<String, String>builder().put(leaderId.toString(), "".toString()).build(),
587                 dataStoreContextBuilder.build()).withDispatcher(Dispatchers.DefaultDispatcherId()));
588
            // The leader knows the follower's address from the start and is told to run an election right away.
            // NOTE(review): the leader is also created with LOCAL_MEMBER_NAME as its member name - confirm
            // that sharing the member name with the follower is intended.
589         TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(newShardProps(leaderId,
590                 ImmutableMap.<String, String>builder().put(localId.toString(), shard.path().toString()).build(),
591                     LOCAL_MEMBER_NAME).withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString());
592         leader.tell(new ElectionTimeout(), leader);
593
594         kit.waitUntilLeader(leader);
595
            // Now hand the follower the leader's actual address.
596         shard.tell(new PeerAddressResolved(leaderId.toString(), leader.path().toString()), ActorRef.noSender());
597
598         Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
599         EntityOwnershipListener listener = mock(EntityOwnershipListener.class);
600
601         shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
602         kit.expectMsgClass(SuccessReply.class);
603
604         // Register local candidate
605
606         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
607         kit.expectMsgClass(SuccessReply.class);
608         verifyCommittedEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
            // NOTE(review): the three flags passed to ownershipChange are presumably
            // (wasOwner, isOwner, hasOwner) - confirm against the helper's definition.
609         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity, false, true, true));
            // Clear recorded interactions so the next verification only sees notifications from the removal.
610         reset(listener);
611
612         // Simulate a replicated commit from the leader to remove the local candidate that would occur after a
613         // network partition is healed.
614
615         leader.tell(new PeerDown(LOCAL_MEMBER_NAME, localId.toString()), ActorRef.noSender());
616
617         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity, true, false, false));
618
619         // Since the shard has a local candidate registered, it should re-add its candidate to the entity.
620
621         verifyCommittedEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
622         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity, false, true, true));
623
624         // Unregister the local candidate and verify it's removed and not re-added.
625
626         shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
627         kit.expectMsgClass(SuccessReply.class);
628
629         verifyNoEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
            // Pause and check a second time to catch a candidate that gets re-added asynchronously.
630         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
631         verifyNoEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
632     }
633
634     @Test
635     public void testListenerRegistration() throws Exception {
            // Walks one EntityOwnershipListener through its life cycle on a single shard: register, receive
            // notifications for its entity type only, unregister (no notifications), then re-register and
            // receive the current state.
636         ShardTestKit kit = new ShardTestKit(getSystem());
637         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
638         kit.waitUntilLeader(shard);
639         ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
640
641         String otherEntityType = "otherEntityType";
642         Entity entity1 = new Entity(ENTITY_TYPE, ENTITY_ID1);
643         Entity entity2 = new Entity(ENTITY_TYPE, ENTITY_ID2);
644         Entity entity3 = new Entity(ENTITY_TYPE, ENTITY_ID3);
            // entity4 reuses ENTITY_ID3 but belongs to a different entity type than the listener is registered for.
645         Entity entity4 = new Entity(otherEntityType, ENTITY_ID3);
646         EntityOwnershipListener listener = mock(EntityOwnershipListener.class);
647
648         // Register listener
649
650         shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
651         kit.expectMsgClass(SuccessReply.class);
652
653         // Register a couple candidates for the desired entity type and verify listener is notified.
654
655         shard.tell(new RegisterCandidateLocal(entity1), kit.getRef());
656         kit.expectMsgClass(SuccessReply.class);
657
            // NOTE(review): the three flags passed to ownershipChange are presumably
            // (wasOwner, isOwner, hasOwner) - confirm against the helper's definition.
658         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
659
660         shard.tell(new RegisterCandidateLocal(entity2), kit.getRef());
661         kit.expectMsgClass(SuccessReply.class);
662
663         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
664         reset(listener);
665
666         // Register another candidate for another entity type and verify listener is not notified.
667
668         shard.tell(new RegisterCandidateLocal(entity4), kit.getRef());
669         kit.expectMsgClass(SuccessReply.class);
670
            // Give a stray notification time to arrive before asserting that none did.
671         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
672         verify(listener, never()).ownershipChanged(ownershipChange(entity4));
673
674         // Register remote candidate for entity1
675
            // The remote candidate is written straight into the shard's data tree rather than sent as a message.
676         String remoteMemberName = "remoteMember";
677         writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, entity1.getId(), remoteMemberName),
678                 shardDataTree);
679         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entity1.getId(), remoteMemberName);
680
681         // Unregister the local candidate for entity1 and verify listener is notified
682
683         shard.tell(new UnregisterCandidateLocal(entity1), kit.getRef());
684         kit.expectMsgClass(SuccessReply.class);
685
686         verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
687         reset(listener);
688
689         // Unregister the listener, add a candidate for entity3 and verify listener isn't notified
690
691         shard.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
692         kit.expectMsgClass(SuccessReply.class);
693
694         shard.tell(new RegisterCandidateLocal(entity3), kit.getRef());
695         kit.expectMsgClass(SuccessReply.class);
696
            // Wait until ownership of entity3 is actually set, then allow extra time for any notification.
697         verifyOwner(shard, ENTITY_TYPE, entity3.getId(), LOCAL_MEMBER_NAME);
698         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
699         verify(listener, never()).ownershipChanged(any(EntityOwnershipChange.class));
700
701         // Re-register the listener and verify it gets notified of currently owned entities
702
703         reset(listener);
704
705         shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
706         kit.expectMsgClass(SuccessReply.class);
707
            // Exactly two notifications are expected, one each for entity2 and entity3, in either order.
708         verify(listener, timeout(5000).times(2)).ownershipChanged(or(ownershipChange(entity2, false, true, true),
709                 ownershipChange(entity3, false, true, true)));
            // entity4 has a different type and the local candidate for entity1 was unregistered above, so
            // neither may be reported to the re-registered listener.
710         Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
711         verify(listener, never()).ownershipChanged(ownershipChange(entity4));
712         verify(listener, never()).ownershipChanged(ownershipChange(entity1));
713     }
714
715     private static void commitModification(TestActorRef<EntityOwnershipShard> shard, NormalizedNode<?, ?> node,
716             JavaTestKit sender) {
            // Sends the shard one batch containing a single merge of 'node' at ENTITY_OWNERS_PATH, using
            // 'sender' as the reply target, and blocks until the commit reply arrives at 'sender'.
717         BatchedModifications modifications = newBatchedModifications();
718         modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, node));
719
720         shard.tell(modifications, sender.getRef());
721         sender.expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
722     }
723
724     private static BatchedModifications newBatchedModifications() {
            // Creates an empty batch with transaction id "tnx" at the current datastore version, flagged as
            // ready with commit-on-ready enabled and a total message count of 1.
            // NOTE(review): these flags presumably make the batch a complete, self-committing transaction in
            // a single message - confirm against BatchedModifications.
725         BatchedModifications modifications = new BatchedModifications("tnx", DataStoreVersions.CURRENT_VERSION, "");
726         modifications.setDoCommitOnReady(true);
727         modifications.setReady(true);
728         modifications.setTotalMessagesSent(1);
729         return modifications;
730     }
731
732     private void verifyEntityCandidateRemoved(final TestActorRef<EntityOwnershipShard> shard, String entityType,
733             YangInstanceIdentifier entityId, String candidateName) {
            // Delegates to verifyNodeRemoved for the candidate's path, reading from the given shard's store.
            // Any exception raised by the read is rethrown as an AssertionError carrying the path and cause.
734         verifyNodeRemoved(candidatePath(entityType, entityId, candidateName),
735                 new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
736                     @Override
737                     public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
738                         try {
739                             return AbstractShardTest.readStore(shard, path);
740                         } catch(Exception e) {
741                             throw new AssertionError("Failed to read " + path, e);
742                         }
743                 }
744         });
745     }
746
747     private void verifyCommittedEntityCandidate(final TestActorRef<EntityOwnershipShard> shard, String entityType,
748             YangInstanceIdentifier entityId, String candidateName) {
            // Delegates to verifyEntityCandidate with a reader backed by the given shard's store, so the check
            // runs against what the shard currently holds. Any exception raised by the read is rethrown as an
            // AssertionError carrying the path and cause.
749         verifyEntityCandidate(entityType, entityId, candidateName, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
750             @Override
751             public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
752                 try {
753                     return AbstractShardTest.readStore(shard, path);
754                 } catch(Exception e) {
755                     throw new AssertionError("Failed to read " + path, e);
756                 }
757             }
758         });
759     }
760
761     private void verifyNoEntityCandidate(final TestActorRef<EntityOwnershipShard> shard, String entityType,
762             YangInstanceIdentifier entityId, String candidateName) {
            // Same store-backed reader as verifyCommittedEntityCandidate, but passes 'false' as the extra
            // argument to verifyEntityCandidate.
            // NOTE(review): that flag presumably means "candidate is expected to be absent" - confirm against
            // the verifyEntityCandidate overload.
763         verifyEntityCandidate(entityType, entityId, candidateName, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
764             @Override
765             public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
766                 try {
767                     return AbstractShardTest.readStore(shard, path);
768                 } catch(Exception e) {
769                     throw new AssertionError("Failed to read " + path, e);
770                 }
771             }
772         }, false);
773     }
774
775     private void verifyBatchedEntityCandidate(List<Modification> mods, String entityType,
776             YangInstanceIdentifier entityId, String candidateName) throws Exception {
            // Asserts the list holds exactly one modification and checks that one via the single-modification
            // overload.
777         assertEquals("BatchedModifications size", 1, mods.size());
778         verifyBatchedEntityCandidate(mods.get(0), entityType, entityId, candidateName);
779     }
780
781     private void verifyBatchedEntityCandidate(Modification mod, String entityType,
782             YangInstanceIdentifier entityId, String candidateName) throws Exception {
            // Asserts the modification is exactly a MergeModification (class equality, not instanceof) and
            // then checks its data payload with verifyEntityCandidate.
783         assertEquals("Modification type", MergeModification.class, mod.getClass());
784         verifyEntityCandidate(((MergeModification)mod).getData(), entityType,
785                 entityId, candidateName, true);
786     }
787
788     private static void verifyOwner(final TestActorRef<EntityOwnershipShard> shard, String entityType,
789             YangInstanceIdentifier entityId, String localMemberName) {
            // Checks the owner recorded for the entity by delegating to the reader-based verifyOwner overload,
            // with 'localMemberName' as the expected owner and a reader backed by the given shard's store.
790         verifyOwner(localMemberName, entityType, entityId, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
791             @Override
792             public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
793                 try {
794                     return AbstractShardTest.readStore(shard, path);
795                 } catch(Exception e) {
                        // Unlike the other store-backed readers in this class, a failed read is reported as
                        // null here instead of an AssertionError.
                        // NOTE(review): presumably the delegate treats null as "owner not set yet" - confirm.
796                     return null;
797                 }
798             }
799         });
800     }
801
802     private Props newShardProps() {
            // Props for a shard of the local member with no peers.
803         return newShardProps(Collections.<String,String>emptyMap());
804     }
805
806     private Props newShardProps(Map<String,String> peers) {
            // Props for a shard of the local member with the given peer addresses; a fresh shard id is
            // generated on every call.
807         return newShardProps(newShardId(LOCAL_MEMBER_NAME), peers, LOCAL_MEMBER_NAME);
808     }
809
810     private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName) {
            // Builds EntityOwnershipShard props from the given id, peer addresses and member name, using the
            // current state of dataStoreContextBuilder and the shared SCHEMA_CONTEXT. The returned props
            // already select the default dispatcher.
811         return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).
812                 datastoreContext(dataStoreContextBuilder.build()).schemaContext(SCHEMA_CONTEXT).
813                 localMemberName(memberName).props().withDispatcher(Dispatchers.DefaultDispatcherId());
814     }
815
816     private static ShardIdentifier newShardId(String memberName) {
            // Builds an id for the "entity-ownership" shard of the given member. The type is "operational"
            // followed by the value of NEXT_SHARD_NUM, which is incremented on every call.
817         return ShardIdentifier.builder().memberName(memberName).shardName("entity-ownership").
818                 type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
819     }
820
821     public static class TestEntityOwnershipShard extends EntityOwnershipShard {
            // EntityOwnershipShard variant that discards ElectionTimeout messages and passes every other
            // message to the superclass.
            // NOTE(review): presumably this keeps the shard from starting an election on its own so a test
            // can hold it in the follower role - confirm.
822
823         TestEntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
824                 DatastoreContext datastoreContext) {
                // The member name is always LOCAL_MEMBER_NAME and the schema is the shared SCHEMA_CONTEXT.
825             super(newBuilder().id(name).peerAddresses(peerAddresses).datastoreContext(datastoreContext).
826                     schemaContext(SCHEMA_CONTEXT).localMemberName(LOCAL_MEMBER_NAME));
827         }
828
829         @Override
830         public void onReceiveCommand(Object message) throws Exception {
                // Silently drop ElectionTimeout; everything else is handled normally.
831             if(!(message instanceof ElectionTimeout)) {
832                 super.onReceiveCommand(message);
833             }
834         }
835
836
837     }
838
839     public static class MockFollower extends UntypedActor {
            // Scripted stand-in for a follower peer: it answers RequestVote and AppendEntries and ignores every
            // other message. The two flags are volatile and not private.
            // NOTE(review): presumably tests flip the flags from the test thread while the actor runs - confirm.

            // When true, every RequestVote is answered with a granted vote; when false, no reply is sent.
840         volatile boolean grantVote;
            // When true, AppendEntries messages are ignored without a reply.
841         volatile boolean dropAppendEntries;
            // Id this follower reports in its AppendEntriesReply messages.
842         private final String myId;
843
844         public MockFollower(String myId) {
                // Votes are granted by default.
845             this(myId, true);
846         }
847
848         public MockFollower(String myId, boolean grantVote) {
849             this.myId = myId;
850             this.grantVote = grantVote;
851         }
852
853         @Override
854         public void onReceive(Object message) {
855             if(message instanceof RequestVote) {
856                 if(grantVote) {
                        // Reply with the requester's own term and 'true'.
857                     getSender().tell(new RequestVoteReply(((RequestVote)message).getTerm(), true), getSelf());
858                 }
859             } else if(message instanceof AppendEntries) {
860                 if(!dropAppendEntries) {
861                     AppendEntries req = (AppendEntries) message;
                        // Report the index of the last entry in the request, or the leader's commit index when
                        // the request carries no entries.
862                     long lastIndex = req.getLeaderCommit();
                        // NOTE(review): the size check is redundant; the loop body never runs for an empty list.
863                     if (req.getEntries().size() > 0) {
864                         for(ReplicatedLogEntry entry : req.getEntries()) {
865                             lastIndex = entry.getIndex();
866                         }
867                     }
868
                        // NOTE(review): the arguments are presumably (followerId, term, success, logLastIndex,
                        // logLastTerm, payloadVersion) - confirm against AppendEntriesReply.
869                     getSender().tell(new AppendEntriesReply(myId, req.getTerm(), true, lastIndex, req.getTerm(),
870                             DataStoreVersions.CURRENT_VERSION), getSelf());
871                 }
872             }
873         }
874     }
875
876
877     @Test
878     public void testDelayedEntityOwnerSelection() throws Exception {
            // Scenario: with LastCandidateSelectionStrategy installed for ENTITY_TYPE, a remote candidate is
            // added first and the local candidate second; the local member is then expected to be the owner.
            // NOTE(review): presumably the strategy picks the most recently added candidate - confirm against
            // LastCandidateSelectionStrategy.
879         ShardTestKit kit = new ShardTestKit(getSystem());
880         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
            // The strategy is installed directly on the actor instance, before waiting for leadership.
881         shard.underlyingActor().addEntityOwnerSelectionStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class);
882         kit.waitUntilLeader(shard);
883
884         Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
885         ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
886
887         // Add a remote candidate
888
            // Written straight into the shard's data tree rather than sent as a message.
889         String remoteMemberName1 = "remoteMember1";
890         writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
891
892
893         // Register the local candidate
894
895         shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
896         kit.expectMsgClass(SuccessReply.class);
897
898         // Verify both candidates are committed and the local candidate becomes owner
899
900         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
901         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
902         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
903     }
904
905     public static class MockLeader extends UntypedActor {
            // Scripted stand-in for a leader: it records the modifications carried by BatchedModifications
            // messages and replies with a commit reply. Every other message type is ignored.

            // Counted down once per recorded modification (not once per message). Non-final and volatile.
            // NOTE(review): presumably tests replace the latch to wait for a different count - confirm.
906         volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
            // Recorded modifications; all access in this class happens while holding this list's monitor.
            // NOTE(review): the field doubles as the lock but is not final - consider making it final once it
            // is confirmed that nothing reassigns it.
907         List<Modification> receivedModifications = new ArrayList<>();
            // When false, the next batch is dropped (not recorded, no reply) and the flag resets itself to
            // true, so only a single batch is skipped.
908         volatile boolean sendReply = true;
            // Optional pause in milliseconds before each batch is handled; 0 disables it.
909         volatile long delay;
910
911         @Override
912         public void onReceive(Object message) {
913             if(message instanceof BatchedModifications) {
914                 if(delay > 0) {
                        // The sleep happens inside onReceive, so handling of this message is held up for the
                        // whole delay.
915                     Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
916                 }
917
918                 if(sendReply) {
919                     BatchedModifications mods = (BatchedModifications) message;
920                     synchronized (receivedModifications) {
921                         for(int i = 0; i < mods.getModifications().size(); i++) {
922                             receivedModifications.add(mods.getModifications().get(i));
923                             modificationsReceived.countDown();
924                         }
925                     }
926
927                     getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
928                 } else {
                        // Skip exactly this one batch, then resume replying.
929                     sendReply = true;
930                 }
931             }
932         }
933
934         List<Modification> getAndClearReceivedModifications() {
                // Returns a snapshot copy of everything recorded so far and empties the internal list, both
                // under the list's monitor.
935             synchronized (receivedModifications) {
936                 List<Modification> ret = new ArrayList<>(receivedModifications);
937                 receivedModifications.clear();
938                 return ret;
939             }
940         }
941     }
942 }