2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.timeout;
13 import static org.mockito.Mockito.verify;
14 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
15 import akka.actor.ActorRef;
16 import akka.actor.Props;
17 import akka.actor.UntypedActor;
18 import akka.dispatch.Dispatchers;
19 import akka.testkit.TestActorRef;
20 import com.google.common.base.Function;
21 import com.google.common.base.Stopwatch;
22 import com.google.common.collect.ImmutableMap;
23 import com.google.common.util.concurrent.Uninterruptibles;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import org.junit.After;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
34 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
35 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
36 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
37 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
38 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
39 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
40 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
41 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
42 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
43 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
44 import org.opendaylight.controller.cluster.datastore.modification.Modification;
45 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
46 import org.opendaylight.controller.cluster.raft.TestActorFactory;
47 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
48 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
49 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
50 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
51 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
52 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
53 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
54 import org.opendaylight.yangtools.yang.common.QName;
55 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
56 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
57 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
60 * Unit tests for EntityOwnershipShard.
62 * @author Thomas Pantelis
64 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
65 private static final String ENTITY_TYPE = "test type";
66 private static final YangInstanceIdentifier ENTITY_ID1 =
67 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
68 private static final YangInstanceIdentifier ENTITY_ID2 =
69 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
70 private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
71 private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
72 private static final String LOCAL_MEMBER_NAME = "member-1";
74 private final ShardIdentifier shardID = ShardIdentifier.builder().memberName(LOCAL_MEMBER_NAME)
75 .shardName("entity-ownership").type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
77 private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder();
78 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
81 public void tearDown() {
86 public void testOnRegisterCandidateLocal() throws Exception {
87 ShardTestKit kit = new ShardTestKit(getSystem());
89 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
91 kit.waitUntilLeader(shard);
93 YangInstanceIdentifier entityId = ENTITY_ID1;
94 Entity entity = new Entity(ENTITY_TYPE, entityId);
95 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
97 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
98 kit.expectMsgClass(SuccessReply.class);
100 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
102 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
104 verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
108 public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
109 ShardTestKit kit = new ShardTestKit(getSystem());
111 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
113 String peerId = actorFactory.generateActorId("follower");
114 TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
115 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
117 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
118 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
119 withDispatcher(Dispatchers.DefaultDispatcherId()));
121 YangInstanceIdentifier entityId = ENTITY_ID1;
122 Entity entity = new Entity(ENTITY_TYPE, entityId);
123 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
125 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
126 kit.expectMsgClass(SuccessReply.class);
128 // Now grant the vote so the shard becomes the leader. This should retry the commit.
129 peer.underlyingActor().grantVote = true;
131 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
133 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
135 verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
139 public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
140 ShardTestKit kit = new ShardTestKit(getSystem());
142 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
143 shardTransactionCommitTimeoutInSeconds(1);
145 String peerId = actorFactory.generateActorId("follower");
146 TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
147 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
149 MockFollower follower = peer.underlyingActor();
150 follower.grantVote = true;
152 // Drop AppendEntries so consensus isn't reached.
153 follower.dropAppendEntries = true;
155 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
156 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
157 withDispatcher(Dispatchers.DefaultDispatcherId()));
159 kit.waitUntilLeader(shard);
161 YangInstanceIdentifier entityId = ENTITY_ID1;
162 Entity entity = new Entity(ENTITY_TYPE, entityId);
163 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
165 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
166 kit.expectMsgClass(SuccessReply.class);
168 // Wait enough time for the commit to timeout.
169 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
171 // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
172 // write being applied to the state.
173 follower.dropAppendEntries = false;
175 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
177 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
179 verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
183 public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
184 ShardTestKit kit = new ShardTestKit(getSystem());
186 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
187 shardIsolatedLeaderCheckIntervalInMillis(50);
189 String peerId = actorFactory.generateActorId("follower");
190 TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
191 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
193 MockFollower follower = peer.underlyingActor();
194 follower.grantVote = true;
196 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
197 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
198 withDispatcher(Dispatchers.DefaultDispatcherId()));
200 kit.waitUntilLeader(shard);
202 // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
203 follower.dropAppendEntries = true;
204 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
206 YangInstanceIdentifier entityId = ENTITY_ID1;
207 Entity entity = new Entity(ENTITY_TYPE, entityId);
208 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
210 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
211 kit.expectMsgClass(SuccessReply.class);
213 // Resume AppendEntries - the candidate write should now be committed.
214 follower.dropAppendEntries = false;
215 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
217 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
219 verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
223 public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
224 ShardTestKit kit = new ShardTestKit(getSystem());
226 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100).
227 shardBatchedModificationCount(5);
229 String peerId = actorFactory.generateActorId("leader");
230 TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
231 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
233 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
234 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
235 withDispatcher(Dispatchers.DefaultDispatcherId()));
237 shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), -1L, -1L,
238 DataStoreVersions.CURRENT_VERSION), peer);
240 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
242 shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
243 kit.expectMsgClass(SuccessReply.class);
245 MockLeader leader = peer.underlyingActor();
246 assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
247 leader.modificationsReceived, 5, TimeUnit.SECONDS));
248 verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1,
251 shard.tell(dataStoreContextBuilder.shardElectionTimeoutFactor(2).build(), ActorRef.noSender());
253 // Test with initial commit timeout and subsequent retry.
255 leader.modificationsReceived = new CountDownLatch(1);
256 leader.sendReply = false;
258 shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
260 shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
261 kit.expectMsgClass(SuccessReply.class);
263 assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
264 leader.modificationsReceived, 5, TimeUnit.SECONDS));
265 verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID2,
268 // Send a bunch of registration messages quickly and verify.
272 leader.modificationsReceived = new CountDownLatch(max);
273 List<YangInstanceIdentifier> entityIds = new ArrayList<>();
274 for(int i = 1; i <= max; i++) {
275 YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
277 shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, id)), kit.getRef());
280 assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
281 leader.modificationsReceived, 10, TimeUnit.SECONDS));
283 // Sleep a little to ensure no additional BatchedModifications are received.
285 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
287 List<Modification> receivedMods = leader.getAndClearReceivedModifications();
288 for(int i = 0; i < max; i++) {
289 verifyBatchedEntityCandidate(receivedMods.get(i), ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
292 assertEquals("# modifications received", max, receivedMods.size());
295 private void verifyCommittedEntityCandidate(TestActorRef<EntityOwnershipShard> shard, String entityType,
296 YangInstanceIdentifier entityId, String candidateName) throws Exception {
297 verifyEntityCandidate(readEntityOwners(shard), entityType, entityId, candidateName);
300 private void verifyBatchedEntityCandidate(List<Modification> mods, String entityType,
301 YangInstanceIdentifier entityId, String candidateName) throws Exception {
302 assertEquals("BatchedModifications size", 1, mods.size());
303 verifyBatchedEntityCandidate(mods.get(0), entityType, entityId, candidateName);
306 private void verifyBatchedEntityCandidate(Modification mod, String entityType,
307 YangInstanceIdentifier entityId, String candidateName) throws Exception {
308 assertEquals("Modification type", MergeModification.class, mod.getClass());
309 verifyEntityCandidate(((MergeModification)mod).getData(), entityType,
310 entityId, candidateName);
313 private NormalizedNode<?, ?> readEntityOwners(TestActorRef<EntityOwnershipShard> shard) throws Exception {
314 Stopwatch sw = Stopwatch.createStarted();
315 while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
316 NormalizedNode<?, ?> node = AbstractShardTest.readStore(shard, ENTITY_OWNERS_PATH);
321 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
327 private void verifyOwner(final TestActorRef<EntityOwnershipShard> shard, String entityType, YangInstanceIdentifier entityId,
328 String localMemberName) {
329 verifyOwner(localMemberName, entityType, entityId, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
331 public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
333 return AbstractShardTest.readStore(shard, path);
334 } catch(Exception e) {
341 private Props newShardProps() {
342 return newShardProps(Collections.<String,String>emptyMap());
345 private Props newShardProps(Map<String,String> peers) {
346 return EntityOwnershipShard.props(shardID, peers, dataStoreContextBuilder.build(), SCHEMA_CONTEXT,
350 public static class MockFollower extends UntypedActor {
351 volatile boolean grantVote;
352 volatile boolean dropAppendEntries;
353 private final String myId;
355 public MockFollower(String myId) {
360 public void onReceive(Object message) {
361 if(message instanceof RequestVote) {
363 getSender().tell(new RequestVoteReply(((RequestVote)message).getTerm(), true), getSelf());
365 } else if(message instanceof AppendEntries) {
366 if(!dropAppendEntries) {
367 AppendEntries req = (AppendEntries) message;
368 long lastIndex = req.getLeaderCommit();
369 if (req.getEntries().size() > 0) {
370 for(ReplicatedLogEntry entry : req.getEntries()) {
371 lastIndex = entry.getIndex();
375 getSender().tell(new AppendEntriesReply(myId, req.getTerm(), true, lastIndex, req.getTerm(),
376 DataStoreVersions.CURRENT_VERSION), getSelf());
382 public static class MockLeader extends UntypedActor {
383 volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
384 List<Modification> receivedModifications = new ArrayList<>();
385 volatile boolean sendReply = true;
389 public void onReceive(Object message) {
390 if(message instanceof BatchedModifications) {
392 Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
396 BatchedModifications mods = (BatchedModifications) message;
397 synchronized (receivedModifications) {
398 for(int i = 0; i < mods.getModifications().size(); i++) {
399 receivedModifications.add(mods.getModifications().get(i));
400 modificationsReceived.countDown();
404 getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
411 List<Modification> getAndClearReceivedModifications() {
412 synchronized (receivedModifications) {
413 List<Modification> ret = new ArrayList<>(receivedModifications);
414 receivedModifications.clear();