Merge "Refactor ShardTest"
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / compat / PreLithiumShardTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.compat;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.Mockito.inOrder;
14 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
15 import akka.actor.ActorRef;
16 import akka.actor.PoisonPill;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.pattern.Patterns;
20 import akka.testkit.TestActorRef;
21 import akka.util.Timeout;
22 import com.google.common.base.Optional;
23 import com.google.common.util.concurrent.MoreExecutors;
24 import java.io.IOException;
25 import java.util.Collections;
26 import java.util.HashSet;
27 import java.util.Set;
28 import java.util.concurrent.CountDownLatch;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicReference;
31 import org.junit.Test;
32 import org.mockito.InOrder;
33 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
34 import org.opendaylight.controller.cluster.datastore.Shard;
35 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
36 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
39 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
42 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
43 import org.opendaylight.controller.cluster.datastore.modification.Modification;
44 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
45 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
46 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
47 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
48 import org.opendaylight.controller.cluster.datastore.utils.InMemoryJournal;
49 import org.opendaylight.controller.cluster.datastore.utils.InMemorySnapshotStore;
50 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
51 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
52 import org.opendaylight.controller.cluster.raft.Snapshot;
53 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
54 import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
55 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
56 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
57 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
58 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
59 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
60 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
61 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
62 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
63 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
64 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
65 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
66 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
67 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
68 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
69 import scala.concurrent.Future;
70 import scala.concurrent.duration.FiniteDuration;
71
72 /**
73  * Unit tests for backwards compatibility with pre-Lithium versions.
74  *
75  * @author Thomas Pantelis
76  */
77 public class PreLithiumShardTest extends AbstractShardTest {
78
79     private CompositeModificationPayload newLegacyPayload(final Modification... mods) {
80         MutableCompositeModification compMod = new MutableCompositeModification();
81         for(Modification mod: mods) {
82             compMod.addModification(mod);
83         }
84
85         return new CompositeModificationPayload(compMod.toSerializable());
86     }
87
88     private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
89         MutableCompositeModification compMod = new MutableCompositeModification();
90         for(Modification mod: mods) {
91             compMod.addModification(mod);
92         }
93
94         return new CompositeModificationByteStringPayload(compMod.toSerializable());
95     }
96
97     private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
98         MutableCompositeModification compMod = new MutableCompositeModification();
99         for(Modification mod: mods) {
100             compMod.addModification(mod);
101         }
102
103         return new ModificationPayload(compMod);
104     }
105
106     @Test
107     public void testApplyHelium2VersionSnapshot() throws Exception {
108         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
109                 "testApplyHelium2VersionSnapshot");
110
111         NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
112
113         InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", MoreExecutors.sameThreadExecutor());
114         store.onGlobalContextUpdated(SCHEMA_CONTEXT);
115
116         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
117
118         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
119         NormalizedNode<?,?> expected = readStore(store, root);
120
121         NormalizedNodeMessages.Container encode = codec.encode(expected);
122
123         ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
124                 encode.getNormalizedNode().toByteString().toByteArray(),
125                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
126
127         shard.underlyingActor().onReceiveCommand(applySnapshot);
128
129         NormalizedNode<?,?> actual = readStore(shard, root);
130
131         assertEquals("Root node", expected, actual);
132
133         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
134     }
135
136     @Test
137     public void testHelium2VersionApplyStateLegacy() throws Exception {
138
139         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testHelium2VersionApplyStateLegacy");
140
141         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
142
143         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
144                 newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
145
146         shard.underlyingActor().onReceiveCommand(applyState);
147
148         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
149         assertEquals("Applied state", node, actual);
150
151         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
152     }
153
154     @Test
155     public void testHelium2VersionRecovery() throws Exception {
156
157         // Set up the InMemorySnapshotStore.
158
159         InMemoryDOMDataStore testStore = InMemoryDOMDataStoreFactory.create("Test", null, null);
160         testStore.onGlobalContextUpdated(SCHEMA_CONTEXT);
161
162         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
163
164         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
165
166         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
167                 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root).
168                                 getNormalizedNode().toByteString().toByteArray(),
169                                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
170
171         // Set up the InMemoryJournal.
172
173         InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
174                   new WriteModification(TestModel.OUTER_LIST_PATH,
175                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
176
177         int nListEntries = 16;
178         Set<Integer> listEntryKeys = new HashSet<>();
179         int i = 1;
180
181         // Add some CompositeModificationPayload entries
182         for(; i <= 8; i++) {
183             listEntryKeys.add(Integer.valueOf(i));
184             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
185                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
186             Modification mod = new MergeModification(path,
187                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
188             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
189                     newLegacyPayload(mod)));
190         }
191
192         // Add some CompositeModificationByteStringPayload entries
193         for(; i <= nListEntries; i++) {
194             listEntryKeys.add(Integer.valueOf(i));
195             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
196                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
197             Modification mod = new MergeModification(path,
198                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
199             InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
200                     newLegacyByteStringPayload(mod)));
201         }
202
203         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries));
204
205         testRecovery(listEntryKeys);
206     }
207
208     @SuppressWarnings({ "unchecked" })
209     @Test
210     public void testPreLithiumConcurrentThreePhaseCommits() throws Throwable {
211         new ShardTestKit(getSystem()) {{
212             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
213                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
214                     "testConcurrentThreePhaseCommits");
215
216             waitUntilLeader(shard);
217
218             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
219
220             InMemoryDOMDataStore dataStore = shard.underlyingActor().getDataStore();
221
222             String transactionID1 = "tx1";
223             MutableCompositeModification modification1 = new MutableCompositeModification();
224             DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
225                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
226
227             String transactionID2 = "tx2";
228             MutableCompositeModification modification2 = new MutableCompositeModification();
229             DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
230                     TestModel.OUTER_LIST_PATH,
231                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
232                     modification2);
233
234             String transactionID3 = "tx3";
235             MutableCompositeModification modification3 = new MutableCompositeModification();
236             DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
237                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
238                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
239                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
240                     modification3);
241
242             long timeoutSec = 5;
243             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
244             final Timeout timeout = new Timeout(duration);
245
246             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
247             // by the ShardTransaction.
248
249             shard.tell(new ForwardedReadyTransaction(transactionID1, CURRENT_VERSION,
250                     cohort1, modification1, true), getRef());
251             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
252                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
253             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
254
255             // Send the CanCommitTransaction message for the first Tx.
256
257             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
258             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
259                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
260             assertEquals("Can commit", true, canCommitReply.getCanCommit());
261
262             // Send the ForwardedReadyTransaction for the next 2 Tx's.
263
264             shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION,
265                     cohort2, modification2, true), getRef());
266             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
267
268             shard.tell(new ForwardedReadyTransaction(transactionID3, CURRENT_VERSION,
269                     cohort3, modification3, true), getRef());
270             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
271
272             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
273             // processed after the first Tx completes.
274
275             Future<Object> canCommitFuture1 = Patterns.ask(shard,
276                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
277
278             Future<Object> canCommitFuture2 = Patterns.ask(shard,
279                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
280
281             // Send the CommitTransaction message for the first Tx. After it completes, it should
282             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
283
284             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
285             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
286
287             // Wait for the next 2 Tx's to complete.
288
289             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
290             final CountDownLatch commitLatch = new CountDownLatch(2);
291
292             class OnFutureComplete extends OnComplete<Object> {
293                 private final Class<?> expRespType;
294
295                 OnFutureComplete(final Class<?> expRespType) {
296                     this.expRespType = expRespType;
297                 }
298
299                 @Override
300                 public void onComplete(final Throwable error, final Object resp) {
301                     if(error != null) {
302                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
303                     } else {
304                         try {
305                             assertEquals("Commit response type", expRespType, resp.getClass());
306                             onSuccess(resp);
307                         } catch (Exception e) {
308                             caughtEx.set(e);
309                         }
310                     }
311                 }
312
313                 void onSuccess(final Object resp) throws Exception {
314                 }
315             }
316
317             class OnCommitFutureComplete extends OnFutureComplete {
318                 OnCommitFutureComplete() {
319                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
320                 }
321
322                 @Override
323                 public void onComplete(final Throwable error, final Object resp) {
324                     super.onComplete(error, resp);
325                     commitLatch.countDown();
326                 }
327             }
328
329             class OnCanCommitFutureComplete extends OnFutureComplete {
330                 private final String transactionID;
331
332                 OnCanCommitFutureComplete(final String transactionID) {
333                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
334                     this.transactionID = transactionID;
335                 }
336
337                 @Override
338                 void onSuccess(final Object resp) throws Exception {
339                     CanCommitTransactionReply canCommitReply =
340                             CanCommitTransactionReply.fromSerializable(resp);
341                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
342
343                     Future<Object> commitFuture = Patterns.ask(shard,
344                             new CommitTransaction(transactionID).toSerializable(), timeout);
345                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
346                 }
347             }
348
349             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
350                     getSystem().dispatcher());
351
352             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
353                     getSystem().dispatcher());
354
355             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
356
357             if(caughtEx.get() != null) {
358                 throw caughtEx.get();
359             }
360
361             assertEquals("Commits complete", true, done);
362
363             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
364             inOrder.verify(cohort1).canCommit();
365             inOrder.verify(cohort1).preCommit();
366             inOrder.verify(cohort1).commit();
367             inOrder.verify(cohort2).canCommit();
368             inOrder.verify(cohort2).preCommit();
369             inOrder.verify(cohort2).commit();
370             inOrder.verify(cohort3).canCommit();
371             inOrder.verify(cohort3).preCommit();
372             inOrder.verify(cohort3).commit();
373
374             // Verify data in the data store.
375
376             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
377             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
378             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
379                     outerList.getValue() instanceof Iterable);
380             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
381             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
382                        entry instanceof MapEntryNode);
383             MapEntryNode mapEntry = (MapEntryNode)entry;
384             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
385                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
386             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
387             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
388
389             verifyLastLogIndex(shard, 2);
390
391             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
392         }};
393     }
394 }