/* * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.cluster.datastore.compat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.inOrder; import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.HELIUM_2_VERSION; import akka.actor.ActorRef; import akka.actor.PoisonPill; import akka.dispatch.Dispatchers; import akka.dispatch.OnComplete; import akka.pattern.Patterns; import akka.testkit.TestActorRef; import akka.util.Timeout; import com.google.common.base.Optional; import java.io.IOException; import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import org.mockito.InOrder; import org.opendaylight.controller.cluster.datastore.AbstractShardTest; import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.ShardDataTree; import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort; import org.opendaylight.controller.cluster.datastore.ShardTestKit; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.modification.MergeModification; import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry; import org.opendaylight.controller.cluster.raft.Snapshot; import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries; import org.opendaylight.controller.cluster.raft.base.messages.ApplyState; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; /** * Unit tests for backwards compatibility with pre-Lithium versions. * * @author Thomas Pantelis */ public class PreLithiumShardTest extends AbstractShardTest { private static CompositeModificationPayload newLegacyPayload(final Modification... mods) { MutableCompositeModification compMod = new MutableCompositeModification(); for(Modification mod: mods) { compMod.addModification(mod); } return new CompositeModificationPayload(compMod.toSerializable()); } private static CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) { MutableCompositeModification compMod = new MutableCompositeModification(); for(Modification mod: mods) { compMod.addModification(mod); } return new CompositeModificationByteStringPayload(compMod.toSerializable()); } private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException { MutableCompositeModification compMod = new MutableCompositeModification(); for(Modification mod: mods) { compMod.addModification(mod); } return new ModificationPayload(compMod); } @Test public void testApplyHelium2VersionSnapshot() throws Exception { TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyHelium2VersionSnapshot"); NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT); DataTree store = InMemoryDataTreeFactory.getInstance().create(); store.setSchemaContext(SCHEMA_CONTEXT); writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); YangInstanceIdentifier root = YangInstanceIdentifier.builder().build(); NormalizedNode expected = readStore(store, root); NormalizedNodeMessages.Container encode = codec.encode(expected); Snapshot snapshot = Snapshot.create(encode.getNormalizedNode().toByteString().toByteArray(), Collections.emptyList(), 1, 2, 3, 4); shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState()); NormalizedNode actual = readStore(shard, root); assertEquals("Root node", expected, actual); shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } @Test public void testHelium2VersionApplyStateLegacy() throws Exception { new ShardTestKit(getSystem()) {{ TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testHelium2VersionApplyStateLegacy"); waitUntilLeader(shard); NormalizedNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2, newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node)))); shard.underlyingActor().onReceiveCommand(applyState); NormalizedNode actual = readStore(shard, TestModel.TEST_PATH); assertEquals("Applied state", node, actual); shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } @Test public void testHelium2VersionRecovery() throws Exception { DataTree testStore = InMemoryDataTreeFactory.getInstance().create(); testStore.setSchemaContext(SCHEMA_CONTEXT); writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); NormalizedNode root = readStore(testStore, YangInstanceIdentifier.builder().build()); InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create( new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT).encode(root). getNormalizedNode().toByteString().toByteArray(), Collections.emptyList(), 0, 1, -1, -1)); InMemoryJournal.addEntry(shardID.toString(), 0, new String("Dummy data as snapshot sequence number is " + "set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1")); // Set up the InMemoryJournal. InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newLegacyPayload( new WriteModification(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build())))); int nListEntries = 16; Set listEntryKeys = new HashSet<>(); int i = 1; // Add some CompositeModificationPayload entries for(; i <= 8; i++) { listEntryKeys.add(Integer.valueOf(i)); YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); Modification mod = new MergeModification(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1, newLegacyPayload(mod))); } // Add some CompositeModificationByteStringPayload entries for(; i <= nListEntries; i++) { listEntryKeys.add(Integer.valueOf(i)); YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); Modification mod = new MergeModification(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1, newLegacyByteStringPayload(mod))); } InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2, new ApplyLogEntries(nListEntries)); testRecovery(listEntryKeys); } @SuppressWarnings({ "unchecked" }) @Test public void testPreLithiumConcurrentThreePhaseCommits() throws Throwable { new ShardTestKit(getSystem()) {{ final TestActorRef shard = TestActorRef.create(getSystem(), newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()), "testPreLithiumConcurrentThreePhaseCommits"); waitUntilLeader(shard); // Setup 3 simulated transactions with mock cohorts backed by real cohorts. ShardDataTree dataStore = shard.underlyingActor().getDataStore(); String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), modification2); String transactionID3 = "tx3"; MutableCompositeModification modification3 = new MutableCompositeModification(); ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), modification3); long timeoutSec = 5; final FiniteDuration duration = FiniteDuration.create(timeoutSec, TimeUnit.SECONDS); final Timeout timeout = new Timeout(duration); // Simulate the ForwardedReadyTransaction message for the first Tx that would be sent // by the ShardTransaction. shard.tell(new ForwardedReadyTransaction(transactionID1, HELIUM_2_VERSION, cohort1, modification1, true, false), getRef()); ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable( expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS)); assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath()); // Send the CanCommitTransaction message for the first Tx. shard.tell(new CanCommitTransaction(transactionID1).toSerializable(), getRef()); CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable( expectMsgClass(duration, CanCommitTransactionReply.SERIALIZABLE_CLASS)); assertEquals("Can commit", true, canCommitReply.getCanCommit()); // Send the ForwardedReadyTransaction for the next 2 Tx's. shard.tell(new ForwardedReadyTransaction(transactionID2, HELIUM_2_VERSION, cohort2, modification2, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); shard.tell(new ForwardedReadyTransaction(transactionID3, HELIUM_2_VERSION, cohort3, modification3, true, false), getRef()); expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS); // Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and // processed after the first Tx completes. Future canCommitFuture1 = Patterns.ask(shard, new CanCommitTransaction(transactionID2).toSerializable(), timeout); Future canCommitFuture2 = Patterns.ask(shard, new CanCommitTransaction(transactionID3).toSerializable(), timeout); // Send the CommitTransaction message for the first Tx. After it completes, it should // trigger the 2nd Tx to proceed which should in turn then trigger the 3rd. shard.tell(new CommitTransaction(transactionID1).toSerializable(), getRef()); expectMsgClass(duration, CommitTransactionReply.SERIALIZABLE_CLASS); // Wait for the next 2 Tx's to complete. final AtomicReference caughtEx = new AtomicReference<>(); final CountDownLatch commitLatch = new CountDownLatch(2); class OnFutureComplete extends OnComplete { private final Class expRespType; OnFutureComplete(final Class expRespType) { this.expRespType = expRespType; } @Override public void onComplete(final Throwable error, final Object resp) { if(error != null) { caughtEx.set(new AssertionError(getClass().getSimpleName() + " failure", error)); } else { try { assertEquals("Commit response type", expRespType, resp.getClass()); onSuccess(resp); } catch (Exception e) { caughtEx.set(e); } } } void onSuccess(final Object resp) throws Exception { } } class OnCommitFutureComplete extends OnFutureComplete { OnCommitFutureComplete() { super(CommitTransactionReply.SERIALIZABLE_CLASS); } @Override public void onComplete(final Throwable error, final Object resp) { super.onComplete(error, resp); commitLatch.countDown(); } } class OnCanCommitFutureComplete extends OnFutureComplete { private final String transactionID; OnCanCommitFutureComplete(final String transactionID) { super(CanCommitTransactionReply.SERIALIZABLE_CLASS); this.transactionID = transactionID; } @Override void onSuccess(final Object resp) throws Exception { CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(resp); assertEquals("Can commit", true, canCommitReply.getCanCommit()); Future commitFuture = Patterns.ask(shard, new CommitTransaction(transactionID).toSerializable(), timeout); commitFuture.onComplete(new OnCommitFutureComplete(), getSystem().dispatcher()); } } canCommitFuture1.onComplete(new OnCanCommitFutureComplete(transactionID2), getSystem().dispatcher()); canCommitFuture2.onComplete(new OnCanCommitFutureComplete(transactionID3), getSystem().dispatcher()); boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS); if(caughtEx.get() != null) { throw caughtEx.get(); } assertEquals("Commits complete", true, done); InOrder inOrder = inOrder(cohort1, cohort2, cohort3); inOrder.verify(cohort1).canCommit(); inOrder.verify(cohort1).preCommit(); inOrder.verify(cohort1).commit(); inOrder.verify(cohort2).canCommit(); inOrder.verify(cohort2).preCommit(); inOrder.verify(cohort2).commit(); inOrder.verify(cohort3).canCommit(); inOrder.verify(cohort3).preCommit(); inOrder.verify(cohort3).commit(); // Verify data in the data store. NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList); assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable", outerList.getValue() instanceof Iterable); Object entry = ((Iterable)outerList.getValue()).iterator().next(); assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " entry is not MapEntryNode", entry instanceof MapEntryNode); MapEntryNode mapEntry = (MapEntryNode)entry; Optional> idLeaf = mapEntry.getChild(new YangInstanceIdentifier.NodeIdentifier(TestModel.ID_QNAME)); assertTrue("Missing leaf " + TestModel.ID_QNAME.getLocalName(), idLeaf.isPresent()); assertEquals(TestModel.ID_QNAME.getLocalName() + " value", 1, idLeaf.get().getValue()); verifyLastApplied(shard, 2); shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); }}; } }