e4aaaa188851b4db4e19d69f822f56857497a944
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Mockito.mock;
12 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.actor.UntypedActor;
16 import akka.dispatch.Dispatchers;
17 import akka.testkit.TestActorRef;
18 import com.google.common.base.Stopwatch;
19 import com.google.common.collect.ImmutableMap;
20 import com.google.common.util.concurrent.Uninterruptibles;
21 import java.util.Collections;
22 import java.util.Map;
23 import java.util.concurrent.CountDownLatch;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.atomic.AtomicInteger;
26 import org.junit.After;
27 import org.junit.Test;
28 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
29 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
30 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
31 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
32 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
33 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
34 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
35 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
36 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
37 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
38 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
39 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
40 import org.opendaylight.controller.cluster.raft.TestActorFactory;
41 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
42 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
44 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
45 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
46 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
47 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
48 import org.opendaylight.yangtools.yang.common.QName;
49 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
50 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
51 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
52
53 /**
54  * Unit tests for EntityOwnershipShard.
55  *
56  * @author Thomas Pantelis
57  */
58 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
59     private static final String ENTITY_TYPE = "test type";
60     private static final YangInstanceIdentifier ENTITY_ID1 =
61             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
62     private static final YangInstanceIdentifier ENTITY_ID2 =
63             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
64     private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
65     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
66     private static final String LOCAL_MEMBER_NAME = "member-1";
67
68     private final ShardIdentifier shardID = ShardIdentifier.builder().memberName(LOCAL_MEMBER_NAME)
69             .shardName("entity-ownership").type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
70
71     private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder();
72     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
73
74     @After
75     public void tearDown() {
76         actorFactory.close();
77     }
78
79     @Test
80     public void testOnRegisterCandidateLocal() throws Exception {
81         ShardTestKit kit = new ShardTestKit(getSystem());
82
83         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
84
85         kit.waitUntilLeader(shard);
86
87         YangInstanceIdentifier entityId = ENTITY_ID1;
88         Entity entity = new Entity(ENTITY_TYPE, entityId);
89         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
90
91         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
92         kit.expectMsgClass(SuccessReply.class);
93
94         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
95     }
96
97     @Test
98     public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
99         ShardTestKit kit = new ShardTestKit(getSystem());
100
101         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
102
103         String peerId = actorFactory.generateActorId("follower");
104         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
105                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
106
107         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
108                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
109                 withDispatcher(Dispatchers.DefaultDispatcherId()));
110
111         YangInstanceIdentifier entityId = ENTITY_ID1;
112         Entity entity = new Entity(ENTITY_TYPE, entityId);
113         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
114
115         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
116         kit.expectMsgClass(SuccessReply.class);
117
118         // Now grant the vote so the shard becomes the leader. This should retry the commit.
119         peer.underlyingActor().grantVote = true;
120
121         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
122     }
123
124     @Test
125     public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
126         ShardTestKit kit = new ShardTestKit(getSystem());
127
128         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
129                 shardTransactionCommitTimeoutInSeconds(1);
130
131         String peerId = actorFactory.generateActorId("follower");
132         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
133                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
134
135         MockFollower follower = peer.underlyingActor();
136         follower.grantVote = true;
137
138         // Drop AppendEntries so consensus isn't reached.
139         follower.dropAppendEntries = true;
140
141         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
142                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
143                 withDispatcher(Dispatchers.DefaultDispatcherId()));
144
145         kit.waitUntilLeader(shard);
146
147         YangInstanceIdentifier entityId = ENTITY_ID1;
148         Entity entity = new Entity(ENTITY_TYPE, entityId);
149         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
150
151         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
152         kit.expectMsgClass(SuccessReply.class);
153
154         // Wait enough time for the commit to timeout.
155         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
156
157         // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
158         // write being applied to the state.
159         follower.dropAppendEntries = false;
160         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
161     }
162
163     @Test
164     public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
165         ShardTestKit kit = new ShardTestKit(getSystem());
166
167         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
168                 shardIsolatedLeaderCheckIntervalInMillis(50);
169
170         String peerId = actorFactory.generateActorId("follower");
171         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
172                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
173
174         MockFollower follower = peer.underlyingActor();
175         follower.grantVote = true;
176
177         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
178                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
179                 withDispatcher(Dispatchers.DefaultDispatcherId()));
180
181         kit.waitUntilLeader(shard);
182
183         // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
184         follower.dropAppendEntries = true;
185         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
186
187         YangInstanceIdentifier entityId = ENTITY_ID1;
188         Entity entity = new Entity(ENTITY_TYPE, entityId);
189         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
190
191         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
192         kit.expectMsgClass(SuccessReply.class);
193
194         // Resume AppendEntries - the candidate write should now be committed.
195         follower.dropAppendEntries = false;
196         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
197     }
198
199     @Test
200     public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
201         ShardTestKit kit = new ShardTestKit(getSystem());
202
203         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100);
204
205         String peerId = actorFactory.generateActorId("leader");
206         TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
207                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
208
209         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
210                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
211                 withDispatcher(Dispatchers.DefaultDispatcherId()));
212
213         shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), -1L, -1L,
214                 DataStoreVersions.CURRENT_VERSION), peer);
215
216         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
217
218         shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
219         kit.expectMsgClass(SuccessReply.class);
220
221         MockLeader leader = peer.underlyingActor();
222         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
223                 leader.modificationsReceived, 5, TimeUnit.SECONDS));
224         verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
225
226         leader.modificationsReceived = new CountDownLatch(2);
227         leader.sendReply = false;
228
229         shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
230
231         shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
232         kit.expectMsgClass(SuccessReply.class);
233
234         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
235                 leader.modificationsReceived, 5, TimeUnit.SECONDS));
236         verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID2, LOCAL_MEMBER_NAME);
237     }
238
239     private void verifyCommittedEntityCandidate(TestActorRef<EntityOwnershipShard> shard, String entityType,
240             YangInstanceIdentifier entityId, String candidateName) throws Exception {
241         verifyEntityCandidate(readEntityOwners(shard), entityType, entityId, candidateName);
242     }
243
244     private void verifyBatchedEntityCandidate(BatchedModifications mods, String entityType,
245             YangInstanceIdentifier entityId, String candidateName) throws Exception {
246         assertEquals("BatchedModifications size", 1, mods.getModifications().size());
247         assertEquals("Modification type", MergeModification.class, mods.getModifications().get(0).getClass());
248         verifyEntityCandidate(((MergeModification)mods.getModifications().get(0)).getData(), entityType,
249                 entityId, candidateName);
250     }
251
252     private NormalizedNode<?, ?> readEntityOwners(TestActorRef<EntityOwnershipShard> shard) throws Exception {
253         Stopwatch sw = Stopwatch.createStarted();
254         while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
255             NormalizedNode<?, ?> node = AbstractShardTest.readStore(shard, ENTITY_OWNERS_PATH);
256             if(node != null) {
257                 return node;
258             }
259
260             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
261         }
262
263         return null;
264     }
265
266     private Props newShardProps() {
267         return newShardProps(Collections.<String,String>emptyMap());
268     }
269
270     private Props newShardProps(Map<String,String> peers) {
271         return EntityOwnershipShard.props(shardID, peers, dataStoreContextBuilder.build(), SCHEMA_CONTEXT,
272                 LOCAL_MEMBER_NAME);
273     }
274
275     public static class MockFollower extends UntypedActor {
276         volatile boolean grantVote;
277         volatile boolean dropAppendEntries;
278         private final String myId;
279
280         public MockFollower(String myId) {
281             this.myId = myId;
282         }
283
284         @Override
285         public void onReceive(Object message) {
286             if(message instanceof RequestVote) {
287                 if(grantVote) {
288                     getSender().tell(new RequestVoteReply(((RequestVote)message).getTerm(), true), getSelf());
289                 }
290             } else if(message instanceof AppendEntries) {
291                 if(!dropAppendEntries) {
292                     AppendEntries req = (AppendEntries) message;
293                     long lastIndex = req.getLeaderCommit();
294                     if (req.getEntries().size() > 0) {
295                         for(ReplicatedLogEntry entry : req.getEntries()) {
296                             lastIndex = entry.getIndex();
297                         }
298                     }
299
300                     getSender().tell(new AppendEntriesReply(myId, req.getTerm(), true, lastIndex, req.getTerm(),
301                             DataStoreVersions.CURRENT_VERSION), getSelf());
302                 }
303             }
304         }
305     }
306
307     public static class MockLeader extends UntypedActor {
308         volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
309         volatile BatchedModifications receivedModifications;
310         volatile boolean sendReply = true;
311
312         @Override
313         public void onReceive(Object message) {
314             if(message instanceof BatchedModifications) {
315                 receivedModifications = (BatchedModifications) message;
316                 modificationsReceived.countDown();
317                 if(sendReply) {
318                     getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
319                 } else {
320                     sendReply = true;
321                 }
322             }
323         }
324     }
325 }