2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.AdditionalMatchers.or;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.never;
15 import static org.mockito.Mockito.reset;
16 import static org.mockito.Mockito.timeout;
17 import static org.mockito.Mockito.times;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.verifyNoMoreInteractions;
20 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
21 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
22 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
24 import akka.actor.ActorRef;
25 import akka.actor.PoisonPill;
26 import akka.actor.Props;
27 import akka.actor.Terminated;
28 import akka.dispatch.Dispatchers;
29 import akka.testkit.TestActorRef;
30 import com.google.common.collect.ImmutableMap;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.util.ArrayList;
33 import java.util.Collections;
34 import java.util.List;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicLong;
39 import java.util.function.Predicate;
40 import org.junit.After;
41 import org.junit.Test;
42 import org.opendaylight.controller.cluster.access.concepts.MemberName;
43 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
44 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
45 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
46 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
47 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
48 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
49 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
50 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
51 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
52 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.LastCandidateSelectionStrategy;
53 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
54 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
55 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
57 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
58 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
59 import org.opendaylight.controller.cluster.raft.RaftState;
60 import org.opendaylight.controller.cluster.raft.TestActorFactory;
61 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
62 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
63 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
64 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
65 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
66 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
67 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
68 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
69 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
70 import org.opendaylight.yangtools.yang.common.QName;
71 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
72 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
 * Unit tests for EntityOwnershipShard.
 *
 * @author Thomas Pantelis
 */
