7a251b978435d2101e39bd18b12af2012798168c
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / entityownership / EntityOwnershipShardTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.entityownership;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.mockito.Mockito.mock;
12 import akka.actor.ActorRef;
13 import akka.actor.Props;
14 import akka.actor.UntypedActor;
15 import akka.dispatch.Dispatchers;
16 import akka.testkit.TestActorRef;
17 import com.google.common.base.Stopwatch;
18 import com.google.common.collect.ImmutableMap;
19 import com.google.common.util.concurrent.Uninterruptibles;
20 import java.util.Collections;
21 import java.util.Map;
22 import java.util.concurrent.CountDownLatch;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import org.junit.After;
26 import org.junit.Test;
27 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
28 import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
29 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
30 import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
31 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
32 import org.opendaylight.controller.cluster.datastore.entityownership.messages.RegisterCandidateLocal;
33 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
34 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
35 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
36 import org.opendaylight.controller.cluster.datastore.messages.SuccessReply;
37 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
38 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
39 import org.opendaylight.controller.cluster.raft.TestActorFactory;
40 import org.opendaylight.controller.cluster.raft.messages.AppendEntries;
41 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
42 import org.opendaylight.controller.cluster.raft.messages.RequestVote;
43 import org.opendaylight.controller.cluster.raft.messages.RequestVoteReply;
44 import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelper;
45 import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
46 import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipCandidate;
47 import org.opendaylight.yangtools.yang.common.QName;
48 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
49 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
50 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
51
52 /**
53  * Unit tests for EntityOwnershipShard.
54  *
55  * @author Thomas Pantelis
56  */
57 public class EntityOwnershipShardTest extends AbstractEntityOwnershipTest {
58     private static final String ENTITY_TYPE = "test type";
59     private static final YangInstanceIdentifier ENTITY_ID1 =
60             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity1"));
61     private static final YangInstanceIdentifier ENTITY_ID2 =
62             YangInstanceIdentifier.of(QName.create("test", "2015-08-14", "entity2"));
63     private static final SchemaContext SCHEMA_CONTEXT = SchemaContextHelper.entityOwners();
64     private static final AtomicInteger NEXT_SHARD_NUM = new AtomicInteger();
65     private static final String LOCAL_MEMBER_NAME = "member-1";
66
67     private final ShardIdentifier shardID = ShardIdentifier.builder().memberName(LOCAL_MEMBER_NAME)
68             .shardName("entity-ownership").type("operational" + NEXT_SHARD_NUM.getAndIncrement()).build();
69
70     private final Builder dataStoreContextBuilder = DatastoreContext.newBuilder();
71     private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
72
73     @After
74     public void tearDown() {
75         actorFactory.close();
76     }
77
78     @Test
79     public void testOnRegisterCandidateLocal() throws Exception {
80         ShardTestKit kit = new ShardTestKit(getSystem());
81
82         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps());
83
84         kit.waitUntilLeader(shard);
85
86         YangInstanceIdentifier entityId = ENTITY_ID1;
87         Entity entity = new Entity(ENTITY_TYPE, entityId);
88         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
89
90         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
91         kit.expectMsgClass(SuccessReply.class);
92
93         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
94     }
95
96     @Test
97     public void testOnRegisterCandidateLocalWithNoInitialLeader() throws Exception {
98         ShardTestKit kit = new ShardTestKit(getSystem());
99
100         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2);
101
102         String peerId = actorFactory.generateActorId("follower");
103         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
104                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
105
106         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
107                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
108                 withDispatcher(Dispatchers.DefaultDispatcherId()));
109
110         YangInstanceIdentifier entityId = ENTITY_ID1;
111         Entity entity = new Entity(ENTITY_TYPE, entityId);
112         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
113
114         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
115         kit.expectMsgClass(SuccessReply.class);
116
117         // Now grant the vote so the shard becomes the leader. This should retry the commit.
118         peer.underlyingActor().grantVote = true;
119
120         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
121     }
122
123     @Test
124     public void testOnRegisterCandidateLocalWithNoInitialConsensus() throws Exception {
125         ShardTestKit kit = new ShardTestKit(getSystem());
126
127         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
128                 shardTransactionCommitTimeoutInSeconds(1);
129
130         String peerId = actorFactory.generateActorId("follower");
131         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
132                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
133
134         MockFollower follower = peer.underlyingActor();
135         follower.grantVote = true;
136
137         // Drop AppendEntries so consensus isn't reached.
138         follower.dropAppendEntries = true;
139
140         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
141                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
142                 withDispatcher(Dispatchers.DefaultDispatcherId()));
143
144         kit.waitUntilLeader(shard);
145
146         YangInstanceIdentifier entityId = ENTITY_ID1;
147         Entity entity = new Entity(ENTITY_TYPE, entityId);
148         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
149
150         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
151         kit.expectMsgClass(SuccessReply.class);
152
153         // Wait enough time for the commit to timeout.
154         Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
155
156         // Resume AppendEntries - the follower should ack the commit which should then result in the candidate
157         // write being applied to the state.
158         follower.dropAppendEntries = false;
159         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
160     }
161
162     @Test
163     public void testOnRegisterCandidateLocalWithIsolatedLeader() throws Exception {
164         ShardTestKit kit = new ShardTestKit(getSystem());
165
166         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(2).
167                 shardIsolatedLeaderCheckIntervalInMillis(50);
168
169         String peerId = actorFactory.generateActorId("follower");
170         TestActorRef<MockFollower> peer = actorFactory.createTestActor(Props.create(MockFollower.class, peerId).
171                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
172
173         MockFollower follower = peer.underlyingActor();
174         follower.grantVote = true;
175
176         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
177                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
178                 withDispatcher(Dispatchers.DefaultDispatcherId()));
179
180         kit.waitUntilLeader(shard);
181
182         // Drop AppendEntries and wait enough time for the shard to switch to IsolatedLeader.
183         follower.dropAppendEntries = true;
184         Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
185
186         YangInstanceIdentifier entityId = ENTITY_ID1;
187         Entity entity = new Entity(ENTITY_TYPE, entityId);
188         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
189
190         shard.tell(new RegisterCandidateLocal(candidate, entity), kit.getRef());
191         kit.expectMsgClass(SuccessReply.class);
192
193         // Resume AppendEntries - the candidate write should now be committed.
194         follower.dropAppendEntries = false;
195         verifyCommittedEntityCandidate(shard, ENTITY_TYPE, entityId, LOCAL_MEMBER_NAME);
196     }
197
198     @Test
199     public void testOnRegisterCandidateLocalWithRemoteLeader() throws Exception {
200         ShardTestKit kit = new ShardTestKit(getSystem());
201
202         dataStoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(100);
203
204         String peerId = actorFactory.generateActorId("leader");
205         TestActorRef<MockLeader> peer = actorFactory.createTestActor(Props.create(MockLeader.class).
206                 withDispatcher(Dispatchers.DefaultDispatcherId()), peerId);
207
208         TestActorRef<EntityOwnershipShard> shard = actorFactory.createTestActor(newShardProps(
209                 ImmutableMap.<String, String>builder().put(peerId, peer.path().toString()).build()).
210                 withDispatcher(Dispatchers.DefaultDispatcherId()));
211
212         shard.tell(new AppendEntries(1L, peerId, -1L, -1L, Collections.<ReplicatedLogEntry>emptyList(), -1L, -1L,
213                 DataStoreVersions.CURRENT_VERSION), peer);
214
215         EntityOwnershipCandidate candidate = mock(EntityOwnershipCandidate.class);
216
217         shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID1)), kit.getRef());
218         kit.expectMsgClass(SuccessReply.class);
219
220         MockLeader leader = peer.underlyingActor();
221         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
222                 leader.modificationsReceived, 5, TimeUnit.SECONDS));
223         verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID1, LOCAL_MEMBER_NAME);
224
225         leader.modificationsReceived = new CountDownLatch(2);
226         leader.sendReply = false;
227
228         shard.tell(dataStoreContextBuilder.shardTransactionCommitTimeoutInSeconds(1).build(), ActorRef.noSender());
229
230         shard.tell(new RegisterCandidateLocal(candidate, new Entity(ENTITY_TYPE, ENTITY_ID2)), kit.getRef());
231         kit.expectMsgClass(SuccessReply.class);
232
233         assertEquals("Leader received BatchedModifications", true, Uninterruptibles.awaitUninterruptibly(
234                 leader.modificationsReceived, 5, TimeUnit.SECONDS));
235         verifyBatchedEntityCandidate(leader.receivedModifications, ENTITY_TYPE, ENTITY_ID2, LOCAL_MEMBER_NAME);
236     }
237
238     private void verifyCommittedEntityCandidate(TestActorRef<EntityOwnershipShard> shard, String entityType,
239             YangInstanceIdentifier entityId, String candidateName) throws Exception {
240         verifyEntityCandidate(readEntityOwners(shard), entityType, entityId, candidateName);
241     }
242
243     private void verifyBatchedEntityCandidate(BatchedModifications mods, String entityType,
244             YangInstanceIdentifier entityId, String candidateName) throws Exception {
245         assertEquals("BatchedModifications size", 1, mods.getModifications().size());
246         assertEquals("Modification type", MergeModification.class, mods.getModifications().get(0).getClass());
247         verifyEntityCandidate(((MergeModification)mods.getModifications().get(0)).getData(), entityType,
248                 entityId, candidateName);
249     }
250
251     private NormalizedNode<?, ?> readEntityOwners(TestActorRef<EntityOwnershipShard> shard) throws Exception {
252         Stopwatch sw = Stopwatch.createStarted();
253         while(sw.elapsed(TimeUnit.MILLISECONDS) <= 5000) {
254             NormalizedNode<?, ?> node = AbstractShardTest.readStore(shard, EntityOwnershipShard.ENTITY_OWNERS_PATH);
255             if(node != null) {
256                 return node;
257             }
258
259             Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
260         }
261
262         return null;
263     }
264
265     private Props newShardProps() {
266         return newShardProps(Collections.<String,String>emptyMap());
267     }
268
269     private Props newShardProps(Map<String,String> peers) {
270         return EntityOwnershipShard.props(shardID, peers, dataStoreContextBuilder.build(), SCHEMA_CONTEXT,
271                 LOCAL_MEMBER_NAME);
272     }
273
274     public static class MockFollower extends UntypedActor {
275         volatile boolean grantVote;
276         volatile boolean dropAppendEntries;
277         private final String myId;
278
279         public MockFollower(String myId) {
280             this.myId = myId;
281         }
282
283         @Override
284         public void onReceive(Object message) {
285             if(message instanceof RequestVote) {
286                 if(grantVote) {
287                     getSender().tell(new RequestVoteReply(((RequestVote)message).getTerm(), true), getSelf());
288                 }
289             } else if(message instanceof AppendEntries) {
290                 if(!dropAppendEntries) {
291                     AppendEntries req = (AppendEntries) message;
292                     long lastIndex = req.getLeaderCommit();
293                     if (req.getEntries().size() > 0) {
294                         for(ReplicatedLogEntry entry : req.getEntries()) {
295                             lastIndex = entry.getIndex();
296                         }
297                     }
298
299                     getSender().tell(new AppendEntriesReply(myId, req.getTerm(), true, lastIndex, req.getTerm(),
300                             DataStoreVersions.CURRENT_VERSION), getSelf());
301                 }
302             }
303         }
304     }
305
306     public static class MockLeader extends UntypedActor {
307         volatile CountDownLatch modificationsReceived = new CountDownLatch(1);
308         volatile BatchedModifications receivedModifications;
309         volatile boolean sendReply = true;
310
311         @Override
312         public void onReceive(Object message) {
313             if(message instanceof BatchedModifications) {
314                 receivedModifications = (BatchedModifications) message;
315                 modificationsReceived.countDown();
316                 if(sendReply) {
317                     getSender().tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
318                 } else {
319                     sendReply = true;
320                 }
321             }
322         }
323     }
324 }