Bug 4105: Choose the owner for an entity on a first-come, first-served basis
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Mockito.mock;
12 import static org.opendaylight.controller.cluster.datastore.entityownership.EntityOwnersModel.ENTITY_OWNERS_PATH;
13 import akka.actor.ActorRef;
14 import akka.actor.Props;
15 import akka.actor.UntypedActor;
16 import akka.dispatch.Dispatchers;
17 import akka.testkit.TestActorRef;
18 import com.google.common.base.Function;
19 import com.google.common.base.Stopwatch;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.util.concurrent.Uninterruptibles;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicInteger;
29 import org.junit.After;
30 import org.junit.Test;
31 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
32 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
33 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
34 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
35 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
36 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
37 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
38 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
39 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
41 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
42 import org.opendaylight.controller.cluster.datastore.modification.Modification;
43 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
44 import org.opendaylight.controller.cluster.raft.TestActorFactory;
45 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
46 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
47 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
48 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
49 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
50 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
51 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
52 import org.opendaylight.yangtools.yang.common.QName;
53 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
54 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
55 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
56
57 /**
58  * Unit tests for EntityOwnershipShard.
59  *
60  * @author Thomas Pantelis
61  */
62 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
63     private static final String ENTITY_TYPE = "test type";
64     private static final YangInstanceIdentifier ENTITY_ID1 =
65             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
66     private static final YangInstanceIdentifier ENTITY_ID2 =
67             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
68     private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
69     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
70     private static final String LOCAL_MEMBER_NAME = "member-1";
71
72     private final ShardIdentifier shardID = ShardIdentifier.builder().memberName(LOCAL_MEMBER_NAME)
73             .shardName("entity-ownership").type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
74
75     private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder();
76     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
77
78     @After
79     public void tearDown() {
80         actorFactory.close();
81     }
82
83     @Test
84     public void testOnRegisterCandidateLocal() throws Exception {
85         ShardTestKit kit = new ShardTestKit(getSystem());
86
87         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
88
89         kit.waitUntilLeader(shard);
90
91         YangInstanceIdentifier entityId = ENTITY_ID1;
92         Entity entity = new Entity(ENTITY_TYPE, entityId);
93         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
94
95         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
96         kit.expectMsgClass(SuccessReply.class);
97
98         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
99
100         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
101     }
102
103     @Test
104     public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
105         ShardTestKit kit = new ShardTestKit(getSystem());
106
107         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
108
109         String peerId = actorFactory.generateActorId("follower");
110         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
111                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
112
113         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
114                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
115                 withDispatcher(Dispatchers.DefaultDispatcherId()));
116
117         YangInstanceIdentifier entityId = ENTITY_ID1;
118         Entity entity = new Entity(ENTITY_TYPE, entityId);
119         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
120
121         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
122         kit.expectMsgClass(SuccessReply.class);
123
124         // Now grant the vote so the shard becomes the leader. This should retry the commit.
125         peer.underlyingActor().grantVote = true;
126
127         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
128
129         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
130     }
131
132     @Test
133     public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
134         ShardTestKit kit = new ShardTestKit(getSystem());
135
136         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
137                 shardTransactionCommitTimeoutInSeconds(1);
138
139         String peerId = actorFactory.generateActorId("follower");
140         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
141                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
142
143         MockFollower follower = peer.underlyingActor();
144         follower.grantVote = true;
145
146         // Drop AppendEntries so consensus isn't reached.
147         follower.dropAppendEntries = true;
148
149         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
150                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
151                 withDispatcher(Dispatchers.DefaultDispatcherId()));
152
153         kit.waitUntilLeader(shard);
154
155         YangInstanceIdentifier entityId = ENTITY_ID1;
156         Entity entity = new Entity(ENTITY_TYPE, entityId);
157         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
158
159         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
160         kit.expectMsgClass(SuccessReply.class);
161
162         // Wait enough time for the commit to timeout.
163         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
164
165         // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
166         // write being applied to the state.
167         follower.dropAppendEntries = false;
168
169         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
170
171         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
172     }
173
174     @Test
175     public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
176         ShardTestKit kit = new ShardTestKit(getSystem());
177
178         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
179                 shardIsolatedLeaderCheckIntervalInMillis(50);
180
181         String peerId = actorFactory.generateActorId("follower");
182         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
183                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
184
185         MockFollower follower = peer.underlyingActor();
186         follower.grantVote = true;
187
188         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
189                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
190                 withDispatcher(Dispatchers.DefaultDispatcherId()));
191
192         kit.waitUntilLeader(shard);
193
194         // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
195         follower.dropAppendEntries = true;
196         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
197
198         YangInstanceIdentifier entityId = ENTITY_ID1;
199         Entity entity = new Entity(ENTITY_TYPE, entityId);
200         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
201
202         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
203         kit.expectMsgClass(SuccessReply.class);
204
205         // Resume AppendEntries - the candidate write should now be committed.
206         follower.dropAppendEntries = false;
207         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
208
209         verifyOwner(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
210     }
211
212     @Test
213     public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
214         ShardTestKit kit = new ShardTestKit(getSystem());
215
216         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100).
217                 shardBatchedModificationCount(5);
218
219         String peerId = actorFactory.generateActorId("leader");
220         TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
221                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
222
223         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
224                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
225                 withDispatcher(Dispatchers.DefaultDispatcherId()));
226
227         shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), -1L, -1L,
228                 DataStoreVersions.CURRENT_VERSION), peer);
229
230         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
231
232         shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
233         kit.expectMsgClass(SuccessReply.class);
234
235         MockLeader leader = peer.underlyingActor();
236         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
237                 leader.modificationsReceived, 5, TimeUnit.SECONDS));
238         verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID1,
239                 LOCAL_MEMBER_NAME);
240
241         shard.tell(dataStoreContextBuilder.shardElectionTimeoutFactor(2).build(), ActorRef.noSender());
242
243         // Test with initial commit timeout and subsequent retry.
244
245         leader.modificationsReceived = new CountDownLatch(1);
246         leader.sendReply = false;
247
248         shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
249
250         shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
251         kit.expectMsgClass(SuccessReply.class);
252
253         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
254                 leader.modificationsReceived, 5, TimeUnit.SECONDS));
255         verifyBatchedEntityCandidate(leader.getAndClearReceivedModifications(), ENTITY_TYPE, ENTITY_ID2,
256                 LOCAL_MEMBER_NAME);
257
258         // Send a bunch of registration messages quickly and verify.
259
260         int max = 100;
261         leader.delay = 4;
262         leader.modificationsReceived = new CountDownLatch(max);
263         List<YangInstanceIdentifier> entityIds = new ArrayList<>();
264         for(int i = 1; i <= max; i++) {
265             YangInstanceIdentifier id = YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "test" + i));
266             entityIds.add(id);
267             shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, id)), kit.getRef());
268         }
269
270         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
271                 leader.modificationsReceived, 10, TimeUnit.SECONDS));
272
273         // Sleep a little to ensure no additional BatchedModifications are received.
274
275         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
276
277         List<Modification> receivedMods = leader.getAndClearReceivedModifications();
278         for(int i = 0; i < max; i++) {
279             verifyBatchedEntityCandidate(receivedMods.get(i), ENTITY_TYPE, entityIds.get(i), LOCAL_MEMBER_NAME);
280         }
281
282         assertEquals("# modifications received", max, receivedMods.size());
283     }
284
285     private void verifyCommittedEntityCandidate(TestActorRef<EntityOwnershipShard> shard, String entityType,
286             YangInstanceIdentifier entityId, String candidateName) throws Exception {
287         verifyEntityCandidate(readEntityOwners(shard), entityType, entityId, candidateName);
288     }
289
290     private void verifyBatchedEntityCandidate(List<Modification> mods, String entityType,
291             YangInstanceIdentifier entityId, String candidateName) throws Exception {
292         assertEquals("BatchedModifications size", 1, mods.size());
293         verifyBatchedEntityCandidate(mods.get(0), entityType, entityId, candidateName);
294     }
295
296     private void verifyBatchedEntityCandidate(Modification mod, String entityType,
297             YangInstanceIdentifier entityId, String candidateName) throws Exception {
298         assertEquals("Modification type", MergeModification.class, mod.getClass());
299         verifyEntityCandidate(((MergeModification)mod).getData(), entityType,
300                 entityId, candidateName);
301     }
302
303     private NormalizedNode<?, ?> readEntityOwners(TestActorRef<EntityOwnershipShard> shard) throws Exception {
304         Stopwatch sw = Stopwatch.createStarted();
305         while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
306             NormalizedNode<?, ?> node = AbstractShardTest.readStore(shard, ENTITY_OWNERS_PATH);
307             if(node != null) {
308                 return node;
309             }
310
311             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
312         }
313
314         return null;
315     }
316
317     private void verifyOwner(final TestActorRef<EntityOwnershipShard> shard, String entityType, YangInstanceIdentifier entityId,
318             String localMemberName) {
319         verifyOwner(localMemberName, entityType, entityId, new Function<YangInstanceIdentifier, NormalizedNode<?,?>>() {
320             @Override
321             public NormalizedNode<?, ?> apply(YangInstanceIdentifier path) {
322                 try {
323                     return AbstractShardTest.readStore(shard, path);
324                 } catch(Exception e) {
325                     return null;
326                 }
327             }
328         });
329     }
330
331     private Props newShardProps() {
332         return newShardProps(Collections.<String,String>emptyMap());
333     }
334
335     private Props newShardProps(Map<String,String> peers) {
336         return EntityOwnershipShard.props(shardID, peers, dataStoreContextBuilder.build(), SCHEMA_CONTEXT,
337                 LOCAL_MEMBER_NAME);
338     }
339
340     public static class MockFollower extends UntypedActor {
341         volatile boolean grantVote;
342         volatile boolean dropAppendEntries;
343         private final String myId;
344
345         public MockFollower(String myId) {
346             this.myId = myId;
347         }
348
349         @Override
350         public void onReceive(Object message) {
351             if(message instanceof RequestVote) {
352                 if(grantVote) {
353                     getSender().tell(new RequestVoteReply(((RequestVote)message).getTerm(), true), getSelf());
354                 }
355             } else if(message instanceof AppendEntries) {
356                 if(!dropAppendEntries) {
357                     AppendEntries req = (AppendEntries) message;
358                     long lastIndex = req.getLeaderCommit();
359                     if (req.getEntries().size() > 0) {
360                         for(ReplicatedLogEntry entry : req.getEntries()) {
361                             lastIndex = entry.getIndex();
362                         }
363                     }
364
365                     getSender().tell(new AppendEntriesReply(myId, req.getTerm(), true, lastIndex, req.getTerm(),
366                             DataStoreVersions.CURRENT_VERSION), getSelf());
367                 }
368             }
369         }
370     }
371
372     public static class MockLeader extends UntypedActor {
373         volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
374         List<Modification> receivedModifications = new ArrayList<>();
375         volatile boolean sendReply = true;
376         volatile long delay;
377
378         @Override
379         public void onReceive(Object message) {
380             if(message instanceof BatchedModifications) {
381                 if(delay > 0) {
382                     Uninterruptibles.sleepUninterruptibly(delay, TimeUnit.MILLISECONDS);
383                 }
384
385                 if(sendReply) {
386                     BatchedModifications mods = (BatchedModifications) message;
387                     synchronized (receivedModifications) {
388                         for(int i = 0; i < mods.getModifications().size(); i++) {
389                             receivedModifications.add(mods.getModifications().get(i));
390                             modificationsReceived.countDown();
391                         }
392                     }
393
394                     getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
395                 } else {
396                     sendReply = true;
397                 }
398             }
399         }
400
401         List<Modification> getAndClearReceivedModifications() {
402             synchronized (receivedModifications) {
403                 List<Modification> ret = new ArrayList<>(receivedModifications);
404                 receivedModifications.clear();
405                 return ret;
406             }
407         }
408     }
409 }