2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.entityownership;
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Mockito.mock;
12 import akka.actor.ActorRef;
13 import akka.actor.Props;
14 import akka.actor.UntypedActor;
15 import akka.dispatch.Dispatchers;
16 import akka.testkit.TestActorRef;
17 import com.google.common.base.Stopwatch;
18 import com.google.common.collect.ImmutableMap;
19 import com.google.common.util.concurrent.Uninterruptibles;
20 import java.util.Collections;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import org.junit.After;
26 import org.junit.Test;
27 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
28 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
29 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
30 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
31 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
32 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
33 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
34 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
35 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
37 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
38 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
39 import org.opendaylight.controller.cluster.raft.TestActorFactory;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
41 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
42 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
45 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
46 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
47 import org.opendaylight.yangtools.yang.common.QName;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
50 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
53 * Unit tests for EntityOwnershipShard.
55 * @author Thomas Pantelis
57 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
58 private static final String ENTITY_TYPE = "test type";
59 private static final YangInstanceIdentifier ENTITY_ID1 =
60 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
61 private static final YangInstanceIdentifier ENTITY_ID2 =
62 YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
63 private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
64 private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
65 private static final String LOCAL_MEMBER_NAME = "member-1";
67 private final ShardIdentifier shardID = ShardIdentifier.builder().memberName(LOCAL_MEMBER_NAME)
68 .shardName("entity-ownership").type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
70 private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder();
71 private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
74 public void tearDown() {
79 public void testOnRegisterCandidateLocal() throws Exception {
80 ShardTestKit kit = new ShardTestKit(getSystem());
82 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
84 kit.waitUntilLeader(shard);
86 YangInstanceIdentifier entityId = ENTITY_ID1;
87 Entity entity = new Entity(ENTITY_TYPE, entityId);
88 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
90 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
91 kit.expectMsgClass(SuccessReply.class);
93 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
97 public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
98 ShardTestKit kit = new ShardTestKit(getSystem());
100 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
102 String peerId = actorFactory.generateActorId("follower");
103 TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
104 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
106 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
107 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
108 withDispatcher(Dispatchers.DefaultDispatcherId()));
110 YangInstanceIdentifier entityId = ENTITY_ID1;
111 Entity entity = new Entity(ENTITY_TYPE, entityId);
112 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
114 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
115 kit.expectMsgClass(SuccessReply.class);
117 // Now grant the vote so the shard becomes the leader. This should retry the commit.
118 peer.underlyingActor().grantVote = true;
120 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
124 public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
125 ShardTestKit kit = new ShardTestKit(getSystem());
127 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
128 shardTransactionCommitTimeoutInSeconds(1);
130 String peerId = actorFactory.generateActorId("follower");
131 TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
132 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
134 MockFollower follower = peer.underlyingActor();
135 follower.grantVote = true;
137 // Drop AppendEntries so consensus isn't reached.
138 follower.dropAppendEntries = true;
140 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
141 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
142 withDispatcher(Dispatchers.DefaultDispatcherId()));
144 kit.waitUntilLeader(shard);
146 YangInstanceIdentifier entityId = ENTITY_ID1;
147 Entity entity = new Entity(ENTITY_TYPE, entityId);
148 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
150 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
151 kit.expectMsgClass(SuccessReply.class);
153 // Wait enough time for the commit to timeout.
154 Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
156 // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
157 // write being applied to the state.
158 follower.dropAppendEntries = false;
159 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
163 public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
164 ShardTestKit kit = new ShardTestKit(getSystem());
166 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
167 shardIsolatedLeaderCheckIntervalInMillis(50);
169 String peerId = actorFactory.generateActorId("follower");
170 TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
171 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
173 MockFollower follower = peer.underlyingActor();
174 follower.grantVote = true;
176 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
177 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
178 withDispatcher(Dispatchers.DefaultDispatcherId()));
180 kit.waitUntilLeader(shard);
182 // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
183 follower.dropAppendEntries = true;
184 Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
186 YangInstanceIdentifier entityId = ENTITY_ID1;
187 Entity entity = new Entity(ENTITY_TYPE, entityId);
188 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
190 shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
191 kit.expectMsgClass(SuccessReply.class);
193 // Resume AppendEntries - the candidate write should now be committed.
194 follower.dropAppendEntries = false;
195 verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
199 public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
200 ShardTestKit kit = new ShardTestKit(getSystem());
202 dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100);
204 String peerId = actorFactory.generateActorId("leader");
205 TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
206 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
208 TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
209 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
210 withDispatcher(Dispatchers.DefaultDispatcherId()));
212 shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), -1L, -1L,
213 DataStoreVersions.CURRENT_VERSION), peer);
215 EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
217 shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
218 kit.expectMsgClass(SuccessReply.class);
220 MockLeader leader = peer.underlyingActor();
221 assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
222 leader.modificationsReceived, 5, TimeUnit.SECONDS));
223 verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
225 leader.modificationsReceived = new CountDownLatch(2);
226 leader.sendReply = false;
228 shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
230 shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
231 kit.expectMsgClass(SuccessReply.class);
233 assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
234 leader.modificationsReceived, 5, TimeUnit.SECONDS));
235 verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID2, LOCAL_MEMBER_NAME);
238 private void verifyCommittedEntityCandidate(TestActorRef<EntityOwnershipShard> shard, String entityType,
239 YangInstanceIdentifier entityId, String candidateName) throws Exception {
240 verifyEntityCandidate(readEntityOwners(shard), entityType, entityId, candidateName);
243 private void verifyBatchedEntityCandidate(BatchedModifications mods, String entityType,
244 YangInstanceIdentifier entityId, String candidateName) throws Exception {
245 assertEquals("BatchedModifications size", 1, mods.getModifications().size());
246 assertEquals("Modification type", MergeModification.class, mods.getModifications().get(0).getClass());
247 verifyEntityCandidate(((MergeModification)mods.getModifications().get(0)).getData(), entityType,
248 entityId, candidateName);
251 private NormalizedNode<?, ?> readEntityOwners(TestActorRef<EntityOwnershipShard> shard) throws Exception {
252 Stopwatch sw = Stopwatch.createStarted();
253 while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
254 NormalizedNode<?, ?> node = AbstractShardTest.readStore(shard, EntityOwnershipShard.ENTITY_OWNERS_PATH);
259 Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
265 private Props newShardProps() {
266 return newShardProps(Collections.<String,String>emptyMap());
269 private Props newShardProps(Map<String,String> peers) {
270 return EntityOwnershipShard.props(shardID, peers, dataStoreContextBuilder.build(), SCHEMA_CONTEXT,
274 public static class MockFollower extends UntypedActor {
275 volatile boolean grantVote;
276 volatile boolean dropAppendEntries;
277 private final String myId;
279 public MockFollower(String myId) {
284 public void onReceive(Object message) {
285 if(message instanceof RequestVote) {
287 getSender().tell(new RequestVoteReply(((RequestVote)message).getTerm(), true), getSelf());
289 } else if(message instanceof AppendEntries) {
290 if(!dropAppendEntries) {
291 AppendEntries req = (AppendEntries) message;
292 long lastIndex = req.getLeaderCommit();
293 if (req.getEntries().size() > 0) {
294 for(ReplicatedLogEntry entry : req.getEntries()) {
295 lastIndex = entry.getIndex();
299 getSender().tell(new AppendEntriesReply(myId, req.getTerm(), true, lastIndex, req.getTerm(),
300 DataStoreVersions.CURRENT_VERSION), getSelf());
306 public static class MockLeader extends UntypedActor {
307 volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
308 volatile BatchedModifications receivedModifications;
309 volatile boolean sendReply = true;
312 public void onReceive(Object message) {
313 if(message instanceof BatchedModifications) {
314 receivedModifications = (BatchedModifications) message;
315 modificationsReceived.countDown();
317 getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());