2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.entityownership;
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.AdditionalMatchers.or;
12 import static org.mockito.ArgumentMatchers.any;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.never;
15 import static org.mockito.Mockito.reset;
16 import static org.mockito.Mockito.timeout;
17 import static org.mockito.Mockito.times;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.verifyNoMoreInteractions;
20 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
21 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
22 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
24 import akka.actor.ActorRef;
25 import akka.actor.PoisonPill;
26 import akka.actor.Props;
27 import akka.actor.Terminated;
28 import akka.dispatch.Dispatchers;
29 import akka.testkit.TestActorRef;
30 import com.google.common.collect.ImmutableMap;
31 import com.google.common.util.concurrent.Uninterruptibles;
32 import java.time.Duration;
33 import java.util.ArrayList;
34 import java.util.Collections;
35 import java.util.List;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicLong;
40 import java.util.function.Predicate;
41 import org.junit.After;
42 import org.junit.Test;
43 import org.opendaylight.controller.cluster.access.concepts.MemberName;
44 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
45 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
46 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
47 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
48 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
49 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
50 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
51 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
52 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
53 import org.opendaylight.controller.cluster.entityownership.messages.CandidateAdded;
54 import org.opendaylight.controller.cluster.entityownership.messages.RegisterCandidateLocal;
55 import org.opendaylight.controller.cluster.entityownership.messages.RegisterListenerLocal;
56 import org.opendaylight.controller.cluster.entityownership.messages.UnregisterCandidateLocal;
57 import org.opendaylight.controller.cluster.entityownership.messages.UnregisterListenerLocal;
58 import org.opendaylight.controller.cluster.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
59 import org.opendaylight.controller.cluster.entityownership.selectionstrategy.LastCandidateSelectionStrategy;
60 import org.opendaylight.controller.cluster.raft.RaftState;
61 import org.opendaylight.controller.cluster.raft.TestActorFactory;
62 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
63 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
64 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
65 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
66 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
67 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
68 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
69 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
70 import org.opendaylight.yangtools.yang.common.QName;
71 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
74 * Unit tests for EntityOwnershipShard.
76 * @author Thomas Pantelis
78 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
// Entity type string shared by every DOMEntity these tests create.
79 private static final String ENTITY_TYPE = "test type";
// Identifiers for five distinct test entities (entity1 .. entity5).
80 private static final YangInstanceIdentifier ENTITY_ID1 =
81 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
82 private static final YangInstanceIdentifier ENTITY_ID2 =
83 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
84 private static final YangInstanceIdentifier ENTITY_ID3 =
85 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3"));
86 private static final YangInstanceIdentifier ENTITY_ID4 =
87 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4"));
88 private static final YangInstanceIdentifier ENTITY_ID5 =
89 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5"));
// Member names passed to the shard builders and expected by the candidate/owner verifications.
90 private static final String LOCAL_MEMBER_NAME = "local-member-1";
91 private static final String PEER_MEMBER_1_NAME = "peer-member-1";
92 private static final String PEER_MEMBER_2_NAME = "peer-member-2";
// Datastore context builder, created with persistent(false). Not final: individual tests tune it
// (heartbeat interval, election timeout factor, ...) and testLeaderIsolation reassigns it.
94 private Builder dataStoreContextBuilder = DatastoreContext.newBuilder().persistent(false);
// Factory through which the tests create their shard and collector actors.
95 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
// Per-test tear-down hook.
// NOTE(review): the method body and closing brace are not present in this extract - presumably it
// releases the actors created through actorFactory; confirm against the full file.
98 public void tearDown() {
// Registers a local candidate on a shard built from newLocalShardProps() once that shard is leader,
// expects a SuccessReply, then checks that LOCAL_MEMBER_NAME is committed as a candidate for
// ENTITY_ID1 and is selected as its owner.
103 public void testOnRegisterCandidateLocal() {
104 testLog.info("testOnRegisterCandidateLocal starting");
106 ShardTestKit kit = new ShardTestKit(getSystem());
108 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
// The registration is only sent after the shard has become leader.
110 ShardTestKit.waitUntilLeader(shard);
112 YangInstanceIdentifier entityId = ENTITY_ID1;
113 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
// The kit is the sender, so the shard's reply arrives at the kit.
115 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
116 kit.expectMsgClass(SuccessReply.class);
118 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
119 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
121 testLog.info("testOnRegisterCandidateLocal ending");
// Registers a local candidate while the only peer is dropping RequestVote messages, expects the
// SuccessReply anyway, then lets RequestVote through and checks that the candidate is committed
// and LOCAL_MEMBER_NAME becomes owner of ENTITY_ID1.
125 public void testOnRegisterCandidateLocalWithNoInitialLeader() {
126 testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader starting");
128 final ShardTestKit kit = new ShardTestKit(getSystem());
130 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
132 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
133 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
135 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
136 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
137 TestEntityOwnershipShard peerShard = peer.underlyingActor();
// The peer drops both RequestVote and ElectionTimeout messages.
// NOTE(review): presumably this keeps the local shard from winning an election and keeps the peer
// from starting one - confirm against TestEntityOwnershipShard.
138 peerShard.startDroppingMessagesOfType(RequestVote.class);
139 peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
141 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(
142 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
144 YangInstanceIdentifier entityId = ENTITY_ID1;
145 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
// Sent without waiting for leadership; the SuccessReply is still expected.
147 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
148 kit.expectMsgClass(SuccessReply.class);
150 // Now allow RequestVotes to the peer so the shard becomes the leader. This should retry the commit.
151 peerShard.stopDroppingMessagesOfType(RequestVote.class);
153 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
154 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
156 testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader ending");
// Registers a local candidate on the leader while the only peer is dropping AppendEntries, waits
// one second (the configured commit timeout), then lets AppendEntries through and checks that the
// candidate is committed and LOCAL_MEMBER_NAME becomes owner of ENTITY_ID1.
160 public void testOnRegisterCandidateLocalWithNoInitialConsensus() {
161 testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus starting");
163 final ShardTestKit kit = new ShardTestKit(getSystem());
// The 1 second commit timeout configured here matches the sleep further down.
165 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
166 .shardTransactionCommitTimeoutInSeconds(1);
168 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
169 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
171 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
172 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
173 TestEntityOwnershipShard peerShard = peer.underlyingActor();
174 peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
176 // Drop AppendEntries so consensus isn't reached.
177 peerShard.startDroppingMessagesOfType(AppendEntries.class);
179 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
180 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
182 ShardTestKit.waitUntilLeader(leader);
184 YangInstanceIdentifier entityId = ENTITY_ID1;
185 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
// The SuccessReply is expected even though the peer is still dropping AppendEntries.
187 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
188 kit.expectMsgClass(SuccessReply.class);
190 // Wait enough time for the commit to timeout.
191 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
193 // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
194 // write being applied to the state.
195 peerShard.stopDroppingMessagesOfType(AppendEntries.class);
197 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
198 verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
200 testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus ending");
// Drives the leader into the IsolatedLeader raft state by making the peer drop AppendEntries,
// registers a local candidate in that state (a SuccessReply is still expected), then resumes
// AppendEntries and checks that the candidate is committed and LOCAL_MEMBER_NAME owns ENTITY_ID1.
204 public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
205 testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader starting");
207 final ShardTestKit kit = new ShardTestKit(getSystem());
// The isolated-leader check interval is set to 50 ms for this test.
209 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
210 .shardIsolatedLeaderCheckIntervalInMillis(50);
212 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
213 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
215 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
216 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
217 TestEntityOwnershipShard peerShard = peer.underlyingActor();
218 peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
// NOTE(review): unlike the sibling tests, no actor name (leaderId.toString()) is passed to
// createTestActor here - confirm this is intentional.
220 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
221 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME));
223 ShardTestKit.waitUntilLeader(leader);
225 // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
226 peerShard.startDroppingMessagesOfType(AppendEntries.class);
227 verifyRaftState(leader, state ->
228 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
230 YangInstanceIdentifier entityId = ENTITY_ID1;
231 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
233 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
234 kit.expectMsgClass(SuccessReply.class);
236 // Resume AppendEntries - the candidate write should now be committed.
237 peerShard.stopDroppingMessagesOfType(AppendEntries.class);
238 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
239 verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
241 testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader ending");
// Registers candidates through the local shard while the shard for PEER_MEMBER_1_NAME plays the
// leader role. Covers three phases: a plain registration verified on the leader, a registration
// sent while the leader drops BatchedModifications, and a burst of registrations verified on the
// local shard.
// NOTE(review): short lines are missing from this extract - the declaration of 'max', the
// population of 'entityIds' and the loop/method closing braces; confirm against the full file.
245 public void testOnRegisterCandidateLocalWithRemoteLeader() {
246 testLog.info("testOnRegisterCandidateLocalWithRemoteLeader starting");
248 ShardTestKit kit = new ShardTestKit(getSystem());
250 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
251 .shardBatchedModificationCount(5);
253 ShardIdentifier leaderId = newShardId(PEER_MEMBER_1_NAME);
254 ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
// The leader shard is given a MessageCollectorActor, which is read back below via collectorActor().
255 TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
256 newShardBuilder(leaderId, peerMap(localId.toString()), PEER_MEMBER_1_NAME),
257 actorFactory.createActor(MessageCollectorActor.props())), leaderId.toString());
258 final TestEntityOwnershipShard leaderShard = leader.underlyingActor();
260 TestActorRef<TestEntityOwnershipShard> local = actorFactory.createTestActor(TestEntityOwnershipShard.props(
261 newShardBuilder(localId, peerMap(leaderId.toString()),LOCAL_MEMBER_NAME)), localId.toString());
// NOTE(review): presumably dropping ElectionTimeout keeps the local shard from starting an
// election, so that the remote shard ends up as leader - confirm.
262 local.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
264 local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
265 kit.expectMsgClass(SuccessReply.class);
// The registration went to the local shard but is verified on the remote leader.
267 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
268 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
270 // Test with initial commit timeout and subsequent retry.
// The updated context (1 second commit timeout) is delivered to the local shard as a message.
272 local.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
273 leaderShard.startDroppingMessagesOfType(BatchedModifications.class);
275 local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
276 kit.expectMsgClass(SuccessReply.class);
// Expects a BatchedModifications message at the leader's collector actor while the leader is dropping that type.
278 expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class);
280 // Send a bunch of registration messages quickly and verify.
282 leaderShard.stopDroppingMessagesOfType(BatchedModifications.class);
283 clearMessages(leaderShard.collectorActor());
286 List<YangInstanceIdentifier> entityIds = new ArrayList<>();
287 for (int i = 1; i <= max; i++) {
288 YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
290 local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, id)), kit.getRef());
293 for (int i = 0; i < max; i++) {
294 verifyCommittedEntityCandidate(local, ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
297 testLog.info("testOnRegisterCandidateLocalWithRemoteLeader ending");
// Exercises a register / unregister / re-register cycle for ENTITY_ID1 on a leader shard built
// from newLocalShardProps(). Ownership is expected to be LOCAL_MEMBER_NAME after each
// registration and the empty string after the unregistration.
301 public void testOnUnregisterCandidateLocal() {
302 testLog.info("testOnUnregisterCandidateLocal starting");
304 ShardTestKit kit = new ShardTestKit(getSystem());
305 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
306 ShardTestKit.waitUntilLeader(shard);
308 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
// Phase 1: register the candidate and verify it is committed and owns the entity.
312 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
313 kit.expectMsgClass(SuccessReply.class);
315 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
316 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
// Phase 2: unregister the candidate; the expected owner is then the empty string.
320 shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
321 kit.expectMsgClass(SuccessReply.class);
323 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");
// Phase 3: register the same entity again and verify ownership is restored.
327 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
328 kit.expectMsgClass(SuccessReply.class);
330 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
331 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
333 testLog.info("testOnUnregisterCandidateLocal ending");
// Walks one entity (ENTITY_ID1) through a sequence of candidate registrations and unregistrations
// across a local leader and two peers. After each step it verifies the candidate list and the
// expected owner on the leader: the owner changes only when the current owner's candidate is removed.
// NOTE(review): the trailing actor-name argument of both peer createTestActor calls is missing
// from this extract (the preceding line ends with a comma); confirm against the full file.
337 public void testOwnershipChanges() {
338 testLog.info("testOwnershipChanges starting");
340 final ShardTestKit kit = new ShardTestKit(getSystem());
342 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
344 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
345 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
346 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
// Both peers drop ElectionTimeout messages; the local shard is the one waited on to become leader.
348 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
349 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
351 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
353 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
354 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
356 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
358 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
359 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
360 leaderId.toString());
362 ShardTestKit.waitUntilLeader(leader);
364 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
366 // Add a remote candidate
368 peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
369 kit.expectMsgClass(SuccessReply.class);
371 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
// Register the local (leader) member as a second candidate for the same entity.
375 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
376 kit.expectMsgClass(SuccessReply.class);
378 // Verify the remote candidate becomes owner
380 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
381 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
383 // Add another remote candidate and verify ownership doesn't change
385 peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
386 kit.expectMsgClass(SuccessReply.class);
388 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
// Pause before re-checking the owner, giving an unwanted ownership change time to surface.
389 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
390 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
392 // Remove the second remote candidate and verify ownership doesn't change
394 peer2.tell(new UnregisterCandidateLocal(entity), kit.getRef());
395 kit.expectMsgClass(SuccessReply.class);
397 verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
398 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
399 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
401 // Remove the first remote candidate and verify the local candidate becomes owner
403 peer1.tell(new UnregisterCandidateLocal(entity), kit.getRef());
404 kit.expectMsgClass(SuccessReply.class);
406 verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
407 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
409 // Add the second remote candidate back and verify ownership doesn't change
411 peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
412 kit.expectMsgClass(SuccessReply.class);
414 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
415 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
416 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
418 // Unregister the local candidate and verify the second remote candidate becomes owner
420 leader.tell(new UnregisterCandidateLocal(entity), kit.getRef());
421 kit.expectMsgClass(SuccessReply.class);
423 verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
424 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
426 testLog.info("testOwnershipChanges ending");
// Verifies owner re-selection as peers go down and come back. Candidates for entities 1-5 are
// spread over a local leader and two peers; each peer is killed and reinstated in turn, and
// finally the local leader is killed and peer2 is made leader. After every step the expected
// owners and candidates are checked.
// NOTE(review): several short lines are missing from this extract - the trailing actor-name
// argument of the peer createTestActor calls, the calls that make 'kit' watch an actor before the
// Terminated expectations, and closing braces; confirm against the full file.
430 public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception {
431 testLog.info("testOwnerChangesOnPeerAvailabilityChanges starting");
433 final ShardTestKit kit = new ShardTestKit(getSystem());
// The isolated-leader check interval is set to 100000 ms for this test.
// NOTE(review): presumably large enough that the leader never switches to IsolatedLeader while peers are down - confirm.
435 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
436 .shardIsolatedLeaderCheckIntervalInMillis(100000);
438 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
439 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
440 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
442 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
443 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
445 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
447 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
448 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
450 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
452 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
453 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
454 leaderId.toString());
// Checks that the local shard reports the Leader raft state before any messages are sent to it.
456 verifyRaftState(leader, state ->
457 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
459 // Send PeerDown and PeerUp with no entities
461 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
462 leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
464 // Add candidates for entity1 with the local leader as the owner
466 leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
467 kit.expectMsgClass(SuccessReply.class);
468 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
470 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
471 kit.expectMsgClass(SuccessReply.class);
472 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
474 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
475 kit.expectMsgClass(SuccessReply.class);
476 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
477 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
479 // Add candidates for entity2 with peerMember2 as the owner
481 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
482 kit.expectMsgClass(SuccessReply.class);
483 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
485 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
486 kit.expectMsgClass(SuccessReply.class);
487 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
488 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
490 // Add candidates for entity3 with peerMember2 as the owner.
492 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
493 kit.expectMsgClass(SuccessReply.class);
494 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
496 leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
497 kit.expectMsgClass(SuccessReply.class);
498 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
500 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
501 kit.expectMsgClass(SuccessReply.class);
502 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
503 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
505 // Add only candidate peerMember2 for entity4.
507 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID4)), kit.getRef());
508 kit.expectMsgClass(SuccessReply.class);
509 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
510 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
512 // Add only candidate peerMember1 for entity5.
514 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID5)), kit.getRef());
515 kit.expectMsgClass(SuccessReply.class);
516 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
517 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
519 // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new
// owner selected where another candidate exists; entity4 has no other candidate (see below).
523 peer2.tell(PoisonPill.getInstance(), ActorRef.noSender());
524 kit.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
527 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
528 // Send PeerDown again - should be noop
529 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
530 peer1.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
532 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
533 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
534 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
535 // no other candidates for entity4 so peerMember2 should remain owner.
536 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
// peerMember2 is still expected as a committed candidate for entities 1-4 after PeerDown.
538 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
539 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
540 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
541 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
543 // Reinstate peerMember2
545 peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
546 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
548 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
549 leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
550 // Send PeerUp again - should be noop
551 leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
552 peer1.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
554 // peerMember2's candidates should be removed on startup.
555 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
556 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
557 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
558 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
// With its only candidate removed, entity4's expected owner is now the empty string.
560 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
561 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
562 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
563 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
565 // Add back candidate peerMember2 for entities 1, 2, & 3.
567 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
568 kit.expectMsgClass(SuccessReply.class);
569 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
570 kit.expectMsgClass(SuccessReply.class);
571 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
572 kit.expectMsgClass(SuccessReply.class);
// The same candidates and owners are verified on both the leader and the reinstated peer2.
573 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
574 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
575 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
576 verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
577 verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
578 verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
579 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
580 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
581 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
582 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
583 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
584 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
585 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
587 // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected
590 peer1.tell(PoisonPill.getInstance(), ActorRef.noSender());
591 kit.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
593 leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
595 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
597 // Verify the reinstated peerMember2 is fully synced.
599 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
600 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
601 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
602 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
604 // Reinstate peerMember1 and verify no owner changes
606 peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(newShardBuilder(
607 peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), peerId1.toString());
608 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
609 leader.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
611 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
612 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
613 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
614 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
// peerMember1's earlier candidates are expected to be absent on both the leader and peer2.
616 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
617 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
618 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
620 verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
621 verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
622 verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
624 // Verify the reinstated peerMember1 is fully synced.
626 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
627 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
628 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
629 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
// Record the leader's lastApplied index (asserted equal to its lastIndex) so that peer2's
// lastIndex can be compared against it before the leader is killed.
631 AtomicLong leaderLastApplied = new AtomicLong();
632 verifyRaftState(leader, rs -> {
633 assertEquals("LastApplied up-to-date", rs.getLastApplied(), rs.getLastIndex());
634 leaderLastApplied.set(rs.getLastApplied());
// NOTE(review): the assertion message says "LastApplied" but the value compared is peer2's getLastIndex() - confirm intent.
637 verifyRaftState(peer2, rs -> assertEquals("LastApplied", leaderLastApplied.get(), rs.getLastIndex()));
639 // Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for
640 // the entities (1 and 3) previously owned by the local leader member.
// peer2 is first given peer1's resolved address and PeerUp notifications for both other members.
642 peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
643 peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
644 peer2.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
647 leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
648 kit.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
// TimeoutNow is sent to peer2 (with itself as sender); peer2 is then expected to report the Leader state.
650 peer2.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
651 peer2.tell(TimeoutNow.INSTANCE, peer2);
653 verifyRaftState(peer2, state ->
654 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
656 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
657 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
658 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
659 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
661 testLog.info("testOwnerChangesOnPeerAvailabilityChanges ending");
665 public void testLeaderIsolation() throws Exception {
666 testLog.info("testLeaderIsolation starting");
668 final ShardTestKit kit = new ShardTestKit(getSystem());
670 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
671 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
672 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
674 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
675 .shardIsolatedLeaderCheckIntervalInMillis(100000);
677 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
678 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
680 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
682 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
683 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
685 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
687 dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
688 .shardIsolatedLeaderCheckIntervalInMillis(500);
690 TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
691 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME)),
692 leaderId.toString());
694 ShardTestKit.waitUntilLeader(leader);
696 // Add entity1 candidates for all members with the leader as the owner
698 DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
699 leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
700 kit.expectMsgClass(SuccessReply.class);
701 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
703 peer1.tell(new RegisterCandidateLocal(entity1), kit.getRef());
704 kit.expectMsgClass(SuccessReply.class);
705 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
707 peer2.tell(new RegisterCandidateLocal(entity1), kit.getRef());
708 kit.expectMsgClass(SuccessReply.class);
709 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_2_NAME);
711 verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
712 verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
713 verifyOwner(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
715 // Add entity2 candidates for all members with peer1 as the owner
717 DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
718 peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
719 kit.expectMsgClass(SuccessReply.class);
720 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
722 peer2.tell(new RegisterCandidateLocal(entity2), kit.getRef());
723 kit.expectMsgClass(SuccessReply.class);
724 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_2_NAME);
726 leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
727 kit.expectMsgClass(SuccessReply.class);
728 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
730 verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
731 verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
732 verifyOwner(peer2, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
734 // Add entity3 candidates for all members with peer2 as the owner
736 DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
737 peer2.tell(new RegisterCandidateLocal(entity3), kit.getRef());
738 kit.expectMsgClass(SuccessReply.class);
739 verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
741 leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
742 kit.expectMsgClass(SuccessReply.class);
743 verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), LOCAL_MEMBER_NAME);
745 peer1.tell(new RegisterCandidateLocal(entity3), kit.getRef());
746 kit.expectMsgClass(SuccessReply.class);
747 verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_1_NAME);
749 verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
750 verifyOwner(peer1, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
751 verifyOwner(peer2, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
753 // Add listeners on all members
755 DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class);
756 leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
757 kit.expectMsgClass(SuccessReply.class);
758 verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
759 ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true)),
760 ownershipChange(entity3, false, false, true)));
761 reset(leaderListener);
763 DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class);
764 peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
765 kit.expectMsgClass(SuccessReply.class);
766 verify(peer1Listener, timeout(5000).times(3)).ownershipChanged(or(or(
767 ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, true, true)),
768 ownershipChange(entity3, false, false, true)));
769 reset(peer1Listener);
771 DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class);
772 peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
773 kit.expectMsgClass(SuccessReply.class);
774 verify(peer2Listener, timeout(5000).times(3)).ownershipChanged(or(or(
775 ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, false, true)),
776 ownershipChange(entity3, false, true, true)));
777 reset(peer2Listener);
779 // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
781 leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
782 leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
784 peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
785 ae -> ae.getLeaderId().equals(leaderId.toString()));
786 peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
788 // Make peer1 start an election and become leader by enabling the ElectionTimeout message.
790 peer1.underlyingActor().stopDroppingMessagesOfType(ElectionTimeout.class);
792 // Send PeerDown to the isolated leader so it tries to re-assign ownership for the entities owned by the
795 leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
796 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
798 verifyRaftState(leader, state ->
799 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
801 // Expect inJeopardy notification on the isolated leader.
803 verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
804 ownershipChange(entity1, true, true, true, true), ownershipChange(entity2, false, false, true, true)),
805 ownershipChange(entity3, false, false, true, true)));
806 reset(leaderListener);
808 verifyRaftState(peer1, state ->
809 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
811 // Send PeerDown to the new leader peer1 so it re-assigns ownership for the entities owned by the
814 peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
816 verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
818 verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
819 reset(peer1Listener);
821 verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
822 reset(peer2Listener);
824 // Remove the isolation.
826 leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
827 leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
828 peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
829 peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
831 // Previous leader should switch to Follower and send inJeopardy cleared notifications for all entities.
833 verifyRaftState(leader, state ->
834 assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState()));
836 verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
837 ownershipChange(entity1, true, true, true), ownershipChange(entity2, false, false, true)),
838 ownershipChange(entity3, false, false, true)));
840 verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
841 verify(leaderListener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
843 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
844 verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
845 verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
847 verifyNoMoreInteractions(leaderListener);
848 verifyNoMoreInteractions(peer1Listener);
849 verifyNoMoreInteractions(peer2Listener);
851 testLog.info("testLeaderIsolation ending");
855 public void testLeaderIsolationWithPendingCandidateAdded() throws Exception {
856 testLog.info("testLeaderIsolationWithPendingCandidateAdded starting");
858 final ShardTestKit kit = new ShardTestKit(getSystem());
860 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
861 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
862 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
864 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
865 .shardIsolatedLeaderCheckIntervalInMillis(100000);
867 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
868 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME),
869 actorFactory.createActor(MessageCollectorActor.props())), peerId1.toString());
870 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
872 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
873 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME),
874 actorFactory.createTestActor(MessageCollectorActor.props())), peerId2.toString());
875 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
877 dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
878 .shardIsolatedLeaderCheckIntervalInMillis(500);
880 TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
881 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
882 actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
884 ShardTestKit.waitUntilLeader(leader);
886 // Add listeners on all members
888 DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class,
889 "DOMEntityOwnershipListener-" + LOCAL_MEMBER_NAME);
890 leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
891 kit.expectMsgClass(SuccessReply.class);
893 DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class,
894 "DOMEntityOwnershipListener-" + PEER_MEMBER_1_NAME);
895 peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
896 kit.expectMsgClass(SuccessReply.class);
898 DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class,
899 "DOMEntityOwnershipListener-" + PEER_MEMBER_2_NAME);
900 peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
901 kit.expectMsgClass(SuccessReply.class);
903 // Drop the CandidateAdded message to the leader for now.
905 leader.underlyingActor().startDroppingMessagesOfType(CandidateAdded.class);
907 // Add an entity candidates for the leader. Since we've blocked the CandidateAdded message, it won't be
908 // assigned the owner.
910 DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
911 leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
912 kit.expectMsgClass(SuccessReply.class);
913 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
914 verifyCommittedEntityCandidate(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
915 verifyCommittedEntityCandidate(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
917 DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
918 leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
919 kit.expectMsgClass(SuccessReply.class);
920 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
921 verifyCommittedEntityCandidate(peer1, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
922 verifyCommittedEntityCandidate(peer2, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
924 // Capture the CandidateAdded messages.
926 final List<CandidateAdded> candidateAdded = expectMatching(leader.underlyingActor().collectorActor(),
927 CandidateAdded.class, 2);
929 // Drop AppendEntries to the followers containing a log entry, which will be for the owner writes after we
930 // forward the CandidateAdded messages to the leader. This will leave the pending owner write tx's uncommitted.
932 peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
933 peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
935 // Now forward the CandidateAdded messages to the leader and wait for it to send out the AppendEntries.
937 leader.underlyingActor().stopDroppingMessagesOfType(CandidateAdded.class);
938 leader.tell(candidateAdded.get(0), leader);
939 leader.tell(candidateAdded.get(1), leader);
941 expectMatching(peer1.underlyingActor().collectorActor(), AppendEntries.class, 2,
942 ae -> ae.getEntries().size() > 0);
944 // Verify no owner assigned.
946 verifyNoOwnerSet(leader, entity1.getType(), entity1.getIdentifier());
947 verifyNoOwnerSet(leader, entity2.getType(), entity2.getIdentifier());
949 // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
951 leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
952 leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
954 peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
955 ae -> ae.getLeaderId().equals(leaderId.toString()));
956 peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
958 // Send PeerDown to the isolated leader - should be no-op since there's no owned entities.
960 leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
961 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
963 // Verify the leader transitions to IsolatedLeader.
965 verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.IsolatedLeader.toString(),
966 state.getRaftState()));
968 // Send PeerDown to the new leader peer1.
970 peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
972 // Make peer1 start an election and become leader by sending the TimeoutNow message.
974 peer1.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
976 // Verify the peer1 transitions to Leader.
978 verifyRaftState(peer1, state -> assertEquals("getRaftState", RaftState.Leader.toString(),
979 state.getRaftState()));
981 verifyNoOwnerSet(peer1, entity1.getType(), entity1.getIdentifier());
982 verifyNoOwnerSet(peer2, entity1.getType(), entity2.getIdentifier());
984 verifyNoMoreInteractions(peer1Listener);
985 verifyNoMoreInteractions(peer2Listener);
987 // Add candidate peer1 candidate for entity2.
989 peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
991 verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
992 verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
993 verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, false, true));
995 reset(leaderListener, peer1Listener, peer2Listener);
997 // Remove the isolation.
999 leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
1000 leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1001 peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1002 peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1004 // Previous leader should switch to Follower.
1006 verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(),
1007 state.getRaftState()));
1009 // Send PeerUp to peer1 and peer2.
1011 peer1.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
1012 peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
1014 // The previous leader should become the owner of entity1.
1016 verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
1018 // The previous leader's DOMEntityOwnershipListener should get 4 total notifications:
1019 // - inJeopardy cleared for entity1 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
1020 // - inJeopardy cleared for entity2 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
1021 // - local owner granted for entity1 (wasOwner=false, isOwner=true, hasOwner=true, inJeopardy=false)
1022 // - remote owner for entity2 (wasOwner=false, isOwner=false, hasOwner=true, inJeopardy=false)
1023 verify(leaderListener, timeout(5000).times(4)).ownershipChanged(or(
1024 or(ownershipChange(entity1, false, false, false), ownershipChange(entity2, false, false, false)),
1025 or(ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true))));
1027 verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
1028 verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
1030 // Verify entity2's owner doesn't change.
1032 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1033 verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
1035 verifyNoMoreInteractions(leaderListener);
1036 verifyNoMoreInteractions(peer1Listener);
1037 verifyNoMoreInteractions(peer2Listener);
1039 testLog.info("testLeaderIsolationWithPendingCandidateAdded ending");
1043 public void testListenerRegistration() {
1044 testLog.info("testListenerRegistration starting");
1046 ShardTestKit kit = new ShardTestKit(getSystem());
1048 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1049 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
1051 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1052 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
1053 peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1055 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1056 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
1058 ShardTestKit.waitUntilLeader(leader);
1060 String otherEntityType = "otherEntityType";
1061 final DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1062 final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
1063 final DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
1064 final DOMEntity entity4 = new DOMEntity(otherEntityType, ENTITY_ID3);
1065 DOMEntityOwnershipListener listener = mock(DOMEntityOwnershipListener.class);
1067 // Register listener
1069 leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1070 kit.expectMsgClass(SuccessReply.class);
1072 // Register a couple candidates for the desired entity type and verify listener is notified.
1074 leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
1075 kit.expectMsgClass(SuccessReply.class);
1077 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
1079 leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
1080 kit.expectMsgClass(SuccessReply.class);
1082 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
1085 // Register another candidate for another entity type and verify listener is not notified.
1087 leader.tell(new RegisterCandidateLocal(entity4), kit.getRef());
1088 kit.expectMsgClass(SuccessReply.class);
1090 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1091 verify(listener, never()).ownershipChanged(ownershipChange(entity4));
1093 // Register remote candidate for entity1
1095 peer.tell(new RegisterCandidateLocal(entity1), kit.getRef());
1096 kit.expectMsgClass(SuccessReply.class);
1097 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entity1.getIdentifier(), PEER_MEMBER_1_NAME);
1099 // Unregister the local candidate for entity1 and verify listener is notified
1101 leader.tell(new UnregisterCandidateLocal(entity1), kit.getRef());
1102 kit.expectMsgClass(SuccessReply.class);
1104 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
1107 // Unregister the listener, add a candidate for entity3 and verify listener isn't notified
1109 leader.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1110 kit.expectMsgClass(SuccessReply.class);
1112 leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
1113 kit.expectMsgClass(SuccessReply.class);
1115 verifyOwner(leader, ENTITY_TYPE, entity3.getIdentifier(), LOCAL_MEMBER_NAME);
1116 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1117 verify(listener, never()).ownershipChanged(any(DOMEntityOwnershipChange.class));
1119 // Re-register the listener and verify it gets notified of currently owned entities
1123 leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1124 kit.expectMsgClass(SuccessReply.class);
1126 verify(listener, timeout(5000).times(2)).ownershipChanged(or(ownershipChange(entity2, false, true, true),
1127 ownershipChange(entity3, false, true, true)));
1128 Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
1129 verify(listener, never()).ownershipChanged(ownershipChange(entity4));
1130 verify(listener, times(1)).ownershipChanged(ownershipChange(entity1));
1132 testLog.info("testListenerRegistration ending");
1136 public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() {
1137 testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived starting");
1139 ShardTestKit kit = new ShardTestKit(getSystem());
1140 EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder()
1141 .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
1143 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1144 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
1146 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1147 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
1148 peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1150 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1151 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME, builder.build()),
1152 leaderId.toString());
1154 ShardTestKit.waitUntilLeader(leader);
1156 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1158 // Add a remote candidate
1160 peer.tell(new RegisterCandidateLocal(entity), kit.getRef());
1161 kit.expectMsgClass(SuccessReply.class);
1165 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1166 kit.expectMsgClass(SuccessReply.class);
1168 // Verify the local candidate becomes owner
1170 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1171 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1172 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1174 testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived ending");
1178 public void testDelayedEntityOwnerSelection() {
1179 testLog.info("testDelayedEntityOwnerSelection starting");
1181 final ShardTestKit kit = new ShardTestKit(getSystem());
1182 EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder()
1183 .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
1185 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
1187 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1188 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
1189 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
1191 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1192 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
1193 peerId1.toString());
1194 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1196 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1197 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
1198 peerId2.toString());
1199 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1201 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1202 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME,
1203 builder.build()), leaderId.toString());
1205 ShardTestKit.waitUntilLeader(leader);
1207 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1209 // Add a remote candidate
1211 peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
1212 kit.expectMsgClass(SuccessReply.class);
1216 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1217 kit.expectMsgClass(SuccessReply.class);
1219 // Verify the local candidate becomes owner
1221 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1222 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1223 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1225 testLog.info("testDelayedEntityOwnerSelection ending");
1228 private Props newLocalShardProps() {
1229 return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.<String,String>emptyMap(), LOCAL_MEMBER_NAME);
1232 private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers,
1233 final String memberName) {
1234 return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build());
1237 private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers, final String memberName,
1238 final EntityOwnerSelectionStrategyConfig config) {
1239 return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props()
1240 .withDispatcher(Dispatchers.DefaultDispatcherId());
1243 private EntityOwnershipShard.Builder newShardBuilder(final ShardIdentifier shardId, final Map<String, String> peers,
1244 final String memberName) {
1245 return EntityOwnershipShard.newBuilder()
1247 .peerAddresses(peers)
1248 .datastoreContext(dataStoreContextBuilder.build())
1249 .schemaContextProvider(() -> EOSTestUtils.SCHEMA_CONTEXT)
1250 .localMemberName(MemberName.forName(memberName))
1251 .ownerSelectionStrategyConfig(EntityOwnerSelectionStrategyConfig.newBuilder().build());
1254 private Map<String, String> peerMap(final String... peerIds) {
1255 ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder();
1256 for (String peerId: peerIds) {
1257 builder.put(peerId, actorFactory.createTestActorPath(peerId)).build();
1260 return builder.build();
1263 private static class TestEntityOwnershipShard extends EntityOwnershipShard {
1264 private final ActorRef collectorActor;
1265 private final Map<Class<?>, Predicate<?>> dropMessagesOfType = new ConcurrentHashMap<>();
1267 TestEntityOwnershipShard(final Builder builder, final ActorRef collectorActor) {
1269 this.collectorActor = collectorActor;
1272 @SuppressWarnings({ "unchecked", "rawtypes" })
1274 public void handleCommand(final Object message) {
1275 Predicate drop = dropMessagesOfType.get(message.getClass());
1276 if (drop == null || !drop.test(message)) {
1277 super.handleCommand(message);
1280 if (collectorActor != null) {
1281 collectorActor.tell(message, ActorRef.noSender());
1285 void startDroppingMessagesOfType(final Class<?> msgClass) {
1286 dropMessagesOfType.put(msgClass, msg -> true);
1289 <T> void startDroppingMessagesOfType(final Class<T> msgClass, final Predicate<T> filter) {
1290 dropMessagesOfType.put(msgClass, filter);
1293 void stopDroppingMessagesOfType(final Class<?> msgClass) {
1294 dropMessagesOfType.remove(msgClass);
1297 ActorRef collectorActor() {
1298 return collectorActor;
1301 static Props props(final Builder builder) {
1302 return props(builder, null);
1305 static Props props(final Builder builder, final ActorRef collectorActor) {
1306 return Props.create(TestEntityOwnershipShard.class, builder, collectorActor)
1307 .withDispatcher(Dispatchers.DefaultDispatcherId());