2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.AdditionalMatchers.or;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.never;
15 import static org.mockito.Mockito.times;
16 import static org.mockito.Mockito.reset;
17 import static org.mockito.Mockito.timeout;
18 import static org.mockito.Mockito.verify;
19 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
20 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.candidatePath;
21 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.entityOwnersWithCandidate;
22 import akka.actor.ActorRef;
23 import akka.actor.PoisonPill;
24 import akka.actor.Props;
25 import akka.actor.Terminated;
26 import akka.actor.UntypedActor;
27 import akka.dispatch.Dispatchers;
28 import akka.testkit.JavaTestKit;
29 import akka.testkit.TestActorRef;
30 import com.google.common.base.Function;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.util.concurrent.Uninterruptibles;
33 import java.util.ArrayList;
34 import java.util.Collections;
35 import java.util.List;
37 import java.util.concurrent.CountDownLatch;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicInteger;
40 import org.junit.After;
41 import org.junit.Test;
42 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
43 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
44 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
45 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
46 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
47 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
48 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
49 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
50 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
51 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
52 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
53 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.LastCandidateSelectionStrategy;
54 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
55 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
56 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
57 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
58 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
59 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
60 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
61 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
62 import org.opendaylight.controller.cluster.datastore.modification.Modification;
63 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
64 import org.opendaylight.controller.cluster.raft.TestActorFactory;
65 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
66 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
67 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
68 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
69 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
70 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
71 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
72 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipChange;
73 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
74 import org.opendaylight.yangtools.yang.common.QName;
75 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
76 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
77 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
80 * Unit tests for EntityOwnershipShard.
82 * @author Thomas Pantelis
84 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
85 private static final String ENTITY_TYPE = "test type";
86 private static final YangInstanceIdentifier ENTITY_ID1 =
87 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
88 private static final YangInstanceIdentifier ENTITY_ID2 =
89 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
90 private static final YangInstanceIdentifier ENTITY_ID3 =
91 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3"));
92 private static final YangInstanceIdentifier ENTITY_ID4 =
93 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4"));
94 private static final YangInstanceIdentifier ENTITY_ID5 =
95 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5"));
96 private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
97 private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
98 private static final String LOCAL_MEMBER_NAME = "member-1";
100 private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder();
101 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
104 public void tearDown() {
105 actorFactory.close();
109 public void testOnRegisterCandidateLocal() throws Exception {
110 ShardTestKit kit = new ShardTestKit(getSystem());
112 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
114 ShardTestKit.waitUntilLeader(shard);
116 YangInstanceIdentifier entityId = ENTITY_ID1;
117 Entity entity = new Entity(ENTITY_TYPE, entityId);
119 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
120 kit.expectMsgClass(SuccessReply.class);
122 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
123 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
127 public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
128 ShardTestKit kit = new ShardTestKit(getSystem());
130 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
132 String peerId = newShardId("follower").toString();
133 TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId, false).
134 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
136 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
137 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
138 withDispatcher(Dispatchers.DefaultDispatcherId()));
140 YangInstanceIdentifier entityId = ENTITY_ID1;
141 Entity entity = new Entity(ENTITY_TYPE, entityId);
143 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
144 kit.expectMsgClass(SuccessReply.class);
146 // Now grant the vote so the shard becomes the leader. This should retry the commit.
147 peer.underlyingActor().grantVote = true;
149 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
150 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
154 public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
155 ShardTestKit kit = new ShardTestKit(getSystem());
157 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
158 shardTransactionCommitTimeoutInSeconds(1);
160 String peerId = newShardId("follower").toString();
161 TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
162 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
164 MockFollower follower = peer.underlyingActor();
166 // Drop AppendEntries so consensus isn't reached.
167 follower.dropAppendEntries = true;
169 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
170 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
171 withDispatcher(Dispatchers.DefaultDispatcherId()));
173 ShardTestKit.waitUntilLeader(shard);
175 YangInstanceIdentifier entityId = ENTITY_ID1;
176 Entity entity = new Entity(ENTITY_TYPE, entityId);
178 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
179 kit.expectMsgClass(SuccessReply.class);
181 // Wait enough time for the commit to timeout.
182 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
184 // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
185 // write being applied to the state.
186 follower.dropAppendEntries = false;
188 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
189 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
193 public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
194 ShardTestKit kit = new ShardTestKit(getSystem());
196 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
197 shardIsolatedLeaderCheckIntervalInMillis(50);
199 String peerId = newShardId("follower").toString();
200 TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
201 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
203 MockFollower follower = peer.underlyingActor();
205 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
206 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
207 withDispatcher(Dispatchers.DefaultDispatcherId()));
209 ShardTestKit.waitUntilLeader(shard);
211 // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
212 follower.dropAppendEntries = true;
213 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
215 YangInstanceIdentifier entityId = ENTITY_ID1;
216 Entity entity = new Entity(ENTITY_TYPE, entityId);
218 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
219 kit.expectMsgClass(SuccessReply.class);
221 // Resume AppendEntries - the candidate write should now be committed.
222 follower.dropAppendEntries = false;
223 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
224 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
228 public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
229 ShardTestKit kit = new ShardTestKit(getSystem());
231 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
232 shardBatchedModificationCount(5);
234 String peerId = newShardId("leader").toString();
235 TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
236 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
238 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(Props.create(
239 TestEntityOwnershipShard.class, newShardId(LOCAL_MEMBER_NAME),
240 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build(),
241 dataStoreContextBuilder.build()).withDispatcher(Dispatchers.DefaultDispatcherId()));
243 shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), -1L, -1L,
244 DataStoreVersions.CURRENT_VERSION), peer);
246 shard.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
247 kit.expectMsgClass(SuccessReply.class);
249 MockLeader leader = peer.underlyingActor();
250 assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
251 leader.modificationsReceived, 5, TimeUnit.SECONDS));
252 verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1,
255 // Test with initial commit timeout and subsequent retry.
257 leader.modificationsReceived = new CountDownLatch(1);
258 leader.sendReply = false;
260 shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
262 shard.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
263 kit.expectMsgClass(SuccessReply.class);
265 assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
266 leader.modificationsReceived, 5, TimeUnit.SECONDS));
267 verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID2,
270 // Send a bunch of registration messages quickly and verify.
274 leader.modificationsReceived = new CountDownLatch(max);
275 List<YangInstanceIdentifier> entityIds = new ArrayList<>();
276 for(int i = 1; i <= max; i++) {
277 YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
279 shard.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, id)), kit.getRef());
282 assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
283 leader.modificationsReceived, 10, TimeUnit.SECONDS));
285 // Sleep a little to ensure no additional BatchedModifications are received.
287 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
289 List<Modification> receivedMods = leader.getAndClearReceivedModifications();
290 for(int i = 0; i < max; i++) {
291 verifyBatchedEntityCandidate(receivedMods.get(i), ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
294 assertEquals("# modifications received", max, receivedMods.size());
298 public void testOnUnregisterCandidateLocal() throws Exception {
299 ShardTestKit kit = new ShardTestKit(getSystem());
300 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
301 ShardTestKit.waitUntilLeader(shard);
303 Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
307 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
308 kit.expectMsgClass(SuccessReply.class);
310 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
311 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
315 shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
316 kit.expectMsgClass(SuccessReply.class);
318 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");
322 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
323 kit.expectMsgClass(SuccessReply.class);
325 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
326 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
330 public void testOwnershipChanges() throws Exception {
331 ShardTestKit kit = new ShardTestKit(getSystem());
332 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
333 ShardTestKit.waitUntilLeader(shard);
335 Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
336 ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
338 // Add a remote candidate
340 String remoteMemberName1 = "remoteMember1";
341 writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
345 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
346 kit.expectMsgClass(SuccessReply.class);
348 // Verify the remote candidate becomes owner
350 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
351 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
352 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
354 // Add another remote candidate and verify ownership doesn't change
356 String remoteMemberName2 = "remoteMember2";
357 writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
359 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
360 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
361 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
363 // Remove the second remote candidate and verify ownership doesn't change
365 deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
367 verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
368 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
369 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
371 // Remove the first remote candidate and verify the local candidate becomes owner
373 deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
375 verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
376 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
378 // Add the second remote candidate back and verify ownership doesn't change
380 writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
382 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
383 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
384 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
386 // Unregister the local candidate and verify the second remote candidate becomes owner
388 shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
389 kit.expectMsgClass(SuccessReply.class);
391 verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
392 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
396 public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception {
397 ShardTestKit kit = new ShardTestKit(getSystem());
399 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(500).shardElectionTimeoutFactor(10000);
401 String peerMemberName1 = "peerMember1";
402 String peerMemberName2 = "peerMember2";
404 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
405 ShardIdentifier peerId1 = newShardId(peerMemberName1);
406 ShardIdentifier peerId2 = newShardId(peerMemberName2);
408 TestActorRef<EntityOwnershipShard> peer1 = actorFactory.createTestActor(newShardProps(peerId1,
409 ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId2.toString(), "").build(),
410 peerMemberName1, EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString());
412 TestActorRef<EntityOwnershipShard> peer2 = actorFactory.createTestActor(newShardProps(peerId2,
413 ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId1.toString(), "").build(),
414 peerMemberName2, EntityOwnerSelectionStrategyConfig.newBuilder().build()). withDispatcher(Dispatchers.DefaultDispatcherId()), peerId2.toString());
416 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(newShardProps(leaderId,
417 ImmutableMap.<String, String>builder().put(peerId1.toString(), peer1.path().toString()).
418 put(peerId2.toString(), peer2.path().toString()).build(), LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build()).
419 withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString());
420 leader.tell(new ElectionTimeout(), leader);
422 ShardTestKit.waitUntilLeader(leader);
424 // Send PeerDown and PeerUp with no entities
426 leader.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
427 leader.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
429 // Add candidates for entity1 with the local leader as the owner
431 leader.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
432 kit.expectMsgClass(SuccessReply.class);
433 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
435 commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName2), kit);
436 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
438 commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName1), kit);
439 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName1);
440 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
442 // Add candidates for entity2 with peerMember2 as the owner
444 commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName2), kit);
445 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
447 commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName1), kit);
448 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
449 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
451 // Add candidates for entity3 with peerMember2 as the owner.
453 commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName2), kit);
454 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
456 leader.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
457 kit.expectMsgClass(SuccessReply.class);
458 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
460 commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName1), kit);
461 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName1);
462 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
464 // Add only candidate peerMember2 for entity4.
466 commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID4, peerMemberName2), kit);
467 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
468 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
470 // Add only candidate peerMember1 for entity5.
472 commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID5, peerMemberName1), kit);
473 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, peerMemberName1);
474 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, peerMemberName1);
476 // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new
480 peer2.tell(PoisonPill.getInstance(), ActorRef.noSender());
481 kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
484 leader.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
485 // Send PeerDown again - should be noop
486 leader.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
487 peer1.tell(new PeerDown(peerMemberName2, peerId2.toString()), ActorRef.noSender());
489 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, ""); // no other candidates so should clear
490 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
491 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
492 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
494 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
495 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
496 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
497 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
499 // Reinstate peerMember2 - no owners should change
501 peer2 = actorFactory.createTestActor(newShardProps(peerId2,
502 ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId1.toString(), "").build(),
503 peerMemberName2, EntityOwnerSelectionStrategyConfig.newBuilder().build()). withDispatcher(Dispatchers.DefaultDispatcherId()), peerId2.toString());
504 leader.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
505 // Send PeerUp again - should be noop
506 leader.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
507 peer1.tell(new PeerUp(peerMemberName2, peerId2.toString()), ActorRef.noSender());
509 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
510 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
511 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
512 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
514 // Add back candidate peerMember2 for entities 1, 2, & 3.
516 commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName2), kit);
517 commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName2), kit);
518 commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName2), kit);
519 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
520 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
521 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
522 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
523 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
524 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
526 // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected
528 peer1.tell(PoisonPill.getInstance(), ActorRef.noSender());
529 leader.tell(new PeerDown(peerMemberName1, peerId1.toString()), ActorRef.noSender());
531 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
533 // Verify the reinstated peerMember2 is fully synced.
535 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
536 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
537 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
538 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
540 // Reinstate peerMember1 and verify no owner changes
542 peer1 = actorFactory.createTestActor(newShardProps(peerId1,
543 ImmutableMap.<String, String>builder().put(leaderId.toString(), ""). put(peerId2.toString(), "").build(),
544 peerMemberName1, EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString());
545 leader.tell(new PeerUp(peerMemberName1, peerId1.toString()), ActorRef.noSender());
547 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
548 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
549 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
550 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
552 // Verify the reinstated peerMember1 is fully synced.
554 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
555 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
556 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
557 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
559 // Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for
560 // the entities (1 and 3) previously owned by the local leader member.
562 peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
563 peer2.tell(new PeerUp(LOCAL_MEMBER_NAME, leaderId.toString()), ActorRef.noSender());
564 peer2.tell(new PeerUp(peerMemberName1, peerId1.toString()), ActorRef.noSender());
566 leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
567 peer2.tell(new PeerDown(LOCAL_MEMBER_NAME, leaderId.toString()), ActorRef.noSender());
568 peer2.tell(new ElectionTimeout(), peer2);
570 ShardTestKit.waitUntilLeader(peer2);
572 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
573 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
574 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
575 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
579 public void testLocalCandidateRemovedWithCandidateRegistered() throws Exception {
580 ShardTestKit kit = new ShardTestKit(getSystem());
582 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(10000);
583 ShardIdentifier leaderId = newShardId("leader");
584 ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
586 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(Props.create(
587 TestEntityOwnershipShard.class, localId,
588 ImmutableMap.<String, String>builder().put(leaderId.toString(), "".toString()).build(),
589 dataStoreContextBuilder.build()).withDispatcher(Dispatchers.DefaultDispatcherId()));
591 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(newShardProps(leaderId,
592 ImmutableMap.<String, String>builder().put(localId.toString(), shard.path().toString()).build(),
593 LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString());
594 leader.tell(new ElectionTimeout(), leader);
596 ShardTestKit.waitUntilLeader(leader);
598 shard.tell(new PeerAddressResolved(leaderId.toString(), leader.path().toString()), ActorRef.noSender());
600 Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
601 EntityOwnershipListener listener = mock(EntityOwnershipListener.class);
603 shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
604 kit.expectMsgClass(SuccessReply.class);
606 // Register local candidate
608 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
609 kit.expectMsgClass(SuccessReply.class);
610 verifyCommittedEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
611 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity, false, true, true));
614 // Simulate a replicated commit from the leader to remove the local candidate that would occur after a
615 // network partition is healed.
617 leader.tell(new PeerDown(LOCAL_MEMBER_NAME, localId.toString()), ActorRef.noSender());
619 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity, true, false, false));
621 // Since the the shard has a local candidate registered, it should re-add its candidate to the entity.
623 verifyCommittedEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
624 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity, false, true, true));
626 // Unregister the local candidate and verify it's removed and no re-added.
628 shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
629 kit.expectMsgClass(SuccessReply.class);
631 verifyNoEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
632 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
633 verifyNoEntityCandidate(shard, entity.getType(), entity.getId(), LOCAL_MEMBER_NAME);
637 public void testListenerRegistration() throws Exception {
638 ShardTestKit kit = new ShardTestKit(getSystem());
639 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
640 ShardTestKit.waitUntilLeader(shard);
641 ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
643 String otherEntityType = "otherEntityType";
644 Entity entity1 = new Entity(ENTITY_TYPE, ENTITY_ID1);
645 Entity entity2 = new Entity(ENTITY_TYPE, ENTITY_ID2);
646 Entity entity3 = new Entity(ENTITY_TYPE, ENTITY_ID3);
647 Entity entity4 = new Entity(otherEntityType, ENTITY_ID3);
648 EntityOwnershipListener listener = mock(EntityOwnershipListener.class);
652 shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
653 kit.expectMsgClass(SuccessReply.class);
655 // Register a couple candidates for the desired entity type and verify listener is notified.
657 shard.tell(new RegisterCandidateLocal(entity1), kit.getRef());
658 kit.expectMsgClass(SuccessReply.class);
660 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
662 shard.tell(new RegisterCandidateLocal(entity2), kit.getRef());
663 kit.expectMsgClass(SuccessReply.class);
665 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
668 // Register another candidate for another entity type and verify listener is not notified.
670 shard.tell(new RegisterCandidateLocal(entity4), kit.getRef());
671 kit.expectMsgClass(SuccessReply.class);
673 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
674 verify(listener, never()).ownershipChanged(ownershipChange(entity4));
676 // Register remote candidate for entity1
678 String remoteMemberName = "remoteMember";
679 writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, entity1.getId(), remoteMemberName),
681 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entity1.getId(), remoteMemberName);
683 // Unregister the local candidate for entity1 and verify listener is notified
685 shard.tell(new UnregisterCandidateLocal(entity1), kit.getRef());
686 kit.expectMsgClass(SuccessReply.class);
688 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
691 // Unregister the listener, add a candidate for entity3 and verify listener isn't notified
693 shard.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
694 kit.expectMsgClass(SuccessReply.class);
696 shard.tell(new RegisterCandidateLocal(entity3), kit.getRef());
697 kit.expectMsgClass(SuccessReply.class);
699 verifyOwner(shard, ENTITY_TYPE, entity3.getId(), LOCAL_MEMBER_NAME);
700 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
701 verify(listener, never()).ownershipChanged(any(EntityOwnershipChange.class));
703 // Re-register the listener and verify it gets notified of currently owned entities
707 shard.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
708 kit.expectMsgClass(SuccessReply.class);
710 verify(listener, timeout(5000).times(2)).ownershipChanged(or(ownershipChange(entity2, false, true, true),
711 ownershipChange(entity3, false, true, true)));
712 Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
713 verify(listener, never()).ownershipChanged(ownershipChange(entity4));
714 verify(listener, times(1)).ownershipChanged(ownershipChange(entity1));
717 private static void commitModification(TestActorRef<EntityOwnershipShard> shard, NormalizedNode<?, ?> node,
718 JavaTestKit sender) {
719 BatchedModifications modifications = newBatchedModifications();
720 modifications.addModification(new MergeModification(ENTITY_OWNERS_PATH, node));
722 shard.tell(modifications, sender.getRef());
723 sender.expectMsgClass(CommitTransactionReply.class);
726 private static BatchedModifications newBatchedModifications() {
727 BatchedModifications modifications = new BatchedModifications("tnx", DataStoreVersions.CURRENT_VERSION, "");
728 modifications.setDoCommitOnReady(true);
729 modifications.setReady(true);
730 modifications.setTotalMessagesSent(1);
731 return modifications;
734 private void verifyEntityCandidateRemoved(final TestActorRef<EntityOwnershipShard> shard, String entityType,
735 YangInstanceIdentifier entityId, String candidateName) {
736 verifyNodeRemoved(candidatePath(entityType, entityId, candidateName),
737 new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
739 public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
741 return AbstractShardTest.readStore(shard, path);
742 } catch(Exception e) {
743 throw new AssertionError("Failed to read " + path, e);
749 private void verifyCommittedEntityCandidate(final TestActorRef<EntityOwnershipShard> shard, String entityType,
750 YangInstanceIdentifier entityId, String candidateName) {
751 verifyEntityCandidate(entityType, entityId, candidateName, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
753 public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
755 return AbstractShardTest.readStore(shard, path);
756 } catch(Exception e) {
757 throw new AssertionError("Failed to read " + path, e);
763 private void verifyNoEntityCandidate(final TestActorRef<EntityOwnershipShard> shard, String entityType,
764 YangInstanceIdentifier entityId, String candidateName) {
765 verifyEntityCandidate(entityType, entityId, candidateName, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
767 public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
769 return AbstractShardTest.readStore(shard, path);
770 } catch(Exception e) {
771 throw new AssertionError("Failed to read " + path, e);
777 private void verifyBatchedEntityCandidate(List<Modification> mods, String entityType,
778 YangInstanceIdentifier entityId, String candidateName) throws Exception {
779 assertEquals("BatchedModifications size", 1, mods.size());
780 verifyBatchedEntityCandidate(mods.get(0), entityType, entityId, candidateName);
783 private void verifyBatchedEntityCandidate(Modification mod, String entityType,
784 YangInstanceIdentifier entityId, String candidateName) throws Exception {
785 assertEquals("Modification type", MergeModification.class, mod.getClass());
786 verifyEntityCandidate(((MergeModification)mod).getData(), entityType,
787 entityId, candidateName, true);
790 private static void verifyOwner(final TestActorRef<EntityOwnershipShard> shard, String entityType,
791 YangInstanceIdentifier entityId, String localMemberName) {
792 verifyOwner(localMemberName, entityType, entityId, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
794 public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
796 return AbstractShardTest.readStore(shard, path);
797 } catch(Exception e) {
804 private Props newShardProps() {
805 return newShardProps(Collections.<String,String>emptyMap());
808 private Props newShardProps(EntityOwnerSelectionStrategyConfig strategyConfig) {
809 return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.<String,String>emptyMap(),
810 LOCAL_MEMBER_NAME, strategyConfig);
813 private Props newShardProps(EntityOwnerSelectionStrategyConfig strategyConfig, Map<String, String> peers) {
814 return newShardProps(newShardId(LOCAL_MEMBER_NAME), peers, LOCAL_MEMBER_NAME, strategyConfig);
818 private Props newShardProps(Map<String,String> peers) {
819 return newShardProps(newShardId(LOCAL_MEMBER_NAME), peers, LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build());
822 private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName,
823 EntityOwnerSelectionStrategyConfig config) {
824 return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).
825 datastoreContext(dataStoreContextBuilder.build()).schemaContext(SCHEMA_CONTEXT).
826 localMemberName(memberName).ownerSelectionStrategyConfig(config).props().withDispatcher(Dispatchers.DefaultDispatcherId());
829 private static ShardIdentifier newShardId(String memberName) {
830 return ShardIdentifier.builder().memberName(memberName).shardName("entity-ownership").
831 type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
834 public static class TestEntityOwnershipShard extends EntityOwnershipShard {
836 TestEntityOwnershipShard(ShardIdentifier name, Map<String, String> peerAddresses,
837 DatastoreContext datastoreContext) {
838 super(newBuilder().id(name).peerAddresses(peerAddresses).datastoreContext(datastoreContext).
839 schemaContext(SCHEMA_CONTEXT).localMemberName(LOCAL_MEMBER_NAME));
843 public void onReceiveCommand(Object message) throws Exception {
844 if(!(message instanceof ElectionTimeout)) {
845 super.onReceiveCommand(message);
852 public static class MockFollower extends UntypedActor {
853 volatile boolean grantVote;
854 volatile boolean dropAppendEntries;
855 private final String myId;
857 public MockFollower(String myId) {
861 public MockFollower(String myId, boolean grantVote) {
863 this.grantVote = grantVote;
867 public void onReceive(Object message) {
868 if(message instanceof RequestVote) {
870 getSender().tell(new RequestVoteReply(((RequestVote)message).getTerm(), true), getSelf());
872 } else if(message instanceof AppendEntries) {
873 if(!dropAppendEntries) {
874 AppendEntries req = (AppendEntries) message;
875 long lastIndex = req.getLeaderCommit();
876 if (req.getEntries().size() > 0) {
877 for(ReplicatedLogEntry entry : req.getEntries()) {
878 lastIndex = entry.getIndex();
882 getSender().tell(new AppendEntriesReply(myId, req.getTerm(), true, lastIndex, req.getTerm(),
883 DataStoreVersions.CURRENT_VERSION), getSelf());
891 public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() throws Exception {
892 ShardTestKit kit = new ShardTestKit(getSystem());
893 EntityOwnerSelectionStrategyConfig.Builder builder
894 = EntityOwnerSelectionStrategyConfig.newBuilder().addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
896 String peerId = newShardId("follower").toString();
897 TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId, false).
898 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
900 peer.underlyingActor().grantVote = true;
902 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(builder.build(),
903 ImmutableMap.of(peerId.toString(), peer.path().toString())));
904 ShardTestKit.waitUntilLeader(shard);
906 Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
907 ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
909 // Add a remote candidate
911 String remoteMemberName1 = "follower";
912 writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
916 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
917 kit.expectMsgClass(SuccessReply.class);
919 // Verify the local candidate becomes owner
921 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
922 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
923 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
927 public void testDelayedEntityOwnerSelection() throws Exception {
928 ShardTestKit kit = new ShardTestKit(getSystem());
929 EntityOwnerSelectionStrategyConfig.Builder builder
930 = EntityOwnerSelectionStrategyConfig.newBuilder().addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
932 String follower1Id = newShardId("follower1").toString();
933 TestActorRef<MockFollower> follower1 = actorFactory.createTestActor(Props.create(MockFollower.class, follower1Id, false).
934 withDispatcher(Dispatchers.DefaultDispatcherId()), follower1Id);
936 follower1.underlyingActor().grantVote = true;
938 String follower2Id = newShardId("follower").toString();
939 TestActorRef<MockFollower> follower2 = actorFactory.createTestActor(Props.create(MockFollower.class, follower2Id, false).
940 withDispatcher(Dispatchers.DefaultDispatcherId()), follower2Id);
942 follower2.underlyingActor().grantVote = true;
945 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(builder.build(),
946 ImmutableMap.of(follower1Id.toString(), follower2.path().toString(), follower2Id.toString(), follower2.path().toString())));
947 ShardTestKit.waitUntilLeader(shard);
949 Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
950 ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
952 // Add a remote candidate
954 String remoteMemberName1 = "follower";
955 writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
959 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
960 kit.expectMsgClass(SuccessReply.class);
962 // Verify the local candidate becomes owner
964 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
965 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
966 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
969 public static class MockLeader extends UntypedActor {
970 volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
971 List<Modification> receivedModifications = new ArrayList<>();
972 volatile boolean sendReply = true;
976 public void onReceive(Object message) {
977 if(message instanceof BatchedModifications) {
979 Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
983 BatchedModifications mods = (BatchedModifications) message;
984 synchronized (receivedModifications) {
985 for(int i = 0; i < mods.getModifications().size(); i++) {
986 receivedModifications.add(mods.getModifications().get(i));
987 modificationsReceived.countDown();
991 getSender().tell(CommitTransactionReply.instance(DataStoreVersions.CURRENT_VERSION).
992 toSerializable(), getSelf());
999 List<Modification> getAndClearReceivedModifications() {
1000 synchronized (receivedModifications) {
1001 List<Modification> ret = new ArrayList<>(receivedModifications);
1002 receivedModifications.clear();