CDS: add backend support for local transactions 72/19072/7
authorRobert Varga <rovarga@cisco.com>
Sat, 25 Apr 2015 08:44:13 +0000 (10:44 +0200)
committerTony Tkacik <ttkacik@cisco.com>
Tue, 12 May 2015 08:30:27 +0000 (08:30 +0000)
Add the message for submitting a DataTreeModification from the frontend
and the logic to apply it to the Shard's DataTree instance. This is
similar to BatchedModifications, except is is always considered ready.

The ReadyLocalTransaction is not directly serializable, but we create a
new serializer, which turns it into BatchedModifications when
serializing to byte stream.

Change-Id: I23a39c7b5997b48a355af73287d19bbf3ab0ae20
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/resources/application.conf
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializerTest.java [new file with mode: 0644]

index 2fd51de8b674be89a065f28d5f2cacd16ec101fc..148fa1881b836252417cdccd92ea5440950f82c1 100644 (file)
@@ -46,6 +46,7 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
@@ -228,6 +229,8 @@ public class Shard extends RaftActor {
             } else if (message instanceof ForwardedReadyTransaction) {
                 commitCoordinator.handleForwardedReadyTransaction((ForwardedReadyTransaction) message,
                         getSender(), this);
+            } else if (message instanceof ReadyLocalTransaction) {
+                handleReadyLocalTransaction((ReadyLocalTransaction)message);
             } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
                 handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
             } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
@@ -393,6 +396,15 @@ public class Shard extends RaftActor {
         commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
     }
 
+    private void noLeaderError(Object message) {
+        // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
+        // it more resilient in case we're in the process of electing a new leader.
+        getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+            "Could not find the leader for shard %s. This typically happens" +
+            " when the system is coming up or recovering and a leader is being elected. Try again" +
+            " later.", persistenceId()))), getSelf());
+    }
+
     private void handleBatchedModifications(BatchedModifications batched) {
         // This message is sent to prepare the modificationsa transaction directly on the Shard as an
         // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
@@ -423,12 +435,27 @@ public class Shard extends RaftActor {
                 LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
                 leader.forward(batched, getContext());
             } else {
-                // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
-                // it more resilient in case we're in the process of electing a new leader.
-                getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
-                    "Could not find the leader for shard %s. This typically happens" +
-                    " when the system is coming up or recovering and a leader is being elected. Try again" +
-                    " later.", persistenceId()))), getSelf());
+                noLeaderError(batched);
+            }
+        }
+    }
+
+    private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
+        if (isLeader()) {
+            try {
+                commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
+            } catch (Exception e) {
+                LOG.error("{}: Error handling LocalModifications for Tx {}", persistenceId(),
+                        message.getTransactionID(), e);
+                getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+            }
+        } else {
+            ActorSelection leader = getLeader();
+            if (leader != null) {
+                LOG.debug("{}: Forwarding LocalModifications to leader {}", persistenceId(), leader);
+                leader.forward(message, getContext());
+            } else {
+                noLeaderError(message);
             }
         }
     }
index 30947fa6662b4a56d5b091cfe3133d019c9f9a24..97816a55ccb5920e289eed12d665de8ca6a2ce35 100644 (file)
@@ -26,6 +26,7 @@ import org.opendaylight.controller.cluster.datastore.messages.BatchedModificatio
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
@@ -194,6 +195,30 @@ public class ShardCommitCoordinator {
         return batched.isReady();
     }
 
