2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Matchers.any;
12 import static org.mockito.Matchers.anyBoolean;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.never;
15 import static org.mockito.Mockito.reset;
16 import static org.mockito.Mockito.timeout;
17 import static org.mockito.Mockito.verify;
18 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
19 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
20 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
21 import akka.actor.ActorRef;
22 import akka.actor.PoisonPill;
23 import akka.actor.Props;
24 import akka.actor.Terminated;
25 import akka.actor.UntypedActor;
26 import akka.dispatch.Dispatchers;
27 import akka.testkit.JavaTestKit;
28 import akka.testkit.TestActorRef;
29 import com.google.common.base.Function;
30 import com.google.common.collect.ImmutableMap;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.util.ArrayList;
33 import java.util.Collections;
34 import java.util.List;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicInteger;
39 import org.junit.After;
40 import org.junit.Test;
41 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
42 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
43 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
44 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
45 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
46 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
47 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
48 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
49 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
50 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
51 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
52 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
53 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
54 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
55 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
56 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
57 import org.opendaylight.controller.cluster.datastore.modification.Modification;
58 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
59 import org.opendaylight.controller.cluster.raft.TestActorFactory;
60 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
61 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
62 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
63 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
64 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
65 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
66 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
67 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
68 import org.opendaylight.yangtools.yang.common.QName;
69 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
70 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
71 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
74 * Unit tests for EntityOwnershipShard.
76 * @author Thomas Pantelis
78 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
// Entity type shared by every Entity these tests create.
79 private static final String ENTITY_TYPE = "test type";
// Four entity identifiers built from the same "test" / 2015-08-14 QName namespace, differing only in local name.
80 private static final YangInstanceIdentifier ENTITY_ID1 =
81 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
82 private static final YangInstanceIdentifier ENTITY_ID2 =
83 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
84 private static final YangInstanceIdentifier ENTITY_ID3 =
85 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3"));
86 private static final YangInstanceIdentifier ENTITY_ID4 =
87 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4"));
// Schema context that newShardProps passes to EntityOwnershipShard.props(...).
88 private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
// Counter that newShardId appends to the shard type, so every generated ShardIdentifier differs.
89 private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
// Member name of the shard under test; the verify* calls expect it as the local candidate/owner name.
90 private static final String LOCAL_MEMBER_NAME = "member-1";
// Builder for the DatastoreContext handed to each shard; individual tests adjust timeouts on it first.
92 private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder();
// Creates the shard and mock-peer test actors, bound to the actor system returned by getSystem().
93 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
// Test teardown hook. Its body is not visible in this view of the file; presumably it releases the
// actors created through actorFactory - TODO confirm.
96 public void tearDown() {
// Registers a local candidate on a shard with no peers and checks that the candidate is committed,
// the local member is recorded as owner, and the candidate is notified of the ownership change.
101 public void testOnRegisterCandidateLocal() throws Exception {
102 ShardTestKit kit = new ShardTestKit(getSystem());
// newShardProps() supplies an empty peer map, so this shard has no peers.
104 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
106 kit.waitUntilLeader(shard);
108 YangInstanceIdentifier entityId = ENTITY_ID1;
109 Entity entity = new Entity(ENTITY_TYPE, entityId);
110 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
// The shard answers the registration with a SuccessReply, delivered to the test kit.
112 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
113 kit.expectMsgClass(SuccessReply.class);
// Both the stored candidate and the stored owner must be the local member, and the mock must receive
// ownershipChanged(entity, false, true) within 5 seconds.
115 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
116 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
117 verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
// Registers a local candidate before the shard has been granted a vote by its only peer, then lets
// the peer grant the vote and verifies that the registration is eventually committed.
121 public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
122 ShardTestKit kit = new ShardTestKit(getSystem());
124 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
// The mock peer is constructed with grantVote=false.
126 String peerId = newShardId("follower").toString();
127 TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId, false).
128 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
// NOTE(review): the explicit default dispatcher presumably makes these test actors process messages
// asynchronously rather than on the calling thread - confirm against the Akka TestKit documentation.
130 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
131 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
132 withDispatcher(Dispatchers.DefaultDispatcherId()));
134 YangInstanceIdentifier entityId = ENTITY_ID1;
135 Entity entity = new Entity(ENTITY_TYPE, entityId);
136 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
// A SuccessReply is expected even though the peer has not granted a vote at this point.
138 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
139 kit.expectMsgClass(SuccessReply.class);
141 // Now grant the vote so the shard becomes the leader. This should retry the commit.
142 peer.underlyingActor().grantVote = true;
144 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
146 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
148 verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
// Registers a local candidate while the only peer ignores AppendEntries, so the commit cannot be
// acknowledged; after the peer resumes replying, the candidate write must show up in the shard state.
152 public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
153 ShardTestKit kit = new ShardTestKit(getSystem());
// Use a 1 second transaction commit timeout so the sleep below is long enough to hit it.
155 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
156 shardTransactionCommitTimeoutInSeconds(1);
158 String peerId = newShardId("follower").toString();
159 TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
160 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
162 MockFollower follower = peer.underlyingActor();
164 // Drop AppendEntries so consensus isn't reached.
165 follower.dropAppendEntries = true;
167 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
168 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
169 withDispatcher(Dispatchers.DefaultDispatcherId()));
171 kit.waitUntilLeader(shard);
173 YangInstanceIdentifier entityId = ENTITY_ID1;
174 Entity entity = new Entity(ENTITY_TYPE, entityId);
175 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
177 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
178 kit.expectMsgClass(SuccessReply.class);
180 // Wait enough time for the commit to timeout.
// NOTE(review): the sleep equals the configured 1 second commit timeout, leaving no margin - confirm
// this is reliably long enough.
181 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
183 // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
184 // write being applied to the state.
185 follower.dropAppendEntries = false;
187 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
189 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
191 verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
// Registers a local candidate after the leader has stopped receiving AppendEntries replies from its
// only peer, then resumes the replies and verifies the candidate write is committed.
195 public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
196 ShardTestKit kit = new ShardTestKit(getSystem());
// Short heartbeat and isolated-leader check intervals so the 500 ms sleep below covers several checks.
198 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
199 shardIsolatedLeaderCheckIntervalInMillis(50);
201 String peerId = newShardId("follower").toString();
202 TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
203 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
205 MockFollower follower = peer.underlyingActor();
207 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
208 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
209 withDispatcher(Dispatchers.DefaultDispatcherId()));
211 kit.waitUntilLeader(shard);
213 // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
214 follower.dropAppendEntries = true;
215 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
217 YangInstanceIdentifier entityId = ENTITY_ID1;
218 Entity entity = new Entity(ENTITY_TYPE, entityId);
219 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
// The registration is still acknowledged with a SuccessReply while the follower is not replying.
221 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
222 kit.expectMsgClass(SuccessReply.class);
224 // Resume AppendEntries - the candidate write should now be committed.
225 follower.dropAppendEntries = false;
226 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
228 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
230 verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
// Registers local candidates on a shard whose peer is a MockLeader and checks that each registration
// reaches the mock leader as a MergeModification carrying the local member as candidate.
// NOTE(review): several lines of this method are not visible in this view (the tails of the two
// multi-line verifyBatchedEntityCandidate calls, the declaration of max, and the code that fills
// entityIds), so the comments below describe only what the visible lines show.
234 public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
235 ShardTestKit kit = new ShardTestKit(getSystem());
// NOTE(review): the large election timeout factor presumably keeps this shard from starting its own
// election while the mock peer acts as leader - confirm.
237 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100).
238 shardBatchedModificationCount(5);
240 String peerId = newShardId("leader").toString();
241 TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
242 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
244 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
245 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
246 withDispatcher(Dispatchers.DefaultDispatcherId()));
// Send an empty AppendEntries with the mock peer as sender and leader id; presumably this makes the
// shard treat the peer as its leader - confirm.
248 shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), -1L, -1L,
249 DataStoreVersions.CURRENT_VERSION), peer);
251 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
253 shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
254 kit.expectMsgClass(SuccessReply.class);
// Wait up to 5 seconds for the mock leader's latch, which it counts down per received modification.
256 MockLeader leader = peer.underlyingActor();
257 assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
258 leader.modificationsReceived, 5, TimeUnit.SECONDS));
259 verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1,
// Push an updated DatastoreContext (election timeout factor 2) to the running shard.
262 shard.tell(dataStoreContextBuilder.shardElectionTimeoutFactor(2).build(), ActorRef.noSender());
264 // Test with initial commit timeout and subsequent retry.
// Fresh latch for the next registration; sendReply=false is meant to stop the mock leader replying.
266 leader.modificationsReceived = new CountDownLatch(1);
267 leader.sendReply = false;
269 shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
271 shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
272 kit.expectMsgClass(SuccessReply.class);
274 assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
275 leader.modificationsReceived, 5, TimeUnit.SECONDS));
276 verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID2,
279 // Send a bunch of registration messages quickly and verify.
// The latch is sized to max so it opens only after max modifications have reached the mock leader.
283 leader.modificationsReceived = new CountDownLatch(max);
284 List<YangInstanceIdentifier> entityIds = new ArrayList<>();
285 for(int i = 1; i <= max; i++) {
286 YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
288 shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, id)), kit.getRef());
291 assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
292 leader.modificationsReceived, 10, TimeUnit.SECONDS));
294 // Sleep a little to ensure no additional BatchedModifications are received.
296 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
// Modifications must arrive in registration order; the size check afterwards catches any extras.
298 List<Modification> receivedMods = leader.getAndClearReceivedModifications();
299 for(int i = 0; i < max; i++) {
300 verifyBatchedEntityCandidate(receivedMods.get(i), ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
303 assertEquals("# modifications received", max, receivedMods.size());
// Registers a local candidate, unregisters it, and registers it again on a shard with no peers,
// checking the stored owner (and candidate notifications) after each step.
307 public void testOnUnregisterCandidateLocal() throws Exception {
308 ShardTestKit kit = new ShardTestKit(getSystem());
309 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
310 kit.waitUntilLeader(shard);
312 Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
313 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
// Step 1: register - the local member becomes candidate and owner and the candidate is notified.
317 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
318 kit.expectMsgClass(SuccessReply.class);
320 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
321 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
322 verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
// Step 2: unregister - the owner is expected to be cleared to the empty string.
328 shard.tell(new UnregisterCandidateLocal(candidate, entity), kit.getRef());
329 kit.expectMsgClass(SuccessReply.class);
331 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");
// NOTE(review): never() can only pass if the mock's recorded calls were cleared after step 1; the
// file imports Mockito.reset, so a reset(candidate) call is presumably among the lines not visible
// here - confirm.
332 verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
// Step 3: register again - the local member becomes candidate and owner once more.
336 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
337 kit.expectMsgClass(SuccessReply.class);
339 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
340 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
341 verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
// Drives ownership of one entity through a sequence of candidate additions and removals (two remote
// members written directly into the shard's data tree, plus the local candidate) and checks the
// stored owner after each step.
// NOTE(review): the verify(candidate, never()) checks that follow an earlier positive verification
// presumably depend on reset(candidate) calls in lines not visible here - confirm.
345 public void testOwnershipChanges() throws Exception {
346 ShardTestKit kit = new ShardTestKit(getSystem());
347 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
348 kit.waitUntilLeader(shard);
350 Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
351 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
// Direct handle on the shard's data tree, used to add and remove remote candidates without messages.
352 ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
354 // Add a remote candidate
356 String remoteMemberName1 = "remoteMember1";
357 writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
// Register the local candidate second, after the remote one is already in the tree.
361 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
362 kit.expectMsgClass(SuccessReply.class);
364 // Verify the remote candidate becomes owner
366 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
367 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
368 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
369 verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
371 // Add another remote candidate and verify ownership doesn't change
374 String remoteMemberName2 = "remoteMember2";
375 writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
377 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
// Pause before each "doesn't change" check so that an unexpected owner change has time to appear.
378 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
379 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
380 verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
382 // Remove the second remote candidate and verify ownership doesn't change
385 deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
387 verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
388 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
389 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
390 verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
392 // Remove the first remote candidate and verify the local candidate becomes owner
395 deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
397 verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
398 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
399 verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
401 // Add the second remote candidate back and verify ownership doesn't change
404 writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
406 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
407 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
408 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
409 verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
411 // Unregister the local candidate and verify the second remote candidate becomes owner
413 shard.tell(new UnregisterCandidateLocal(candidate, entity), kit.getRef());
414 kit.expectMsgClass(SuccessReply.class);
416 verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
417 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
// Three-member scenario (local leader plus peerMember1 and peerMember2, all real EntityOwnershipShard
// actors). Candidates are set up for four entities, then peers are killed and reinstated with
// PeerDown / PeerUp messages, and the stored owner of every entity is checked after each change.
421 public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception {
422 ShardTestKit kit = new ShardTestKit(getSystem());
// NOTE(review): the very large election timeout factor presumably stops the shards from starting
// elections on their own, so leadership is driven by the explicit ElectionTimeout messages - confirm.
424 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(500).shardElectionTimeoutFactor(10000);
426 String peerMemberName1 = "peerMember1";
427 String peerMemberName2 = "peerMember2";
429 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
430 ShardIdentifier peerId1 = newShardId(peerMemberName1);
431 ShardIdentifier peerId2 = newShardId(peerMemberName2);
// The two peers are created with empty addresses for the other members; only the leader is given
// real peer actor paths.
433 TestActorRef<EntityOwnershipShard> peer1 = actorFactory.createTestActor(newShardProps(peerId1,
434 ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId2.toString(), "").build(),
435 peerMemberName1).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString());
437 TestActorRef<EntityOwnershipShard> peer2 = actorFactory.createTestActor(newShardProps(peerId2,
438 ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId1.toString(), "").build(),
439 peerMemberName2). withDispatcher(Dispatchers.DefaultDispatcherId()), peerId2.toString());
441 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(newShardProps(leaderId,
442 ImmutableMap.<String, String>builder().put(peerId1.toString(), peer1.path().toString()).
443 put(peerId2.toString(), peer2.path().toString()).build(), LOCAL_MEMBER_NAME).
444 withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString());
// Trigger an election on the local shard explicitly and wait for it to become leader.
445 leader.tell(new ElectionTimeout(), leader);
447 kit.waitUntilLeader(leader);
449 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
451 // Send PeerDown and PeerUp with no entities
453 leader.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
454 leader.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
456 // Add candidates for entity1 with the local leader as the owner
458 leader.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
459 kit.expectMsgClass(SuccessReply.class);
460 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
// Candidates for the peer members are injected by committing merge modifications on the leader.
462 commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName2), kit);
463 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
465 commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName1), kit);
466 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName1);
467 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
469 // Add candidates for entity2 with peerMember2 as the owner
471 commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName2), kit);
472 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
474 commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName1), kit);
475 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
477 leader.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
478 kit.expectMsgClass(SuccessReply.class);
479 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, LOCAL_MEMBER_NAME);
480 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
482 // Add candidates for entity3 with peerMember2 as the owner.
484 commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName2), kit);
485 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
487 leader.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
488 kit.expectMsgClass(SuccessReply.class);
489 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
491 commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName1), kit);
492 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName1);
493 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
495 // Add only candidate peerMember2 for entity4.
497 commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID4, peerMemberName2), kit);
498 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
499 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
501 // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new
// owner, or an empty owner when no other candidate exists, as the verifyOwner calls below check.
// NOTE(review): the Terminated message awaited here requires the kit to be watching peer2; that
// call is presumably among the lines not visible in this view - confirm.
505 peer2.tell(PoisonPill.getInstance(), ActorRef.noSender());
506 kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
509 leader.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
510 // Send PeerDown again - should be noop
511 leader.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
512 peer1.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
514 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, ""); // no other candidates so should clear
515 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
516 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
517 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
// peerMember2's candidate entries are expected to remain in the store even though it is down.
519 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
520 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
521 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
522 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
524 // Reinstate peerMember2 - should become owner again for entity 4
// A new actor is created under the same name (peerId2) as the one terminated above.
526 peer2 = actorFactory.createTestActor(newShardProps(peerId2,
527 ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId1.toString(), "").build(),
528 peerMemberName2). withDispatcher(Dispatchers.DefaultDispatcherId()), peerId2.toString());
529 leader.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
530 // Send PeerUp again - should be noop
531 leader.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
532 peer1.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
534 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
535 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
536 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
537 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
539 // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected
// NOTE(review): unlike the peer2 kill above, no wait for peer1's Terminated is visible before an
// actor with the same name is created further down - possible race on the actor name; confirm.
541 peer1.tell(PoisonPill.getInstance(), ActorRef.noSender());
542 leader.tell(new PeerDown(peerMemberName1, peerId1.toString()), ActorRef.noSender());
544 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
546 // Verify the reinstated peerMember2 is fully synced.
548 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
549 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
550 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
551 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
553 // Reinstate peerMember1 and verify no owner changes
555 peer1 = actorFactory.createTestActor(newShardProps(peerId1,
556 ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId2.toString(), "").build(),
557 peerMemberName1).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString());
558 leader.tell(new PeerUp(peerMemberName1, peerId1.toString()), ActorRef.noSender());
560 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
561 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
562 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
563 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
565 // Verify the reinstated peerMember1 is fully synced.
567 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
568 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
569 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
570 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
572 // Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for
573 // the entities (1 and 3) previously owned by the local leader member.
// Give peer2 the address of peer1 (it was created with an empty one) and mark both other members up.
575 peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
576 peer2.tell(new PeerUp(LOCAL_MEMBER_NAME, leaderId.toString()), ActorRef.noSender());
577 peer2.tell(new PeerUp(peerMemberName1, peerId1.toString()), ActorRef.noSender());
// Stop the local leader, report it down to peer2, and trigger an election on peer2 explicitly.
579 leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
580 peer2.tell(new PeerDown(LOCAL_MEMBER_NAME, leaderId.toString()), ActorRef.noSender());
581 peer2.tell(new ElectionTimeout(), peer2);
583 kit.waitUntilLeader(peer2);
// With the local member down, peerMember2 is expected to own all four entities.
585 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
586 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
587 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
588 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
// Sends the shard a single BatchedModifications message that merges node at ENTITY_OWNERS_PATH and
// is flagged ready with commit-on-ready, then waits on the sender kit for the commit reply.
//   shard  - shard actor that receives the modifications
//   node   - data merged at ENTITY_OWNERS_PATH
//   sender - test kit used as the message sender and to await CommitTransactionReply
591 private void commitModification(TestActorRef<EntityOwnershipShard> shard, NormalizedNode<?, ?> node,
592 JavaTestKit sender) {
// Every call reuses the same literal id "tnx" as the first constructor argument.
593 BatchedModifications modifications = new BatchedModifications("tnx", DataStoreVersions.CURRENT_VERSION, "");
594 modifications.setDoCommitOnReady(true);
595 modifications.setReady(true);
// This is the only message of the batch, hence a total of 1.
596 modifications.setTotalMessagesSent(1);
597 modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, node));
599 shard.tell(modifications, sender.getRef());
600 sender.expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
// Delegates to verifyNodeRemoved (defined outside this view) for the candidate's path, supplying a
// reader that fetches nodes from the given shard's store via AbstractShardTest.readStore.
603 private void verifyEntityCandidateRemoved(final TestActorRef<EntityOwnershipShard> shard, String entityType,
604 YangInstanceIdentifier entityId, String candidateName) {
605 verifyNodeRemoved(candidatePath(entityType, entityId, candidateName),
606 new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
608 public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
610 return AbstractShardTest.readStore(shard, path);
// Any exception from the read is rethrown as an AssertionError so it fails the test, keeping the cause.
611 } catch(Exception e) {
612 throw new AssertionError("Failed to read " + path, e);
// Delegates to the verifyEntityCandidate overload that takes a reader function (defined outside this
// view), supplying a reader that fetches nodes from the given shard's store via AbstractShardTest.readStore.
618 private void verifyCommittedEntityCandidate(final TestActorRef<EntityOwnershipShard> shard, String entityType,
619 YangInstanceIdentifier entityId, String candidateName) {
620 verifyEntityCandidate(entityType, entityId, candidateName, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
622 public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
624 return AbstractShardTest.readStore(shard, path);
// Any exception from the read is rethrown as an AssertionError so it fails the test, keeping the cause.
625 } catch(Exception e) {
626 throw new AssertionError("Failed to read " + path, e);
// Asserts that mods holds exactly one modification and checks that single modification with the
// overload below.
632 private void verifyBatchedEntityCandidate(List<Modification> mods, String entityType,
633 YangInstanceIdentifier entityId, String candidateName) throws Exception {
634 assertEquals("BatchedModifications size", 1, mods.size());
635 verifyBatchedEntityCandidate(mods.get(0), entityType, entityId, candidateName);
// Asserts that mod is exactly a MergeModification and passes its data to verifyEntityCandidate
// (defined outside this view) together with the expected entity type, id and candidate name.
638 private void verifyBatchedEntityCandidate(Modification mod, String entityType,
639 YangInstanceIdentifier entityId, String candidateName) throws Exception {
// Exact class comparison, so a subclass of MergeModification would fail this check.
640 assertEquals("Modification type", MergeModification.class, mod.getClass());
641 verifyEntityCandidate(((MergeModification)mod).getData(), entityType,
642 entityId, candidateName);
// Delegates to the verifyOwner overload that takes a reader function (defined outside this view),
// supplying a reader that fetches nodes from the given shard's store via AbstractShardTest.readStore.
// Callers in this file pass any expected owner name as localMemberName, including remote members
// and the empty string.
645 private void verifyOwner(final TestActorRef<EntityOwnershipShard> shard, String entityType, YangInstanceIdentifier entityId,
646 String localMemberName) {
647 verifyOwner(localMemberName, entityType, entityId, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
649 public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
651 return AbstractShardTest.readStore(shard, path);
// NOTE(review): the handler body is not visible in this view; presumably it rethrows as an
// AssertionError like the sibling helpers - confirm.
652 } catch(Exception e) {
// Props for a shard of the local member with no peers (empty peer map).
659 private Props newShardProps() {
660 return newShardProps(Collections.<String,String>emptyMap());
// Props for a shard of the local member with the given peers; a fresh shard id is generated per call.
// peers is passed through unchanged to EntityOwnershipShard.props; callers in this file key it by
// peer id and use the peer actor path (or an empty string) as value.
663 private Props newShardProps(Map<String,String> peers) {
664 return newShardProps(newShardId(LOCAL_MEMBER_NAME), peers, LOCAL_MEMBER_NAME);
// Props for an EntityOwnershipShard with an explicit shard id, peer map and member name. The
// DatastoreContext is built from dataStoreContextBuilder at call time, so any settings a test applied
// to the builder beforehand are included.
667 private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName) {
668 return EntityOwnershipShard.props(shardId, peers, dataStoreContextBuilder.build(),
669 SCHEMA_CONTEXT, memberName);
// Builds a ShardIdentifier for memberName with shard name "entity-ownership". The type is
// "operational" followed by the next value of NEXT_SHARD_NUM, so each call yields a different id.
672 private ShardIdentifier newShardId(String memberName) {
673 return ShardIdentifier.builder().memberName(memberName).shardName("entity-ownership").
674 type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
// Stand-in for a follower peer of the shard under test. It answers RequestVote with a
// RequestVoteReply and AppendEntries with an AppendEntriesReply; tests steer it from the test thread
// through the two volatile flags below.
677 public static class MockFollower extends UntypedActor {
// Set to true by testOnRegisterCandidateLocalWithNoInitialLeader to let the shard become leader.
678 volatile boolean grantVote;
// While true, incoming AppendEntries messages get no reply.
679 volatile boolean dropAppendEntries;
// Id of this follower, placed in every AppendEntriesReply.
680 private final String myId;
// NOTE(review): the body of this constructor is not visible in this view, so the grantVote value it
// sets is unconfirmed.
682 public MockFollower(String myId) {
// Constructor that takes the initial grantVote value explicitly.
686 public MockFollower(String myId, boolean grantVote) {
688 this.grantVote = grantVote;
// Handles RequestVote and AppendEntries; no handling of other message types is visible.
692 public void onReceive(Object message) {
693 if(message instanceof RequestVote) {
// Reply for the term of the request with a true flag (presumably "vote granted").
// NOTE(review): a guard on grantVote is presumably among the lines not visible here - confirm.
695 getSender().tell(new RequestVoteReply(((RequestVote)message).getTerm(), true), getSelf());
697 } else if(message instanceof AppendEntries) {
698 if(!dropAppendEntries) {
699 AppendEntries req = (AppendEntries) message;
// Acknowledge up to the index of the last entry in the request, or the leader's commit index when
// the request carries no entries.
700 long lastIndex = req.getLeaderCommit();
701 if (req.getEntries().size() > 0) {
702 for(ReplicatedLogEntry entry : req.getEntries()) {
703 lastIndex = entry.getIndex();
// Reply with this follower's id, the request's term (passed for both term arguments), a true flag
// (presumably "success") and lastIndex.
707 getSender().tell(new AppendEntriesReply(myId, req.getTerm(), true, lastIndex, req.getTerm(),
708 DataStoreVersions.CURRENT_VERSION), getSelf());
714 public static class MockLeader extends UntypedActor {
// Latch counted down once per received Modification; tests replace it before each phase and await it.
715 volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
// Modifications received so far; access is synchronized on the list itself.
// NOTE(review): the field is not final although it serves as the monitor - consider making it final.
716 List<Modification> receivedModifications = new ArrayList<>();
// Tests set this to false to stop the mock leader from replying.
717 volatile boolean sendReply = true;
// Records every Modification carried by an incoming BatchedModifications, counting the latch down
// once per modification, and replies to the sender with a serialized CommitTransactionReply.
// NOTE(review): the declaration of delay and the condition that presumably guards the reply with
// sendReply are not visible in this view - confirm.
721 public void onReceive(Object message) {
722 if(message instanceof BatchedModifications) {
// Optional artificial processing delay before the message is handled.
724 Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
728 BatchedModifications mods = (BatchedModifications) message;
// Guard the list: the test thread drains it through getAndClearReceivedModifications.
729 synchronized (receivedModifications) {
730 for(int i = 0; i < mods.getModifications().size(); i++) {
731 receivedModifications.add(mods.getModifications().get(i));
732 modificationsReceived.countDown();
736 getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
743 List<Modification> getAndClearReceivedModifications() {
744 synchronized (receivedModifications) {
745 List<Modification> ret = new ArrayList<>(receivedModifications);
746 receivedModifications.clear();