ec2006d683e47076f30f00f8831dfdff38b4c213
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Matchers.any;
12 import static org.mockito.Matchers.anyBoolean;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.never;
15 import static org.mockito.Mockito.reset;
16 import static org.mockito.Mockito.timeout;
17 import static org.mockito.Mockito.verify;
18 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
19 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
20 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
21 import akka.actor.ActorRef;
22 import akka.actor.Props;
23 import akka.actor.UntypedActor;
24 import akka.dispatch.Dispatchers;
25 import akka.testkit.TestActorRef;
26 import com.google.common.base.Function;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.util.concurrent.Uninterruptibles;
29 import java.util.ArrayList;
30 import java.util.Collections;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.concurrent.CountDownLatch;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicInteger;
36 import org.junit.After;
37 import org.junit.Test;
38 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
39 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
40 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
41 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
42 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
43 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
44 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
45 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
46 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
47 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
48 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
49 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
50 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
51 import org.opendaylight.controller.cluster.datastore.modification.Modification;
52 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
53 import org.opendaylight.controller.cluster.raft.TestActorFactory;
54 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
55 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
56 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
57 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
58 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
59 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
60 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
61 import org.opendaylight.yangtools.yang.common.QName;
62 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
63 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
64 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
65
66 /**
67  * Unit tests for EntityOwnershipShard.
68  *
69  * @author Thomas Pantelis
70  */
71 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
72     private static final String ENTITY_TYPE = "test type";
73     private static final YangInstanceIdentifier ENTITY_ID1 =
74             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
75     private static final YangInstanceIdentifier ENTITY_ID2 =
76             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
77     private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
78     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
79     private static final String LOCAL_MEMBER_NAME = "member-1";
80
81     private final ShardIdentifier shardID = ShardIdentifier.builder().memberName(LOCAL_MEMBER_NAME)
82             .shardName("entity-ownership").type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
83
84     private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder();
85     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
86
87     @After
88     public void tearDown() {
89         actorFactory.close();
90     }
91
92     @Test
93     public void testOnRegisterCandidateLocal() throws Exception {
94         ShardTestKit kit = new ShardTestKit(getSystem());
95
96         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
97
98         kit.waitUntilLeader(shard);
99
100         YangInstanceIdentifier entityId = ENTITY_ID1;
101         Entity entity = new Entity(ENTITY_TYPE, entityId);
102         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
103
104         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
105         kit.expectMsgClass(SuccessReply.class);
106
107         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
108         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
109         verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
110     }
111
112     @Test
113     public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
114         ShardTestKit kit = new ShardTestKit(getSystem());
115
116         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
117
118         String peerId = actorFactory.generateActorId("follower");
119         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
120                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
121
122         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
123                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
124                 withDispatcher(Dispatchers.DefaultDispatcherId()));
125
126         YangInstanceIdentifier entityId = ENTITY_ID1;
127         Entity entity = new Entity(ENTITY_TYPE, entityId);
128         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
129
130         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
131         kit.expectMsgClass(SuccessReply.class);
132
133         // Now grant the vote so the shard becomes the leader. This should retry the commit.
134         peer.underlyingActor().grantVote = true;
135
136         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
137
138         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
139
140         verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
141     }
142
143     @Test
144     public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
145         ShardTestKit kit = new ShardTestKit(getSystem());
146
147         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
148                 shardTransactionCommitTimeoutInSeconds(1);
149
150         String peerId = actorFactory.generateActorId("follower");
151         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
152                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
153
154         MockFollower follower = peer.underlyingActor();
155         follower.grantVote = true;
156
157         // Drop AppendEntries so consensus isn't reached.
158         follower.dropAppendEntries = true;
159
160         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
161                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
162                 withDispatcher(Dispatchers.DefaultDispatcherId()));
163
164         kit.waitUntilLeader(shard);
165
166         YangInstanceIdentifier entityId = ENTITY_ID1;
167         Entity entity = new Entity(ENTITY_TYPE, entityId);
168         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
169
170         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
171         kit.expectMsgClass(SuccessReply.class);
172
173         // Wait enough time for the commit to timeout.
174         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
175
176         // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
177         // write being applied to the state.
178         follower.dropAppendEntries = false;
179
180         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
181
182         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
183
184         verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
185     }
186
187     @Test
188     public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
189         ShardTestKit kit = new ShardTestKit(getSystem());
190
191         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
192                 shardIsolatedLeaderCheckIntervalInMillis(50);
193
194         String peerId = actorFactory.generateActorId("follower");
195         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
196                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
197
198         MockFollower follower = peer.underlyingActor();
199         follower.grantVote = true;
200
201         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
202                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
203                 withDispatcher(Dispatchers.DefaultDispatcherId()));
204
205         kit.waitUntilLeader(shard);
206
207         // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
208         follower.dropAppendEntries = true;
209         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
210
211         YangInstanceIdentifier entityId = ENTITY_ID1;
212         Entity entity = new Entity(ENTITY_TYPE, entityId);
213         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
214
215         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
216         kit.expectMsgClass(SuccessReply.class);
217
218         // Resume AppendEntries - the candidate write should now be committed.
219         follower.dropAppendEntries = false;
220         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
221
222         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
223
224         verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
225     }
226
227     @Test
228     public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
229         ShardTestKit kit = new ShardTestKit(getSystem());
230
231         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100).
232                 shardBatchedModificationCount(5);
233
234         String peerId = actorFactory.generateActorId("leader");
235         TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
236                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
237
238         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
239                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
240                 withDispatcher(Dispatchers.DefaultDispatcherId()));
241
242         shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), -1L, -1L,
243                 DataStoreVersions.CURRENT_VERSION), peer);
244
245         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
246
247         shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
248         kit.expectMsgClass(SuccessReply.class);
249
250         MockLeader leader = peer.underlyingActor();
251         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
252                 leader.modificationsReceived, 5, TimeUnit.SECONDS));
253         verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1,
254                 LOCAL_MEMBER_NAME);
255
256         shard.tell(dataStoreContextBuilder.shardElectionTimeoutFactor(2).build(), ActorRef.noSender());
257
258         // Test with initial commit timeout and subsequent retry.
259
260         leader.modificationsReceived = new CountDownLatch(1);
261         leader.sendReply = false;
262
263         shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
264
265         shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
266         kit.expectMsgClass(SuccessReply.class);
267
268         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
269                 leader.modificationsReceived, 5, TimeUnit.SECONDS));
270         verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID2,
271                 LOCAL_MEMBER_NAME);
272
273         // Send a bunch of registration messages quickly and verify.
274
275         int max = 100;
276         leader.delay = 4;
277         leader.modificationsReceived = new CountDownLatch(max);
278         List<YangInstanceIdentifier> entityIds = new ArrayList<>();
279         for(int i = 1; i <= max; i++) {
280             YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
281             entityIds.add(id);
282             shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, id)), kit.getRef());
283         }
284
285         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
286                 leader.modificationsReceived, 10, TimeUnit.SECONDS));
287
288         // Sleep a little to ensure no additional BatchedModifications are received.
289
290         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
291
292         List<Modification> receivedMods = leader.getAndClearReceivedModifications();
293         for(int i = 0; i < max; i++) {
294             verifyBatchedEntityCandidate(receivedMods.get(i), ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
295         }
296
297         assertEquals("# modifications received", max, receivedMods.size());
298     }
299
300     @Test
301     public void testOnUnregisterCandidateLocal() throws Exception {
302         ShardTestKit kit = new ShardTestKit(getSystem());
303         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
304         kit.waitUntilLeader(shard);
305
306         Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
307         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
308
309         // Register
310
311         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
312         kit.expectMsgClass(SuccessReply.class);
313
314         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
315         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
316         verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
317
318         // Unregister
319
320         reset(candidate);
321
322         shard.tell(new UnregisterCandidateLocal(candidate, entity), kit.getRef());
323         kit.expectMsgClass(SuccessReply.class);
324
325         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");
326         verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
327
328         // Register again
329
330         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
331         kit.expectMsgClass(SuccessReply.class);
332
333         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
334         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
335         verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
336     }
337
338     @Test
339     public void testOwnershipChanges() throws Exception {
340         ShardTestKit kit = new ShardTestKit(getSystem());
341         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
342         kit.waitUntilLeader(shard);
343
344         Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
345         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
346         ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
347
348         // Add a remote candidate
349
350         String remoteMemberName1 = "remoteMember1";
351         writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
352
353         // Register local
354
355         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
356         kit.expectMsgClass(SuccessReply.class);
357
358         // Verify the remote candidate becomes owner
359
360         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
361         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
362         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
363         verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
364
365         // Add another remote candidate and verify ownership doesn't change
366
367         reset(candidate);
368         String remoteMemberName2 = "remoteMember2";
369         writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
370
371         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
372         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
373         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
374         verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
375
376         // Remove the second remote candidate and verify ownership doesn't change
377
378         reset(candidate);
379         deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
380
381         verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
382         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
383         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
384         verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
385
386         // Remove the first remote candidate and verify the local candidate becomes owner
387
388         reset(candidate);
389         deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
390
391         verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
392         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
393         verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
394
395         // Add the second remote candidate back and verify ownership doesn't change
396
397         reset(candidate);
398         writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
399
400         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
401         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
402         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
403         verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
404
405         // Unregister the local candidate and verify the second remote candidate becomes owner
406
407         shard.tell(new UnregisterCandidateLocal(candidate, entity), kit.getRef());
408         kit.expectMsgClass(SuccessReply.class);
409
410         verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
411         verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
412     }
413
414     private void verifyEntityCandidateRemoved(final TestActorRef<EntityOwnershipShard> shard, String entityType,
415             YangInstanceIdentifier entityId, String candidateName) {
416         verifyNodeRemoved(candidatePath(entityType, entityId, candidateName),
417                 new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
418                     @Override
419                     public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
420                         try {
421                             return AbstractShardTest.readStore(shard, path);
422                         } catch(Exception e) {
423                             throw new AssertionError("Failed to read " + path, e);
424                         }
425                 }
426         });
427     }
428
429     private void verifyCommittedEntityCandidate(final TestActorRef<EntityOwnershipShard> shard, String entityType,
430             YangInstanceIdentifier entityId, String candidateName) {
431         verifyEntityCandidate(entityType, entityId, candidateName, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
432             @Override
433             public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
434                 try {
435                     return AbstractShardTest.readStore(shard, path);
436                 } catch(Exception e) {
437                     throw new AssertionError("Failed to read " + path, e);
438                 }
439             }
440         });
441     }
442
443     private void verifyBatchedEntityCandidate(List<Modification> mods, String entityType,
444             YangInstanceIdentifier entityId, String candidateName) throws Exception {
445         assertEquals("BatchedModifications size", 1, mods.size());
446         verifyBatchedEntityCandidate(mods.get(0), entityType, entityId, candidateName);
447     }
448
449     private void verifyBatchedEntityCandidate(Modification mod, String entityType,
450             YangInstanceIdentifier entityId, String candidateName) throws Exception {
451         assertEquals("Modification type", MergeModification.class, mod.getClass());
452         verifyEntityCandidate(((MergeModification)mod).getData(), entityType,
453                 entityId, candidateName);
454     }
455
456     private void verifyOwner(final TestActorRef<EntityOwnershipShard> shard, String entityType, YangInstanceIdentifier entityId,
457             String localMemberName) {
458         verifyOwner(localMemberName, entityType, entityId, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
459             @Override
460             public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
461                 try {
462                     return AbstractShardTest.readStore(shard, path);
463                 } catch(Exception e) {
464                     return null;
465                 }
466             }
467         });
468     }
469
470     private Props newShardProps() {
471         return newShardProps(Collections.<String,String>emptyMap());
472     }
473
474     private Props newShardProps(Map<String,String> peers) {
475         return EntityOwnershipShard.props(shardID, peers, dataStoreContextBuilder.build(), SCHEMA_CONTEXT,
476                 LOCAL_MEMBER_NAME);
477     }
478
479     public static class MockFollower extends UntypedActor {
480         volatile boolean grantVote;
481         volatile boolean dropAppendEntries;
482         private final String myId;
483
484         public MockFollower(String myId) {
485             this.myId = myId;
486         }
487
488         @Override
489         public void onReceive(Object message) {
490             if(message instanceof RequestVote) {
491                 if(grantVote) {
492                     getSender().tell(new RequestVoteReply(((RequestVote)message).getTerm(), true), getSelf());
493                 }
494             } else if(message instanceof AppendEntries) {
495                 if(!dropAppendEntries) {
496                     AppendEntries req = (AppendEntries) message;
497                     long lastIndex = req.getLeaderCommit();
498                     if (req.getEntries().size() > 0) {
499                         for(ReplicatedLogEntry entry : req.getEntries()) {
500                             lastIndex = entry.getIndex();
501                         }
502                     }
503
504                     getSender().tell(new AppendEntriesReply(myId, req.getTerm(), true, lastIndex, req.getTerm(),
505                             DataStoreVersions.CURRENT_VERSION), getSelf());
506                 }
507             }
508         }
509     }
510
511     public static class MockLeader extends UntypedActor {
512         volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
513         List<Modification> receivedModifications = new ArrayList<>();
514         volatile boolean sendReply = true;
515         volatile long delay;
516
517         @Override
518         public void onReceive(Object message) {
519             if(message instanceof BatchedModifications) {
520                 if(delay > 0) {
521                     Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
522                 }
523
524                 if(sendReply) {
525                     BatchedModifications mods = (BatchedModifications) message;
526                     synchronized (receivedModifications) {
527                         for(int i = 0; i < mods.getModifications().size(); i++) {
528                             receivedModifications.add(mods.getModifications().get(i));
529                             modificationsReceived.countDown();
530                         }
531                     }
532
533                     getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
534                 } else {
535                     sendReply = true;
536                 }
537             }
538         }
539
540         List<Modification> getAndClearReceivedModifications() {
541             synchronized (receivedModifications) {
542                 List<Modification> ret = new ArrayList<>(receivedModifications);
543                 receivedModifications.clear();
544                 return ret;
545             }
546         }
547     }
548 }