+    /**
+     * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
+     * been prepared beforehand by the sender and we just need to drive them through into the dataTree.
+     *
+     * @param message
+     * @param sender
+     * @param shard
+     */
+    void handleReadyLocalTransaction(ReadyLocalTransaction message, ActorRef sender, Shard shard) {
+        final ShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(dataTree, message.getModification());
+        final CohortEntry cohortEntry = new CohortEntry(message.getTransactionID(), cohort);
+        cohortCache.put(message.getTransactionID(), cohortEntry);
+        cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
+        log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionID());
+
+        if (message.isDoCommitOnReady()) {
+            cohortEntry.setReplySender(sender);
+            cohortEntry.setShard(shard);
+            handleCanCommit(cohortEntry);
+        } else {
+            sender.tell(readyTransactionReply(shard), shard.self());
+        }
+    }
+
     private void handleCanCommit(CohortEntry cohortEntry) {
         String transactionID = cohortEntry.getTransactionID();
 
@@ -431,6 +456,12 @@ public class ShardCommitCoordinator {
             this.transaction = null;
         }
 
+        CohortEntry(String transactionID, ShardDataTreeCohort cohort) {
+            this.transactionID = transactionID;
+            this.cohort = cohort;
+            this.transaction = null;
+        }
+
         void updateLastAccessTime() {
             lastAccessTime = System.currentTimeMillis();
         }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransaction.java
new file mode 100644 (file)
index 0000000..6d952f9
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+
+/**
+ * Message notifying the shard leader to apply modifications which have been
+ * prepared locally against its DataTree. This message is not directly serializable,
+ * simply because the leader and sender need to be on the same system.
+ */
+public final class ReadyLocalTransaction {
+    private final DataTreeModification modification;
+    private final String transactionID;
+    private final boolean doCommitOnReady;
+
+    public ReadyLocalTransaction(final String transactionID, DataTreeModification modification, boolean doCommitOnReady) {
+        this.transactionID = Preconditions.checkNotNull(transactionID);
+        this.modification = Preconditions.checkNotNull(modification);
+        this.doCommitOnReady = doCommitOnReady;
+    }
+
+    public String getTransactionID() {
+        return transactionID;
+    }
+
+    public DataTreeModification getModification() {
+        return modification;
+    }
+
+    public boolean isDoCommitOnReady() {
+        return doCommitOnReady;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializer.java
new file mode 100644 (file)
index 0000000..237a5c6
--- /dev/null
@@ -0,0 +1,128 @@
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.serialization.JSerializer;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import javax.annotation.Nonnull;
+import org.apache.commons.lang3.SerializationUtils;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor;
+
+/**
+ * Specialized message transformer, which transforms a {@link ReadyLocalTransaction}
+ * into a {@link BatchedModifications} message. This serializer needs to be plugged
+ * into akka serialization to allow forwarding of ReadyLocalTransaction to remote
+ * shards.
+ */
+public final class ReadyLocalTransactionSerializer extends JSerializer {
+    @Override
+    public int identifier() {
+        return 97439437;
+    }
+
+    @Override
+    public boolean includeManifest() {
+        return false;
+    }
+
+    @Override
+    public byte[] toBinary(final Object obj) {
+        Preconditions.checkArgument(obj instanceof ReadyLocalTransaction, "Unsupported object type %s", obj.getClass());
+        final ReadyLocalTransaction msg = (ReadyLocalTransaction) obj;
+        final BatchedModifications batched = new BatchedModifications(msg.getTransactionID(),
+                DataStoreVersions.CURRENT_VERSION, "");
+        batched.setDoCommitOnReady(msg.isDoCommitOnReady());
+        batched.setReady(true);
+
+        msg.getModification().applyToCursor(new BatchedCursor(batched));
+
+        return SerializationUtils.serialize(batched);
+    }
+
+    @Override
+    public Object fromBinaryJava(final byte[] bytes, final Class<?> clazz) {
+        return SerializationUtils.deserialize(bytes);
+    }
+
+    private static final class BatchedCursor implements DataTreeModificationCursor {
+        private final Deque<YangInstanceIdentifier> stack = new ArrayDeque<>();
+        private final BatchedModifications message;
+
+        BatchedCursor(final BatchedModifications message) {
+            this.message = Preconditions.checkNotNull(message);
+            stack.push(YangInstanceIdentifier.EMPTY);
+        }
+
+        @Override
+        public void delete(final PathArgument child) {
+            message.addModification(new DeleteModification(stack.peek().node(child)));
+        }
+
+        @Override
+        public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
+            message.addModification(new MergeModification(stack.peek().node(child), data));
+        }
+
+        @Override
+        public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
+            message.addModification(new WriteModification(stack.peek().node(child), data));
+        }
+
+        @Override
+        public void enter(@Nonnull final PathArgument child) {
+            stack.push(stack.peek().node(child));
+        }
+
+        @Override
+        public void enter(@Nonnull final PathArgument... path) {
+            for (PathArgument arg : path) {
+                enter(arg);
+            }
+        }
+
+        @Override
+        public void enter(@Nonnull final Iterable<PathArgument> path) {
+            for (PathArgument arg : path) {
+                enter(arg);
+            }
+        }
+
+        @Override
+        public void exit() {
+            stack.pop();
+        }
+
+        @Override
+        public void exit(final int depth) {
+            Preconditions.checkArgument(depth < stack.size(), "Stack holds only %s elements, cannot exit %s levels", stack.size(), depth);
+            for (int i = 0; i < depth; ++i) {
+                stack.pop();
+            }
+        }
+
+        @Override
+        public Optional<NormalizedNode<?, ?>> readNode(@Nonnull final PathArgument child) {
+            throw new UnsupportedOperationException("Not implemented");
+        }
+
+        @Override
+        public void close() {
+            // No-op
+        }
+    }
+}
index 6be6cda5d3d741de221190e55fcf71d042e4c24a..83bb12d80634abb1bab006c5b803277f09dcc10a 100644 (file)
@@ -20,11 +20,12 @@ odl-cluster-data {
       serializers {
                 java = "akka.serialization.JavaSerializer"
                 proto = "akka.remote.serialization.ProtobufSerializer"
+                readylocal = "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransactionSerializer"
               }
 
               serialization-bindings {
                   "com.google.protobuf.Message" = proto
-
+                  "org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction" = readylocal
               }
     }
     remote {
index 06d9f360aae23ab8edb0d99e5529ae6457e1d3b7..40227e056bfff82351c8e19eafb237e93f4d7580 100644 (file)
@@ -54,6 +54,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTran
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
+import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
@@ -99,6 +100,7 @@ import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgum
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
 import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
@@ -1070,6 +1072,82 @@ public class ShardTest extends AbstractShardTest {
         }};
     }
 
