2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.AdditionalMatchers.or;
12 import static org.mockito.Matchers.any;
13 import static org.mockito.Mockito.mock;
14 import static org.mockito.Mockito.never;
15 import static org.mockito.Mockito.reset;
16 import static org.mockito.Mockito.timeout;
17 import static org.mockito.Mockito.times;
18 import static org.mockito.Mockito.verify;
19 import static org.mockito.Mockito.verifyNoMoreInteractions;
20 import akka.actor.ActorRef;
21 import akka.actor.PoisonPill;
22 import akka.actor.Props;
23 import akka.actor.Terminated;
24 import akka.dispatch.Dispatchers;
25 import akka.testkit.JavaTestKit;
26 import akka.testkit.TestActorRef;
27 import com.google.common.collect.ImmutableMap;
28 import com.google.common.util.concurrent.Uninterruptibles;
29 import java.util.ArrayList;
30 import java.util.Collections;
31 import java.util.List;
33 import java.util.concurrent.ConcurrentHashMap;
34 import java.util.concurrent.TimeUnit;
35 import java.util.function.Predicate;
36 import org.junit.After;
37 import org.junit.Test;
38 import org.opendaylight.controller.cluster.access.concepts.MemberName;
39 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
40 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
41 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
42 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
43 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterListenerLocal;
44 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterCandidateLocal;
45 import org.opendaylight.controller.cluster.datastore.entityownership.messages.UnregisterListenerLocal;
46 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
47 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.LastCandidateSelectionStrategy;
48 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
49 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
50 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
51 import org.opendaylight.controller.cluster.datastore.messages.PeerDown;
52 import org.opendaylight.controller.cluster.datastore.messages.PeerUp;
53 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
54 import org.opendaylight.controller.cluster.raft.RaftState;
55 import org.opendaylight.controller.cluster.raft.TestActorFactory;
56 import org.opendaylight.controller.cluster.raft.base.messages.ElectionTimeout;
57 import org.opendaylight.controller.cluster.raft.base.messages.TimeoutNow;
58 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
59 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
60 import org.opendaylight.controller.cluster.raft.utils.MessageCollectorActor;
61 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
62 import org.opendaylight.mdsal.eos.dom.api.DOMEntity;
63 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipChange;
64 import org.opendaylight.mdsal.eos.dom.api.DOMEntityOwnershipListener;
65 import org.opendaylight.yangtools.yang.common.QName;
66 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
67 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
70 * Unit tests for EntityOwnershipShard.
72 * @author Thomas Pantelis
74 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
// Entity type used for the DOMEntity instances created by the tests in this class.
75 private static final String ENTITY_TYPE = "test type";
// Five distinct entity ids (entity1..entity5), each a single QName in namespace "test", revision 2015-08-14.
76 private static final YangInstanceIdentifier ENTITY_ID1 =
77 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
78 private static final YangInstanceIdentifier ENTITY_ID2 =
79 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
80 private static final YangInstanceIdentifier ENTITY_ID3 =
81 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity3"));
82 private static final YangInstanceIdentifier ENTITY_ID4 =
83 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity4"));
84 private static final YangInstanceIdentifier ENTITY_ID5 =
85 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity5"));
// Schema context obtained from SchemaContextHelper.entityOwners().
86 private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
// Member names for the shard under test ("local") and its two possible peers.
87 private static final String LOCAL_MEMBER_NAME = "local-member-1";
88 private static final String PEER_MEMBER_1_NAME = "peer-member-1";
89 private static final String PEER_MEMBER_2_NAME = "peer-member-2";
// Starts as a non-persistent datastore context. Not final: tests tune it (heartbeat interval, election
// timeout factor, ...) before creating shards, and testLeaderIsolation replaces it with a derived builder.
91 private Builder dataStoreContextBuilder = DatastoreContext.newBuilder().persistent(false);
// Factory through which the tests create their shard and collector test actors on getSystem().
92 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
95 public void tearDown() {
// Registers a local candidate on a single shard built from newLocalShardProps() and expects the local
// member to be committed as both a candidate and the owner of the entity.
100 public void testOnRegisterCandidateLocal() throws Exception {
101 testLog.info("testOnRegisterCandidateLocal starting");
103 ShardTestKit kit = new ShardTestKit(getSystem());
105 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
// Wait for the shard to become leader before sending the registration.
107 ShardTestKit.waitUntilLeader(shard);
109 YangInstanceIdentifier entityId = ENTITY_ID1;
110 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
// The kit is passed as sender, so the shard's SuccessReply is expected back on the kit.
112 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
113 kit.expectMsgClass(SuccessReply.class);
// The local member must show up as a committed candidate and as the owner.
115 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
116 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
118 testLog.info("testOnRegisterCandidateLocal ending");
// Registers a local candidate while the only peer is dropping RequestVote messages, then lets RequestVote
// through again and expects the candidate and owner to end up committed on the shard.
122 public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
123 testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader starting");
125 ShardTestKit kit = new ShardTestKit(getSystem());
// 100 ms heartbeat with an election timeout factor of 2.
127 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
129 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
130 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
132 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
133 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
134 TestEntityOwnershipShard peerShard = peer.underlyingActor();
// The peer drops RequestVote and ElectionTimeout messages - presumably so that it neither grants a vote
// to the local shard nor starts an election of its own (confirm against TestEntityOwnershipShard).
135 peerShard.startDroppingMessagesOfType(RequestVote.class);
136 peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
138 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(
139 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
141 YangInstanceIdentifier entityId = ENTITY_ID1;
142 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
// Note there is no waitUntilLeader call here: the SuccessReply is expected while the peer is still
// dropping RequestVote messages.
144 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
145 kit.expectMsgClass(SuccessReply.class);
147 // Now allow RequestVotes to the peer so the shard becomes the leader. This should retry the commit.
148 peerShard.stopDroppingMessagesOfType(RequestVote.class);
150 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
151 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
153 testLog.info("testOnRegisterCandidateLocalWithNoInitialLeader ending");
// Registers a local candidate on the leader while the only peer drops AppendEntries, waits out the
// one-second commit timeout, then resumes AppendEntries and expects the candidate and owner to be committed.
157 public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
158 testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus starting");
160 ShardTestKit kit = new ShardTestKit(getSystem());
// The 1 second transaction commit timeout configured here is what the 1 second sleep below waits out.
162 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
163 shardTransactionCommitTimeoutInSeconds(1);
165 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
166 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
168 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
169 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
170 TestEntityOwnershipShard peerShard = peer.underlyingActor();
// Dropping ElectionTimeout presumably keeps the peer from starting its own election - confirm.
171 peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
173 // Drop AppendEntries so consensus isn't reached.
174 peerShard.startDroppingMessagesOfType(AppendEntries.class);
176 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
177 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
179 ShardTestKit.waitUntilLeader(leader);
181 YangInstanceIdentifier entityId = ENTITY_ID1;
182 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
// The SuccessReply is expected even though the peer is still dropping AppendEntries.
184 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
185 kit.expectMsgClass(SuccessReply.class);
187 // Wait enough time for the commit to timeout.
188 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
190 // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
191 // write being applied to the state.
192 peerShard.stopDroppingMessagesOfType(AppendEntries.class);
194 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
195 verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
197 testLog.info("testOnRegisterCandidateLocalWithNoInitialConsensus ending");
// Drives the leader into the IsolatedLeader raft state by having the peer drop AppendEntries, registers a
// local candidate in that state, then resumes AppendEntries and expects the candidate and owner to be committed.
201 public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
202 testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader starting");
204 ShardTestKit kit = new ShardTestKit(getSystem());
// A 50 ms isolated-leader check interval is configured in addition to the usual heartbeat settings.
206 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
207 shardIsolatedLeaderCheckIntervalInMillis(50);
209 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
210 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
212 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
213 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
214 TestEntityOwnershipShard peerShard = peer.underlyingActor();
215 peerShard.startDroppingMessagesOfType(ElectionTimeout.class);
// NOTE(review): unlike the sibling tests, no actor name (leaderId.toString()) is passed when creating the
// leader here - confirm this is intentional.
217 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
218 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME));
220 ShardTestKit.waitUntilLeader(leader);
222 // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
223 peerShard.startDroppingMessagesOfType(AppendEntries.class);
224 verifyRaftState(leader, state ->
225 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
227 YangInstanceIdentifier entityId = ENTITY_ID1;
228 DOMEntity entity = new DOMEntity(ENTITY_TYPE, entityId);
// The registration is still acknowledged with SuccessReply while the leader is isolated.
230 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
231 kit.expectMsgClass(SuccessReply.class);
233 // Resume AppendEntries - the candidate write should now be committed.
234 peerShard.stopDroppingMessagesOfType(AppendEntries.class);
235 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
236 verifyOwner(leader, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
238 testLog.info("testOnRegisterCandidateLocalWithIsolatedLeader ending");
// Registers candidates on the local shard while the shard for peer-member-1 plays the leader role. Covers a
// single registration, a registration whose BatchedModifications the leader initially drops, and a burst
// of registrations.
242 public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
243 testLog.info("testOnRegisterCandidateLocalWithRemoteLeader starting");
245 ShardTestKit kit = new ShardTestKit(getSystem());
// A batched modification count of 5 is configured in addition to the usual heartbeat settings.
247 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
248 shardBatchedModificationCount(5);
250 ShardIdentifier leaderId = newShardId(PEER_MEMBER_1_NAME);
251 ShardIdentifier localId = newShardId(LOCAL_MEMBER_NAME);
// The leader shard is given a MessageCollectorActor, which is queried and cleared further down through
// leaderShard.collectorActor().
252 TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
253 newShardBuilder(leaderId, peerMap(localId.toString()), PEER_MEMBER_1_NAME),
254 actorFactory.createTestActor(MessageCollectorActor.props())), leaderId.toString());
255 TestEntityOwnershipShard leaderShard = leader.underlyingActor();
257 TestActorRef<TestEntityOwnershipShard> local = actorFactory.createTestActor(TestEntityOwnershipShard.props(
258 newShardBuilder(localId, peerMap(leaderId.toString()),LOCAL_MEMBER_NAME)), localId.toString());
// Dropping ElectionTimeout on the local shard presumably keeps it from starting an election - confirm.
259 local.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
// Register on the local shard; the commit is verified against the leader's state.
261 local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
262 kit.expectMsgClass(SuccessReply.class);
264 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
265 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
267 // Test with initial commit timeout and subsequent retry.
// A rebuilt DatastoreContext with a 1 second commit timeout is sent to the local shard, and the leader
// starts dropping BatchedModifications.
269 local.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
270 leaderShard.startDroppingMessagesOfType(BatchedModifications.class);
272 local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
273 kit.expectMsgClass(SuccessReply.class);
// Expect the leader's collector to have seen a BatchedModifications message.
275 MessageCollectorActor.expectFirstMatching(leaderShard.collectorActor(), BatchedModifications.class);
277 // Send a bunch of registration messages quickly and verify.
279 leaderShard.stopDroppingMessagesOfType(BatchedModifications.class);
280 MessageCollectorActor.clearMessages(leaderShard.collectorActor());
283 List<YangInstanceIdentifier> entityIds = new ArrayList<>();
// Ids are generated as "test1", "test2", ... and each is registered on the local shard.
284 for(int i = 1; i <= max; i++) {
285 YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
287 local.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, id)), kit.getRef());
// Each id held in entityIds must be committed on the local shard with the local member as candidate.
290 for(int i = 0; i < max; i++) {
291 verifyCommittedEntityCandidate(local, ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
294 testLog.info("testOnRegisterCandidateLocalWithRemoteLeader ending");
// Registers, unregisters and re-registers the local candidate on a single leader shard, checking the owner
// after each step.
298 public void testOnUnregisterCandidateLocal() throws Exception {
299 testLog.info("testOnUnregisterCandidateLocal starting");
301 ShardTestKit kit = new ShardTestKit(getSystem());
302 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newLocalShardProps());
303 ShardTestKit.waitUntilLeader(shard);
305 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
// Step 1: register - the local member becomes candidate and owner.
309 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
310 kit.expectMsgClass(SuccessReply.class);
312 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
313 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
// Step 2: unregister - the owner is expected to become the empty string.
317 shard.tell(new UnregisterCandidateLocal(entity), kit.getRef());
318 kit.expectMsgClass(SuccessReply.class);
320 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");
// Step 3: register again - the local member is candidate and owner once more.
324 shard.tell(new RegisterCandidateLocal(entity), kit.getRef());
325 kit.expectMsgClass(SuccessReply.class);
327 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
328 verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
330 testLog.info("testOnUnregisterCandidateLocal ending");
// Walks one entity through a series of candidate registrations and unregistrations across a local leader
// and two peers, asserting on the leader which member owns the entity after each step.
334 public void testOwnershipChanges() throws Exception {
335 testLog.info("testOwnershipChanges starting");
337 ShardTestKit kit = new ShardTestKit(getSystem());
339 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
341 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
342 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
343 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
// Both peers drop ElectionTimeout - presumably so that only the local shard can become leader (confirm).
345 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
346 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
348 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
350 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
351 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
353 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
355 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
356 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
357 leaderId.toString());
359 ShardTestKit.waitUntilLeader(leader);
361 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
363 // Add a remote candidate
365 peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
366 kit.expectMsgClass(SuccessReply.class);
368 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
// Register the local (leader) member as a second candidate.
372 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
373 kit.expectMsgClass(SuccessReply.class);
375 // Verify the remote candidate becomes owner
377 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
378 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
380 // Add another remote candidate and verify ownership doesn't change
382 peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
383 kit.expectMsgClass(SuccessReply.class);
385 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
// Pause before the "unchanged" assertion - presumably to give an unexpected owner change time to surface.
386 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
387 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
389 // Remove the second remote candidate and verify ownership doesn't change
391 peer2.tell(new UnregisterCandidateLocal(entity), kit.getRef());
392 kit.expectMsgClass(SuccessReply.class);
394 verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
395 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
396 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
398 // Remove the first remote candidate and verify the local candidate becomes owner
400 peer1.tell(new UnregisterCandidateLocal(entity), kit.getRef());
401 kit.expectMsgClass(SuccessReply.class);
403 verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
404 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
406 // Add the second remote candidate back and verify ownership doesn't change
408 peer2.tell(new RegisterCandidateLocal(entity), kit.getRef());
409 kit.expectMsgClass(SuccessReply.class);
411 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
412 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
413 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
415 // Unregister the local candidate and verify the second remote candidate becomes owner
417 leader.tell(new UnregisterCandidateLocal(entity), kit.getRef());
418 kit.expectMsgClass(SuccessReply.class);
420 verifyEntityCandidateRemoved(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
421 verifyOwner(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_2_NAME);
423 testLog.info("testOwnershipChanges ending");
// Checks how entity owners change as peers are killed (PeerDown) and reinstated (PeerUp), and finally when
// the local leader itself is killed and peer2 takes over as leader. Five entities with different candidate
// sets are used so that each availability change affects a different subset of owners.
427 public void testOwnerChangesOnPeerAvailabilityChanges() throws Exception {
428 testLog.info("testOwnerChangesOnPeerAvailabilityChanges starting");
430 ShardTestKit kit = new ShardTestKit(getSystem());
// Election timeout factor 4 and a very large (100000 ms) isolated-leader check interval.
432 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4).
433 shardIsolatedLeaderCheckIntervalInMillis(100000);
435 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
436 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
437 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
439 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
440 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
442 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
444 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
445 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
447 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
449 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
450 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME),
451 leaderId.toString());
// The local shard must reach the Leader raft state before any messages are sent to it.
453 verifyRaftState(leader, state ->
454 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
456 // Send PeerDown and PeerUp with no entities
458 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
459 leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
461 // Add candidates for entity1 with the local leader as the owner
463 leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
464 kit.expectMsgClass(SuccessReply.class);
465 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
467 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
468 kit.expectMsgClass(SuccessReply.class);
469 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
471 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
472 kit.expectMsgClass(SuccessReply.class);
473 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
474 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
476 // Add candidates for entity2 with peerMember2 as the owner
478 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
479 kit.expectMsgClass(SuccessReply.class);
480 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
482 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
483 kit.expectMsgClass(SuccessReply.class);
484 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
485 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
487 // Add candidates for entity3 with peerMember2 as the owner.
489 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
490 kit.expectMsgClass(SuccessReply.class);
491 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
493 leader.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
494 kit.expectMsgClass(SuccessReply.class);
495 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
497 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
498 kit.expectMsgClass(SuccessReply.class);
499 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
500 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
502 // Add only candidate peerMember2 for entity4.
504 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID4)), kit.getRef());
505 kit.expectMsgClass(SuccessReply.class);
506 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
507 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
509 // Add only candidate peerMember1 for entity5.
511 peer1.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID5)), kit.getRef());
512 kit.expectMsgClass(SuccessReply.class);
513 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
514 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, PEER_MEMBER_1_NAME);
516 // Kill peerMember2 and send PeerDown - the entities (2, 3, 4) owned by peerMember2 should get a new owner selected where another candidate exists.
520 peer2.tell(PoisonPill.getInstance(), ActorRef.noSender());
521 kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
524 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
525 // Send PeerDown again - should be noop
526 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
527 peer1.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
// entity1 keeps its owner; entity2 and entity3 move to a remaining candidate.
529 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
530 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
531 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
532 // no other candidates for entity4 so peerMember2 should remain owner.
533 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
// PeerDown alone is not expected to remove peerMember2's candidate entries from the leader.
535 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
536 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
537 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
538 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
540 // Reinstate peerMember2
542 peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
543 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
545 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
546 leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
547 // Send PeerUp again - should be noop
548 leader.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
549 peer1.tell(new PeerUp(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
551 // peerMember2's candidates should be removed on startup.
552 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
553 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
554 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
555 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, PEER_MEMBER_2_NAME);
// Owners of entities 1-3 are unchanged; entity4, whose only candidate was peerMember2, now has the
// empty string as owner.
557 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
558 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
559 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
560 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
562 // Add back candidate peerMember2 for entities 1, 2, & 3.
564 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
565 kit.expectMsgClass(SuccessReply.class);
566 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
567 kit.expectMsgClass(SuccessReply.class);
568 peer2.tell(new RegisterCandidateLocal(new DOMEntity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
569 kit.expectMsgClass(SuccessReply.class);
// The re-added candidates must be visible on both the leader and the reinstated peer2, and must not
// change the current owners on either of them.
570 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
571 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
572 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
573 verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
574 verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
575 verifyCommittedEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
576 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
577 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
578 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
579 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
580 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
581 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
582 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
584 // Kill peerMember1 and send PeerDown - entity 2 should get a new owner selected.
587 peer1.tell(PoisonPill.getInstance(), ActorRef.noSender());
588 kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
590 leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
592 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
594 // Verify the reinstated peerMember2 is fully synced.
596 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
597 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
598 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
599 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
601 // Reinstate peerMember1 and verify no owner changes
603 peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(newShardBuilder(
604 peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)), peerId1.toString());
605 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
606 leader.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
608 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
609 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
610 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
611 verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
// peerMember1's earlier candidate entries must be gone from both the leader and peer2.
613 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
614 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
615 verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
617 verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_1_NAME);
618 verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_1_NAME);
619 verifyNoEntityCandidate(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_1_NAME);
621 // Verify the reinstated peerMember1 is fully synced.
623 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
624 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
625 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
626 verifyOwner(peer1, ENTITY_TYPE, ENTITY_ID4, "");
628 // Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for
629 // the entities (1 and 3) previously owned by the local leader member.
// peer2 is first given the recreated peer1's actor path and told that both other members are up.
631 peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
632 peer2.tell(new PeerUp(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
633 peer2.tell(new PeerUp(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
636 leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
637 kit.expectMsgClass(JavaTestKit.duration("5 seconds"), Terminated.class);
// TimeoutNow presumably makes peer2 start an election right away; the check below expects it to be Leader.
639 peer2.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
640 peer2.tell(TimeoutNow.INSTANCE, peer2);
642 verifyRaftState(peer2, state ->
643 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
// With the old leader gone, peer2 is expected to own entities 1-3; entity4 still has no owner.
645 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, PEER_MEMBER_2_NAME);
646 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID2, PEER_MEMBER_2_NAME);
647 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID3, PEER_MEMBER_2_NAME);
648 verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID4, "");
650 testLog.info("testOwnerChangesOnPeerAvailabilityChanges ending");
// Exercises leader isolation on a three-member shard (local leader, peer1, peer2). After the leader is cut
// off, peer1 is expected to report Leader and take ownership of entity1, the old leader is expected to
// report IsolatedLeader, and once message dropping is switched off again it is expected to report Follower
// and to see peer1 as the owner of entity1.
654 public void testLeaderIsolation() throws Exception {
655 testLog.info("testLeaderIsolation starting");
657 ShardTestKit kit = new ShardTestKit(getSystem());
659 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
660 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
661 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
// Settings applied to the shared builder before the peers are created: 100 ms heartbeat, election timeout
// factor 4 and a 100000 ms isolated-leader check interval.
663 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(4).
664 shardIsolatedLeaderCheckIntervalInMillis(100000);
666 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
667 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
// ElectionTimeout stays dropped on peer1 until it is re-enabled further down to trigger an election.
669 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
671 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
672 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
674 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
// Re-create the builder from the current settings with a 500 ms isolated-leader check interval; the leader
// shard below is built after this change.
676 dataStoreContextBuilder = DatastoreContext.newBuilderFrom(dataStoreContextBuilder.build()).
677 shardIsolatedLeaderCheckIntervalInMillis(500);
679 TestActorRef<TestEntityOwnershipShard> leader = actorFactory.createTestActor(TestEntityOwnershipShard.props(
680 newShardBuilder(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME)),
681 leaderId.toString());
683 ShardTestKit.waitUntilLeader(leader);
685 // Add entity1 candidates for all members with the leader as the owner
687 DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
688 leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
689 kit.expectMsgClass(SuccessReply.class);
690 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
692 peer1.tell(new RegisterCandidateLocal(entity1), kit.getRef());
693 kit.expectMsgClass(SuccessReply.class);
694 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
696 peer2.tell(new RegisterCandidateLocal(entity1), kit.getRef());
697 kit.expectMsgClass(SuccessReply.class);
698 verifyCommittedEntityCandidate(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_2_NAME);
700 verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
701 verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
702 verifyOwner(peer2, entity1.getType(), entity1.getIdentifier(), LOCAL_MEMBER_NAME);
704 // Add entity2 candidates for all members with peer1 as the owner
706 DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
707 peer1.tell(new RegisterCandidateLocal(entity2), kit.getRef());
708 kit.expectMsgClass(SuccessReply.class);
709 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
711 peer2.tell(new RegisterCandidateLocal(entity2), kit.getRef());
712 kit.expectMsgClass(SuccessReply.class);
713 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_2_NAME);
715 leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
716 kit.expectMsgClass(SuccessReply.class);
717 verifyCommittedEntityCandidate(leader, entity2.getType(), entity2.getIdentifier(), LOCAL_MEMBER_NAME);
719 verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
720 verifyOwner(peer1, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
721 verifyOwner(peer2, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
723 // Add entity3 candidates for all members with peer2 as the owner
725 DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
726 peer2.tell(new RegisterCandidateLocal(entity3), kit.getRef());
727 kit.expectMsgClass(SuccessReply.class);
728 verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
730 leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
731 kit.expectMsgClass(SuccessReply.class);
732 verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), LOCAL_MEMBER_NAME);
734 peer1.tell(new RegisterCandidateLocal(entity3), kit.getRef());
735 kit.expectMsgClass(SuccessReply.class);
736 verifyCommittedEntityCandidate(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_1_NAME);
738 verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
739 verifyOwner(peer1, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
740 verifyOwner(peer2, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
742 // Add listeners on all members
// Each listener is expected to receive three notifications right after registration, one per entity. The
// or(...) matcher accepts any of the three listed changes and times(3) requires three such calls in total.
// NOTE(review): the ownershipChange(...) arguments are presumably (entity, wasOwner, isOwner, hasOwner),
// with a trailing inJeopardy flag in the five-argument form - confirm against the helper.
744 DOMEntityOwnershipListener leaderListener = mock(DOMEntityOwnershipListener.class);
745 leader.tell(new RegisterListenerLocal(leaderListener, ENTITY_TYPE), kit.getRef());
746 kit.expectMsgClass(SuccessReply.class);
747 verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, true, true),
748 ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, false, true)));
749 reset(leaderListener);
751 DOMEntityOwnershipListener peer1Listener = mock(DOMEntityOwnershipListener.class);
752 peer1.tell(new RegisterListenerLocal(peer1Listener, ENTITY_TYPE), kit.getRef());
753 kit.expectMsgClass(SuccessReply.class);
754 verify(peer1Listener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, false, true),
755 ownershipChange(entity2, false, true, true)), ownershipChange(entity3, false, false, true)));
756 reset(peer1Listener);
758 DOMEntityOwnershipListener peer2Listener = mock(DOMEntityOwnershipListener.class);
759 peer2.tell(new RegisterListenerLocal(peer2Listener, ENTITY_TYPE), kit.getRef());
760 kit.expectMsgClass(SuccessReply.class);
761 verify(peer2Listener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, false, false, true),
762 ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, true, true)));
763 reset(peer2Listener);
765 // Isolate the leader by dropping AppendEntries to the followers and incoming messages from the followers.
767 leader.underlyingActor().startDroppingMessagesOfType(RequestVote.class);
768 leader.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
// peer2 only drops AppendEntries whose leader id is the old leader's id; peer1 drops every AppendEntries.
770 peer2.underlyingActor().startDroppingMessagesOfType(AppendEntries.class,
771 ae -> ae.getLeaderId().equals(leaderId.toString()));
772 peer1.underlyingActor().startDroppingMessagesOfType(AppendEntries.class);
774 // Make peer1 start an election and become leader by enabling the ElectionTimeout message.
776 peer1.underlyingActor().stopDroppingMessagesOfType(ElectionTimeout.class);
778 // Send PeerDown to the isolated leader so it tries to re-assign ownership for the entities owned by the
781 leader.tell(new PeerDown(peerId1.getMemberName(), peerId1.toString()), ActorRef.noSender());
782 leader.tell(new PeerDown(peerId2.getMemberName(), peerId2.toString()), ActorRef.noSender());
784 verifyRaftState(leader, state ->
785 assertEquals("getRaftState", RaftState.IsolatedLeader.toString(), state.getRaftState()));
787 // Expect inJeopardy notification on the isolated leader.
789 verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, true, true, true, true),
790 ownershipChange(entity2, false, false, true, true)), ownershipChange(entity3, false, false, true, true)));
791 reset(leaderListener);
793 verifyRaftState(peer1, state ->
794 assertEquals("getRaftState", RaftState.Leader.toString(), state.getRaftState()));
796 // Send PeerDown to the new leader peer1 so it re-assigns ownership for the entities owned by the
799 peer1.tell(new PeerDown(leaderId.getMemberName(), leaderId.toString()), ActorRef.noSender());
// entity1 was owned by the old leader; peer1 is expected to become its owner.
801 verifyOwner(peer1, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
803 verify(peer1Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
804 reset(peer1Listener);
806 verify(peer2Listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, false, true));
807 reset(peer2Listener);
809 // Remove the isolation.
811 leader.underlyingActor().stopDroppingMessagesOfType(RequestVote.class);
812 leader.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
813 peer2.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
814 peer1.underlyingActor().stopDroppingMessagesOfType(AppendEntries.class);
816 // Previous leader should switch to Follower and send inJeopardy cleared notifications for all entities.
818 verifyRaftState(leader, state ->
819 assertEquals("getRaftState", RaftState.Follower.toString(), state.getRaftState()));
821 verify(leaderListener, timeout(5000).times(3)).ownershipChanged(or(or(ownershipChange(entity1, true, true, true),
822 ownershipChange(entity2, false, false, true)), ownershipChange(entity3, false, false, true)));
// The former leader is then expected to see peer1 as the owner of entity1 and to be told it lost ownership.
824 verifyOwner(leader, entity1.getType(), entity1.getIdentifier(), PEER_MEMBER_1_NAME);
825 verify(leaderListener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
// Pause before the final checks - presumably to let any late notifications arrive; confirm.
827 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
828 verifyOwner(leader, entity2.getType(), entity2.getIdentifier(), PEER_MEMBER_1_NAME);
829 verifyOwner(leader, entity3.getType(), entity3.getIdentifier(), PEER_MEMBER_2_NAME);
// No listener may have received anything beyond what was verified above since its last reset.
831 verifyNoMoreInteractions(leaderListener);
832 verifyNoMoreInteractions(peer1Listener);
833 verifyNoMoreInteractions(peer2Listener);
835 testLog.info("testLeaderIsolation ending");
// Checks ownership-listener behaviour on a two-member shard (local leader plus one peer): a listener
// registered for ENTITY_TYPE is expected to be notified about entities of that type only, to receive nothing
// for a candidate added after it has been unregistered, and to be told about the entities verified below
// when it is registered again.
839 public void testListenerRegistration() throws Exception {
840 testLog.info("testListenerRegistration starting");
842 ShardTestKit kit = new ShardTestKit(getSystem());
844 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
845 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
847 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
848 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
// ElectionTimeout is dropped on the peer for the whole test; the local shard is the one awaited as leader.
849 peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
851 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
852 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME), leaderId.toString());
854 ShardTestKit.waitUntilLeader(leader);
// entity1..entity3 use ENTITY_TYPE; entity4 reuses ENTITY_ID3 under a different entity type.
856 String otherEntityType = "otherEntityType";
857 DOMEntity entity1 = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
858 DOMEntity entity2 = new DOMEntity(ENTITY_TYPE, ENTITY_ID2);
859 DOMEntity entity3 = new DOMEntity(ENTITY_TYPE, ENTITY_ID3);
860 DOMEntity entity4 = new DOMEntity(otherEntityType, ENTITY_ID3);
861 DOMEntityOwnershipListener listener = mock(DOMEntityOwnershipListener.class);
865 leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
866 kit.expectMsgClass(SuccessReply.class);
868 // Register a couple candidates for the desired entity type and verify listener is notified.
870 leader.tell(new RegisterCandidateLocal(entity1), kit.getRef());
871 kit.expectMsgClass(SuccessReply.class);
873 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, false, true, true));
875 leader.tell(new RegisterCandidateLocal(entity2), kit.getRef());
876 kit.expectMsgClass(SuccessReply.class);
878 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity2, false, true, true));
881 // Register another candidate for another entity type and verify listener is not notified.
883 leader.tell(new RegisterCandidateLocal(entity4), kit.getRef());
884 kit.expectMsgClass(SuccessReply.class);
// Wait a fixed 500 ms before asserting that no notification for entity4 was delivered.
886 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
887 verify(listener, never()).ownershipChanged(ownershipChange(entity4));
889 // Register remote candidate for entity1
891 peer.tell(new RegisterCandidateLocal(entity1), kit.getRef());
892 kit.expectMsgClass(SuccessReply.class);
893 verifyCommittedEntityCandidate(leader, ENTITY_TYPE, entity1.getIdentifier(), PEER_MEMBER_1_NAME);
895 // Unregister the local candidate for entity1 and verify listener is notified
897 leader.tell(new UnregisterCandidateLocal(entity1), kit.getRef());
898 kit.expectMsgClass(SuccessReply.class);
900 verify(listener, timeout(5000)).ownershipChanged(ownershipChange(entity1, true, false, true));
903 // Unregister the listener, add a candidate for entity3 and verify listener isn't notified
905 leader.tell(new UnregisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
906 kit.expectMsgClass(SuccessReply.class);
908 leader.tell(new RegisterCandidateLocal(entity3), kit.getRef());
909 kit.expectMsgClass(SuccessReply.class);
911 verifyOwner(leader, ENTITY_TYPE, entity3.getIdentifier(), LOCAL_MEMBER_NAME);
912 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
913 verify(listener, never()).ownershipChanged(any(DOMEntityOwnershipChange.class));
915 // Re-register the listener and verify it gets notified of currently owned entities
919 leader.tell(new RegisterListenerLocal(listener, ENTITY_TYPE), kit.getRef());
920 kit.expectMsgClass(SuccessReply.class);
// Two notifications are expected, each matching either the entity2 or the entity3 change.
922 verify(listener, timeout(5000).times(2)).ownershipChanged(or(ownershipChange(entity2, false, true, true),
923 ownershipChange(entity3, false, true, true)));
924 Uninterruptibles.sleepUninterruptibly(300, TimeUnit.MILLISECONDS);
// After the pause: still nothing for entity4, and exactly one call matching ownershipChange(entity1).
925 verify(listener, never()).ownershipChanged(ownershipChange(entity4));
926 verify(listener, times(1)).ownershipChanged(ownershipChange(entity1));
928 testLog.info("testListenerRegistration ending");
// Two-member scenario (local leader plus one peer) where ENTITY_TYPE is configured with
// LastCandidateSelectionStrategy. A remote candidate and then a local candidate are registered for the same
// entity, and the local member is expected to end up as the owner.
932 public void testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived() throws Exception {
933 testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived starting");
935 ShardTestKit kit = new ShardTestKit(getSystem());
// NOTE(review): the third addStrategy argument (500) is presumably the selection delay in milliseconds -
// confirm against EntityOwnerSelectionStrategyConfig.
936 EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder().
937 addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
939 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
940 ShardIdentifier peerId = newShardId(PEER_MEMBER_1_NAME);
942 TestActorRef<TestEntityOwnershipShard> peer = actorFactory.createTestActor(TestEntityOwnershipShard.props(
943 newShardBuilder(peerId, peerMap(leaderId.toString()), PEER_MEMBER_1_NAME)), peerId.toString());
944 peer.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
// Only the leader shard is created with the custom strategy config; the peer above uses newShardBuilder as is.
946 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
947 newShardProps(leaderId, peerMap(peerId.toString()), LOCAL_MEMBER_NAME, builder.build()), leaderId.toString());
949 ShardTestKit.waitUntilLeader(leader);
951 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
953 // Add a remote candidate
955 peer.tell(new RegisterCandidateLocal(entity), kit.getRef());
956 kit.expectMsgClass(SuccessReply.class);
// Then register the same entity on the leader itself, i.e. a local candidate.
960 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
961 kit.expectMsgClass(SuccessReply.class);
963 // Verify the local candidate becomes owner
965 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
966 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
967 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
969 testLog.info("testDelayedEntityOwnerSelectionWhenMaxPeerRequestsReceived ending");
// Three-member scenario (local leader plus two peers) where ENTITY_TYPE is configured with
// LastCandidateSelectionStrategy. Only peer1 and the local member register as candidates for the entity,
// and the local member is expected to end up as the owner.
973 public void testDelayedEntityOwnerSelection() throws Exception {
974 testLog.info("testDelayedEntityOwnerSelection starting");
976 ShardTestKit kit = new ShardTestKit(getSystem());
// NOTE(review): the third addStrategy argument (500) is presumably the selection delay in milliseconds -
// confirm against EntityOwnerSelectionStrategyConfig.
977 EntityOwnerSelectionStrategyConfig.Builder builder = EntityOwnerSelectionStrategyConfig.newBuilder().
978 addStrategy(ENTITY_TYPE, LastCandidateSelectionStrategy.class, 500);
// 100 ms heartbeat and election timeout factor 2 are set on the shared builder before any shard is created.
980 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
982 ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
983 ShardIdentifier peerId1 = newShardId(PEER_MEMBER_1_NAME);
984 ShardIdentifier peerId2 = newShardId(PEER_MEMBER_2_NAME);
986 TestActorRef<TestEntityOwnershipShard> peer1 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
987 newShardBuilder(peerId1, peerMap(leaderId.toString(), peerId2.toString()), PEER_MEMBER_1_NAME)),
989 peer1.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
991 TestActorRef<TestEntityOwnershipShard> peer2 = actorFactory.createTestActor(TestEntityOwnershipShard.props(
992 newShardBuilder(peerId2, peerMap(leaderId.toString(), peerId1.toString()), PEER_MEMBER_2_NAME)),
994 peer2.underlyingActor().startDroppingMessagesOfType(ElectionTimeout.class);
// Only the leader shard is created with the custom strategy config.
996 TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(
997 newShardProps(leaderId, peerMap(peerId1.toString(), peerId2.toString()), LOCAL_MEMBER_NAME, builder.build()),
998 leaderId.toString());
1000 ShardTestKit.waitUntilLeader(leader);
1002 DOMEntity entity = new DOMEntity(ENTITY_TYPE, ENTITY_ID1);
1004 // Add a remote candidate
1006 peer1.tell(new RegisterCandidateLocal(entity), kit.getRef());
1007 kit.expectMsgClass(SuccessReply.class);
// Then register the same entity on the leader itself, i.e. a local candidate. peer2 never registers.
1011 leader.tell(new RegisterCandidateLocal(entity), kit.getRef());
1012 kit.expectMsgClass(SuccessReply.class);
1014 // Verify the local candidate becomes owner
1016 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), PEER_MEMBER_1_NAME);
1017 verifyCommittedEntityCandidate(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1018 verifyOwner(leader, entity.getType(), entity.getIdentifier(), LOCAL_MEMBER_NAME);
1020 testLog.info("testDelayedEntityOwnerSelection ending");
1023 private Props newLocalShardProps() {
1024 return newShardProps(newShardId(LOCAL_MEMBER_NAME), Collections.<String,String>emptyMap(), LOCAL_MEMBER_NAME);
1027 private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName) {
1028 return newShardProps(shardId, peers, memberName, EntityOwnerSelectionStrategyConfig.newBuilder().build());
1031 private Props newShardProps(ShardIdentifier shardId, Map<String,String> peers, String memberName,
1032 EntityOwnerSelectionStrategyConfig config) {
1033 return newShardBuilder(shardId, peers, memberName).ownerSelectionStrategyConfig(config).props()
1034 .withDispatcher(Dispatchers.DefaultDispatcherId());
1037 private EntityOwnershipShard.Builder newShardBuilder(ShardIdentifier shardId, Map<String,String> peers,
1038 String memberName) {
1039 return EntityOwnershipShard.newBuilder().id(shardId).peerAddresses(peers).datastoreContext(
1040 dataStoreContextBuilder.build()).schemaContext(SCHEMA_CONTEXT).localMemberName(
1041 MemberName.forName(memberName)).ownerSelectionStrategyConfig(
1042 EntityOwnerSelectionStrategyConfig.newBuilder().build());
1045 private Map<String, String> peerMap(String... peerIds) {
1046 ImmutableMap.Builder<String, String> builder = ImmutableMap.<String, String>builder();
1047 for(String peerId: peerIds) {
1048 builder.put(peerId, actorFactory.createTestActorPath(peerId)).build();
1051 return builder.build();
// Test subclass of EntityOwnershipShard that can forward every received message to a collector actor and
// can drop messages by class, optionally filtered by a predicate.
1054 private static class TestEntityOwnershipShard extends EntityOwnershipShard {
// Optional collector; null when the shard was created through props(Builder).
1055 private final TestActorRef<MessageCollectorActor> collectorActor;
// Message class -> predicate deciding whether a message of exactly that class is dropped.
1056 private final Map<Class<?>, Predicate<?>> dropMessagesOfType = new ConcurrentHashMap<>();
1058 TestEntityOwnershipShard(Builder builder, TestActorRef<MessageCollectorActor> collectorActor) {
1060 this.collectorActor = collectorActor;
// Raw Predicate is used below because the map stores Predicate<?>; hence the suppressed warnings.
1063 @SuppressWarnings({ "unchecked", "rawtypes" })
// Forwards the message to the collector when one was supplied, then hands it to the superclass unless a
// predicate registered for the message's class returns true for it.
1065 public void handleCommand(Object message) {
1066 if(collectorActor != null) {
1067 collectorActor.tell(message, ActorRef.noSender());
// Lookup is by the message's runtime class, so a registration for a supertype does not match.
1070 Predicate drop = dropMessagesOfType.get(message.getClass());
1071 if(drop == null || !drop.test(message)) {
1072 super.handleCommand(message);
// Drops every message of the given class until stopDroppingMessagesOfType is called for that class.
1076 void startDroppingMessagesOfType(Class<?> msgClass) {
1077 dropMessagesOfType.put(msgClass, msg -> true);
// Drops only those messages of the given class for which the filter returns true; replaces any predicate
// previously registered for that class.
1080 <T> void startDroppingMessagesOfType(Class<T> msgClass, Predicate<T> filter) {
1081 dropMessagesOfType.put(msgClass, filter);
// Removes the drop registration for the given class, whether unconditional or filtered.
1084 void stopDroppingMessagesOfType(Class<?> msgClass) {
1085 dropMessagesOfType.remove(msgClass);
// Returns the collector actor passed at construction; may be null.
1088 TestActorRef<MessageCollectorActor> collectorActor() {
1089 return collectorActor;
// Props for a shard without a collector actor (null is passed on).
1092 static Props props(Builder builder) {
1093 return props(builder, null);
// Props that create a TestEntityOwnershipShard from the builder and collector, on the default dispatcher.
1096 static Props props(Builder builder, TestActorRef<MessageCollectorActor> collectorActor) {
1097 return Props.create(TestEntityOwnershipShard.class, builder, collectorActor).
1098 withDispatcher(Dispatchers.DefaultDispatcherId());