Bug 4105: Change commit retry mechanism in EntityOwnershipShard
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Mockito.mock;
12 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.actor.UntypedActor;
16 import akka.dispatch.Dispatchers;
17 import akka.testkit.TestActorRef;
18 import com.google.common.base.Stopwatch;
19 import com.google.common.collect.ImmutableMap;
20 import com.google.common.util.concurrent.Uninterruptibles;
21 import java.util.ArrayList;
22 import java.util.Collections;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.concurrent.CountDownLatch;
26 import java.util.concurrent.TimeUnit;
27 import java.util.concurrent.atomic.AtomicInteger;
28 import org.junit.After;
29 import org.junit.Test;
30 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
31 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
32 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
33 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
34 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
35 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
36 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
37 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
38 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
39 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
40 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
41 import org.opendaylight.controller.cluster.datastore.modification.Modification;
42 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
43 import org.opendaylight.controller.cluster.raft.TestActorFactory;
44 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
45 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
46 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
47 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
48 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
49 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
50 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
51 import org.opendaylight.yangtools.yang.common.QName;
52 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
53 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
54 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
55
56 /**
57  * Unit tests for EntityOwnershipShard.
58  *
59  * @author Thomas Pantelis
60  */
61 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
62     private static final String ENTITY_TYPE = "test type";
63     private static final YangInstanceIdentifier ENTITY_ID1 =
64             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
65     private static final YangInstanceIdentifier ENTITY_ID2 =
66             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
67     private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
68     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
69     private static final String LOCAL_MEMBER_NAME = "member-1";
70
71     private final ShardIdentifier shardID = ShardIdentifier.builder().memberName(LOCAL_MEMBER_NAME)
72             .shardName("entity-ownership").type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
73
74     private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder();
75     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
76
77     @After
78     public void tearDown() {
79         actorFactory.close();
80     }
81
82     @Test
83     public void testOnRegisterCandidateLocal() throws Exception {
84         ShardTestKit kit = new ShardTestKit(getSystem());
85
86         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
87
88         kit.waitUntilLeader(shard);
89
90         YangInstanceIdentifier entityId = ENTITY_ID1;
91         Entity entity = new Entity(ENTITY_TYPE, entityId);
92         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
93
94         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
95         kit.expectMsgClass(SuccessReply.class);
96
97         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
98     }
99
100     @Test
101     public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
102         ShardTestKit kit = new ShardTestKit(getSystem());
103
104         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
105
106         String peerId = actorFactory.generateActorId("follower");
107         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
108                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
109
110         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
111                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
112                 withDispatcher(Dispatchers.DefaultDispatcherId()));
113
114         YangInstanceIdentifier entityId = ENTITY_ID1;
115         Entity entity = new Entity(ENTITY_TYPE, entityId);
116         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
117
118         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
119         kit.expectMsgClass(SuccessReply.class);
120
121         // Now grant the vote so the shard becomes the leader. This should retry the commit.
122         peer.underlyingActor().grantVote = true;
123
124         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
125     }
126
127     @Test
128     public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
129         ShardTestKit kit = new ShardTestKit(getSystem());
130
131         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
132                 shardTransactionCommitTimeoutInSeconds(1);
133
134         String peerId = actorFactory.generateActorId("follower");
135         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
136                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
137
138         MockFollower follower = peer.underlyingActor();
139         follower.grantVote = true;
140
141         // Drop AppendEntries so consensus isn't reached.
142         follower.dropAppendEntries = true;
143
144         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
145                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
146                 withDispatcher(Dispatchers.DefaultDispatcherId()));
147
148         kit.waitUntilLeader(shard);
149
150         YangInstanceIdentifier entityId = ENTITY_ID1;
151         Entity entity = new Entity(ENTITY_TYPE, entityId);
152         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
153
154         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
155         kit.expectMsgClass(SuccessReply.class);
156
157         // Wait enough time for the commit to timeout.
158         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
159
160         // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
161         // write being applied to the state.
162         follower.dropAppendEntries = false;
163         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
164     }
165
166     @Test
167     public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
168         ShardTestKit kit = new ShardTestKit(getSystem());
169
170         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
171                 shardIsolatedLeaderCheckIntervalInMillis(50);
172
173         String peerId = actorFactory.generateActorId("follower");
174         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
175                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
176
177         MockFollower follower = peer.underlyingActor();
178         follower.grantVote = true;
179
180         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
181                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
182                 withDispatcher(Dispatchers.DefaultDispatcherId()));
183
184         kit.waitUntilLeader(shard);
185
186         // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
187         follower.dropAppendEntries = true;
188         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
189
190         YangInstanceIdentifier entityId = ENTITY_ID1;
191         Entity entity = new Entity(ENTITY_TYPE, entityId);
192         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
193
194         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
195         kit.expectMsgClass(SuccessReply.class);
196
197         // Resume AppendEntries - the candidate write should now be committed.
198         follower.dropAppendEntries = false;
199         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
200     }
201
202     @Test
203     public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
204         ShardTestKit kit = new ShardTestKit(getSystem());
205
206         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100).
207                 shardBatchedModificationCount(5);
208
209         String peerId = actorFactory.generateActorId("leader");
210         TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
211                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
212
213         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
214                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
215                 withDispatcher(Dispatchers.DefaultDispatcherId()));
216
217         shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), -1L, -1L,
218                 DataStoreVersions.CURRENT_VERSION), peer);
219
220         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
221
222         shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
223         kit.expectMsgClass(SuccessReply.class);
224
225         MockLeader leader = peer.underlyingActor();
226         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
227                 leader.modificationsReceived, 5, TimeUnit.SECONDS));
228         verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1,
229                 LOCAL_MEMBER_NAME);
230
231         shard.tell(dataStoreContextBuilder.shardElectionTimeoutFactor(2).build(), ActorRef.noSender());
232
233         // Test with initial commit timeout and subsequent retry.
234
235         leader.modificationsReceived = new CountDownLatch(1);
236         leader.sendReply = false;
237
238         shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
239
240         shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
241         kit.expectMsgClass(SuccessReply.class);
242
243         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
244                 leader.modificationsReceived, 5, TimeUnit.SECONDS));
245         verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID2,
246                 LOCAL_MEMBER_NAME);
247
248         // Send a bunch of registration messages quickly and verify.
249
250         int max = 100;
251         leader.delay = 4;
252         leader.modificationsReceived = new CountDownLatch(max);
253         List<YangInstanceIdentifier> entityIds = new ArrayList<>();
254         for(int i = 1; i <= max; i++) {
255             YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
256             entityIds.add(id);
257             shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, id)), kit.getRef());
258         }
259
260         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
261                 leader.modificationsReceived, 10, TimeUnit.SECONDS));
262
263         // Sleep a little to ensure no additional BatchedModifications are received.
264
265         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
266
267         List<Modification> receivedMods = leader.getAndClearReceivedModifications();
268         for(int i = 0; i < max; i++) {
269             verifyBatchedEntityCandidate(receivedMods.get(i), ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
270         }
271
272         assertEquals("# modifications received", max, receivedMods.size());
273     }
274
275     private void verifyCommittedEntityCandidate(TestActorRef<EntityOwnershipShard> shard, String entityType,
276             YangInstanceIdentifier entityId, String candidateName) throws Exception {
277         verifyEntityCandidate(readEntityOwners(shard), entityType, entityId, candidateName);
278     }
279
280     private void verifyBatchedEntityCandidate(List<Modification> mods, String entityType,
281             YangInstanceIdentifier entityId, String candidateName) throws Exception {
282         assertEquals("BatchedModifications size", 1, mods.size());
283         verifyBatchedEntityCandidate(mods.get(0), entityType, entityId, candidateName);
284     }
285
286     private void verifyBatchedEntityCandidate(Modification mod, String entityType,
287             YangInstanceIdentifier entityId, String candidateName) throws Exception {
288         assertEquals("Modification type", MergeModification.class, mod.getClass());
289         verifyEntityCandidate(((MergeModification)mod).getData(), entityType,
290                 entityId, candidateName);
291     }
292
293     private NormalizedNode<?, ?> readEntityOwners(TestActorRef<EntityOwnershipShard> shard) throws Exception {
294         Stopwatch sw = Stopwatch.createStarted();
295         while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
296             NormalizedNode<?, ?> node = AbstractShardTest.readStore(shard, ENTITY_OWNERS_PATH);
297             if(node != null) {
298                 return node;
299             }
300
301             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
302         }
303
304         return null;
305     }
306
307     private Props newShardProps() {
308         return newShardProps(Collections.<String,String>emptyMap());
309     }
310
311     private Props newShardProps(Map<String,String> peers) {
312         return EntityOwnershipShard.props(shardID, peers, dataStoreContextBuilder.build(), SCHEMA_CONTEXT,
313                 LOCAL_MEMBER_NAME);
314     }
315
316     public static class MockFollower extends UntypedActor {
317         volatile boolean grantVote;
318         volatile boolean dropAppendEntries;
319         private final String myId;
320
321         public MockFollower(String myId) {
322             this.myId = myId;
323         }
324
325         @Override
326         public void onReceive(Object message) {
327             if(message instanceof RequestVote) {
328                 if(grantVote) {
329                     getSender().tell(new RequestVoteReply(((RequestVote)message).getTerm(), true), getSelf());
330                 }
331             } else if(message instanceof AppendEntries) {
332                 if(!dropAppendEntries) {
333                     AppendEntries req = (AppendEntries) message;
334                     long lastIndex = req.getLeaderCommit();
335                     if (req.getEntries().size() > 0) {
336                         for(ReplicatedLogEntry entry : req.getEntries()) {
337                             lastIndex = entry.getIndex();
338                         }
339                     }
340
341                     getSender().tell(new AppendEntriesReply(myId, req.getTerm(), true, lastIndex, req.getTerm(),
342                             DataStoreVersions.CURRENT_VERSION), getSelf());
343                 }
344             }
345         }
346     }
347
348     public static class MockLeader extends UntypedActor {
349         volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
350         List<Modification> receivedModifications = new ArrayList<>();
351         volatile boolean sendReply = true;
352         volatile long delay;
353
354         @Override
355         public void onReceive(Object message) {
356             if(message instanceof BatchedModifications) {
357                 if(delay > 0) {
358                     Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
359                 }
360
361                 if(sendReply) {
362                     BatchedModifications mods = (BatchedModifications) message;
363                     synchronized (receivedModifications) {
364                         for(int i = 0; i < mods.getModifications().size(); i++) {
365                             receivedModifications.add(mods.getModifications().get(i));
366                             modificationsReceived.countDown();
367                         }
368                     }
369
370                     getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
371                 } else {
372                     sendReply = true;
373                 }
374             }
375         }
376
377         List<Modification> getAndClearReceivedModifications() {
378             synchronized (receivedModifications) {
379                 List<Modification> ret = new ArrayList<>(receivedModifications);
380                 receivedModifications.clear();
381                 return ret;
382             }
383         }
384     }
385 }