2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.AdditionalMatchers.or;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.never;
15 import static org.mockito.Mockito.reset;
16 import static org.mockito.Mockito.timeout;
17 import static org.mockito.Mockito.times;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.verifyNoMoreInteractions;
20 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.clearMessages;
21 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectFirstMatching;
22 import static org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor.expectMatching;
24 import akka.actor.ActorRef;
25 import akka.actor.PoisonPill;
26 import akka.actor.Props;
27 import akka.actor.Terminated;
28 import akka.dispatch.Dispatchers;
29 import akka.testkit.JavaTestKit;
30 import akka.testkit.TestActorRef;
31 import com.google.common.collect.ImmutableMap;
32 import com.google.common.util.concurrent.Uninterruptibles;
33 import java.util.ArrayList;
34 import java.util.Collections;
35 import java.util.List;
37 import java.util.concurrent.ConcurrentHashMap;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.atomic.AtomicLong;
40 import java.util.function.Predicate;
41 import org.junit.After;
42 import org.junit.Test;
43 import org.opendaylight.controller.cluster.access.concepts.MemberName;
44 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
45 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
46 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
47 import org.opendaylight.controller.cluster.datastore.entityownership.messages.CandidateAdded;
48 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
49 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
50 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
51 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
52 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
53 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.LastCandidateSelectionStrategy;
54 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
55 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
56 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
57 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
58 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
59 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
60 import org.opendaylight.controller.cluster.raft.RaftState;
61 import org.opendaylight.controller.cluster.raft.TestActorFactory;
62 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
63 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
64 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
65 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
66 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
67 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
68 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
69 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
70 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
71 import org.opendaylight.yangtools.yang.common.QName;
72 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
73 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
76 * Unit tests for EntityOwnershipShard.
78 * @author Thomas Pantelis
80 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
81 private static final String ENTITY_TYPE = "test type";
82 private static final YangInstanceIdentifier ENTITY_ID1 =
83 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
84 private static final YangInstanceIdentifier ENTITY_ID2 =
85 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
86 private static final YangInstanceIdentifier ENTITY_ID3 =
87 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3"));
88 private static final YangInstanceIdentifier ENTITY_ID4 =
89 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4"));
90 private static final YangInstanceIdentifier ENTITY_ID5 =
91 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5"));
92 private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
93 private static final String LOCAL_MEMBER_NAME = "local-member-1";
94 private static final String PEER_MEMBER_1_NAME = "peer-member-1";
95 private static final String PEER_MEMBER_2_NAME = "peer-member-2";
97 private Builder dataStoreContextBuilder = DatastoreContext.newBuilder().persistent(false);
98 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
101 public void tearDown() {
102 actorFactory.close();
106 public void testOnRegisterCandidateLocal() throws Exception {
107 testLog.info("testOnRegisterCandidateLocal starting");
109 ShardTestKit kit = new ShardTestKit(getSystem());
111 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
113 ShardTestKit.waitUntilLeader(shard);
115 YangInstanceIdentifier entityId = ENTITY_ID1;
116 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
118 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
119 kit.expectMsgClass(SuccessReply.class);
121 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
122 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
124 testLog.info("testOnRegisterCandidateLocal ending");
128 public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
129 testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader starting");
131 final ShardTestKit kit = new ShardTestKit(getSystem());
133 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
135 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
136 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
138 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
139 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
140 TestEntityOwnershipShard peerShard = peer.underlyingActor();
141 peerShard.startDroppingMessagesOfType(RequestVote.class);
142 peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
144 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(
145 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
147 YangInstanceIdentifier entityId = ENTITY_ID1;
148 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
150 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
151 kit.expectMsgClass(SuccessReply.class);
153 // Now allow RequestVotes to the peer so the shard becomes the leader. This should retry the commit.
154 peerShard.stopDroppingMessagesOfType(RequestVote.class);
156 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
157 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
159 testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader ending");
163 public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
164 testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus starting");
166 final ShardTestKit kit = new ShardTestKit(getSystem());
168 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
169 .shardTransactionCommitTimeoutInSeconds(1);
171 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
172 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
174 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
175 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
176 TestEntityOwnershipShard peerShard = peer.underlyingActor();
177 peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
179 // Drop AppendEntries so consensus isn't reached.
180 peerShard.startDroppingMessagesOfType(AppendEntries.class);
182 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
183 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
185 ShardTestKit.waitUntilLeader(leader);
187 YangInstanceIdentifier entityId = ENTITY_ID1;
188 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
190 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
191 kit.expectMsgClass(SuccessReply.class);
193 // Wait enough time for the commit to timeout.
194 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
196 // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
197 // write being applied to the state.
198 peerShard.stopDroppingMessagesOfType(AppendEntries.class);
200 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
201 verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
203 testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus ending");
207 public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
208 testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader starting");
210 final ShardTestKit kit = new ShardTestKit(getSystem());
212 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
213 .shardIsolatedLeaderCheckIntervalInMillis(50);
215 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
216 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
218 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
219 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
220 TestEntityOwnershipShard peerShard = peer.underlyingActor();
221 peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
223 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
224 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME));
226 ShardTestKit.waitUntilLeader(leader);
228 // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
229 peerShard.startDroppingMessagesOfType(AppendEntries.class);
230 verifyRaftState(leader, state ->
231 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
233 YangInstanceIdentifier entityId = ENTITY_ID1;
234 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
236 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
237 kit.expectMsgClass(SuccessReply.class);
239 // Resume AppendEntries - the candidate write should now be committed.
240 peerShard.stopDroppingMessagesOfType(AppendEntries.class);
241 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
242 verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
244 testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader ending");
248 public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
249 testLog.info("testOnRegisterCandidateLocalWithRemoteLeader starting");
251 ShardTestKit kit = new ShardTestKit(getSystem());
253 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2)
254 .shardBatchedModificationCount(5);
256 ShardIdentifier leaderId = newShardId(PEER_MEMBER_1_NAME);
257 ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
258 TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
259 newShardBuilder(leaderId, peerMap(localId.toString()), PEER_MEMBER_1_NAME),
260 actorFactory.createActor(MessageCollectorActor.props())), leaderId.toString());
261 final TestEntityOwnershipShard leaderShard = leader.underlyingActor();
263 TestActorRef<TestEntityOwnershipShard> local = actorFactory.createTestActor(TestEntityOwnershipShard.props(
264 newShardBuilder(localId, peerMap(leaderId.toString()),LOCAL_MEMBER_NAME)), localId.toString());
265 local.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
267 local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
268 kit.expectMsgClass(SuccessReply.class);
270 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
271 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
273 // Test with initial commit timeout and subsequent retry.
275 local.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
276 leaderShard.startDroppingMessagesOfType(BatchedModifications.class);
278 local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
279 kit.expectMsgClass(SuccessReply.class);
281 expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class);
283 // Send a bunch of registration messages quickly and verify.
285 leaderShard.stopDroppingMessagesOfType(BatchedModifications.class);
286 clearMessages(leaderShard.collectorActor());
289 List<YangInstanceIdentifier> entityIds = new ArrayList<>();
290 for (int i = 1; i <= max; i++) {
291 YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
293 local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, id)), kit.getRef());
296 for (int i = 0; i < max; i++) {
297 verifyCommittedEntityCandidate(local, ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
300 testLog.info("testOnRegisterCandidateLocalWithRemoteLeader ending");
304 public void testOnUnregisterCandidateLocal() throws Exception {
305 testLog.info("testOnUnregisterCandidateLocal starting");
307 ShardTestKit kit = new ShardTestKit(getSystem());
308 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
309 ShardTestKit.waitUntilLeader(shard);
311 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
315 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
316 kit.expectMsgClass(SuccessReply.class);
318 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
319 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
323 shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
324 kit.expectMsgClass(SuccessReply.class);
326 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");
330 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
331 kit.expectMsgClass(SuccessReply.class);
333 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
334 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
336 testLog.info("testOnUnregisterCandidateLocal ending");
340 public void testOwnershipChanges() throws Exception {
341 testLog.info("testOwnershipChanges starting");
343 final ShardTestKit kit = new ShardTestKit(getSystem());
345 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
347 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
348 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
349 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
351 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
352 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
354 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
356 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
357 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
359 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
361 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
362 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
363 leaderId.toString());
365 ShardTestKit.waitUntilLeader(leader);
367 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
369 // Add a remote candidate
371 peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
372 kit.expectMsgClass(SuccessReply.class);
374 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
378 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
379 kit.expectMsgClass(SuccessReply.class);
381 // Verify the remote candidate becomes owner
383 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
384 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
386 // Add another remote candidate and verify ownership doesn't change
388 peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
389 kit.expectMsgClass(SuccessReply.class);
391 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
392 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
393 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
395 // Remove the second remote candidate and verify ownership doesn't change
397 peer2.tell(new UnregisterCandidateLocal(entity), kit.getRef());
398 kit.expectMsgClass(SuccessReply.class);
400 verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
401 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
402 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
404 // Remove the first remote candidate and verify the local candidate becomes owner
406 peer1.tell(new UnregisterCandidateLocal(entity), kit.getRef());
407 kit.expectMsgClass(SuccessReply.class);
409 verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
410 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
412 // Add the second remote candidate back and verify ownership doesn't change
414 peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
415 kit.expectMsgClass(SuccessReply.class);
417 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
418 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
419 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
421 // Unregister the local candidate and verify the second remote candidate becomes owner
423 leader.tell(new UnregisterCandidateLocal(entity), kit.getRef());
424 kit.expectMsgClass(SuccessReply.class);
426 verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
427 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
429 testLog.info("testOwnershipChanges ending");
433 public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception {
434 testLog.info("testOwnerChangesOnPeerAvailabilityChanges starting");
436 final ShardTestKit kit = new ShardTestKit(getSystem());
438 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
439 .shardIsolatedLeaderCheckIntervalInMillis(100000);
441 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
442 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
443 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
445 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
446 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
448 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
450 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
451 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
453 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
455 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
456 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
457 leaderId.toString());
459 verifyRaftState(leader, state ->
460 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
462 // Send PeerDown and PeerUp with no entities
464 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
465 leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
467 // Add candidates for entity1 with the local leader as the owner
469 leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
470 kit.expectMsgClass(SuccessReply.class);
471 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
473 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
474 kit.expectMsgClass(SuccessReply.class);
475 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
477 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
478 kit.expectMsgClass(SuccessReply.class);
479 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
480 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
482 // Add candidates for entity2 with peerMember2 as the owner
484 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
485 kit.expectMsgClass(SuccessReply.class);
486 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
488 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
489 kit.expectMsgClass(SuccessReply.class);
490 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
491 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
493 // Add candidates for entity3 with peerMember2 as the owner.
495 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
496 kit.expectMsgClass(SuccessReply.class);
497 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
499 leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
500 kit.expectMsgClass(SuccessReply.class);
501 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
503 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
504 kit.expectMsgClass(SuccessReply.class);
505 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
506 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
508 // Add only candidate peerMember2 for entity4.
510 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID4)), kit.getRef());
511 kit.expectMsgClass(SuccessReply.class);
512 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
513 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
515 // Add only candidate peerMember1 for entity5.
517 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID5)), kit.getRef());
518 kit.expectMsgClass(SuccessReply.class);
519 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
520 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
522 // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new
526 peer2.tell(PoisonPill.getInstance(), ActorRef.noSender());
527 kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
530 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
531 // Send PeerDown again - should be noop
532 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
533 peer1.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
535 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
536 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
537 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
538 // no other candidates for entity4 so peerMember2 should remain owner.
539 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
541 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
542 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
543 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
544 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
546 // Reinstate peerMember2
548 peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
549 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
551 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
552 leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
553 // Send PeerUp again - should be noop
554 leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
555 peer1.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
557 // peerMember2's candidates should be removed on startup.
558 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
559 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
560 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
561 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
563 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
564 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
565 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
566 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
568 // Add back candidate peerMember2 for entities 1, 2, & 3.
570 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
571 kit.expectMsgClass(SuccessReply.class);
572 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
573 kit.expectMsgClass(SuccessReply.class);
574 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
575 kit.expectMsgClass(SuccessReply.class);
576 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
577 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
578 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
579 verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
580 verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
581 verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
582 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
583 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
584 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
585 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
586 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
587 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
588 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
590 // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected
593 peer1.tell(PoisonPill.getInstance(), ActorRef.noSender());
594 kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
596 leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
598 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
600 // Verify the reinstated peerMember2 is fully synced.
602 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
603 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
604 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
605 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
607 // Reinstate peerMember1 and verify no owner changes
609 peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(newShardBuilder(
610 peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), peerId1.toString());
611 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
612 leader.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
614 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
615 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
616 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
617 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
619 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
620 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
621 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
623 verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
624 verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
625 verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
627 // Verify the reinstated peerMember1 is fully synced.
629 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
630 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
631 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
632 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
634 AtomicLong leaderLastApplied = new AtomicLong();
635 verifyRaftState(leader, rs -> {
636 assertEquals("LastApplied up-to-date", rs.getLastApplied(), rs.getLastIndex());
637 leaderLastApplied.set(rs.getLastApplied());
640 verifyRaftState(peer2, rs -> {
641 assertEquals("LastApplied", leaderLastApplied.get(), rs.getLastIndex());
644 // Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for
645 // the entities (1 and 3) previously owned by the local leader member.
647 peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
648 peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
649 peer2.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
652 leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
653 kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
655 peer2.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
656 peer2.tell(TimeoutNow.INSTANCE, peer2);
658 verifyRaftState(peer2, state ->
659 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
661 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
662 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
663 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
664 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
666 testLog.info("testOwnerChangesOnPeerAvailabilityChanges ending");
670 public void testLeaderIsolation() throws Exception {
671 testLog.info("testLeaderIsolation starting");
673 final ShardTestKit kit = new ShardTestKit(getSystem());
675 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
676 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
677 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
679 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
680 .shardIsolatedLeaderCheckIntervalInMillis(100000);
682 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
683 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
685 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
687 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
688 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
690 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
692 dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
693 .shardIsolatedLeaderCheckIntervalInMillis(500);
695 TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
696 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME)),
697 leaderId.toString());
699 ShardTestKit.waitUntilLeader(leader);
701 // Add entity1 candidates for all members with the leader as the owner
703 DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
704 leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
705 kit.expectMsgClass(SuccessReply.class);
706 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
708 peer1.tell(new RegisterCandidateLocal(entity1), kit.getRef());
709 kit.expectMsgClass(SuccessReply.class);
710 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
712 peer2.tell(new RegisterCandidateLocal(entity1), kit.getRef());
713 kit.expectMsgClass(SuccessReply.class);
714 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_2_NAME);
716 verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
717 verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
718 verifyOwner(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
720 // Add entity2 candidates for all members with peer1 as the owner
722 DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
723 peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
724 kit.expectMsgClass(SuccessReply.class);
725 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
727 peer2.tell(new RegisterCandidateLocal(entity2), kit.getRef());
728 kit.expectMsgClass(SuccessReply.class);
729 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_2_NAME);
731 leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
732 kit.expectMsgClass(SuccessReply.class);
733 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
735 verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
736 verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
737 verifyOwner(peer2, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
739 // Add entity3 candidates for all members with peer2 as the owner
741 DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
742 peer2.tell(new RegisterCandidateLocal(entity3), kit.getRef());
743 kit.expectMsgClass(SuccessReply.class);
744 verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
746 leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
747 kit.expectMsgClass(SuccessReply.class);
748 verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), LOCAL_MEMBER_NAME);
750 peer1.tell(new RegisterCandidateLocal(entity3), kit.getRef());
751 kit.expectMsgClass(SuccessReply.class);
752 verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_1_NAME);
754 verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
755 verifyOwner(peer1, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
756 verifyOwner(peer2, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
758 // Add listeners on all members
760 DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class);
761 leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
762 kit.expectMsgClass(SuccessReply.class);
763 verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
764 ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true)),
765 ownershipChange(entity3, false, false, true)));
766 reset(leaderListener);
768 DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class);
769 peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
770 kit.expectMsgClass(SuccessReply.class);
771 verify(peer1Listener, timeout(5000).times(3)).ownershipChanged(or(or(
772 ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, true, true)),
773 ownershipChange(entity3, false, false, true)));
774 reset(peer1Listener);
776 DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class);
777 peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
778 kit.expectMsgClass(SuccessReply.class);
779 verify(peer2Listener, timeout(5000).times(3)).ownershipChanged(or(or(
780 ownershipChange(entity1, false, false, true), ownershipChange(entity2, false, false, true)),
781 ownershipChange(entity3, false, true, true)));
782 reset(peer2Listener);
784 // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
786 leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
787 leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
789 peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
790 ae -> ae.getLeaderId().equals(leaderId.toString()));
791 peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
793 // Make peer1 start an election and become leader by enabling the ElectionTimeout message.
795 peer1.underlyingActor().stopDroppingMessagesOfType(ElectionTimeout.class);
797 // Send PeerDown to the isolated leader so it tries to re-assign ownership for the entities owned by the
800 leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
801 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
803 verifyRaftState(leader, state ->
804 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
806 // Expect inJeopardy notification on the isolated leader.
808 verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
809 ownershipChange(entity1, true, true, true, true), ownershipChange(entity2, false, false, true, true)),
810 ownershipChange(entity3, false, false, true, true)));
811 reset(leaderListener);
813 verifyRaftState(peer1, state ->
814 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
816 // Send PeerDown to the new leader peer1 so it re-assigns ownership for the entities owned by the
819 peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
821 verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
823 verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
824 reset(peer1Listener);
826 verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
827 reset(peer2Listener);
829 // Remove the isolation.
831 leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
832 leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
833 peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
834 peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
836 // Previous leader should switch to Follower and send inJeopardy cleared notifications for all entities.
838 verifyRaftState(leader, state ->
839 assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState()));
841 verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(
842 ownershipChange(entity1, true, true, true), ownershipChange(entity2, false, false, true)),
843 ownershipChange(entity3, false, false, true)));
845 verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
846 verify(leaderListener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
848 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
849 verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
850 verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
852 verifyNoMoreInteractions(leaderListener);
853 verifyNoMoreInteractions(peer1Listener);
854 verifyNoMoreInteractions(peer2Listener);
856 testLog.info("testLeaderIsolation ending");
860 public void testLeaderIsolationWithPendingCandidateAdded() throws Exception {
861 testLog.info("testLeaderIsolationWithPendingCandidateAdded starting");
863 final ShardTestKit kit = new ShardTestKit(getSystem());
865 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
866 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
867 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
869 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4)
870 .shardIsolatedLeaderCheckIntervalInMillis(100000);
872 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
873 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME),
874 actorFactory.createActor(MessageCollectorActor.props())), peerId1.toString());
875 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
877 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
878 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME),
879 actorFactory.createTestActor(MessageCollectorActor.props())), peerId2.toString());
880 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
882 dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build())
883 .shardIsolatedLeaderCheckIntervalInMillis(500);
885 TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
886 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
887 actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
889 ShardTestKit.waitUntilLeader(leader);
891 // Add listeners on all members
893 DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class,
894 "DOMEntityOwnershipListener-" + LOCAL_MEMBER_NAME);
895 leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
896 kit.expectMsgClass(SuccessReply.class);
898 DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class,
899 "DOMEntityOwnershipListener-" + PEER_MEMBER_1_NAME);
900 peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
901 kit.expectMsgClass(SuccessReply.class);
903 DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class,
904 "DOMEntityOwnershipListener-" + PEER_MEMBER_2_NAME);
905 peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
906 kit.expectMsgClass(SuccessReply.class);
908 // Drop the CandidateAdded message to the leader for now.
910 leader.underlyingActor().startDroppingMessagesOfType(CandidateAdded.class);
912 // Add an entity candidates for the leader. Since we've blocked the CandidateAdded message, it won't be
913 // assigned the owner.
915 DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
916 leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
917 kit.expectMsgClass(SuccessReply.class);
918 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
919 verifyCommittedEntityCandidate(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
920 verifyCommittedEntityCandidate(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
922 DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
923 leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
924 kit.expectMsgClass(SuccessReply.class);
925 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
926 verifyCommittedEntityCandidate(peer1, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
927 verifyCommittedEntityCandidate(peer2, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
929 // Capture the CandidateAdded messages.
931 final List<CandidateAdded> candidateAdded = expectMatching(leader.underlyingActor().collectorActor(),
932 CandidateAdded.class, 2);
934 // Drop AppendEntries to the followers containing a log entry, which will be for the owner writes after we
935 // forward the CandidateAdded messages to the leader. This will leave the pending owner write tx's uncommitted.
937 peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
938 peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class, ae -> ae.getEntries().size() > 0);
940 // Now forward the CandidateAdded messages to the leader and wait for it to send out the AppendEntries.
942 leader.underlyingActor().stopDroppingMessagesOfType(CandidateAdded.class);
943 leader.tell(candidateAdded.get(0), leader);
944 leader.tell(candidateAdded.get(1), leader);
946 expectMatching(peer1.underlyingActor().collectorActor(), AppendEntries.class, 2,
947 ae -> ae.getEntries().size() > 0);
949 // Verify no owner assigned.
951 verifyNoOwnerSet(leader, entity1.getType(), entity1.getIdentifier());
952 verifyNoOwnerSet(leader, entity2.getType(), entity2.getIdentifier());
954 // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
956 leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
957 leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
959 peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
960 ae -> ae.getLeaderId().equals(leaderId.toString()));
961 peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
963 // Send PeerDown to the isolated leader - should be no-op since there's no owned entities.
965 leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
966 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
968 // Verify the leader transitions to IsolatedLeader.
970 verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.IsolatedLeader.toString(),
971 state.getRaftState()));
973 // Send PeerDown to the new leader peer1.
975 peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
977 // Make peer1 start an election and become leader by sending the TimeoutNow message.
979 peer1.tell(TimeoutNow.INSTANCE, ActorRef.noSender());
981 // Verify the peer1 transitions to Leader.
983 verifyRaftState(peer1, state -> assertEquals("getRaftState", RaftState.Leader.toString(),
984 state.getRaftState()));
986 verifyNoOwnerSet(peer1, entity1.getType(), entity1.getIdentifier());
987 verifyNoOwnerSet(peer2, entity1.getType(), entity2.getIdentifier());
989 verifyNoMoreInteractions(peer1Listener);
990 verifyNoMoreInteractions(peer2Listener);
992 // Add candidate peer1 candidate for entity2.
994 peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
996 verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
997 verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
998 verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, false, true));
1000 reset(leaderListener, peer1Listener, peer2Listener);
1002 // Remove the isolation.
1004 leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
1005 leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1006 peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1007 peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
1009 // Previous leader should switch to Follower.
1011 verifyRaftState(leader, state -> assertEquals("getRaftState", RaftState.Follower.toString(),
1012 state.getRaftState()));
1014 // Send PeerUp to peer1 and peer2.
1016 peer1.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
1017 peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
1019 // The previous leader should become the owner of entity1.
1021 verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
1023 // The previous leader's DOMEntityOwnershipListener should get 4 total notifications:
1024 // - inJeopardy cleared for entity1 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
1025 // - inJeopardy cleared for entity2 (wasOwner=false, isOwner=false, hasOwner=false, inJeopardy=false)
1026 // - local owner granted for entity1 (wasOwner=false, isOwner=true, hasOwner=true, inJeopardy=false)
1027 // - remote owner for entity2 (wasOwner=false, isOwner=false, hasOwner=true, inJeopardy=false)
1028 verify(leaderListener, timeout(5000).times(4)).ownershipChanged(or(
1029 or(ownershipChange(entity1, false, false, false), ownershipChange(entity2, false, false, false)),
1030 or(ownershipChange(entity1, false, true, true), ownershipChange(entity2, false, false, true))));
1032 verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
1033 verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
1035 // Verify entity2's owner doesn't change.
1037 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1038 verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
1040 verifyNoMoreInteractions(leaderListener);
1041 verifyNoMoreInteractions(peer1Listener);
1042 verifyNoMoreInteractions(peer2Listener);
1044 testLog.info("testLeaderIsolationWithPendingCandidateAdded ending");
1048 public void testListenerRegistration() throws Exception {
1049 testLog.info("testListenerRegistration starting");
1051 ShardTestKit kit = new ShardTestKit(getSystem());
1053 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1054 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
1056 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1057 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
1058 peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1060 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1061 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
1063 ShardTestKit.waitUntilLeader(leader);
1065 String otherEntityType = "otherEntityType";
1066 final DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1067 final DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
1068 final DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
1069 final DOMEntity entity4 = new DOMEntity(otherEntityType, ENTITY_ID3);
1070 DOMEntityOwnershipListener listener = mock(DOMEntityOwnershipListener.class);
1072 // Register listener
1074 leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1075 kit.expectMsgClass(SuccessReply.class);
1077 // Register a couple candidates for the desired entity type and verify listener is notified.
1079 leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
1080 kit.expectMsgClass(SuccessReply.class);
1082 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
1084 leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
1085 kit.expectMsgClass(SuccessReply.class);
1087 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
1090 // Register another candidate for another entity type and verify listener is not notified.
1092 leader.tell(new RegisterCandidateLocal(entity4), kit.getRef());
1093 kit.expectMsgClass(SuccessReply.class);
1095 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1096 verify(listener, never()).ownershipChanged(ownershipChange(entity4));
1098 // Register remote candidate for entity1
1100 peer.tell(new RegisterCandidateLocal(entity1), kit.getRef());
1101 kit.expectMsgClass(SuccessReply.class);
1102 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entity1.getIdentifier(), PEER_MEMBER_1_NAME);
1104 // Unregister the local candidate for entity1 and verify listener is notified
1106 leader.tell(new UnregisterCandidateLocal(entity1), kit.getRef());
1107 kit.expectMsgClass(SuccessReply.class);
1109 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
1112 // Unregister the listener, add a candidate for entity3 and verify listener isn't notified
1114 leader.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1115 kit.expectMsgClass(SuccessReply.class);
1117 leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
1118 kit.expectMsgClass(SuccessReply.class);
1120 verifyOwner(leader, ENTITY_TYPE, entity3.getIdentifier(), LOCAL_MEMBER_NAME);
1121 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
1122 verify(listener, never()).ownershipChanged(any(DOMEntityOwnershipChange.class));
1124 // Re-register the listener and verify it gets notified of currently owned entities
1128 leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
1129 kit.expectMsgClass(SuccessReply.class);
1131 verify(listener, timeout(5000).times(2)).ownershipChanged(or(ownershipChange(entity2, false, true, true),
1132 ownershipChange(entity3, false, true, true)));
1133 Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
1134 verify(listener, never()).ownershipChanged(ownershipChange(entity4));
1135 verify(listener, times(1)).ownershipChanged(ownershipChange(entity1));
1137 testLog.info("testListenerRegistration ending");
1141 public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() throws Exception {
1142 testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived starting");
1144 ShardTestKit kit = new ShardTestKit(getSystem());
1145 EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder()
1146 .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
1148 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1149 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
1151 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1152 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
1153 peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1155 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1156 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME, builder.build()),
1157 leaderId.toString());
1159 ShardTestKit.waitUntilLeader(leader);
1161 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1163 // Add a remote candidate
1165 peer.tell(new RegisterCandidateLocal(entity), kit.getRef());
1166 kit.expectMsgClass(SuccessReply.class);
1170 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1171 kit.expectMsgClass(SuccessReply.class);
1173 // Verify the local candidate becomes owner
1175 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1176 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1177 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1179 testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived ending");
1183 public void testDelayedEntityOwnerSelection() throws Exception {
1184 testLog.info("testDelayedEntityOwnerSelection starting");
1186 final ShardTestKit kit = new ShardTestKit(getSystem());
1187 EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder()
1188 .addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
1190 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
1192 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
1193 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
1194 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
1196 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1197 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
1198 peerId1.toString());
1199 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1201 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
1202 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
1203 peerId2.toString());
1204 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
1206 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
1207 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME,
1208 builder.build()), leaderId.toString());
1210 ShardTestKit.waitUntilLeader(leader);
1212 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1214 // Add a remote candidate
1216 peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
1217 kit.expectMsgClass(SuccessReply.class);
1221 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1222 kit.expectMsgClass(SuccessReply.class);
1224 // Verify the local candidate becomes owner
1226 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1227 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1228 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1230 testLog.info("testDelayedEntityOwnerSelection ending");
1233 private Props newLocalShardProps() {
1234 return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.<String,String>emptyMap(), LOCAL_MEMBER_NAME);
1237 private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers,
1238 final String memberName) {
1239 return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build());
1242 private Props newShardProps(final ShardIdentifier shardId, final Map<String,String> peers, final String memberName,
1243 final EntityOwnerSelectionStrategyConfig config) {
1244 return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props()
1245 .withDispatcher(Dispatchers.DefaultDispatcherId());
1248 private EntityOwnershipShard.Builder newShardBuilder(final ShardIdentifier shardId, final Map<String, String> peers,
1249 final String memberName) {
1250 return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).datastoreContext(
1251 dataStoreContextBuilder.build()).schemaContextProvider(() -> SCHEMA_CONTEXT).localMemberName(
1252 MemberName.forName(memberName)).ownerSelectionStrategyConfig(
1253 EntityOwnerSelectionStrategyConfig.newBuilder().build());
1256 private Map<String, String> peerMap(final String... peerIds) {
1257 ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder();
1258 for (String peerId: peerIds) {
1259 builder.put(peerId, actorFactory.createTestActorPath(peerId)).build();
1262 return builder.build();
1265 private static class TestEntityOwnershipShard extends EntityOwnershipShard {
1266 private final ActorRef collectorActor;
1267 private final Map<Class<?>, Predicate<?>> dropMessagesOfType = new ConcurrentHashMap<>();
1269 TestEntityOwnershipShard(final Builder builder, final ActorRef collectorActor) {
1271 this.collectorActor = collectorActor;
1274 @SuppressWarnings({ "unchecked", "rawtypes" })
1276 public void handleCommand(final Object message) {
1277 Predicate drop = dropMessagesOfType.get(message.getClass());
1278 if (drop == null || !drop.test(message)) {
1279 super.handleCommand(message);
1282 if (collectorActor != null) {
1283 collectorActor.tell(message, ActorRef.noSender());
1287 void startDroppingMessagesOfType(final Class<?> msgClass) {
1288 dropMessagesOfType.put(msgClass, msg -> true);
1291 <T> void startDroppingMessagesOfType(final Class<T> msgClass, final Predicate<T> filter) {
1292 dropMessagesOfType.put(msgClass, filter);
1295 void stopDroppingMessagesOfType(final Class<?> msgClass) {
1296 dropMessagesOfType.remove(msgClass);
1299 ActorRef collectorActor() {
1300 return collectorActor;
1303 static Props props(final Builder builder) {
1304 return props(builder, null);
1307 static Props props(final Builder builder, final ActorRef collectorActor) {
1308 return Props.create(TestEntityOwnershipShard.class, builder, collectorActor)
1309 .withDispatcher(Dispatchers.DefaultDispatcherId());