2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.AdditionalMatchers.or;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.never;
15 import static org.mockito.Mockito.reset;
16 import static org.mockito.Mockito.timeout;
17 import static org.mockito.Mockito.times;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.verifyNoMoreInteractions;
20 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
21 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
22 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
24 import akka.actor.ActorRef;
25 import akka.actor.PoisonPill;
26 import akka.actor.Props;
27 import akka.actor.Terminated;
28 import akka.dispatch.Dispatchers;
29 import akka.testkit.JavaTestKit;
30 import akka.testkit.TestActorRef;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.util.concurrent.Uninterruptibles;
33 import java.util.ArrayList;
34 import java.util.Collections;
35 import java.util.List;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.TimeUnit;
39 import java.util.function.Predicate;
40 import org.junit.After;
41 import org.junit.Test;
42 import org.opendaylight.controller.cluster.access.concepts.MemberName;
43 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
44 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
45 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
46 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
47 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
48 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
49 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
50 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
51 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
52 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.LastCandidateSelectionStrategy;
53 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
54 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
55 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
57 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
58 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
59 import org.opendaylight.controller.cluster.raft.RaftState;
60 import org.opendaylight.controller.cluster.raft.TestActorFactory;
61 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
62 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
63 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
64 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
65 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
66 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
67 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
68 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
69 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
70 import org.opendaylight.yangtools.yang.common.QName;
71 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
72 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
75 * Unit tests for EntityOwnershipShard.
77 * @author Thomas Pantelis
79 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
80 private static final String ENTITY_TYPE = "test type";
81 private static final YangInstanceIdentifier ENTITY_ID1 =
82 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
83 private static final YangInstanceIdentifier ENTITY_ID2 =
84 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
85 private static final YangInstanceIdentifier ENTITY_ID3 =
86 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3"));
87 private static final YangInstanceIdentifier ENTITY_ID4 =
88 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4"));
89 private static final YangInstanceIdentifier ENTITY_ID5 =
90 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5"));
91 private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
92 private static final String LOCAL_MEMBER_NAME = "local-member-1";
93 private static final String PEER_MEMBER_1_NAME = "peer-member-1";
94 private static final String PEER_MEMBER_2_NAME = "peer-member-2";
96 private Builder dataStoreContextBuilder = DatastoreContext.newBuilder().persistent(false);
97 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
100 public void tearDown() {
101 actorFactory.close();
105 public void testOnRegisterCandidateLocal() throws Exception {
106 testLog.info("testOnRegisterCandidateLocal starting");
108 ShardTestKit kit = new ShardTestKit(getSystem());
110 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
112 ShardTestKit.waitUntilLeader(shard);
114 YangInstanceIdentifier entityId = ENTITY_ID1;
115 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
117 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
118 kit.expectMsgClass(SuccessReply.class);
120 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
121 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
123 testLog.info("testOnRegisterCandidateLocal ending");
127 public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
128 testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader starting");
130 final ShardTestKit kit = new ShardTestKit(getSystem());
132 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
134 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
135 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
137 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
138 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
139 TestEntityOwnershipShard peerShard = peer.underlyingActor();
140 peerShard.startDroppingMessagesOfType(RequestVote.class);
141 peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
143 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(
144 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
146 YangInstanceIdentifier entityId = ENTITY_ID1;
147 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
149 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
150 kit.expectMsgClass(SuccessReply.class);
152 // Now allow RequestVotes to the peer so the shard becomes the leader. This should retry the commit.
153 peerShard.stopDroppingMessagesOfType(RequestVote.class);
155 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
156 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
158 testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader ending");
162 public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
163 testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus starting");
165 final ShardTestKit kit = new ShardTestKit(getSystem());
167 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
168 .shardTransactionCommitTimeoutInSeconds(1);
170 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
171 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
173 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
174 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
175 TestEntityOwnershipShard peerShard = peer.underlyingActor();
176 peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
178 // Drop AppendEntries so consensus isn't reached.
179 peerShard.startDroppingMessagesOfType(AppendEntries.class);
181 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
182 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
184 ShardTestKit.waitUntilLeader(leader);
186 YangInstanceIdentifier entityId = ENTITY_ID1;
187 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
189 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
190 kit.expectMsgClass(SuccessReply.class);
192 // Wait enough time for the commit to timeout.
193 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
195 // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
196 // write being applied to the state.
197 peerShard.stopDroppingMessagesOfType(AppendEntries.class);
199 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
200 verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
202 testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus ending");
206 public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
207 testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader starting");
209 final ShardTestKit kit = new ShardTestKit(getSystem());
211 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
212 .shardIsolatedLeaderCheckIntervalInMillis(50);
214 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
215 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
217 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
218 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
219 TestEntityOwnershipShard peerShard = peer.underlyingActor();
220 peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
222 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
223 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME));
225 ShardTestKit.waitUntilLeader(leader);
227 // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
228 peerShard.startDroppingMessagesOfType(AppendEntries.class);
229 verifyRaftState(leader, state ->
230 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
232 YangInstanceIdentifier entityId = ENTITY_ID1;
233 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
235 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
236 kit.expectMsgClass(SuccessReply.class);
238 // Resume AppendEntries - the candidate write should now be committed.
239 peerShard.stopDroppingMessagesOfType(AppendEntries.class);
240 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
241 verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
243 testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader ending");
247 public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
248 testLog.info("testOnRegisterCandidateLocalWithRemoteLeader starting");
250 ShardTestKit kit = new ShardTestKit(getSystem());
252 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
253 .shardBatchedModificationCount(5);
255 ShardIdentifier leaderId = newShardId(PEER_MEMBER_1_NAME);
256 ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
257 TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
258 newShardBuilder(leaderId, peerMap(localId.toString()), PEER_MEMBER_1_NAME),
259 actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
260 final TestEntityOwnershipShard leaderShard = leader.underlyingActor();
262 TestActorRef<TestEntityOwnershipShard> local = actorFactory.createTestActor(TestEntityOwnershipShard.props(
263 newShardBuilder(localId, peerMap(leaderId.toString()),LOCAL_MEMBER_NAME)), localId.toString());
264 local.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
266 local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
267 kit.expectMsgClass(SuccessReply.class);
269 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
270 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
272 // Test with initial commit timeout and subsequent retry.
274 local.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
275 leaderShard.startDroppingMessagesOfType(BatchedModifications.class);
277 local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
278 kit.expectMsgClass(SuccessReply.class);
280 expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class);
282 // Send a bunch of registration messages quickly and verify.
284 leaderShard.stopDroppingMessagesOfType(BatchedModifications.class);
285 clearMessages(leaderShard.collectorActor());
288 List<YangInstanceIdentifier> entityIds = new ArrayList<>();
289 for (int i = 1; i <= max; i++) {
290 YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
292 local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, id)), kit.getRef());
295 for (int i = 0; i < max; i++) {
296 verifyCommittedEntityCandidate(local, ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
299 testLog.info("testOnRegisterCandidateLocalWithRemoteLeader ending");
303 public void testOnUnregisterCandidateLocal() throws Exception {
304 testLog.info("testOnUnregisterCandidateLocal starting");
306 ShardTestKit kit = new ShardTestKit(getSystem());
307 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
308 ShardTestKit.waitUntilLeader(shard);
310 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
314 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
315 kit.expectMsgClass(SuccessReply.class);
317 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
318 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
322 shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
323 kit.expectMsgClass(SuccessReply.class);
325 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");
329 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
330 kit.expectMsgClass(SuccessReply.class);
332 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
333 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
335 testLog.info("testOnUnregisterCandidateLocal ending");
339 public void testOwnershipChanges() throws Exception {
340 testLog.info("testOwnershipChanges starting");
342 final ShardTestKit kit = new ShardTestKit(getSystem());
344 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
346 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
347 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
348 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
350 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
351 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
353 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
355 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
356 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
358 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
360 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
361 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
362 leaderId.toString());
364 ShardTestKit.waitUntilLeader(leader);
366 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
368 // Add a remote candidate
370 peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
371 kit.expectMsgClass(SuccessReply.class);
373 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
377 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
378 kit.expectMsgClass(SuccessReply.class);
380 // Verify the remote candidate becomes owner
382 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
383 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
385 // Add another remote candidate and verify ownership doesn't change
387 peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
388 kit.expectMsgClass(SuccessReply.class);
390 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
391 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
392 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
394 // Remove the second remote candidate and verify ownership doesn't change
396 peer2.tell(new UnregisterCandidateLocal(entity), kit.getRef());
397 kit.expectMsgClass(SuccessReply.class);
399 verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
400 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
401 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
403 // Remove the first remote candidate and verify the local candidate becomes owner
405 peer1.tell(new UnregisterCandidateLocal(entity), kit.getRef());
406 kit.expectMsgClass(SuccessReply.class);
408 verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
409 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
411 // Add the second remote candidate back and verify ownership doesn't change
413 peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
414 kit.expectMsgClass(SuccessReply.class);
416 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
417 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
418 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
420 // Unregister the local candidate and verify the second remote candidate becomes owner
422 leader.tell(new UnregisterCandidateLocal(entity), kit.getRef());
423 kit.expectMsgClass(SuccessReply.class);
425 verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
426 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
428 testLog.info("testOwnershipChanges ending");
432 public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception {
433 testLog.info("testOwnerChangesOnPeerAvailabilityChanges starting");
435 final ShardTestKit kit = new ShardTestKit(getSystem());
437 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
438 .shardIsolatedLeaderCheckIntervalInMillis(100000);
440 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
441 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
442 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
444 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
445 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
447 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
449 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
450 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
452 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
454 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
455 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
456 leaderId.toString());
458 verifyRaftState(leader, state ->
459 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
461 // Send PeerDown and PeerUp with no entities
463 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
464 leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
466 // Add candidates for entity1 with the local leader as the owner
468 leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
469 kit.expectMsgClass(SuccessReply.class);
470 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
472 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
473 kit.expectMsgClass(SuccessReply.class);
474 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
476 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
477 kit.expectMsgClass(SuccessReply.class);
478 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
479 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
481 // Add candidates for entity2 with peerMember2 as the owner
483 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
484 kit.expectMsgClass(SuccessReply.class);
485 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
487 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
488 kit.expectMsgClass(SuccessReply.class);
489 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
490 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
492 // Add candidates for entity3 with peerMember2 as the owner.
494 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
495 kit.expectMsgClass(SuccessReply.class);
496 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
498 leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
499 kit.expectMsgClass(SuccessReply.class);
500 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
502 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
503 kit.expectMsgClass(SuccessReply.class);
504 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
505 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
507 // Add only candidate peerMember2 for entity4.
509 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID4)), kit.getRef());
510 kit.expectMsgClass(SuccessReply.class);
511 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
512 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
514 // Add only candidate peerMember1 for entity5.
516 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID5)), kit.getRef());
517 kit.expectMsgClass(SuccessReply.class);
518 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
519 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
521 // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new
525 peer2.tell(PoisonPill.getInstance(), ActorRef.noSender());
526 kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
529 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
530 // Send PeerDown again - should be noop
531 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
532 peer1.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
534 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
535 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
536 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
537 // no other candidates for entity4 so peerMember2 should remain owner.
538 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
540 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
541 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
542 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
543 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
545 // Reinstate peerMember2
547 peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
548 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
550 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
551 leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
552 // Send PeerUp again - should be noop
553 leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
554 peer1.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
556 // peerMember2's candidates should be removed on startup.
557 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
558 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
559 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
560 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
562 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
563 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
564 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
565 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
567 // Add back candidate peerMember2 for entities 1, 2, & 3.
569 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
570 kit.expectMsgClass(SuccessReply.class);
571 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
572 kit.expectMsgClass(SuccessReply.class);
573 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
574 kit.expectMsgClass(SuccessReply.class);
575 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
576 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
577 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
578 verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
579 verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
580 verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
581 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
582 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
583 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
584 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
585 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
586 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
587 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
589 // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected
592 peer1.tell(PoisonPill.getInstance(), ActorRef.noSender());
593 kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
595 leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
597 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
599 // Verify the reinstated peerMember2 is fully synced.
601 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
602 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
603 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
604 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
606 // Reinstate peerMember1 and verify no owner changes
608 peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(newShardBuilder(
609 peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), peerId1.toString());
610 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
611 leader.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
613 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
614 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
615 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
616 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
618 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
619 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
620 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
622 verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
623 verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
624 verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
626 // Verify the reinstated peerMember1 is fully synced.
628 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
629 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
630 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
631 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
633 // Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for
634 // the entities (1 and 3) previously owned by the local leader member.
636 peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
637 peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
638 peer2.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
641 leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
642 kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
644 peer2.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
645 peer2.tell(TimeoutNow.INSTANCE, peer2);
647 verifyRaftState(peer2, state ->
648 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
650 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
651 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
652 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
653 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
655 testLog.info("testOwnerChangesOnPeerAvailabilityChanges ending");
659 public void testLeaderIsolation() throws Exception {
660 testLog.info("testLeaderIsolation starting");
662 final ShardTestKit kit = new ShardTestKit(getSystem());
664 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
665 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
666 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
668 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
669 .shardIsolatedLeaderCheckIntervalInMillis(100000);
671 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
672 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
674 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
676 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
677 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
679 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
681 dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
682 .shardIsolatedLeaderCheckIntervalInMillis(500);
684 TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
685 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME)),
686 leaderId.toString());
688 ShardTestKit.waitUntilLeader(leader);
690 // Add entity1 candidates for all members with the leader as the owner
692 DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
693 leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
694 kit.expectMsgClass(SuccessReply.class);
695 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
697 peer1.tell(new RegisterCandidateLocal(entity1), kit.getRef());
698 kit.expectMsgClass(SuccessReply.class);
699 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
701 peer2.tell(new RegisterCandidateLocal(entity1), kit.getRef());
702 kit.expectMsgClass(SuccessReply.class);
703 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_2_NAME);
705 verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
706 verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
707 verifyOwner(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
709 // Add entity2 candidates for all members with peer1 as the owner
711 DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
712 peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
713 kit.expectMsgClass(SuccessReply.class);
714 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
716 peer2.tell(new RegisterCandidateLocal(entity2), kit.getRef());
717 kit.expectMsgClass(SuccessReply.class);
718 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_2_NAME);
720 leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
721 kit.expectMsgClass(SuccessReply.class);
722 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
724 verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
725 verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
726 verifyOwner(peer2, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
728 // Add entity3 candidates for all members with peer2 as the owner
730 DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
731 peer2.tell(new RegisterCandidateLocal(entity3), kit.getRef());
732 kit.expectMsgClass(SuccessReply.class);
733 verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
735 leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
736 kit.expectMsgClass(SuccessReply.class);
737 verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), LOCAL_MEMBER_NAME);
739 peer1.tell(new RegisterCandidateLocal(entity3), kit.getRef());
740 kit.expectMsgClass(SuccessReply.class);
741 verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_1_NAME);
743 verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
744 verifyOwner(peer1, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
745 verifyOwner(peer2, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
747 // Add listeners on all members
749 DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class);
750 leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
751 kit.expectMsgClass(SuccessReply.class);
752 verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
753 ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true)),
754 ownershipChange(entity3, false, false, true)));
755 reset(leaderListener);
757 DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class);
758 peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
759 kit.expectMsgClass(SuccessReply.class);
760 verify(peer1Listener, timeout(5000).times(3)).ownershipChanged(or(or(
761 ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, true, true)),
762 ownershipChange(entity3, false, false, true)));
763 reset(peer1Listener);
765 DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class);
766 peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
767 kit.expectMsgClass(SuccessReply.class);
768 verify(peer2Listener, timeout(5000).times(3)).ownershipChanged(or(or(
769 ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, false, true)),
770 ownershipChange(entity3, false, true, true)));
771 reset(peer2Listener);
773 // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
775 leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
776 leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
778 peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
779 ae -> ae.getLeaderId().equals(leaderId.toString()));
780 peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
782 // Make peer1 start an election and become leader by enabling the ElectionTimeout message.
784 peer1.underlyingActor().stopDroppingMessagesOfType(ElectionTimeout.class);
786 // Send PeerDown to the isolated leader so it tries to re-assign ownership for the entities owned by the
789 leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
790 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
792 verifyRaftState(leader, state ->
793 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
795 // Expect inJeopardy notification on the isolated leader.
797 verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
798 ownershipChange(entity1, true, true, true, true), ownershipChange(entity2, false, false, true, true)),
799 ownershipChange(entity3, false, false, true, true)));
800 reset(leaderListener);
802 verifyRaftState(peer1, state ->
803 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
805 // Send PeerDown to the new leader peer1 so it re-assigns ownership for the entities owned by the
808 peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
810 verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
812 verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
813 reset(peer1Listener);
815 verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
816 reset(peer2Listener);
818 // Remove the isolation.
820 leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
821 leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
822 peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
823 peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
825 // Previous leader should switch to Follower and send inJeopardy cleared notifications for all entities.
827 verifyRaftState(leader, state ->
828 assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState()));
830 verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
831 ownershipChange(entity1, true, true, true), ownershipChange(entity2, false, false, true)),
832 ownershipChange(entity3, false, false, true)));
834 verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
835 verify(leaderListener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
837 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
838 verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
839 verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
841 verifyNoMoreInteractions(leaderListener);
842 verifyNoMoreInteractions(peer1Listener);
843 verifyNoMoreInteractions(peer2Listener);
845 testLog.info("testLeaderIsolation ending");
849 public void testLeaderIsolationWithPendingCandidateAdded() throws Exception {
850 testLog.info("testLeaderIsolationWithPendingCandidateAdded starting");
852 final ShardTestKit kit = new ShardTestKit(getSystem());
854 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
855 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
856 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
858 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
859 .shardIsolatedLeaderCheckIntervalInMillis(100000);
861 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
862 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME),
863 actorFactory.createTestActor(MessageCollectorActor.props())), peerId1.toString());
864 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
866 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
867 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME),
868 actorFactory.createTestActor(MessageCollectorActor.props())), peerId2.toString());
869 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
871 dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
872 .shardIsolatedLeaderCheckIntervalInMillis(500);
874 TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
875 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
876 actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
878 ShardTestKit.waitUntilLeader(leader);
880 // Add listeners on all members
882 DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class,
883 "DOMEntityOwnershipListener-" + LOCAL_MEMBER_NAME);
884 leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
885 kit.expectMsgClass(SuccessReply.class);
887 DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class,
888 "DOMEntityOwnershipListener-" + PEER_MEMBER_1_NAME);
889 peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
890 kit.expectMsgClass(SuccessReply.class);
892 DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class,
893 "DOMEntityOwnershipListener-" + PEER_MEMBER_2_NAME);
894 peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
895 kit.expectMsgClass(SuccessReply.class);
897 // Drop the CandidateAdded message to the leader for now.
899 leader.underlyingActor().startDroppingMessagesOfType(CandidateAdded.class);
901 // Add an entity candidates for the leader. Since we've blocked the CandidateAdded message, it won't be
902 // assigned the owner.
904 DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
905 leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
906 kit.expectMsgClass(SuccessReply.class);
907 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
908 verifyCommittedEntityCandidate(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
909 verifyCommittedEntityCandidate(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
911 DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
912 leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
913 kit.expectMsgClass(SuccessReply.class);
914 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
915 verifyCommittedEntityCandidate(peer1, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
916 verifyCommittedEntityCandidate(peer2, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
918 // Capture the CandidateAdded messages.
920 final List<CandidateAdded> candidateAdded = expectMatching(leader.underlyingActor().collectorActor(),
921 CandidateAdded.class, 2);
923 // Drop AppendEntries to the followers containing a log entry, which will be for the owner writes after we
924 // forward the CandidateAdded messages to the leader. This will leave the pending owner write tx's uncommitted.
926 peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
927 peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
929 // Now forward the CandidateAdded messages to the leader and wait for it to send out the AppendEntries.
931 leader.underlyingActor().stopDroppingMessagesOfType(CandidateAdded.class);
932 leader.tell(candidateAdded.get(0), leader);
933 leader.tell(candidateAdded.get(1), leader);
935 expectMatching(peer1.underlyingActor().collectorActor(), AppendEntries.class, 2,
936 ae -> ae.getEntries().size() > 0);
938 // Verify no owner assigned.
940 verifyNoOwnerSet(leader, entity1.getType(), entity1.getIdentifier());
941 verifyNoOwnerSet(leader, entity2.getType(), entity2.getIdentifier());
943 // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
945 leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
946 leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
948 peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
949 ae -> ae.getLeaderId().equals(leaderId.toString()));
950 peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
952 // Send PeerDown to the isolated leader - should be no-op since there's no owned entities.
954 leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
955 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
957 // Verify the leader transitions to IsolatedLeader.
959 verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.IsolatedLeader.toString(),
960 state.getRaftState()));
962 // Send PeerDown to the new leader peer1.
964 peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
966 // Make peer1 start an election and become leader by sending the TimeoutNow message.
968 peer1.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
970 // Verify the peer1 transitions to Leader.
972 verifyRaftState(peer1, state -> assertEquals("getRaftState", RaftState.Leader.toString(),
973 state.getRaftState()));
975 verifyNoOwnerSet(peer1, entity1.getType(), entity1.getIdentifier());
976 verifyNoOwnerSet(peer2, entity1.getType(), entity2.getIdentifier());
978 verifyNoMoreInteractions(peer1Listener);
979 verifyNoMoreInteractions(peer2Listener);
981 // Add candidate peer1 candidate for entity2.
983 peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
985 verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
986 verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
987 verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, false, true));
989 reset(leaderListener, peer1Listener, peer2Listener);
991 // Remove the isolation.
993 leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
994 leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
995 peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
996 peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
998 // Previous leader should switch to Follower.
1000 verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(),
1001 state.getRaftState()));
1003 // Send PeerUp to peer1 and peer2.
1005 peer1.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
1006 peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
1008 // The previous leader should become the owner of entity1.
1010 verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
1012 // The previous leader's DOMEntityOwnershipListener should get 4 total notifications:
1013 // - inJeopardy cleared for entity1 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
1014 // - inJeopardy cleared for entity2 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
1015 // - local owner granted for entity1 (wasOwner=false, isOwner=true, hasOwner=true, inJeopardy=false)
1016 // - remote owner for entity2 (wasOwner=false, isOwner=false, hasOwner=true, inJeopardy=false)
1017 verify(leaderListener, timeout(5000).times(4)).ownershipChanged(or(
1018 or(ownershipChange(entity1, false, false, false), ownershipChange(entity2, false, false, false)),
1019 or(ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true))));
1021 verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
1022 verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
1024 // Verify entity2's owner doesn't change.
1026 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1027 verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
1029 verifyNoMoreInteractions(leaderListener);
1030 verifyNoMoreInteractions(peer1Listener);
1031 verifyNoMoreInteractions(peer2Listener);
1033 testLog.info("testLeaderIsolationWithPendingCandidateAdded ending");
1037 public void testListenerRegistration() throws Exception {
1038 testLog.info("testListenerRegistration starting");
1040 ShardTestKit kit = new ShardTestKit(getSystem());
1042 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1043 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
1045 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1046 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
1047 peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1049 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1050 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
1052 ShardTestKit.waitUntilLeader(leader);
1054 String otherEntityType = "otherEntityType";
1055 final DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1056 final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
1057 final DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
1058 final DOMEntity entity4 = new DOMEntity(otherEntityType, ENTITY_ID3);
1059 DOMEntityOwnershipListener listener = mock(DOMEntityOwnershipListener.class);
1061 // Register listener
1063 leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1064 kit.expectMsgClass(SuccessReply.class);
1066 // Register a couple candidates for the desired entity type and verify listener is notified.
1068 leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
1069 kit.expectMsgClass(SuccessReply.class);
1071 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
1073 leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
1074 kit.expectMsgClass(SuccessReply.class);
1076 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
1079 // Register another candidate for another entity type and verify listener is not notified.
1081 leader.tell(new RegisterCandidateLocal(entity4), kit.getRef());
1082 kit.expectMsgClass(SuccessReply.class);
1084 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1085 verify(listener, never()).ownershipChanged(ownershipChange(entity4));
1087 // Register remote candidate for entity1
1089 peer.tell(new RegisterCandidateLocal(entity1), kit.getRef());
1090 kit.expectMsgClass(SuccessReply.class);
1091 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entity1.getIdentifier(), PEER_MEMBER_1_NAME);
1093 // Unregister the local candidate for entity1 and verify listener is notified
1095 leader.tell(new UnregisterCandidateLocal(entity1), kit.getRef());
1096 kit.expectMsgClass(SuccessReply.class);
1098 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
1101 // Unregister the listener, add a candidate for entity3 and verify listener isn't notified
1103 leader.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1104 kit.expectMsgClass(SuccessReply.class);
1106 leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
1107 kit.expectMsgClass(SuccessReply.class);
1109 verifyOwner(leader, ENTITY_TYPE, entity3.getIdentifier(), LOCAL_MEMBER_NAME);
1110 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1111 verify(listener, never()).ownershipChanged(any(DOMEntityOwnershipChange.class));
1113 // Re-register the listener and verify it gets notified of currently owned entities
1117 leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1118 kit.expectMsgClass(SuccessReply.class);
1120 verify(listener, timeout(5000).times(2)).ownershipChanged(or(ownershipChange(entity2, false, true, true),
1121 ownershipChange(entity3, false, true, true)));
1122 Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
1123 verify(listener, never()).ownershipChanged(ownershipChange(entity4));
1124 verify(listener, times(1)).ownershipChanged(ownershipChange(entity1));
1126 testLog.info("testListenerRegistration ending");
1130 public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() throws Exception {
1131 testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived starting");
1133 ShardTestKit kit = new ShardTestKit(getSystem());
1134 EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder()
1135 .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
1137 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1138 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
1140 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1141 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
1142 peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1144 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1145 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME, builder.build()),
1146 leaderId.toString());
1148 ShardTestKit.waitUntilLeader(leader);
1150 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1152 // Add a remote candidate
1154 peer.tell(new RegisterCandidateLocal(entity), kit.getRef());
1155 kit.expectMsgClass(SuccessReply.class);
1159 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1160 kit.expectMsgClass(SuccessReply.class);
1162 // Verify the local candidate becomes owner
1164 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1165 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1166 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1168 testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived ending");
1172 public void testDelayedEntityOwnerSelection() throws Exception {
1173 testLog.info("testDelayedEntityOwnerSelection starting");
1175 final ShardTestKit kit = new ShardTestKit(getSystem());
1176 EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder()
1177 .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
1179 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
1181 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1182 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
1183 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
1185 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1186 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
1187 peerId1.toString());
1188 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1190 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1191 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
1192 peerId2.toString());
1193 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1195 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1196 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME,
1197 builder.build()), leaderId.toString());
1199 ShardTestKit.waitUntilLeader(leader);
1201 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1203 // Add a remote candidate
1205 peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
1206 kit.expectMsgClass(SuccessReply.class);
1210 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1211 kit.expectMsgClass(SuccessReply.class);
1213 // Verify the local candidate becomes owner
1215 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1216 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1217 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1219 testLog.info("testDelayedEntityOwnerSelection ending");
1222 private Props newLocalShardProps() {
1223 return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.<String,String>emptyMap(), LOCAL_MEMBER_NAME);
1226 private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName) {
1227 return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build());
1230 private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName,
1231 EntityOwnerSelectionStrategyConfig config) {
1232 return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props()
1233 .withDispatcher(Dispatchers.DefaultDispatcherId());
1236 private EntityOwnershipShard.Builder newShardBuilder(ShardIdentifier shardId, Map<String,String> peers,
1237 String memberName) {
1238 return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).datastoreContext(
1239 dataStoreContextBuilder.build()).schemaContext(SCHEMA_CONTEXT).localMemberName(
1240 MemberName.forName(memberName)).ownerSelectionStrategyConfig(
1241 EntityOwnerSelectionStrategyConfig.newBuilder().build());
1244 private Map<String, String> peerMap(String... peerIds) {
1245 ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder();
1246 for (String peerId: peerIds) {
1247 builder.put(peerId, actorFactory.createTestActorPath(peerId)).build();
1250 return builder.build();
1253 private static class TestEntityOwnershipShard extends EntityOwnershipShard {
1254 private final TestActorRef<MessageCollectorActor> collectorActor;
1255 private final Map<Class<?>, Predicate<?>> dropMessagesOfType = new ConcurrentHashMap<>();
1257 TestEntityOwnershipShard(Builder builder, TestActorRef<MessageCollectorActor> collectorActor) {
1259 this.collectorActor = collectorActor;
1262 @SuppressWarnings({ "unchecked", "rawtypes" })
1264 public void handleCommand(Object message) {
1265 Predicate drop = dropMessagesOfType.get(message.getClass());
1266 if (drop == null || !drop.test(message)) {
1267 super.handleCommand(message);
1270 if (collectorActor != null) {
1271 collectorActor.tell(message, ActorRef.noSender());
1275 void startDroppingMessagesOfType(Class<?> msgClass) {
1276 dropMessagesOfType.put(msgClass, msg -> true);
1279 <T> void startDroppingMessagesOfType(Class<T> msgClass, Predicate<T> filter) {
1280 dropMessagesOfType.put(msgClass, filter);
1283 void stopDroppingMessagesOfType(Class<?> msgClass) {
1284 dropMessagesOfType.remove(msgClass);
1287 TestActorRef<MessageCollectorActor> collectorActor() {
1288 return collectorActor;
1291 static Props props(Builder builder) {
1292 return props(builder, null);
1295 static Props props(Builder builder, TestActorRef<MessageCollectorActor> collectorActor) {
1296 return Props.create(TestEntityOwnershipShard.class, builder, collectorActor)
1297 .withDispatcher(Dispatchers.DefaultDispatcherId());