import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.raft.ReplicatedLogImplEntry;
import org.opendaylight.controller.cluster.raft.Snapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyLogEntries;
-import org.opendaylight.controller.cluster.raft.base.messages.ApplySnapshot;
import org.opendaylight.controller.cluster.raft.base.messages.ApplyState;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
import scala.concurrent.Future;
*/
public class PreLithiumShardTest extends AbstractShardTest {
- private CompositeModificationPayload newLegacyPayload(final Modification... mods) {
+ private static CompositeModificationPayload newLegacyPayload(final Modification... mods) {
MutableCompositeModification compMod = new MutableCompositeModification();
for(Modification mod: mods) {
compMod.addModification(mod);
return new CompositeModificationPayload(compMod.toSerializable());
}
- private CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
+ private static CompositeModificationByteStringPayload newLegacyByteStringPayload(final Modification... mods) {
MutableCompositeModification compMod = new MutableCompositeModification();
for(Modification mod: mods) {
compMod.addModification(mod);
return new CompositeModificationByteStringPayload(compMod.toSerializable());
}
- private ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
+ private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException {
MutableCompositeModification compMod = new MutableCompositeModification();
for(Modification mod: mods) {
compMod.addModification(mod);
NormalizedNodeToNodeCodec codec = new NormalizedNodeToNodeCodec(SCHEMA_CONTEXT);
- DataTree store = InMemoryDataTreeFactory.getInstance().create();
+ DataTree store = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
store.setSchemaContext(SCHEMA_CONTEXT);
writeToStore(store, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
NormalizedNodeMessages.Container encode = codec.encode(expected);
- ApplySnapshot applySnapshot = new ApplySnapshot(Snapshot.create(
- encode.getNormalizedNode().toByteString().toByteArray(),
- Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4));
+ Snapshot snapshot = Snapshot.create(encode.getNormalizedNode().toByteString().toByteArray(),
+ Collections.<ReplicatedLogEntry>emptyList(), 1, 2, 3, 4);
- shard.underlyingActor().onReceiveCommand(applySnapshot);
+ shard.underlyingActor().getRaftActorSnapshotCohort().applySnapshot(snapshot.getState());
NormalizedNode<?,?> actual = readStore(shard, root);
@Test
public void testHelium2VersionApplyStateLegacy() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(),
+ "testHelium2VersionApplyStateLegacy");
- TestActorRef<Shard> shard = TestActorRef.create(getSystem(), newShardProps(), "testHelium2VersionApplyStateLegacy");
+ waitUntilLeader(shard);
- NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+ NormalizedNode<?, ?> node = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
- ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
- newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
+ ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2,
+ newLegacyByteStringPayload(new WriteModification(TestModel.TEST_PATH, node))));
- shard.underlyingActor().onReceiveCommand(applyState);
+ shard.underlyingActor().onReceiveCommand(applyState);
- NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
- assertEquals("Applied state", node, actual);
+ NormalizedNode<?,?> actual = readStore(shard, TestModel.TEST_PATH);
+ assertEquals("Applied state", node, actual);
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }};
}
@Test
public void testHelium2VersionRecovery() throws Exception {
- DataTree testStore = InMemoryDataTreeFactory.getInstance().create();
+ DataTree testStore = InMemoryDataTreeFactory.getInstance().create(TreeType.OPERATIONAL);
testStore.setSchemaContext(SCHEMA_CONTEXT);
writeToStore(testStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
getNormalizedNode().toByteString().toByteArray(),
Collections.<ReplicatedLogEntry>emptyList(), 0, 1, -1, -1));
+ InMemoryJournal.addEntry(shardID.toString(), 0, new String("Dummy data as snapshot sequence number is " +
+ "set to 0 in InMemorySnapshotStore and journal recovery seq number will start from 1"));
+
// Set up the InMemoryJournal.
- InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
+ InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, newLegacyPayload(
new WriteModification(TestModel.OUTER_LIST_PATH,
ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()))));
.nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
Modification mod = new MergeModification(path,
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
- InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+ InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
newLegacyPayload(mod)));
}
.nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build();
Modification mod = new MergeModification(path,
ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
- InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1,
+ InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
newLegacyByteStringPayload(mod)));
}
- InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, new ApplyLogEntries(nListEntries));
+ InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2, new ApplyLogEntries(nListEntries));
testRecovery(listEntryKeys);
}
// Simulate the ForwardedReadyTransaction message for the first Tx that would be sent
// by the ShardTransaction.
- shard.tell(new ForwardedReadyTransaction(transactionID1, HELIUM_2_VERSION,
- cohort1, modification1, true, false), getRef());
+ shard.tell(prepareForwardedReadyTransaction(cohort1, transactionID1, HELIUM_2_VERSION, false), getRef());
ReadyTransactionReply readyReply = ReadyTransactionReply.fromSerializable(
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS));
assertEquals("Cohort path", shard.path().toString(), readyReply.getCohortPath());
// Send the ForwardedReadyTransaction for the next 2 Tx's.
- shard.tell(new ForwardedReadyTransaction(transactionID2, HELIUM_2_VERSION,
- cohort2, modification2, true, false), getRef());
+ shard.tell(prepareForwardedReadyTransaction(cohort2, transactionID2, HELIUM_2_VERSION, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
- shard.tell(new ForwardedReadyTransaction(transactionID3, HELIUM_2_VERSION,
- cohort3, modification3, true, false), getRef());
+ shard.tell(prepareForwardedReadyTransaction(cohort3, transactionID3, HELIUM_2_VERSION, false), getRef());
expectMsgClass(duration, ReadyTransactionReply.SERIALIZABLE_CLASS);
// Send the CanCommitTransaction message for the next 2 Tx's. These should get queued and