From ba87ed620f13823ee798fda4241a2c1db37e2f33 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Mon, 23 May 2016 20:10:01 +0200 Subject: [PATCH] BUG-5280: add CommitTransactionPayload This adds the base payload, which is to be used for propagating transaction effects instead of DataTreeCandidatePayload. Also adds the abort case counterpart. Change-Id: I621e0be4f26509fae9f04e230c6a7e145938d7e6 Signed-off-by: Robert Varga --- .../datastore/DataTreeCandidatePayload.java | 24 ++- .../controller/cluster/datastore/Shard.java | 27 ++- .../datastore/ShardRecoveryCoordinator.java | 18 +- .../AbstractThreePhaseCommitMessage.java | 3 +- .../persisted/AbortTransactionPayload.java | 61 ++++++ .../AbstractIdentifiablePayload.java | 89 +++++++++ .../persisted/CommitTransactionPayload.java | 96 +++++++++ .../persisted/DataTreeCandidateSupplier.java | 25 +++ .../cluster/datastore/AbstractShardTest.java | 7 +- .../DataTreeCandidatePayloadTest.java | 13 +- .../ShardRecoveryCoordinatorTest.java | 19 +- .../cluster/datastore/ShardTest.java | 12 +- .../AbortTransactionPayloadTest.java | 23 +++ .../CommitTransactionPayloadTest.java | 188 ++++++++++++++++++ 14 files changed, 574 insertions(+), 31 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayload.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DataTreeCandidateSupplier.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayloadTest.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayloadTest.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java index 8da70c2e29..25a7ee8622 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java @@ -14,14 +14,20 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.Map.Entry; +import java.util.Optional; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput; +import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -final class DataTreeCandidatePayload extends Payload implements Externalizable { - private static final Logger LOG = LoggerFactory.getLogger(DataTreeCandidatePayload.class); +/** + * @deprecated Deprecated in Boron in favor of CommitTransactionPayload + */ +@Deprecated +final class DataTreeCandidatePayload extends Payload implements DataTreeCandidateSupplier, Externalizable { private static final long serialVersionUID = 1L; private transient byte[] serialized; @@ -34,6 +40,10 @@ final class DataTreeCandidatePayload extends Payload implements Externalizable { this.serialized = Preconditions.checkNotNull(serialized); } + /** + * @deprecated Use CommitTransactionPayload instead + */ + @Deprecated static DataTreeCandidatePayload create(final DataTreeCandidate candidate) { final ByteArrayDataOutput out = ByteStreams.newDataOutput(); try { @@ -46,8 +56,10 @@ final class DataTreeCandidatePayload extends Payload implements Externalizable { } - DataTreeCandidate getCandidate() throws IOException { - return DataTreeCandidateInputOutput.readDataTreeCandidate(ByteStreams.newDataInput(serialized)); + @Override + public Entry, DataTreeCandidate> getCandidate() throws IOException { + return new SimpleImmutableEntry<>(Optional.empty(), + DataTreeCandidateInputOutput.readDataTreeCandidate(ByteStreams.newDataInput(serialized))); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 02a14022c4..77584eb63b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -16,6 +16,7 @@ import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import java.io.IOException; import java.util.Collection; import java.util.Collections; @@ -51,6 +52,8 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; +import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload; +import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier; import org.opendaylight.controller.cluster.datastore.utils.Dispatchers; import org.opendaylight.controller.cluster.notifications.LeaderStateChanged; import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener; @@ -62,6 +65,7 @@ import org.opendaylight.controller.cluster.raft.RaftState; import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus; import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.messages.ServerRemoved; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.concepts.Identifier; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; @@ -321,16 +325,27 @@ public class Shard extends RaftActor { void continueCommit(final CohortEntry cohortEntry) { final DataTreeCandidate candidate = cohortEntry.getCandidate(); + final TransactionIdentifier transactionId = cohortEntry.getTransactionID(); // If we do not have any followers and we are not using persistence // or if cohortEntry has no modifications // we can apply modification to the state immediately if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) { - applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate); - } else { - persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), - DataTreeCandidatePayload.create(candidate)); + applyModificationToState(cohortEntry.getReplySender(), transactionId, candidate); + return; } + + final Payload payload; + try { + payload = CommitTransactionPayload.create(transactionId, candidate); + } catch (IOException e) { + LOG.error("{}: failed to encode transaction {} candidate {}", persistenceId(), transactionId, candidate, + e); + // TODO: do we need to do something smarter here? + throw Throwables.propagate(e); + } + + persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), payload); } private void handleCommitTransaction(final CommitTransaction commit) { @@ -666,11 +681,11 @@ public class Shard extends RaftActor { @Override protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) { - if (data instanceof DataTreeCandidatePayload) { + if (data instanceof DataTreeCandidateSupplier) { if (clientActor == null) { // No clientActor indicates a replica coming from the leader try { - store.applyForeignCandidate(identifier, ((DataTreeCandidatePayload)data).getCandidate()); + store.applyForeignCandidate(identifier, ((DataTreeCandidateSupplier)data).getCandidate().getValue()); } catch (DataValidationFailedException | IOException e) { LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java index f3cbc8649d..624e68dd86 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java @@ -10,6 +10,10 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.Preconditions; import java.io.File; import java.io.IOException; +import java.util.Map.Entry; +import java.util.Optional; +import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput; import org.opendaylight.controller.cluster.datastore.utils.NormalizedNodeXMLOutput; import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification; @@ -18,6 +22,7 @@ import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -62,14 +67,21 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { Preconditions.checkState(transaction != null, "call startLogRecovery before calling appendRecoveredLogEntry"); try { - if (payload instanceof DataTreeCandidatePayload) { - DataTreeCandidates.applyToModification(transaction, ((DataTreeCandidatePayload)payload).getCandidate()); + if (payload instanceof DataTreeCandidateSupplier) { + final Entry, DataTreeCandidate> e = + ((DataTreeCandidateSupplier)payload).getCandidate(); + + DataTreeCandidates.applyToModification(transaction, e.getValue()); size++; + + if (e.getKey().isPresent()) { + // FIXME: BUG-5280: propagate transaction state + } } else { log.error("{}: Unknown payload {} received during recovery", shardName, payload); } } catch (IOException e) { - log.error("{}: Error extracting ModificationPayload", shardName, e); + log.error("{}: Error extracting payload", shardName, e); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbstractThreePhaseCommitMessage.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbstractThreePhaseCommitMessage.java index 9068228ee5..e69bc957bd 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbstractThreePhaseCommitMessage.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/AbstractThreePhaseCommitMessage.java @@ -49,7 +49,6 @@ public abstract class AbstractThreePhaseCommitMessage extends VersionedExternali @Override public String toString() { - return getClass().getSimpleName() + " [transactionID=" + transactionID + ", version=" + getVersion() - + "]"; + return getClass().getSimpleName() + " [transactionID=" + transactionID + ", version=" + getVersion() + "]"; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayload.java new file mode 100644 index 0000000000..5c3723e2aa --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayload.java @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.persisted; + +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import java.io.DataInput; +import java.io.IOException; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +/** + * Payload persisted when a transaction is aborted. It contains the transaction identifier. + * + * @author Robert Varga + */ +public final class AbortTransactionPayload extends AbstractIdentifiablePayload { + private static final class Proxy extends AbstractProxy { + private static final long serialVersionUID = 1L; + + public Proxy() { + // For Externalizable + } + + Proxy(final byte[] serialized) { + super(serialized); + } + + @Override + protected TransactionIdentifier readIdentifier(final DataInput in) throws IOException { + return TransactionIdentifier.readFrom(in); + } + + @Override + protected AbortTransactionPayload createObject(final TransactionIdentifier identifier, + final byte[] serialized) { + return new AbortTransactionPayload(identifier, serialized); + } + } + + private static final long serialVersionUID = 1L; + + AbortTransactionPayload(final TransactionIdentifier transactionId, final byte[] serialized) { + super(transactionId, serialized); + } + + public static AbortTransactionPayload create(final TransactionIdentifier transactionId) throws IOException { + final ByteArrayDataOutput out = ByteStreams.newDataOutput(); + transactionId.writeTo(out); + return new AbortTransactionPayload(transactionId, out.toByteArray()); + } + + @Override + protected Proxy externalizableProxy(final byte[] serialized) { + return new Proxy(serialized); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java new file mode 100644 index 0000000000..122c96938e --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/AbstractIdentifiablePayload.java @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.persisted; + +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import com.google.common.io.ByteStreams; +import java.io.DataInput; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; +import javax.annotation.Nonnull; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.yangtools.concepts.Identifiable; +import org.opendaylight.yangtools.concepts.Identifier; + +/** + * Abstract base class for {@link Payload}s which hold a single {@link Identifier}. + * + * @author Robert Varga + */ +public abstract class AbstractIdentifiablePayload extends Payload implements Identifiable, Serializable { + protected abstract static class AbstractProxy implements Externalizable { + private static final long serialVersionUID = 1L; + private byte[] serialized; + private T identifier; + + public AbstractProxy() { + // For Externalizable + } + + protected AbstractProxy(final byte[] serialized) { + this.serialized = Preconditions.checkNotNull(serialized); + } + + @Override + public final void writeExternal(final ObjectOutput out) throws IOException { + out.writeInt(serialized.length); + out.write(serialized); + } + + @Override + public final void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + final int length = in.readInt(); + serialized = new byte[length]; + in.readFully(serialized); + identifier = Verify.verifyNotNull(readIdentifier(ByteStreams.newDataInput(serialized))); + } + + protected final Object readResolve() { + return Verify.verifyNotNull(createObject(identifier, serialized)); + } + + protected abstract @Nonnull T readIdentifier(@Nonnull DataInput in) throws IOException; + protected abstract @Nonnull Identifiable createObject(@Nonnull T identifier, @Nonnull byte[] serialized); + } + + private static final long serialVersionUID = 1L; + private final byte[] serialized; + private final T identifier; + + AbstractIdentifiablePayload(final @Nonnull T identifier, final @Nonnull byte[] serialized) { + this.identifier = Preconditions.checkNotNull(identifier); + this.serialized = Preconditions.checkNotNull(serialized); + } + + @Override + public final T getIdentifier() { + return identifier; + } + + @Override + public final int size() { + return serialized.length; + } + + protected final Object writeReplace() { + return Verify.verifyNotNull(externalizableProxy(serialized)); + } + + protected abstract @Nonnull AbstractProxy externalizableProxy(@Nonnull byte[] serialized); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java new file mode 100644 index 0000000000..dd27b4e629 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayload.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.persisted; + +import com.google.common.annotations.Beta; +import com.google.common.base.Preconditions; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import java.io.DataInput; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.io.Serializable; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.Map.Entry; +import java.util.Optional; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; + +/** + * Payload persisted when a transaction commits. It contains the transaction identifier and the + * {@link DataTreeCandidate} + * + * @author Robert Varga + */ +@Beta +public final class CommitTransactionPayload extends Payload implements DataTreeCandidateSupplier, Serializable { + private static final class Proxy implements Externalizable { + private static final long serialVersionUID = 1L; + private byte[] serialized; + + public Proxy() { + // For Externalizable + } + + Proxy(final byte[] serialized) { + this.serialized = Preconditions.checkNotNull(serialized); + } + + @Override + public void writeExternal(final ObjectOutput out) throws IOException { + out.writeInt(serialized.length); + out.write(serialized); + } + + @Override + public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + final int length = in.readInt(); + serialized = new byte[length]; + in.readFully(serialized); + } + + private Object readResolve() { + return new CommitTransactionPayload(serialized); + } + } + + private static final long serialVersionUID = 1L; + + private final byte[] serialized; + + CommitTransactionPayload(final byte[] serialized) { + this.serialized = Preconditions.checkNotNull(serialized); + } + + public static CommitTransactionPayload create(final TransactionIdentifier transactionId, + final DataTreeCandidate candidate) throws IOException { + final ByteArrayDataOutput out = ByteStreams.newDataOutput(); + transactionId.writeTo(out); + DataTreeCandidateInputOutput.writeDataTreeCandidate(out, candidate); + return new CommitTransactionPayload(out.toByteArray()); + } + + @Override + public Entry, DataTreeCandidate> getCandidate() throws IOException { + final DataInput in = ByteStreams.newDataInput(serialized); + return new SimpleImmutableEntry<>(Optional.of(TransactionIdentifier.readFrom(in)), + DataTreeCandidateInputOutput.readDataTreeCandidate(in)); + } + + @Override + public int size() { + return serialized.length; + } + + private Object writeReplace() { + return new Proxy(serialized); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DataTreeCandidateSupplier.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DataTreeCandidateSupplier.java new file mode 100644 index 0000000000..4cd1c4a975 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DataTreeCandidateSupplier.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.persisted; + +import com.google.common.annotations.Beta; +import java.io.IOException; +import java.util.Map.Entry; +import java.util.Optional; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; + +/** + * Interim interface for consolidating DataTreeCandidatePayload and {@link CommitTransactionPayload}. + * + * @author Robert Varga + */ +@Beta +public interface DataTreeCandidateSupplier { + Entry, DataTreeCandidate> getCandidate() throws IOException; +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index 4209201548..b6f464a0e8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -30,6 +30,7 @@ import com.google.common.base.Optional; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; +import java.io.IOException; import java.util.Collections; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -50,6 +51,7 @@ import org.opendaylight.controller.cluster.datastore.messages.BatchedModificatio import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; +import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry; import org.opendaylight.controller.cluster.raft.Snapshot; @@ -403,11 +405,12 @@ public abstract class AbstractShardTest extends AbstractActorTest{ return testStore; } - static DataTreeCandidatePayload payloadForModification(final DataTree source, final DataTreeModification mod) throws DataValidationFailedException { + static CommitTransactionPayload payloadForModification(final DataTree source, final DataTreeModification mod, + final TransactionIdentifier transactionId) throws DataValidationFailedException, IOException { source.validate(mod); final DataTreeCandidate candidate = source.prepare(mod); source.commit(candidate); - return DataTreeCandidatePayload.create(candidate); + return CommitTransactionPayload.create(transactionId, candidate); } static BatchedModifications newBatchedModifications(final TransactionIdentifier transactionID, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java index 07ccce25ca..cce9bddde0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java @@ -30,6 +30,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.Builders; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; +@Deprecated public class DataTreeCandidatePayloadTest { static final QName LEAF_SET = QName.create(TestModel.TEST_QNAME, "leaf-set"); @@ -118,13 +119,13 @@ public class DataTreeCandidatePayloadTest { @Test public void testCandidateSerDes() throws IOException { final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate); - assertCandidateEquals(candidate, payload.getCandidate()); + assertCandidateEquals(candidate, payload.getCandidate().getValue()); } @Test public void testPayloadSerDes() throws IOException { final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate); - assertCandidateEquals(candidate, SerializationUtils.clone(payload).getCandidate()); + assertCandidateEquals(candidate, SerializationUtils.clone(payload).getCandidate().getValue()); } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -139,7 +140,7 @@ public class DataTreeCandidatePayloadTest { DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafSetEntryPath, leafSetEntryNode); DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate); - assertCandidateEquals(candidate, payload.getCandidate()); + assertCandidateEquals(candidate, payload.getCandidate().getValue()); } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -155,7 +156,7 @@ public class DataTreeCandidatePayloadTest { DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafSetPath, leafSetNode); DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate); - assertCandidateEquals(candidate, payload.getCandidate()); + assertCandidateEquals(candidate, payload.getCandidate().getValue()); } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -171,7 +172,7 @@ public class DataTreeCandidatePayloadTest { DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafSetPath, leafSetNode); DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate); - assertCandidateEquals(candidate, payload.getCandidate()); + assertCandidateEquals(candidate, payload.getCandidate().getValue()); } @Test @@ -182,6 +183,6 @@ public class DataTreeCandidatePayloadTest { DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafPath, leafNode); DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate); - assertCandidateEquals(candidate, payload.getCandidate()); + assertCandidateEquals(candidate, payload.getCandidate().getValue()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinatorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinatorTest.java index 5cb74090db..4a8363c866 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinatorTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinatorTest.java @@ -11,8 +11,10 @@ package org.opendaylight.controller.cluster.datastore; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import com.google.common.base.Optional; +import java.io.IOException; import org.junit.Before; import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.md.cluster.datastore.model.CarsModel; import org.opendaylight.controller.md.cluster.datastore.model.PeopleModel; @@ -29,7 +31,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.tree.SchemaValidationFai import org.opendaylight.yangtools.yang.model.api.SchemaContext; import org.slf4j.LoggerFactory; -public class ShardRecoveryCoordinatorTest { +public class ShardRecoveryCoordinatorTest extends AbstractTest { private ShardDataTree peopleDataTree; private SchemaContext peopleSchemaContext; @@ -43,6 +45,7 @@ public class ShardRecoveryCoordinatorTest { peopleDataTree = new ShardDataTree(peopleSchemaContext, TreeType.OPERATIONAL); } + @Deprecated @Test public void testAppendRecoveredLogEntryDataTreeCandidatePayload(){ final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree, @@ -57,6 +60,20 @@ public class ShardRecoveryCoordinatorTest { coordinator.applyCurrentLogRecoveryBatch(); } + @Test + public void testAppendRecoveredLogEntryCommitTransactionPayload() throws IOException { + final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree, + peopleSchemaContext, null, "foobar", LoggerFactory.getLogger("foo")); + coordinator.startLogRecoveryBatch(10); + try { + coordinator.appendRecoveredLogEntry(CommitTransactionPayload.create(nextTransactionId(), createCar())); + } catch(final SchemaValidationFailedException e){ + fail("SchemaValidationFailedException should not happen if pruning is done"); + } + + coordinator.applyCurrentLogRecoveryBatch(); + } + @Test public void testApplyRecoverySnapshot(){ final ShardRecoveryCoordinator coordinator = new ShardRecoveryCoordinator(peopleDataTree, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 2082dbcdd9..a7e7e8fb98 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -104,7 +104,6 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.yangtools.util.StringIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; import org.opendaylight.yangtools.yang.data.api.schema.MapNode; @@ -448,8 +447,9 @@ public class ShardTest extends AbstractShardTest { writeMod.write(TestModel.TEST_PATH, node); writeMod.ready(); - final ApplyState applyState = new ApplyState(null, new StringIdentifier("test"), - new ReplicatedLogImplEntry(1, 2, payloadForModification(source, writeMod))); + final TransactionIdentifier tx = nextTransactionId(); + final ApplyState applyState = new ApplyState(null, tx, + new ReplicatedLogImplEntry(1, 2, payloadForModification(source, writeMod, tx))); shard.tell(applyState, shard); @@ -478,7 +478,8 @@ public class ShardTest extends AbstractShardTest { InMemoryJournal.addEntry(shardID.toString(), 0, DUMMY_DATA); // Set up the InMemoryJournal. - InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod))); + InMemoryJournal.addEntry(shardID.toString(), 1, new ReplicatedLogImplEntry(0, 1, + payloadForModification(source, writeMod, nextTransactionId()))); final int nListEntries = 16; final Set listEntryKeys = new HashSet<>(); @@ -493,8 +494,9 @@ public class ShardTest extends AbstractShardTest { final DataTreeModification mod = source.takeSnapshot().newModification(); mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); mod.ready(); + InMemoryJournal.addEntry(shardID.toString(), i+1, new ReplicatedLogImplEntry(i, 1, - payloadForModification(source, mod))); + payloadForModification(source, mod, nextTransactionId()))); } InMemoryJournal.addEntry(shardID.toString(), nListEntries + 2, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayloadTest.java new file mode 100644 index 0000000000..c0f63a5ce8 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/AbortTransactionPayloadTest.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.persisted; + +import static org.junit.Assert.assertEquals; +import java.io.IOException; +import org.apache.commons.lang3.SerializationUtils; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.AbstractTest; + +public class AbortTransactionPayloadTest extends AbstractTest { + @Test + public void testPayloadSerDes() throws IOException { + final AbortTransactionPayload template = AbortTransactionPayload.create(nextTransactionId()); + final AbortTransactionPayload cloned = SerializationUtils.clone(template); + assertEquals(template.getIdentifier(), cloned.getIdentifier()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayloadTest.java new file mode 100644 index 0000000000..3f47e28149 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/persisted/CommitTransactionPayloadTest.java @@ -0,0 +1,188 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.persisted; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import java.io.IOException; +import java.util.Collection; +import org.apache.commons.lang3.SerializationUtils; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.AbstractTest; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.LeafNode; +import org.opendaylight.yangtools.yang.data.api.schema.LeafSetEntryNode; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.opendaylight.yangtools.yang.data.impl.schema.Builders; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; + +public class CommitTransactionPayloadTest extends AbstractTest { + static final QName LEAF_SET = QName.create(TestModel.TEST_QNAME, "leaf-set"); + + private DataTreeCandidate candidate; + + private static DataTreeCandidateNode findNode(final Collection nodes, final PathArgument arg) { + for (DataTreeCandidateNode node : nodes) { + if (arg.equals(node.getIdentifier())) { + return node; + } + } + return null; + } + + private static void assertChildrenEquals(final Collection expected, + final Collection actual) { + // Make sure all expected nodes are there + for (DataTreeCandidateNode exp : expected) { + final DataTreeCandidateNode act = findNode(actual, exp.getIdentifier()); + assertNotNull("missing expected child", act); + assertCandidateNodeEquals(exp, act); + } + // Make sure no nodes are present which are not in the expected set + for (DataTreeCandidateNode act : actual) { + final DataTreeCandidateNode exp = findNode(expected, act.getIdentifier()); + assertNull("unexpected child", exp); + } + } + + private static void assertCandidateEquals(final DataTreeCandidate expected, final DataTreeCandidate actual) { + assertEquals("root path", expected.getRootPath(), actual.getRootPath()); + + final DataTreeCandidateNode expRoot = expected.getRootNode(); + final DataTreeCandidateNode actRoot = expected.getRootNode(); + assertEquals("root type", expRoot.getModificationType(), actRoot.getModificationType()); + + switch (actRoot.getModificationType()) { + case DELETE: + case WRITE: + assertEquals("root data", expRoot.getDataAfter(), actRoot.getDataAfter()); + break; + case SUBTREE_MODIFIED: + assertChildrenEquals(expRoot.getChildNodes(), actRoot.getChildNodes()); + break; + default: + fail("Unexpect root type " + actRoot.getModificationType()); + break; + } + + assertCandidateNodeEquals(expected.getRootNode(), actual.getRootNode()); + } + + private static void assertCandidateNodeEquals(final DataTreeCandidateNode expected, final DataTreeCandidateNode actual) { + assertEquals("child type", expected.getModificationType(), actual.getModificationType()); + assertEquals("child identifier", expected.getIdentifier(), actual.getIdentifier()); + + switch (actual.getModificationType()) { + case DELETE: + case WRITE: + assertEquals("child data", expected.getDataAfter(), actual.getDataAfter()); + break; + case SUBTREE_MODIFIED: + assertChildrenEquals(expected.getChildNodes(), actual.getChildNodes()); + break; + default: + fail("Unexpect root type " + actual.getModificationType()); + break; + } + } + + @Before + public void setUp() { + final YangInstanceIdentifier writePath = TestModel.TEST_PATH; + final NormalizedNode writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); + candidate = DataTreeCandidates.fromNormalizedNode(writePath, writeData); + } + + @Test + public void testCandidateSerialization() throws IOException { + final CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate); + assertEquals("payload size", 181, payload.size()); + } + + @Test + public void testCandidateSerDes() throws IOException { + final CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate); + assertCandidateEquals(candidate, payload.getCandidate().getValue()); + } + + @Test + public void testPayloadSerDes() throws IOException { + final CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate); + assertCandidateEquals(candidate, SerializationUtils.clone(payload).getCandidate().getValue()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testLeafSetEntryNodeCandidate() throws Exception { + YangInstanceIdentifier.NodeWithValue entryPathArg = new YangInstanceIdentifier.NodeWithValue(LEAF_SET, "one"); + YangInstanceIdentifier leafSetEntryPath = YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(LEAF_SET) + .node(entryPathArg).build(); + + NormalizedNode leafSetEntryNode = Builders.leafSetEntryBuilder(). + withNodeIdentifier(entryPathArg).withValue("one").build(); + + DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafSetEntryPath, leafSetEntryNode); + CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate); + assertCandidateEquals(candidate, payload.getCandidate().getValue()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testLeafSetNodeCandidate() throws Exception { + YangInstanceIdentifier.NodeWithValue entryPathArg = new YangInstanceIdentifier.NodeWithValue(LEAF_SET, "one"); + YangInstanceIdentifier leafSetPath = YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(LEAF_SET).build(); + + LeafSetEntryNode leafSetEntryNode = Builders.leafSetEntryBuilder(). + withNodeIdentifier(entryPathArg).withValue("one").build(); + NormalizedNode leafSetNode = Builders.leafSetBuilder().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(LEAF_SET)).withChild(leafSetEntryNode).build(); + + DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafSetPath, leafSetNode); + CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate); + assertCandidateEquals(candidate, payload.getCandidate().getValue()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testOrderedLeafSetNodeCandidate() throws Exception { + YangInstanceIdentifier.NodeWithValue entryPathArg = new YangInstanceIdentifier.NodeWithValue(LEAF_SET, "one"); + YangInstanceIdentifier leafSetPath = YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(LEAF_SET).build(); + + LeafSetEntryNode leafSetEntryNode = Builders.leafSetEntryBuilder(). + withNodeIdentifier(entryPathArg).withValue("one").build(); + NormalizedNode leafSetNode = Builders.orderedLeafSetBuilder().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(LEAF_SET)).withChild(leafSetEntryNode).build(); + + DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafSetPath, leafSetNode); + CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate); + assertCandidateEquals(candidate, payload.getCandidate().getValue()); + } + + @Test + public void testLeafNodeCandidate() throws Exception { + YangInstanceIdentifier leafPath = YangInstanceIdentifier.builder(TestModel.TEST_PATH).node(TestModel.DESC_QNAME).build(); + LeafNode leafNode = Builders.leafBuilder().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.DESC_QNAME)).withValue("test").build(); + + DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(leafPath, leafNode); + CommitTransactionPayload payload = CommitTransactionPayload.create(nextTransactionId(), candidate); + assertCandidateEquals(candidate, payload.getCandidate().getValue()); + } +} -- 2.36.6