/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
11 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
12 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
13 import static org.junit.Assert.assertEquals;
14 import static org.mockito.AdditionalMatchers.or;
15 import static org.mockito.Matchers.any;
16 import static org.mockito.Mockito.mock;
17 import static org.mockito.Mockito.never;
18 import static org.mockito.Mockito.reset;
19 import static org.mockito.Mockito.timeout;
20 import static org.mockito.Mockito.times;
21 import static org.mockito.Mockito.verify;
22 import static org.mockito.Mockito.verifyNoMoreInteractions;
23 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
24 import akka.actor.ActorRef;
25 import akka.actor.PoisonPill;
26 import akka.actor.Props;
27 import akka.actor.Terminated;
28 import akka.dispatch.Dispatchers;
29 import akka.testkit.JavaTestKit;
30 import akka.testkit.TestActorRef;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.util.concurrent.Uninterruptibles;
33 import java.util.ArrayList;
34 import java.util.Collections;
35 import java.util.List;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.TimeUnit;
39 import java.util.function.Predicate;
40 import org.junit.After;
41 import org.junit.Test;
42 import org.opendaylight.controller.cluster.access.concepts.MemberName;
43 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
44 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
45 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
46 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
47 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
48 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
49 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
50 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
51 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
52 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.LastCandidateSelectionStrategy;
53 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
54 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
55 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
57 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
58 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
59 import org.opendaylight.controller.cluster.raft.RaftState;
60 import org.opendaylight.controller.cluster.raft.TestActorFactory;
61 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
62 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
63 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
64 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
65 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
66 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
67 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
68 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
69 import org.opendaylight.yangtools.yang.common.QName;
70 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
71 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
 * Unit tests for EntityOwnershipShard.
 *
 * @author Thomas Pantelis
 */
