49953ac143e7c31f10ab7538895f58aced528602
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Mockito.mock;
12 import static org.mockito.Mockito.timeout;
13 import static org.mockito.Mockito.verify;
14 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
15 import akka.actor.ActorRef;
16 import akka.actor.Props;
17 import akka.actor.UntypedActor;
18 import akka.dispatch.Dispatchers;
19 import akka.testkit.TestActorRef;
20 import com.google.common.base.Function;
21 import com.google.common.base.Stopwatch;
22 import com.google.common.collect.ImmutableMap;
23 import com.google.common.util.concurrent.Uninterruptibles;
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicInteger;
31 import org.junit.After;
32 import org.junit.Test;
33 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
34 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
35 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
36 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
37 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
38 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
39 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
40 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
41 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
42 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
43 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
44 import org.opendaylight.controller.cluster.datastore.modification.Modification;
45 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
46 import org.opendaylight.controller.cluster.raft.TestActorFactory;
47 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
48 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
49 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
50 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
51 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
52 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
53 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
54 import org.opendaylight.yangtools.yang.common.QName;
55 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
56 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
57 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
58
59 /**
60  * Unit tests for EntityOwnershipShard.
61  *
62  * @author Thomas Pantelis
63  */
64 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
65     private static final String ENTITY_TYPE = "test type";
66     private static final YangInstanceIdentifier ENTITY_ID1 =
67             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
68     private static final YangInstanceIdentifier ENTITY_ID2 =
69             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
70     private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
71     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
72     private static final String LOCAL_MEMBER_NAME = "member-1";
73
74     private final ShardIdentifier shardID = ShardIdentifier.builder().memberName(LOCAL_MEMBER_NAME)
75             .shardName("entity-ownership").type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
76
77     private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder();
78     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
79
80     @After
81     public void tearDown() {
82         actorFactory.close();
83     }
84
85     @Test
86     public void testOnRegisterCandidateLocal() throws Exception {
87         ShardTestKit kit = new ShardTestKit(getSystem());
88
89         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
90
91         kit.waitUntilLeader(shard);
92
93         YangInstanceIdentifier entityId = ENTITY_ID1;
94         Entity entity = new Entity(ENTITY_TYPE, entityId);
95         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
96
97         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
98         kit.expectMsgClass(SuccessReply.class);
99
100         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
101
102         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
103
104         verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
105     }
106
107     @Test
108     public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
109         ShardTestKit kit = new ShardTestKit(getSystem());
110
111         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
112
113         String peerId = actorFactory.generateActorId("follower");
114         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
115                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
116
117         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
118                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
119                 withDispatcher(Dispatchers.DefaultDispatcherId()));
120
121         YangInstanceIdentifier entityId = ENTITY_ID1;
122         Entity entity = new Entity(ENTITY_TYPE, entityId);
123         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
124
125         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
126         kit.expectMsgClass(SuccessReply.class);
127
128         // Now grant the vote so the shard becomes the leader. This should retry the commit.
129         peer.underlyingActor().grantVote = true;
130
131         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
132
133         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
134
135         verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
136     }
137
138     @Test
139     public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
140         ShardTestKit kit = new ShardTestKit(getSystem());
141
142         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
143                 shardTransactionCommitTimeoutInSeconds(1);
144
145         String peerId = actorFactory.generateActorId("follower");
146         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
147                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
148
149         MockFollower follower = peer.underlyingActor();
150         follower.grantVote = true;
151
152         // Drop AppendEntries so consensus isn't reached.
153         follower.dropAppendEntries = true;
154
155         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
156                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
157                 withDispatcher(Dispatchers.DefaultDispatcherId()));
158
159         kit.waitUntilLeader(shard);
160
161         YangInstanceIdentifier entityId = ENTITY_ID1;
162         Entity entity = new Entity(ENTITY_TYPE, entityId);
163         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
164
165         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
166         kit.expectMsgClass(SuccessReply.class);
167
168         // Wait enough time for the commit to timeout.
169         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
170
171         // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
172         // write being applied to the state.
173         follower.dropAppendEntries = false;
174
175         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
176
177         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
178
179         verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
180     }
181
182     @Test
183     public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
184         ShardTestKit kit = new ShardTestKit(getSystem());
185
186         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
187                 shardIsolatedLeaderCheckIntervalInMillis(50);
188
189         String peerId = actorFactory.generateActorId("follower");
190         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
191                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
192
193         MockFollower follower = peer.underlyingActor();
194         follower.grantVote = true;
195
196         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
197                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
198                 withDispatcher(Dispatchers.DefaultDispatcherId()));
199
200         kit.waitUntilLeader(shard);
201
202         // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
203         follower.dropAppendEntries = true;
204         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
205
206         YangInstanceIdentifier entityId = ENTITY_ID1;
207         Entity entity = new Entity(ENTITY_TYPE, entityId);
208         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
209
210         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
211         kit.expectMsgClass(SuccessReply.class);
212
213         // Resume AppendEntries - the candidate write should now be committed.
214         follower.dropAppendEntries = false;
215         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
216
217         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
218
219         verify(candidate, timeout(5000)).ownershipChanged(entity, false, true);
220     }
221
222     @Test
223     public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
224         ShardTestKit kit = new ShardTestKit(getSystem());
225
226         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100).
227                 shardBatchedModificationCount(5);
228
229         String peerId = actorFactory.generateActorId("leader");
230         TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
231                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
232
233         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
234                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
235                 withDispatcher(Dispatchers.DefaultDispatcherId()));
236
237         shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), -1L, -1L,
238                 DataStoreVersions.CURRENT_VERSION), peer);
239
240         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
241
242         shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
243         kit.expectMsgClass(SuccessReply.class);
244
245         MockLeader leader = peer.underlyingActor();
246         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
247                 leader.modificationsReceived, 5, TimeUnit.SECONDS));
248         verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1,
249                 LOCAL_MEMBER_NAME);
250
251         shard.tell(dataStoreContextBuilder.shardElectionTimeoutFactor(2).build(), ActorRef.noSender());
252
253         // Test with initial commit timeout and subsequent retry.
254
255         leader.modificationsReceived = new CountDownLatch(1);
256         leader.sendReply = false;
257
258         shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
259
260         shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
261         kit.expectMsgClass(SuccessReply.class);
262
263         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
264                 leader.modificationsReceived, 5, TimeUnit.SECONDS));
265         verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID2,
266                 LOCAL_MEMBER_NAME);
267
268         // Send a bunch of registration messages quickly and verify.
269
270         int max = 100;
271         leader.delay = 4;
272         leader.modificationsReceived = new CountDownLatch(max);
273         List<YangInstanceIdentifier> entityIds = new ArrayList<>();
274         for(int i = 1; i <= max; i++) {
275             YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
276             entityIds.add(id);
277             shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, id)), kit.getRef());
278         }
279
280         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
281                 leader.modificationsReceived, 10, TimeUnit.SECONDS));
282
283         // Sleep a little to ensure no additional BatchedModifications are received.
284
285         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
286
287         List<Modification> receivedMods = leader.getAndClearReceivedModifications();
288         for(int i = 0; i < max; i++) {
289             verifyBatchedEntityCandidate(receivedMods.get(i), ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
290         }
291
292         assertEquals("# modifications received", max, receivedMods.size());
293     }
294
295     private void verifyCommittedEntityCandidate(TestActorRef<EntityOwnershipShard> shard, String entityType,
296             YangInstanceIdentifier entityId, String candidateName) throws Exception {
297         verifyEntityCandidate(readEntityOwners(shard), entityType, entityId, candidateName);
298     }
299
300     private void verifyBatchedEntityCandidate(List<Modification> mods, String entityType,
301             YangInstanceIdentifier entityId, String candidateName) throws Exception {
302         assertEquals("BatchedModifications size", 1, mods.size());
303         verifyBatchedEntityCandidate(mods.get(0), entityType, entityId, candidateName);
304     }
305
306     private void verifyBatchedEntityCandidate(Modification mod, String entityType,
307             YangInstanceIdentifier entityId, String candidateName) throws Exception {
308         assertEquals("Modification type", MergeModification.class, mod.getClass());
309         verifyEntityCandidate(((MergeModification)mod).getData(), entityType,
310                 entityId, candidateName);
311     }
312
313     private NormalizedNode<?, ?> readEntityOwners(TestActorRef<EntityOwnershipShard> shard) throws Exception {
314         Stopwatch sw = Stopwatch.createStarted();
315         while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
316             NormalizedNode<?, ?> node = AbstractShardTest.readStore(shard, ENTITY_OWNERS_PATH);
317             if(node != null) {
318                 return node;
319             }
320
321             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
322         }
323
324         return null;
325     }
326
327     private void verifyOwner(final TestActorRef<EntityOwnershipShard> shard, String entityType, YangInstanceIdentifier entityId,
328             String localMemberName) {
329         verifyOwner(localMemberName, entityType, entityId, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
330             @Override
331             public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
332                 try {
333                     return AbstractShardTest.readStore(shard, path);
334                 } catch(Exception e) {
335                     return null;
336                 }
337             }
338         });
339     }
340
341     private Props newShardProps() {
342         return newShardProps(Collections.<String,String>emptyMap());
343     }
344
345     private Props newShardProps(Map<String,String> peers) {
346         return EntityOwnershipShard.props(shardID, peers, dataStoreContextBuilder.build(), SCHEMA_CONTEXT,
347                 LOCAL_MEMBER_NAME);
348     }
349
350     public static class MockFollower extends UntypedActor {
351         volatile boolean grantVote;
352         volatile boolean dropAppendEntries;
353         private final String myId;
354
355         public MockFollower(String myId) {
356             this.myId = myId;
357         }
358
359         @Override
360         public void onReceive(Object message) {
361             if(message instanceof RequestVote) {
362                 if(grantVote) {
363                     getSender().tell(new RequestVoteReply(((RequestVote)message).getTerm(), true), getSelf());
364                 }
365             } else if(message instanceof AppendEntries) {
366                 if(!dropAppendEntries) {
367                     AppendEntries req = (AppendEntries) message;
368                     long lastIndex = req.getLeaderCommit();
369                     if (req.getEntries().size() > 0) {
370                         for(ReplicatedLogEntry entry : req.getEntries()) {
371                             lastIndex = entry.getIndex();
372                         }
373                     }
374
375                     getSender().tell(new AppendEntriesReply(myId, req.getTerm(), true, lastIndex, req.getTerm(),
376                             DataStoreVersions.CURRENT_VERSION), getSelf());
377                 }
378             }
379         }
380     }
381
382     public static class MockLeader extends UntypedActor {
383         volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
384         List<Modification> receivedModifications = new ArrayList<>();
385         volatile boolean sendReply = true;
386         volatile long delay;
387
388         @Override
389         public void onReceive(Object message) {
390             if(message instanceof BatchedModifications) {
391                 if(delay > 0) {
392                     Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
393                 }
394
395                 if(sendReply) {
396                     BatchedModifications mods = (BatchedModifications) message;
397                     synchronized (receivedModifications) {
398                         for(int i = 0; i < mods.getModifications().size(); i++) {
399                             receivedModifications.add(mods.getModifications().get(i));
400                             modificationsReceived.countDown();
401                         }
402                     }
403
404                     getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
405                 } else {
406                     sendReply = true;
407                 }
408             }
409         }
410
411         List<Modification> getAndClearReceivedModifications() {
412             synchronized (receivedModifications) {
413                 List<Modification> ret = new ArrayList<>(receivedModifications);
414                 receivedModifications.clear();
415                 return ret;
416             }
417         }
418     }
419 }