2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Mockito.mock;
12 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.actor.UntypedActor;
16 import akka.dispatch.Dispatchers;
17 import akka.testkit.TestActorRef;
18 import com.google.common.base.Stopwatch;
19 import com.google.common.collect.ImmutableMap;
20 import com.google.common.util.concurrent.Uninterruptibles;
21 import java.util.Collections;
23 import java.util.concurrent.CountDownLatch;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicInteger;
26 import org.junit.After;
27 import org.junit.Test;
28 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
29 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
30 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
31 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
32 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
33 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
34 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
35 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
36 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
38 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
39 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
40 import org.opendaylight.controller.cluster.raft.TestActorFactory;
41 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
42 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
44 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
45 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
46 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
47 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
48 import org.opendaylight.yangtools.yang.common.QName;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
51 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
54 * Unit tests for EntityOwnershipShard.
56 * @author Thomas Pantelis
58 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
59 private static final String ENTITY_TYPE = "test type";
60 private static final YangInstanceIdentifier ENTITY_ID1 =
61 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
62 private static final YangInstanceIdentifier ENTITY_ID2 =
63 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
64 private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
65 private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
66 private static final String LOCAL_MEMBER_NAME = "member-1";
68 private final ShardIdentifier shardID = ShardIdentifier.builder().memberName(LOCAL_MEMBER_NAME)
69 .shardName("entity-ownership").type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
71 private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder();
72 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
75 public void tearDown() {
80 public void testOnRegisterCandidateLocal() throws Exception {
81 ShardTestKit kit = new ShardTestKit(getSystem());
83 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
85 kit.waitUntilLeader(shard);
87 YangInstanceIdentifier entityId = ENTITY_ID1;
88 Entity entity = new Entity(ENTITY_TYPE, entityId);
89 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
91 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
92 kit.expectMsgClass(SuccessReply.class);
94 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
98 public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
99 ShardTestKit kit = new ShardTestKit(getSystem());
101 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
103 String peerId = actorFactory.generateActorId("follower");
104 TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
105 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
107 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
108 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
109 withDispatcher(Dispatchers.DefaultDispatcherId()));
111 YangInstanceIdentifier entityId = ENTITY_ID1;
112 Entity entity = new Entity(ENTITY_TYPE, entityId);
113 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
115 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
116 kit.expectMsgClass(SuccessReply.class);
118 // Now grant the vote so the shard becomes the leader. This should retry the commit.
119 peer.underlyingActor().grantVote = true;
121 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
125 public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
126 ShardTestKit kit = new ShardTestKit(getSystem());
128 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
129 shardTransactionCommitTimeoutInSeconds(1);
131 String peerId = actorFactory.generateActorId("follower");
132 TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
133 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
135 MockFollower follower = peer.underlyingActor();
136 follower.grantVote = true;
138 // Drop AppendEntries so consensus isn't reached.
139 follower.dropAppendEntries = true;
141 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
142 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
143 withDispatcher(Dispatchers.DefaultDispatcherId()));
145 kit.waitUntilLeader(shard);
147 YangInstanceIdentifier entityId = ENTITY_ID1;
148 Entity entity = new Entity(ENTITY_TYPE, entityId);
149 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
151 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
152 kit.expectMsgClass(SuccessReply.class);
154 // Wait enough time for the commit to timeout.
155 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
157 // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
158 // write being applied to the state.
159 follower.dropAppendEntries = false;
160 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
164 public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
165 ShardTestKit kit = new ShardTestKit(getSystem());
167 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
168 shardIsolatedLeaderCheckIntervalInMillis(50);
170 String peerId = actorFactory.generateActorId("follower");
171 TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
172 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
174 MockFollower follower = peer.underlyingActor();
175 follower.grantVote = true;
177 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
178 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
179 withDispatcher(Dispatchers.DefaultDispatcherId()));
181 kit.waitUntilLeader(shard);
183 // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
184 follower.dropAppendEntries = true;
185 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
187 YangInstanceIdentifier entityId = ENTITY_ID1;
188 Entity entity = new Entity(ENTITY_TYPE, entityId);
189 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
191 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
192 kit.expectMsgClass(SuccessReply.class);
194 // Resume AppendEntries - the candidate write should now be committed.
195 follower.dropAppendEntries = false;
196 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
200 public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
201 ShardTestKit kit = new ShardTestKit(getSystem());
203 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100);
205 String peerId = actorFactory.generateActorId("leader");
206 TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
207 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
209 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
210 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
211 withDispatcher(Dispatchers.DefaultDispatcherId()));
213 shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), -1L, -1L,
214 DataStoreVersions.CURRENT_VERSION), peer);
216 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
218 shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
219 kit.expectMsgClass(SuccessReply.class);
221 MockLeader leader = peer.underlyingActor();
222 assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
223 leader.modificationsReceived, 5, TimeUnit.SECONDS));
224 verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
226 leader.modificationsReceived = new CountDownLatch(2);
227 leader.sendReply = false;
229 shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
231 shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
232 kit.expectMsgClass(SuccessReply.class);
234 assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
235 leader.modificationsReceived, 5, TimeUnit.SECONDS));
236 verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID2, LOCAL_MEMBER_NAME);
239 private void verifyCommittedEntityCandidate(TestActorRef<EntityOwnershipShard> shard, String entityType,
240 YangInstanceIdentifier entityId, String candidateName) throws Exception {
241 verifyEntityCandidate(readEntityOwners(shard), entityType, entityId, candidateName);
244 private void verifyBatchedEntityCandidate(BatchedModifications mods, String entityType,
245 YangInstanceIdentifier entityId, String candidateName) throws Exception {
246 assertEquals("BatchedModifications size", 1, mods.getModifications().size());
247 assertEquals("Modification type", MergeModification.class, mods.getModifications().get(0).getClass());
248 verifyEntityCandidate(((MergeModification)mods.getModifications().get(0)).getData(), entityType,
249 entityId, candidateName);
252 private NormalizedNode<?, ?> readEntityOwners(TestActorRef<EntityOwnershipShard> shard) throws Exception {
253 Stopwatch sw = Stopwatch.createStarted();
254 while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
255 NormalizedNode<?, ?> node = AbstractShardTest.readStore(shard, ENTITY_OWNERS_PATH);
260 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
266 private Props newShardProps() {
267 return newShardProps(Collections.<String,String>emptyMap());
270 private Props newShardProps(Map<String,String> peers) {
271 return EntityOwnershipShard.props(shardID, peers, dataStoreContextBuilder.build(), SCHEMA_CONTEXT,
275 public static class MockFollower extends UntypedActor {
276 volatile boolean grantVote;
277 volatile boolean dropAppendEntries;
278 private final String myId;
280 public MockFollower(String myId) {
285 public void onReceive(Object message) {
286 if(message instanceof RequestVote) {
288 getSender().tell(new RequestVoteReply(((RequestVote)message).getTerm(), true), getSelf());
290 } else if(message instanceof AppendEntries) {
291 if(!dropAppendEntries) {
292 AppendEntries req = (AppendEntries) message;
293 long lastIndex = req.getLeaderCommit();
294 if (req.getEntries().size() > 0) {
295 for(ReplicatedLogEntry entry : req.getEntries()) {
296 lastIndex = entry.getIndex();
300 getSender().tell(new AppendEntriesReply(myId, req.getTerm(), true, lastIndex, req.getTerm(),
301 DataStoreVersions.CURRENT_VERSION), getSelf());
307 public static class MockLeader extends UntypedActor {
308 volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
309 volatile BatchedModifications receivedModifications;
310 volatile boolean sendReply = true;
313 public void onReceive(Object message) {
314 if(message instanceof BatchedModifications) {
315 receivedModifications = (BatchedModifications) message;
316 modificationsReceived.countDown();
318 getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());