07b7f0e33462dc70df6d025fb8ae36b73795b199
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / compat / PreLithiumShardTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.compat;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.Mockito.inOrder;
14 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.HELIUM_2_VERSION;
15 import akka.actor.ActorRef;
16 import akka.actor.PoisonPill;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.pattern.Patterns;
20 import akka.testkit.TestActorRef;
21 import akka.util.Timeout;
22 import com.google.common.base.Optional;
23 import java.io.IOException;
24 import java.util.Collections;
25 import java.util.HashSet;
26 import java.util.Set;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicReference;
30 import org.junit.Test;
31 import org.mockito.InOrder;
32 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
33 import org.opendaylight.controller.cluster.datastore.Shard;
34 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
35 import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort;
36 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
37 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
38 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
39 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
40 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
41 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
42 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
43 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
44 import org.opendaylight.controller.cluster.datastore.modification.Modification;
45 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
46 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
47 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
48 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
49 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
50 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
51 import org.opendaylight.controller.cluster.raft.Snapshot;
52 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
53 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
54 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
55 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
56 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
57 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
58 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
59 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
60 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
61 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
62 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
63 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
64 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
65 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
66 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
67 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
68 import scala.concurrent.Future;
69 import scala.concurrent.duration.FiniteDuration;
70
71 /**
72  * Unit tests for backwards compatibility with pre-Lithium versions.
73  *
74  * @author Thomas Pantelis
75  */
76 public class PreLithiumShardTest extends AbstractShardTest {
77
78     private CompositeModificationPayload newLegacyPayload(final Modification... mods) {
79         MutableCompositeModification compMod = new MutableCompositeModification();
80         for(Modification mod: mods) {
81             compMod.addModification(mod);
82         }
83
84         return new CompositeModificationPayload(compMod.toSerializable());
85     }
86
87     private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
88         MutableCompositeModification compMod = new MutableCompositeModification();
89         for(Modification mod: mods) {
90             compMod.addModification(mod);
91         }
92
93         return new CompositeModificationByteStringPayload(compMod.toSerializable());
94     }
95
96     private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
97         MutableCompositeModification compMod = new MutableCompositeModification();
98         for(Modification mod: mods) {
99             compMod.addModification(mod);
100         }
101
102         return new ModificationPayload(compMod);
103     }
104
105     @Test
106     public void testApplyHelium2VersionSnapshot() throws Exception {
107         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
108                 "testApplyHelium2VersionSnapshot");
109
110         NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
111
112         DataTree store = InMemoryDataTreeFactory.getInstance().create();
113         store.setSchemaContext(SCHEMA_CONTEXT);
114
115         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
116
117         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
118         NormalizedNode<?,?> expected = readStore(store, root);
119
120         NormalizedNodeMessages.Container encode = codec.encode(expected);
121
122         Snapshot snapshot = Snapshot.create(encode.getNormalizedNode().toByteString().toByteArray(),
123                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
124
125         shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
126
127         NormalizedNode<?,?> actual = readStore(shard, root);
128
129         assertEquals("Root node", expected, actual);
130
131         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
132     }
133
134     @Test
135     public void testHelium2VersionApplyStateLegacy() throws Exception {
136
137         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testHelium2VersionApplyStateLegacy");
138
139         NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
140
141         ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
142                 newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
143
144         shard.underlyingActor().onReceiveCommand(applyState);
145
146         NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
147         assertEquals("Applied state", node, actual);
148
149         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
150     }
151
152     @Test
153     public void testHelium2VersionRecovery() throws Exception {
154
155         DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
156         testStore.setSchemaContext(SCHEMA_CONTEXT);
157
158         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
159
160         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
161
162         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
163                 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root).
164                                 getNormalizedNode().toByteString().toByteArray(),
165                                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
166
167         InMemoryJournal.addEntry(shardID.toString(), 0, new String("Dummy data as snapshot sequence number is " +
168                 "set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1"));
169
170         // Set up the InMemoryJournal.
171
172         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
173                   new WriteModification(TestModel.OUTER_LIST_PATH,
174                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
175
176         int nListEntries = 16;
177         Set<Integer> listEntryKeys = new HashSet<>();
178         int i = 1;
179
180         // Add some CompositeModificationPayload entries
181         for(; i <= 8; i++) {
182             listEntryKeys.add(Integer.valueOf(i));
183             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
184                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
185             Modification mod = new MergeModification(path,
186                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
187             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
188                     newLegacyPayload(mod)));
189         }
190
191         // Add some CompositeModificationByteStringPayload entries
192         for(; i <= nListEntries; i++) {
193             listEntryKeys.add(Integer.valueOf(i));
194             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
195                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
196             Modification mod = new MergeModification(path,
197                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
198             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
199                     newLegacyByteStringPayload(mod)));
200         }
201
202         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2, new ApplyLogEntries(nListEntries));
203
204         testRecovery(listEntryKeys);
205     }
206
207     @SuppressWarnings({ "unchecked" })
208     @Test
209     public void testPreLithiumConcurrentThreePhaseCommits() throws Throwable {
210         new ShardTestKit(getSystem()) {{
211             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
212                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
213                     "testPreLithiumConcurrentThreePhaseCommits");
214
215             waitUntilLeader(shard);
216
217             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
218
219             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
220
221             String transactionID1 = "tx1";
222             MutableCompositeModification modification1 = new MutableCompositeModification();
223             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
224                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
225
226             String transactionID2 = "tx2";
227             MutableCompositeModification modification2 = new MutableCompositeModification();
228             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
229                     TestModel.OUTER_LIST_PATH,
230                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
231                     modification2);
232
233             String transactionID3 = "tx3";
234             MutableCompositeModification modification3 = new MutableCompositeModification();
235             ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
236                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
237                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
238                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
239                     modification3);
240
241             long timeoutSec = 5;
242             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
243             final Timeout timeout = new Timeout(duration);
244
245             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
246             // by the ShardTransaction.
247
248             shard.tell(new ForwardedReadyTransaction(transactionID1, HELIUM_2_VERSION,
249                     cohort1, modification1, true, false), getRef());
250             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
251                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
252             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
253
254             // Send the CanCommitTransaction message for the first Tx.
255
256             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
257             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
258                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
259             assertEquals("Can commit", true, canCommitReply.getCanCommit());
260
261             // Send the ForwardedReadyTransaction for the next 2 Tx's.
262
263             shard.tell(new ForwardedReadyTransaction(transactionID2, HELIUM_2_VERSION,
264                     cohort2, modification2, true, false), getRef());
265             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
266
267             shard.tell(new ForwardedReadyTransaction(transactionID3, HELIUM_2_VERSION,
268                     cohort3, modification3, true, false), getRef());
269             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
270
271             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
272             // processed after the first Tx completes.
273
274             Future<Object> canCommitFuture1 = Patterns.ask(shard,
275                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
276
277             Future<Object> canCommitFuture2 = Patterns.ask(shard,
278                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
279
280             // Send the CommitTransaction message for the first Tx. After it completes, it should
281             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
282
283             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
284             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
285
286             // Wait for the next 2 Tx's to complete.
287
288             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
289             final CountDownLatch commitLatch = new CountDownLatch(2);
290
291             class OnFutureComplete extends OnComplete<Object> {
292                 private final Class<?> expRespType;
293
294                 OnFutureComplete(final Class<?> expRespType) {
295                     this.expRespType = expRespType;
296                 }
297
298                 @Override
299                 public void onComplete(final Throwable error, final Object resp) {
300                     if(error != null) {
301                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
302                     } else {
303                         try {
304                             assertEquals("Commit response type", expRespType, resp.getClass());
305                             onSuccess(resp);
306                         } catch (Exception e) {
307                             caughtEx.set(e);
308                         }
309                     }
310                 }
311
312                 void onSuccess(final Object resp) throws Exception {
313                 }
314             }
315
316             class OnCommitFutureComplete extends OnFutureComplete {
317                 OnCommitFutureComplete() {
318                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
319                 }
320
321                 @Override
322                 public void onComplete(final Throwable error, final Object resp) {
323                     super.onComplete(error, resp);
324                     commitLatch.countDown();
325                 }
326             }
327
328             class OnCanCommitFutureComplete extends OnFutureComplete {
329                 private final String transactionID;
330
331                 OnCanCommitFutureComplete(final String transactionID) {
332                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
333                     this.transactionID = transactionID;
334                 }
335
336                 @Override
337                 void onSuccess(final Object resp) throws Exception {
338                     CanCommitTransactionReply canCommitReply =
339                             CanCommitTransactionReply.fromSerializable(resp);
340                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
341
342                     Future<Object> commitFuture = Patterns.ask(shard,
343                             new CommitTransaction(transactionID).toSerializable(), timeout);
344                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
345                 }
346             }
347
348             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
349                     getSystem().dispatcher());
350
351             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
352                     getSystem().dispatcher());
353
354             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
355
356             if(caughtEx.get() != null) {
357                 throw caughtEx.get();
358             }
359
360             assertEquals("Commits complete", true, done);
361
362             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
363             inOrder.verify(cohort1).canCommit();
364             inOrder.verify(cohort1).preCommit();
365             inOrder.verify(cohort1).commit();
366             inOrder.verify(cohort2).canCommit();
367             inOrder.verify(cohort2).preCommit();
368             inOrder.verify(cohort2).commit();
369             inOrder.verify(cohort3).canCommit();
370             inOrder.verify(cohort3).preCommit();
371             inOrder.verify(cohort3).commit();
372
373             // Verify data in the data store.
374
375             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
376             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
377             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
378                     outerList.getValue() instanceof Iterable);
379             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
380             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
381                        entry instanceof MapEntryNode);
382             MapEntryNode mapEntry = (MapEntryNode)entry;
383             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
384                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
385             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
386             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
387
388             verifyLastApplied(shard, 2);
389
390             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
391         }};
392     }
393 }