2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.AdditionalMatchers.or;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.never;
15 import static org.mockito.Mockito.reset;
16 import static org.mockito.Mockito.timeout;
17 import static org.mockito.Mockito.times;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.verifyNoMoreInteractions;
20 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
21 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
22 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
24 import akka.actor.ActorRef;
25 import akka.actor.PoisonPill;
26 import akka.actor.Props;
27 import akka.actor.Terminated;
28 import akka.dispatch.Dispatchers;
29 import akka.testkit.TestActorRef;
30 import com.google.common.collect.ImmutableMap;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.util.ArrayList;
33 import java.util.Collections;
34 import java.util.List;
36 import java.util.concurrent.ConcurrentHashMap;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicLong;
39 import java.util.function.Predicate;
40 import org.junit.After;
41 import org.junit.Test;
42 import org.opendaylight.controller.cluster.access.concepts.MemberName;
43 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
44 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
45 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
46 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
47 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
48 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
49 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
50 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
51 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
52 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.LastCandidateSelectionStrategy;
53 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
54 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
55 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
57 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
58 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
59 import org.opendaylight.controller.cluster.raft.RaftState;
60 import org.opendaylight.controller.cluster.raft.TestActorFactory;
61 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
62 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
63 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
64 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
65 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
66 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
67 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
68 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
69 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
70 import org.opendaylight.yangtools.yang.common.QName;
71 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
72 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
75 * Unit tests for EntityOwnershipShard.
77 * @author Thomas Pantelis
79 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
// Entity type string used for every DOMEntity created in this class.
80 private static final String ENTITY_TYPE = "test type";
// Five entity identifiers, each built from QName.create("test", "2015-08-14", "entityN").
81 private static final YangInstanceIdentifier ENTITY_ID1 =
82 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
83 private static final YangInstanceIdentifier ENTITY_ID2 =
84 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
85 private static final YangInstanceIdentifier ENTITY_ID3 =
86 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3"));
87 private static final YangInstanceIdentifier ENTITY_ID4 =
88 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4"));
89 private static final YangInstanceIdentifier ENTITY_ID5 =
90 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5"));
// Schema context returned by SchemaContextHelper.entityOwners().
91 private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
// Member names for the local member and the two peers used by the multi-member tests.
92 private static final String LOCAL_MEMBER_NAME = "local-member-1";
93 private static final String PEER_MEMBER_1_NAME = "peer-member-1";
94 private static final String PEER_MEMBER_2_NAME = "peer-member-2";
// Datastore context builder, created with persistent(false). Tests adjust it before creating shards,
// and it is non-final because testLeaderIsolation reassigns it.
96 private Builder dataStoreContextBuilder = DatastoreContext.newBuilder().persistent(false);
// Factory through which the tests create their actors; closed in tearDown().
97 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
// Closes the actor factory through which the tests create their actors.
100 public void tearDown() {
101 actorFactory.close();
// Registers a local candidate on a shard built from newLocalShardProps() after it has become leader,
// then checks that the candidate is committed and that the local member is the owner.
105 public void testOnRegisterCandidateLocal() {
106 testLog.info("testOnRegisterCandidateLocal starting");
108 ShardTestKit kit = new ShardTestKit(getSystem());
110 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
112 ShardTestKit.waitUntilLeader(shard);
114 YangInstanceIdentifier entityId = ENTITY_ID1;
115 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
// The kit is the sender, so it receives the SuccessReply acknowledging the registration.
117 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
118 kit.expectMsgClass(SuccessReply.class);
120 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
121 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
123 testLog.info("testOnRegisterCandidateLocal ending");
// Registers a local candidate while the peer is dropping RequestVote messages. The registration is
// still acknowledged with a SuccessReply; once the peer stops dropping RequestVote, the candidate
// and the local owner are expected to be committed.
127 public void testOnRegisterCandidateLocalWithNoInitialLeader() {
128 testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader starting");
130 final ShardTestKit kit = new ShardTestKit(getSystem());
// 100 ms heartbeat interval with an election timeout factor of 2.
132 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
134 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
135 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
137 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
138 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
139 TestEntityOwnershipShard peerShard = peer.underlyingActor();
// For now the peer drops both RequestVote and ElectionTimeout messages.
140 peerShard.startDroppingMessagesOfType(RequestVote.class);
141 peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
143 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(
144 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
146 YangInstanceIdentifier entityId = ENTITY_ID1;
147 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
// Note there is no waitUntilLeader() call here: the registration is sent straight away.
149 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
150 kit.expectMsgClass(SuccessReply.class);
152 // Now allow RequestVotes to the peer so the shard becomes the leader. This should retry the commit.
153 peerShard.stopDroppingMessagesOfType(RequestVote.class);
155 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
156 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
158 testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader ending");
// Registers a local candidate on the leader while the follower is dropping AppendEntries, waits one
// second (the configured commit timeout), then resumes AppendEntries and checks that the candidate
// and the local owner end up committed.
162 public void testOnRegisterCandidateLocalWithNoInitialConsensus() {
163 testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus starting");
165 final ShardTestKit kit = new ShardTestKit(getSystem());
// Same heartbeat/election settings as the other tests, plus a 1-second transaction commit timeout.
167 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
168 .shardTransactionCommitTimeoutInSeconds(1);
170 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
171 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
173 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
174 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
175 TestEntityOwnershipShard peerShard = peer.underlyingActor();
// The peer drops ElectionTimeout messages - presumably so that it never starts an election itself.
176 peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
178 // Drop AppendEntries so consensus isn't reached.
179 peerShard.startDroppingMessagesOfType(AppendEntries.class);
181 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
182 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
184 ShardTestKit.waitUntilLeader(leader);
186 YangInstanceIdentifier entityId = ENTITY_ID1;
187 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
// The SuccessReply arrives even though the peer is still dropping AppendEntries.
189 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
190 kit.expectMsgClass(SuccessReply.class);
192 // Wait enough time for the commit to timeout.
193 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
195 // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
196 // write being applied to the state.
197 peerShard.stopDroppingMessagesOfType(AppendEntries.class);
199 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
200 verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
202 testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus ending");
// Registers a local candidate while the leader is in the IsolatedLeader state (the peer is dropping
// AppendEntries), then resumes AppendEntries and checks that the candidate and the local owner are
// committed.
206 public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
207 testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader starting");
209 final ShardTestKit kit = new ShardTestKit(getSystem());
// A 50 ms isolated-leader check interval is configured in addition to the usual heartbeat settings.
211 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
212 .shardIsolatedLeaderCheckIntervalInMillis(50);
214 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
215 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
217 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
218 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
219 TestEntityOwnershipShard peerShard = peer.underlyingActor();
220 peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
// NOTE(review): unlike the sibling tests, no actor name (leaderId.toString()) is passed here - confirm
// this is intentional.
222 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
223 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME));
225 ShardTestKit.waitUntilLeader(leader);
227 // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
228 peerShard.startDroppingMessagesOfType(AppendEntries.class);
229 verifyRaftState(leader, state ->
230 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
232 YangInstanceIdentifier entityId = ENTITY_ID1;
233 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
// The registration is acknowledged while the leader is still isolated.
235 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
236 kit.expectMsgClass(SuccessReply.class);
238 // Resume AppendEntries - the candidate write should now be committed.
239 peerShard.stopDroppingMessagesOfType(AppendEntries.class);
240 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
241 verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
243 testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader ending");
// Registers candidates on the local shard while the leader is a different member (peer-member-1).
// Covers three cases: a plain registration, a registration made while the leader drops
// BatchedModifications, and a burst of registrations sent back to back.
247 public void testOnRegisterCandidateLocalWithRemoteLeader() {
248 testLog.info("testOnRegisterCandidateLocalWithRemoteLeader starting");
250 ShardTestKit kit = new ShardTestKit(getSystem());
// Batched modification count of 5, on top of the usual heartbeat/election settings.
252 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
253 .shardBatchedModificationCount(5);
255 ShardIdentifier leaderId = newShardId(PEER_MEMBER_1_NAME);
256 ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
// The leader shard is given a MessageCollectorActor, read back below through collectorActor().
257 TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
258 newShardBuilder(leaderId, peerMap(localId.toString()), PEER_MEMBER_1_NAME),
259 actorFactory.createActor(MessageCollectorActor.props())), leaderId.toString());
260 final TestEntityOwnershipShard leaderShard = leader.underlyingActor();
262 TestActorRef<TestEntityOwnershipShard> local = actorFactory.createTestActor(TestEntityOwnershipShard.props(
263 newShardBuilder(localId, peerMap(leaderId.toString()),LOCAL_MEMBER_NAME)), localId.toString());
// The local shard drops ElectionTimeout messages - presumably so that it stays a follower.
264 local.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
// Register on the local shard and verify the result on the leader.
266 local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
267 kit.expectMsgClass(SuccessReply.class);
269 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
270 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
272 // Test with initial commit timeout and subsequent retry.
// Send the local shard a rebuilt context with a 1-second commit timeout, then have the leader drop
// BatchedModifications messages.
274 local.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
275 leaderShard.startDroppingMessagesOfType(BatchedModifications.class);
277 local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
278 kit.expectMsgClass(SuccessReply.class);
// Wait until the leader's collector actor has recorded a BatchedModifications message.
280 expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class);
282 // Send a bunch of registration messages quickly and verify.
284 leaderShard.stopDroppingMessagesOfType(BatchedModifications.class);
285 clearMessages(leaderShard.collectorActor());
// NOTE(review): the declaration of `max` and any statement adding ids to `entityIds` are not visible in
// this excerpt - confirm they exist in the full file.
288 List<YangInstanceIdentifier> entityIds = new ArrayList<>();
289 for (int i = 1; i <= max; i++) {
290 YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
292 local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, id)), kit.getRef());
// Every id taken from entityIds must show up as a committed candidate on the local shard.
295 for (int i = 0; i < max; i++) {
296 verifyCommittedEntityCandidate(local, ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
299 testLog.info("testOnRegisterCandidateLocalWithRemoteLeader ending");
// Registers, unregisters and re-registers a local candidate on one shard, checking the owner after
// each step.
303 public void testOnUnregisterCandidateLocal() {
304 testLog.info("testOnUnregisterCandidateLocal starting");
306 ShardTestKit kit = new ShardTestKit(getSystem());
307 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
308 ShardTestKit.waitUntilLeader(shard);
310 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
// Step 1: register the candidate - the local member should become owner.
314 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
315 kit.expectMsgClass(SuccessReply.class);
317 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
318 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
// Step 2: unregister the candidate - the owner is then expected to be the empty string.
322 shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
323 kit.expectMsgClass(SuccessReply.class);
325 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");
// Step 3: register the same candidate again - the local member should be owner once more.
329 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
330 kit.expectMsgClass(SuccessReply.class);
332 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
333 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
335 testLog.info("testOnUnregisterCandidateLocal ending");
// Walks one entity through a series of candidate registrations and unregistrations across the local
// leader and two peers, checking after each step which member the leader reports as owner.
339 public void testOwnershipChanges() {
340 testLog.info("testOwnershipChanges starting");
342 final ShardTestKit kit = new ShardTestKit(getSystem());
344 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
346 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
347 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
348 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
// Both peers drop ElectionTimeout messages - presumably so that only the local shard becomes leader.
350 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
351 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
353 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
355 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
356 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
358 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
360 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
361 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
362 leaderId.toString());
364 ShardTestKit.waitUntilLeader(leader);
366 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
368 // Add a remote candidate
370 peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
371 kit.expectMsgClass(SuccessReply.class);
373 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
// Register the local candidate
377 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
378 kit.expectMsgClass(SuccessReply.class);
380 // Verify the local candidate is committed and the remote candidate (peer1) is the owner
382 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
383 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
385 // Add another remote candidate and verify ownership doesn't change
387 peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
388 kit.expectMsgClass(SuccessReply.class);
390 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
// Pause before asserting the owner - presumably so that an unexpected owner change would have time to
// show up.
391 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
392 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
394 // Remove the second remote candidate and verify ownership doesn't change
396 peer2.tell(new UnregisterCandidateLocal(entity), kit.getRef());
397 kit.expectMsgClass(SuccessReply.class);
399 verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
400 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
401 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
403 // Remove the first remote candidate and verify the local candidate becomes owner
405 peer1.tell(new UnregisterCandidateLocal(entity), kit.getRef());
406 kit.expectMsgClass(SuccessReply.class);
408 verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
409 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
411 // Add the second remote candidate back and verify ownership doesn't change
413 peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
414 kit.expectMsgClass(SuccessReply.class);
416 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
417 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
418 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
420 // Unregister the local candidate and verify the second remote candidate becomes owner
422 leader.tell(new UnregisterCandidateLocal(entity), kit.getRef());
423 kit.expectMsgClass(SuccessReply.class);
425 verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
426 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
428 testLog.info("testOwnershipChanges ending");
// Exercises owner selection as members go down and come back. Candidates for entities 1-5 are spread
// over the local leader and two peers; each member is then killed (and the peers reinstated) in turn,
// with the expected owners asserted after every PeerDown / PeerUp.
432 public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception {
433 testLog.info("testOwnerChangesOnPeerAvailabilityChanges starting");
435 final ShardTestKit kit = new ShardTestKit(getSystem());
// Election timeout factor of 4 and a very long (100000 ms) isolated-leader check interval.
437 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
438 .shardIsolatedLeaderCheckIntervalInMillis(100000);
440 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
441 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
442 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
444 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
445 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
447 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
449 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
450 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
452 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
454 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
455 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
456 leaderId.toString());
// Proceed only once the local shard reports the Leader raft state.
458 verifyRaftState(leader, state ->
459 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
461 // Send PeerDown and PeerUp with no entities
463 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
464 leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
466 // Add candidates for entity1 with the local leader as the owner
468 leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
469 kit.expectMsgClass(SuccessReply.class);
470 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
472 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
473 kit.expectMsgClass(SuccessReply.class);
474 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
476 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
477 kit.expectMsgClass(SuccessReply.class);
478 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
479 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
481 // Add candidates for entity2 with peerMember2 as the owner
483 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
484 kit.expectMsgClass(SuccessReply.class);
485 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
487 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
488 kit.expectMsgClass(SuccessReply.class);
489 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
490 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
492 // Add candidates for entity3 with peerMember2 as the owner.
494 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
495 kit.expectMsgClass(SuccessReply.class);
496 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
498 leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
499 kit.expectMsgClass(SuccessReply.class);
500 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
502 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
503 kit.expectMsgClass(SuccessReply.class);
504 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
505 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
507 // Add only candidate peerMember2 for entity4.
509 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID4)), kit.getRef());
510 kit.expectMsgClass(SuccessReply.class);
511 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
512 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
514 // Add only candidate peerMember1 for entity5.
516 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID5)), kit.getRef());
517 kit.expectMsgClass(SuccessReply.class);
518 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
519 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
521 // Kill peerMember2 and send PeerDown - entities 2 and 3, owned by peerMember2, should get a new owner
// selected; entity4 has no other candidate, so peerMember2 stays its owner for now.
525 peer2.tell(PoisonPill.getInstance(), ActorRef.noSender());
526 kit.expectMsgClass(kit.duration("5 seconds"), Terminated.class);
529 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
530 // Send PeerDown again - should be noop
531 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
532 peer1.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
534 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
535 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
536 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
537 // no other candidates for entity4 so peerMember2 should remain owner.
538 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
// peerMember2's candidate entries are still expected to be present while it is down.
540 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
541 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
542 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
543 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
545 // Reinstate peerMember2
547 peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
548 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
550 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
551 leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
552 // Send PeerUp again - should be noop
553 leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
554 peer1.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
556 // peerMember2's candidates should be removed on startup.
557 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
558 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
559 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
560 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
// With its only candidate removed, entity4's owner is expected to be the empty string.
562 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
563 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
564 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
565 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
567 // Add back candidate peerMember2 for entities 1, 2, & 3.
569 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
570 kit.expectMsgClass(SuccessReply.class);
571 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
572 kit.expectMsgClass(SuccessReply.class);
573 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
574 kit.expectMsgClass(SuccessReply.class);
// The candidates and owners must be visible on both the leader and the reinstated peer2.
575 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
576 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
577 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
578 verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
579 verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
580 verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
581 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
582 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
583 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
584 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
585 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
586 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
587 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
589 // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected
592 peer1.tell(PoisonPill.getInstance(), ActorRef.noSender());
593 kit.expectMsgClass(kit.duration("5 seconds"), Terminated.class);
595 leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
597 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
599 // Verify the reinstated peerMember2 is fully synced.
601 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
602 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
603 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
604 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
606 // Reinstate peerMember1 and verify no owner changes
608 peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(newShardBuilder(
609 peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), peerId1.toString());
610 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
611 leader.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
613 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
614 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
615 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
616 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
// peerMember1's earlier candidates are expected to be gone on both the leader and peer2.
618 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
619 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
620 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
622 verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
623 verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
624 verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
626 // Verify the reinstated peerMember1 is fully synced.
628 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
629 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
630 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
631 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
// Record the leader's last-applied index once it equals its last index, then require peer2's last
// index to match that value.
633 AtomicLong leaderLastApplied = new AtomicLong();
634 verifyRaftState(leader, rs -> {
635 assertEquals("LastApplied up-to-date", rs.getLastApplied(), rs.getLastIndex());
636 leaderLastApplied.set(rs.getLastApplied());
639 verifyRaftState(peer2, rs -> {
640 assertEquals("LastApplied", leaderLastApplied.get(), rs.getLastIndex());
643 // Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for
644 // the entities (1 and 3) previously owned by the local leader member.
646 peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
647 peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
648 peer2.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
651 leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
652 kit.expectMsgClass(kit.duration("5 seconds"), Terminated.class);
// TimeoutNow is sent with peer2 as its own sender - presumably to make it start an election at once.
654 peer2.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
655 peer2.tell(TimeoutNow.INSTANCE, peer2);
657 verifyRaftState(peer2, state ->
658 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
// As the new leader, peer2 is expected to own entities 1-3; entity4 still has an empty owner.
660 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
661 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
662 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
663 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
665 testLog.info("testOwnerChangesOnPeerAvailabilityChanges ending");
669 public void testLeaderIsolation() throws Exception {
670 testLog.info("testLeaderIsolation starting");
672 final ShardTestKit kit = new ShardTestKit(getSystem());
674 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
675 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
676 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
678 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
679 .shardIsolatedLeaderCheckIntervalInMillis(100000);
681 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
682 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
684 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
686 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
687 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
689 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
691 dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
692 .shardIsolatedLeaderCheckIntervalInMillis(500);
694 TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
695 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME)),
696 leaderId.toString());
698 ShardTestKit.waitUntilLeader(leader);
700 // Add entity1 candidates for all members with the leader as the owner
702 DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
703 leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
704 kit.expectMsgClass(SuccessReply.class);
705 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
707 peer1.tell(new RegisterCandidateLocal(entity1), kit.getRef());
708 kit.expectMsgClass(SuccessReply.class);
709 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
711 peer2.tell(new RegisterCandidateLocal(entity1), kit.getRef());
712 kit.expectMsgClass(SuccessReply.class);
713 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_2_NAME);
715 verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
716 verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
717 verifyOwner(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
719 // Add entity2 candidates for all members with peer1 as the owner
721 DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
722 peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
723 kit.expectMsgClass(SuccessReply.class);
724 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
726 peer2.tell(new RegisterCandidateLocal(entity2), kit.getRef());
727 kit.expectMsgClass(SuccessReply.class);
728 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_2_NAME);
730 leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
731 kit.expectMsgClass(SuccessReply.class);
732 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
734 verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
735 verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
736 verifyOwner(peer2, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
738 // Add entity3 candidates for all members with peer2 as the owner
740 DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
741 peer2.tell(new RegisterCandidateLocal(entity3), kit.getRef());
742 kit.expectMsgClass(SuccessReply.class);
743 verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
745 leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
746 kit.expectMsgClass(SuccessReply.class);
747 verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), LOCAL_MEMBER_NAME);
749 peer1.tell(new RegisterCandidateLocal(entity3), kit.getRef());
750 kit.expectMsgClass(SuccessReply.class);
751 verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_1_NAME);
753 verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
754 verifyOwner(peer1, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
755 verifyOwner(peer2, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
757 // Add listeners on all members
759 DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class);
760 leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
761 kit.expectMsgClass(SuccessReply.class);
762 verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
763 ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true)),
764 ownershipChange(entity3, false, false, true)));
765 reset(leaderListener);
767 DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class);
768 peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
769 kit.expectMsgClass(SuccessReply.class);
770 verify(peer1Listener, timeout(5000).times(3)).ownershipChanged(or(or(
771 ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, true, true)),
772 ownershipChange(entity3, false, false, true)));
773 reset(peer1Listener);
775 DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class);
776 peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
777 kit.expectMsgClass(SuccessReply.class);
778 verify(peer2Listener, timeout(5000).times(3)).ownershipChanged(or(or(
779 ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, false, true)),
780 ownershipChange(entity3, false, true, true)));
781 reset(peer2Listener);
783 // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
785 leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
786 leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
788 peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
789 ae -> ae.getLeaderId().equals(leaderId.toString()));
790 peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
792 // Make peer1 start an election and become leader by enabling the ElectionTimeout message.
794 peer1.underlyingActor().stopDroppingMessagesOfType(ElectionTimeout.class);
796 // Send PeerDown to the isolated leader so it tries to re-assign ownership for the entities owned by the
799 leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
800 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
802 verifyRaftState(leader, state ->
803 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
805 // Expect inJeopardy notification on the isolated leader.
807 verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
808 ownershipChange(entity1, true, true, true, true), ownershipChange(entity2, false, false, true, true)),
809 ownershipChange(entity3, false, false, true, true)));
810 reset(leaderListener);
812 verifyRaftState(peer1, state ->
813 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
815 // Send PeerDown to the new leader peer1 so it re-assigns ownership for the entities owned by the
818 peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
820 verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
822 verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
823 reset(peer1Listener);
825 verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
826 reset(peer2Listener);
828 // Remove the isolation.
830 leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
831 leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
832 peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
833 peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
835 // Previous leader should switch to Follower and send inJeopardy cleared notifications for all entities.
837 verifyRaftState(leader, state ->
838 assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState()));
840 verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
841 ownershipChange(entity1, true, true, true), ownershipChange(entity2, false, false, true)),
842 ownershipChange(entity3, false, false, true)));
844 verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
845 verify(leaderListener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
847 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
848 verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
849 verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
851 verifyNoMoreInteractions(leaderListener);
852 verifyNoMoreInteractions(peer1Listener);
853 verifyNoMoreInteractions(peer2Listener);
855 testLog.info("testLeaderIsolation ending");
859 public void testLeaderIsolationWithPendingCandidateAdded() throws Exception {
860 testLog.info("testLeaderIsolationWithPendingCandidateAdded starting");
862 final ShardTestKit kit = new ShardTestKit(getSystem());
864 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
865 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
866 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
868 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
869 .shardIsolatedLeaderCheckIntervalInMillis(100000);
871 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
872 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME),
873 actorFactory.createActor(MessageCollectorActor.props())), peerId1.toString());
874 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
876 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
877 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME),
878 actorFactory.createTestActor(MessageCollectorActor.props())), peerId2.toString());
879 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
881 dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
882 .shardIsolatedLeaderCheckIntervalInMillis(500);
884 TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
885 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
886 actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
888 ShardTestKit.waitUntilLeader(leader);
890 // Add listeners on all members
892 DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class,
893 "DOMEntityOwnershipListener-" + LOCAL_MEMBER_NAME);
894 leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
895 kit.expectMsgClass(SuccessReply.class);
897 DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class,
898 "DOMEntityOwnershipListener-" + PEER_MEMBER_1_NAME);
899 peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
900 kit.expectMsgClass(SuccessReply.class);
902 DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class,
903 "DOMEntityOwnershipListener-" + PEER_MEMBER_2_NAME);
904 peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
905 kit.expectMsgClass(SuccessReply.class);
907 // Drop the CandidateAdded message to the leader for now.
909 leader.underlyingActor().startDroppingMessagesOfType(CandidateAdded.class);
911 // Add an entity candidates for the leader. Since we've blocked the CandidateAdded message, it won't be
912 // assigned the owner.
914 DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
915 leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
916 kit.expectMsgClass(SuccessReply.class);
917 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
918 verifyCommittedEntityCandidate(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
919 verifyCommittedEntityCandidate(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
921 DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
922 leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
923 kit.expectMsgClass(SuccessReply.class);
924 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
925 verifyCommittedEntityCandidate(peer1, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
926 verifyCommittedEntityCandidate(peer2, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
928 // Capture the CandidateAdded messages.
930 final List<CandidateAdded> candidateAdded = expectMatching(leader.underlyingActor().collectorActor(),
931 CandidateAdded.class, 2);
933 // Drop AppendEntries to the followers containing a log entry, which will be for the owner writes after we
934 // forward the CandidateAdded messages to the leader. This will leave the pending owner write tx's uncommitted.
936 peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
937 peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
939 // Now forward the CandidateAdded messages to the leader and wait for it to send out the AppendEntries.
941 leader.underlyingActor().stopDroppingMessagesOfType(CandidateAdded.class);
942 leader.tell(candidateAdded.get(0), leader);
943 leader.tell(candidateAdded.get(1), leader);
945 expectMatching(peer1.underlyingActor().collectorActor(), AppendEntries.class, 2,
946 ae -> ae.getEntries().size() > 0);
948 // Verify no owner assigned.
950 verifyNoOwnerSet(leader, entity1.getType(), entity1.getIdentifier());
951 verifyNoOwnerSet(leader, entity2.getType(), entity2.getIdentifier());
953 // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
955 leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
956 leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
958 peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
959 ae -> ae.getLeaderId().equals(leaderId.toString()));
960 peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
962 // Send PeerDown to the isolated leader - should be no-op since there's no owned entities.
964 leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
965 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
967 // Verify the leader transitions to IsolatedLeader.
969 verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.IsolatedLeader.toString(),
970 state.getRaftState()));
972 // Send PeerDown to the new leader peer1.
974 peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
976 // Make peer1 start an election and become leader by sending the TimeoutNow message.
978 peer1.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
980 // Verify the peer1 transitions to Leader.
982 verifyRaftState(peer1, state -> assertEquals("getRaftState", RaftState.Leader.toString(),
983 state.getRaftState()));
985 verifyNoOwnerSet(peer1, entity1.getType(), entity1.getIdentifier());
986 verifyNoOwnerSet(peer2, entity1.getType(), entity2.getIdentifier());
988 verifyNoMoreInteractions(peer1Listener);
989 verifyNoMoreInteractions(peer2Listener);
991 // Add candidate peer1 candidate for entity2.
993 peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
995 verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
996 verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
997 verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, false, true));
999 reset(leaderListener, peer1Listener, peer2Listener);
1001 // Remove the isolation.
1003 leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
1004 leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1005 peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1006 peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1008 // Previous leader should switch to Follower.
1010 verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(),
1011 state.getRaftState()));
1013 // Send PeerUp to peer1 and peer2.
1015 peer1.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
1016 peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
1018 // The previous leader should become the owner of entity1.
1020 verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
1022 // The previous leader's DOMEntityOwnershipListener should get 4 total notifications:
1023 // - inJeopardy cleared for entity1 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
1024 // - inJeopardy cleared for entity2 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
1025 // - local owner granted for entity1 (wasOwner=false, isOwner=true, hasOwner=true, inJeopardy=false)
1026 // - remote owner for entity2 (wasOwner=false, isOwner=false, hasOwner=true, inJeopardy=false)
1027 verify(leaderListener, timeout(5000).times(4)).ownershipChanged(or(
1028 or(ownershipChange(entity1, false, false, false), ownershipChange(entity2, false, false, false)),
1029 or(ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true))));
1031 verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
1032 verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
1034 // Verify entity2's owner doesn't change.
1036 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1037 verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
1039 verifyNoMoreInteractions(leaderListener);
1040 verifyNoMoreInteractions(peer1Listener);
1041 verifyNoMoreInteractions(peer2Listener);
1043 testLog.info("testLeaderIsolationWithPendingCandidateAdded ending");
1047 public void testListenerRegistration() {
1048 testLog.info("testListenerRegistration starting");
1050 ShardTestKit kit = new ShardTestKit(getSystem());
1052 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1053 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
1055 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1056 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
1057 peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1059 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1060 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
1062 ShardTestKit.waitUntilLeader(leader);
1064 String otherEntityType = "otherEntityType";
1065 final DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1066 final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
1067 final DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
1068 final DOMEntity entity4 = new DOMEntity(otherEntityType, ENTITY_ID3);
1069 DOMEntityOwnershipListener listener = mock(DOMEntityOwnershipListener.class);
1071 // Register listener
1073 leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1074 kit.expectMsgClass(SuccessReply.class);
1076 // Register a couple candidates for the desired entity type and verify listener is notified.
1078 leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
1079 kit.expectMsgClass(SuccessReply.class);
1081 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
1083 leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
1084 kit.expectMsgClass(SuccessReply.class);
1086 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
1089 // Register another candidate for another entity type and verify listener is not notified.
1091 leader.tell(new RegisterCandidateLocal(entity4), kit.getRef());
1092 kit.expectMsgClass(SuccessReply.class);
1094 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1095 verify(listener, never()).ownershipChanged(ownershipChange(entity4));
1097 // Register remote candidate for entity1
1099 peer.tell(new RegisterCandidateLocal(entity1), kit.getRef());
1100 kit.expectMsgClass(SuccessReply.class);
1101 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entity1.getIdentifier(), PEER_MEMBER_1_NAME);
1103 // Unregister the local candidate for entity1 and verify listener is notified
1105 leader.tell(new UnregisterCandidateLocal(entity1), kit.getRef());
1106 kit.expectMsgClass(SuccessReply.class);
1108 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
1111 // Unregister the listener, add a candidate for entity3 and verify listener isn't notified
1113 leader.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1114 kit.expectMsgClass(SuccessReply.class);
1116 leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
1117 kit.expectMsgClass(SuccessReply.class);
1119 verifyOwner(leader, ENTITY_TYPE, entity3.getIdentifier(), LOCAL_MEMBER_NAME);
1120 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1121 verify(listener, never()).ownershipChanged(any(DOMEntityOwnershipChange.class));
1123 // Re-register the listener and verify it gets notified of currently owned entities
1127 leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1128 kit.expectMsgClass(SuccessReply.class);
1130 verify(listener, timeout(5000).times(2)).ownershipChanged(or(ownershipChange(entity2, false, true, true),
1131 ownershipChange(entity3, false, true, true)));
1132 Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
1133 verify(listener, never()).ownershipChanged(ownershipChange(entity4));
1134 verify(listener, times(1)).ownershipChanged(ownershipChange(entity1));
1136 testLog.info("testListenerRegistration ending");
1140 public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() {
1141 testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived starting");
1143 ShardTestKit kit = new ShardTestKit(getSystem());
1144 EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder()
1145 .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
1147 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1148 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
1150 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1151 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
1152 peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1154 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1155 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME, builder.build()),
1156 leaderId.toString());
1158 ShardTestKit.waitUntilLeader(leader);
1160 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1162 // Add a remote candidate
1164 peer.tell(new RegisterCandidateLocal(entity), kit.getRef());
1165 kit.expectMsgClass(SuccessReply.class);
1169 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1170 kit.expectMsgClass(SuccessReply.class);
1172 // Verify the local candidate becomes owner
1174 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1175 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1176 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1178 testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived ending");
1182 public void testDelayedEntityOwnerSelection() {
1183 testLog.info("testDelayedEntityOwnerSelection starting");
1185 final ShardTestKit kit = new ShardTestKit(getSystem());
1186 EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder()
1187 .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
1189 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
1191 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1192 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
1193 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
1195 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1196 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
1197 peerId1.toString());
1198 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1200 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1201 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
1202 peerId2.toString());
1203 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1205 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1206 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME,
1207 builder.build()), leaderId.toString());
1209 ShardTestKit.waitUntilLeader(leader);
1211 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1213 // Add a remote candidate
1215 peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
1216 kit.expectMsgClass(SuccessReply.class);
1220 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1221 kit.expectMsgClass(SuccessReply.class);
1223 // Verify the local candidate becomes owner
1225 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1226 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1227 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1229 testLog.info("testDelayedEntityOwnerSelection ending");
1232 private Props newLocalShardProps() {
1233 return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.<String,String>emptyMap(), LOCAL_MEMBER_NAME);
1236 private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers,
1237 final String memberName) {
1238 return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build());
1241 private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers, final String memberName,
1242 final EntityOwnerSelectionStrategyConfig config) {
1243 return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props()
1244 .withDispatcher(Dispatchers.DefaultDispatcherId());
1247 private EntityOwnershipShard.Builder newShardBuilder(final ShardIdentifier shardId, final Map<String, String> peers,
1248 final String memberName) {
1249 return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).datastoreContext(
1250 dataStoreContextBuilder.build()).schemaContextProvider(() -> SCHEMA_CONTEXT).localMemberName(
1251 MemberName.forName(memberName)).ownerSelectionStrategyConfig(
1252 EntityOwnerSelectionStrategyConfig.newBuilder().build());
1255 private Map<String, String> peerMap(final String... peerIds) {
1256 ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder();
1257 for (String peerId: peerIds) {
1258 builder.put(peerId, actorFactory.createTestActorPath(peerId)).build();
1261 return builder.build();
1264 private static class TestEntityOwnershipShard extends EntityOwnershipShard {
1265 private final ActorRef collectorActor;
1266 private final Map<Class<?>, Predicate<?>> dropMessagesOfType = new ConcurrentHashMap<>();
1268 TestEntityOwnershipShard(final Builder builder, final ActorRef collectorActor) {
1270 this.collectorActor = collectorActor;
1273 @SuppressWarnings({ "unchecked", "rawtypes" })
1275 public void handleCommand(final Object message) {
1276 Predicate drop = dropMessagesOfType.get(message.getClass());
1277 if (drop == null || !drop.test(message)) {
1278 super.handleCommand(message);
1281 if (collectorActor != null) {
1282 collectorActor.tell(message, ActorRef.noSender());
1286 void startDroppingMessagesOfType(final Class<?> msgClass) {
1287 dropMessagesOfType.put(msgClass, msg -> true);
1290 <T> void startDroppingMessagesOfType(final Class<T> msgClass, final Predicate<T> filter) {
1291 dropMessagesOfType.put(msgClass, filter);
1294 void stopDroppingMessagesOfType(final Class<?> msgClass) {
1295 dropMessagesOfType.remove(msgClass);
1298 ActorRef collectorActor() {
1299 return collectorActor;
1302 static Props props(final Builder builder) {
1303 return props(builder, null);
1306 static Props props(final Builder builder, final ActorRef collectorActor) {
1307 return Props.create(TestEntityOwnershipShard.class, builder, collectorActor)
1308 .withDispatcher(Dispatchers.DefaultDispatcherId());