+ @Test
+ public void testOnUnregisterCandidateLocal() throws Exception {
+ ShardTestKit kit = new ShardTestKit(getSystem());
+ TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
+ kit.waitUntilLeader(shard);
+
+ Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
+ EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+
+ // Register
+
+ shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
+
+ // Unregister
+
+ reset(candidate);
+
+ shard.tell(new UnregisterCandidateLocal(candidate, entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, "");
+ verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
+
+ // Register again
+
+ shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
+ }
+
+ @Test
+ public void testOwnershipChanges() throws Exception {
+ ShardTestKit kit = new ShardTestKit(getSystem());
+ TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
+ kit.waitUntilLeader(shard);
+
+ Entity entity = new Entity(ENTITY_TYPE, ENTITY_ID1);
+ EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
+ ShardDataTree shardDataTree = shard.underlyingActor().getDataStore();
+
+ // Add a remote candidate
+
+ String remoteMemberName1 = "remoteMember1";
+ writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
+
+ // Register local
+
+ shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ // Verify the remote candidate becomes owner
+
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
+ verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
+
+ // Add another remote candidate and verify ownership doesn't change
+
+ reset(candidate);
+ String remoteMemberName2 = "remoteMember2";
+ writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
+
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
+ verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
+
+ // Remove the second remote candidate and verify ownership doesn't change
+
+ reset(candidate);
+ deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
+
+ verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
+ verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
+
+ // Remove the first remote candidate and verify the local candidate becomes owner
+
+ reset(candidate);
+ deleteNode(candidatePath(ENTITY_TYPE, ENTITY_ID1, remoteMemberName1), shardDataTree);
+
+ verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName1);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
+
+ // Add the second remote candidate back and verify ownership doesn't change
+
+ reset(candidate);
+ writeNode(ENTITY_OWNERS_PATH, entityOwnersWithCandidate(ENTITY_TYPE, ENTITY_ID1, remoteMemberName2), shardDataTree);
+
+ verifyCommittedEntityCandidate(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verify(candidate, never()).ownershipChanged(any(Entity.class), anyBoolean(), anyBoolean());
+
+ // Unregister the local candidate and verify the second remote candidate becomes owner
+
+ shard.tell(new UnregisterCandidateLocal(candidate, entity), kit.getRef());
+ kit.expectMsgClass(SuccessReply.class);
+
+ verifyEntityCandidateRemoved(shard, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
+ verifyOwner(shard, ENTITY_TYPE, ENTITY_ID1, remoteMemberName2);
+ }
+
+ private void verifyEntityCandidateRemoved(final TestActorRef<EntityOwnershipShard> shard, String entityType,
+ YangInstanceIdentifier entityId, String candidateName) {
+ verifyNodeRemoved(candidatePath(entityType, entityId, candidateName),
+ new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
+ @Override
+ public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
+ try {
+ return AbstractShardTest.readStore(shard, path);
+ } catch(Exception e) {
+ throw new AssertionError("Failed to read " + path, e);
+ }
+ }
+ });
+ }
+
+ private void verifyCommittedEntityCandidate(final TestActorRef<EntityOwnershipShard> shard, String entityType,
+ YangInstanceIdentifier entityId, String candidateName) {
+ verifyEntityCandidate(entityType, entityId, candidateName, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
+ @Override
+ public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
+ try {
+ return AbstractShardTest.readStore(shard, path);
+ } catch(Exception e) {
+ throw new AssertionError("Failed to read " + path, e);
+ }
+ }
+ });