2 * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.controller.cluster.datastore.compat;
10 import static org.junit.Assert.assertEquals;
11 import static org.junit.Assert.assertNotNull;
12 import static org.junit.Assert.assertTrue;
13 import static org.mockito.Mockito.inOrder;
14 import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.HELIUM_2_VERSION;
15 import akka.actor.ActorRef;
16 import akka.actor.PoisonPill;
17 import akka.dispatch.Dispatchers;
18 import akka.dispatch.OnComplete;
19 import akka.pattern.Patterns;
20 import akka.testkit.TestActorRef;
21 import akka.util.Timeout;
22 import com.google.common.base.Optional;
23 import java.io.IOException;
24 import java.util.Collections;
25 import java.util.HashSet;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.TimeUnit;
29 import java.util.concurrent.atomic.AtomicReference;
30 import org.junit.Test;
31 import org.mockito.InOrder;
32 import org.opendaylight.controller.cluster.datastore.AbstractShardTest;
33 import org.opendaylight.controller.cluster.datastore.Shard;
34 import org.opendaylight.controller.cluster.datastore.ShardDataTree;
35 import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort;
36 import org.opendaylight.controller.cluster.datastore.ShardTestKit;
37 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
38 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
39 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
40 import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
41 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
42 import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
43 import org.opendaylight.controller.cluster.datastore.modification.Modification;
44 import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
45 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
46 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
47 import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
48 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
49 import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
50 import org.opendaylight.controller.cluster.raft.Snapshot;
51 import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
52 import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
53 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
54 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
55 import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
56 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
57 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
58 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
59 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
60 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
61 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
62 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
63 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
64 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
65 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
66 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
67 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
68 import scala.concurrent.Future;
69 import scala.concurrent.duration.FiniteDuration;
72 * Unit tests for backwards compatibility with pre-Lithium versions.
74 * @author Thomas Pantelis
76 public class PreLithiumShardTest extends AbstractShardTest {
78 private static CompositeModificationPayload newLegacyPayload(final Modification... mods) {
79 MutableCompositeModification compMod = new MutableCompositeModification();
80 for(Modification mod: mods) {
81 compMod.addModification(mod);
84 return new CompositeModificationPayload(compMod.toSerializable());
87 private static CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
88 MutableCompositeModification compMod = new MutableCompositeModification();
89 for(Modification mod: mods) {
90 compMod.addModification(mod);
93 return new CompositeModificationByteStringPayload(compMod.toSerializable());
96 private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
97 MutableCompositeModification compMod = new MutableCompositeModification();
98 for(Modification mod: mods) {
99 compMod.addModification(mod);
102 return new ModificationPayload(compMod);
106 public void testApplyHelium2VersionSnapshot() throws Exception {
107 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
108 "testApplyHelium2VersionSnapshot");
110 NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
112 DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
113 store.setSchemaContext(SCHEMA_CONTEXT);
115 writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
117 YangInstanceIdentifier root = YangInstanceIdentifier.builder().build();
118 NormalizedNode<?,?> expected = readStore(store, root);
120 NormalizedNodeMessages.Container encode = codec.encode(expected);
122 Snapshot snapshot = Snapshot.create(encode.getNormalizedNode().toByteString().toByteArray(),
123 Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
125 shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
127 NormalizedNode<?,?> actual = readStore(shard, root);
129 assertEquals("Root node", expected, actual);
131 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
135 public void testHelium2VersionApplyStateLegacy() throws Exception {
136 new ShardTestKit(getSystem()) {{
137 TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
138 "testHelium2VersionApplyStateLegacy");
140 waitUntilLeader(shard);
142 NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
144 ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
145 newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
147 shard.underlyingActor().onReceiveCommand(applyState);
149 NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
150 assertEquals("Applied state", node, actual);
152 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
157 public void testHelium2VersionRecovery() throws Exception {
159 DataTree testStore = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
160 testStore.setSchemaContext(SCHEMA_CONTEXT);
162 writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
164 NormalizedNode<?, ?> root = readStore(testStore, YangInstanceIdentifier.builder().build());
166 InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create(
167 new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root).
168 getNormalizedNode().toByteString().toByteArray(),
169 Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
171 InMemoryJournal.addEntry(shardID.toString(), 0, new String("Dummy data as snapshot sequence number is " +
172 "set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1"));
174 // Set up the InMemoryJournal.
176 InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
177 new WriteModification(TestModel.OUTER_LIST_PATH,
178 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
180 int nListEntries = 16;
181 Set<Integer> listEntryKeys = new HashSet<>();
184 // Add some CompositeModificationPayload entries
186 listEntryKeys.add(Integer.valueOf(i));
187 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
188 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
189 Modification mod = new MergeModification(path,
190 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
191 InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
192 newLegacyPayload(mod)));
195 // Add some CompositeModificationByteStringPayload entries
196 for(; i <= nListEntries; i++) {
197 listEntryKeys.add(Integer.valueOf(i));
198 YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
199 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
200 Modification mod = new MergeModification(path,
201 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
202 InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
203 newLegacyByteStringPayload(mod)));
206 InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2, new ApplyLogEntries(nListEntries));
208 testRecovery(listEntryKeys);
211 @SuppressWarnings({ "unchecked" })
213 public void testPreLithiumConcurrentThreePhaseCommits() throws Throwable {
214 new ShardTestKit(getSystem()) {{
215 final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
216 newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
217 "testPreLithiumConcurrentThreePhaseCommits");
219 waitUntilLeader(shard);
221 // Setup 3 simulated transactions with mock cohorts backed by real cohorts.
223 ShardDataTree dataStore = shard.underlyingActor().getDataStore();
225 String transactionID1 = "tx1";
226 MutableCompositeModification modification1 = new MutableCompositeModification();
227 ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore,
228 TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1);
230 String transactionID2 = "tx2";
231 MutableCompositeModification modification2 = new MutableCompositeModification();
232 ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore,
233 TestModel.OUTER_LIST_PATH,
234 ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(),
237 String transactionID3 = "tx3";
238 MutableCompositeModification modification3 = new MutableCompositeModification();
239 ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore,
240 YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH)
241 .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(),
242 ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1),
246 final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS);
247 final Timeout timeout = new Timeout(duration);
249 // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
250 // by the ShardTransaction.
252 shard.tell(prepareForwardedReadyTransaction(cohort1, transactionID1, HELIUM_2_VERSION, false), getRef());
253 ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
254 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
255 assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
257 // Send the CanCommitTransaction message for the first Tx.
259 shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef());
260 CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
261 expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS));
262 assertEquals("Can commit", true, canCommitReply.getCanCommit());
264 // Send the ForwardedReadyTransaction for the next 2 Tx's.
266 shard.tell(prepareForwardedReadyTransaction(cohort2, transactionID2, HELIUM_2_VERSION, false), getRef());
267 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
269 shard.tell(prepareForwardedReadyTransaction(cohort3, transactionID3, HELIUM_2_VERSION, false), getRef());
270 expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
272 // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and
273 // processed after the first Tx completes.
275 Future<Object> canCommitFuture1 = Patterns.ask(shard,
276 new CanCommitTransaction(transactionID2).toSerializable(), timeout);
278 Future<Object> canCommitFuture2 = Patterns.ask(shard,
279 new CanCommitTransaction(transactionID3).toSerializable(), timeout);
281 // Send the CommitTransaction message for the first Tx. After it completes, it should
282 // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd.
284 shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef());
285 expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS);
287 // Wait for the next 2 Tx's to complete.
289 final AtomicReference<Throwable> caughtEx = new AtomicReference<>();
290 final CountDownLatch commitLatch = new CountDownLatch(2);
292 class OnFutureComplete extends OnComplete<Object> {
293 private final Class<?> expRespType;
295 OnFutureComplete(final Class<?> expRespType) {
296 this.expRespType = expRespType;
300 public void onComplete(final Throwable error, final Object resp) {
302 caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error));
305 assertEquals("Commit response type", expRespType, resp.getClass());
307 } catch (Exception e) {
313 void onSuccess(final Object resp) throws Exception {
317 class OnCommitFutureComplete extends OnFutureComplete {
318 OnCommitFutureComplete() {
319 super(CommitTransactionReply.SERIALIZABLE_CLASS);
323 public void onComplete(final Throwable error, final Object resp) {
324 super.onComplete(error, resp);
325 commitLatch.countDown();
329 class OnCanCommitFutureComplete extends OnFutureComplete {
330 private final String transactionID;
332 OnCanCommitFutureComplete(final String transactionID) {
333 super(CanCommitTransactionReply.SERIALIZABLE_CLASS);
334 this.transactionID = transactionID;
338 void onSuccess(final Object resp) throws Exception {
339 CanCommitTransactionReply canCommitReply =
340 CanCommitTransactionReply.fromSerializable(resp);
341 assertEquals("Can commit", true, canCommitReply.getCanCommit());
343 Future<Object> commitFuture = Patterns.ask(shard,
344 new CommitTransaction(transactionID).toSerializable(), timeout);
345 commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher());
349 canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2),
350 getSystem().dispatcher());
352 canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3),
353 getSystem().dispatcher());
355 boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
357 if(caughtEx.get() != null) {
358 throw caughtEx.get();
361 assertEquals("Commits complete", true, done);
363 InOrder inOrder = inOrder(cohort1, cohort2, cohort3);
364 inOrder.verify(cohort1).canCommit();
365 inOrder.verify(cohort1).preCommit();
366 inOrder.verify(cohort1).commit();
367 inOrder.verify(cohort2).canCommit();
368 inOrder.verify(cohort2).preCommit();
369 inOrder.verify(cohort2).commit();
370 inOrder.verify(cohort3).canCommit();
371 inOrder.verify(cohort3).preCommit();
372 inOrder.verify(cohort3).commit();
374 // Verify data in the data store.
376 NormalizedNode<?, ?> outerList = readStore(shard, TestModel.OUTER_LIST_PATH);
377 assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList);
378 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable",
379 outerList.getValue() instanceof Iterable);
380 Object entry = ((Iterable<Object>)outerList.getValue()).iterator().next();
381 assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode",
382 entry instanceof MapEntryNode);
383 MapEntryNode mapEntry = (MapEntryNode)entry;
384 Optional<DataContainerChild<? extends PathArgument, ?>> idLeaf =
385 mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME));
386 assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent());
387 assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue());
389 verifyLastApplied(shard, 2);
391 shard.tell(PoisonPill.getInstance(), ActorRef.noSender());