From 5fd8e6506248cc34da72281a1662612f6c2b2f9a Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Wed, 29 Jun 2016 18:08:11 +0200 Subject: [PATCH] BUG-5280: implement backend message handling This patch adds message routing on the backend so messages earlier in the patch series get handled correctly. Change-Id: Ie0ecfc1c8ce3c3b52b5b9c4986dd01444c2a719a Signed-off-by: Robert Varga --- .../commands/DeadTransactionException.java | 31 ++ .../access/commands/LocalHistoryRequest.java | 2 + .../commands/ModifyTransactionSuccess.java | 40 ++ .../ModifyTransactionSuccessProxyV1.java | 36 ++ .../access/commands/NotLeaderException.java | 32 ++ .../commands/OutOfOrderRequestException.java | 31 ++ .../access/commands/TransactionSuccess.java | 4 + .../commands/UnknownHistoryException.java | 35 ++ .../access/concepts/RequestException.java | 2 +- .../concepts/RuntimeRequestException.java | 33 ++ .../concepts/UnsupportedRequestException.java | 30 ++ .../datastore/AbstractFrontendHistory.java | 88 ++++ .../AbstractShardDataTreeTransaction.java | 12 +- .../datastore/ChainedCommitCohort.java | 6 +- .../cluster/datastore/CohortEntry.java | 8 +- .../datastore/FrontendTransaction.java | 399 ++++++++++++++++++ .../datastore/LeaderFrontendState.java | 211 +++++++++ .../datastore/LocalFrontendHistory.java | 94 +++++ .../ReadWriteShardDataTreeTransaction.java | 2 +- .../controller/cluster/datastore/Shard.java | 142 ++++++- .../datastore/ShardCommitCoordinator.java | 28 +- .../cluster/datastore/ShardDataTree.java | 5 +- .../datastore/ShardDataTreeCohort.java | 3 +- .../ShardDataTreeTransactionChain.java | 21 +- .../ShardDataTreeTransactionParent.java | 6 + .../datastore/ShardReadTransaction.java | 2 +- .../datastore/ShardWriteTransaction.java | 2 +- .../datastore/SimpleShardDataTreeCohort.java | 17 +- .../datastore/StandaloneFrontendHistory.java | 53 +++ .../cluster/datastore/AbstractShardTest.java | 13 +- .../SimpleShardDataTreeCohortTest.java | 25 +- 31 files changed, 1350 insertions(+), 63 deletions(-) create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadTransactionException.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionSuccess.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionSuccessProxyV1.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/NotLeaderException.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/OutOfOrderRequestException.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/UnknownHistoryException.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RuntimeRequestException.java create mode 100644 opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/UnsupportedRequestException.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadTransactionException.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadTransactionException.java new file mode 100644 index 0000000000..562c0e59b9 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/DeadTransactionException.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import com.google.common.annotations.Beta; +import org.opendaylight.controller.cluster.access.concepts.RequestException; + +/** + * A {@link RequestException} indicating that the backend has received a request to create a transaction which has + * already been purged. + * + * @author Robert Varga + */ +@Beta +public final class DeadTransactionException extends RequestException { + private static final long serialVersionUID = 1L; + + public DeadTransactionException(final long lastSeenTransaction) { + super("Transaction up to " + Long.toUnsignedString(lastSeenTransaction) + " are accounted for"); + } + + @Override + public boolean isRetriable() { + return true; + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryRequest.java index c74711618c..2955ed2b65 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryRequest.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/LocalHistoryRequest.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.access.commands; import akka.actor.ActorRef; import com.google.common.annotations.Beta; +import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.access.ABIVersion; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.Request; @@ -28,6 +29,7 @@ public abstract class LocalHistoryRequest> exte LocalHistoryRequest(final LocalHistoryIdentifier target, final long sequence, final ActorRef replyTo) { super(target, sequence, replyTo); + Preconditions.checkArgument(target.getHistoryId() != 0, "History identifier must be non-zero"); } LocalHistoryRequest(final T request, final ABIVersion version) { diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionSuccess.java new file mode 100644 index 0000000000..c4dd20d6c9 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionSuccess.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import com.google.common.annotations.Beta; +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +/** + * Response to a {@link ModifyTransactionRequest} which does not have a {@link PersistenceProtocol}. + * + * @author Robert Varga + */ +@Beta +public final class ModifyTransactionSuccess extends TransactionSuccess { + private static final long serialVersionUID = 1L; + + public ModifyTransactionSuccess(final TransactionIdentifier identifier, final long sequence) { + super(identifier, sequence); + } + + private ModifyTransactionSuccess(final ModifyTransactionSuccess success, final ABIVersion version) { + super(success, version); + } + + @Override + protected AbstractTransactionSuccessProxy externalizableProxy(final ABIVersion version) { + return new ModifyTransactionSuccessProxyV1(this); + } + + @Override + protected ModifyTransactionSuccess cloneAsVersion(final ABIVersion version) { + return new ModifyTransactionSuccess(this, version); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionSuccessProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionSuccessProxyV1.java new file mode 100644 index 0000000000..0efff09259 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionSuccessProxyV1.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +/** + * Externalizable proxy for use with {@link ModifyTransactionSuccess}. It implements the initial (Boron) serialization + * format. + * + * @author Robert Varga + */ +final class ModifyTransactionSuccessProxyV1 extends AbstractTransactionSuccessProxy { + private static final long serialVersionUID = 1L; + + // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to + // be able to create instances via reflection. + @SuppressWarnings("checkstyle:RedundantModifier") + public ModifyTransactionSuccessProxyV1() { + // For Externalizable + } + + ModifyTransactionSuccessProxyV1(final ModifyTransactionSuccess success) { + super(success); + } + + @Override + protected ModifyTransactionSuccess createSuccess(final TransactionIdentifier target, final long sequence) { + return new ModifyTransactionSuccess(target, sequence); + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/NotLeaderException.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/NotLeaderException.java new file mode 100644 index 0000000000..0864cd0cf0 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/NotLeaderException.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import akka.actor.ActorRef; +import com.google.common.annotations.Beta; +import org.opendaylight.controller.cluster.access.concepts.RequestException; + +/** + * General error raised when the recipient of a Request is not the correct backend to talk to. This typically + * means that the backend processing has moved and the frontend needs to run rediscovery and retry the request. + * + * @author Robert Varga + */ +@Beta +public final class NotLeaderException extends RequestException { + private static final long serialVersionUID = 1L; + + public NotLeaderException(final ActorRef me) { + super("Actor " + me + " is not the current leader"); + } + + @Override + public boolean isRetriable() { + return false; + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/OutOfOrderRequestException.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/OutOfOrderRequestException.java new file mode 100644 index 0000000000..07a3dba620 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/OutOfOrderRequestException.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import com.google.common.annotations.Beta; +import org.opendaylight.controller.cluster.access.concepts.RequestException; + +/** + * A {@link RequestException} indicating that the backend has received an unexpected request. This would typically + * mean that messages are not arriving in the sequence they were generated by the frontend. + * + * @author Robert Varga + */ +@Beta +public final class OutOfOrderRequestException extends RequestException { + private static final long serialVersionUID = 1L; + + public OutOfOrderRequestException(final long expectedRequest) { + super("Expecting request " + Long.toUnsignedString(expectedRequest)); + } + + @Override + public boolean isRetriable() { + return true; + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionSuccess.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionSuccess.java index e15c2dc7c4..636a2e741b 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionSuccess.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/TransactionSuccess.java @@ -29,6 +29,10 @@ public abstract class TransactionSuccess> super(identifier, sequence); } + TransactionSuccess(final T success, final ABIVersion version) { + super(success, version); + } + @Override protected abstract AbstractTransactionSuccessProxy externalizableProxy(ABIVersion version); } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/UnknownHistoryException.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/UnknownHistoryException.java new file mode 100644 index 0000000000..196c60c0d8 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/UnknownHistoryException.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.commands; + +import com.google.common.annotations.Beta; +import org.opendaylight.controller.cluster.access.concepts.RequestException; + +/** + * A {@link RequestException} indicating that the backend has received a request referencing an unknown history. This + * typically happens when the linear history ID is newer than the highest observed {@link CreateLocalHistoryRequest}. + * + * @author Robert Varga + */ +@Beta +public final class UnknownHistoryException extends RequestException { + private static final long serialVersionUID = 1L; + + public UnknownHistoryException(final Long lastSeenHistory) { + super("Last known history is " + historyToString(lastSeenHistory)); + } + + private static String historyToString(final Long history) { + return history == null ? "null" : Long.toUnsignedString(history.longValue()); + } + + @Override + public boolean isRetriable() { + return true; + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestException.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestException.java index f3b3d73421..5f525b0a9b 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestException.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestException.java @@ -24,7 +24,7 @@ public abstract class RequestException extends Exception { super(Preconditions.checkNotNull(message)); } - protected RequestException(@Nonnull final String message, @Nonnull final Exception cause) { + protected RequestException(@Nonnull final String message, @Nonnull final Throwable cause) { super(Preconditions.checkNotNull(message), Preconditions.checkNotNull(cause)); } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RuntimeRequestException.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RuntimeRequestException.java new file mode 100644 index 0000000000..2cf1828da4 --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RuntimeRequestException.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.concepts; + +import com.google.common.annotations.Beta; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; + +/** + * General error raised when the recipient of a {@link Request} fails to process a request. + * + * @author Robert Varga + */ +@Beta +public final class RuntimeRequestException extends RequestException { + private static final long serialVersionUID = 1L; + + public RuntimeRequestException(final String message, final Throwable cause) { + super(message, cause); + Preconditions.checkArgument(!Strings.isNullOrEmpty(message), "Exception message is mandatory"); + Preconditions.checkNotNull(cause); + } + + @Override + public boolean isRetriable() { + return false; + } +} diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/UnsupportedRequestException.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/UnsupportedRequestException.java new file mode 100644 index 0000000000..903ed59fbc --- /dev/null +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/UnsupportedRequestException.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.access.concepts; + +import com.google.common.annotations.Beta; + +/** + * General error raised when the recipient of a {@link Request} determines that it does not know how to handle + * the request. + * + * @author Robert Varga + */ +@Beta +public final class UnsupportedRequestException extends RequestException { + private static final long serialVersionUID = 1L; + + public UnsupportedRequestException(final Request request) { + super("Unsupported request " + request.getClass()); + } + + @Override + public boolean isRetriable() { + return false; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java new file mode 100644 index 0000000000..7a66eab9d0 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; +import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.yangtools.concepts.Identifiable; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract class for providing logical tracking of frontend local histories. This class is specialized for + * standalone transactions and chained transactions. + * + * @author Robert Varga + */ +abstract class AbstractFrontendHistory implements Identifiable { + private static final Logger LOG = LoggerFactory.getLogger(AbstractFrontendHistory.class); + private static final OutOfOrderRequestException UNSEQUENCED_START = new OutOfOrderRequestException(0); + + private final Map transactions = new HashMap<>(); + private final String persistenceId; + + AbstractFrontendHistory(final String persistenceId) { + this.persistenceId = Preconditions.checkNotNull(persistenceId); + } + + final String persistenceId() { + return persistenceId; + } + + final @Nullable TransactionSuccess handleTransactionRequest(final TransactionRequest request, + final RequestEnvelope envelope) throws RequestException { + + // FIXME: handle purging of transactions + + final TransactionIdentifier id = request.getTarget(); + FrontendTransaction tx = transactions.get(id); + if (tx == null) { + // The transaction does not exist and we are about to create it, check sequence number + if (request.getSequence() != 0) { + LOG.debug("{}: no transaction state present, unexpected request {}", persistenceId(), request); + throw UNSEQUENCED_START; + } + + if (request instanceof CommitLocalTransactionRequest) { + tx = createReadyTransaction(id, ((CommitLocalTransactionRequest) request).getModification()); + LOG.debug("{}: allocated new ready transaction {}", persistenceId(), id); + } else { + tx = createOpenTransaction(id); + LOG.debug("{}: allocated new open transaction {}", persistenceId(), id); + } + + transactions.put(id, tx); + } else { + final Optional> replay = tx.replaySequence(request.getSequence()); + if (replay.isPresent()) { + return replay.get(); + } + } + + return tx.handleRequest(request, envelope); + } + + abstract FrontendTransaction createOpenTransaction(TransactionIdentifier id) throws RequestException; + + abstract FrontendTransaction createReadyTransaction(TransactionIdentifier id, DataTreeModification mod) + throws RequestException; + + abstract ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java index f7f8af2923..222280cf3b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractShardDataTreeTransaction.java @@ -11,6 +11,7 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; /** @@ -19,17 +20,20 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; * @param Backing transaction type. */ @NotThreadSafe -abstract class AbstractShardDataTreeTransaction { - private final T snapshot; +abstract class AbstractShardDataTreeTransaction + implements Identifiable { private final TransactionIdentifier id; + private final T snapshot; + private boolean closed; - protected AbstractShardDataTreeTransaction(final TransactionIdentifier id, final T snapshot) { + AbstractShardDataTreeTransaction(final TransactionIdentifier id, final T snapshot) { this.snapshot = Preconditions.checkNotNull(snapshot); this.id = Preconditions.checkNotNull(id); } - final TransactionIdentifier getId() { + @Override + public final TransactionIdentifier getIdentifier() { return id; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java index 799dd6eb1b..d6b43c46a0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ChainedCommitCohort.java @@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.Preconditions; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.ListenableFuture; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidateTip; @@ -65,8 +64,8 @@ final class ChainedCommitCohort extends ShardDataTreeCohort { } @Override - public ListenableFuture abort() { - return delegate.abort(); + public void abort(final FutureCallback callback) { + delegate.abort(callback); } @Override @@ -88,4 +87,5 @@ final class ChainedCommitCohort extends ShardDataTreeCohort { public State getState() { return delegate.getState(); } + } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java index 84535b593e..a66a49685f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java @@ -11,8 +11,6 @@ import akka.actor.ActorRef; import com.google.common.base.Preconditions; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator; import org.opendaylight.controller.cluster.datastore.modification.Modification; @@ -33,7 +31,7 @@ final class CohortEntry { private CohortEntry(final ReadWriteShardDataTreeTransaction transaction, final short clientVersion) { this.transaction = Preconditions.checkNotNull(transaction); - this.transactionId = transaction.getId(); + this.transactionId = transaction.getIdentifier(); this.clientVersion = clientVersion; } @@ -107,8 +105,8 @@ final class CohortEntry { cohort.commit(callback); } - void abort() throws InterruptedException, ExecutionException, TimeoutException { - cohort.abort().get(); + void abort(final FutureCallback callback) { + cohort.abort(callback); } void ready(final CohortDecorator cohortDecorator) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java new file mode 100644 index 0000000000..7b542db038 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java @@ -0,0 +1,399 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import com.google.common.primitives.UnsignedLong; +import com.google.common.util.concurrent.FutureCallback; +import java.util.ArrayDeque; +import java.util.Iterator; +import java.util.Queue; +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ModifyTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; +import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; +import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionDelete; +import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionMerge; +import org.opendaylight.controller.cluster.access.commands.TransactionModification; +import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionWrite; +import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; +import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Frontend transaction state as observed by the shard leader. + * + * @author Robert Varga + */ +@NotThreadSafe +final class FrontendTransaction { + private static final Logger LOG = LoggerFactory.getLogger(FrontendTransaction.class); + + private final AbstractFrontendHistory history; + private final TransactionIdentifier id; + + /** + * It is possible that after we process a request and send a response that response gets lost and the client + * initiates a retry. Since subsequent requests can mutate transaction state we need to retain the response until + * it is acknowledged by the client. + */ + private final Queue replayQueue = new ArrayDeque<>(); + private long firstReplaySequence; + private Long lastPurgedSequence; + private long expectedSequence; + + private ReadWriteShardDataTreeTransaction openTransaction; + private ModifyTransactionSuccess cachedModifySuccess; + private DataTreeModification sealedModification; + private ShardDataTreeCohort readyCohort; + + private FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id, + final ReadWriteShardDataTreeTransaction transaction) { + this.history = Preconditions.checkNotNull(history); + this.id = Preconditions.checkNotNull(id); + this.openTransaction = Preconditions.checkNotNull(transaction); + } + + private FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id, + final DataTreeModification mod) { + this.history = Preconditions.checkNotNull(history); + this.id = Preconditions.checkNotNull(id); + this.sealedModification = Preconditions.checkNotNull(mod); + } + + static FrontendTransaction createOpen(final AbstractFrontendHistory history, + final ReadWriteShardDataTreeTransaction transaction) { + return new FrontendTransaction(history, transaction.getIdentifier(), transaction); + } + + static FrontendTransaction createReady(final AbstractFrontendHistory history, final TransactionIdentifier id, + final DataTreeModification mod) { + return new FrontendTransaction(history, id, mod); + } + + java.util.Optional> replaySequence(final long sequence) throws RequestException { + // Fast path check: if the requested sequence is the next request, bail early + if (expectedSequence == sequence) { + return java.util.Optional.empty(); + } + + // Check sequencing: we do not need to bother with future requests + if (Long.compareUnsigned(expectedSequence, sequence) < 0) { + throw new OutOfOrderRequestException(expectedSequence); + } + + // Sanity check: if we have purged sequences, this has to be newer + if (lastPurgedSequence != null && Long.compareUnsigned(lastPurgedSequence.longValue(), sequence) >= 0) { + // Client has sent a request sequence, which has already been purged. This is a hard error, which should + // never occur. Throwing an IllegalArgumentException will cause it to be wrapped in a + // RuntimeRequestException (which is not retriable) and report it back to the client. + throw new IllegalArgumentException(String.format("Invalid purged sequence %s (last purged is %s)", + sequence, lastPurgedSequence)); + } + + // At this point we have established that the requested sequence lies in the open interval + // (lastPurgedSequence, expectedSequence). That does not actually mean we have a response, as the commit + // machinery is asynchronous, hence a reply may be in the works and not available. + + long replaySequence = firstReplaySequence; + final Iterator it = replayQueue.iterator(); + while (it.hasNext()) { + final Object replay = it.next(); + if (replaySequence == sequence) { + if (replay instanceof RequestException) { + throw (RequestException) replay; + } + + Verify.verify(replay instanceof TransactionSuccess); + return java.util.Optional.of((TransactionSuccess) replay); + } + + replaySequence++; + } + + // Not found + return java.util.Optional.empty(); + } + + void purgeSequencesUpTo(final long sequence) { + // FIXME: implement this + + lastPurgedSequence = sequence; + } + + // Sequence has already been checked + @Nullable TransactionSuccess handleRequest(final TransactionRequest request, final RequestEnvelope envelope) + throws RequestException { + if (request instanceof ModifyTransactionRequest) { + return handleModifyTransaction((ModifyTransactionRequest) request, envelope); + } else if (request instanceof CommitLocalTransactionRequest) { + handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope); + return null; + } else if (request instanceof ExistsTransactionRequest) { + return handleExistsTransaction((ExistsTransactionRequest) request); + } else if (request instanceof ReadTransactionRequest) { + return handleReadTransaction((ReadTransactionRequest) request); + } else if (request instanceof TransactionPreCommitRequest) { + handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope); + return null; + } else if (request instanceof TransactionDoCommitRequest) { + handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope); + return null; + } else if (request instanceof TransactionAbortRequest) { + handleTransactionAbort((TransactionAbortRequest) request, envelope); + return null; + } else { + throw new UnsupportedRequestException(request); + } + } + + private void recordResponse(final long sequence, final Object response) { + if (replayQueue.isEmpty()) { + firstReplaySequence = sequence; + } + replayQueue.add(response); + expectedSequence++; + } + + private > T recordSuccess(final long sequence, final T success) { + recordResponse(sequence, success); + return success; + } + + private void recordAndSendSuccess(final RequestEnvelope envelope, final TransactionSuccess success) { + recordResponse(success.getSequence(), success); + envelope.sendSuccess(success); + } + + private void recordAndSendFailure(final RequestEnvelope envelope, final RuntimeRequestException failure) { + recordResponse(envelope.getMessage().getSequence(), failure); + envelope.sendFailure(failure); + } + + private void handleTransactionPreCommit(final TransactionPreCommitRequest request, + final RequestEnvelope envelope) throws RequestException { + readyCohort.preCommit(new FutureCallback() { + @Override + public void onSuccess(final DataTreeCandidate result) { + recordAndSendSuccess(envelope, new TransactionPreCommitSuccess(readyCohort.getIdentifier(), + request.getSequence())); + } + + @Override + public void onFailure(final Throwable failure) { + recordAndSendFailure(envelope, new RuntimeRequestException("Precommit failed", failure)); + readyCohort = null; + } + }); + } + + private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope) + throws RequestException { + readyCohort.commit(new FutureCallback() { + @Override + public void onSuccess(final UnsignedLong result) { + successfulCommit(envelope); + } + + @Override + public void onFailure(final Throwable failure) { + recordAndSendFailure(envelope, new RuntimeRequestException("Commit failed", failure)); + readyCohort = null; + } + }); + } + + private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope) + throws RequestException { + readyCohort.abort(new FutureCallback() { + @Override + public void onSuccess(final Void result) { + readyCohort = null; + recordAndSendSuccess(envelope, new TransactionAbortSuccess(id, request.getSequence())); + LOG.debug("Transaction {} aborted", id); + } + + @Override + public void onFailure(final Throwable failure) { + readyCohort = null; + LOG.warn("Transaction {} abort failed", id, failure); + recordAndSendFailure(envelope, new RuntimeRequestException("Abort failed", failure)); + } + }); + } + + private void coordinatedCommit(final RequestEnvelope envelope) { + readyCohort.canCommit(new FutureCallback() { + @Override + public void onSuccess(final Void result) { + recordAndSendSuccess(envelope, new TransactionCanCommitSuccess(readyCohort.getIdentifier(), + envelope.getMessage().getSequence())); + } + + @Override + public void onFailure(final Throwable failure) { + recordAndSendFailure(envelope, new RuntimeRequestException("CanCommit failed", failure)); + readyCohort = null; + } + }); + } + + private void directCommit(final RequestEnvelope envelope) { + readyCohort.canCommit(new FutureCallback() { + @Override + public void onSuccess(final Void result) { + successfulDirectCanCommit(envelope); + } + + @Override + public void onFailure(final Throwable failure) { + recordAndSendFailure(envelope, new RuntimeRequestException("CanCommit failed", failure)); + readyCohort = null; + } + }); + + } + + private void successfulDirectCanCommit(final RequestEnvelope envelope) { + readyCohort.preCommit(new FutureCallback() { + @Override + public void onSuccess(final DataTreeCandidate result) { + successfulDirectPreCommit(envelope); + } + + @Override + public void onFailure(final Throwable failure) { + recordAndSendFailure(envelope, new RuntimeRequestException("PreCommit failed", failure)); + readyCohort = null; + } + }); + } + + private void successfulDirectPreCommit(final RequestEnvelope envelope) { + readyCohort.commit(new FutureCallback() { + + @Override + public void onSuccess(final UnsignedLong result) { + successfulCommit(envelope); + } + + @Override + public void onFailure(final Throwable failure) { + recordAndSendFailure(envelope, new RuntimeRequestException("DoCommit failed", failure)); + readyCohort = null; + } + }); + } + + private void successfulCommit(final RequestEnvelope envelope) { + recordAndSendSuccess(envelope, new TransactionCommitSuccess(readyCohort.getIdentifier(), + envelope.getMessage().getSequence())); + readyCohort = null; + } + + private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request, + final RequestEnvelope envelope) throws RequestException { + if (sealedModification.equals(request.getModification())) { + readyCohort = history.createReadyCohort(id, sealedModification); + + if (request.isCoordinated()) { + coordinatedCommit(envelope); + } else { + directCommit(envelope); + } + } else { + throw new UnsupportedRequestException(request); + } + } + + private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request) + throws RequestException { + final Optional> data = openTransaction.getSnapshot().readNode(request.getPath()); + return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(id, request.getSequence(), + data.isPresent())); + } + + private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) throws RequestException { + final Optional> data = openTransaction.getSnapshot().readNode(request.getPath()); + return recordSuccess(request.getSequence(), new ReadTransactionSuccess(id, request.getSequence(), data)); + } + + private ModifyTransactionSuccess replyModifySuccess(final long sequence) { + if (cachedModifySuccess == null) { + cachedModifySuccess = new ModifyTransactionSuccess(id, sequence); + } + + return recordSuccess(sequence, cachedModifySuccess); + } + + private @Nullable TransactionSuccess handleModifyTransaction(final ModifyTransactionRequest request, + final RequestEnvelope envelope) throws RequestException { + + final DataTreeModification modification = openTransaction.getSnapshot(); + for (TransactionModification m : request.getModifications()) { + if (m instanceof TransactionDelete) { + modification.delete(m.getPath()); + } else if (m instanceof TransactionWrite) { + modification.write(m.getPath(), ((TransactionWrite) m).getData()); + } else if (m instanceof TransactionMerge) { + modification.merge(m.getPath(), ((TransactionMerge) m).getData()); + } else { + LOG.warn("{}: ignoring unhandled modification {}", history.persistenceId(), m); + } + } + + final java.util.Optional maybeProto = request.getPersistenceProtocol(); + if (!maybeProto.isPresent()) { + return replyModifySuccess(request.getSequence()); + } + + switch (maybeProto.get()) { + case ABORT: + openTransaction.abort(); + openTransaction = null; + return replyModifySuccess(request.getSequence()); + case SIMPLE: + readyCohort = openTransaction.ready(); + openTransaction = null; + directCommit(envelope); + return null; + case THREE_PHASE: + readyCohort = openTransaction.ready(); + openTransaction = null; + coordinatedCommit(envelope); + return null; + default: + throw new UnsupportedRequestException(request); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java new file mode 100644 index 0000000000..3c65b799d1 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java @@ -0,0 +1,211 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; +import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; +import com.google.common.collect.TreeRangeSet; +import com.google.common.primitives.UnsignedLong; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest; +import org.opendaylight.controller.cluster.access.commands.DeadHistoryException; +import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest; +import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest; +import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess; +import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; +import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.UnknownHistoryException; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; +import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException; +import org.opendaylight.yangtools.concepts.Identifiable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Frontend state as observed by the shard leader. This class is responsible for tracking generations and sequencing + * in the frontend/backend conversation. + * + * @author Robert Varga + */ +@NotThreadSafe +final class LeaderFrontendState implements Identifiable { + private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class); + + // Histories which have not been purged + private final Map localHistories = new HashMap<>(); + + // RangeSet performs automatic merging, hence we keep minimal state tracking information + private final RangeSet purgedHistories = TreeRangeSet.create(); + + // Used for all standalone transactions + private final AbstractFrontendHistory standaloneHistory; + private final ShardDataTree tree; + private final ClientIdentifier clientId; + private final String persistenceId; + + private long expectedTxSequence; + private Long lastSeenHistory = null; + + + // TODO: explicit failover notification + // Record the ActorRef for the originating actor and when we switch to being a leader send a notification + // to the frontend client -- that way it can immediately start sending requests + + // TODO: add statistics: + // - number of requests processed + // - number of histories processed + // - per-RequestException throw counters + + LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) { + this.persistenceId = Preconditions.checkNotNull(persistenceId); + this.clientId = Preconditions.checkNotNull(clientId); + this.tree = Preconditions.checkNotNull(tree); + standaloneHistory = new StandaloneFrontendHistory(persistenceId, clientId, tree); + } + + @Override + public ClientIdentifier getIdentifier() { + return clientId; + } + + private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfOrderRequestException { + if (expectedTxSequence != envelope.getTxSequence()) { + throw new OutOfOrderRequestException(expectedTxSequence); + } + } + + private void expectNextRequest() { + expectedTxSequence++; + } + + @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest request, + final RequestEnvelope envelope) throws RequestException { + checkRequestSequence(envelope); + + try { + if (request instanceof CreateLocalHistoryRequest) { + return handleCreateHistory((CreateLocalHistoryRequest) request); + } else if (request instanceof DestroyLocalHistoryRequest) { + return handleDestroyHistory((DestroyLocalHistoryRequest) request); + } else if (request instanceof PurgeLocalHistoryRequest) { + return handlePurgeHistory((PurgeLocalHistoryRequest)request); + } else { + throw new UnsupportedRequestException(request); + } + } finally { + expectNextRequest(); + } + } + + private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request) throws RequestException { + final LocalHistoryIdentifier id = request.getTarget(); + final AbstractFrontendHistory existing = localHistories.get(id); + if (existing != null) { + // History already exists: report success + LOG.debug("{}: history {} already exists", persistenceId, id); + return new LocalHistorySuccess(id, request.getSequence()); + } + + // We have not found the history. Before we create it we need to check history ID sequencing so that we do not + // end up resurrecting a purged history. + if (purgedHistories.contains(UnsignedLong.fromLongBits(id.getHistoryId()))) { + LOG.debug("{}: rejecting purged request {}", persistenceId, request); + throw new DeadHistoryException(lastSeenHistory.longValue()); + } + + // Update last history we have seen + if (lastSeenHistory != null && Long.compareUnsigned(lastSeenHistory, id.getHistoryId()) < 0) { + lastSeenHistory = id.getHistoryId(); + } + + localHistories.put(id, new LocalFrontendHistory(persistenceId, tree.ensureTransactionChain(id))); + LOG.debug("{}: created history {}", persistenceId, id); + return new LocalHistorySuccess(id, request.getSequence()); + } + + private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request) throws RequestException { + final LocalHistoryIdentifier id = request.getTarget(); + final LocalFrontendHistory existing = localHistories.get(id); + if (existing == null) { + // History does not exist: report success + LOG.debug("{}: history {} does not exist, nothing to destroy", persistenceId, id); + return new LocalHistorySuccess(id, request.getSequence()); + } + + return existing.destroy(request.getSequence()); + } + + private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request) throws RequestException { + final LocalHistoryIdentifier id = request.getTarget(); + final LocalFrontendHistory existing = localHistories.remove(id); + if (existing != null) { + purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId()))); + + if (!existing.isDestroyed()) { + LOG.warn("{}: purging undestroyed history {}", persistenceId, id); + existing.destroy(request.getSequence()); + } + + // FIXME: record a PURGE tombstone in the journal + + LOG.debug("{}: purged history {}", persistenceId, id); + } else { + LOG.debug("{}: history {} has already been purged", persistenceId, id); + } + + return new LocalHistorySuccess(id, request.getSequence()); + } + + @Nullable TransactionSuccess handleTransactionRequest(final TransactionRequest request, + final RequestEnvelope envelope) throws RequestException { + checkRequestSequence(envelope); + + try { + final LocalHistoryIdentifier lhId = request.getTarget().getHistoryId(); + final AbstractFrontendHistory history; + + if (lhId.getHistoryId() != 0) { + history = localHistories.get(lhId); + if (history == null) { + LOG.debug("{}: rejecting unknown history request {}", persistenceId, request); + throw new UnknownHistoryException(lastSeenHistory); + } + } else { + history = standaloneHistory; + } + + return history.handleTransactionRequest(request, envelope); + } finally { + expectNextRequest(); + } + } + + void reconnect() { + expectedTxSequence = 0; + } + + void retire() { + // FIXME: flush all state + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(LeaderFrontendState.class).add("clientId", clientId) + .add("purgedHistories", purgedHistories).toString(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java new file mode 100644 index 0000000000..a03b54fbef --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.access.commands.DeadTransactionException; +import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Chained transaction specialization of {@link AbstractFrontendHistory}. It prevents concurrent open transactions. + * + * @author Robert Varga + */ +final class LocalFrontendHistory extends AbstractFrontendHistory { + private enum State { + OPEN, + CLOSED, + } + + private static final Logger LOG = LoggerFactory.getLogger(LocalFrontendHistory.class); + + private final ShardDataTreeTransactionChain chain; + + private Long lastSeenTransaction; + private State state = State.OPEN; + + LocalFrontendHistory(final String persistenceId, final ShardDataTreeTransactionChain chain) { + super(persistenceId); + this.chain = Preconditions.checkNotNull(chain); + } + + @Override + public LocalHistoryIdentifier getIdentifier() { + return chain.getIdentifier(); + } + + @Override + FrontendTransaction createOpenTransaction(final TransactionIdentifier id) throws RequestException { + checkDeadTransaction(id); + lastSeenTransaction = id.getTransactionId(); + return FrontendTransaction.createOpen(this, chain.newReadWriteTransaction(id)); + } + + @Override + FrontendTransaction createReadyTransaction(final TransactionIdentifier id, final DataTreeModification mod) + throws RequestException { + checkDeadTransaction(id); + lastSeenTransaction = id.getTransactionId(); + return FrontendTransaction.createReady(this, id, mod); + } + + @Override + ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod) { + return chain.createReadyCohort(id, mod); + } + + LocalHistorySuccess destroy(final long sequence) throws RequestException { + if (state != State.CLOSED) { + LOG.debug("{}: closing history {}", persistenceId(), getIdentifier()); + + // FIXME: add any finalization as needed + state = State.CLOSED; + } + + // FIXME: record a DESTROY tombstone in the journal + return new LocalHistorySuccess(getIdentifier(), sequence); + } + + boolean isDestroyed() { + return state == State.CLOSED; + } + + private void checkDeadTransaction(final TransactionIdentifier id) throws RequestException { + // FIXME: check if this history is still open + // FIXME: check if the last transaction has been submitted + + // Transaction identifiers within a local history have to have increasing IDs + if (lastSeenTransaction != null && Long.compareUnsigned(lastSeenTransaction, id.getTransactionId()) >= 0) { + throw new DeadTransactionException(lastSeenTransaction); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java index 771de8cd12..492f6ec9f6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ReadWriteShardDataTreeTransaction.java @@ -14,7 +14,7 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification public final class ReadWriteShardDataTreeTransaction extends AbstractShardDataTreeTransaction { private final ShardDataTreeTransactionParent parent; - protected ReadWriteShardDataTreeTransaction(final ShardDataTreeTransactionParent parent, + ReadWriteShardDataTreeTransaction(final ShardDataTreeTransactionParent parent, final TransactionIdentifier id, final DataTreeModification modification) { super(id, modification); this.parent = Preconditions.checkNotNull(parent); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index c98e11d60a..4a1be9862b 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -12,18 +12,39 @@ import akka.actor.ActorRef; import akka.actor.ActorSelection; import akka.actor.Cancellable; import akka.actor.Props; +import akka.actor.Status.Failure; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Ticker; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Range; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.access.ABIVersion; +import org.opendaylight.controller.cluster.access.commands.ConnectClientRequest; +import org.opendaylight.controller.cluster.access.commands.ConnectClientSuccess; +import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest; +import org.opendaylight.controller.cluster.access.commands.NotLeaderException; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; +import org.opendaylight.controller.cluster.access.concepts.Request; +import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; +import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.RequestSuccess; +import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException; +import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException; import org.opendaylight.controller.cluster.common.actor.CommonConfig; import org.opendaylight.controller.cluster.common.actor.MessageTracker; import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error; @@ -96,6 +117,17 @@ public class Shard extends RaftActor { // FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant. public static final String DEFAULT_NAME = "default"; + private static final Collection SUPPORTED_ABIVERSIONS; + + static { + final ABIVersion[] values = ABIVersion.values(); + final ABIVersion[] real = Arrays.copyOfRange(values, 1, values.length - 1); + SUPPORTED_ABIVERSIONS = ImmutableList.copyOf(real).reverse(); + } + + // FIXME: make this a dynamic property based on mailbox size and maximum number of clients + private static final int CLIENT_MAX_MESSAGES = 1000; + // The state of this Shard private final ShardDataTree store; @@ -129,6 +161,7 @@ public class Shard extends RaftActor { private final ShardTransactionMessageRetrySupport messageRetrySupport; private final FrontendMetadata frontendMetadata = new FrontendMetadata(); + private final Map knownFrontends = new HashMap<>(); protected Shard(final AbstractBuilder builder) { super(builder.getId().toString(), builder.getPeerAddresses(), @@ -219,6 +252,7 @@ public class Shard extends RaftActor { } } + @SuppressWarnings("checkstyle:IllegalCatch") @Override protected void handleNonRaftCommand(final Object message) { try (final MessageTracker.Context context = appendEntriesReplyTracker.received(message)) { @@ -228,7 +262,23 @@ public class Shard extends RaftActor { maybeError.get()); } - if (CreateTransaction.isSerializedType(message)) { + if (message instanceof RequestEnvelope) { + final RequestEnvelope envelope = (RequestEnvelope)message; + try { + final RequestSuccess success = handleRequest(envelope); + if (success != null) { + envelope.sendSuccess(success); + } + } catch (RequestException e) { + LOG.debug("{}: request {} failed", persistenceId(), envelope, e); + envelope.sendFailure(e); + } catch (Exception e) { + LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e); + envelope.sendFailure(new RuntimeRequestException("Request failed to process", e)); + } + } else if (message instanceof ConnectClientRequest) { + handleConnectClient((ConnectClientRequest)message); + } else if (CreateTransaction.isSerializedType(message)) { handleCreateTransaction(message); } else if (message instanceof BatchedModifications) { handleBatchedModifications((BatchedModifications)message); @@ -280,6 +330,86 @@ public class Shard extends RaftActor { } } + // Acquire our frontend tracking handle and verify generation matches + private LeaderFrontendState getFrontend(final ClientIdentifier clientId) throws RequestException { + final LeaderFrontendState existing = knownFrontends.get(clientId.getFrontendId()); + if (existing != null) { + final int cmp = Long.compareUnsigned(existing.getIdentifier().getGeneration(), clientId.getGeneration()); + if (cmp == 0) { + return existing; + } + if (cmp > 0) { + LOG.debug("{}: rejecting request from outdated client {}", persistenceId(), clientId); + throw new RetiredGenerationException(existing.getIdentifier().getGeneration()); + } + + LOG.info("{}: retiring state {}, outdated by request from client {}", persistenceId(), existing, clientId); + existing.retire(); + knownFrontends.remove(clientId.getFrontendId()); + } else { + LOG.debug("{}: client {} is not yet known", persistenceId(), clientId); + } + + final LeaderFrontendState ret = new LeaderFrontendState(persistenceId(), clientId, store); + knownFrontends.put(clientId.getFrontendId(), ret); + LOG.debug("{}: created state {} for client {}", persistenceId(), ret, clientId); + return ret; + } + + private static @Nonnull ABIVersion selectVersion(final ConnectClientRequest message) { + final Range clientRange = Range.closed(message.getMinVersion(), message.getMaxVersion()); + for (ABIVersion v : SUPPORTED_ABIVERSIONS) { + if (clientRange.contains(v)) { + return v; + } + } + + throw new IllegalArgumentException(String.format( + "No common version between backend versions %s and client versions %s", SUPPORTED_ABIVERSIONS, + clientRange)); + } + + @SuppressWarnings("checkstyle:IllegalCatch") + private void handleConnectClient(final ConnectClientRequest message) { + try { + if (!isLeader() || !isLeaderActive()) { + LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), message); + throw new NotLeaderException(getSelf()); + } + + final ABIVersion selectedVersion = selectVersion(message); + final LeaderFrontendState frontend = getFrontend(message.getTarget()); + frontend.reconnect(); + message.getReplyTo().tell(new ConnectClientSuccess(message.getTarget(), message.getSequence(), getSelf(), + ImmutableList.of(), store.getDataTree(), CLIENT_MAX_MESSAGES).toVersion(selectedVersion), + ActorRef.noSender()); + } catch (RequestException | RuntimeException e) { + message.getReplyTo().tell(new Failure(e), ActorRef.noSender()); + } + } + + private @Nullable RequestSuccess handleRequest(final RequestEnvelope envelope) throws RequestException { + // We are not the leader, hence we want to fail-fast. + if (!isLeader() || !isLeaderActive()) { + LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope); + throw new NotLeaderException(getSelf()); + } + + final Request request = envelope.getMessage(); + if (request instanceof TransactionRequest) { + final TransactionRequest txReq = (TransactionRequest)request; + final ClientIdentifier clientId = txReq.getTarget().getHistoryId().getClientId(); + return getFrontend(clientId).handleTransactionRequest(txReq, envelope); + } else if (request instanceof LocalHistoryRequest) { + final LocalHistoryRequest lhReq = (LocalHistoryRequest)request; + final ClientIdentifier clientId = lhReq.getTarget().getClientId(); + return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope); + } else { + LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request); + throw new UnsupportedRequestException(request); + } + } + private boolean hasLeader() { return getLeaderId() != null; } @@ -365,7 +495,7 @@ public class Shard extends RaftActor { } catch (Exception e) { LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(), batched.getTransactionId(), e); - sender.tell(new akka.actor.Status.Failure(e), getSelf()); + sender.tell(new Failure(e), getSelf()); } } @@ -411,7 +541,7 @@ public class Shard extends RaftActor { private boolean failIfIsolatedLeader(final ActorRef sender) { if (isIsolatedLeader()) { - sender.tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format( + sender.tell(new Failure(new NoShardLeaderException(String.format( "Shard %s was the leader but has lost contact with all of its followers. Either all" + " other follower nodes are down or this node is isolated by a network partition.", persistenceId()))), getSelf()); @@ -436,7 +566,7 @@ public class Shard extends RaftActor { } catch (Exception e) { LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(), message.getTransactionId(), e); - getSender().tell(new akka.actor.Status.Failure(e), getSelf()); + getSender().tell(new Failure(e), getSelf()); } } else { ActorSelection leader = getLeader(); @@ -487,7 +617,7 @@ public class Shard extends RaftActor { } else if (getLeader() != null) { getLeader().forward(message, getContext()); } else { - getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException( + getSender().tell(new Failure(new NoShardLeaderException( "Could not create a shard transaction", persistenceId())), getSelf()); } } @@ -510,7 +640,7 @@ public class Shard extends RaftActor { getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor), createTransaction.getTransactionId(), createTransaction.getVersion()).toSerializable(), getSelf()); } catch (Exception e) { - getSender().tell(new akka.actor.Status.Failure(e), getSelf()); + getSender().tell(new Failure(e), getSelf()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 2773a3e3bf..33634b1d6c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -23,8 +23,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; import javax.annotation.Nonnull; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; @@ -370,21 +368,25 @@ final class ShardCommitCoordinator { log.debug("{}: Aborting transaction {}", name, transactionID); final ActorRef self = shard.getSelf(); - try { - cohortEntry.abort(); + cohortEntry.abort(new FutureCallback() { + @Override + public void onSuccess(final Void result) { + if (sender != null) { + sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self); + } + } - shard.getShardMBean().incrementAbortTransactionsCount(); + @Override + public void onFailure(final Throwable failure) { + log.error("{}: An exception happened during abort", name, failure); - if (sender != null) { - sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self); + if (sender != null) { + sender.tell(new Failure(failure), self); + } } - } catch (InterruptedException | ExecutionException | TimeoutException e) { - log.error("{}: An exception happened during abort", name, e); + }); - if (sender != null) { - sender.tell(new Failure(e), self); - } - } + shard.getShardMBean().incrementAbortTransactionsCount(); } void checkForExpiredTransactions(final long timeout, final Shard shard) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index 7f3476090b..ef6436ae8d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -367,7 +367,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { } } - private ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) { + ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier) { ShardDataTreeTransactionChain chain = transactionChains.get(localHistoryIdentifier); if (chain == null) { chain = new ShardDataTreeTransactionChain(localHistoryIdentifier, this); @@ -475,7 +475,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { final DataTreeModification snapshot = transaction.getSnapshot(); snapshot.ready(); - return createReadyCohort(transaction.getId(), snapshot); + return createReadyCohort(transaction.getIdentifier(), snapshot); } public Optional> readNode(final YangInstanceIdentifier path) { @@ -668,6 +668,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { cohortRegistry.process(sender, message); } + @Override ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification modification) { SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, modification, txId, diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java index 0a3a6ae177..4df9dea7db 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeCohort.java @@ -10,7 +10,6 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.ListenableFuture; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; @@ -49,7 +48,7 @@ public abstract class ShardDataTreeCohort implements Identifiable callback); @VisibleForTesting - public abstract ListenableFuture abort(); + public abstract void abort(FutureCallback callback); @VisibleForTesting public abstract void commit(FutureCallback callback); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java index 004e305f70..9a8e89eb42 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java @@ -12,6 +12,8 @@ import com.google.common.base.Preconditions; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.yangtools.concepts.Identifiable; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,18 +22,19 @@ import org.slf4j.LoggerFactory; * A transaction chain attached to a Shard. */ @NotThreadSafe -final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent { +final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent + implements Identifiable { private static final Logger LOG = LoggerFactory.getLogger(ShardDataTreeTransactionChain.class); - private final ShardDataTree dataTree; private final LocalHistoryIdentifier chainId; + private final ShardDataTree dataTree; private ReadWriteShardDataTreeTransaction previousTx; private ReadWriteShardDataTreeTransaction openTransaction; private boolean closed; ShardDataTreeTransactionChain(final LocalHistoryIdentifier localHistoryIdentifier, final ShardDataTree dataTree) { - this.dataTree = Preconditions.checkNotNull(dataTree); this.chainId = Preconditions.checkNotNull(localHistoryIdentifier); + this.dataTree = Preconditions.checkNotNull(dataTree); } private DataTreeSnapshot getSnapshot() { @@ -94,9 +97,19 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent return MoreObjects.toStringHelper(this).add("id", chainId).toString(); } - void clearTransaction(ReadWriteShardDataTreeTransaction transaction) { + void clearTransaction(final ReadWriteShardDataTreeTransaction transaction) { if (transaction.equals(previousTx)) { previousTx = null; } } + + @Override + public LocalHistoryIdentifier getIdentifier() { + return chainId; + } + + @Override + ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification modification) { + return dataTree.createReadyCohort(txId, modification); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java index 26e17057a7..20f50156f8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionParent.java @@ -7,8 +7,14 @@ */ package org.opendaylight.controller.cluster.datastore; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; + abstract class ShardDataTreeTransactionParent { + abstract void abortTransaction(AbstractShardDataTreeTransaction transaction); abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction); + + abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java index dfb0897b48..3978d757a1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java @@ -24,7 +24,7 @@ public class ShardReadTransaction extends ShardTransaction { public ShardReadTransaction(AbstractShardDataTreeTransaction transaction, ActorRef shardActor, ShardStats shardStats) { - super(shardActor, shardStats, transaction.getId()); + super(shardActor, shardStats, transaction.getIdentifier()); this.transaction = Preconditions.checkNotNull(transaction); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java index 9c304ecdad..56683073cf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java @@ -33,7 +33,7 @@ public class ShardWriteTransaction extends ShardTransaction { public ShardWriteTransaction(ReadWriteShardDataTreeTransaction transaction, ActorRef shardActor, ShardStats shardStats) { - super(shardActor, shardStats, transaction.getId()); + super(shardActor, shardStats, transaction.getIdentifier()); this.transaction = transaction; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java index 7da174fec5..0527d013f2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohort.java @@ -15,7 +15,6 @@ import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @@ -93,34 +92,34 @@ final class SimpleShardDataTreeCohort extends ShardDataTreeCohort { } } + @Override - public ListenableFuture abort() { + public void abort(final FutureCallback callback) { dataTree.startAbort(this); state = State.ABORTED; final Optional>> maybeAborts = userCohorts.abort(); if (!maybeAborts.isPresent()) { - return VOID_FUTURE; + callback.onSuccess(null); + return; } final Future> aborts = maybeAborts.get(); if (aborts.isCompleted()) { - return VOID_FUTURE; + callback.onSuccess(null); + return; } - final SettableFuture ret = SettableFuture.create(); aborts.onComplete(new OnComplete>() { @Override public void onComplete(final Throwable failure, final Iterable objs) { if (failure != null) { - ret.setException(failure); + callback.onFailure(failure); } else { - ret.set(null); + callback.onSuccess(null); } } }, ExecutionContexts.global()); - - return ret; } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java new file mode 100644 index 0000000000..1f2eb72560 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; + +/** + * Standalone transaction specialization of {@link AbstractFrontendHistory}. There can be multiple open transactions + * and they are submitted in any order. + * + * @author Robert Varga + */ +final class StandaloneFrontendHistory extends AbstractFrontendHistory { + private final LocalHistoryIdentifier identifier; + private final ShardDataTree tree; + + StandaloneFrontendHistory(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) { + super(persistenceId); + this.identifier = new LocalHistoryIdentifier(clientId, 0); + this.tree = Preconditions.checkNotNull(tree); + } + + @Override + public LocalHistoryIdentifier getIdentifier() { + return identifier; + } + + @Override + FrontendTransaction createOpenTransaction(final TransactionIdentifier id) throws RequestException { + return FrontendTransaction.createOpen(this, tree.newReadWriteTransaction(id)); + } + + @Override + FrontendTransaction createReadyTransaction(final TransactionIdentifier id, final DataTreeModification mod) + throws RequestException { + return FrontendTransaction.createReady(this, id, mod); + } + + @Override + ShardDataTreeCohort createReadyCohort(final TransactionIdentifier id, final DataTreeModification mod) { + return tree.createReadyCohort(id, mod); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java index d0302eb6fe..ab7e8f0401 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java @@ -31,7 +31,6 @@ import akka.util.Timeout; import com.google.common.base.Optional; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.Collections; @@ -437,7 +436,7 @@ public abstract class AbstractShardTest extends AbstractActorTest { private FutureCallback preCommit; private FutureCallback commit; - public void setDelegate(ShardDataTreeCohort delegate) { + public void setDelegate(final ShardDataTreeCohort delegate) { this.delegate = delegate; } @@ -472,19 +471,19 @@ public abstract class AbstractShardTest extends AbstractActorTest { } @Override - public void canCommit(FutureCallback callback) { + public void canCommit(final FutureCallback callback) { canCommit = mockFutureCallback(callback); delegate.canCommit(canCommit); } @Override - public void preCommit(FutureCallback callback) { + public void preCommit(final FutureCallback callback) { preCommit = mockFutureCallback(callback); delegate.preCommit(preCommit); } @Override - public void commit(FutureCallback callback) { + public void commit(final FutureCallback callback) { commit = mockFutureCallback(callback); delegate.commit(commit); } @@ -506,8 +505,8 @@ public abstract class AbstractShardTest extends AbstractActorTest { } @Override - public ListenableFuture abort() { - return delegate.abort(); + public void abort(final FutureCallback callback) { + delegate.abort(callback); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java index 232d9aa618..7d2500b716 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/SimpleShardDataTreeCohortTest.java @@ -19,9 +19,10 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.ListenableFuture; import java.util.Collections; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -210,12 +211,28 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest { verify(mockUserCohorts).abort(); } + private static Future abort(final ShardDataTreeCohort cohort) { + final CompletableFuture f = new CompletableFuture<>(); + cohort.abort(new FutureCallback() { + @Override + public void onSuccess(final Void result) { + f.complete(null); + } + + @Override + public void onFailure(final Throwable failure) { + f.completeExceptionally(failure); + } + }); + + return f; + } + @Test public void testAbort() throws Exception { doNothing().when(mockShardDataTree).startAbort(cohort); - cohort.abort().get(); - + abort(cohort).get(); verify(mockShardDataTree).startAbort(cohort); } @@ -226,7 +243,7 @@ public class SimpleShardDataTreeCohortTest extends AbstractTest { final Promise> cohortFuture = akka.dispatch.Futures.promise(); doReturn(Optional.of(cohortFuture.future())).when(mockUserCohorts).abort(); - final ListenableFuture abortFuture = cohort.abort(); + final Future abortFuture = abort(cohort); cohortFuture.success(Collections.emptyList()); -- 2.36.6