/*
 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Mockito.mock;
12 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.actor.UntypedActor;
16 import akka.dispatch.Dispatchers;
17 import akka.testkit.TestActorRef;
18 import com.google.common.base.Function;
19 import com.google.common.base.Stopwatch;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.util.concurrent.Uninterruptibles;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.List;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicInteger;
29 import org.junit.After;
30 import org.junit.Test;
31 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
32 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
33 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
34 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
35 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
36 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
37 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
38 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
39 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
41 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
42 import org.opendaylight.controller.cluster.datastore.modification.Modification;
43 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
44 import org.opendaylight.controller.cluster.raft.TestActorFactory;
45 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
46 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
47 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
48 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
49 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
50 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
51 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
52 import org.opendaylight.yangtools.yang.common.QName;
53 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
54 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
55 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
/**
 * Unit tests for EntityOwnershipShard.
 *
 * @author Thomas Pantelis
 */
62 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
// Entity type and the two entity identifiers shared by the test methods.
63 private static final String ENTITY_TYPE = "test type";
64 private static final YangInstanceIdentifier ENTITY_ID1 =
65 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
66 private static final YangInstanceIdentifier ENTITY_ID2 =
67 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
68 private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
// Class-wide counter; each test instance takes the next value when it builds shardID below.
69 private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
// Member name used for shardID and as the expected candidate/owner name in the assertions.
70 private static final String LOCAL_MEMBER_NAME = "member-1";
// Shard identifier for this instance. The type is "operational" followed by the counter value, so
// every instance of this class gets a distinct identifier.
72 private final ShardIdentifier shardID = ShardIdentifier.builder().memberName(LOCAL_MEMBER_NAME)
73 .shardName("entity-ownership").type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
// Individual tests adjust timing settings on this builder before the shard Props are created.
75 private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder();
// Creates the shard and mock peer actors used by the tests.
76 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
// Presumably the JUnit @After hook (org.junit.After is imported; the annotation line is not visible).
// NOTE(review): the body and closing brace of this method are absent from this extract - confirm
// against the full file before editing.
79 public void tearDown() {
84 public void testOnRegisterCandidateLocal() throws Exception {
85 ShardTestKit kit = new ShardTestKit(getSystem());
87 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
89 kit.waitUntilLeader(shard);
91 YangInstanceIdentifier entityId = ENTITY_ID1;
92 Entity entity = new Entity(ENTITY_TYPE, entityId);
93 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
95 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
96 kit.expectMsgClass(SuccessReply.class);
98 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
100 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
104 public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
105 ShardTestKit kit = new ShardTestKit(getSystem());
107 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
109 String peerId = actorFactory.generateActorId("follower");
110 TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
111 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
113 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
114 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
115 withDispatcher(Dispatchers.DefaultDispatcherId()));
117 YangInstanceIdentifier entityId = ENTITY_ID1;
118 Entity entity = new Entity(ENTITY_TYPE, entityId);
119 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
121 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
122 kit.expectMsgClass(SuccessReply.class);
124 // Now grant the vote so the shard becomes the leader. This should retry the commit.
125 peer.underlyingActor().grantVote = true;
127 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
129 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
133 public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
134 ShardTestKit kit = new ShardTestKit(getSystem());
136 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
137 shardTransactionCommitTimeoutInSeconds(1);
139 String peerId = actorFactory.generateActorId("follower");
140 TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
141 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
143 MockFollower follower = peer.underlyingActor();
144 follower.grantVote = true;
146 // Drop AppendEntries so consensus isn't reached.
147 follower.dropAppendEntries = true;
149 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
150 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
151 withDispatcher(Dispatchers.DefaultDispatcherId()));
153 kit.waitUntilLeader(shard);
155 YangInstanceIdentifier entityId = ENTITY_ID1;
156 Entity entity = new Entity(ENTITY_TYPE, entityId);
157 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
159 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
160 kit.expectMsgClass(SuccessReply.class);
162 // Wait enough time for the commit to timeout.
163 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
165 // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
166 // write being applied to the state.
167 follower.dropAppendEntries = false;
169 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
171 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
175 public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
176 ShardTestKit kit = new ShardTestKit(getSystem());
178 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
179 shardIsolatedLeaderCheckIntervalInMillis(50);
181 String peerId = actorFactory.generateActorId("follower");
182 TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
183 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
185 MockFollower follower = peer.underlyingActor();
186 follower.grantVote = true;
188 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
189 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
190 withDispatcher(Dispatchers.DefaultDispatcherId()));
192 kit.waitUntilLeader(shard);
194 // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
195 follower.dropAppendEntries = true;
196 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
198 YangInstanceIdentifier entityId = ENTITY_ID1;
199 Entity entity = new Entity(ENTITY_TYPE, entityId);
200 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
202 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
203 kit.expectMsgClass(SuccessReply.class);
205 // Resume AppendEntries - the candidate write should now be committed.
206 follower.dropAppendEntries = false;
207 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
209 verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
// Exercises candidate registration when the shard's peer is a MockLeader: the test registers
// candidates on the shard and asserts on the BatchedModifications that the MockLeader receives.
// NOTE(review): several statements of this method are absent from this extract (see notes below);
// confirm against the full file before changing any logic.
213 public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
214 ShardTestKit kit = new ShardTestKit(getSystem());
216 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100).
217 shardBatchedModificationCount(5);
219 String peerId = actorFactory.generateActorId("leader");
220 TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
221 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
223 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
224 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
225 withDispatcher(Dispatchers.DefaultDispatcherId()));
// Send the shard an empty AppendEntries with the mock peer as sender - presumably so the shard
// treats that peer as the current leader (TODO confirm).
227 shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), -1L, -1L,
228 DataStoreVersions.CURRENT_VERSION), peer);
230 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
232 shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
233 kit.expectMsgClass(SuccessReply.class);
235 MockLeader leader = peer.underlyingActor();
// Wait up to 5 seconds for the mock leader's latch to reach zero.
236 assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
237 leader.modificationsReceived, 5, TimeUnit.SECONDS));
// NOTE(review): the remaining argument(s) and closing parenthesis of this call are not visible.
238 verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1,
241 shard.tell(dataStoreContextBuilder.shardElectionTimeoutFactor(2).build(), ActorRef.noSender());
243 // Test with initial commit timeout and subsequent retry.
// Fresh latch for the next round; the mock leader is told not to reply.
245 leader.modificationsReceived = new CountDownLatch(1);
246 leader.sendReply = false;
248 shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
250 shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
251 kit.expectMsgClass(SuccessReply.class);
253 assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
254 leader.modificationsReceived, 5, TimeUnit.SECONDS));
// NOTE(review): this call is also cut off in the extract.
255 verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID2,
258 // Send a bunch of registration messages quickly and verify.
// NOTE(review): the declaration of `max` is not visible in this extract.
262 leader.modificationsReceived = new CountDownLatch(max);
263 List<YangInstanceIdentifier> entityIds = new ArrayList<>();
264 for(int i = 1; i <= max; i++) {
265 YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
// NOTE(review): nothing visible adds `id` to entityIds, yet entityIds.get(i) is read below -
// presumably an add call is among the missing lines.
267 shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, id)), kit.getRef());
270 assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
271 leader.modificationsReceived, 10, TimeUnit.SECONDS));
273 // Sleep a little to ensure no additional BatchedModifications are received.
275 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
// Each received modification is checked against the entity id at the same index.
277 List<Modification> receivedMods = leader.getAndClearReceivedModifications();
278 for(int i = 0; i < max; i++) {
279 verifyBatchedEntityCandidate(receivedMods.get(i), ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
282 assertEquals("# modifications received", max, receivedMods.size());
285 private void verifyCommittedEntityCandidate(TestActorRef<EntityOwnershipShard> shard, String entityType,
286 YangInstanceIdentifier entityId, String candidateName) throws Exception {
287 verifyEntityCandidate(readEntityOwners(shard), entityType, entityId, candidateName);
290 private void verifyBatchedEntityCandidate(List<Modification> mods, String entityType,
291 YangInstanceIdentifier entityId, String candidateName) throws Exception {
292 assertEquals("BatchedModifications size", 1, mods.size());
293 verifyBatchedEntityCandidate(mods.get(0), entityType, entityId, candidateName);
296 private void verifyBatchedEntityCandidate(Modification mod, String entityType,
297 YangInstanceIdentifier entityId, String candidateName) throws Exception {
298 assertEquals("Modification type", MergeModification.class, mod.getClass());
299 verifyEntityCandidate(((MergeModification)mod).getData(), entityType,
300 entityId, candidateName);
// Polls the shard's store at ENTITY_OWNERS_PATH for as long as the stopwatch reads at most
// 5000 ms, sleeping 100 ms between reads.
// NOTE(review): the lines that decide when to return the node, and what happens once the time
// limit is exceeded, are absent from this extract - confirm against the full file.
303 private NormalizedNode<?, ?> readEntityOwners(TestActorRef<EntityOwnershipShard> shard) throws Exception {
304 Stopwatch sw = Stopwatch.createStarted();
305 while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
306 NormalizedNode<?, ?> node = AbstractShardTest.readStore(shard, ENTITY_OWNERS_PATH);
311 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
// Delegates to the four-argument verifyOwner overload (not defined in the visible part of this
// file), supplying a reader function that fetches a path from the given shard's store via
// AbstractShardTest.readStore.
// NOTE(review): the `try` line and the body of the catch block are absent from this extract, so
// how a read failure is reported cannot be determined here.
317 private void verifyOwner(final TestActorRef<EntityOwnershipShard> shard, String entityType, YangInstanceIdentifier entityId,
318 String localMemberName) {
319 verifyOwner(localMemberName, entityType, entityId, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
321 public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
323 return AbstractShardTest.readStore(shard, path);
324 } catch(Exception e) {
331 private Props newShardProps() {
332 return newShardProps(Collections.<String,String>emptyMap());
// Creates EntityOwnershipShard Props for this test's shardID and the given peer map, using the
// datastore context built from dataStoreContextBuilder at call time and the shared SCHEMA_CONTEXT.
// NOTE(review): the remaining argument(s) of the props(...) call are cut off in this extract, and
// no import for `Map` is visible in the import list - confirm java.util.Map is imported.
335 private Props newShardProps(Map<String,String> peers) {
336 return EntityOwnershipShard.props(shardID, peers, dataStoreContextBuilder.build(), SCHEMA_CONTEXT,
// Test actor standing in for a Raft follower peer. It answers RequestVote and AppendEntries
// messages; the tests flip its two volatile flags from the test thread to control that.
// NOTE(review): several lines of this class are absent from this extract (constructor body,
// closing braces, and apparently the check of grantVote) - confirm against the full file.
340 public static class MockFollower extends UntypedActor {
// Set by tests to allow vote replies; no visible line reads it (the guard is presumably missing).
341 volatile boolean grantVote;
// While true, incoming AppendEntries messages get no reply.
342 volatile boolean dropAppendEntries;
// Passed as the first argument of each AppendEntriesReply.
343 private final String myId;
345 public MockFollower(String myId) {
350 public void onReceive(Object message) {
351 if(message instanceof RequestVote) {
// Reply to the sender with the request's term and `true`.
353 getSender().tell(new RequestVoteReply(((RequestVote)message).getTerm(), true), getSelf());
355 } else if(message instanceof AppendEntries) {
356 if(!dropAppendEntries) {
357 AppendEntries req = (AppendEntries) message;
// Start from the leader's commit index; if entries are present, the index of the last entry wins.
358 long lastIndex = req.getLeaderCommit();
359 if (req.getEntries().size() > 0) {
360 for(ReplicatedLogEntry entry : req.getEntries()) {
361 lastIndex = entry.getIndex();
// Reply to the sender with lastIndex, passing the request's term in both term positions.
365 getSender().tell(new AppendEntriesReply(myId, req.getTerm(), true, lastIndex, req.getTerm(),
366 DataStoreVersions.CURRENT_VERSION), getSelf());
// Test actor standing in for a remote leader. It records the modifications carried by incoming
// BatchedModifications messages and counts down a latch the tests wait on.
// NOTE(review): several lines of this class are absent from this extract (the declaration of
// `delay`, the guards around the sleep and the reply, the return statement, closing braces) -
// confirm against the full file.
372 public static class MockLeader extends UntypedActor {
// Latch awaited by the tests; tests replace it with a fresh latch before each round.
373 volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
// Accumulated modifications; every visible access is inside synchronized (receivedModifications).
374 List<Modification> receivedModifications = new ArrayList<>();
// Set to false by a test; no visible line reads it (the guard around the reply is presumably missing).
375 volatile boolean sendReply = true;
379 public void onReceive(Object message) {
380 if(message instanceof BatchedModifications) {
// Sleeps for `delay` milliseconds before processing; `delay` is not declared in the visible lines.
382 Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
386 BatchedModifications mods = (BatchedModifications) message;
387 synchronized (receivedModifications) {
388 for(int i = 0; i < mods.getModifications().size(); i++) {
389 receivedModifications.add(mods.getModifications().get(i));
// The latch appears to be counted down once per modification (closing braces not visible - TODO confirm).
390 modificationsReceived.countDown();
// Reply to the sender with the serializable form of CommitTransactionReply.
394 getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
// Copies the recorded modifications and clears the internal list under the same lock; the copy
// is presumably what gets returned (return statement not visible).
401 List<Modification> getAndClearReceivedModifications() {
402 synchronized (receivedModifications) {
403 List<Modification> ret = new ArrayList<>(receivedModifications);
404 receivedModifications.clear();