2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Mockito.mock;
12 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.actor.UntypedActor;
16 import akka.dispatch.Dispatchers;
17 import akka.testkit.TestActorRef;
18 import com.google.common.base.Stopwatch;
19 import com.google.common.collect.ImmutableMap;
20 import com.google.common.util.concurrent.Uninterruptibles;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.List;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import org.junit.After;
29 import org.junit.Test;
30 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
31 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
32 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
33 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
34 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
35 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
36 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
37 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
38 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
39 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
40 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
41 import org.opendaylight.controller.cluster.datastore.modification.Modification;
42 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
43 import org.opendaylight.controller.cluster.raft.TestActorFactory;
44 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
45 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
46 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
47 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
48 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
49 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
50 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
51 import org.opendaylight.yangtools.yang.common.QName;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
53 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
57 * Unit tests for EntityOwnershipShard.
59 * @author Thomas Pantelis
61 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
// Entity type string used for every Entity these tests register.
62 private static final String ENTITY_TYPE = "test type";
// Two distinct entity identifiers, each built from a single QName.
63 private static final YangInstanceIdentifier ENTITY_ID1 =
64 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
65 private static final YangInstanceIdentifier ENTITY_ID2 =
66 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
// Schema context passed to EntityOwnershipShard.props() when shard Props are built.
67 private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
// Counter appended to the shard type below so that each test instance gets a distinct ShardIdentifier.
68 private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
// Member name of the shard under test; also the candidate name the verifications expect.
69 private static final String LOCAL_MEMBER_NAME = "member-1";
// Identifier of the shard under test: member LOCAL_MEMBER_NAME, shard "entity-ownership",
// type "operational" followed by a per-instance sequence number.
71 private final ShardIdentifier shardID = ShardIdentifier.builder().memberName(LOCAL_MEMBER_NAME)
72 .shardName("entity-ownership").type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
// Context builder that individual tests tune (heartbeat interval, election timeout factor, ...)
// before the shard Props are built from it.
74 private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder();
// Factory through which the tests create their shard and mock peer actors.
75 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
// Per-test cleanup hook.
// NOTE(review): the body is not visible in this excerpt - presumably it releases the actors
// created through actorFactory; confirm against the full file.
78 public void tearDown() {
// Registers a local candidate on a shard created with no peers and checks that the
// candidate ends up in the shard's data store.
83 public void testOnRegisterCandidateLocal() throws Exception {
84 ShardTestKit kit = new ShardTestKit(getSystem());
// No peers are configured for this shard.
86 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
// Only proceed once the shard reports itself as leader.
88 kit.waitUntilLeader(shard);
90 YangInstanceIdentifier entityId = ENTITY_ID1;
91 Entity entity = new Entity(ENTITY_TYPE, entityId);
92 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
// The shard is expected to acknowledge the registration with a SuccessReply.
94 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
95 kit.expectMsgClass(SuccessReply.class);
// The candidate for the local member must be readable from the shard's store.
97 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
// Registers a local candidate before the shard has been elected leader, then lets the
// election succeed and checks that the candidate write is eventually committed.
101 public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
102 ShardTestKit kit = new ShardTestKit(getSystem());
// Short heartbeat interval and a small election timeout factor for this test.
104 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
// A single mock peer; its grantVote flag is left at its default (false) for now.
106 String peerId = actorFactory.generateActorId("follower");
107 TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
108 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
// The shard is created with the mock peer as its only peer.
110 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
111 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
112 withDispatcher(Dispatchers.DefaultDispatcherId()));
114 YangInstanceIdentifier entityId = ENTITY_ID1;
115 Entity entity = new Entity(ENTITY_TYPE, entityId);
116 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
// Unlike the other tests there is no waitUntilLeader() here: the registration is sent
// straight away and is still expected to be acknowledged.
118 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
119 kit.expectMsgClass(SuccessReply.class);
121 // Now grant the vote so the shard becomes the leader. This should retry the commit.
122 peer.underlyingActor().grantVote = true;
124 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
// Registers a local candidate while the only follower ignores AppendEntries, waits past
// the commit timeout, then lets the follower respond again and checks the write is applied.
128 public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
129 ShardTestKit kit = new ShardTestKit(getSystem());
// A 1 second transaction commit timeout, matched by the 1 second sleep further down.
131 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
132 shardTransactionCommitTimeoutInSeconds(1);
134 String peerId = actorFactory.generateActorId("follower");
135 TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
136 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
// The follower's flags are set directly on the underlying actor instance.
138 MockFollower follower = peer.underlyingActor();
139 follower.grantVote = true;
141 // Drop AppendEntries so consensus isn't reached.
142 follower.dropAppendEntries = true;
144 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
145 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
146 withDispatcher(Dispatchers.DefaultDispatcherId()));
148 kit.waitUntilLeader(shard);
150 YangInstanceIdentifier entityId = ENTITY_ID1;
151 Entity entity = new Entity(ENTITY_TYPE, entityId);
152 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
// The registration itself is acknowledged even though the follower is not replying.
154 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
155 kit.expectMsgClass(SuccessReply.class);
157 // Wait enough time for the commit to timeout.
158 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
160 // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
161 // write being applied to the state.
162 follower.dropAppendEntries = false;
163 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
// Registers a local candidate after the follower has stopped answering AppendEntries for a
// while, then lets the follower respond again and checks the candidate write is committed.
167 public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
168 ShardTestKit kit = new ShardTestKit(getSystem());
// Isolated-leader check interval of 50 ms, well below the 500 ms sleep further down.
170 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
171 shardIsolatedLeaderCheckIntervalInMillis(50);
173 String peerId = actorFactory.generateActorId("follower");
174 TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
175 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
177 MockFollower follower = peer.underlyingActor();
178 follower.grantVote = true;
180 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
181 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
182 withDispatcher(Dispatchers.DefaultDispatcherId()));
// The shard first becomes a regular leader; the follower only goes silent afterwards.
184 kit.waitUntilLeader(shard);
186 // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
187 follower.dropAppendEntries = true;
188 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
190 YangInstanceIdentifier entityId = ENTITY_ID1;
191 Entity entity = new Entity(ENTITY_TYPE, entityId);
192 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
// The registration is still acknowledged while the follower is silent.
194 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
195 kit.expectMsgClass(SuccessReply.class);
197 // Resume AppendEntries - the candidate write should now be committed.
198 follower.dropAppendEntries = false;
199 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
// Registers local candidates on a shard whose peer is a MockLeader and checks what the
// MockLeader receives as BatchedModifications. Three phases: a single registration, a
// registration while the leader's sendReply flag is off, and a burst of registrations.
203 public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
204 ShardTestKit kit = new ShardTestKit(getSystem());
// Large election timeout factor to start with; it is lowered to 2 later in the test.
206 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100).
207 shardBatchedModificationCount(5);
209 String peerId = actorFactory.generateActorId("leader");
210 TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
211 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
213 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
214 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
215 withDispatcher(Dispatchers.DefaultDispatcherId()));
// An empty AppendEntries is sent to the shard with the peer as sender.
// NOTE(review): presumably this makes the shard treat the peer as its leader - confirm.
217 shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), -1L, -1L,
218 DataStoreVersions.CURRENT_VERSION), peer);
220 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
// Phase 1: one registration; the mock leader should receive the matching modification.
222 shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
223 kit.expectMsgClass(SuccessReply.class);
225 MockLeader leader = peer.underlyingActor();
// Wait up to 5 seconds for the leader's latch to reach zero.
226 assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
227 leader.modificationsReceived, 5, TimeUnit.SECONDS));
228 verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1,
// Push an updated DatastoreContext (election timeout factor 2) to the running shard.
231 shard.tell(dataStoreContextBuilder.shardElectionTimeoutFactor(2).build(), ActorRef.noSender());
233 // Test with initial commit timeout and subsequent retry.
// Phase 2: fresh latch, and the leader's sendReply flag is switched off.
235 leader.modificationsReceived = new CountDownLatch(1);
236 leader.sendReply = false;
// Shorten the commit timeout to 1 second via another context update.
238 shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
240 shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
241 kit.expectMsgClass(SuccessReply.class);
243 assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
244 leader.modificationsReceived, 5, TimeUnit.SECONDS));
245 verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID2,
248 // Send a bunch of registration messages quickly and verify.
// Phase 3. NOTE(review): the declaration of 'max' is not visible in this excerpt.
252 leader.modificationsReceived = new CountDownLatch(max);
253 List<YangInstanceIdentifier> entityIds = new ArrayList<>();
254 for(int i = 1; i <= max; i++) {
255 YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
257 shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, id)), kit.getRef());
// The latch was created with a count of max; allow up to 10 seconds for it to reach zero.
260 assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
261 leader.modificationsReceived, 10, TimeUnit.SECONDS));
263 // Sleep a little to ensure no additional BatchedModifications are received.
265 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
// Modifications are compared position by position against entityIds, so they are
// expected in the order the registrations were sent.
267 List<Modification> receivedMods = leader.getAndClearReceivedModifications();
268 for(int i = 0; i < max; i++) {
269 verifyBatchedEntityCandidate(receivedMods.get(i), ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
// Exactly max modifications must have arrived - no duplicates after the extra sleep.
272 assertEquals("# modifications received", max, receivedMods.size());
// Reads the entity-owners data from the given shard (via readEntityOwners) and delegates to
// verifyEntityCandidate to check it for the given entity type, entity id and candidate name.
275 private void verifyCommittedEntityCandidate(TestActorRef<EntityOwnershipShard> shard, String entityType,
276 YangInstanceIdentifier entityId, String candidateName) throws Exception {
277 verifyEntityCandidate(readEntityOwners(shard), entityType, entityId, candidateName);
// List overload: asserts that exactly one modification was received and then verifies that
// single modification with the Modification overload.
280 private void verifyBatchedEntityCandidate(List<Modification> mods, String entityType,
281 YangInstanceIdentifier entityId, String candidateName) throws Exception {
282 assertEquals("BatchedModifications size", 1, mods.size());
283 verifyBatchedEntityCandidate(mods.get(0), entityType, entityId, candidateName);
// Single-modification overload: asserts the modification's class is exactly
// MergeModification, then delegates to verifyEntityCandidate with the merged data.
286 private void verifyBatchedEntityCandidate(Modification mod, String entityType,
287 YangInstanceIdentifier entityId, String candidateName) throws Exception {
// Compares classes with assertEquals, so a subclass of MergeModification would fail here.
288 assertEquals("Modification type", MergeModification.class, mod.getClass());
289 verifyEntityCandidate(((MergeModification)mod).getData(), entityType,
290 entityId, candidateName);
// Polls the shard's store at ENTITY_OWNERS_PATH, re-reading every 100 ms for as long as the
// elapsed time is within 5000 ms.
// NOTE(review): the loop's exit condition and what happens once the time limit is exceeded
// are on lines not visible in this excerpt - confirm against the full file.
293 private NormalizedNode<?, ?> readEntityOwners(TestActorRef<EntityOwnershipShard> shard) throws Exception {
294 Stopwatch sw = Stopwatch.createStarted();
295 while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
296 NormalizedNode<?, ?> node = AbstractShardTest.readStore(shard, ENTITY_OWNERS_PATH);
// Back off briefly before the next read attempt.
301 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
// Builds Props for a shard with no peers by delegating to the Map overload with an empty map.
307 private Props newShardProps() {
308 return newShardProps(Collections.<String,String>emptyMap());
// Builds Props for the shard under test from shardID, the given peers map (the tests put
// peer id -> actor path string into it), a DatastoreContext built at call time from
// dataStoreContextBuilder, and SCHEMA_CONTEXT.
// NOTE(review): the remaining argument(s) of the props() call are on a line not visible in
// this excerpt.
311 private Props newShardProps(Map<String,String> peers) {
312 return EntityOwnershipShard.props(shardID, peers, dataStoreContextBuilder.build(), SCHEMA_CONTEXT,
// Test actor standing in for a peer of the shard. It answers RequestVote and AppendEntries
// messages; the tests steer it through the two volatile flags below.
316 public static class MockFollower extends UntypedActor {
// Set to true by tests that want this peer to vote for the shard.
// NOTE(review): the statement that reads this flag is not visible in this excerpt -
// presumably it guards the RequestVoteReply in onReceive; confirm.
317 volatile boolean grantVote;
// While true, incoming AppendEntries messages get no reply.
318 volatile boolean dropAppendEntries;
// Passed as the first argument of every AppendEntriesReply this actor sends.
319 private final String myId;
// NOTE(review): the constructor body is not visible here - presumably it stores myId.
321 public MockFollower(String myId) {
326 public void onReceive(Object message) {
327 if(message instanceof RequestVote) {
// Reply with the requester's own term and a 'true' flag.
329 getSender().tell(new RequestVoteReply(((RequestVote)message).getTerm(), true), getSelf());
331 } else if(message instanceof AppendEntries) {
332 if(!dropAppendEntries) {
333 AppendEntries req = (AppendEntries) message;
// Start from the leader's commit index; if the request carries entries, the loop
// leaves lastIndex at the index of the last entry.
334 long lastIndex = req.getLeaderCommit();
335 if (req.getEntries().size() > 0) {
336 for(ReplicatedLogEntry entry : req.getEntries()) {
337 lastIndex = entry.getIndex();
// Reply with a 'true' flag, echoing the request's term in both term arguments.
341 getSender().tell(new AppendEntriesReply(myId, req.getTerm(), true, lastIndex, req.getTerm(),
342 DataStoreVersions.CURRENT_VERSION), getSelf());
348 public static class MockLeader extends UntypedActor {
// Latch counted down in onReceive as modifications arrive; tests replace it with a fresh
// latch before each phase and await it.
349 volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
// Modifications received so far; every visible access is inside synchronized (receivedModifications).
350 List<Modification> receivedModifications = new ArrayList<>();
// Switched off by a test for its commit-timeout phase.
// NOTE(review): the statement that reads this flag is not visible in this excerpt.
351 volatile boolean sendReply = true;
// Handles BatchedModifications: records the contained modifications, counts down the
// modificationsReceived latch and sends a commit reply to the sender.
355 public void onReceive(Object message) {
356 if(message instanceof BatchedModifications) {
// Delay before processing. NOTE(review): the declaration of 'delay' and any guard
// around this sleep are not visible in this excerpt.
358 Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
362 BatchedModifications mods = (BatchedModifications) message;
// Lock the list while appending; getAndClearReceivedModifications() takes the same lock.
363 synchronized (receivedModifications) {
364 for(int i = 0; i < mods.getModifications().size(); i++) {
365 receivedModifications.add(mods.getModifications().get(i));
// NOTE(review): presumably counted down once per modification (inside the loop) -
// the closing braces are not visible here; confirm.
366 modificationsReceived.countDown();
// Reply to the sender with the serializable form of CommitTransactionReply.
// NOTE(review): presumably guarded by sendReply - the guard is not visible here.
370 getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
377 List<Modification> getAndClearReceivedModifications() {
378 synchronized (receivedModifications) {
379 List<Modification> ret = new ArrayList<>(receivedModifications);
380 receivedModifications.clear();