5fb02619e9b7e859e4fabc0d678d848aa79d82ae
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / test / java / org / opendaylight / controller / cluster / datastore / compat / PreLithiumShardTest.java
1 /*
2  * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8 package org.opendaylight.controller.cluster.datastore.compat;
9
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.Mockito.inOrder;
14 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.HELIUM_2_VERSION;
15 import akka.actor.ActorRef;
16 import akka.actor.PoisonPill;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.pattern.Patterns;
20 import akka.testkit.TestActorRef;
21 import akka.util.Timeout;
22 import com.google.common.base.Optional;
23 import java.util.Collections;
24 import java.util.HashSet;
25 import java.util.Set;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.atomic.AtomicReference;
29 import org.junit.Test;
30 import org.mockito.InOrder;
31 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
32 import org.opendaylight.controller.cluster.datastore.Shard;
33 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
34 import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort;
35 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
36 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
37 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
38 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
39 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
40 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
41 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
42 import org.opendaylight.controller.cluster.datastore.modification.Modification;
43 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
44 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
45 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
46 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
47 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
48 import org.opendaylight.controller.cluster.raft.Snapshot;
49 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
50 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
51 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
52 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
53 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
54 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
55 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
56 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
57 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
58 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
59 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
60 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
61 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
62 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
63 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
64 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
65 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
66 import scala.concurrent.Future;
67 import scala.concurrent.duration.FiniteDuration;
68
69 /**
70  * Unit tests for backwards compatibility with pre-Lithium versions.
71  *
72  * @author Thomas Pantelis
73  */
74 public class PreLithiumShardTest extends AbstractShardTest {
75
76     private static CompositeModificationPayload newLegacyPayload(final Modification... mods) {
77         MutableCompositeModification compMod = new MutableCompositeModification();
78         for(Modification mod: mods) {
79             compMod.addModification(mod);
80         }
81
82         return new CompositeModificationPayload(compMod.toSerializable());
83     }
84
85     private static CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
86         MutableCompositeModification compMod = new MutableCompositeModification();
87         for(Modification mod: mods) {
88             compMod.addModification(mod);
89         }
90
91         return new CompositeModificationByteStringPayload(compMod.toSerializable());
92     }
93
94     @Test
95     public void testApplyHelium2VersionSnapshot() throws Exception {
96         TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
97                 "testApplyHelium2VersionSnapshot");
98
99         NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
100
101         DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
102         store.setSchemaContext(SCHEMA_CONTEXT);
103
104         writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
105
106         YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
107         NormalizedNode<?,?> expected = readStore(store, root);
108
109         NormalizedNodeMessages.Container encode = codec.encode(expected);
110
111         Snapshot snapshot = Snapshot.create(encode.getNormalizedNode().toByteString().toByteArray(),
112                 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
113
114         shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
115
116         NormalizedNode<?,?> actual = readStore(shard, root);
117
118         assertEquals("Root node", expected, actual);
119
120         shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
121     }
122
123     @Test
124     public void testHelium2VersionApplyStateLegacy() throws Exception {
125         new ShardTestKit(getSystem()) {{
126             TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
127                     "testHelium2VersionApplyStateLegacy");
128
129             waitUntilLeader(shard);
130
131             NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
132
133             ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
134                     newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
135
136             shard.underlyingActor().onReceiveCommand(applyState);
137
138             NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
139             assertEquals("Applied state", node, actual);
140
141             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
142         }};
143     }
144
145     @Test
146     public void testHelium2VersionRecovery() throws Exception {
147
148         DataTree testStore = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
149         testStore.setSchemaContext(SCHEMA_CONTEXT);
150
151         writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
152
153         NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
154
155         InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
156                 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root).
157                                 getNormalizedNode().toByteString().toByteArray(),
158                                 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
159
160         InMemoryJournal.addEntry(shardID.toString(), 0, new String("Dummy data as snapshot sequence number is " +
161                 "set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1"));
162
163         // Set up the InMemoryJournal.
164
165         InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
166                   new WriteModification(TestModel.OUTER_LIST_PATH,
167                           ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
168
169         int nListEntries = 16;
170         Set<Integer> listEntryKeys = new HashSet<>();
171         int i = 1;
172
173         // Add some CompositeModificationPayload entries
174         for(; i <= 8; i++) {
175             listEntryKeys.add(Integer.valueOf(i));
176             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
177                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
178             Modification mod = new MergeModification(path,
179                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
180             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
181                     newLegacyPayload(mod)));
182         }
183
184         // Add some CompositeModificationByteStringPayload entries
185         for(; i <= nListEntries; i++) {
186             listEntryKeys.add(Integer.valueOf(i));
187             YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
188                     .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
189             Modification mod = new MergeModification(path,
190                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
191             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
192                     newLegacyByteStringPayload(mod)));
193         }
194
195         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2, new ApplyLogEntries(nListEntries));
196
197         testRecovery(listEntryKeys);
198     }
199
200     @SuppressWarnings({ "unchecked" })
201     @Test
202     public void testPreLithiumConcurrentThreePhaseCommits() throws Throwable {
203         new ShardTestKit(getSystem()) {{
204             final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
205                     newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
206                     "testPreLithiumConcurrentThreePhaseCommits");
207
208             waitUntilLeader(shard);
209
210             // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
211
212             ShardDataTree dataStore = shard.underlyingActor().getDataStore();
213
214             String transactionID1 = "tx1";
215             MutableCompositeModification modification1 = new MutableCompositeModification();
216             ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
217                     TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
218
219             String transactionID2 = "tx2";
220             MutableCompositeModification modification2 = new MutableCompositeModification();
221             ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
222                     TestModel.OUTER_LIST_PATH,
223                     ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
224                     modification2);
225
226             String transactionID3 = "tx3";
227             MutableCompositeModification modification3 = new MutableCompositeModification();
228             ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
229                     YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
230                         .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
231                     ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
232                     modification3);
233
234             long timeoutSec = 5;
235             final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
236             final Timeout timeout = new Timeout(duration);
237
238             // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
239             // by the ShardTransaction.
240
241             shard.tell(prepareForwardedReadyTransaction(cohort1, transactionID1, HELIUM_2_VERSION, false), getRef());
242             ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
243                     expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
244             assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
245
246             // Send the CanCommitTransaction message for the first Tx.
247
248             shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
249             CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
250                     expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
251             assertEquals("Can commit", true, canCommitReply.getCanCommit());
252
253             // Send the ForwardedReadyTransaction for the next 2 Tx's.
254
255             shard.tell(prepareForwardedReadyTransaction(cohort2, transactionID2, HELIUM_2_VERSION, false), getRef());
256             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
257
258             shard.tell(prepareForwardedReadyTransaction(cohort3, transactionID3, HELIUM_2_VERSION, false), getRef());
259             expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
260
261             // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
262             // processed after the first Tx completes.
263
264             Future<Object> canCommitFuture1 = Patterns.ask(shard,
265                     new CanCommitTransaction(transactionID2).toSerializable(), timeout);
266
267             Future<Object> canCommitFuture2 = Patterns.ask(shard,
268                     new CanCommitTransaction(transactionID3).toSerializable(), timeout);
269
270             // Send the CommitTransaction message for the first Tx. After it completes, it should
271             // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
272
273             shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
274             expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
275
276             // Wait for the next 2 Tx's to complete.
277
278             final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
279             final CountDownLatch commitLatch = new CountDownLatch(2);
280
281             class OnFutureComplete extends OnComplete<Object> {
282                 private final Class<?> expRespType;
283
284                 OnFutureComplete(final Class<?> expRespType) {
285                     this.expRespType = expRespType;
286                 }
287
288                 @Override
289                 public void onComplete(final Throwable error, final Object resp) {
290                     if(error != null) {
291                         caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
292                     } else {
293                         try {
294                             assertEquals("Commit response type", expRespType, resp.getClass());
295                             onSuccess(resp);
296                         } catch (Exception e) {
297                             caughtEx.set(e);
298                         }
299                     }
300                 }
301
302                 void onSuccess(final Object resp) throws Exception {
303                 }
304             }
305
306             class OnCommitFutureComplete extends OnFutureComplete {
307                 OnCommitFutureComplete() {
308                     super(CommitTransactionReply.SERIALIZABLE_CLASS);
309                 }
310
311                 @Override
312                 public void onComplete(final Throwable error, final Object resp) {
313                     super.onComplete(error, resp);
314                     commitLatch.countDown();
315                 }
316             }
317
318             class OnCanCommitFutureComplete extends OnFutureComplete {
319                 private final String transactionID;
320
321                 OnCanCommitFutureComplete(final String transactionID) {
322                     super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
323                     this.transactionID = transactionID;
324                 }
325
326                 @Override
327                 void onSuccess(final Object resp) throws Exception {
328                     CanCommitTransactionReply canCommitReply =
329                             CanCommitTransactionReply.fromSerializable(resp);
330                     assertEquals("Can commit", true, canCommitReply.getCanCommit());
331
332                     Future<Object> commitFuture = Patterns.ask(shard,
333                             new CommitTransaction(transactionID).toSerializable(), timeout);
334                     commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
335                 }
336             }
337
338             canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
339                     getSystem().dispatcher());
340
341             canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
342                     getSystem().dispatcher());
343
344             boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
345
346             if(caughtEx.get() != null) {
347                 throw caughtEx.get();
348             }
349
350             assertEquals("Commits complete", true, done);
351
352             InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
353             inOrder.verify(cohort1).canCommit();
354             inOrder.verify(cohort1).preCommit();
355             inOrder.verify(cohort1).commit();
356             inOrder.verify(cohort2).canCommit();
357             inOrder.verify(cohort2).preCommit();
358             inOrder.verify(cohort2).commit();
359             inOrder.verify(cohort3).canCommit();
360             inOrder.verify(cohort3).preCommit();
361             inOrder.verify(cohort3).commit();
362
363             // Verify data in the data store.
364
365             NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
366             assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
367             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
368                     outerList.getValue() instanceof Iterable);
369             Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
370             assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
371                        entry instanceof MapEntryNode);
372             MapEntryNode mapEntry = (MapEntryNode)entry;
373             Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
374                     mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
375             assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
376             assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
377
378             verifyLastApplied(shard, 2);
379
380             shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
381         }};
382     }
383 }