import akka.actor.UntypedActor;
import akka.dispatch.Dispatchers;
import akka.testkit.TestActorRef;
+import com.google.common.base.Function;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
kit.expectMsgClass(SuccessReply.class);
verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+
+ verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
}
@Test
peer.underlyingActor().grantVote = true;
verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+
+ verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
}
@Test
// Resume AppendEntries - the follower should ack the commit which should then result in the candidate
// write being applied to the state.
follower.dropAppendEntries = false;
+
verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+
+ verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
}
@Test
// Resume AppendEntries - the candidate write should now be committed.
follower.dropAppendEntries = false;
verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
+
+ verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
}
@Test
return null;
}
+ private void verifyOwner(final TestActorRef<EntityOwnershipShard> shard, String entityType, YangInstanceIdentifier entityId,
+ String localMemberName) {
+ verifyOwner(localMemberName, entityType, entityId, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
+ @Override
+ public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
+ try {
+ return AbstractShardTest.readStore(shard, path);
+ } catch(Exception e) {
+ return null;
+ }
+ }
+ });
+ }
+
private Props newShardProps() {
return newShardProps(Collections.<String,String>emptyMap());
}