+    @Test
+    public void testReadyLocalTransactionWithImmediateCommit() throws Exception{
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testReadyLocalTransactionWithImmediateCommit");
+
+            waitUntilLeader(shard);
+
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+            DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
+
+            ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
+            MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
+            new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
+
+            String txId = "tx1";
+            ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
+
+            shard.tell(readyMessage, getRef());
+
+            expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
+
+            NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
+            assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
+    @Test
+    public void testReadyLocalTransactionWithThreePhaseCommit() throws Exception{
+        new ShardTestKit(getSystem()) {{
+            final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
+                    newShardProps().withDispatcher(Dispatchers.DefaultDispatcherId()),
+                    "testReadyLocalTransactionWithThreePhaseCommit");
+
+            waitUntilLeader(shard);
+
+            ShardDataTree dataStore = shard.underlyingActor().getDataStore();
+
+            DataTreeModification modification = dataStore.getDataTree().takeSnapshot().newModification();
+
+            ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+            new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
+            MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
+            new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
+
+            String txId = "tx1";
+            ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, false);
+
+            shard.tell(readyMessage, getRef());
+
+            expectMsgClass(ReadyTransactionReply.class);
+
+            // Send the CanCommitTransaction message.
+
+            shard.tell(new CanCommitTransaction(txId).toSerializable(), getRef());
+            CanCommitTransactionReply canCommitReply = CanCommitTransactionReply.fromSerializable(
+                    expectMsgClass(CanCommitTransactionReply.SERIALIZABLE_CLASS));
+            assertEquals("Can commit", true, canCommitReply.getCanCommit());
+
+            // Send the CanCommitTransaction message.
+
+            shard.tell(new CommitTransaction(txId).toSerializable(), getRef());
+            expectMsgClass(CommitTransactionReply.SERIALIZABLE_CLASS);
+
+            NormalizedNode<?, ?> actualNode = readStore(shard, TestModel.OUTER_LIST_PATH);
+            assertEquals(TestModel.OUTER_LIST_QNAME.getLocalName(), mergeData, actualNode);
+
+            shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+        }};
+    }
+
     @Test
     public void testCommitWithPersistenceDisabled() throws Throwable {
         dataStoreContextBuilder.persistent(false);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/messages/ReadyLocalTransactionSerializerTest.java
new file mode 100644 (file)
index 0000000..cc8c856
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import java.util.List;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.DataStoreVersions;
+import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
+import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
+import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+
+/**
+ * Unit tests for ReadyLocalTransactionSerializer.
+ *
+ * @author Thomas Pantelis
+ */
+public class ReadyLocalTransactionSerializerTest {
+
+    @Test
+    public void testToAndFromBinary() {
+        TipProducingDataTree dataTree = InMemoryDataTreeFactory.getInstance().create();
+        dataTree.setSchemaContext(TestModel.createTestContext());
+        DataTreeModification modification = dataTree.takeSnapshot().newModification();
+
+        ContainerNode writeData = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
+        new WriteModification(TestModel.TEST_PATH, writeData).apply(modification);
+        MapNode mergeData = ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build();
+        new MergeModification(TestModel.OUTER_LIST_PATH, mergeData).apply(modification);
+
+        String txId = "tx-id";
+        ReadyLocalTransaction readyMessage = new ReadyLocalTransaction(txId, modification, true);
+
+        ReadyLocalTransactionSerializer serializer = new ReadyLocalTransactionSerializer();
+
+        byte[] bytes = serializer.toBinary(readyMessage);
+
+        Object deserialized = serializer.fromBinary(bytes, ReadyLocalTransaction.class);
+
+        assertNotNull("fromBinary returned null", deserialized);
+        assertEquals("fromBinary return type", BatchedModifications.class, deserialized.getClass());
+        BatchedModifications batched = (BatchedModifications)deserialized;
+        assertEquals("getTransactionID", txId, batched.getTransactionID());
+        assertEquals("getVersion", DataStoreVersions.CURRENT_VERSION, batched.getVersion());
+
+        List<Modification> batchedMods = batched.getModifications();
+        assertEquals("getModifications size", 2, batchedMods.size());
+
+        Modification mod = batchedMods.get(0);
+        assertEquals("Modification type", WriteModification.class, mod.getClass());
+        assertEquals("Modification getPath", TestModel.TEST_PATH, ((WriteModification)mod).getPath());
+        assertEquals("Modification getData", writeData, ((WriteModification)mod).getData());
+
+        mod = batchedMods.get(1);
+        assertEquals("Modification type", MergeModification.class, mod.getClass());
+        assertEquals("Modification getPath", TestModel.OUTER_LIST_PATH, ((MergeModification)mod).getPath());
+        assertEquals("Modification getData", mergeData, ((MergeModification)mod).getData());
+    }
+}