78 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
79 private static final String ENTITY_TYPE = "test type";
80 private static final YangInstanceIdentifier ENTITY_ID1 =
81 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
82 private static final YangInstanceIdentifier ENTITY_ID2 =
83 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
84 private static final YangInstanceIdentifier ENTITY_ID3 =
85 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3"));
86 private static final YangInstanceIdentifier ENTITY_ID4 =
87 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4"));
88 private static final YangInstanceIdentifier ENTITY_ID5 =
89 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5"));
90 private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
91 private static final String LOCAL_MEMBER_NAME = "local-member-1";
92 private static final String PEER_MEMBER_1_NAME = "peer-member-1";
93 private static final String PEER_MEMBER_2_NAME = "peer-member-2";
95 private Builder dataStoreContextBuilder = DatastoreContext.newBuilder().persistent(false);
96 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
99 public void tearDown() {
100 actorFactory.close();
104 public void testOnRegisterCandidateLocal() throws Exception {
105 testLog.info("testOnRegisterCandidateLocal starting");
107 ShardTestKit kit = new ShardTestKit(getSystem());
109 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
111 ShardTestKit.waitUntilLeader(shard);
113 YangInstanceIdentifier entityId = ENTITY_ID1;
114 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
116 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
117 kit.expectMsgClass(SuccessReply.class);
119 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
120 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
122 testLog.info("testOnRegisterCandidateLocal ending");
126 public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
127 testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader starting");
129 ShardTestKit kit = new ShardTestKit(getSystem());
131 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
133 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
134 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
136 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
137 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
138 TestEntityOwnershipShard peerShard = peer.underlyingActor();
139 peerShard.startDroppingMessagesOfType(RequestVote.class);
140 peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
142 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(
143 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
145 YangInstanceIdentifier entityId = ENTITY_ID1;
146 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
148 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
149 kit.expectMsgClass(SuccessReply.class);
151 // Now allow RequestVotes to the peer so the shard becomes the leader. This should retry the commit.
152 peerShard.stopDroppingMessagesOfType(RequestVote.class);
154 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
155 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
157 testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader ending");
161 public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
162 testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus starting");
164 ShardTestKit kit = new ShardTestKit(getSystem());
166 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
167 shardTransactionCommitTimeoutInSeconds(1);
169 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
170 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
172 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
173 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
174 TestEntityOwnershipShard peerShard = peer.underlyingActor();
175 peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
177 // Drop AppendEntries so consensus isn't reached.
178 peerShard.startDroppingMessagesOfType(AppendEntries.class);
180 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
181 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
183 ShardTestKit.waitUntilLeader(leader);
185 YangInstanceIdentifier entityId = ENTITY_ID1;
186 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
188 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
189 kit.expectMsgClass(SuccessReply.class);
191 // Wait enough time for the commit to timeout.
192 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
194 // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
195 // write being applied to the state.
196 peerShard.stopDroppingMessagesOfType(AppendEntries.class);
198 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
199 verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
201 testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus ending");
205 public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
206 testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader starting");
208 ShardTestKit kit = new ShardTestKit(getSystem());
210 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
211 shardIsolatedLeaderCheckIntervalInMillis(50);
213 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
214 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
216 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
217 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
218 TestEntityOwnershipShard peerShard = peer.underlyingActor();
219 peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
221 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
222 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME));
224 ShardTestKit.waitUntilLeader(leader);
226 // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
227 peerShard.startDroppingMessagesOfType(AppendEntries.class);
228 verifyRaftState(leader, state ->
229 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
231 YangInstanceIdentifier entityId = ENTITY_ID1;
232 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
234 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
235 kit.expectMsgClass(SuccessReply.class);
237 // Resume AppendEntries - the candidate write should now be committed.
238 peerShard.stopDroppingMessagesOfType(AppendEntries.class);
239 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
240 verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
242 testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader ending");
246 public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
247 testLog.info("testOnRegisterCandidateLocalWithRemoteLeader starting");
249 ShardTestKit kit = new ShardTestKit(getSystem());
251 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
252 shardBatchedModificationCount(5);
254 ShardIdentifier leaderId = newShardId(PEER_MEMBER_1_NAME);
255 ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
256 TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
257 newShardBuilder(leaderId, peerMap(localId.toString()), PEER_MEMBER_1_NAME),
258 actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
259 TestEntityOwnershipShard leaderShard = leader.underlyingActor();
261 TestActorRef<TestEntityOwnershipShard> local = actorFactory.createTestActor(TestEntityOwnershipShard.props(
262 newShardBuilder(localId, peerMap(leaderId.toString()),LOCAL_MEMBER_NAME)), localId.toString());
263 local.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
265 local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
266 kit.expectMsgClass(SuccessReply.class);
268 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
269 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
271 // Test with initial commit timeout and subsequent retry.
273 local.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
274 leaderShard.startDroppingMessagesOfType(BatchedModifications.class);
276 local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
277 kit.expectMsgClass(SuccessReply.class);
279 expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class);
281 // Send a bunch of registration messages quickly and verify.
283 leaderShard.stopDroppingMessagesOfType(BatchedModifications.class);
284 clearMessages(leaderShard.collectorActor());
287 List<YangInstanceIdentifier> entityIds = new ArrayList<>();
288 for(int i = 1; i <= max; i++) {
289 YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
291 local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, id)), kit.getRef());
294 for(int i = 0; i < max; i++) {
295 verifyCommittedEntityCandidate(local, ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
298 testLog.info("testOnRegisterCandidateLocalWithRemoteLeader ending");
302 public void testOnUnregisterCandidateLocal() throws Exception {
303 testLog.info("testOnUnregisterCandidateLocal starting");
305 ShardTestKit kit = new ShardTestKit(getSystem());
306 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
307 ShardTestKit.waitUntilLeader(shard);
309 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
313 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
314 kit.expectMsgClass(SuccessReply.class);
316 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
317 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
321 shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
322 kit.expectMsgClass(SuccessReply.class);
324 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");
328 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
329 kit.expectMsgClass(SuccessReply.class);
331 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
332 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
334 testLog.info("testOnUnregisterCandidateLocal ending");
338 public void testOwnershipChanges() throws Exception {
339 testLog.info("testOwnershipChanges starting");
341 ShardTestKit kit = new ShardTestKit(getSystem());
343 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
345 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
346 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
347 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
349 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
350 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
352 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
354 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
355 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
357 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
359 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
360 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
361 leaderId.toString());
363 ShardTestKit.waitUntilLeader(leader);
365 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
367 // Add a remote candidate
369 peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
370 kit.expectMsgClass(SuccessReply.class);
372 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
376 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
377 kit.expectMsgClass(SuccessReply.class);
379 // Verify the remote candidate becomes owner
381 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
382 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
384 // Add another remote candidate and verify ownership doesn't change
386 peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
387 kit.expectMsgClass(SuccessReply.class);
389 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
390 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
391 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
393 // Remove the second remote candidate and verify ownership doesn't change
395 peer2.tell(new UnregisterCandidateLocal(entity), kit.getRef());
396 kit.expectMsgClass(SuccessReply.class);
398 verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
399 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
400 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
402 // Remove the first remote candidate and verify the local candidate becomes owner
404 peer1.tell(new UnregisterCandidateLocal(entity), kit.getRef());
405 kit.expectMsgClass(SuccessReply.class);
407 verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
408 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
410 // Add the second remote candidate back and verify ownership doesn't change
412 peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
413 kit.expectMsgClass(SuccessReply.class);
415 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
416 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
417 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
419 // Unregister the local candidate and verify the second remote candidate becomes owner
421 leader.tell(new UnregisterCandidateLocal(entity), kit.getRef());
422 kit.expectMsgClass(SuccessReply.class);
424 verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
425 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
427 testLog.info("testOwnershipChanges ending");
431 public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception {
432 testLog.info("testOwnerChangesOnPeerAvailabilityChanges starting");
434 ShardTestKit kit = new ShardTestKit(getSystem());
436 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4).
437 shardIsolatedLeaderCheckIntervalInMillis(100000);
439 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
440 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
441 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
443 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
444 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
446 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
448 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
449 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
451 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
453 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
454 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
455 leaderId.toString());
457 verifyRaftState(leader, state ->
458 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
460 // Send PeerDown and PeerUp with no entities
462 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
463 leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
465 // Add candidates for entity1 with the local leader as the owner
467 leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
468 kit.expectMsgClass(SuccessReply.class);
469 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
471 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
472 kit.expectMsgClass(SuccessReply.class);
473 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
475 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
476 kit.expectMsgClass(SuccessReply.class);
477 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
478 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
480 // Add candidates for entity2 with peerMember2 as the owner
482 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
483 kit.expectMsgClass(SuccessReply.class);
484 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
486 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
487 kit.expectMsgClass(SuccessReply.class);
488 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
489 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
491 // Add candidates for entity3 with peerMember2 as the owner.
493 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
494 kit.expectMsgClass(SuccessReply.class);
495 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
497 leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
498 kit.expectMsgClass(SuccessReply.class);
499 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
501 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
502 kit.expectMsgClass(SuccessReply.class);
503 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
504 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
506 // Add only candidate peerMember2 for entity4.
508 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID4)), kit.getRef());
509 kit.expectMsgClass(SuccessReply.class);
510 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
511 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
513 // Add only candidate peerMember1 for entity5.
515 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID5)), kit.getRef());
516 kit.expectMsgClass(SuccessReply.class);
517 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
518 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
520 // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new
524 peer2.tell(PoisonPill.getInstance(), ActorRef.noSender());
525 kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
528 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
529 // Send PeerDown again - should be noop
530 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
531 peer1.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
533 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
534 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
535 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
536 // no other candidates for entity4 so peerMember2 should remain owner.
537 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
539 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
540 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
541 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
542 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
544 // Reinstate peerMember2
546 peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
547 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
549 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
550 leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
551 // Send PeerUp again - should be noop
552 leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
553 peer1.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
555 // peerMember2's candidates should be removed on startup.
556 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
557 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
558 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
559 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
561 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
562 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
563 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
564 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
566 // Add back candidate peerMember2 for entities 1, 2, & 3.
568 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
569 kit.expectMsgClass(SuccessReply.class);
570 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
571 kit.expectMsgClass(SuccessReply.class);
572 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
573 kit.expectMsgClass(SuccessReply.class);
574 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
575 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
576 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
577 verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
578 verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
579 verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
580 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
581 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
582 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
583 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
584 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
585 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
586 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
588 // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected
591 peer1.tell(PoisonPill.getInstance(), ActorRef.noSender());
592 kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
594 leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
596 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
598 // Verify the reinstated peerMember2 is fully synced.
600 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
601 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
602 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
603 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
605 // Reinstate peerMember1 and verify no owner changes
607 peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(newShardBuilder(
608 peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), peerId1.toString());
609 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
610 leader.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
612 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
613 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
614 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
615 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
617 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
618 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
619 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
621 verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
622 verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
623 verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
625 // Verify the reinstated peerMember1 is fully synced.
627 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
628 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
629 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
630 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
632 // Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for
633 // the entities (1 and 3) previously owned by the local leader member.
635 peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
636 peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
637 peer2.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
640 leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
641 kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
643 peer2.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
644 peer2.tell(TimeoutNow.INSTANCE, peer2);
646 verifyRaftState(peer2, state ->
647 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
649 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
650 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
651 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
652 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
654 testLog.info("testOwnerChangesOnPeerAvailabilityChanges ending");
/**
 * Exercises entity ownership in a three-member shard (local leader, peer1, peer2) while the leader is
 * cut off from its peers. The isolated leader's listener is expected to receive inJeopardy notifications,
 * peer1 is expected to become leader and take over entity1, and after message dropping stops the old
 * leader is expected to fall back to Follower and deliver the jeopardy-cleared notifications.
 */
658 public void testLeaderIsolation() throws Exception {
659 testLog.info("testLeaderIsolation starting");
661 ShardTestKit kit = new ShardTestKit(getSystem());
663 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
664 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
665 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
// The two peers are created while the isolated-leader check interval is 100000; the context builder is
// re-created with an interval of 500 further down, before the leader shard is created.
667 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4).
668 shardIsolatedLeaderCheckIntervalInMillis(100000);
670 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
671 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
// ElectionTimeout is dropped on both peers so neither starts an election until the test re-enables it.
673 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
675 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
676 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
678 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
680 dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
681 shardIsolatedLeaderCheckIntervalInMillis(500);
683 TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
684 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME)),
685 leaderId.toString());
687 ShardTestKit.waitUntilLeader(leader);
689 // Add entity1 candidates for all members with the leader as the owner
691 DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
692 leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
693 kit.expectMsgClass(SuccessReply.class);
694 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
696 peer1.tell(new RegisterCandidateLocal(entity1), kit.getRef());
697 kit.expectMsgClass(SuccessReply.class);
698 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
700 peer2.tell(new RegisterCandidateLocal(entity1), kit.getRef());
701 kit.expectMsgClass(SuccessReply.class);
702 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_2_NAME);
704 verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
705 verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
706 verifyOwner(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
708 // Add entity2 candidates for all members with peer1 as the owner
710 DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
711 peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
712 kit.expectMsgClass(SuccessReply.class);
713 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
715 peer2.tell(new RegisterCandidateLocal(entity2), kit.getRef());
716 kit.expectMsgClass(SuccessReply.class);
717 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_2_NAME);
719 leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
720 kit.expectMsgClass(SuccessReply.class);
721 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
723 verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
724 verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
725 verifyOwner(peer2, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
727 // Add entity3 candidates for all members with peer2 as the owner
729 DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
730 peer2.tell(new RegisterCandidateLocal(entity3), kit.getRef());
731 kit.expectMsgClass(SuccessReply.class);
732 verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
734 leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
735 kit.expectMsgClass(SuccessReply.class);
736 verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), LOCAL_MEMBER_NAME);
738 peer1.tell(new RegisterCandidateLocal(entity3), kit.getRef());
739 kit.expectMsgClass(SuccessReply.class);
740 verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_1_NAME);
742 verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
743 verifyOwner(peer1, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
744 verifyOwner(peer2, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
746 // Add listeners on all members
// Each registration is expected to produce three notifications (one per entity); the or(...) matcher
// accepts them in any order. Each mock is reset afterwards so later verifications start from zero.
748 DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class);
749 leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
750 kit.expectMsgClass(SuccessReply.class);
751 verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, true, true),
752 ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, false, true)));
753 reset(leaderListener);
755 DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class);
756 peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
757 kit.expectMsgClass(SuccessReply.class);
758 verify(peer1Listener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, false, true),
759 ownershipChange(entity2, false, true, true)), ownershipChange(entity3, false, false, true)));
760 reset(peer1Listener);
762 DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class);
763 peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
764 kit.expectMsgClass(SuccessReply.class);
765 verify(peer2Listener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, false, true),
766 ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, true, true)));
767 reset(peer2Listener);
769 // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
771 leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
772 leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
// peer2 drops only AppendEntries whose leader id is the old leader's; peer1 drops every AppendEntries.
774 peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
775 ae -> ae.getLeaderId().equals(leaderId.toString()));
776 peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
778 // Make peer1 start an election and become leader by enabling the ElectionTimeout message.
780 peer1.underlyingActor().stopDroppingMessagesOfType(ElectionTimeout.class);
782 // Send PeerDown to the isolated leader so it tries to re-assign ownership for the entities owned by the
785 leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
786 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
788 verifyRaftState(leader, state ->
789 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
791 // Expect inJeopardy notification on the isolated leader.
// The five-argument ownershipChange(...) form is used here with a trailing true for every entity.
793 verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, true, true, true, true),
794 ownershipChange(entity2, false, false, true, true)), ownershipChange(entity3, false, false, true, true)));
795 reset(leaderListener);
797 verifyRaftState(peer1, state ->
798 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
800 // Send PeerDown to the new leader peer1 so it re-assigns ownership for the entities owned by the
803 peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
// entity1 was owned by the old leader; peer1 is expected to become its owner on the new leader's side.
805 verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
807 verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
808 reset(peer1Listener);
810 verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
811 reset(peer2Listener);
813 // Remove the isolation.
815 leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
816 leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
817 peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
818 peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
820 // Previous leader should switch to Follower and send inJeopardy cleared notifications for all entities.
822 verifyRaftState(leader, state ->
823 assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState()));
825 verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, true, true, true),
826 ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, false, true)));
// Once re-synced, the old leader should see peer1 as the owner of entity1 and report losing ownership.
828 verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
829 verify(leaderListener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
// Fixed pause before checking that the owners of entity2 and entity3 are unchanged on the old leader.
831 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
832 verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
833 verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
// No notifications beyond the ones verified above are expected on any listener.
835 verifyNoMoreInteractions(leaderListener);
836 verifyNoMoreInteractions(peer1Listener);
837 verifyNoMoreInteractions(peer2Listener);
839 testLog.info("testLeaderIsolation ending");
843 public void testLeaderIsolationWithPendingCandidateAdded() throws Exception {
844 testLog.info("testLeaderIsolationWithPendingCandidateAdded starting");
846 ShardTestKit kit = new ShardTestKit(getSystem());
848 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
849 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
850 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
852 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4).
853 shardIsolatedLeaderCheckIntervalInMillis(100000);
855 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
856 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME),
857 actorFactory.createTestActor(MessageCollectorActor.props())), peerId1.toString());
858 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
860 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
861 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME),
862 actorFactory.createTestActor(MessageCollectorActor.props())), peerId2.toString());
863 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
865 dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
866 shardIsolatedLeaderCheckIntervalInMillis(500);
868 TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
869 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
870 actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
872 ShardTestKit.waitUntilLeader(leader);
874 // Add listeners on all members
876 DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class,
877 "DOMEntityOwnershipListener-" + LOCAL_MEMBER_NAME);
878 leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
879 kit.expectMsgClass(SuccessReply.class);
881 DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class,
882 "DOMEntityOwnershipListener-" + PEER_MEMBER_1_NAME);
883 peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
884 kit.expectMsgClass(SuccessReply.class);
886 DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class,
887 "DOMEntityOwnershipListener-" + PEER_MEMBER_2_NAME);
888 peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
889 kit.expectMsgClass(SuccessReply.class);
891 // Drop the CandidateAdded message to the leader for now.
893 leader.underlyingActor().startDroppingMessagesOfType(CandidateAdded.class);
895 // Add an entity candidates for the leader. Since we've blocked the CandidateAdded message, it won't be
896 // assigned the owner.
898 DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
899 leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
900 kit.expectMsgClass(SuccessReply.class);
901 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
902 verifyCommittedEntityCandidate(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
903 verifyCommittedEntityCandidate(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
905 DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
906 leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
907 kit.expectMsgClass(SuccessReply.class);
908 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
909 verifyCommittedEntityCandidate(peer1, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
910 verifyCommittedEntityCandidate(peer2, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
912 // Capture the CandidateAdded messages.
914 List<CandidateAdded> candidateAdded = expectMatching(leader.underlyingActor().collectorActor(), CandidateAdded.class, 2);
916 // Drop AppendEntries to the followers containing a log entry, which will be for the owner writes after we
917 // forward the CandidateAdded messages to the leader. This will leave the pending owner write tx's uncommitted.
919 peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
920 peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
922 // Now forward the CandidateAdded messages to the leader and wait for it to send out the AppendEntries.
924 leader.underlyingActor().stopDroppingMessagesOfType(CandidateAdded.class);
925 leader.tell(candidateAdded.get(0), leader);
926 leader.tell(candidateAdded.get(1), leader);
928 expectMatching(peer1.underlyingActor().collectorActor(), AppendEntries.class, 2, ae -> ae.getEntries().size() > 0);
930 // Verify no owner assigned.
932 verifyNoOwnerSet(leader, entity1.getType(), entity1.getIdentifier());
933 verifyNoOwnerSet(leader, entity2.getType(), entity2.getIdentifier());
935 // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
937 leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
938 leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
940 peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
941 ae -> ae.getLeaderId().equals(leaderId.toString()));
942 peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
944 // Send PeerDown to the isolated leader - should be no-op since there's no owned entities.
946 leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
947 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
949 // Verify the leader transitions to IsolatedLeader.
951 verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
953 // Send PeerDown to the new leader peer1.
955 peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
957 // Make peer1 start an election and become leader by sending the TimeoutNow message.
959 peer1.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
961 // Verify the peer1 transitions to Leader.
963 verifyRaftState(peer1, state -> assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
965 verifyNoOwnerSet(peer1, entity1.getType(), entity1.getIdentifier());
966 verifyNoOwnerSet(peer2, entity1.getType(), entity2.getIdentifier());
968 verifyNoMoreInteractions(peer1Listener);
969 verifyNoMoreInteractions(peer2Listener);
971 // Add candidate peer1 candidate for entity2.
973 peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
975 verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
976 verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
977 verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, false, true));
979 reset(leaderListener, peer1Listener, peer2Listener);
981 // Remove the isolation.
983 leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
984 leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
985 peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
986 peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
988 // Previous leader should switch to Follower.
990 verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState()));
992 // Send PeerUp to peer1 and peer2.
994 peer1.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
995 peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
997 // The previous leader should become the owner of entity1.
999 verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
1001 // The previous leader's DOMEntityOwnershipListener should get 4 total notifications:
1002 // - inJeopardy cleared for entity1 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
1003 // - inJeopardy cleared for entity2 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
1004 // - local owner granted for entity1 (wasOwner=false, isOwner=true, hasOwner=true, inJeopardy=false)
1005 // - remote owner for entity2 (wasOwner=false, isOwner=false, hasOwner=true, inJeopardy=false)
1006 verify(leaderListener, timeout(5000).times(4)).ownershipChanged(or(or(ownershipChange(entity1, false, false, false),
1007 ownershipChange(entity2, false, false, false)), or(ownershipChange(entity1, false, true, true),
1008 ownershipChange(entity2, false, false, true))));
1010 verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
1011 verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
1013 // Verify entity2's owner doesn't change.
1015 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1016 verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
1018 verifyNoMoreInteractions(leaderListener);
1019 verifyNoMoreInteractions(peer1Listener);
1020 verifyNoMoreInteractions(peer2Listener);
1022 testLog.info("testLeaderIsolationWithPendingCandidateAdded ending");
/**
 * Registers a DOMEntityOwnershipListener for ENTITY_TYPE on the leader and checks which ownership
 * changes reach it: changes for entities of the registered type, nothing for an entity of another type,
 * nothing while the listener is unregistered, and notifications again after it is re-registered.
 */
