From: Robert Varga Date: Sat, 18 Apr 2015 22:35:57 +0000 (+0200) Subject: CDS: switch persistence to persisting the candidates X-Git-Tag: release/lithium~235^2 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=dd174b7754c8ab975b6dd37d1891eafa3abba115 CDS: switch persistence to persisting the candidates With us being in control of the DataTree, we can now use DataTreeCandidate for replication and persistence. Add the appropriate payload and add appropriate hooks. Exposing the DataTreeCandidate from the cohort is actually pushing towards having our own cohort type -- ShardDataTreeCohort. With that we can get rid of DOMStoreThreePhaseCommitCohort, so eliminate its mention to prevent accidental leakage. Change-Id: I37a22b445f955330193c762894764feee94bebdb Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataTreeCandidateNode.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataTreeCandidateNode.java new file mode 100644 index 0000000000..c3940e5256 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataTreeCandidateNode.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import java.util.Collection; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; + +/** + * Abstract base class for our internal implementation of {@link DataTreeCandidateNode}, + * which we instantiate from a serialized stream. We do not retain the before-image and + * do not implement {@link #getModifiedChild(PathArgument)}, as that method is only + * useful for end users. Instances based on this class should never be leaked outside of + * this component. + */ +abstract class AbstractDataTreeCandidateNode implements DataTreeCandidateNode { + private final ModificationType type; + + protected AbstractDataTreeCandidateNode(final ModificationType type) { + this.type = Preconditions.checkNotNull(type); + } + + @Override + public final DataTreeCandidateNode getModifiedChild(final PathArgument identifier) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public final ModificationType getModificationType() { + return type; + } + + @Override + public final Optional> getDataBefore() { + throw new UnsupportedOperationException("Before-image not available after serialization"); + } + + static DataTreeCandidateNode createUnmodified() { + return new AbstractDataTreeCandidateNode(ModificationType.UNMODIFIED) { + @Override + public PathArgument getIdentifier() { + throw new UnsupportedOperationException("Root node does not have an identifier"); + } + + @Override + public Optional> getDataAfter() { + throw new UnsupportedOperationException("After-image not available after serialization"); + } + + @Override + public Collection getChildNodes() { + throw new UnsupportedOperationException("Children not available after serialization"); + } + }; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java new file mode 100644 index 0000000000..4b471cfa4a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class ChainedCommitCohort extends ShardDataTreeCohort { + private static final Logger LOG = LoggerFactory.getLogger(ChainedCommitCohort.class); + private final ReadWriteShardDataTreeTransaction transaction; + private final ShardDataTreeTransactionChain chain; + private final ShardDataTreeCohort delegate; + + ChainedCommitCohort(final ShardDataTreeTransactionChain chain, final ReadWriteShardDataTreeTransaction transaction, final ShardDataTreeCohort delegate) { + this.transaction = Preconditions.checkNotNull(transaction); + this.delegate = Preconditions.checkNotNull(delegate); + this.chain = Preconditions.checkNotNull(chain); + } + + @Override + public ListenableFuture commit() { + final ListenableFuture ret = delegate.commit(); + + Futures.addCallback(ret, new FutureCallback() { + @Override + public void onSuccess(Void result) { + chain.clearTransaction(transaction); + LOG.debug("Committed transaction {}", transaction); + } + + @Override + public void onFailure(Throwable t) { + LOG.error("Transaction {} commit failed, cannot recover", transaction, t); + } + }); + + return ret; + } + + @Override + public ListenableFuture canCommit() { + return delegate.canCommit(); + } + + @Override + public ListenableFuture preCommit() { + return delegate.preCommit(); + } + + @Override + public ListenableFuture abort() { + return delegate.abort(); + } + + @Override + DataTreeCandidateTip getCandidate() { + return delegate.getCandidate(); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java new file mode 100644 index 0000000000..54167b2011 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java @@ -0,0 +1,222 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Preconditions; +import com.google.common.io.ByteArrayDataInput; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import com.google.protobuf.GeneratedMessage.GeneratedExtension; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages.AppendEntries; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class DataTreeCandidatePayload extends Payload implements Externalizable { + private static final Logger LOG = LoggerFactory.getLogger(DataTreeCandidatePayload.class); + private static final long serialVersionUID = 1L; + private static final byte DELETE = 0; + private static final byte SUBTREE_MODIFIED = 1; + private static final byte UNMODIFIED = 2; + private static final byte WRITE = 3; + + private transient byte[] serialized; + + public DataTreeCandidatePayload() { + // Required by Externalizable + } + + private DataTreeCandidatePayload(final byte[] serialized) { + this.serialized = Preconditions.checkNotNull(serialized); + } + + private static void writeChildren(final NormalizedNodeOutputStreamWriter writer, final DataOutput out, + final Collection children) throws IOException { + out.writeInt(children.size()); + for (DataTreeCandidateNode child : children) { + writeNode(writer, out, child); + } + } + + private static void writeNode(final NormalizedNodeOutputStreamWriter writer, final DataOutput out, + final DataTreeCandidateNode node) throws IOException { + switch (node.getModificationType()) { + case DELETE: + out.writeByte(DELETE); + writer.writePathArgument(node.getIdentifier()); + break; + case SUBTREE_MODIFIED: + out.writeByte(SUBTREE_MODIFIED); + writer.writePathArgument(node.getIdentifier()); + writeChildren(writer, out, node.getChildNodes()); + break; + case WRITE: + out.writeByte(WRITE); + writer.writeNormalizedNode(node.getDataAfter().get()); + break; + case UNMODIFIED: + throw new IllegalArgumentException("Unmodified candidate should never be in the payload"); + default: + throw new IllegalArgumentException("Unhandled node type " + node.getModificationType()); + } + } + + static DataTreeCandidatePayload create(DataTreeCandidate candidate) { + final ByteArrayDataOutput out = ByteStreams.newDataOutput(); + try (final NormalizedNodeOutputStreamWriter writer = new NormalizedNodeOutputStreamWriter(out)) { + writer.writeYangInstanceIdentifier(candidate.getRootPath()); + + final DataTreeCandidateNode node = candidate.getRootNode(); + switch (node.getModificationType()) { + case DELETE: + out.writeByte(DELETE); + break; + case SUBTREE_MODIFIED: + out.writeByte(SUBTREE_MODIFIED); + writeChildren(writer, out, node.getChildNodes()); + break; + case UNMODIFIED: + out.writeByte(UNMODIFIED); + break; + case WRITE: + out.writeByte(WRITE); + writer.writeNormalizedNode(node.getDataAfter().get()); + break; + default: + throw new IllegalArgumentException("Unhandled node type " + node.getModificationType()); + } + + writer.close(); + } catch (IOException e) { + throw new IllegalArgumentException(String.format("Failed to serialize candidate %s", candidate), e); + } + + return new DataTreeCandidatePayload(out.toByteArray()); + } + + private static Collection readChildren(final NormalizedNodeInputStreamReader reader, + final DataInput in) throws IOException { + final int size = in.readInt(); + if (size != 0) { + final Collection ret = new ArrayList<>(size); + for (int i = 0; i < size; ++i) { + final DataTreeCandidateNode child = readNode(reader, in); + if (child != null) { + ret.add(child); + } + } + return ret; + } else { + return Collections.emptyList(); + } + } + + private static DataTreeCandidateNode readNode(final NormalizedNodeInputStreamReader reader, + final DataInput in) throws IOException { + final byte type = in.readByte(); + switch (type) { + case DELETE: + return DeletedDataTreeCandidateNode.create(reader.readPathArgument()); + case SUBTREE_MODIFIED: + final PathArgument identifier = reader.readPathArgument(); + final Collection children = readChildren(reader, in); + if (children.isEmpty()) { + LOG.debug("Modified node {} does not have any children, not instantiating it", identifier); + return null; + } else { + return ModifiedDataTreeCandidateNode.create(identifier, children); + } + case UNMODIFIED: + return null; + case WRITE: + return DataTreeCandidateNodes.fromNormalizedNode(reader.readNormalizedNode()); + default: + throw new IllegalArgumentException("Unhandled node type " + type); + } + } + + private static DataTreeCandidate parseCandidate(final ByteArrayDataInput in) throws IOException { + final NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(in); + final YangInstanceIdentifier rootPath = reader.readYangInstanceIdentifier(); + final byte type = in.readByte(); + + final DataTreeCandidateNode rootNode; + switch (type) { + case DELETE: + rootNode = DeletedDataTreeCandidateNode.create(); + break; + case SUBTREE_MODIFIED: + rootNode = ModifiedDataTreeCandidateNode.create(readChildren(reader, in)); + break; + case WRITE: + rootNode = DataTreeCandidateNodes.fromNormalizedNode(reader.readNormalizedNode()); + break; + default: + throw new IllegalArgumentException("Unhandled node type " + type); + } + + return DataTreeCandidates.newDataTreeCandidate(rootPath, rootNode); + } + + DataTreeCandidate getCandidate() throws IOException { + return parseCandidate(ByteStreams.newDataInput(serialized)); + } + + @Override + @Deprecated + @SuppressWarnings("rawtypes") + public Map encode() { + return null; + } + + @Override + @Deprecated + public Payload decode(final AppendEntries.ReplicatedLogEntry.Payload payload) { + return null; + } + + @Override + public int size() { + return serialized.length; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeByte((byte)serialVersionUID); + out.writeInt(serialized.length); + out.write(serialized); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + final long version = in.readByte(); + Preconditions.checkArgument(version == serialVersionUID, "Unsupported serialization version %s", version); + + final int length = in.readInt(); + serialized = new byte[length]; + in.readFully(serialized); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DeletedDataTreeCandidateNode.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DeletedDataTreeCandidateNode.java new file mode 100644 index 0000000000..2df380b391 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DeletedDataTreeCandidateNode.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Optional; +import java.util.Collection; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; + +/** + * A deserialized {@link DataTreeCandidateNode} which represents a deletion. + */ +abstract class DeletedDataTreeCandidateNode extends AbstractDataTreeCandidateNode { + private DeletedDataTreeCandidateNode() { + super(ModificationType.DELETE); + } + + static DataTreeCandidateNode create() { + return new DeletedDataTreeCandidateNode() { + @Override + public PathArgument getIdentifier() { + throw new UnsupportedOperationException("Root node does not have an identifier"); + } + }; + } + + static DataTreeCandidateNode create(final PathArgument identifier) { + return new DeletedDataTreeCandidateNode() { + @Override + public final PathArgument getIdentifier() { + return identifier; + } + }; + } + + @Override + public final Optional> getDataAfter() { + return Optional.absent(); + } + + @Override + public final Collection getChildNodes() { + // We would require the before-image to reconstruct the list of nodes which + // were deleted. + throw new UnsupportedOperationException("Children not available after serialization"); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ModifiedDataTreeCandidateNode.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ModifiedDataTreeCandidateNode.java new file mode 100644 index 0000000000..208ec33967 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ModifiedDataTreeCandidateNode.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import java.util.Collection; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; + +/** + * A deserialized {@link DataTreeCandidateNode} which represents a modification in + * one of its children. + */ +abstract class ModifiedDataTreeCandidateNode extends AbstractDataTreeCandidateNode { + private final Collection children; + + private ModifiedDataTreeCandidateNode(final Collection children) { + super(ModificationType.SUBTREE_MODIFIED); + this.children = Preconditions.checkNotNull(children); + } + + static DataTreeCandidateNode create(final Collection children) { + return new ModifiedDataTreeCandidateNode(children) { + @Override + public PathArgument getIdentifier() { + throw new UnsupportedOperationException("Root node does not have an identifier"); + } + }; + } + + static DataTreeCandidateNode create(final PathArgument identifier, final Collection children) { + return new ModifiedDataTreeCandidateNode(children) { + @Override + public final PathArgument getIdentifier() { + return identifier; + } + }; + } + + @Override + public final Optional> getDataAfter() { + throw new UnsupportedOperationException("After-image not available after serialization"); + } + + @Override + public final Collection getChildNodes() { + return children; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java index 0f3ab61041..cb17335caf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.Preconditions; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTransaction { @@ -26,7 +25,7 @@ final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTrans parent.abortTransaction(this); } - DOMStoreThreePhaseCommitCohort ready() { + ShardDataTreeCohort ready() { Preconditions.checkState(close(), "Transaction is already closed"); return parent.finishTransaction(this); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index b53d12c0c8..62d3259a71 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -63,6 +63,9 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -291,15 +294,21 @@ public class Shard extends RaftActor { } } + private static boolean isEmptyCommit(final DataTreeCandidate candidate) { + return ModificationType.UNMODIFIED.equals(candidate.getRootNode().getModificationType()); + } + void continueCommit(final CohortEntry cohortEntry) throws Exception { + final DataTreeCandidate candidate = cohortEntry.getCohort().getCandidate(); + // If we do not have any followers and we are not using persistence // or if cohortEntry has no modifications // we can apply modification to the state immediately - if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){ - applyModificationToState(getSender(), cohortEntry.getTransactionID(), cohortEntry.getModification()); + if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) { + applyModificationToState(getSender(), cohortEntry.getTransactionID(), candidate); } else { Shard.this.persistData(getSender(), cohortEntry.getTransactionID(), - new ModificationPayload(cohortEntry.getModification())); + DataTreeCandidatePayload.create(candidate)); } } @@ -309,12 +318,37 @@ public class Shard extends RaftActor { } } + private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final String transactionID, @Nonnull final CohortEntry cohortEntry) { + LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID()); + + try { + // We block on the future here so we don't have to worry about possibly accessing our + // state on a different thread outside of our dispatcher. Also, the data store + // currently uses a same thread executor anyway. + cohortEntry.getCohort().commit().get(); + + sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf()); + + shardMBean.incrementCommittedTransactionCount(); + shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); + + } catch (Exception e) { + sender.tell(new akka.actor.Status.Failure(e), getSelf()); + + LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(), + transactionID, e); + shardMBean.incrementFailedTransactionsCount(); + } finally { + commitCoordinator.currentTransactionComplete(transactionID, true); + } + } + private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) { // With persistence enabled, this method is called via applyState by the leader strategy // after the commit has been replicated to a majority of the followers. CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID); - if(cohortEntry == null) { + if (cohortEntry == null) { // The transaction is no longer the current commit. This can happen if the transaction // was aborted prior, most likely due to timeout in the front-end. We need to finish // committing the transaction though since it was successfully persisted and replicated @@ -323,7 +357,13 @@ public class Shard extends RaftActor { // transaction. cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID); if(cohortEntry != null) { - commitWithNewTransaction(cohortEntry.getModification()); + try { + store.applyForeignCandidate(transactionID, cohortEntry.getCohort().getCandidate()); + } catch (DataValidationFailedException e) { + shardMBean.incrementFailedTransactionsCount(); + LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e); + } + sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf()); } else { // This really shouldn't happen - it likely means that persistence or replication @@ -334,31 +374,8 @@ public class Shard extends RaftActor { LOG.error(ex.getMessage()); sender.tell(new akka.actor.Status.Failure(ex), getSelf()); } - - return; - } - - LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID()); - - try { - // We block on the future here so we don't have to worry about possibly accessing our - // state on a different thread outside of our dispatcher. Also, the data store - // currently uses a same thread executor anyway. - cohortEntry.getCohort().commit().get(); - - sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf()); - - shardMBean.incrementCommittedTransactionCount(); - shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); - - } catch (Exception e) { - sender.tell(new akka.actor.Status.Failure(e), getSelf()); - - LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(), - transactionID, e); - shardMBean.incrementFailedTransactionsCount(); - } finally { - commitCoordinator.currentTransactionComplete(transactionID, true); + } else { + finishCommit(sender, transactionID, cohortEntry); } } @@ -556,15 +573,25 @@ public class Shard extends RaftActor { @Override protected void applyState(final ActorRef clientActor, final String identifier, final Object data) { - - if(data instanceof ModificationPayload) { + if (data instanceof DataTreeCandidatePayload) { + if (clientActor == null) { + // No clientActor indicates a replica coming from the leader + try { + store.applyForeignCandidate(identifier, ((DataTreeCandidatePayload)data).getCandidate()); + } catch (DataValidationFailedException | IOException e) { + LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e); + } + } else { + // Replication consensus reached, proceed to commit + finishCommit(clientActor, identifier); + } + } else if (data instanceof ModificationPayload) { try { applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification()); } catch (ClassNotFoundException | IOException e) { LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e); } - } - else if (data instanceof CompositeModificationPayload) { + } else if (data instanceof CompositeModificationPayload) { Object modification = ((CompositeModificationPayload) data).getModification(); applyModificationToState(clientActor, identifier, modification); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 0eb48fd180..30947fa666 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -30,7 +30,6 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.slf4j.Logger; /** @@ -42,7 +41,7 @@ public class ShardCommitCoordinator { // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts. public interface CohortDecorator { - DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual); + ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual); } private final Cache cohortCache; @@ -413,8 +412,7 @@ public class ShardCommitCoordinator { static class CohortEntry { private final String transactionID; - private DOMStoreThreePhaseCommitCohort cohort; - private final MutableCompositeModification compositeModification; + private ShardDataTreeCohort cohort; private final ReadWriteShardDataTreeTransaction transaction; private ActorRef replySender; private Shard shard; @@ -422,16 +420,14 @@ public class ShardCommitCoordinator { private boolean doImmediateCommit; CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) { - this.compositeModification = new MutableCompositeModification(); this.transaction = Preconditions.checkNotNull(transaction); this.transactionID = transactionID; } - CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort, + CohortEntry(String transactionID, ShardDataTreeCohort cohort, MutableCompositeModification compositeModification) { this.transactionID = transactionID; this.cohort = cohort; - this.compositeModification = compositeModification; this.transaction = null; } @@ -447,17 +443,12 @@ public class ShardCommitCoordinator { return transactionID; } - DOMStoreThreePhaseCommitCohort getCohort() { + ShardDataTreeCohort getCohort() { return cohort; } - MutableCompositeModification getModification() { - return compositeModification; - } - void applyModifications(Iterable modifications) { - for(Modification modification: modifications) { - compositeModification.addModification(modification); + for (Modification modification : modifications) { modification.apply(transaction.getSnapshot()); } } @@ -498,9 +489,5 @@ public class ShardCommitCoordinator { void setShard(Shard shard) { this.shard = shard; } - - boolean hasModifications(){ - return compositeModification.getModifications().size() > 0; - } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 373bf499e0..fbe699223c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -21,14 +21,13 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent; import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask; import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -93,7 +92,7 @@ public final class ShardDataTree extends ShardDataTreeTransactionParent { return ensureTransactionChain(chainId).newReadWriteTransaction(txId); } - void notifyListeners(final DataTreeCandidateTip candidate) { + void notifyListeners(final DataTreeCandidate candidate) { LOG.debug("Notifying listeners on candidate {}", candidate); // DataTreeChanges first, as they are more light-weight @@ -152,15 +151,30 @@ public final class ShardDataTree extends ShardDataTreeTransactionParent { return new SimpleEntry<>(reg, event); } + void applyForeignCandidate(final String identifier, final DataTreeCandidate foreign) throws DataValidationFailedException { + LOG.debug("Applying foreign transaction {}", identifier); + + final DataTreeModification mod = dataTree.takeSnapshot().newModification(); + DataTreeCandidates.applyToModification(mod, foreign); + mod.ready(); + + LOG.trace("Applying foreign modification {}", mod); + dataTree.validate(mod); + final DataTreeCandidate candidate = dataTree.prepare(mod); + dataTree.commit(candidate); + notifyListeners(candidate); + } + @Override void abortTransaction(final AbstractShardDataTreeTransaction transaction) { // Intentional no-op } @Override - DOMStoreThreePhaseCommitCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) { + ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) { final DataTreeModification snapshot = transaction.getSnapshot(); snapshot.ready(); - return new ShardDataTreeCohort(this, snapshot); + return new SimpleShardDataTreeCohort(this, snapshot); } + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java index 11b3ca8ed7..213e36a570 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java @@ -7,72 +7,23 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Futures; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ListenableFuture; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -final class ShardDataTreeCohort implements DOMStoreThreePhaseCommitCohort { - private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeCohort.class); - private static final ListenableFuture TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE); - private static final ListenableFuture VOID_FUTURE = Futures.immediateFuture(null); - private final DataTreeModification transaction; - private final ShardDataTree dataTree; - private DataTreeCandidateTip candidate; - - ShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction) { - this.dataTree = Preconditions.checkNotNull(dataTree); - this.transaction = Preconditions.checkNotNull(transaction); - } - - @Override - public ListenableFuture canCommit() { - try { - dataTree.getDataTree().validate(transaction); - LOG.debug("Transaction {} validated", transaction); - return TRUE_FUTURE; - } catch (Exception e) { - return Futures.immediateFailedFuture(e); - } - } - - @Override - public ListenableFuture preCommit() { - try { - candidate = dataTree.getDataTree().prepare(transaction); - /* - * FIXME: this is the place where we should be interacting with persistence, specifically by invoking - * persist on the candidate (which gives us a Future). - */ - LOG.debug("Transaction {} prepared candidate {}", transaction, candidate); - return VOID_FUTURE; - } catch (Exception e) { - LOG.debug("Transaction {} failed to prepare", transaction, e); - return Futures.immediateFailedFuture(e); - } - } - - @Override - public ListenableFuture abort() { - // No-op, really - return VOID_FUTURE; +public abstract class ShardDataTreeCohort { + ShardDataTreeCohort() { + // Prevent foreign instantiation } - @Override - public ListenableFuture commit() { - try { - dataTree.getDataTree().commit(candidate); - } catch (Exception e) { - LOG.error("Transaction {} failed to commit", transaction, e); - return Futures.immediateFailedFuture(e); - } + abstract DataTreeCandidateTip getCandidate(); - LOG.debug("Transaction {} committed, proceeding to notify", transaction); - dataTree.notifyListeners(candidate); - return VOID_FUTURE; - } + @VisibleForTesting + public abstract ListenableFuture canCommit(); + @VisibleForTesting + public abstract ListenableFuture preCommit(); + @VisibleForTesting + public abstract ListenableFuture abort(); + @VisibleForTesting + public abstract ListenableFuture commit(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java index 780d940128..183c2192e4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java @@ -9,12 +9,7 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import javax.annotation.concurrent.NotThreadSafe; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.ForwardingDOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,17 +72,17 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent } @Override - protected DOMStoreThreePhaseCommitCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) { + protected ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) { Preconditions.checkState(openTransaction != null, "Attempted to finish transaction %s while none is outstanding", transaction); // dataTree is finalizing ready the transaction, we just record it for the next // transaction in chain - final DOMStoreThreePhaseCommitCohort delegate = dataTree.finishTransaction(transaction); + final ShardDataTreeCohort delegate = dataTree.finishTransaction(transaction); openTransaction = null; previousTx = transaction; LOG.debug("Committing transaction {}", transaction); - return new CommitCohort(transaction, delegate); + return new ChainedCommitCohort(this, transaction, delegate); } @Override @@ -95,40 +90,9 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent return MoreObjects.toStringHelper(this).add("id", chainId).toString(); } - private final class CommitCohort extends ForwardingDOMStoreThreePhaseCommitCohort { - private final ReadWriteShardDataTreeTransaction transaction; - private final DOMStoreThreePhaseCommitCohort delegate; - - CommitCohort(final ReadWriteShardDataTreeTransaction transaction, final DOMStoreThreePhaseCommitCohort delegate) { - this.transaction = Preconditions.checkNotNull(transaction); - this.delegate = Preconditions.checkNotNull(delegate); - } - - @Override - protected DOMStoreThreePhaseCommitCohort delegate() { - return delegate; - } - - @Override - public ListenableFuture commit() { - final ListenableFuture ret = super.commit(); - - Futures.addCallback(ret, new FutureCallback() { - @Override - public void onSuccess(Void result) { - if (transaction.equals(previousTx)) { - previousTx = null; - } - LOG.debug("Committed transaction {}", transaction); - } - - @Override - public void onFailure(Throwable t) { - LOG.error("Transaction {} commit failed, cannot recover", transaction, t); - } - }); - - return ret; + void clearTransaction(ReadWriteShardDataTreeTransaction transaction) { + if (transaction.equals(previousTx)) { + previousTx = null; } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java index 6cc1408eae..ee04aff515 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java @@ -7,9 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; - abstract class ShardDataTreeTransactionParent { abstract void abortTransaction(AbstractShardDataTreeTransaction transaction); - abstract DOMStoreThreePhaseCommitCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction); + abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java index f9d3050015..797641978d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java @@ -7,9 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.collect.Lists; import java.io.IOException; -import java.util.List; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; @@ -17,10 +15,10 @@ import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.slf4j.Logger; @@ -36,52 +34,55 @@ import org.slf4j.Logger; */ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { private static final YangInstanceIdentifier ROOT = YangInstanceIdentifier.builder().build(); - private final ShardDataTree store; - private List currentLogRecoveryBatch; + private final DataTree store; private final String shardName; private final Logger log; + private DataTreeModification transaction; + private int size; ShardRecoveryCoordinator(ShardDataTree store, String shardName, Logger log) { - this.store = store; + this.store = store.getDataTree(); this.shardName = shardName; this.log = log; } @Override public void startLogRecoveryBatch(int maxBatchSize) { - currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize); - log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize); + transaction = store.takeSnapshot().newModification(); + size = 0; } @Override public void appendRecoveredLogEntry(Payload payload) { try { - if(payload instanceof ModificationPayload) { - currentLogRecoveryBatch.add((ModificationPayload) payload); + if (payload instanceof DataTreeCandidatePayload) { + DataTreeCandidates.applyToModification(transaction, ((DataTreeCandidatePayload)payload).getCandidate()); + size++; + } else if (payload instanceof ModificationPayload) { + MutableCompositeModification.fromSerializable( + ((ModificationPayload) payload).getModification()).apply(transaction); + size++; } else if (payload instanceof CompositeModificationPayload) { - currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable( - ((CompositeModificationPayload) payload).getModification()))); + MutableCompositeModification.fromSerializable( + ((CompositeModificationPayload) payload).getModification()).apply(transaction); + size++; } else if (payload instanceof CompositeModificationByteStringPayload) { - currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable( - ((CompositeModificationByteStringPayload) payload).getModification()))); + MutableCompositeModification.fromSerializable( + ((CompositeModificationByteStringPayload) payload).getModification()).apply(transaction); + size++; } else { log.error("{}: Unknown payload {} received during recovery", shardName, payload); } - } catch (IOException e) { + } catch (IOException | ClassNotFoundException e) { log.error("{}: Error extracting ModificationPayload", shardName, e); } - } - private void commitTransaction(ReadWriteShardDataTreeTransaction transaction) { - DOMStoreThreePhaseCommitCohort commitCohort = store.finishTransaction(transaction); - try { - commitCohort.preCommit().get(); - commitCohort.commit().get(); - } catch (Exception e) { - log.error("{}: Failed to commit Tx on recovery", shardName, e); - } + private void commitTransaction(DataTreeModification tx) throws DataValidationFailedException { + tx.ready(); + store.validate(tx); + store.commit(store.prepare(tx)); } /** @@ -89,21 +90,13 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { */ @Override public void applyCurrentLogRecoveryBatch() { - log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size()); - - ReadWriteShardDataTreeTransaction writeTx = store.newReadWriteTransaction(shardName + "-recovery", null); - DataTreeModification snapshot = writeTx.getSnapshot(); - for (ModificationPayload payload : currentLogRecoveryBatch) { - try { - MutableCompositeModification.fromSerializable(payload.getModification()).apply(snapshot); - } catch (Exception e) { - log.error("{}: Error extracting ModificationPayload", shardName, e); - } + log.debug("{}: Applying current log recovery batch with size {}", shardName, size); + try { + commitTransaction(transaction); + } catch (DataValidationFailedException e) { + log.error("{}: Failed to apply recovery batch", shardName, e); } - - commitTransaction(writeTx); - - currentLogRecoveryBatch = null; + transaction = null; } /** @@ -115,19 +108,13 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { public void applyRecoverySnapshot(final byte[] snapshotBytes) { log.debug("{}: Applying recovered snapshot", shardName); - // Intentionally bypass normal transaction to side-step persistence/replication - final DataTree tree = store.getDataTree(); - DataTreeModification writeTx = tree.takeSnapshot().newModification(); - - NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); - - writeTx.write(ROOT, node); - writeTx.ready(); + final NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); + final DataTreeModification tx = store.takeSnapshot().newModification(); + tx.write(ROOT, node); try { - tree.validate(writeTx); - tree.commit(tree.prepare(writeTx)); + commitTransaction(tx); } catch (DataValidationFailedException e) { - log.error("{}: Failed to validate recovery snapshot", shardName, e); + log.error("{}: Failed to apply recovery snapshot", shardName, e); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java index 35d8e922f2..600509a26b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java @@ -14,7 +14,6 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactio import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; @@ -86,7 +85,7 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort { void syncCommitTransaction(final ReadWriteShardDataTreeTransaction transaction) throws ExecutionException, InterruptedException { - DOMStoreThreePhaseCommitCohort commitCohort = store.finishTransaction(transaction); + ShardDataTreeCohort commitCohort = store.finishTransaction(transaction); commitCohort.preCommit().get(); commitCohort.commit().get(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java index 69a696f294..365f97dd3f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java @@ -31,7 +31,6 @@ import org.opendaylight.controller.cluster.datastore.modification.MergeModificat import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; /** * @author: syedbahm @@ -197,7 +196,7 @@ public class ShardWriteTransaction extends ShardTransaction { LOG.debug("readyTransaction : {}", transactionID); - DOMStoreThreePhaseCommitCohort cohort = transaction.ready(); + ShardDataTreeCohort cohort = transaction.ready(); getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(), cohort, compositeModification, returnSerialized, doImmediateCommit), getContext()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java new file mode 100644 index 0000000000..9f22ce8a73 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { + private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class); + private static final ListenableFuture TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE); + private static final ListenableFuture VOID_FUTURE = Futures.immediateFuture(null); + private final DataTreeModification transaction; + private final ShardDataTree dataTree; + private DataTreeCandidateTip candidate; + + SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction) { + this.dataTree = Preconditions.checkNotNull(dataTree); + this.transaction = Preconditions.checkNotNull(transaction); + } + + @Override + DataTreeCandidateTip getCandidate() { + return candidate; + } + + @Override + public ListenableFuture canCommit() { + try { + dataTree.getDataTree().validate(transaction); + LOG.debug("Transaction {} validated", transaction); + return TRUE_FUTURE; + } catch (Exception e) { + return Futures.immediateFailedFuture(e); + } + } + + @Override + public ListenableFuture preCommit() { + try { + candidate = dataTree.getDataTree().prepare(transaction); + /* + * FIXME: this is the place where we should be interacting with persistence, specifically by invoking + * persist on the candidate (which gives us a Future). + */ + LOG.debug("Transaction {} prepared candidate {}", transaction, candidate); + return VOID_FUTURE; + } catch (Exception e) { + LOG.debug("Transaction {} failed to prepare", transaction, e); + return Futures.immediateFailedFuture(e); + } + } + + @Override + public ListenableFuture abort() { + // No-op, really + return VOID_FUTURE; + } + + @Override + public ListenableFuture commit() { + try { + dataTree.getDataTree().commit(candidate); + } catch (Exception e) { + LOG.error("Transaction {} failed to commit", transaction, e); + return Futures.immediateFailedFuture(e); + } + + LOG.debug("Transaction {} committed, proceeding to notify", transaction); + dataTree.notifyListeners(candidate); + return VOID_FUTURE; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java index cdd7859a30..2f48ab9d1b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java @@ -7,8 +7,8 @@ */ package org.opendaylight.controller.cluster.datastore.messages; +import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort; import org.opendaylight.controller.cluster.datastore.modification.Modification; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; /** * Transaction ReadyTransaction message that is forwarded to the local Shard from the ShardTransaction. @@ -17,14 +17,14 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh */ public class ForwardedReadyTransaction { private final String transactionID; - private final DOMStoreThreePhaseCommitCohort cohort; + private final ShardDataTreeCohort cohort; private final Modification modification; private final boolean returnSerialized; private final boolean doImmediateCommit; private final short txnClientVersion; public ForwardedReadyTransaction(String transactionID, short txnClientVersion, - DOMStoreThreePhaseCommitCohort cohort, Modification modification, + ShardDataTreeCohort cohort, Modification modification, boolean returnSerialized, boolean doImmediateCommit) { this.transactionID = transactionID; this.cohort = cohort; @@ -38,7 +38,7 @@ public class ForwardedReadyTransaction { return transactionID; } - public DOMStoreThreePhaseCommitCohort getCohort() { + public ShardDataTreeCohort getCohort() { return cohort; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index 03f2bb7ad0..1100f3a7fa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -21,7 +21,6 @@ import akka.japi.Creator; import akka.testkit.TestActorRef; import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import java.util.Collections; @@ -42,10 +41,6 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; @@ -53,6 +48,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; @@ -172,49 +168,35 @@ public abstract class AbstractShardTest extends AbstractActorTest{ Assert.fail(String.format("Expected last applied: %d, Actual: %d", expectedValue, lastApplied)); } - protected NormalizedNode readStore(final InMemoryDOMDataStore store) throws ReadFailedException { - DOMStoreReadTransaction transaction = store.newReadOnlyTransaction(); - CheckedFuture>, ReadFailedException> read = - transaction.read(YangInstanceIdentifier.builder().build()); - - Optional> optional = read.checkedGet(); - - NormalizedNode normalizedNode = optional.get(); - - transaction.close(); - - return normalizedNode; - } - - protected DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName, + protected ShardDataTreeCohort setupMockWriteTransaction(final String cohortName, final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode data, final MutableCompositeModification modification) { return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null); } - protected DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName, + protected ShardDataTreeCohort setupMockWriteTransaction(final String cohortName, final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode data, final MutableCompositeModification modification, - final Function> preCommit) { + final Function> preCommit) { ReadWriteShardDataTreeTransaction tx = dataStore.newReadWriteTransaction("setup-mock-" + cohortName, null); tx.getSnapshot().write(path, data); - DOMStoreThreePhaseCommitCohort cohort = createDelegatingMockCohort(cohortName, dataStore.finishTransaction(tx), preCommit); + ShardDataTreeCohort cohort = createDelegatingMockCohort(cohortName, dataStore.finishTransaction(tx), preCommit); modification.addModification(new WriteModification(path, data)); return cohort; } - protected DOMStoreThreePhaseCommitCohort createDelegatingMockCohort(final String cohortName, - final DOMStoreThreePhaseCommitCohort actual) { + protected ShardDataTreeCohort createDelegatingMockCohort(final String cohortName, + final ShardDataTreeCohort actual) { return createDelegatingMockCohort(cohortName, actual, null); } - protected DOMStoreThreePhaseCommitCohort createDelegatingMockCohort(final String cohortName, - final DOMStoreThreePhaseCommitCohort actual, - final Function> preCommit) { - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName); + protected ShardDataTreeCohort createDelegatingMockCohort(final String cohortName, + final ShardDataTreeCohort actual, + final Function> preCommit) { + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, cohortName); doAnswer(new Answer>() { @Override @@ -248,6 +230,13 @@ public abstract class AbstractShardTest extends AbstractActorTest{ } }).when(cohort).abort(); + doAnswer(new Answer() { + @Override + public DataTreeCandidateTip answer(final InvocationOnMock invocation) { + return actual.getCandidate(); + } + }).when(cohort).getCandidate(); + return cohort; } @@ -275,7 +264,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{ ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("writeToStore", null); transaction.getSnapshot().write(id, node); - DOMStoreThreePhaseCommitCohort cohort = transaction.ready(); + ShardDataTreeCohort cohort = transaction.ready(); cohort.canCommit().get(); cohort.preCommit().get(); cohort.commit(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java new file mode 100644 index 0000000000..781c3dba71 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import java.io.IOException; +import java.util.Collection; +import org.apache.commons.lang3.SerializationUtils; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; + +public class DataTreeCandidatePayloadTest { + private DataTreeCandidate candidate; + + private static DataTreeCandidateNode findNode(final Collection nodes, final PathArgument arg) { + for (DataTreeCandidateNode node : nodes) { + if (arg.equals(node.getIdentifier())) { + return node; + } + } + return null; + } + + private static void assertChildrenEquals(final Collection expected, + final Collection actual) { + // Make sure all expected nodes are there + for (DataTreeCandidateNode exp : expected) { + final DataTreeCandidateNode act = findNode(actual, exp.getIdentifier()); + assertNotNull("missing expected child", act); + assertCandidateNodeEquals(exp, act); + } + // Make sure no nodes are present which are not in the expected set + for (DataTreeCandidateNode act : actual) { + final DataTreeCandidateNode exp = findNode(expected, act.getIdentifier()); + assertNull("unexpected child", exp); + } + } + + private static void assertCandidateEquals(final DataTreeCandidate expected, final DataTreeCandidate actual) { + assertEquals("root path", expected.getRootPath(), actual.getRootPath()); + + final DataTreeCandidateNode expRoot = expected.getRootNode(); + final DataTreeCandidateNode actRoot = expected.getRootNode(); + assertEquals("root type", expRoot.getModificationType(), actRoot.getModificationType()); + + switch (actRoot.getModificationType()) { + case DELETE: + case WRITE: + assertEquals("root data", expRoot.getDataAfter(), actRoot.getDataAfter()); + break; + case SUBTREE_MODIFIED: + assertChildrenEquals(expRoot.getChildNodes(), actRoot.getChildNodes()); + break; + default: + fail("Unexpect root type " + actRoot.getModificationType()); + break; + } + + assertCandidateNodeEquals(expected.getRootNode(), actual.getRootNode()); + } + + private static void assertCandidateNodeEquals(final DataTreeCandidateNode expected, final DataTreeCandidateNode actual) { + assertEquals("child type", expected.getModificationType(), actual.getModificationType()); + assertEquals("child identifier", expected.getIdentifier(), actual.getIdentifier()); + + switch (actual.getModificationType()) { + case DELETE: + case WRITE: + assertEquals("child data", expected.getDataAfter(), actual.getDataAfter()); + break; + case SUBTREE_MODIFIED: + assertChildrenEquals(expected.getChildNodes(), actual.getChildNodes()); + break; + default: + fail("Unexpect root type " + actual.getModificationType()); + break; + } + } + + @Before + public void setUp() { + final YangInstanceIdentifier writePath = TestModel.TEST_PATH; + final NormalizedNode writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); + candidate = DataTreeCandidates.fromNormalizedNode(writePath, writeData); + } + + @Test + public void testCandidateSerialization() throws IOException { + final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate); + assertEquals("payload size", 141, payload.size()); + } + + @Test + public void testCandidateSerDes() throws IOException { + final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate); + assertCandidateEquals(candidate, payload.getCandidate()); + } + + @Test + public void testPayloadSerDes() throws IOException { + final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate); + assertCandidateEquals(candidate, SerializationUtils.clone(payload).getCandidate()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 22ce50b90d..3d28672c9f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -25,7 +25,6 @@ import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.Collections; @@ -34,7 +33,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -88,11 +86,9 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; @@ -100,6 +96,13 @@ import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -108,6 +111,7 @@ import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; public class ShardTest extends AbstractShardTest { + private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars"); @Test public void testRegisterChangeListener() throws Exception { @@ -379,10 +383,25 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testRecovery() throws Exception { + public void testApplyStateWithCandidatePayload() throws Exception { - // Set up the InMemorySnapshotStore. + TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState"); + + NormalizedNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node); + + ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2, + DataTreeCandidatePayload.create(candidate))); + + shard.underlyingActor().onReceiveCommand(applyState); + + NormalizedNode actual = readStore(shard, TestModel.TEST_PATH); + assertEquals("Applied state", node, actual); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + DataTree setupInMemorySnapshotStore() throws DataValidationFailedException { DataTree testStore = InMemoryDataTreeFactory.getInstance().create(); testStore.setSchemaContext(SCHEMA_CONTEXT); @@ -393,6 +412,55 @@ public class ShardTest extends AbstractShardTest { InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create( SerializationUtils.serializeNormalizedNode(root), Collections.emptyList(), 0, 1, -1, -1)); + return testStore; + } + + private static DataTreeCandidatePayload payloadForModification(DataTree source, DataTreeModification mod) throws DataValidationFailedException { + source.validate(mod); + final DataTreeCandidate candidate = source.prepare(mod); + source.commit(candidate); + return DataTreeCandidatePayload.create(candidate); + } + + @Test + public void testDataTreeCandidateRecovery() throws Exception { + // Set up the InMemorySnapshotStore. + final DataTree source = setupInMemorySnapshotStore(); + + final DataTreeModification writeMod = source.takeSnapshot().newModification(); + writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + + // Set up the InMemoryJournal. + InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod))); + + int nListEntries = 16; + Set listEntryKeys = new HashSet<>(); + + // Add some ModificationPayload entries + for (int i = 1; i <= nListEntries; i++) { + listEntryKeys.add(Integer.valueOf(i)); + + YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); + + final DataTreeModification mod = source.takeSnapshot().newModification(); + mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); + + InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, + payloadForModification(source, mod))); + } + + InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, + new ApplyJournalEntries(nListEntries)); + + testRecovery(listEntryKeys); + } + + @Test + public void testModicationRecovery() throws Exception { + + // Set up the InMemorySnapshotStore. + setupInMemorySnapshotStore(); // Set up the InMemoryJournal. @@ -420,7 +488,7 @@ public class ShardTest extends AbstractShardTest { testRecovery(listEntryKeys); } - private ModificationPayload newModificationPayload(final Modification... mods) throws IOException { + private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException { MutableCompositeModification compMod = new MutableCompositeModification(); for(Modification mod: mods) { compMod.addModification(mod); @@ -429,7 +497,6 @@ public class ShardTest extends AbstractShardTest { return new ModificationPayload(compMod); } - @SuppressWarnings({ "unchecked" }) @Test public void testConcurrentThreePhaseCommits() throws Throwable { new ShardTestKit(getSystem()) {{ @@ -445,19 +512,19 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), modification2); String transactionID3 = "tx3"; MutableCompositeModification modification3 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), @@ -605,12 +672,12 @@ public class ShardTest extends AbstractShardTest { }}; } - private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path, + private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path, NormalizedNode data, boolean ready, boolean doCommitOnReady) { return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady); } - private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID, + private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID, YangInstanceIdentifier path, NormalizedNode data, boolean ready, boolean doCommitOnReady) { BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID); batched.addModification(new WriteModification(path, data)); @@ -631,10 +698,10 @@ public class ShardTest extends AbstractShardTest { final String transactionID = "tx"; FiniteDuration duration = duration("5 seconds"); - final AtomicReference mockCohort = new AtomicReference<>(); + final AtomicReference mockCohort = new AtomicReference<>(); ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { @Override - public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) { + public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) { if(mockCohort.get() == null) { mockCohort.set(createDelegatingMockCohort("cohort", actual)); } @@ -699,10 +766,10 @@ public class ShardTest extends AbstractShardTest { final String transactionID = "tx"; FiniteDuration duration = duration("5 seconds"); - final AtomicReference mockCohort = new AtomicReference<>(); + final AtomicReference mockCohort = new AtomicReference<>(); ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { @Override - public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) { + public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) { if(mockCohort.get() == null) { mockCohort.set(createDelegatingMockCohort("cohort", actual)); } @@ -745,7 +812,7 @@ public class ShardTest extends AbstractShardTest { } @SuppressWarnings("unchecked") - private void verifyOuterListEntry(final TestActorRef shard, Object expIDValue) throws Exception { + private static void verifyOuterListEntry(final TestActorRef shard, Object expIDValue) throws Exception { NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList); assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable", @@ -818,6 +885,8 @@ public class ShardTest extends AbstractShardTest { final AtomicBoolean overrideLeaderCalls = new AtomicBoolean(); new ShardTestKit(getSystem()) {{ Creator creator = new Creator() { + private static final long serialVersionUID = 1L; + @Override public Shard create() throws Exception { return new Shard(shardID, Collections.emptyMap(), @@ -867,7 +936,7 @@ public class ShardTest extends AbstractShardTest { String transactionID = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore, + ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore, TestModel.TEST_PATH, containerNode, modification); FiniteDuration duration = duration("5 seconds"); @@ -909,7 +978,7 @@ public class ShardTest extends AbstractShardTest { String transactionID = "tx"; MutableCompositeModification modification = new MutableCompositeModification(); NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore, + ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore, TestModel.TEST_PATH, containerNode, modification); FiniteDuration duration = duration("5 seconds"); @@ -945,6 +1014,25 @@ public class ShardTest extends AbstractShardTest { }}; } + private static DataTreeCandidateTip mockCandidate(final String name) { + DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name); + DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node"); + doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType(); + doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter(); + doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath(); + doReturn(mockCandidateNode).when(mockCandidate).getRootNode(); + return mockCandidate; + } + + private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) { + DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name); + DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node"); + doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType(); + doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath(); + doReturn(mockCandidateNode).when(mockCandidate).getRootNode(); + return mockCandidate; + } + @Test public void testCommitWhenTransactionHasNoModifications(){ // Note that persistence is enabled which would normally result in the entry getting written to the journal @@ -959,10 +1047,11 @@ public class ShardTest extends AbstractShardTest { String transactionID = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit(); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit(); + doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate(); FiniteDuration duration = duration("5 seconds"); @@ -1014,10 +1103,11 @@ public class ShardTest extends AbstractShardTest { String transactionID = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build())); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit(); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit(); + doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate(); FiniteDuration duration = duration("5 seconds"); @@ -1070,14 +1160,15 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit(); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit(); + doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate(); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); FiniteDuration duration = duration("5 seconds"); @@ -1146,13 +1237,13 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit(); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); FiniteDuration duration = duration("5 seconds"); @@ -1222,7 +1313,7 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); // Simulate the ForwardedReadyTransaction messages that would be sent @@ -1270,7 +1361,7 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit(); // Simulate the ForwardedReadyTransaction messages that would be sent @@ -1320,7 +1411,7 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); // Simulate the ForwardedReadyTransaction messages that would be sent @@ -1339,6 +1430,11 @@ public class ShardTest extends AbstractShardTest { doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort).preCommit(); doReturn(Futures.immediateFuture(null)).when(cohort).commit(); + DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class); + DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class); + doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType(); + doReturn(candidateRoot).when(candidate).getRootNode(); + doReturn(candidate).when(cohort).getCandidate(); shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, cohort, modification, true, true), getRef()); @@ -1362,7 +1458,7 @@ public class ShardTest extends AbstractShardTest { String transactionID = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit(); // Simulate the ForwardedReadyTransaction messages that would be sent @@ -1381,6 +1477,11 @@ public class ShardTest extends AbstractShardTest { doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort).preCommit(); doReturn(Futures.immediateFuture(null)).when(cohort).commit(); + DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class); + DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class); + doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType(); + doReturn(candidateRoot).when(candidate).getRootNode(); + doReturn(candidate).when(cohort).getCandidate(); shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, cohort, modification, true, true), getRef()); @@ -1404,10 +1505,10 @@ public class ShardTest extends AbstractShardTest { ShardDataTree dataStore = shard.underlyingActor().getDataStore(); final String transactionID = "tx1"; - Function> preCommit = - new Function>() { + Function> preCommit = + new Function>() { @Override - public ListenableFuture apply(final DOMStoreThreePhaseCommitCohort cohort) { + public ListenableFuture apply(final ShardDataTreeCohort cohort) { ListenableFuture preCommitFuture = cohort.preCommit(); // Simulate an AbortTransaction message occurring during replication, after @@ -1425,7 +1526,7 @@ public class ShardTest extends AbstractShardTest { }; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification, preCommit); @@ -1475,7 +1576,7 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), @@ -1487,7 +1588,7 @@ public class ShardTest extends AbstractShardTest { MutableCompositeModification modification2 = new MutableCompositeModification(); YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore, + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore, listNodePath, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), modification2); @@ -1546,19 +1647,19 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), modification2); String transactionID3 = "tx3"; MutableCompositeModification modification3 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3); // Ready the Tx's @@ -1620,13 +1721,13 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort1).abort(); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); FiniteDuration duration = duration("5 seconds"); @@ -1782,29 +1883,29 @@ public class ShardTest extends AbstractShardTest { /** * This test simply verifies that the applySnapShot logic will work * @throws ReadFailedException + * @throws DataValidationFailedException */ @Test - public void testInMemoryDataStoreRestore() throws ReadFailedException { - InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor()); - - store.onGlobalContextUpdated(SCHEMA_CONTEXT); + public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException { + DataTree store = InMemoryDataTreeFactory.getInstance().create(); + store.setSchemaContext(SCHEMA_CONTEXT); - DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction(); + DataTreeModification putTransaction = store.takeSnapshot().newModification(); putTransaction.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - commitTransaction(putTransaction); + commitTransaction(store, putTransaction); - NormalizedNode expected = readStore(store); + NormalizedNode expected = readStore(store, YangInstanceIdentifier.builder().build()); - DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction(); + DataTreeModification writeTransaction = store.takeSnapshot().newModification(); writeTransaction.delete(YangInstanceIdentifier.builder().build()); writeTransaction.write(YangInstanceIdentifier.builder().build(), expected); - commitTransaction(writeTransaction); + commitTransaction(store, writeTransaction); - NormalizedNode actual = readStore(store); + NormalizedNode actual = readStore(store, YangInstanceIdentifier.builder().build()); assertEquals(expected, actual); } @@ -1913,15 +2014,9 @@ public class ShardTest extends AbstractShardTest { shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } - private void commitTransaction(final DOMStoreWriteTransaction transaction) { - DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); - ListenableFuture future = - commitCohort.preCommit(); - try { - future.get(); - future = commitCohort.commit(); - future.get(); - } catch (InterruptedException | ExecutionException e) { - } + private static void commitTransaction(DataTree store, final DataTreeModification modification) throws DataValidationFailedException { + modification.ready(); + store.validate(modification); + store.commit(store.prepare(modification)); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java index 96cd3e45eb..a2309be48f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java @@ -32,6 +32,7 @@ import org.mockito.InOrder; import org.opendaylight.controller.cluster.datastore.AbstractShardTest; import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.ShardDataTree; +import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort; import org.opendaylight.controller.cluster.datastore.ShardTestKit; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; @@ -57,7 +58,6 @@ import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; @@ -219,19 +219,19 @@ public class PreLithiumShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), modification2); String transactionID3 = "tx3"; MutableCompositeModification modification3 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayloadTest.java index bbfff70e2d..7016ada525 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayloadTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayloadTest.java @@ -8,7 +8,7 @@ package org.opendaylight.controller.cluster.datastore.modification; import static org.junit.Assert.assertEquals; -import org.apache.commons.lang.SerializationUtils; +import org.apache.commons.lang3.SerializationUtils; import org.junit.Test; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -43,8 +43,7 @@ public class ModificationPayloadTest { assertEquals("getPath", writePath, write.getPath()); assertEquals("getData", writeData, write.getData()); - ModificationPayload cloned = - (ModificationPayload) SerializationUtils.clone(payload); + ModificationPayload cloned = SerializationUtils.clone(payload); deserialized = (MutableCompositeModification) payload.getModification();