BUG-5280: add CommitTransactionPayload 83/39283/46
authorRobert Varga <rovarga@cisco.com>
Mon, 23 May 2016 18:10:01 +0000 (20:10 +0200)
committerRobert Varga <rovarga@cisco.com>
Tue, 14 Jun 2016 16:42:42 +0000 (18:42 +0200)
This adds the base payload, which is to be used for propagating
transaction effects instead of DataTreeCandidatePayload. Also adds
the abort case counterpart.

Change-Id: I621e0be4f26509fae9f04e230c6a7e145938d7e6
Signed-off-by: Robert Varga <rovarga@cisco.com>
14 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbstractThreePhaseCommitMessage.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DataTreeCandidateSupplier.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinatorTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayloadTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayloadTest.java [new file with mode: 0644]

index 8da70c2..25a7ee8 100644 (file)
@@ -14,14 +14,20 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.Map.Entry;
+import java.util.Optional;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput;
+import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-final class DataTreeCandidatePayload extends Payload implements Externalizable {
-    private static final Logger LOG = LoggerFactory.getLogger(DataTreeCandidatePayload.class);
+/**
+ * @deprecated Deprecated in Boron in favor of CommitTransactionPayload
+ */
+@Deprecated
+final class DataTreeCandidatePayload extends Payload implements DataTreeCandidateSupplier, Externalizable {
     private static final long serialVersionUID = 1L;
 
     private transient byte[] serialized;
@@ -34,6 +40,10 @@ final class DataTreeCandidatePayload extends Payload implements Externalizable {
         this.serialized = Preconditions.checkNotNull(serialized);
     }
 
+    /**
+     * @deprecated Use CommitTransactionPayload instead
+     */
+    @Deprecated
     static DataTreeCandidatePayload create(final DataTreeCandidate candidate) {
         final ByteArrayDataOutput out = ByteStreams.newDataOutput();
         try {
@@ -46,8 +56,10 @@ final class DataTreeCandidatePayload extends Payload implements Externalizable {
     }
 
 
-    DataTreeCandidate getCandidate() throws IOException {
-        return DataTreeCandidateInputOutput.readDataTreeCandidate(ByteStreams.newDataInput(serialized));
+    @Override
+    public Entry<Optional<TransactionIdentifier>, DataTreeCandidate> getCandidate() throws IOException {
+        return new SimpleImmutableEntry<>(Optional.empty(),
+                DataTreeCandidateInputOutput.readDataTreeCandidate(ByteStreams.newDataInput(serialized)));
     }
 
     @Override
index 02a1402..77584eb 100644 (file)
@@ -16,6 +16,7 @@ import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -51,6 +52,8 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
 import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
 import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
 import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
@@ -62,6 +65,7 @@ import org.opendaylight.controller.cluster.raft.RaftState;
 import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
 import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
 import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.concepts.Identifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
@@ -321,16 +325,27 @@ public class Shard extends RaftActor {
 
     void continueCommit(final CohortEntry cohortEntry) {
         final DataTreeCandidate candidate = cohortEntry.getCandidate();
+        final TransactionIdentifier transactionId = cohortEntry.getTransactionID();
 
         // If we do not have any followers and we are not using persistence
         // or if cohortEntry has no modifications
         // we can apply modification to the state immediately
         if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
-            applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
-        } else {
-            persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
-                    DataTreeCandidatePayload.create(candidate));
+            applyModificationToState(cohortEntry.getReplySender(), transactionId, candidate);
+            return;
         }
+
+        final Payload payload;
+        try {
+            payload = CommitTransactionPayload.create(transactionId, candidate);
+        } catch (IOException e) {
+            LOG.error("{}: failed to encode transaction {} candidate {}", persistenceId(), transactionId, candidate,
+                e);
+            // TODO: do we need to do something smarter here?
+            throw Throwables.propagate(e);
+        }
+
+        persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), payload);
     }
 
     private void handleCommitTransaction(final CommitTransaction commit) {
@@ -666,11 +681,11 @@ public class Shard extends RaftActor {
 
     @Override
     protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
-        if (data instanceof DataTreeCandidatePayload) {
+        if (data instanceof DataTreeCandidateSupplier) {
             if (clientActor == null) {
                 // No clientActor indicates a replica coming from the leader
                 try {
-                    store.applyForeignCandidate(identifier, ((DataTreeCandidatePayload)data).getCandidate());
+                    store.applyForeignCandidate(identifier, ((DataTreeCandidateSupplier)data).getCandidate().getValue());
                 } catch (DataValidationFailedException | IOException e) {
                     LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
                 }
index f3cbc86..624e68d 100644 (file)
@@ -10,6 +10,10 @@ package org.opendaylight.controller.cluster.datastore;
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.Optional;
+import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
 import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeXMLOutput;
 import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
@@ -18,6 +22,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
@@ -62,14 +67,21 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort {
         Preconditions.checkState(transaction != null, "call startLogRecovery before calling appendRecoveredLogEntry");
 
         try {
-            if (payload instanceof DataTreeCandidatePayload) {
-                DataTreeCandidates.applyToModification(transaction, ((DataTreeCandidatePayload)payload).getCandidate());
+            if (payload instanceof DataTreeCandidateSupplier) {
+                final Entry<Optional<TransactionIdentifier>, DataTreeCandidate> e =
+                        ((DataTreeCandidateSupplier)payload).getCandidate();
+
+                DataTreeCandidates.applyToModification(transaction, e.getValue());
                 size++;
+
+                if (e.getKey().isPresent()) {
+                    // FIXME: BUG-5280: propagate transaction state
+                }
             } else {
                 log.error("{}: Unknown payload {} received during recovery", shardName, payload);
             }
         } catch (IOException e) {
-            log.error("{}: Error extracting ModificationPayload", shardName, e);
+            log.error("{}: Error extracting payload", shardName, e);
         }
     }
 
index 9068228..e69bc95 100644 (file)
@@ -49,7 +49,6 @@ public abstract class AbstractThreePhaseCommitMessage extends VersionedExternali
 
     @Override
     public String toString() {
-        return getClass().getSimpleName() + " [transactionID=" + transactionID + ", version=" + getVersion()
-                + "]";
+        return getClass().getSimpleName() + " [transactionID=" + transactionID + ", version=" + getVersion() + "]";
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayload.java
new file mode 100644 (file)
index 0000000..5c3723e
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import java.io.DataInput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Payload persisted when a transaction is aborted. It contains the transaction identifier.
+ *
+ * @author Robert Varga
+ */
+public final class AbortTransactionPayload extends AbstractIdentifiablePayload<TransactionIdentifier> {
+    private static final class Proxy extends AbstractProxy<TransactionIdentifier> {
+        private static final long serialVersionUID = 1L;
+
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final byte[] serialized) {
+            super(serialized);
+        }
+
+        @Override
+        protected TransactionIdentifier readIdentifier(final DataInput in) throws IOException {
+            return TransactionIdentifier.readFrom(in);
+        }
+
+        @Override
+        protected AbortTransactionPayload createObject(final TransactionIdentifier identifier,
+                final byte[] serialized) {
+            return new AbortTransactionPayload(identifier, serialized);
+        }
+    }
+
+    private static final long serialVersionUID = 1L;
+
+    AbortTransactionPayload(final TransactionIdentifier transactionId, final byte[] serialized) {
+        super(transactionId, serialized);
+    }
+
+    public static AbortTransactionPayload create(final TransactionIdentifier transactionId) throws IOException {
+        final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+        transactionId.writeTo(out);
+        return new AbortTransactionPayload(transactionId, out.toByteArray());
+    }
+
+    @Override
+    protected Proxy externalizableProxy(final byte[] serialized) {
+        return new Proxy(serialized);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java
new file mode 100644 (file)
index 0000000..122c969
--- /dev/null
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Verify;
+import com.google.common.io.ByteStreams;
+import java.io.DataInput;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.concepts.Identifier;
+
+/**
+ * Abstract base class for {@link Payload}s which hold a single {@link Identifier}.
+ *
+ * @author Robert Varga
+ */
+public abstract class AbstractIdentifiablePayload<T extends Identifier> extends Payload implements Identifiable<T>, Serializable {
+    protected abstract static class AbstractProxy<T extends Identifier> implements Externalizable {
+        private static final long serialVersionUID = 1L;
+        private byte[] serialized;
+        private T identifier;
+
+        public AbstractProxy() {
+            // For Externalizable
+        }
+
+        protected AbstractProxy(final byte[] serialized) {
+            this.serialized = Preconditions.checkNotNull(serialized);
+        }
+
+        @Override
+        public final void writeExternal(final ObjectOutput out) throws IOException {
+            out.writeInt(serialized.length);
+            out.write(serialized);
+        }
+
+        @Override
+        public final void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+            final int length = in.readInt();
+            serialized = new byte[length];
+            in.readFully(serialized);
+            identifier = Verify.verifyNotNull(readIdentifier(ByteStreams.newDataInput(serialized)));
+        }
+
+        protected final Object readResolve() {
+            return Verify.verifyNotNull(createObject(identifier, serialized));
+        }
+
+        protected abstract @Nonnull T readIdentifier(@Nonnull DataInput in) throws IOException;
+        protected abstract @Nonnull Identifiable<T> createObject(@Nonnull T identifier, @Nonnull byte[] serialized);
+    }
+
+    private static final long serialVersionUID = 1L;
+    private final byte[] serialized;
+    private final T identifier;
+
+    AbstractIdentifiablePayload(final @Nonnull T identifier, final @Nonnull byte[] serialized) {
+        this.identifier = Preconditions.checkNotNull(identifier);
+        this.serialized = Preconditions.checkNotNull(serialized);
+    }
+
+    @Override
+    public final T getIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    public final int size() {
+        return serialized.length;
+    }
+
+    protected final Object writeReplace() {
+        return Verify.verifyNotNull(externalizableProxy(serialized));
+    }
+
+    protected abstract @Nonnull AbstractProxy<T> externalizableProxy(@Nonnull byte[] serialized);
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java
new file mode 100644 (file)
index 0000000..dd27b4e
--- /dev/null
@@ -0,0 +1,96 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import java.io.DataInput;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.Map.Entry;
+import java.util.Optional;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * Payload persisted when a transaction commits. It contains the transaction identifier and the
+ * {@link DataTreeCandidate}
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class CommitTransactionPayload extends Payload implements DataTreeCandidateSupplier, Serializable {
+    private static final class Proxy implements Externalizable {
+        private static final long serialVersionUID = 1L;
+        private byte[] serialized;
+
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final byte[] serialized) {
+            this.serialized = Preconditions.checkNotNull(serialized);
+        }
+
+        @Override
+        public void writeExternal(final ObjectOutput out) throws IOException {
+            out.writeInt(serialized.length);
+            out.write(serialized);
+        }
+
+        @Override
+        public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+            final int length = in.readInt();
+            serialized = new byte[length];
+            in.readFully(serialized);
+        }
+
+        private Object readResolve() {
+            return new CommitTransactionPayload(serialized);
+        }
+    }
+
+    private static final long serialVersionUID = 1L;
+
+    private final byte[] serialized;
+
+    CommitTransactionPayload(final byte[] serialized) {
+        this.serialized = Preconditions.checkNotNull(serialized);
+    }
+
+    public static CommitTransactionPayload create(final TransactionIdentifier transactionId,
+            final DataTreeCandidate candidate) throws IOException {
+        final ByteArrayDataOutput out = ByteStreams.newDataOutput();
+        transactionId.writeTo(out);
+        DataTreeCandidateInputOutput.writeDataTreeCandidate(out, candidate);
+        return new CommitTransactionPayload(out.toByteArray());
+    }
+
+    @Override
+    public Entry<Optional<TransactionIdentifier>, DataTreeCandidate> getCandidate() throws IOException {
+        final DataInput in = ByteStreams.newDataInput(serialized);
+        return new SimpleImmutableEntry<>(Optional.of(TransactionIdentifier.readFrom(in)),
+                DataTreeCandidateInputOutput.readDataTreeCandidate(in));
+    }
+
+    @Override
+    public int size() {
+        return serialized.length;
+    }
+
+    private Object writeReplace() {
+        return new Proxy(serialized);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DataTreeCandidateSupplier.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DataTreeCandidateSupplier.java
new file mode 100644 (file)
index 0000000..4cd1c4a
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import com.google.common.annotations.Beta;
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.Optional;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * Interim interface for consolidating DataTreeCandidatePayload and {@link CommitTransactionPayload}.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public interface DataTreeCandidateSupplier {
+    Entry<Optional<TransactionIdentifier>, DataTreeCandidate> getCandidate() throws IOException;
+}
index 4209201..b6f464a 100644 (file)
@@ -30,6 +30,7 @@ import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -50,6 +51,7 @@ import org.opendaylight.controller.cluster.datastore.messages.BatchedModificatio
 import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.cluster.raft.Snapshot;
@@ -403,11 +405,12 @@ public abstract class AbstractShardTest extends AbstractActorTest{
         return testStore;
     }
 
-    static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException {
+    static CommitTransactionPayload payloadForModification(final DataTree source, final DataTreeModification mod,
+            final TransactionIdentifier transactionId) throws DataValidationFailedException, IOException {
         source.validate(mod);
         final DataTreeCandidate candidate = source.prepare(mod);
         source.commit(candidate);
-        return DataTreeCandidatePayload.create(candidate);
+        return CommitTransactionPayload.create(transactionId, candidate);
     }
 
     static BatchedModifications newBatchedModifications(final TransactionIdentifier transactionID,
index 07ccce2..cce9bdd 100644 (file)
@@ -30,6 +30,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
 
+@Deprecated
 public class DataTreeCandidatePayloadTest {
     static final QName LEAF_SET = QName.create(TestModel.TEST_QNAME, "leaf-set");
 
@@ -118,13 +119,13 @@ public class DataTreeCandidatePayloadTest {
     @Test
     public void testCandidateSerDes() throws IOException {
         final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
-        assertCandidateEquals(candidate, payload.getCandidate());
+        assertCandidateEquals(candidate, payload.getCandidate().getValue());
     }
 
     @Test
     public void testPayloadSerDes() throws IOException {
         final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
-        assertCandidateEquals(candidate, SerializationUtils.clone(payload).getCandidate());
+        assertCandidateEquals(candidate, SerializationUtils.clone(payload).getCandidate().getValue());
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -139,7 +140,7 @@ public class DataTreeCandidatePayloadTest {
 
         DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafSetEntryPath, leafSetEntryNode);
         DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
-        assertCandidateEquals(candidate, payload.getCandidate());
+        assertCandidateEquals(candidate, payload.getCandidate().getValue());
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -155,7 +156,7 @@ public class DataTreeCandidatePayloadTest {
 
         DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafSetPath, leafSetNode);
         DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
-        assertCandidateEquals(candidate, payload.getCandidate());
+        assertCandidateEquals(candidate, payload.getCandidate().getValue());
     }
 
     @SuppressWarnings({ "rawtypes", "unchecked" })
@@ -171,7 +172,7 @@ public class DataTreeCandidatePayloadTest {
 
         DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafSetPath, leafSetNode);
         DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
-        assertCandidateEquals(candidate, payload.getCandidate());
+        assertCandidateEquals(candidate, payload.getCandidate().getValue());
     }
 
     @Test
@@ -182,6 +183,6 @@ public class DataTreeCandidatePayloadTest {
 
         DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafPath, leafNode);
         DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate);
-        assertCandidateEquals(candidate, payload.getCandidate());
+        assertCandidateEquals(candidate, payload.getCandidate().getValue());
     }
 }
index 5cb7409..4a8363c 100644 (file)
@@ -11,8 +11,10 @@ package org.opendaylight.controller.cluster.datastore;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import com.google.common.base.Optional;
+import java.io.IOException;
 import org.junit.Before;
 import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel;
@@ -29,7 +31,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFai
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.LoggerFactory;
 
-public class ShardRecoveryCoordinatorTest {
+public class ShardRecoveryCoordinatorTest extends AbstractTest {
 
     private ShardDataTree peopleDataTree;
     private SchemaContext peopleSchemaContext;
@@ -43,6 +45,7 @@ public class ShardRecoveryCoordinatorTest {
         peopleDataTree = new ShardDataTree(peopleSchemaContext, TreeType.OPERATIONAL);
     }
 
+    @Deprecated
     @Test
     public void testAppendRecoveredLogEntryDataTreeCandidatePayload(){
         final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
@@ -57,6 +60,20 @@ public class ShardRecoveryCoordinatorTest {
         coordinator.applyCurrentLogRecoveryBatch();
     }
 
+    @Test
+    public void testAppendRecoveredLogEntryCommitTransactionPayload() throws IOException {
+        final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
+                peopleSchemaContext, null, "foobar", LoggerFactory.getLogger("foo"));
+        coordinator.startLogRecoveryBatch(10);
+        try {
+            coordinator.appendRecoveredLogEntry(CommitTransactionPayload.create(nextTransactionId(), createCar()));
+        } catch(final SchemaValidationFailedException e){
+            fail("SchemaValidationFailedException should not happen if pruning is done");
+        }
+
+        coordinator.applyCurrentLogRecoveryBatch();
+    }
+
     @Test
     public void testApplyRecoverySnapshot(){
         final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree,
index 2082dbc..a7e7e8f 100644 (file)
@@ -104,7 +104,6 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
-import org.opendaylight.yangtools.util.StringIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
 import org.opendaylight.yangtools.yang.data.api.schema.MapNode;
@@ -448,8 +447,9 @@ public class ShardTest extends AbstractShardTest {
         writeMod.write(TestModel.TEST_PATH, node);
         writeMod.ready();
 
-        final ApplyState applyState = new ApplyState(null, new StringIdentifier("test"),
-            new ReplicatedLogImplEntry(1, 2, payloadForModification(source, writeMod)));
+        final TransactionIdentifier tx = nextTransactionId();
+        final ApplyState applyState = new ApplyState(null, tx,
+            new ReplicatedLogImplEntry(1, 2, payloadForModification(source, writeMod, tx)));
 
         shard.tell(applyState, shard);
 
@@ -478,7 +478,8 @@ public class ShardTest extends AbstractShardTest {
         InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA);
 
         // Set up the InMemoryJournal.
-        InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod)));
+        InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1,
+            payloadForModification(source, writeMod, nextTransactionId())));
 
         final int nListEntries = 16;
         final Set<Integer> listEntryKeys = new HashSet<>();
@@ -493,8 +494,9 @@ public class ShardTest extends AbstractShardTest {
             final DataTreeModification mod = source.takeSnapshot().newModification();
             mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i));
             mod.ready();
+
             InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1,
-                payloadForModification(source, mod)));
+                payloadForModification(source, mod, nextTransactionId())));
         }
 
         InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2,
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayloadTest.java
new file mode 100644 (file)
index 0000000..c0f63a5
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import static org.junit.Assert.assertEquals;
+import java.io.IOException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractTest;
+
+public class AbortTransactionPayloadTest extends AbstractTest {
+    @Test
+    public void testPayloadSerDes() throws IOException {
+        final AbortTransactionPayload template = AbortTransactionPayload.create(nextTransactionId());
+        final AbortTransactionPayload cloned = SerializationUtils.clone(template);
+        assertEquals(template.getIdentifier(), cloned.getIdentifier());
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayloadTest.java
new file mode 100644 (file)
index 0000000..3f47e28
--- /dev/null
@@ -0,0 +1,188 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.commons.lang3.SerializationUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractTest;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafNode;
+import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates;
+import org.opendaylight.yangtools.yang.data.impl.schema.Builders;
+import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder;
+
+public class CommitTransactionPayloadTest extends AbstractTest {
+    static final QName LEAF_SET = QName.create(TestModel.TEST_QNAME, "leaf-set");
+
+    private DataTreeCandidate candidate;
+
+    private static DataTreeCandidateNode findNode(final Collection<DataTreeCandidateNode> nodes, final PathArgument arg) {
+        for (DataTreeCandidateNode node : nodes) {
+            if (arg.equals(node.getIdentifier())) {
+                return node;
+            }
+        }
+        return null;
+    }
+
+    private static void assertChildrenEquals(final Collection<DataTreeCandidateNode> expected,
+            final Collection<DataTreeCandidateNode> actual) {
+        // Make sure all expected nodes are there
+        for (DataTreeCandidateNode exp : expected) {
+            final DataTreeCandidateNode act = findNode(actual, exp.getIdentifier());
+            assertNotNull("missing expected child", act);
+            assertCandidateNodeEquals(exp, act);
+        }
+        // Make sure no nodes are present which are not in the expected set
+        for (DataTreeCandidateNode act : actual) {
+            final DataTreeCandidateNode exp = findNode(expected, act.getIdentifier());
+            assertNull("unexpected child", exp);
+        }
+    }
+
+    private static void assertCandidateEquals(final DataTreeCandidate expected, final DataTreeCandidate actual) {
+        assertEquals("root path", expected.getRootPath(), actual.getRootPath());
+
+        final DataTreeCandidateNode expRoot = expected.getRootNode();
+        final DataTreeCandidateNode actRoot = expected.getRootNode();
+        assertEquals("root type", expRoot.getModificationType(), actRoot.getModificationType());
+
+        switch (actRoot.getModificationType()) {
+        case DELETE:
+        case WRITE:
+            assertEquals("root data", expRoot.getDataAfter(), actRoot.getDataAfter());
+            break;
+        case SUBTREE_MODIFIED:
+            assertChildrenEquals(expRoot.getChildNodes(), actRoot.getChildNodes());
+            break;
+        default:
+            fail("Unexpect root type " + actRoot.getModificationType());
+            break;
+        }
+
+        assertCandidateNodeEquals(expected.getRootNode(), actual.getRootNode());
+    }
+
+    private static void assertCandidateNodeEquals(final DataTreeCandidateNode expected, final DataTreeCandidateNode actual) {
+        assertEquals("child type", expected.getModificationType(), actual.getModificationType());
+        assertEquals("child identifier", expected.getIdentifier(), actual.getIdentifier());
+
+        switch (actual.getModificationType()) {
+        case DELETE:
+        case WRITE:
+            assertEquals("child data", expected.getDataAfter(), actual.getDataAfter());
+            break;
+        case SUBTREE_MODIFIED:
+            assertChildrenEquals(expected.getChildNodes(), actual.getChildNodes());
+            break;
+        default:
+            fail("Unexpect root type " + actual.getModificationType());
+            break;
+        }
+    }
+
+    @Before
+    public void setUp() {
+        final YangInstanceIdentifier writePath = TestModel.TEST_PATH;
+        final NormalizedNode<?, ?> writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)).
+                withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build();
+        candidate = DataTreeCandidates.fromNormalizedNode(writePath, writeData);
+    }
+
+    @Test
+    public void testCandidateSerialization() throws IOException {
+        final CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate);
+        assertEquals("payload size", 181, payload.size());
+    }
+
+    @Test
+    public void testCandidateSerDes() throws IOException {
+        final CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate);
+        assertCandidateEquals(candidate, payload.getCandidate().getValue());
+    }
+
+    @Test
+    public void testPayloadSerDes() throws IOException {
+        final CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate);
+        assertCandidateEquals(candidate, SerializationUtils.clone(payload).getCandidate().getValue());
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Test
+    public void testLeafSetEntryNodeCandidate() throws Exception {
+        YangInstanceIdentifier.NodeWithValue entryPathArg = new YangInstanceIdentifier.NodeWithValue(LEAF_SET, "one");
+        YangInstanceIdentifier leafSetEntryPath = YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(LEAF_SET)
+                .node(entryPathArg).build();
+
+        NormalizedNode<?, ?> leafSetEntryNode = Builders.leafSetEntryBuilder().
+                withNodeIdentifier(entryPathArg).withValue("one").build();
+
+        DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafSetEntryPath, leafSetEntryNode);
+        CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate);
+        assertCandidateEquals(candidate, payload.getCandidate().getValue());
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Test
+    public void testLeafSetNodeCandidate() throws Exception {
+        YangInstanceIdentifier.NodeWithValue entryPathArg = new YangInstanceIdentifier.NodeWithValue(LEAF_SET, "one");
+        YangInstanceIdentifier leafSetPath = YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(LEAF_SET).build();
+
+        LeafSetEntryNode leafSetEntryNode = Builders.leafSetEntryBuilder().
+                withNodeIdentifier(entryPathArg).withValue("one").build();
+        NormalizedNode<?, ?> leafSetNode = Builders.leafSetBuilder().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(LEAF_SET)).withChild(leafSetEntryNode).build();
+
+        DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafSetPath, leafSetNode);
+        CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate);
+        assertCandidateEquals(candidate, payload.getCandidate().getValue());
+    }
+
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    @Test
+    public void testOrderedLeafSetNodeCandidate() throws Exception {
+        YangInstanceIdentifier.NodeWithValue entryPathArg = new YangInstanceIdentifier.NodeWithValue(LEAF_SET, "one");
+        YangInstanceIdentifier leafSetPath = YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(LEAF_SET).build();
+
+        LeafSetEntryNode leafSetEntryNode = Builders.leafSetEntryBuilder().
+                withNodeIdentifier(entryPathArg).withValue("one").build();
+        NormalizedNode<?, ?> leafSetNode = Builders.orderedLeafSetBuilder().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(LEAF_SET)).withChild(leafSetEntryNode).build();
+
+        DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafSetPath, leafSetNode);
+        CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate);
+        assertCandidateEquals(candidate, payload.getCandidate().getValue());
+    }
+
+    @Test
+    public void testLeafNodeCandidate() throws Exception {
+        YangInstanceIdentifier leafPath = YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.DESC_QNAME).build();
+        LeafNode<Object> leafNode = Builders.leafBuilder().withNodeIdentifier(
+                new YangInstanceIdentifier.NodeIdentifier(TestModel.DESC_QNAME)).withValue("test").build();
+
+        DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafPath, leafNode);
+        CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate);
+        assertCandidateEquals(candidate, payload.getCandidate().getValue());
+    }
+}