1026 public void testListenerRegistration() throws Exception {
1027 testLog.info("testListenerRegistration starting");
1029 ShardTestKit kit = new ShardTestKit(getSystem());
1031 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1032 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
// The peer never starts an election because ElectionTimeout is dropped on it.
1034 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1035 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
1036 peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1038 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1039 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
1041 ShardTestKit.waitUntilLeader(leader);
// entity1..entity3 use ENTITY_TYPE; entity4 uses a different type that the listener does not register for.
1043 String otherEntityType = "otherEntityType";
1044 DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1045 DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
1046 DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
1047 DOMEntity entity4 = new DOMEntity(otherEntityType, ENTITY_ID3);
1048 DOMEntityOwnershipListener listener = mock(DOMEntityOwnershipListener.class);
1050 // Register listener
1052 leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1053 kit.expectMsgClass(SuccessReply.class);
1055 // Register a couple candidates for the desired entity type and verify listener is notified.
1057 leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
1058 kit.expectMsgClass(SuccessReply.class);
1060 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
1062 leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
1063 kit.expectMsgClass(SuccessReply.class);
1065 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
1068 // Register another candidate for another entity type and verify listener is not notified.
1070 leader.tell(new RegisterCandidateLocal(entity4), kit.getRef());
1071 kit.expectMsgClass(SuccessReply.class);
// Fixed pause to give an unexpected notification time to arrive before asserting that none did.
1073 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1074 verify(listener, never()).ownershipChanged(ownershipChange(entity4));
1076 // Register remote candidate for entity1
1078 peer.tell(new RegisterCandidateLocal(entity1), kit.getRef());
1079 kit.expectMsgClass(SuccessReply.class);
1080 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entity1.getIdentifier(), PEER_MEMBER_1_NAME);
1082 // Unregister the local candidate for entity1 and verify listener is notified
1084 leader.tell(new UnregisterCandidateLocal(entity1), kit.getRef());
1085 kit.expectMsgClass(SuccessReply.class);
1087 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
1090 // Unregister the listener, add a candidate for entity3 and verify listener isn't notified
1092 leader.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1093 kit.expectMsgClass(SuccessReply.class);
1095 leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
1096 kit.expectMsgClass(SuccessReply.class);
1098 verifyOwner(leader, ENTITY_TYPE, entity3.getIdentifier(), LOCAL_MEMBER_NAME);
1099 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
// NOTE(review): this never()/any(...) check assumes the invocations verified earlier in this test have
// been cleared from the mock (e.g. via reset) before this point - confirm.
1100 verify(listener, never()).ownershipChanged(any(DOMEntityOwnershipChange.class));
1102 // Re-register the listener and verify it gets notified of currently owned entities
1106 leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1107 kit.expectMsgClass(SuccessReply.class);
// Exactly two notifications with isOwner=true are expected, for entity2 and entity3, in either order.
1109 verify(listener, timeout(5000).times(2)).ownershipChanged(or(ownershipChange(entity2, false, true, true),
1110 ownershipChange(entity3, false, true, true)));
1111 Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
// Still nothing for the other entity type, and exactly one notification matching entity1.
1112 verify(listener, never()).ownershipChanged(ownershipChange(entity4));
1113 verify(listener, times(1)).ownershipChanged(ownershipChange(entity1));
1115 testLog.info("testListenerRegistration ending");
1119 public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() throws Exception {
1120 testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived starting");
1122 ShardTestKit kit = new ShardTestKit(getSystem());
1123 EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder().
1124 addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
1126 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1127 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
1129 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1130 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
1131 peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1133 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1134 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME, builder.build()), leaderId.toString());
1136 ShardTestKit.waitUntilLeader(leader);
1138 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1140 // Add a remote candidate
1142 peer.tell(new RegisterCandidateLocal(entity), kit.getRef());
1143 kit.expectMsgClass(SuccessReply.class);
1147 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1148 kit.expectMsgClass(SuccessReply.class);
1150 // Verify the local candidate becomes owner
1152 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1153 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1154 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1156 testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived ending");
1160 public void testDelayedEntityOwnerSelection() throws Exception {
1161 testLog.info("testDelayedEntityOwnerSelection starting");
1163 ShardTestKit kit = new ShardTestKit(getSystem());
1164 EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder().
1165 addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
1167 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
1169 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1170 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
1171 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
1173 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1174 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
1175 peerId1.toString());
1176 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1178 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1179 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
1180 peerId2.toString());
1181 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1183 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1184 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME, builder.build()),
1185 leaderId.toString());
1187 ShardTestKit.waitUntilLeader(leader);
1189 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1191 // Add a remote candidate
1193 peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
1194 kit.expectMsgClass(SuccessReply.class);
1198 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1199 kit.expectMsgClass(SuccessReply.class);
1201 // Verify the local candidate becomes owner
1203 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1204 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1205 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1207 testLog.info("testDelayedEntityOwnerSelection ending");
1210 private Props newLocalShardProps() {
1211 return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.<String,String>emptyMap(), LOCAL_MEMBER_NAME);
1214 private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName) {
1215 return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build());
1218 private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName,
1219 EntityOwnerSelectionStrategyConfig config) {
1220 return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props()
1221 .withDispatcher(Dispatchers.DefaultDispatcherId());
1224 private EntityOwnershipShard.Builder newShardBuilder(ShardIdentifier shardId, Map<String,String> peers,
1225 String memberName) {
1226 return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).datastoreContext(
1227 dataStoreContextBuilder.build()).schemaContext(SCHEMA_CONTEXT).localMemberName(
1228 MemberName.forName(memberName)).ownerSelectionStrategyConfig(
1229 EntityOwnerSelectionStrategyConfig.newBuilder().build());
1232 private Map<String, String> peerMap(String... peerIds) {
1233 ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder();
1234 for(String peerId: peerIds) {
1235 builder.put(peerId, actorFactory.createTestActorPath(peerId)).build();
1238 return builder.build();
/**
 * EntityOwnershipShard subclass used by the tests. It can be told to drop incoming messages by message
 * class (optionally filtered by a predicate) and can forward the messages it receives to a
 * MessageCollectorActor so that tests can inspect them.
 */
1241 private static class TestEntityOwnershipShard extends EntityOwnershipShard {
// Optional collector; null when the shard is created via props(Builder).
1242 private final TestActorRef<MessageCollectorActor> collectorActor;
// Drop rules keyed by message class; a message is dropped when the predicate for its class returns true.
// The map is modified from test code through the start/stop methods below.
1243 private final Map<Class<?>, Predicate<?>> dropMessagesOfType = new ConcurrentHashMap<>();
1245 TestEntityOwnershipShard(Builder builder, TestActorRef<MessageCollectorActor> collectorActor) {
1247 this.collectorActor = collectorActor;
// Looks up a drop rule by the message's exact runtime class (no superclass or interface matching) and
// passes the message to the real shard only when there is no rule or the rule rejects the message.
// A configured collector is then told about the message - presumably for dropped messages as well, since
// the tests capture dropped CandidateAdded messages from the collector; confirm against the full source.
1250 @SuppressWarnings({ "unchecked", "rawtypes" })
1252 public void handleCommand(Object message) {
1253 Predicate drop = dropMessagesOfType.get(message.getClass());
1254 if(drop == null || !drop.test(message)) {
1255 super.handleCommand(message);
1258 if(collectorActor != null) {
1259 collectorActor.tell(message, ActorRef.noSender());
// Drops every message of the given class until stopDroppingMessagesOfType is called for that class.
1263 void startDroppingMessagesOfType(Class<?> msgClass) {
1264 dropMessagesOfType.put(msgClass, msg -> true);
// Drops only those messages of the given class for which the filter returns true; replaces any rule
// previously registered for the same class.
1267 <T> void startDroppingMessagesOfType(Class<T> msgClass, Predicate<T> filter) {
1268 dropMessagesOfType.put(msgClass, filter);
// Removes the drop rule for the given class, if any.
1271 void stopDroppingMessagesOfType(Class<?> msgClass) {
1272 dropMessagesOfType.remove(msgClass);
// Returns the collector passed at construction; may be null.
1275 TestActorRef<MessageCollectorActor> collectorActor() {
1276 return collectorActor;
// Props for a test shard without a collector actor.
1279 static Props props(Builder builder) {
1280 return props(builder, null);
// Props for a test shard with the given (possibly null) collector actor, using the default dispatcher.
1283 static Props props(Builder builder, TestActorRef<MessageCollectorActor> collectorActor) {
1284 return Props.create(TestEntityOwnershipShard.class, builder, collectorActor).
1285 withDispatcher(Dispatchers.DefaultDispatcherId());