import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.After;
@Mock
private EntityOwnershipListener follower2MockListener;
+ @Mock
+ private EntityOwnershipListener member1Listener, member2Listener, member3Listener;
+
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
verifyOwner(follower1Node.configDataStore(), ENTITY2, "member-3");
}
+ /**
+ * Tests if candidate(s) get unregistered for restarted nodes, in case majority of nodes restart including the node
+ * that was leader for entity-ownership shard.
+ * Tests for the bug <a href="https://bugs.opendaylight.org/show_bug.cgi?id=5613>5613</a>
+ * @throws Exception
+ */
+ @Test
+ public void testCandidatesRemovedOnRestartOfMajorityNodes() throws Exception {
+ followerDatastoreContextBuilder.shardElectionTimeoutFactor(5).
+ customRaftPolicyImplementation(DisableElectionsRaftPolicy.class.getName());
+
+ String name = "test";
+ MemberNode member1Node = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name).
+ moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+ datastoreContextBuilder(leaderDatastoreContextBuilder).build();
+
+ MemberNode member2Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+ moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+ datastoreContextBuilder(followerDatastoreContextBuilder).build();
+
+ MemberNode member3Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+ moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+ datastoreContextBuilder(followerDatastoreContextBuilder).build();
+
+ DistributedDataStore member1DistributedDataStore = member1Node.configDataStore();
+
+ member1DistributedDataStore.waitTillReady();
+ member2Node.configDataStore().waitTillReady();
+ member3Node.configDataStore().waitTillReady();
+
+ EntityOwnershipService member1EntityOwnershipService = newOwnershipService(member1DistributedDataStore);
+ EntityOwnershipService member2EntityOwnershipService = newOwnershipService(member2Node.configDataStore());
+ EntityOwnershipService member3EntityOwnershipService = newOwnershipService(member3Node.configDataStore());
+ member1EntityOwnershipService.registerListener(ENTITY_TYPE1, member1Listener);
+ member2EntityOwnershipService.registerListener(ENTITY_TYPE1, member2Listener);
+ member3EntityOwnershipService.registerListener(ENTITY_TYPE1, member3Listener);
+
+
+ member1Node.kit().waitUntilLeader(member1Node.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
+
+ // Register follower1 candidate for entity1 and verify it becomes owner
+
+ member2EntityOwnershipService.registerCandidate(ENTITY1);
+ verifyOwner(member1DistributedDataStore, ENTITY1, "member-2");
+ verify(member2Listener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, true, true));
+
+
+ // Register leader candidate for entity1
+
+ member1EntityOwnershipService.registerCandidate(ENTITY1);
+ member3EntityOwnershipService.registerCandidate(ENTITY1);
+ verifyCandidates(member1DistributedDataStore, ENTITY1, "member-2", "member-1", "member-3");
+ verifyOwner(member1DistributedDataStore, ENTITY1, "member-2");
+ verify(member2Listener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, true, true));
+ verify(member1Listener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
+ verify(member3Listener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1, false, false, true));
+
+ // Stop shard leader and a owner of ENTITY1 (majority of nodes)
+ member1Node.cleanup();
+ member2Node.cleanup();
+
+ InMemoryJournal.get("member-1-shard-entity-ownership-config_test").clear();
+ InMemoryJournal.get("member-2-shard-entity-ownership-config_test").clear();
+
+
+
+ // wait for peer downs
+ member3Node.waitForMemberDown("member-1");
+ member3Node.waitForMemberDown("member-2");
+
+ // this is needed as the peer-up is not received in this integration test framework
+ member3Node.cleanup();
+ Mockito.reset(member3Listener);
+
+ member3Node = MemberNode.builder(memberNodes).akkaConfig("Member3").testName(name).
+ moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+ datastoreContextBuilder(leaderDatastoreContextBuilder).build();
+ member3EntityOwnershipService = newOwnershipService(member3Node.configDataStore());
+ member3EntityOwnershipService.registerListener(ENTITY_TYPE1, member3Listener);
+ member3EntityOwnershipService.registerCandidate(ENTITY1);
+
+ // re-start shard leader and owner of ENTITY1
+ member1Node = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name).
+ moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+ datastoreContextBuilder(followerDatastoreContextBuilder).build();
+
+ member2Node = MemberNode.builder(memberNodes).akkaConfig("Member2").testName(name).
+ moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false).
+ datastoreContextBuilder(followerDatastoreContextBuilder).build();
+
+ member1DistributedDataStore = member1Node.configDataStore();
+ member1EntityOwnershipService = newOwnershipService(member1DistributedDataStore);
+ member2EntityOwnershipService = newOwnershipService(member2Node.configDataStore());
+ member1EntityOwnershipService.registerListener(ENTITY_TYPE1, member1Listener);
+ member2EntityOwnershipService.registerListener(ENTITY_TYPE1, member2Listener);
+
+ // wait for peers to come up
+ member3Node.waitForMembersUp("member-1", "member-2");
+ member1Node.waitForMembersUp("member-3");
+
+ member3Node.kit().waitUntilLeader(member3Node.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
+
+ // wait for new leader to consume all modifications
+ Uninterruptibles.sleepUninterruptibly(5000, TimeUnit.MILLISECONDS);
+ // confirm that only member-3 is the candidate now (as 1 and 2 are restarted)
+ verifyCandidates(member3Node.configDataStore(), ENTITY1, "member-3");
+ Uninterruptibles.sleepUninterruptibly(5000, TimeUnit.MILLISECONDS);
+
+ // reset the listeners before registering for candidates again
+ Mockito.reset(member2Listener);
+ Mockito.reset(member1Listener);
+ member1EntityOwnershipService.registerCandidate(ENTITY1);
+ member2EntityOwnershipService.registerCandidate(ENTITY1);
+ // confirm that all are candidates again
+ verifyCandidates(member3Node.configDataStore(), ENTITY1, "member-3", "member-1", "member-2");
+ // confirm that 1 and 2 get event after they register for candidates again
+ verify(member2Listener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1));
+ verify(member1Listener, timeout(5000)).ownershipChanged(ownershipChange(ENTITY1));
+ }
+
/**
* Reproduces bug <a href="https://bugs.opendaylight.org/show_bug.cgi?id=4554">4554</a>
*
* @throws CandidateAlreadyRegisteredException
*/
- @Test
+
public void testCloseCandidateRegistrationInQuickSuccession() throws CandidateAlreadyRegisteredException {
String name = "testCloseCandidateRegistrationInQuickSuccession";
MemberNode leaderNode = MemberNode.builder(memberNodes).akkaConfig("Member1").testName(name ).
entityPath(entity.getType(), entity.getId()).node(Candidate.QNAME)).get(5, TimeUnit.SECONDS);
try {
assertEquals("Candidates not found for " + entity, true, possible.isPresent());
- Collection<String> actual = new ArrayList<>();
+ Collection<String> actual = new HashSet<>();
for(MapEntryNode candidate: ((MapNode)possible.get()).getValue()) {
actual.add(candidate.getChild(CANDIDATE_NAME_NODE_ID).get().getValue().toString());
}
- assertEquals("Candidates for " + entity, Arrays.asList(expCandidates), actual);
+ assertEquals("Candidates for " + entity, new HashSet(Arrays.asList(expCandidates)), actual);
return;
} catch (AssertionError e) {
lastError = e;
kit.expectMsgClass(SuccessReply.class);
verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ peer2.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName2), kit);
verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
+ peer1.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName1), kit);
verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName1);
verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
// Add candidates for entity2 with peerMember2 as the owner
+ peer2.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName2), kit);
verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
+ peer1.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName1), kit);
verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName1);
verifyOwner(leader, ENTITY_TYPE, ENTITY_ID2, peerMemberName2);
// Add candidates for entity3 with peerMember2 as the owner.
+ peer2.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName2), kit);
verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
kit.expectMsgClass(SuccessReply.class);
verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, LOCAL_MEMBER_NAME);
+ peer1.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName1), kit);
verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName1);
verifyOwner(leader, ENTITY_TYPE, ENTITY_ID3, peerMemberName2);
// Add only candidate peerMember2 for entity4.
+ peer2.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID4)), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID4, peerMemberName2), kit);
verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, peerMemberName2);
// Add only candidate peerMember1 for entity5.
+ peer1.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID5)), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID5, peerMemberName1), kit);
verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID5, peerMemberName1);
verifyOwner(leader, ENTITY_TYPE, ENTITY_ID5, peerMemberName1);
verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
verifyOwner(leader, ENTITY_TYPE, ENTITY_ID4, "");
- // Add back candidate peerMember2 for entities 1, 2, & 3.
+ peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
+ peer2.tell(new PeerAddressResolved(leaderId.toString(), leader.path().toString()), ActorRef.noSender());
+ peer2.tell(new PeerUp(LOCAL_MEMBER_NAME, leaderId.toString()), ActorRef.noSender());
+ peer2.tell(new PeerUp(peerMemberName1, peerId1.toString()), ActorRef.noSender());
+ // Add back candidate peerMember2 for entities 1, 2, & 3.
+ peer2.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+ peer2.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+ peer2.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID3)), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName2), kit);
commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID2, peerMemberName2), kit);
commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID3, peerMemberName2), kit);
// Kill the local leader and elect peer2 the leader. This should cause a new owner to be selected for
// the entities (1 and 3) previously owned by the local leader member.
- peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
- peer2.tell(new PeerUp(LOCAL_MEMBER_NAME, leaderId.toString()), ActorRef.noSender());
- peer2.tell(new PeerUp(peerMemberName1, peerId1.toString()), ActorRef.noSender());
leader.tell(PoisonPill.getInstance(), ActorRef.noSender());
peer2.tell(new PeerDown(LOCAL_MEMBER_NAME, leaderId.toString()), ActorRef.noSender());
verifyOwner(peer2, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
}
+ @Test
+ public void testCandidateRemovedWhenCandidateNotRegisteredLocally() throws Exception {
+ ShardTestKit kit = new ShardTestKit(getSystem());
+
+ dataStoreContextBuilder.shardHeartbeatIntervalInMillis(500).shardElectionTimeoutFactor(10000);
+
+ String peerMemberName1 = "peerMember1";
+ String peerMemberName2 = "peerMember2";
+
+ ShardIdentifier leaderId = newShardId(LOCAL_MEMBER_NAME);
+ ShardIdentifier peerId1 = newShardId(peerMemberName1);
+ ShardIdentifier peerId2 = newShardId(peerMemberName2);
+
+ TestActorRef<EntityOwnershipShard> peer1 = actorFactory.createTestActor(newShardProps(peerId1,
+ ImmutableMap.<String, String>builder().put(leaderId.toString(), "").put(peerId2.toString(), "").build(),
+ peerMemberName1, EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId1.toString());
+
+ TestActorRef<EntityOwnershipShard> peer2 = actorFactory.createTestActor(newShardProps(peerId2,
+ ImmutableMap.<String, String>builder().put(leaderId.toString(), "").put(peerId1.toString(), "").build(),
+ peerMemberName2, EntityOwnerSelectionStrategyConfig.newBuilder().build()).withDispatcher(Dispatchers.DefaultDispatcherId()), peerId2.toString());
+
+ TestActorRef<EntityOwnershipShard> leader = actorFactory.createTestActor(newShardProps(leaderId,
+ ImmutableMap.<String, String>builder().put(peerId1.toString(), peer1.path().toString()).
+ put(peerId2.toString(), peer2.path().toString()).build(), LOCAL_MEMBER_NAME, EntityOwnerSelectionStrategyConfig.newBuilder().build()).
+ withDispatcher(Dispatchers.DefaultDispatcherId()), leaderId.toString());
+ leader.tell(new ElectionTimeout(), leader);
+
+ kit.waitUntilLeader(leader);
+
+ peer2.tell(new PeerAddressResolved(peerId1.toString(), peer1.path().toString()), ActorRef.noSender());
+ peer2.tell(new PeerAddressResolved(leaderId.toString(), leader.path().toString()), ActorRef.noSender());
+ peer1.tell(new PeerAddressResolved(peerId2.toString(), peer2.path().toString()), ActorRef.noSender());
+ peer1.tell(new PeerAddressResolved(leaderId.toString(), leader.path().toString()), ActorRef.noSender());
+
+ leader.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+ verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+
+ peer2.tell(new RegisterCandidateLocal(new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+ commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName2), kit);
+ verifyCommittedEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName2);
+
+ // Try adding peerMemberName1 as candidate without registering locally
+ commitModification(leader, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, peerMemberName1), kit);
+
+ verifyOwner(leader, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+
+ // confirm peerMemberName1 is not candidate as was not registered locally
+ verifyNoEntityCandidate(leader, ENTITY_TYPE, ENTITY_ID1, peerMemberName1);
+ }
+
+
@Test
public void testLocalCandidateRemovedWithCandidateRegistered() throws Exception {
ShardTestKit kit = new ShardTestKit(getSystem());