From: Tony Tkacik Date: Thu, 23 Apr 2015 18:01:05 +0000 (+0000) Subject: Merge "IMDS: trim down commit overhead" X-Git-Tag: release/lithium~232 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=53090b106117e873f6a69a04290ee3bdb2cdf975;hp=7fde47bbf398e73144edfd14437ca72fd12e1daa Merge "IMDS: trim down commit overhead" --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java index 847954816c..9a916625c9 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java +++ b/opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java @@ -201,14 +201,16 @@ public class SnapshotManager implements SnapshotState { LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber); + SnapshotManager.this.currentState = CREATING; + try { createSnapshotProcedure.apply(null); } catch (Exception e) { + SnapshotManager.this.currentState = IDLE; LOG.error("Error creating snapshot", e); return false; } - SnapshotManager.this.currentState = CREATING; return true; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataTreeCandidateNode.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataTreeCandidateNode.java new file mode 100644 index 0000000000..c3940e5256 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataTreeCandidateNode.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import java.util.Collection; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; + +/** + * Abstract base class for our internal implementation of {@link DataTreeCandidateNode}, + * which we instantiate from a serialized stream. We do not retain the before-image and + * do not implement {@link #getModifiedChild(PathArgument)}, as that method is only + * useful for end users. Instances based on this class should never be leaked outside of + * this component. + */ +abstract class AbstractDataTreeCandidateNode implements DataTreeCandidateNode { + private final ModificationType type; + + protected AbstractDataTreeCandidateNode(final ModificationType type) { + this.type = Preconditions.checkNotNull(type); + } + + @Override + public final DataTreeCandidateNode getModifiedChild(final PathArgument identifier) { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public final ModificationType getModificationType() { + return type; + } + + @Override + public final Optional> getDataBefore() { + throw new UnsupportedOperationException("Before-image not available after serialization"); + } + + static DataTreeCandidateNode createUnmodified() { + return new AbstractDataTreeCandidateNode(ModificationType.UNMODIFIED) { + @Override + public PathArgument getIdentifier() { + throw new UnsupportedOperationException("Root node does not have an identifier"); + } + + @Override + public Optional> getDataAfter() { + throw new UnsupportedOperationException("After-image not available after serialization"); + } + + @Override + public Collection getChildNodes() { + throw new UnsupportedOperationException("Children not available after serialization"); + } + }; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java new file mode 100644 index 0000000000..4b471cfa4a --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java @@ -0,0 +1,69 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class ChainedCommitCohort extends ShardDataTreeCohort { + private static final Logger LOG = LoggerFactory.getLogger(ChainedCommitCohort.class); + private final ReadWriteShardDataTreeTransaction transaction; + private final ShardDataTreeTransactionChain chain; + private final ShardDataTreeCohort delegate; + + ChainedCommitCohort(final ShardDataTreeTransactionChain chain, final ReadWriteShardDataTreeTransaction transaction, final ShardDataTreeCohort delegate) { + this.transaction = Preconditions.checkNotNull(transaction); + this.delegate = Preconditions.checkNotNull(delegate); + this.chain = Preconditions.checkNotNull(chain); + } + + @Override + public ListenableFuture commit() { + final ListenableFuture ret = delegate.commit(); + + Futures.addCallback(ret, new FutureCallback() { + @Override + public void onSuccess(Void result) { + chain.clearTransaction(transaction); + LOG.debug("Committed transaction {}", transaction); + } + + @Override + public void onFailure(Throwable t) { + LOG.error("Transaction {} commit failed, cannot recover", transaction, t); + } + }); + + return ret; + } + + @Override + public ListenableFuture canCommit() { + return delegate.canCommit(); + } + + @Override + public ListenableFuture preCommit() { + return delegate.preCommit(); + } + + @Override + public ListenableFuture abort() { + return delegate.abort(); + } + + @Override + DataTreeCandidateTip getCandidate() { + return delegate.getCandidate(); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java new file mode 100644 index 0000000000..54167b2011 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayload.java @@ -0,0 +1,222 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Preconditions; +import com.google.common.io.ByteArrayDataInput; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; +import com.google.protobuf.GeneratedMessage.GeneratedExtension; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeInputStreamReader; +import org.opendaylight.controller.cluster.datastore.node.utils.stream.NormalizedNodeOutputStreamWriter; +import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; +import org.opendaylight.controller.protobuff.messages.cluster.raft.AppendEntriesMessages.AppendEntries; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNodes; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class DataTreeCandidatePayload extends Payload implements Externalizable { + private static final Logger LOG = LoggerFactory.getLogger(DataTreeCandidatePayload.class); + private static final long serialVersionUID = 1L; + private static final byte DELETE = 0; + private static final byte SUBTREE_MODIFIED = 1; + private static final byte UNMODIFIED = 2; + private static final byte WRITE = 3; + + private transient byte[] serialized; + + public DataTreeCandidatePayload() { + // Required by Externalizable + } + + private DataTreeCandidatePayload(final byte[] serialized) { + this.serialized = Preconditions.checkNotNull(serialized); + } + + private static void writeChildren(final NormalizedNodeOutputStreamWriter writer, final DataOutput out, + final Collection children) throws IOException { + out.writeInt(children.size()); + for (DataTreeCandidateNode child : children) { + writeNode(writer, out, child); + } + } + + private static void writeNode(final NormalizedNodeOutputStreamWriter writer, final DataOutput out, + final DataTreeCandidateNode node) throws IOException { + switch (node.getModificationType()) { + case DELETE: + out.writeByte(DELETE); + writer.writePathArgument(node.getIdentifier()); + break; + case SUBTREE_MODIFIED: + out.writeByte(SUBTREE_MODIFIED); + writer.writePathArgument(node.getIdentifier()); + writeChildren(writer, out, node.getChildNodes()); + break; + case WRITE: + out.writeByte(WRITE); + writer.writeNormalizedNode(node.getDataAfter().get()); + break; + case UNMODIFIED: + throw new IllegalArgumentException("Unmodified candidate should never be in the payload"); + default: + throw new IllegalArgumentException("Unhandled node type " + node.getModificationType()); + } + } + + static DataTreeCandidatePayload create(DataTreeCandidate candidate) { + final ByteArrayDataOutput out = ByteStreams.newDataOutput(); + try (final NormalizedNodeOutputStreamWriter writer = new NormalizedNodeOutputStreamWriter(out)) { + writer.writeYangInstanceIdentifier(candidate.getRootPath()); + + final DataTreeCandidateNode node = candidate.getRootNode(); + switch (node.getModificationType()) { + case DELETE: + out.writeByte(DELETE); + break; + case SUBTREE_MODIFIED: + out.writeByte(SUBTREE_MODIFIED); + writeChildren(writer, out, node.getChildNodes()); + break; + case UNMODIFIED: + out.writeByte(UNMODIFIED); + break; + case WRITE: + out.writeByte(WRITE); + writer.writeNormalizedNode(node.getDataAfter().get()); + break; + default: + throw new IllegalArgumentException("Unhandled node type " + node.getModificationType()); + } + + writer.close(); + } catch (IOException e) { + throw new IllegalArgumentException(String.format("Failed to serialize candidate %s", candidate), e); + } + + return new DataTreeCandidatePayload(out.toByteArray()); + } + + private static Collection readChildren(final NormalizedNodeInputStreamReader reader, + final DataInput in) throws IOException { + final int size = in.readInt(); + if (size != 0) { + final Collection ret = new ArrayList<>(size); + for (int i = 0; i < size; ++i) { + final DataTreeCandidateNode child = readNode(reader, in); + if (child != null) { + ret.add(child); + } + } + return ret; + } else { + return Collections.emptyList(); + } + } + + private static DataTreeCandidateNode readNode(final NormalizedNodeInputStreamReader reader, + final DataInput in) throws IOException { + final byte type = in.readByte(); + switch (type) { + case DELETE: + return DeletedDataTreeCandidateNode.create(reader.readPathArgument()); + case SUBTREE_MODIFIED: + final PathArgument identifier = reader.readPathArgument(); + final Collection children = readChildren(reader, in); + if (children.isEmpty()) { + LOG.debug("Modified node {} does not have any children, not instantiating it", identifier); + return null; + } else { + return ModifiedDataTreeCandidateNode.create(identifier, children); + } + case UNMODIFIED: + return null; + case WRITE: + return DataTreeCandidateNodes.fromNormalizedNode(reader.readNormalizedNode()); + default: + throw new IllegalArgumentException("Unhandled node type " + type); + } + } + + private static DataTreeCandidate parseCandidate(final ByteArrayDataInput in) throws IOException { + final NormalizedNodeInputStreamReader reader = new NormalizedNodeInputStreamReader(in); + final YangInstanceIdentifier rootPath = reader.readYangInstanceIdentifier(); + final byte type = in.readByte(); + + final DataTreeCandidateNode rootNode; + switch (type) { + case DELETE: + rootNode = DeletedDataTreeCandidateNode.create(); + break; + case SUBTREE_MODIFIED: + rootNode = ModifiedDataTreeCandidateNode.create(readChildren(reader, in)); + break; + case WRITE: + rootNode = DataTreeCandidateNodes.fromNormalizedNode(reader.readNormalizedNode()); + break; + default: + throw new IllegalArgumentException("Unhandled node type " + type); + } + + return DataTreeCandidates.newDataTreeCandidate(rootPath, rootNode); + } + + DataTreeCandidate getCandidate() throws IOException { + return parseCandidate(ByteStreams.newDataInput(serialized)); + } + + @Override + @Deprecated + @SuppressWarnings("rawtypes") + public Map encode() { + return null; + } + + @Override + @Deprecated + public Payload decode(final AppendEntries.ReplicatedLogEntry.Payload payload) { + return null; + } + + @Override + public int size() { + return serialized.length; + } + + @Override + public void writeExternal(ObjectOutput out) throws IOException { + out.writeByte((byte)serialVersionUID); + out.writeInt(serialized.length); + out.write(serialized); + } + + @Override + public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + final long version = in.readByte(); + Preconditions.checkArgument(version == serialVersionUID, "Unsupported serialization version %s", version); + + final int length = in.readInt(); + serialized = new byte[length]; + in.readFully(serialized); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DeletedDataTreeCandidateNode.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DeletedDataTreeCandidateNode.java new file mode 100644 index 0000000000..2df380b391 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DeletedDataTreeCandidateNode.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Optional; +import java.util.Collection; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; + +/** + * A deserialized {@link DataTreeCandidateNode} which represents a deletion. + */ +abstract class DeletedDataTreeCandidateNode extends AbstractDataTreeCandidateNode { + private DeletedDataTreeCandidateNode() { + super(ModificationType.DELETE); + } + + static DataTreeCandidateNode create() { + return new DeletedDataTreeCandidateNode() { + @Override + public PathArgument getIdentifier() { + throw new UnsupportedOperationException("Root node does not have an identifier"); + } + }; + } + + static DataTreeCandidateNode create(final PathArgument identifier) { + return new DeletedDataTreeCandidateNode() { + @Override + public final PathArgument getIdentifier() { + return identifier; + } + }; + } + + @Override + public final Optional> getDataAfter() { + return Optional.absent(); + } + + @Override + public final Collection getChildNodes() { + // We would require the before-image to reconstruct the list of nodes which + // were deleted. + throw new UnsupportedOperationException("Children not available after serialization"); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ModifiedDataTreeCandidateNode.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ModifiedDataTreeCandidateNode.java new file mode 100644 index 0000000000..208ec33967 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ModifiedDataTreeCandidateNode.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import java.util.Collection; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; + +/** + * A deserialized {@link DataTreeCandidateNode} which represents a modification in + * one of its children. + */ +abstract class ModifiedDataTreeCandidateNode extends AbstractDataTreeCandidateNode { + private final Collection children; + + private ModifiedDataTreeCandidateNode(final Collection children) { + super(ModificationType.SUBTREE_MODIFIED); + this.children = Preconditions.checkNotNull(children); + } + + static DataTreeCandidateNode create(final Collection children) { + return new ModifiedDataTreeCandidateNode(children) { + @Override + public PathArgument getIdentifier() { + throw new UnsupportedOperationException("Root node does not have an identifier"); + } + }; + } + + static DataTreeCandidateNode create(final PathArgument identifier, final Collection children) { + return new ModifiedDataTreeCandidateNode(children) { + @Override + public final PathArgument getIdentifier() { + return identifier; + } + }; + } + + @Override + public final Optional> getDataAfter() { + throw new UnsupportedOperationException("After-image not available after serialization"); + } + + @Override + public final Collection getChildNodes() { + return children; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java index 0f3ab61041..cb17335caf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java @@ -8,7 +8,6 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.Preconditions; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTransaction { @@ -26,7 +25,7 @@ final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTrans parent.abortTransaction(this); } - DOMStoreThreePhaseCommitCohort ready() { + ShardDataTreeCohort ready() { Preconditions.checkState(close(), "Transaction is already closed"); return parent.finishTransaction(this); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index b53d12c0c8..62d3259a71 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -63,6 +63,9 @@ import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyn import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; import org.opendaylight.yangtools.yang.model.api.SchemaContext; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; @@ -291,15 +294,21 @@ public class Shard extends RaftActor { } } + private static boolean isEmptyCommit(final DataTreeCandidate candidate) { + return ModificationType.UNMODIFIED.equals(candidate.getRootNode().getModificationType()); + } + void continueCommit(final CohortEntry cohortEntry) throws Exception { + final DataTreeCandidate candidate = cohortEntry.getCohort().getCandidate(); + // If we do not have any followers and we are not using persistence // or if cohortEntry has no modifications // we can apply modification to the state immediately - if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){ - applyModificationToState(getSender(), cohortEntry.getTransactionID(), cohortEntry.getModification()); + if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) { + applyModificationToState(getSender(), cohortEntry.getTransactionID(), candidate); } else { Shard.this.persistData(getSender(), cohortEntry.getTransactionID(), - new ModificationPayload(cohortEntry.getModification())); + DataTreeCandidatePayload.create(candidate)); } } @@ -309,12 +318,37 @@ public class Shard extends RaftActor { } } + private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final String transactionID, @Nonnull final CohortEntry cohortEntry) { + LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID()); + + try { + // We block on the future here so we don't have to worry about possibly accessing our + // state on a different thread outside of our dispatcher. Also, the data store + // currently uses a same thread executor anyway. + cohortEntry.getCohort().commit().get(); + + sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf()); + + shardMBean.incrementCommittedTransactionCount(); + shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); + + } catch (Exception e) { + sender.tell(new akka.actor.Status.Failure(e), getSelf()); + + LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(), + transactionID, e); + shardMBean.incrementFailedTransactionsCount(); + } finally { + commitCoordinator.currentTransactionComplete(transactionID, true); + } + } + private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) { // With persistence enabled, this method is called via applyState by the leader strategy // after the commit has been replicated to a majority of the followers. CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID); - if(cohortEntry == null) { + if (cohortEntry == null) { // The transaction is no longer the current commit. This can happen if the transaction // was aborted prior, most likely due to timeout in the front-end. We need to finish // committing the transaction though since it was successfully persisted and replicated @@ -323,7 +357,13 @@ public class Shard extends RaftActor { // transaction. cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID); if(cohortEntry != null) { - commitWithNewTransaction(cohortEntry.getModification()); + try { + store.applyForeignCandidate(transactionID, cohortEntry.getCohort().getCandidate()); + } catch (DataValidationFailedException e) { + shardMBean.incrementFailedTransactionsCount(); + LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e); + } + sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf()); } else { // This really shouldn't happen - it likely means that persistence or replication @@ -334,31 +374,8 @@ public class Shard extends RaftActor { LOG.error(ex.getMessage()); sender.tell(new akka.actor.Status.Failure(ex), getSelf()); } - - return; - } - - LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID()); - - try { - // We block on the future here so we don't have to worry about possibly accessing our - // state on a different thread outside of our dispatcher. Also, the data store - // currently uses a same thread executor anyway. - cohortEntry.getCohort().commit().get(); - - sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf()); - - shardMBean.incrementCommittedTransactionCount(); - shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis()); - - } catch (Exception e) { - sender.tell(new akka.actor.Status.Failure(e), getSelf()); - - LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(), - transactionID, e); - shardMBean.incrementFailedTransactionsCount(); - } finally { - commitCoordinator.currentTransactionComplete(transactionID, true); + } else { + finishCommit(sender, transactionID, cohortEntry); } } @@ -556,15 +573,25 @@ public class Shard extends RaftActor { @Override protected void applyState(final ActorRef clientActor, final String identifier, final Object data) { - - if(data instanceof ModificationPayload) { + if (data instanceof DataTreeCandidatePayload) { + if (clientActor == null) { + // No clientActor indicates a replica coming from the leader + try { + store.applyForeignCandidate(identifier, ((DataTreeCandidatePayload)data).getCandidate()); + } catch (DataValidationFailedException | IOException e) { + LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e); + } + } else { + // Replication consensus reached, proceed to commit + finishCommit(clientActor, identifier); + } + } else if (data instanceof ModificationPayload) { try { applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification()); } catch (ClassNotFoundException | IOException e) { LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e); } - } - else if (data instanceof CompositeModificationPayload) { + } else if (data instanceof CompositeModificationPayload) { Object modification = ((CompositeModificationPayload) data).getModification(); applyModificationToState(clientActor, identifier, modification); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 0eb48fd180..30947fa666 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -30,7 +30,6 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.slf4j.Logger; /** @@ -42,7 +41,7 @@ public class ShardCommitCoordinator { // Interface hook for unit tests to replace or decorate the DOMStoreThreePhaseCommitCohorts. public interface CohortDecorator { - DOMStoreThreePhaseCommitCohort decorate(String transactionID, DOMStoreThreePhaseCommitCohort actual); + ShardDataTreeCohort decorate(String transactionID, ShardDataTreeCohort actual); } private final Cache cohortCache; @@ -413,8 +412,7 @@ public class ShardCommitCoordinator { static class CohortEntry { private final String transactionID; - private DOMStoreThreePhaseCommitCohort cohort; - private final MutableCompositeModification compositeModification; + private ShardDataTreeCohort cohort; private final ReadWriteShardDataTreeTransaction transaction; private ActorRef replySender; private Shard shard; @@ -422,16 +420,14 @@ public class ShardCommitCoordinator { private boolean doImmediateCommit; CohortEntry(String transactionID, ReadWriteShardDataTreeTransaction transaction) { - this.compositeModification = new MutableCompositeModification(); this.transaction = Preconditions.checkNotNull(transaction); this.transactionID = transactionID; } - CohortEntry(String transactionID, DOMStoreThreePhaseCommitCohort cohort, + CohortEntry(String transactionID, ShardDataTreeCohort cohort, MutableCompositeModification compositeModification) { this.transactionID = transactionID; this.cohort = cohort; - this.compositeModification = compositeModification; this.transaction = null; } @@ -447,17 +443,12 @@ public class ShardCommitCoordinator { return transactionID; } - DOMStoreThreePhaseCommitCohort getCohort() { + ShardDataTreeCohort getCohort() { return cohort; } - MutableCompositeModification getModification() { - return compositeModification; - } - void applyModifications(Iterable modifications) { - for(Modification modification: modifications) { - compositeModification.addModification(modification); + for (Modification modification : modifications) { modification.apply(transaction.getSnapshot()); } } @@ -498,9 +489,5 @@ public class ShardCommitCoordinator { void setShard(Shard shard) { this.shard = shard; } - - boolean hasModifications(){ - return compositeModification.getModifications().size() > 0; - } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 373bf499e0..fbe699223c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -21,14 +21,13 @@ import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.DOMImmutableDataChangeEvent; import org.opendaylight.controller.md.sal.dom.store.impl.ResolveDataChangeEventsTask; import org.opendaylight.controller.md.sal.dom.store.impl.tree.ListenerTree; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.opendaylight.yangtools.yang.data.api.schema.tree.TipProducingDataTree; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -93,7 +92,7 @@ public final class ShardDataTree extends ShardDataTreeTransactionParent { return ensureTransactionChain(chainId).newReadWriteTransaction(txId); } - void notifyListeners(final DataTreeCandidateTip candidate) { + void notifyListeners(final DataTreeCandidate candidate) { LOG.debug("Notifying listeners on candidate {}", candidate); // DataTreeChanges first, as they are more light-weight @@ -152,15 +151,30 @@ public final class ShardDataTree extends ShardDataTreeTransactionParent { return new SimpleEntry<>(reg, event); } + void applyForeignCandidate(final String identifier, final DataTreeCandidate foreign) throws DataValidationFailedException { + LOG.debug("Applying foreign transaction {}", identifier); + + final DataTreeModification mod = dataTree.takeSnapshot().newModification(); + DataTreeCandidates.applyToModification(mod, foreign); + mod.ready(); + + LOG.trace("Applying foreign modification {}", mod); + dataTree.validate(mod); + final DataTreeCandidate candidate = dataTree.prepare(mod); + dataTree.commit(candidate); + notifyListeners(candidate); + } + @Override void abortTransaction(final AbstractShardDataTreeTransaction transaction) { // Intentional no-op } @Override - DOMStoreThreePhaseCommitCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) { + ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) { final DataTreeModification snapshot = transaction.getSnapshot(); snapshot.ready(); - return new ShardDataTreeCohort(this, snapshot); + return new SimpleShardDataTreeCohort(this, snapshot); } + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java index 11b3ca8ed7..213e36a570 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java @@ -7,72 +7,23 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.Futures; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ListenableFuture; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -final class ShardDataTreeCohort implements DOMStoreThreePhaseCommitCohort { - private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeCohort.class); - private static final ListenableFuture TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE); - private static final ListenableFuture VOID_FUTURE = Futures.immediateFuture(null); - private final DataTreeModification transaction; - private final ShardDataTree dataTree; - private DataTreeCandidateTip candidate; - - ShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction) { - this.dataTree = Preconditions.checkNotNull(dataTree); - this.transaction = Preconditions.checkNotNull(transaction); - } - - @Override - public ListenableFuture canCommit() { - try { - dataTree.getDataTree().validate(transaction); - LOG.debug("Transaction {} validated", transaction); - return TRUE_FUTURE; - } catch (Exception e) { - return Futures.immediateFailedFuture(e); - } - } - - @Override - public ListenableFuture preCommit() { - try { - candidate = dataTree.getDataTree().prepare(transaction); - /* - * FIXME: this is the place where we should be interacting with persistence, specifically by invoking - * persist on the candidate (which gives us a Future). - */ - LOG.debug("Transaction {} prepared candidate {}", transaction, candidate); - return VOID_FUTURE; - } catch (Exception e) { - LOG.debug("Transaction {} failed to prepare", transaction, e); - return Futures.immediateFailedFuture(e); - } - } - - @Override - public ListenableFuture abort() { - // No-op, really - return VOID_FUTURE; +public abstract class ShardDataTreeCohort { + ShardDataTreeCohort() { + // Prevent foreign instantiation } - @Override - public ListenableFuture commit() { - try { - dataTree.getDataTree().commit(candidate); - } catch (Exception e) { - LOG.error("Transaction {} failed to commit", transaction, e); - return Futures.immediateFailedFuture(e); - } + abstract DataTreeCandidateTip getCandidate(); - LOG.debug("Transaction {} committed, proceeding to notify", transaction); - dataTree.notifyListeners(candidate); - return VOID_FUTURE; - } + @VisibleForTesting + public abstract ListenableFuture canCommit(); + @VisibleForTesting + public abstract ListenableFuture preCommit(); + @VisibleForTesting + public abstract ListenableFuture abort(); + @VisibleForTesting + public abstract ListenableFuture commit(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java index 780d940128..183c2192e4 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java @@ -9,12 +9,7 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; import javax.annotation.concurrent.NotThreadSafe; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.ForwardingDOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,17 +72,17 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent } @Override - protected DOMStoreThreePhaseCommitCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) { + protected ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) { Preconditions.checkState(openTransaction != null, "Attempted to finish transaction %s while none is outstanding", transaction); // dataTree is finalizing ready the transaction, we just record it for the next // transaction in chain - final DOMStoreThreePhaseCommitCohort delegate = dataTree.finishTransaction(transaction); + final ShardDataTreeCohort delegate = dataTree.finishTransaction(transaction); openTransaction = null; previousTx = transaction; LOG.debug("Committing transaction {}", transaction); - return new CommitCohort(transaction, delegate); + return new ChainedCommitCohort(this, transaction, delegate); } @Override @@ -95,40 +90,9 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent return MoreObjects.toStringHelper(this).add("id", chainId).toString(); } - private final class CommitCohort extends ForwardingDOMStoreThreePhaseCommitCohort { - private final ReadWriteShardDataTreeTransaction transaction; - private final DOMStoreThreePhaseCommitCohort delegate; - - CommitCohort(final ReadWriteShardDataTreeTransaction transaction, final DOMStoreThreePhaseCommitCohort delegate) { - this.transaction = Preconditions.checkNotNull(transaction); - this.delegate = Preconditions.checkNotNull(delegate); - } - - @Override - protected DOMStoreThreePhaseCommitCohort delegate() { - return delegate; - } - - @Override - public ListenableFuture commit() { - final ListenableFuture ret = super.commit(); - - Futures.addCallback(ret, new FutureCallback() { - @Override - public void onSuccess(Void result) { - if (transaction.equals(previousTx)) { - previousTx = null; - } - LOG.debug("Committed transaction {}", transaction); - } - - @Override - public void onFailure(Throwable t) { - LOG.error("Transaction {} commit failed, cannot recover", transaction, t); - } - }); - - return ret; + void clearTransaction(ReadWriteShardDataTreeTransaction transaction) { + if (transaction.equals(previousTx)) { + previousTx = null; } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java index 6cc1408eae..ee04aff515 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java @@ -7,9 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; - abstract class ShardDataTreeTransactionParent { abstract void abortTransaction(AbstractShardDataTreeTransaction transaction); - abstract DOMStoreThreePhaseCommitCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction); + abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java index f9d3050015..797641978d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardRecoveryCoordinator.java @@ -7,9 +7,7 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.collect.Lists; import java.io.IOException; -import java.util.List; import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; @@ -17,10 +15,10 @@ import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; import org.slf4j.Logger; @@ -36,52 +34,55 @@ import org.slf4j.Logger; */ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { private static final YangInstanceIdentifier ROOT = YangInstanceIdentifier.builder().build(); - private final ShardDataTree store; - private List currentLogRecoveryBatch; + private final DataTree store; private final String shardName; private final Logger log; + private DataTreeModification transaction; + private int size; ShardRecoveryCoordinator(ShardDataTree store, String shardName, Logger log) { - this.store = store; + this.store = store.getDataTree(); this.shardName = shardName; this.log = log; } @Override public void startLogRecoveryBatch(int maxBatchSize) { - currentLogRecoveryBatch = Lists.newArrayListWithCapacity(maxBatchSize); - log.debug("{}: starting log recovery batch with max size {}", shardName, maxBatchSize); + transaction = store.takeSnapshot().newModification(); + size = 0; } @Override public void appendRecoveredLogEntry(Payload payload) { try { - if(payload instanceof ModificationPayload) { - currentLogRecoveryBatch.add((ModificationPayload) payload); + if (payload instanceof DataTreeCandidatePayload) { + DataTreeCandidates.applyToModification(transaction, ((DataTreeCandidatePayload)payload).getCandidate()); + size++; + } else if (payload instanceof ModificationPayload) { + MutableCompositeModification.fromSerializable( + ((ModificationPayload) payload).getModification()).apply(transaction); + size++; } else if (payload instanceof CompositeModificationPayload) { - currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable( - ((CompositeModificationPayload) payload).getModification()))); + MutableCompositeModification.fromSerializable( + ((CompositeModificationPayload) payload).getModification()).apply(transaction); + size++; } else if (payload instanceof CompositeModificationByteStringPayload) { - currentLogRecoveryBatch.add(new ModificationPayload(MutableCompositeModification.fromSerializable( - ((CompositeModificationByteStringPayload) payload).getModification()))); + MutableCompositeModification.fromSerializable( + ((CompositeModificationByteStringPayload) payload).getModification()).apply(transaction); + size++; } else { log.error("{}: Unknown payload {} received during recovery", shardName, payload); } - } catch (IOException e) { + } catch (IOException | ClassNotFoundException e) { log.error("{}: Error extracting ModificationPayload", shardName, e); } - } - private void commitTransaction(ReadWriteShardDataTreeTransaction transaction) { - DOMStoreThreePhaseCommitCohort commitCohort = store.finishTransaction(transaction); - try { - commitCohort.preCommit().get(); - commitCohort.commit().get(); - } catch (Exception e) { - log.error("{}: Failed to commit Tx on recovery", shardName, e); - } + private void commitTransaction(DataTreeModification tx) throws DataValidationFailedException { + tx.ready(); + store.validate(tx); + store.commit(store.prepare(tx)); } /** @@ -89,21 +90,13 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { */ @Override public void applyCurrentLogRecoveryBatch() { - log.debug("{}: Applying current log recovery batch with size {}", shardName, currentLogRecoveryBatch.size()); - - ReadWriteShardDataTreeTransaction writeTx = store.newReadWriteTransaction(shardName + "-recovery", null); - DataTreeModification snapshot = writeTx.getSnapshot(); - for (ModificationPayload payload : currentLogRecoveryBatch) { - try { - MutableCompositeModification.fromSerializable(payload.getModification()).apply(snapshot); - } catch (Exception e) { - log.error("{}: Error extracting ModificationPayload", shardName, e); - } + log.debug("{}: Applying current log recovery batch with size {}", shardName, size); + try { + commitTransaction(transaction); + } catch (DataValidationFailedException e) { + log.error("{}: Failed to apply recovery batch", shardName, e); } - - commitTransaction(writeTx); - - currentLogRecoveryBatch = null; + transaction = null; } /** @@ -115,19 +108,13 @@ class ShardRecoveryCoordinator implements RaftActorRecoveryCohort { public void applyRecoverySnapshot(final byte[] snapshotBytes) { log.debug("{}: Applying recovered snapshot", shardName); - // Intentionally bypass normal transaction to side-step persistence/replication - final DataTree tree = store.getDataTree(); - DataTreeModification writeTx = tree.takeSnapshot().newModification(); - - NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); - - writeTx.write(ROOT, node); - writeTx.ready(); + final NormalizedNode node = SerializationUtils.deserializeNormalizedNode(snapshotBytes); + final DataTreeModification tx = store.takeSnapshot().newModification(); + tx.write(ROOT, node); try { - tree.validate(writeTx); - tree.commit(tree.prepare(writeTx)); + commitTransaction(tx); } catch (DataValidationFailedException e) { - log.error("{}: Failed to validate recovery snapshot", shardName, e); + log.error("{}: Failed to apply recovery snapshot", shardName, e); } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java index 35d8e922f2..600509a26b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardSnapshotCohort.java @@ -14,7 +14,6 @@ import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactio import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot; import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils; import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.slf4j.Logger; @@ -86,7 +85,7 @@ class ShardSnapshotCohort implements RaftActorSnapshotCohort { void syncCommitTransaction(final ReadWriteShardDataTreeTransaction transaction) throws ExecutionException, InterruptedException { - DOMStoreThreePhaseCommitCohort commitCohort = store.finishTransaction(transaction); + ShardDataTreeCohort commitCohort = store.finishTransaction(transaction); commitCohort.preCommit().get(); commitCohort.commit().get(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java index 69a696f294..365f97dd3f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java @@ -31,7 +31,6 @@ import org.opendaylight.controller.cluster.datastore.modification.MergeModificat import org.opendaylight.controller.cluster.datastore.modification.Modification; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.modification.WriteModification; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; /** * @author: syedbahm @@ -197,7 +196,7 @@ public class ShardWriteTransaction extends ShardTransaction { LOG.debug("readyTransaction : {}", transactionID); - DOMStoreThreePhaseCommitCohort cohort = transaction.ready(); + ShardDataTreeCohort cohort = transaction.ready(); getShardActor().forward(new ForwardedReadyTransaction(transactionID, getClientTxVersion(), cohort, compositeModification, returnSerialized, doImmediateCommit), getContext()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java new file mode 100644 index 0000000000..9f22ce8a73 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { + private static final Logger LOG = LoggerFactory.getLogger(SimpleShardDataTreeCohort.class); + private static final ListenableFuture TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE); + private static final ListenableFuture VOID_FUTURE = Futures.immediateFuture(null); + private final DataTreeModification transaction; + private final ShardDataTree dataTree; + private DataTreeCandidateTip candidate; + + SimpleShardDataTreeCohort(final ShardDataTree dataTree, final DataTreeModification transaction) { + this.dataTree = Preconditions.checkNotNull(dataTree); + this.transaction = Preconditions.checkNotNull(transaction); + } + + @Override + DataTreeCandidateTip getCandidate() { + return candidate; + } + + @Override + public ListenableFuture canCommit() { + try { + dataTree.getDataTree().validate(transaction); + LOG.debug("Transaction {} validated", transaction); + return TRUE_FUTURE; + } catch (Exception e) { + return Futures.immediateFailedFuture(e); + } + } + + @Override + public ListenableFuture preCommit() { + try { + candidate = dataTree.getDataTree().prepare(transaction); + /* + * FIXME: this is the place where we should be interacting with persistence, specifically by invoking + * persist on the candidate (which gives us a Future). + */ + LOG.debug("Transaction {} prepared candidate {}", transaction, candidate); + return VOID_FUTURE; + } catch (Exception e) { + LOG.debug("Transaction {} failed to prepare", transaction, e); + return Futures.immediateFailedFuture(e); + } + } + + @Override + public ListenableFuture abort() { + // No-op, really + return VOID_FUTURE; + } + + @Override + public ListenableFuture commit() { + try { + dataTree.getDataTree().commit(candidate); + } catch (Exception e) { + LOG.error("Transaction {} failed to commit", transaction, e); + return Futures.immediateFailedFuture(e); + } + + LOG.debug("Transaction {} committed, proceeding to notify", transaction); + dataTree.notifyListeners(candidate); + return VOID_FUTURE; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java index cdd7859a30..2f48ab9d1b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/ForwardedReadyTransaction.java @@ -7,8 +7,8 @@ */ package org.opendaylight.controller.cluster.datastore.messages; +import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort; import org.opendaylight.controller.cluster.datastore.modification.Modification; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; /** * Transaction ReadyTransaction message that is forwarded to the local Shard from the ShardTransaction. @@ -17,14 +17,14 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh */ public class ForwardedReadyTransaction { private final String transactionID; - private final DOMStoreThreePhaseCommitCohort cohort; + private final ShardDataTreeCohort cohort; private final Modification modification; private final boolean returnSerialized; private final boolean doImmediateCommit; private final short txnClientVersion; public ForwardedReadyTransaction(String transactionID, short txnClientVersion, - DOMStoreThreePhaseCommitCohort cohort, Modification modification, + ShardDataTreeCohort cohort, Modification modification, boolean returnSerialized, boolean doImmediateCommit) { this.transactionID = transactionID; this.cohort = cohort; @@ -38,7 +38,7 @@ public class ForwardedReadyTransaction { return transactionID; } - public DOMStoreThreePhaseCommitCohort getCohort() { + public ShardDataTreeCohort getCohort() { return cohort; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index 03f2bb7ad0..1100f3a7fa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -21,7 +21,6 @@ import akka.japi.Creator; import akka.testkit.TestActorRef; import com.google.common.base.Function; import com.google.common.base.Optional; -import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import java.util.Collections; @@ -42,10 +41,6 @@ import org.opendaylight.controller.cluster.datastore.modification.WriteModificat import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; -import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; @@ -53,6 +48,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; @@ -172,49 +168,35 @@ public abstract class AbstractShardTest extends AbstractActorTest{ Assert.fail(String.format("Expected last applied: %d, Actual: %d", expectedValue, lastApplied)); } - protected NormalizedNode readStore(final InMemoryDOMDataStore store) throws ReadFailedException { - DOMStoreReadTransaction transaction = store.newReadOnlyTransaction(); - CheckedFuture>, ReadFailedException> read = - transaction.read(YangInstanceIdentifier.builder().build()); - - Optional> optional = read.checkedGet(); - - NormalizedNode normalizedNode = optional.get(); - - transaction.close(); - - return normalizedNode; - } - - protected DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName, + protected ShardDataTreeCohort setupMockWriteTransaction(final String cohortName, final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode data, final MutableCompositeModification modification) { return setupMockWriteTransaction(cohortName, dataStore, path, data, modification, null); } - protected DOMStoreThreePhaseCommitCohort setupMockWriteTransaction(final String cohortName, + protected ShardDataTreeCohort setupMockWriteTransaction(final String cohortName, final ShardDataTree dataStore, final YangInstanceIdentifier path, final NormalizedNode data, final MutableCompositeModification modification, - final Function> preCommit) { + final Function> preCommit) { ReadWriteShardDataTreeTransaction tx = dataStore.newReadWriteTransaction("setup-mock-" + cohortName, null); tx.getSnapshot().write(path, data); - DOMStoreThreePhaseCommitCohort cohort = createDelegatingMockCohort(cohortName, dataStore.finishTransaction(tx), preCommit); + ShardDataTreeCohort cohort = createDelegatingMockCohort(cohortName, dataStore.finishTransaction(tx), preCommit); modification.addModification(new WriteModification(path, data)); return cohort; } - protected DOMStoreThreePhaseCommitCohort createDelegatingMockCohort(final String cohortName, - final DOMStoreThreePhaseCommitCohort actual) { + protected ShardDataTreeCohort createDelegatingMockCohort(final String cohortName, + final ShardDataTreeCohort actual) { return createDelegatingMockCohort(cohortName, actual, null); } - protected DOMStoreThreePhaseCommitCohort createDelegatingMockCohort(final String cohortName, - final DOMStoreThreePhaseCommitCohort actual, - final Function> preCommit) { - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, cohortName); + protected ShardDataTreeCohort createDelegatingMockCohort(final String cohortName, + final ShardDataTreeCohort actual, + final Function> preCommit) { + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, cohortName); doAnswer(new Answer>() { @Override @@ -248,6 +230,13 @@ public abstract class AbstractShardTest extends AbstractActorTest{ } }).when(cohort).abort(); + doAnswer(new Answer() { + @Override + public DataTreeCandidateTip answer(final InvocationOnMock invocation) { + return actual.getCandidate(); + } + }).when(cohort).getCandidate(); + return cohort; } @@ -275,7 +264,7 @@ public abstract class AbstractShardTest extends AbstractActorTest{ ReadWriteShardDataTreeTransaction transaction = store.newReadWriteTransaction("writeToStore", null); transaction.getSnapshot().write(id, node); - DOMStoreThreePhaseCommitCohort cohort = transaction.ready(); + ShardDataTreeCohort cohort = transaction.ready(); cohort.canCommit().get(); cohort.preCommit().get(); cohort.commit(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java new file mode 100644 index 0000000000..781c3dba71 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCandidatePayloadTest.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import java.io.IOException; +import java.util.Collection; +import org.apache.commons.lang3.SerializationUtils; +import org.junit.Before; +import org.junit.Test; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; +import org.opendaylight.yangtools.yang.data.impl.schema.builder.impl.ImmutableContainerNodeBuilder; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; + +public class DataTreeCandidatePayloadTest { + private DataTreeCandidate candidate; + + private static DataTreeCandidateNode findNode(final Collection nodes, final PathArgument arg) { + for (DataTreeCandidateNode node : nodes) { + if (arg.equals(node.getIdentifier())) { + return node; + } + } + return null; + } + + private static void assertChildrenEquals(final Collection expected, + final Collection actual) { + // Make sure all expected nodes are there + for (DataTreeCandidateNode exp : expected) { + final DataTreeCandidateNode act = findNode(actual, exp.getIdentifier()); + assertNotNull("missing expected child", act); + assertCandidateNodeEquals(exp, act); + } + // Make sure no nodes are present which are not in the expected set + for (DataTreeCandidateNode act : actual) { + final DataTreeCandidateNode exp = findNode(expected, act.getIdentifier()); + assertNull("unexpected child", exp); + } + } + + private static void assertCandidateEquals(final DataTreeCandidate expected, final DataTreeCandidate actual) { + assertEquals("root path", expected.getRootPath(), actual.getRootPath()); + + final DataTreeCandidateNode expRoot = expected.getRootNode(); + final DataTreeCandidateNode actRoot = expected.getRootNode(); + assertEquals("root type", expRoot.getModificationType(), actRoot.getModificationType()); + + switch (actRoot.getModificationType()) { + case DELETE: + case WRITE: + assertEquals("root data", expRoot.getDataAfter(), actRoot.getDataAfter()); + break; + case SUBTREE_MODIFIED: + assertChildrenEquals(expRoot.getChildNodes(), actRoot.getChildNodes()); + break; + default: + fail("Unexpect root type " + actRoot.getModificationType()); + break; + } + + assertCandidateNodeEquals(expected.getRootNode(), actual.getRootNode()); + } + + private static void assertCandidateNodeEquals(final DataTreeCandidateNode expected, final DataTreeCandidateNode actual) { + assertEquals("child type", expected.getModificationType(), actual.getModificationType()); + assertEquals("child identifier", expected.getIdentifier(), actual.getIdentifier()); + + switch (actual.getModificationType()) { + case DELETE: + case WRITE: + assertEquals("child data", expected.getDataAfter(), actual.getDataAfter()); + break; + case SUBTREE_MODIFIED: + assertChildrenEquals(expected.getChildNodes(), actual.getChildNodes()); + break; + default: + fail("Unexpect root type " + actual.getModificationType()); + break; + } + } + + @Before + public void setUp() { + final YangInstanceIdentifier writePath = TestModel.TEST_PATH; + final NormalizedNode writeData = ImmutableContainerNodeBuilder.create().withNodeIdentifier( + new YangInstanceIdentifier.NodeIdentifier(TestModel.TEST_QNAME)). + withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo")).build(); + candidate = DataTreeCandidates.fromNormalizedNode(writePath, writeData); + } + + @Test + public void testCandidateSerialization() throws IOException { + final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate); + assertEquals("payload size", 141, payload.size()); + } + + @Test + public void testCandidateSerDes() throws IOException { + final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate); + assertCandidateEquals(candidate, payload.getCandidate()); + } + + @Test + public void testPayloadSerDes() throws IOException { + final DataTreeCandidatePayload payload = DataTreeCandidatePayload.create(candidate); + assertCandidateEquals(candidate, SerializationUtils.clone(payload).getCandidate()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index 22ce50b90d..3d28672c9f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -25,7 +25,6 @@ import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.Collections; @@ -34,7 +33,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -88,11 +86,9 @@ import org.opendaylight.controller.md.cluster.datastore.model.SchemaContextHelpe import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; -import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.protobuff.messages.cohort3pc.ThreePhaseCommitCohortMessages; import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages.CreateTransactionReply; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; +import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode; @@ -100,6 +96,13 @@ import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; import org.opendaylight.yangtools.yang.data.api.schema.MapEntryNode; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidates; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType; import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes; import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -108,6 +111,7 @@ import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; public class ShardTest extends AbstractShardTest { + private static final QName CARS_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test:cars", "2014-03-13", "cars"); @Test public void testRegisterChangeListener() throws Exception { @@ -379,10 +383,25 @@ public class ShardTest extends AbstractShardTest { } @Test - public void testRecovery() throws Exception { + public void testApplyStateWithCandidatePayload() throws Exception { - // Set up the InMemorySnapshotStore. + TestActorRef shard = TestActorRef.create(getSystem(), newShardProps(), "testApplyState"); + + NormalizedNode node = ImmutableNodes.containerNode(TestModel.TEST_QNAME); + DataTreeCandidate candidate = DataTreeCandidates.fromNormalizedNode(TestModel.TEST_PATH, node); + + ApplyState applyState = new ApplyState(null, "test", new ReplicatedLogImplEntry(1, 2, + DataTreeCandidatePayload.create(candidate))); + + shard.underlyingActor().onReceiveCommand(applyState); + + NormalizedNode actual = readStore(shard, TestModel.TEST_PATH); + assertEquals("Applied state", node, actual); + + shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + DataTree setupInMemorySnapshotStore() throws DataValidationFailedException { DataTree testStore = InMemoryDataTreeFactory.getInstance().create(); testStore.setSchemaContext(SCHEMA_CONTEXT); @@ -393,6 +412,55 @@ public class ShardTest extends AbstractShardTest { InMemorySnapshotStore.addSnapshot(shardID.toString(), Snapshot.create( SerializationUtils.serializeNormalizedNode(root), Collections.emptyList(), 0, 1, -1, -1)); + return testStore; + } + + private static DataTreeCandidatePayload payloadForModification(DataTree source, DataTreeModification mod) throws DataValidationFailedException { + source.validate(mod); + final DataTreeCandidate candidate = source.prepare(mod); + source.commit(candidate); + return DataTreeCandidatePayload.create(candidate); + } + + @Test + public void testDataTreeCandidateRecovery() throws Exception { + // Set up the InMemorySnapshotStore. + final DataTree source = setupInMemorySnapshotStore(); + + final DataTreeModification writeMod = source.takeSnapshot().newModification(); + writeMod.write(TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build()); + + // Set up the InMemoryJournal. + InMemoryJournal.addEntry(shardID.toString(), 0, new ReplicatedLogImplEntry(0, 1, payloadForModification(source, writeMod))); + + int nListEntries = 16; + Set listEntryKeys = new HashSet<>(); + + // Add some ModificationPayload entries + for (int i = 1; i <= nListEntries; i++) { + listEntryKeys.add(Integer.valueOf(i)); + + YangInstanceIdentifier path = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) + .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i).build(); + + final DataTreeModification mod = source.takeSnapshot().newModification(); + mod.merge(path, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, i)); + + InMemoryJournal.addEntry(shardID.toString(), i, new ReplicatedLogImplEntry(i, 1, + payloadForModification(source, mod))); + } + + InMemoryJournal.addEntry(shardID.toString(), nListEntries + 1, + new ApplyJournalEntries(nListEntries)); + + testRecovery(listEntryKeys); + } + + @Test + public void testModicationRecovery() throws Exception { + + // Set up the InMemorySnapshotStore. + setupInMemorySnapshotStore(); // Set up the InMemoryJournal. @@ -420,7 +488,7 @@ public class ShardTest extends AbstractShardTest { testRecovery(listEntryKeys); } - private ModificationPayload newModificationPayload(final Modification... mods) throws IOException { + private static ModificationPayload newModificationPayload(final Modification... mods) throws IOException { MutableCompositeModification compMod = new MutableCompositeModification(); for(Modification mod: mods) { compMod.addModification(mod); @@ -429,7 +497,6 @@ public class ShardTest extends AbstractShardTest { return new ModificationPayload(compMod); } - @SuppressWarnings({ "unchecked" }) @Test public void testConcurrentThreePhaseCommits() throws Throwable { new ShardTestKit(getSystem()) {{ @@ -445,19 +512,19 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), modification2); String transactionID3 = "tx3"; MutableCompositeModification modification3 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), @@ -605,12 +672,12 @@ public class ShardTest extends AbstractShardTest { }}; } - private BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path, + private static BatchedModifications newBatchedModifications(String transactionID, YangInstanceIdentifier path, NormalizedNode data, boolean ready, boolean doCommitOnReady) { return newBatchedModifications(transactionID, null, path, data, ready, doCommitOnReady); } - private BatchedModifications newBatchedModifications(String transactionID, String transactionChainID, + private static BatchedModifications newBatchedModifications(String transactionID, String transactionChainID, YangInstanceIdentifier path, NormalizedNode data, boolean ready, boolean doCommitOnReady) { BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION, transactionChainID); batched.addModification(new WriteModification(path, data)); @@ -631,10 +698,10 @@ public class ShardTest extends AbstractShardTest { final String transactionID = "tx"; FiniteDuration duration = duration("5 seconds"); - final AtomicReference mockCohort = new AtomicReference<>(); + final AtomicReference mockCohort = new AtomicReference<>(); ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { @Override - public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) { + public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) { if(mockCohort.get() == null) { mockCohort.set(createDelegatingMockCohort("cohort", actual)); } @@ -699,10 +766,10 @@ public class ShardTest extends AbstractShardTest { final String transactionID = "tx"; FiniteDuration duration = duration("5 seconds"); - final AtomicReference mockCohort = new AtomicReference<>(); + final AtomicReference mockCohort = new AtomicReference<>(); ShardCommitCoordinator.CohortDecorator cohortDecorator = new ShardCommitCoordinator.CohortDecorator() { @Override - public DOMStoreThreePhaseCommitCohort decorate(String txID, DOMStoreThreePhaseCommitCohort actual) { + public ShardDataTreeCohort decorate(String txID, ShardDataTreeCohort actual) { if(mockCohort.get() == null) { mockCohort.set(createDelegatingMockCohort("cohort", actual)); } @@ -745,7 +812,7 @@ public class ShardTest extends AbstractShardTest { } @SuppressWarnings("unchecked") - private void verifyOuterListEntry(final TestActorRef shard, Object expIDValue) throws Exception { + private static void verifyOuterListEntry(final TestActorRef shard, Object expIDValue) throws Exception { NormalizedNode outerList = readStore(shard, TestModel.OUTER_LIST_PATH); assertNotNull(TestModel.OUTER_LIST_QNAME.getLocalName() + " not found", outerList); assertTrue(TestModel.OUTER_LIST_QNAME.getLocalName() + " value is not Iterable", @@ -818,6 +885,8 @@ public class ShardTest extends AbstractShardTest { final AtomicBoolean overrideLeaderCalls = new AtomicBoolean(); new ShardTestKit(getSystem()) {{ Creator creator = new Creator() { + private static final long serialVersionUID = 1L; + @Override public Shard create() throws Exception { return new Shard(shardID, Collections.emptyMap(), @@ -867,7 +936,7 @@ public class ShardTest extends AbstractShardTest { String transactionID = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore, + ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore, TestModel.TEST_PATH, containerNode, modification); FiniteDuration duration = duration("5 seconds"); @@ -909,7 +978,7 @@ public class ShardTest extends AbstractShardTest { String transactionID = "tx"; MutableCompositeModification modification = new MutableCompositeModification(); NormalizedNode containerNode = ImmutableNodes.containerNode(TestModel.TEST_QNAME); - DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort", dataStore, + ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort", dataStore, TestModel.TEST_PATH, containerNode, modification); FiniteDuration duration = duration("5 seconds"); @@ -945,6 +1014,25 @@ public class ShardTest extends AbstractShardTest { }}; } + private static DataTreeCandidateTip mockCandidate(final String name) { + DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name); + DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node"); + doReturn(ModificationType.WRITE).when(mockCandidateNode).getModificationType(); + doReturn(Optional.of(ImmutableNodes.containerNode(CARS_QNAME))).when(mockCandidateNode).getDataAfter(); + doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath(); + doReturn(mockCandidateNode).when(mockCandidate).getRootNode(); + return mockCandidate; + } + + private static DataTreeCandidateTip mockUnmodifiedCandidate(final String name) { + DataTreeCandidateTip mockCandidate = mock(DataTreeCandidateTip.class, name); + DataTreeCandidateNode mockCandidateNode = mock(DataTreeCandidateNode.class, name + "-node"); + doReturn(ModificationType.UNMODIFIED).when(mockCandidateNode).getModificationType(); + doReturn(YangInstanceIdentifier.builder().build()).when(mockCandidate).getRootPath(); + doReturn(mockCandidateNode).when(mockCandidate).getRootNode(); + return mockCandidate; + } + @Test public void testCommitWhenTransactionHasNoModifications(){ // Note that persistence is enabled which would normally result in the entry getting written to the journal @@ -959,10 +1047,11 @@ public class ShardTest extends AbstractShardTest { String transactionID = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit(); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit(); + doReturn(mockUnmodifiedCandidate("cohort1-candidate")).when(cohort).getCandidate(); FiniteDuration duration = duration("5 seconds"); @@ -1014,10 +1103,11 @@ public class ShardTest extends AbstractShardTest { String transactionID = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); modification.addModification(new DeleteModification(YangInstanceIdentifier.builder().build())); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).preCommit(); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).commit(); + doReturn(mockCandidate("cohort1-candidate")).when(cohort).getCandidate(); FiniteDuration duration = duration("5 seconds"); @@ -1070,14 +1160,15 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort1).preCommit(); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).commit(); + doReturn(mockCandidate("cohort1-candidate")).when(cohort1).getCandidate(); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); FiniteDuration duration = duration("5 seconds"); @@ -1146,13 +1237,13 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort1).preCommit(); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); FiniteDuration duration = duration("5 seconds"); @@ -1222,7 +1313,7 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); // Simulate the ForwardedReadyTransaction messages that would be sent @@ -1270,7 +1361,7 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit(); // Simulate the ForwardedReadyTransaction messages that would be sent @@ -1320,7 +1411,7 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFailedFuture(new IllegalStateException("mock"))).when(cohort).canCommit(); // Simulate the ForwardedReadyTransaction messages that would be sent @@ -1339,6 +1430,11 @@ public class ShardTest extends AbstractShardTest { doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort).preCommit(); doReturn(Futures.immediateFuture(null)).when(cohort).commit(); + DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class); + DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class); + doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType(); + doReturn(candidateRoot).when(candidate).getRootNode(); + doReturn(candidate).when(cohort).getCandidate(); shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, cohort, modification, true, true), getRef()); @@ -1362,7 +1458,7 @@ public class ShardTest extends AbstractShardTest { String transactionID = "tx1"; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.FALSE)).when(cohort).canCommit(); // Simulate the ForwardedReadyTransaction messages that would be sent @@ -1381,6 +1477,11 @@ public class ShardTest extends AbstractShardTest { doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort).preCommit(); doReturn(Futures.immediateFuture(null)).when(cohort).commit(); + DataTreeCandidateTip candidate = mock(DataTreeCandidateTip.class); + DataTreeCandidateNode candidateRoot = mock(DataTreeCandidateNode.class); + doReturn(ModificationType.UNMODIFIED).when(candidateRoot).getModificationType(); + doReturn(candidateRoot).when(candidate).getRootNode(); + doReturn(candidate).when(cohort).getCandidate(); shard.tell(new ForwardedReadyTransaction(transactionID2, CURRENT_VERSION, cohort, modification, true, true), getRef()); @@ -1404,10 +1505,10 @@ public class ShardTest extends AbstractShardTest { ShardDataTree dataStore = shard.underlyingActor().getDataStore(); final String transactionID = "tx1"; - Function> preCommit = - new Function>() { + Function> preCommit = + new Function>() { @Override - public ListenableFuture apply(final DOMStoreThreePhaseCommitCohort cohort) { + public ListenableFuture apply(final ShardDataTreeCohort cohort) { ListenableFuture preCommitFuture = cohort.preCommit(); // Simulate an AbortTransaction message occurring during replication, after @@ -1425,7 +1526,7 @@ public class ShardTest extends AbstractShardTest { }; MutableCompositeModification modification = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification, preCommit); @@ -1475,7 +1576,7 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), @@ -1487,7 +1588,7 @@ public class ShardTest extends AbstractShardTest { MutableCompositeModification modification2 = new MutableCompositeModification(); YangInstanceIdentifier listNodePath = YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2).build(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore, + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort3", dataStore, listNodePath, ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 2), modification2); @@ -1546,19 +1647,19 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), modification2); String transactionID3 = "tx3"; MutableCompositeModification modification3 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification3); // Ready the Tx's @@ -1620,13 +1721,13 @@ public class ShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort1"); + ShardDataTreeCohort cohort1 = mock(ShardDataTreeCohort.class, "cohort1"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort1).canCommit(); doReturn(Futures.immediateFuture(null)).when(cohort1).abort(); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = mock(DOMStoreThreePhaseCommitCohort.class, "cohort2"); + ShardDataTreeCohort cohort2 = mock(ShardDataTreeCohort.class, "cohort2"); doReturn(Futures.immediateFuture(Boolean.TRUE)).when(cohort2).canCommit(); FiniteDuration duration = duration("5 seconds"); @@ -1782,29 +1883,29 @@ public class ShardTest extends AbstractShardTest { /** * This test simply verifies that the applySnapShot logic will work * @throws ReadFailedException + * @throws DataValidationFailedException */ @Test - public void testInMemoryDataStoreRestore() throws ReadFailedException { - InMemoryDOMDataStore store = new InMemoryDOMDataStore("test", MoreExecutors.sameThreadExecutor()); - - store.onGlobalContextUpdated(SCHEMA_CONTEXT); + public void testInMemoryDataTreeRestore() throws ReadFailedException, DataValidationFailedException { + DataTree store = InMemoryDataTreeFactory.getInstance().create(); + store.setSchemaContext(SCHEMA_CONTEXT); - DOMStoreWriteTransaction putTransaction = store.newWriteOnlyTransaction(); + DataTreeModification putTransaction = store.takeSnapshot().newModification(); putTransaction.write(TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME)); - commitTransaction(putTransaction); + commitTransaction(store, putTransaction); - NormalizedNode expected = readStore(store); + NormalizedNode expected = readStore(store, YangInstanceIdentifier.builder().build()); - DOMStoreWriteTransaction writeTransaction = store.newWriteOnlyTransaction(); + DataTreeModification writeTransaction = store.takeSnapshot().newModification(); writeTransaction.delete(YangInstanceIdentifier.builder().build()); writeTransaction.write(YangInstanceIdentifier.builder().build(), expected); - commitTransaction(writeTransaction); + commitTransaction(store, writeTransaction); - NormalizedNode actual = readStore(store); + NormalizedNode actual = readStore(store, YangInstanceIdentifier.builder().build()); assertEquals(expected, actual); } @@ -1913,15 +2014,9 @@ public class ShardTest extends AbstractShardTest { shard.tell(PoisonPill.getInstance(), ActorRef.noSender()); } - private void commitTransaction(final DOMStoreWriteTransaction transaction) { - DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready(); - ListenableFuture future = - commitCohort.preCommit(); - try { - future.get(); - future = commitCohort.commit(); - future.get(); - } catch (InterruptedException | ExecutionException e) { - } + private static void commitTransaction(DataTree store, final DataTreeModification modification) throws DataValidationFailedException { + modification.ready(); + store.validate(modification); + store.commit(store.prepare(modification)); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java index 96cd3e45eb..a2309be48f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/compat/PreLithiumShardTest.java @@ -32,6 +32,7 @@ import org.mockito.InOrder; import org.opendaylight.controller.cluster.datastore.AbstractShardTest; import org.opendaylight.controller.cluster.datastore.Shard; import org.opendaylight.controller.cluster.datastore.ShardDataTree; +import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort; import org.opendaylight.controller.cluster.datastore.ShardTestKit; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction; import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply; @@ -57,7 +58,6 @@ import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal; import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild; @@ -219,19 +219,19 @@ public class PreLithiumShardTest extends AbstractShardTest { String transactionID1 = "tx1"; MutableCompositeModification modification1 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, + ShardDataTreeCohort cohort1 = setupMockWriteTransaction("cohort1", dataStore, TestModel.TEST_PATH, ImmutableNodes.containerNode(TestModel.TEST_QNAME), modification1); String transactionID2 = "tx2"; MutableCompositeModification modification2 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, + ShardDataTreeCohort cohort2 = setupMockWriteTransaction("cohort2", dataStore, TestModel.OUTER_LIST_PATH, ImmutableNodes.mapNodeBuilder(TestModel.OUTER_LIST_QNAME).build(), modification2); String transactionID3 = "tx3"; MutableCompositeModification modification3 = new MutableCompositeModification(); - DOMStoreThreePhaseCommitCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, + ShardDataTreeCohort cohort3 = setupMockWriteTransaction("cohort3", dataStore, YangInstanceIdentifier.builder(TestModel.OUTER_LIST_PATH) .nodeWithKey(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1).build(), ImmutableNodes.mapEntry(TestModel.OUTER_LIST_QNAME, TestModel.ID_QNAME, 1), diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayloadTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayloadTest.java index bbfff70e2d..7016ada525 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayloadTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/modification/ModificationPayloadTest.java @@ -8,7 +8,7 @@ package org.opendaylight.controller.cluster.datastore.modification; import static org.junit.Assert.assertEquals; -import org.apache.commons.lang.SerializationUtils; +import org.apache.commons.lang3.SerializationUtils; import org.junit.Test; import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; @@ -43,8 +43,7 @@ public class ModificationPayloadTest { assertEquals("getPath", writePath, write.getPath()); assertEquals("getData", writeData, write.getData()); - ModificationPayload cloned = - (ModificationPayload) SerializationUtils.clone(payload); + ModificationPayload cloned = SerializationUtils.clone(payload); deserialized = (MutableCompositeModification) payload.getModification(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java index 4240608036..60420dcf23 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/md/cluster/datastore/model/TestModel.java @@ -7,54 +7,54 @@ */ package org.opendaylight.controller.md.cluster.datastore.model; +import com.google.common.io.Resources; +import java.io.IOException; import java.io.InputStream; import java.util.Collections; -import java.util.Set; import org.opendaylight.yangtools.yang.common.QName; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; -import org.opendaylight.yangtools.yang.model.api.Module; import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.parser.api.YangSyntaxErrorException; import org.opendaylight.yangtools.yang.parser.impl.YangParserImpl; public class TestModel { - public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13", - "test"); - - public static final QName JUNK_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:junk", "2014-03-13", - "junk"); - - - public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list"); - public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list"); - public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice"); - public static final QName ID_QNAME = QName.create(TEST_QNAME, "id"); - public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name"); - public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc"); - public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value"); - private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang"; - - public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME); - public static final YangInstanceIdentifier JUNK_PATH = YangInstanceIdentifier.of(JUNK_QNAME); - public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH). - node(OUTER_LIST_QNAME).build(); - public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH). - node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).build(); - public static final QName TWO_QNAME = QName.create(TEST_QNAME,"two"); - public static final QName THREE_QNAME = QName.create(TEST_QNAME,"three"); - - - public static final InputStream getDatastoreTestInputStream() { - return getInputStream(DATASTORE_TEST_YANG); - } - - private static InputStream getInputStream(final String resourceName) { - return TestModel.class.getResourceAsStream(DATASTORE_TEST_YANG); - } - - public static SchemaContext createTestContext() { - YangParserImpl parser = new YangParserImpl(); - Set modules = parser.parseYangModelsFromStreams(Collections.singletonList(getDatastoreTestInputStream())); - return parser.resolveSchemaContext(modules); - } + public static final QName TEST_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:test", "2014-03-13", + "test"); + + public static final QName JUNK_QNAME = QName.create("urn:opendaylight:params:xml:ns:yang:controller:md:sal:dom:store:junk", "2014-03-13", + "junk"); + + + public static final QName OUTER_LIST_QNAME = QName.create(TEST_QNAME, "outer-list"); + public static final QName INNER_LIST_QNAME = QName.create(TEST_QNAME, "inner-list"); + public static final QName OUTER_CHOICE_QNAME = QName.create(TEST_QNAME, "outer-choice"); + public static final QName ID_QNAME = QName.create(TEST_QNAME, "id"); + public static final QName NAME_QNAME = QName.create(TEST_QNAME, "name"); + public static final QName DESC_QNAME = QName.create(TEST_QNAME, "desc"); + public static final QName VALUE_QNAME = QName.create(TEST_QNAME, "value"); + private static final String DATASTORE_TEST_YANG = "/odl-datastore-test.yang"; + + public static final YangInstanceIdentifier TEST_PATH = YangInstanceIdentifier.of(TEST_QNAME); + public static final YangInstanceIdentifier JUNK_PATH = YangInstanceIdentifier.of(JUNK_QNAME); + public static final YangInstanceIdentifier OUTER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH). + node(OUTER_LIST_QNAME).build(); + public static final YangInstanceIdentifier INNER_LIST_PATH = YangInstanceIdentifier.builder(TEST_PATH). + node(OUTER_LIST_QNAME).node(INNER_LIST_QNAME).build(); + public static final QName TWO_QNAME = QName.create(TEST_QNAME,"two"); + public static final QName THREE_QNAME = QName.create(TEST_QNAME,"three"); + + + public static final InputStream getDatastoreTestInputStream() { + return TestModel.class.getResourceAsStream(DATASTORE_TEST_YANG); + } + + public static SchemaContext createTestContext() { + YangParserImpl parser = new YangParserImpl(); + try { + return parser.parseSources(Collections.singleton(Resources.asByteSource(TestModel.class.getResource(DATASTORE_TEST_YANG)))); + } catch (IOException | YangSyntaxErrorException e) { + throw new ExceptionInInitializerError(e); + } + } }