79 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
80 private static final String ENTITY_TYPE = "test type";
81 private static final YangInstanceIdentifier ENTITY_ID1 =
82 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
83 private static final YangInstanceIdentifier ENTITY_ID2 =
84 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
85 private static final YangInstanceIdentifier ENTITY_ID3 =
86 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3"));
87 private static final YangInstanceIdentifier ENTITY_ID4 =
88 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4"));
89 private static final YangInstanceIdentifier ENTITY_ID5 =
90 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5"));
91 private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
92 private static final String LOCAL_MEMBER_NAME = "local-member-1";
93 private static final String PEER_MEMBER_1_NAME = "peer-member-1";
94 private static final String PEER_MEMBER_2_NAME = "peer-member-2";
96 private Builder dataStoreContextBuilder = DatastoreContext.newBuilder().persistent(false);
97 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
100 public void tearDown() {
101 actorFactory.close();
105 public void testOnRegisterCandidateLocal() {
106 testLog.info("testOnRegisterCandidateLocal starting");
108 ShardTestKit kit = new ShardTestKit(getSystem());
110 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
112 ShardTestKit.waitUntilLeader(shard);
114 YangInstanceIdentifier entityId = ENTITY_ID1;
115 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
117 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
118 kit.expectMsgClass(SuccessReply.class);
120 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
121 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
123 testLog.info("testOnRegisterCandidateLocal ending");
127 public void testOnRegisterCandidateLocalWithNoInitialLeader() {
128 testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader starting");
130 final ShardTestKit kit = new ShardTestKit(getSystem());
132 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
134 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
135 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
137 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
138 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
139 TestEntityOwnershipShard peerShard = peer.underlyingActor();
140 peerShard.startDroppingMessagesOfType(RequestVote.class);
141 peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
143 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(
144 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
146 YangInstanceIdentifier entityId = ENTITY_ID1;
147 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
149 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
150 kit.expectMsgClass(SuccessReply.class);
152 // Now allow RequestVotes to the peer so the shard becomes the leader. This should retry the commit.
153 peerShard.stopDroppingMessagesOfType(RequestVote.class);
155 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
156 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
158 testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader ending");
162 public void testOnRegisterCandidateLocalWithNoInitialConsensus() {
163 testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus starting");
165 final ShardTestKit kit = new ShardTestKit(getSystem());
167 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
168 .shardTransactionCommitTimeoutInSeconds(1);
170 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
171 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
173 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
174 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
175 TestEntityOwnershipShard peerShard = peer.underlyingActor();
176 peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
178 // Drop AppendEntries so consensus isn't reached.
179 peerShard.startDroppingMessagesOfType(AppendEntries.class);
181 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
182 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
184 ShardTestKit.waitUntilLeader(leader);
186 YangInstanceIdentifier entityId = ENTITY_ID1;
187 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
189 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
190 kit.expectMsgClass(SuccessReply.class);
192 // Wait enough time for the commit to timeout.
193 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
195 // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
196 // write being applied to the state.
197 peerShard.stopDroppingMessagesOfType(AppendEntries.class);
199 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
200 verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
202 testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus ending");
206 public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
207 testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader starting");
209 final ShardTestKit kit = new ShardTestKit(getSystem());
211 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
212 .shardIsolatedLeaderCheckIntervalInMillis(50);
214 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
215 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
217 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
218 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
219 TestEntityOwnershipShard peerShard = peer.underlyingActor();
220 peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
222 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
223 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME));
225 ShardTestKit.waitUntilLeader(leader);
227 // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
228 peerShard.startDroppingMessagesOfType(AppendEntries.class);
229 verifyRaftState(leader, state ->
230 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
232 YangInstanceIdentifier entityId = ENTITY_ID1;
233 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
235 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
236 kit.expectMsgClass(SuccessReply.class);
238 // Resume AppendEntries - the candidate write should now be committed.
239 peerShard.stopDroppingMessagesOfType(AppendEntries.class);
240 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
241 verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
243 testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader ending");
247 public void testOnRegisterCandidateLocalWithRemoteLeader() {
248 testLog.info("testOnRegisterCandidateLocalWithRemoteLeader starting");
250 ShardTestKit kit = new ShardTestKit(getSystem());
252 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
253 .shardBatchedModificationCount(5);
255 ShardIdentifier leaderId = newShardId(PEER_MEMBER_1_NAME);
256 ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
257 TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
258 newShardBuilder(leaderId, peerMap(localId.toString()), PEER_MEMBER_1_NAME),
259 actorFactory.createActor(MessageCollectorActor.props())), leaderId.toString());
260 final TestEntityOwnershipShard leaderShard = leader.underlyingActor();
262 TestActorRef<TestEntityOwnershipShard> local = actorFactory.createTestActor(TestEntityOwnershipShard.props(
263 newShardBuilder(localId, peerMap(leaderId.toString()),LOCAL_MEMBER_NAME)), localId.toString());
264 local.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
266 local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
267 kit.expectMsgClass(SuccessReply.class);
269 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
270 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
272 // Test with initial commit timeout and subsequent retry.
274 local.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
275 leaderShard.startDroppingMessagesOfType(BatchedModifications.class);
277 local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
278 kit.expectMsgClass(SuccessReply.class);
280 expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class);
282 // Send a bunch of registration messages quickly and verify.
284 leaderShard.stopDroppingMessagesOfType(BatchedModifications.class);
285 clearMessages(leaderShard.collectorActor());
288 List<YangInstanceIdentifier> entityIds = new ArrayList<>();
289 for (int i = 1; i <= max; i++) {
290 YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
292 local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, id)), kit.getRef());
295 for (int i = 0; i < max; i++) {
296 verifyCommittedEntityCandidate(local, ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
299 testLog.info("testOnRegisterCandidateLocalWithRemoteLeader ending");
303 public void testOnUnregisterCandidateLocal() {
304 testLog.info("testOnUnregisterCandidateLocal starting");
306 ShardTestKit kit = new ShardTestKit(getSystem());
307 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
308 ShardTestKit.waitUntilLeader(shard);
310 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
314 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
315 kit.expectMsgClass(SuccessReply.class);
317 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
318 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
322 shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
323 kit.expectMsgClass(SuccessReply.class);
325 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");
329 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
330 kit.expectMsgClass(SuccessReply.class);
332 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
333 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
335 testLog.info("testOnUnregisterCandidateLocal ending");
339 public void testOwnershipChanges() {
340 testLog.info("testOwnershipChanges starting");
342 final ShardTestKit kit = new ShardTestKit(getSystem());
344 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
346 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
347 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
348 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
350 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
351 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
353 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
355 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
356 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
358 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
360 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
361 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
362 leaderId.toString());
364 ShardTestKit.waitUntilLeader(leader);
366 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
368 // Add a remote candidate
370 peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
371 kit.expectMsgClass(SuccessReply.class);
373 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
377 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
378 kit.expectMsgClass(SuccessReply.class);
380 // Verify the remote candidate becomes owner
382 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
383 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
385 // Add another remote candidate and verify ownership doesn't change
387 peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
388 kit.expectMsgClass(SuccessReply.class);
390 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
391 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
392 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
394 // Remove the second remote candidate and verify ownership doesn't change
396 peer2.tell(new UnregisterCandidateLocal(entity), kit.getRef());
397 kit.expectMsgClass(SuccessReply.class);
399 verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
400 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
401 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
403 // Remove the first remote candidate and verify the local candidate becomes owner
405 peer1.tell(new UnregisterCandidateLocal(entity), kit.getRef());
406 kit.expectMsgClass(SuccessReply.class);
408 verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
409 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
411 // Add the second remote candidate back and verify ownership doesn't change
413 peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
414 kit.expectMsgClass(SuccessReply.class);
416 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
417 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
418 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
420 // Unregister the local candidate and verify the second remote candidate becomes owner
422 leader.tell(new UnregisterCandidateLocal(entity), kit.getRef());
423 kit.expectMsgClass(SuccessReply.class);
425 verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
426 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
428 testLog.info("testOwnershipChanges ending");
432 public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception {
433 testLog.info("testOwnerChangesOnPeerAvailabilityChanges starting");
435 final ShardTestKit kit = new ShardTestKit(getSystem());
437 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
438 .shardIsolatedLeaderCheckIntervalInMillis(100000);
440 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
441 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
442 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
444 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
445 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
447 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
449 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
450 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
452 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
454 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
455 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
456 leaderId.toString());
458 verifyRaftState(leader, state ->
459 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
461 // Send PeerDown and PeerUp with no entities
463 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
464 leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
466 // Add candidates for entity1 with the local leader as the owner
468 leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
469 kit.expectMsgClass(SuccessReply.class);
470 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
472 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
473 kit.expectMsgClass(SuccessReply.class);
474 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
476 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
477 kit.expectMsgClass(SuccessReply.class);
478 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
479 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
481 // Add candidates for entity2 with peerMember2 as the owner
483 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
484 kit.expectMsgClass(SuccessReply.class);
485 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
487 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
488 kit.expectMsgClass(SuccessReply.class);
489 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
490 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
492 // Add candidates for entity3 with peerMember2 as the owner.
494 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
495 kit.expectMsgClass(SuccessReply.class);
496 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
498 leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
499 kit.expectMsgClass(SuccessReply.class);
500 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
502 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
503 kit.expectMsgClass(SuccessReply.class);
504 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
505 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
507 // Add only candidate peerMember2 for entity4.
509 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID4)), kit.getRef());
510 kit.expectMsgClass(SuccessReply.class);
511 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
512 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
514 // Add only candidate peerMember1 for entity5.
516 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID5)), kit.getRef());
517 kit.expectMsgClass(SuccessReply.class);
518 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
519 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
521 // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new
525 peer2.tell(PoisonPill.getInstance(), ActorRef.noSender());
526 kit.expectMsgClass(kit.duration("5 seconds"), Terminated.class);
529 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
530 // Send PeerDown again - should be noop
531 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
532 peer1.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
534 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
535 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
536 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
537 // no other candidates for entity4 so peerMember2 should remain owner.
538 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
540 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
541 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
542 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
543 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
545 // Reinstate peerMember2
547 peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
548 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
550 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
551 leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
552 // Send PeerUp again - should be noop
553 leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
554 peer1.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
556 // peerMember2's candidates should be removed on startup.
557 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
558 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
559 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
560 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
562 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
563 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
564 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
565 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
567 // Add back candidate peerMember2 for entities 1, 2, & 3.
569 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
570 kit.expectMsgClass(SuccessReply.class);
571 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
572 kit.expectMsgClass(SuccessReply.class);
573 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
574 kit.expectMsgClass(SuccessReply.class);
575 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
576 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
577 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
578 verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
579 verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
580 verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
581 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
582 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
583 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
584 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
585 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
586 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
587 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
589 // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected
592 peer1.tell(PoisonPill.getInstance(), ActorRef.noSender());
593 kit.expectMsgClass(kit.duration("5 seconds"), Terminated.class);
595 leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
597 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
599 // Verify the reinstated peerMember2 is fully synced.
601 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
602 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
603 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
604 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
606 // Reinstate peerMember1 and verify no owner changes
608 peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(newShardBuilder(
609 peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), peerId1.toString());
610 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
611 leader.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
613 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
614 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
615 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
616 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
618 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
619 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
620 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
622 verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
623 verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
624 verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
626 // Verify the reinstated peerMember1 is fully synced.
628 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
629 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
630 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
631 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
633 AtomicLong leaderLastApplied = new AtomicLong();
634 verifyRaftState(leader, rs -> {
635 assertEquals("LastApplied up-to-date", rs.getLastApplied(), rs.getLastIndex());
636 leaderLastApplied.set(rs.getLastApplied());
639 verifyRaftState(peer2, rs -> assertEquals("LastApplied", leaderLastApplied.get(), rs.getLastIndex()));
641 // Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for
642 // the entities (1 and 3) previously owned by the local leader member.
644 peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
645 peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
646 peer2.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
649 leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
650 kit.expectMsgClass(kit.duration("5 seconds"), Terminated.class);
652 peer2.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
653 peer2.tell(TimeoutNow.INSTANCE, peer2);
655 verifyRaftState(peer2, state ->
656 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
658 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
659 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
660 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
661 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
663 testLog.info("testOwnerChangesOnPeerAvailabilityChanges ending");
667 public void testLeaderIsolation() throws Exception {
// Scenario: a three-member shard (local leader, peer1, peer2) where the leader gets cut off from its
// followers. peer1 is elected in its place and takes over the entity the old leader owned; once the
// isolation is removed the old leader must step down to Follower and converge on the new ownership.
668 testLog.info("testLeaderIsolation starting");
670 final ShardTestKit kit = new ShardTestKit(getSystem());
672 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
673 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
674 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
// The peers are built with a 100000 ms isolated-leader check interval; the leader is built further down
// with 500 ms (presumably so that only the original leader detects isolation quickly).
676 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
677 .shardIsolatedLeaderCheckIntervalInMillis(100000);
679 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
680 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
// Drop ElectionTimeout so peer1 cannot start an election until the test re-enables it below.
682 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
684 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
685 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
// peer2 keeps dropping ElectionTimeout for the whole test; it is never re-enabled here.
687 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
// Rebuild the context builder from the current settings, overriding only the isolated-leader check interval
// for the shard created next (the leader).
689 dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
690 .shardIsolatedLeaderCheckIntervalInMillis(500);
692 TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
693 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME)),
694 leaderId.toString());
696 ShardTestKit.waitUntilLeader(leader);
698 // Add entity1 candidates for all members with the leader as the owner
700 DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
701 leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
702 kit.expectMsgClass(SuccessReply.class);
703 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
705 peer1.tell(new RegisterCandidateLocal(entity1), kit.getRef());
706 kit.expectMsgClass(SuccessReply.class);
707 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
709 peer2.tell(new RegisterCandidateLocal(entity1), kit.getRef());
710 kit.expectMsgClass(SuccessReply.class);
711 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_2_NAME);
713 verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
714 verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
715 verifyOwner(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
717 // Add entity2 candidates for all members with peer1 as the owner
719 DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
720 peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
721 kit.expectMsgClass(SuccessReply.class);
722 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
724 peer2.tell(new RegisterCandidateLocal(entity2), kit.getRef());
725 kit.expectMsgClass(SuccessReply.class);
726 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_2_NAME);
728 leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
729 kit.expectMsgClass(SuccessReply.class);
730 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
732 verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
733 verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
734 verifyOwner(peer2, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
736 // Add entity3 candidates for all members with peer2 as the owner
738 DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
739 peer2.tell(new RegisterCandidateLocal(entity3), kit.getRef());
740 kit.expectMsgClass(SuccessReply.class);
741 verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
743 leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
744 kit.expectMsgClass(SuccessReply.class);
745 verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), LOCAL_MEMBER_NAME);
747 peer1.tell(new RegisterCandidateLocal(entity3), kit.getRef());
748 kit.expectMsgClass(SuccessReply.class);
749 verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_1_NAME);
751 verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
752 verifyOwner(peer1, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
753 verifyOwner(peer2, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
755 // Add listeners on all members
// Each newly registered listener is expected to be called once per existing entity (3 calls). The
// ownershipChange(...) arguments appear to be (entity, wasOwner, isOwner, hasOwner[, inJeopardy]) - confirm
// against the helper. Each mock is reset afterwards so later verifications only see new calls.
757 DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class);
758 leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
759 kit.expectMsgClass(SuccessReply.class);
760 verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
761 ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true)),
762 ownershipChange(entity3, false, false, true)));
763 reset(leaderListener);
765 DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class);
766 peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
767 kit.expectMsgClass(SuccessReply.class);
768 verify(peer1Listener, timeout(5000).times(3)).ownershipChanged(or(or(
769 ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, true, true)),
770 ownershipChange(entity3, false, false, true)));
771 reset(peer1Listener);
773 DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class);
774 peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
775 kit.expectMsgClass(SuccessReply.class);
776 verify(peer2Listener, timeout(5000).times(3)).ownershipChanged(or(or(
777 ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, false, true)),
778 ownershipChange(entity3, false, true, true)));
779 reset(peer2Listener);
781 // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
783 leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
784 leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
// peer2 only drops AppendEntries whose leader id is the old leader, so it still accepts AppendEntries from
// any other leader; peer1 drops every AppendEntries it receives.
786 peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
787 ae -> ae.getLeaderId().equals(leaderId.toString()));
788 peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
790 // Make peer1 start an election and become leader by enabling the ElectionTimeout message.
792 peer1.underlyingActor().stopDroppingMessagesOfType(ElectionTimeout.class);
794 // Send PeerDown for both peers to the isolated leader so it tries to re-assign ownership of their entities.
797 leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
798 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
800 verifyRaftState(leader, state ->
801 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
803 // Expect inJeopardy notification on the isolated leader.
805 verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
806 ownershipChange(entity1, true, true, true, true), ownershipChange(entity2, false, false, true, true)),
807 ownershipChange(entity3, false, false, true, true)));
808 reset(leaderListener);
810 verifyRaftState(peer1, state ->
811 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
813 // Send PeerDown for the old leader to the new leader peer1 so it re-assigns ownership of the old leader's entity.
816 peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
// entity1 was owned by the old leader; it must now move to peer1 and both peers must be notified.
818 verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
820 verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
821 reset(peer1Listener);
823 verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
824 reset(peer2Listener);
826 // Remove the isolation.
828 leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
829 leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
830 peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
831 peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
833 // Previous leader should switch to Follower and send inJeopardy cleared notifications for all entities.
835 verifyRaftState(leader, state ->
836 assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState()));
838 verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
839 ownershipChange(entity1, true, true, true), ownershipChange(entity2, false, false, true)),
840 ownershipChange(entity3, false, false, true)));
// After catching up, the old leader must see entity1 owned by peer1 and be told it lost ownership.
842 verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
843 verify(leaderListener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
// Wait briefly (presumably to let any stray ownership change arrive) before checking that the owners of
// entity2 and entity3 are unchanged and that no listener received anything unexpected.
845 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
846 verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
847 verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
849 verifyNoMoreInteractions(leaderListener);
850 verifyNoMoreInteractions(peer1Listener);
851 verifyNoMoreInteractions(peer2Listener);
853 testLog.info("testLeaderIsolation ending");
857 public void testLeaderIsolationWithPendingCandidateAdded() throws Exception {
// Scenario: the leader has candidates committed for entity1 and entity2 but its owner writes are still
// uncommitted when it becomes isolated. peer1 takes over as leader and becomes owner of entity2; after the
// isolation is removed the old leader is expected to become owner of entity1 while entity2 stays with peer1.
858 testLog.info("testLeaderIsolationWithPendingCandidateAdded starting");
860 final ShardTestKit kit = new ShardTestKit(getSystem());
862 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
863 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
864 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
866 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
867 .shardIsolatedLeaderCheckIntervalInMillis(100000);
// Every shard in this test gets its own MessageCollectorActor so received messages can be inspected later.
// NOTE(review): peer1's collector is created with createActor while the other two use createTestActor -
// confirm whether the difference is intentional.
869 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
870 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME),
871 actorFactory.createActor(MessageCollectorActor.props())), peerId1.toString());
872 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
874 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
875 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME),
876 actorFactory.createTestActor(MessageCollectorActor.props())), peerId2.toString());
877 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
// The leader is built with a 500 ms isolated-leader check interval instead of the peers' 100000 ms.
879 dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
880 .shardIsolatedLeaderCheckIntervalInMillis(500);
882 TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
883 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
884 actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
886 ShardTestKit.waitUntilLeader(leader);
888 // Add listeners on all members
// The mocks are given explicit names, which Mockito uses when reporting verification failures.
890 DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class,
891 "DOMEntityOwnershipListener-" + LOCAL_MEMBER_NAME);
892 leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
893 kit.expectMsgClass(SuccessReply.class);
895 DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class,
896 "DOMEntityOwnershipListener-" + PEER_MEMBER_1_NAME);
897 peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
898 kit.expectMsgClass(SuccessReply.class);
900 DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class,
901 "DOMEntityOwnershipListener-" + PEER_MEMBER_2_NAME);
902 peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
903 kit.expectMsgClass(SuccessReply.class);
905 // Drop the CandidateAdded message to the leader for now.
907 leader.underlyingActor().startDroppingMessagesOfType(CandidateAdded.class);
909 // Add entity candidates for the leader. Since we've blocked the CandidateAdded message, it won't be
910 // assigned the owner.
912 DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
913 leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
914 kit.expectMsgClass(SuccessReply.class);
915 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
916 verifyCommittedEntityCandidate(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
917 verifyCommittedEntityCandidate(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
919 DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
920 leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
921 kit.expectMsgClass(SuccessReply.class);
922 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
923 verifyCommittedEntityCandidate(peer1, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
924 verifyCommittedEntityCandidate(peer2, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
926 // Capture the CandidateAdded messages.
// Dropped messages are still forwarded to the shard's collector actor, so they can be fetched from it here.
928 final List<CandidateAdded> candidateAdded = expectMatching(leader.underlyingActor().collectorActor(),
929 CandidateAdded.class, 2);
931 // Drop AppendEntries to the followers containing a log entry, which will be for the owner writes after we
932 // forward the CandidateAdded messages to the leader. This will leave the pending owner write tx's uncommitted.
// AppendEntries without entries are not matched by the filter and are still processed by the followers.
934 peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
935 peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
937 // Now forward the CandidateAdded messages to the leader and wait for it to send out the AppendEntries.
939 leader.underlyingActor().stopDroppingMessagesOfType(CandidateAdded.class);
940 leader.tell(candidateAdded.get(0), leader);
941 leader.tell(candidateAdded.get(1), leader);
// Wait until peer1's collector has seen two AppendEntries that carry entries.
943 expectMatching(peer1.underlyingActor().collectorActor(), AppendEntries.class, 2,
944 ae -> ae.getEntries().size() > 0);
946 // Verify no owner assigned.
948 verifyNoOwnerSet(leader, entity1.getType(), entity1.getIdentifier());
949 verifyNoOwnerSet(leader, entity2.getType(), entity2.getIdentifier());
951 // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
// The put for AppendEntries replaces the "entries only" filters installed above: peer2 now drops everything
// sent by the old leader and peer1 drops all AppendEntries.
953 leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
954 leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
956 peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
957 ae -> ae.getLeaderId().equals(leaderId.toString()));
958 peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
960 // Send PeerDown to the isolated leader - should be no-op since there's no owned entities.
962 leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
963 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
965 // Verify the leader transitions to IsolatedLeader.
967 verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.IsolatedLeader.toString(),
968 state.getRaftState()));
970 // Send PeerDown to the new leader peer1.
972 peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
974 // Make peer1 start an election and become leader by sending the TimeoutNow message.
976 peer1.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
978 // Verify the peer1 transitions to Leader.
980 verifyRaftState(peer1, state -> assertEquals("getRaftState", RaftState.Leader.toString(),
981 state.getRaftState()));
// NOTE(review): the second check mixes entity1.getType() with entity2.getIdentifier(). Both entities are
// created with ENTITY_TYPE so the result is the same, but confirm which entity/peer pair was intended.
983 verifyNoOwnerSet(peer1, entity1.getType(), entity1.getIdentifier());
984 verifyNoOwnerSet(peer2, entity1.getType(), entity2.getIdentifier());
986 verifyNoMoreInteractions(peer1Listener);
987 verifyNoMoreInteractions(peer2Listener);
989 // Add a peer1 candidate for entity2.
// NOTE(review): unlike the other registrations in this test, no SuccessReply is awaited on kit here -
// confirm that is deliberate.
991 peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
993 verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
994 verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
995 verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, false, true));
// Clear recorded interactions so the verifications below only see what happens after the isolation ends.
997 reset(leaderListener, peer1Listener, peer2Listener);
999 // Remove the isolation.
1001 leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
1002 leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1003 peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1004 peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1006 // Previous leader should switch to Follower.
1008 verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(),
1009 state.getRaftState()));
1011 // Send PeerUp to peer1 and peer2.
1013 peer1.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
1014 peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
1016 // The previous leader should become the owner of entity1.
1018 verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
1020 // The previous leader's DOMEntityOwnershipListener should get 4 total notifications:
1021 // - inJeopardy cleared for entity1 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
1022 // - inJeopardy cleared for entity2 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
1023 // - local owner granted for entity1 (wasOwner=false, isOwner=true, hasOwner=true, inJeopardy=false)
1024 // - remote owner for entity2 (wasOwner=false, isOwner=false, hasOwner=true, inJeopardy=false)
1025 verify(leaderListener, timeout(5000).times(4)).ownershipChanged(or(
1026 or(ownershipChange(entity1, false, false, false), ownershipChange(entity2, false, false, false)),
1027 or(ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true))));
1029 verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
1030 verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
1032 // Verify entity2's owner doesn't change.
1034 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1035 verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
1037 verifyNoMoreInteractions(leaderListener);
1038 verifyNoMoreInteractions(peer1Listener);
1039 verifyNoMoreInteractions(peer2Listener);
1041 testLog.info("testLeaderIsolationWithPendingCandidateAdded ending");
1045 public void testListenerRegistration() {
// Exercises listener registration on the leader: notifications for candidates of the listened-to entity type,
// no notifications for another entity type, no notifications while unregistered, and a replay of the
// currently owned entities when the listener is registered again.
1046 testLog.info("testListenerRegistration starting");
1048 ShardTestKit kit = new ShardTestKit(getSystem());
1050 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1051 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
1053 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1054 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
// The peer drops ElectionTimeout messages so it does not start an election of its own.
1055 peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1057 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1058 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
1060 ShardTestKit.waitUntilLeader(leader);
// entity1..entity3 share ENTITY_TYPE; entity4 reuses ENTITY_ID3 but under a different entity type, so the
// listener registered for ENTITY_TYPE must never hear about it.
1062 String otherEntityType = "otherEntityType";
1063 final DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1064 final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
1065 final DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
1066 final DOMEntity entity4 = new DOMEntity(otherEntityType, ENTITY_ID3);
1067 DOMEntityOwnershipListener listener = mock(DOMEntityOwnershipListener.class);
1069 // Register listener
1071 leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1072 kit.expectMsgClass(SuccessReply.class);
1074 // Register a couple candidates for the desired entity type and verify listener is notified.
1076 leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
1077 kit.expectMsgClass(SuccessReply.class);
1079 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
1081 leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
1082 kit.expectMsgClass(SuccessReply.class);
1084 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
1087 // Register another candidate for another entity type and verify listener is not notified.
1089 leader.tell(new RegisterCandidateLocal(entity4), kit.getRef());
1090 kit.expectMsgClass(SuccessReply.class);
// A negative check cannot use timeout(), so wait a fixed 500 ms before asserting nothing arrived.
1092 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1093 verify(listener, never()).ownershipChanged(ownershipChange(entity4));
1095 // Register remote candidate for entity1
1097 peer.tell(new RegisterCandidateLocal(entity1), kit.getRef());
1098 kit.expectMsgClass(SuccessReply.class);
1099 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entity1.getIdentifier(), PEER_MEMBER_1_NAME);
1101 // Unregister the local candidate for entity1 and verify listener is notified
1103 leader.tell(new UnregisterCandidateLocal(entity1), kit.getRef());
1104 kit.expectMsgClass(SuccessReply.class);
// Ownership moves away from the local member but entity1 still has an owner (the remote candidate).
1106 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
1109 // Unregister the listener, add a candidate for entity3 and verify listener isn't notified
1111 leader.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1112 kit.expectMsgClass(SuccessReply.class);
1114 leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
1115 kit.expectMsgClass(SuccessReply.class);
1117 verifyOwner(leader, ENTITY_TYPE, entity3.getIdentifier(), LOCAL_MEMBER_NAME);
1118 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
// NOTE(review): never() combined with any() only holds if earlier interactions were cleared from the mock -
// confirm a reset(listener) precedes this point.
1119 verify(listener, never()).ownershipChanged(any(DOMEntityOwnershipChange.class));
1121 // Re-register the listener and verify it gets notified of currently owned entities
1125 leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1126 kit.expectMsgClass(SuccessReply.class);
// The two entities owned locally at this point are entity2 and entity3.
1128 verify(listener, timeout(5000).times(2)).ownershipChanged(or(ownershipChange(entity2, false, true, true),
1129 ownershipChange(entity3, false, true, true)));
1130 Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
1131 verify(listener, never()).ownershipChanged(ownershipChange(entity4));
1132 verify(listener, times(1)).ownershipChanged(ownershipChange(entity1));
1134 testLog.info("testListenerRegistration ending");
1138 public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() {
1139 testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived starting");
1141 ShardTestKit kit = new ShardTestKit(getSystem());
1142 EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder()
1143 .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
1145 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1146 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
1148 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1149 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
1150 peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1152 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1153 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME, builder.build()),
1154 leaderId.toString());
1156 ShardTestKit.waitUntilLeader(leader);
1158 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1160 // Add a remote candidate
1162 peer.tell(new RegisterCandidateLocal(entity), kit.getRef());
1163 kit.expectMsgClass(SuccessReply.class);
1167 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1168 kit.expectMsgClass(SuccessReply.class);
1170 // Verify the local candidate becomes owner
1172 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1173 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1174 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1176 testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived ending");
1180 public void testDelayedEntityOwnerSelection() {
1181 testLog.info("testDelayedEntityOwnerSelection starting");
1183 final ShardTestKit kit = new ShardTestKit(getSystem());
1184 EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder()
1185 .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
1187 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
1189 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1190 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
1191 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
1193 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1194 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
1195 peerId1.toString());
1196 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1198 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1199 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
1200 peerId2.toString());
1201 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1203 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1204 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME,
1205 builder.build()), leaderId.toString());
1207 ShardTestKit.waitUntilLeader(leader);
1209 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1211 // Add a remote candidate
1213 peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
1214 kit.expectMsgClass(SuccessReply.class);
1218 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1219 kit.expectMsgClass(SuccessReply.class);
1221 // Verify the local candidate becomes owner
1223 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1224 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1225 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1227 testLog.info("testDelayedEntityOwnerSelection ending");
1230 private Props newLocalShardProps() {
1231 return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.<String,String>emptyMap(), LOCAL_MEMBER_NAME);
1234 private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers,
1235 final String memberName) {
1236 return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build());
1239 private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers, final String memberName,
1240 final EntityOwnerSelectionStrategyConfig config) {
1241 return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props()
1242 .withDispatcher(Dispatchers.DefaultDispatcherId());
1245 private EntityOwnershipShard.Builder newShardBuilder(final ShardIdentifier shardId, final Map<String, String> peers,
1246 final String memberName) {
1247 return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).datastoreContext(
1248 dataStoreContextBuilder.build()).schemaContextProvider(() -> SCHEMA_CONTEXT).localMemberName(
1249 MemberName.forName(memberName)).ownerSelectionStrategyConfig(
1250 EntityOwnerSelectionStrategyConfig.newBuilder().build());
1253 private Map<String, String> peerMap(final String... peerIds) {
1254 ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder();
1255 for (String peerId: peerIds) {
1256 builder.put(peerId, actorFactory.createTestActorPath(peerId)).build();
1259 return builder.build();
/**
 * EntityOwnershipShard variant for tests. It can be told to drop incoming messages by message class
 * (optionally filtered by a predicate) and forwards every message it receives to an optional collector
 * actor so tests can inspect the traffic.
 */
1262 private static class TestEntityOwnershipShard extends EntityOwnershipShard {
// Receives a copy of every incoming message; null when the shard was created via props(Builder).
1263 private final ActorRef collectorActor;
// Message class -> predicate deciding whether a message of exactly that class is dropped.
1264 private final Map<Class<?>, Predicate<?>> dropMessagesOfType = new ConcurrentHashMap<>();
1266 TestEntityOwnershipShard(final Builder builder, final ActorRef collectorActor) {
1268 this.collectorActor = collectorActor;
// Passes the message to the real shard unless a drop predicate registered for the message's exact runtime
// class accepts it. Dropped or not, the message is forwarded to the collector actor when one is set.
1271 @SuppressWarnings({ "unchecked", "rawtypes" })
1273 public void handleCommand(final Object message) {
1274 Predicate drop = dropMessagesOfType.get(message.getClass());
1275 if (drop == null || !drop.test(message)) {
1276 super.handleCommand(message);
1279 if (collectorActor != null) {
1280 collectorActor.tell(message, ActorRef.noSender());
// Drops every message of the given class, replacing any filter previously registered for it.
1284 void startDroppingMessagesOfType(final Class<?> msgClass) {
1285 dropMessagesOfType.put(msgClass, msg -> true);
// Drops only those messages of the given class for which the filter returns true, replacing any filter
// previously registered for that class.
1288 <T> void startDroppingMessagesOfType(final Class<T> msgClass, final Predicate<T> filter) {
1289 dropMessagesOfType.put(msgClass, filter);
// Removes the drop filter for the given class so its messages are processed again.
1292 void stopDroppingMessagesOfType(final Class<?> msgClass) {
1293 dropMessagesOfType.remove(msgClass);
// Returns the collector actor passed at construction; may be null.
1296 ActorRef collectorActor() {
1297 return collectorActor;
// Props for a test shard without a collector actor.
1300 static Props props(final Builder builder) {
1301 return props(builder, null);
// Props for a test shard that forwards received messages to collectorActor, using the default dispatcher.
1304 static Props props(final Builder builder, final ActorRef collectorActor) {
1305 return Props.create(TestEntityOwnershipShard.class, builder, collectorActor)
1306 .withDispatcher(Dispatchers.DefaultDispatcherId());