From 5cb0787412ab63a3aa5dcc044511e1ce569662cf Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Fri, 25 Nov 2016 13:55:13 +0100 Subject: [PATCH] BUG-5280: add basic concept of ClientSnapshot In order to accurately read-only transactions with ClientLocalHistory, we need to differentiate between Transactions and Snapshots. This patch introduces the concept, its API and backend signalling/implementation. State keeping is reworked so it requires only a single field, which is manipulated via an atonic updater, with null signifying state has already been closed (or is in process of being taken care of). Change-Id: I2f8fd5ffdff366d1948538299b96721b756c620c Signed-off-by: Robert Varga --- .../AbstractReadTransactionRequest.java | 9 +- ...AbstractReadTransactionRequestProxyV1.java | 8 +- .../commands/ExistsTransactionRequest.java | 4 +- .../ExistsTransactionRequestProxyV1.java | 4 +- .../commands/ReadTransactionRequest.java | 4 +- .../ReadTransactionRequestProxyV1.java | 4 +- .../actors/dds/AbstractClientHandle.java | 119 +++++++ .../actors/dds/AbstractClientHistory.java | 67 +++- .../dds/AbstractDataStoreClientBehavior.java | 5 + .../actors/dds/AbstractProxyTransaction.java | 15 + .../actors/dds/ClientLocalHistory.java | 36 +- .../databroker/actors/dds/ClientSnapshot.java | 47 +++ .../actors/dds/ClientTransaction.java | 117 ++----- .../actors/dds/DataStoreClient.java | 7 + .../actors/dds/LocalProxyTransaction.java | 157 +-------- .../dds/LocalReadOnlyProxyTransaction.java | 133 ++++++++ .../dds/LocalReadWriteProxyTransaction.java | 257 +++++++++++++++ .../databroker/actors/dds/ProxyHistory.java | 56 ++-- .../actors/dds/RemoteProxyTransaction.java | 22 +- .../actors/dds/SingleClientHistory.java | 8 + .../datastore/AbstractFrontendHistory.java | 29 +- .../FrontendReadOnlyTransaction.java | 81 +++++ .../FrontendReadWriteTransaction.java | 310 ++++++++++++++++++ .../datastore/FrontendTransaction.java | 296 +---------------- .../datastore/LocalFrontendHistory.java | 11 +- .../datastore/StandaloneFrontendHistory.java | 9 +- 26 files changed, 1214 insertions(+), 601 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHandle.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientSnapshot.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequest.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequest.java index 17054f78a2..6510d5b5b4 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequest.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequest.java @@ -33,16 +33,19 @@ public abstract class AbstractReadTransactionRequest { private static final long serialVersionUID = 1L; private final YangInstanceIdentifier path; + private final boolean snapshotOnly; AbstractReadTransactionRequest(final TransactionIdentifier identifier, final long sequence, final ActorRef replyTo, - final YangInstanceIdentifier path) { + final YangInstanceIdentifier path, final boolean snapshotOnly) { super(identifier, sequence, replyTo); this.path = Preconditions.checkNotNull(path); + this.snapshotOnly = snapshotOnly; } AbstractReadTransactionRequest(final T request, final ABIVersion version) { super(request, version); this.path = request.getPath(); + this.snapshotOnly = request.isSnapshotOnly(); } @Nonnull @@ -50,6 +53,10 @@ public abstract class AbstractReadTransactionRequest { private static final long serialVersionUID = 1L; private YangInstanceIdentifier path; + private boolean snapshotOnly; protected AbstractReadTransactionRequestProxyV1() { // For Externalizable @@ -36,6 +37,7 @@ abstract class AbstractReadTransactionRequestProxyV1 extends LocalAbortable + implements Identifiable { + /* + * Our state consist of the the proxy map, hence we just subclass ConcurrentHashMap directly. + */ + private static final class State extends ConcurrentHashMap { + private static final long serialVersionUID = 1L; + } + + private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHandle.class); + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(AbstractClientHandle.class, State.class, "state"); + + private final TransactionIdentifier transactionId; + private final AbstractClientHistory parent; + + private volatile State state = new State<>(); + + // Hidden to prevent outside instantiation + AbstractClientHandle(final AbstractClientHistory parent, final TransactionIdentifier transactionId) { + this.transactionId = Preconditions.checkNotNull(transactionId); + this.parent = Preconditions.checkNotNull(parent); + } + + @Override + public final TransactionIdentifier getIdentifier() { + return transactionId; + } + + /** + * Release all state associated with this transaction. + * + * @return True if this transaction became closed during this call + */ + public final boolean abort() { + if (commonAbort()) { + parent.onTransactionAbort(this); + return true; + } + + return false; + } + + private boolean commonAbort() { + final Collection toClose = ensureClosed(); + if (toClose == null) { + return false; + } + + toClose.forEach(AbstractProxyTransaction::abort); + return true; + } + + @Override + final void localAbort(final Throwable cause) { + LOG.debug("Local abort of transaction {}", getIdentifier(), cause); + commonAbort(); + } + + /** + * Make sure this snapshot is closed. If it became closed as the effect of this call, return a collection of + * {@link AbstractProxyTransaction} handles which need to be closed, too. + * + * @return null if this snapshot has already been closed, otherwise a collection of proxies, which need to be + * closed, too. + */ + @Nullable final Collection ensureClosed() { + @SuppressWarnings("unchecked") + final State local = STATE_UPDATER.getAndSet(this, null); + return local == null ? null : local.values(); + } + + final T ensureProxy(final YangInstanceIdentifier path, final Function createProxy) { + final Map local = getState(); + final Long shard = parent.resolveShardForPath(path); + + return local.computeIfAbsent(shard, createProxy); + } + + final AbstractClientHistory parent() { + return parent; + } + + private State getState() { + final State local = state; + Preconditions.checkState(local != null, "Transaction %s is closed", transactionId); + return local; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java index 519763ac02..1be8464335 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java @@ -49,7 +49,7 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state"); @GuardedBy("this") - private final Map openTransactions = new HashMap<>(); + private final Map> openTransactions = new HashMap<>(); @GuardedBy("this") private final Map readyTransactions = new HashMap<>(); @@ -99,7 +99,7 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia LOG.debug("Force-closing history {}", getIdentifier(), cause); synchronized (this) { - for (ClientTransaction t : openTransactions.values()) { + for (AbstractClientHandle t : openTransactions.values()) { t.localAbort(cause); } openTransactions.clear(); @@ -136,32 +136,41 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia LOG.debug("Create history response {}", response); } - final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) { + private ProxyHistory ensureHistoryProxy(final TransactionIdentifier transactionId, final Long shard) { while (true) { - final ProxyHistory history; try { - history = histories.computeIfAbsent(shard, this::createHistoryProxy); + return histories.computeIfAbsent(shard, this::createHistoryProxy); } catch (InversibleLockException e) { LOG.trace("Waiting for transaction {} shard {} connection to resolve", transactionId, shard); e.awaitResolution(); LOG.trace("Retrying transaction {} shard {} connection", transactionId, shard); - continue; } + } + } + + final AbstractProxyTransaction createSnapshotProxy(final TransactionIdentifier transactionId, final Long shard) { + return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, true); + } + + final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) { + return ensureHistoryProxy(transactionId, shard).createTransactionProxy(transactionId, false); + } - return history.createTransactionProxy(transactionId); + private void checkNotClosed() { + if (state == State.CLOSED) { + throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier)); } } /** - * Allocate a {@link ClientTransaction}. + * Allocate a new {@link ClientTransaction}. * * @return A new {@link ClientTransaction} * @throws TransactionChainClosedException if this history is closed + * @throws IllegalStateException if a previous dependent transaction has not been closed */ public final ClientTransaction createTransaction() { - if (state == State.CLOSED) { - throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier)); - } + checkNotClosed(); synchronized (this) { final ClientTransaction ret = doCreateTransaction(); @@ -170,6 +179,26 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia } } + /** + * Create a new {@link ClientSnapshot}. + * + * @return A new {@link ClientSnapshot} + * @throws TransactionChainClosedException if this history is closed + * @throws IllegalStateException if a previous dependent transaction has not been closed + */ + public final ClientSnapshot takeSnapshot() { + checkNotClosed(); + + synchronized (this) { + final ClientSnapshot ret = doCreateSnapshot(); + openTransactions.put(ret.getIdentifier(), ret); + return ret; + } + } + + @GuardedBy("this") + abstract ClientSnapshot doCreateSnapshot(); + @GuardedBy("this") abstract ClientTransaction doCreateTransaction(); @@ -179,10 +208,12 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia * @param txId Transaction identifier * @param cohort Transaction commit cohort */ - synchronized AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId, + synchronized AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx, final AbstractTransactionCommitCohort cohort) { - final ClientTransaction tx = openTransactions.remove(txId); - Preconditions.checkState(tx != null, "Failed to find open transaction for %s", txId); + final TransactionIdentifier txId = tx.getIdentifier(); + if (openTransactions.remove(txId) == null) { + LOG.warn("Transaction {} not recorded, proceeding with readiness", txId); + } final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort); Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s", @@ -196,11 +227,11 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching * backend. * - * @param txId transaction identifier + * @param snapshot transaction identifier */ - synchronized void onTransactionAbort(final TransactionIdentifier txId) { - if (openTransactions.remove(txId) == null) { - LOG.warn("Could not find aborting transaction {}", txId); + synchronized void onTransactionAbort(final AbstractClientHandle snapshot) { + if (openTransactions.remove(snapshot.getIdentifier()) == null) { + LOG.warn("Could not find aborting transaction {}", snapshot.getIdentifier()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java index 3dc4dbf146..202fe5b8fa 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractDataStoreClientBehavior.java @@ -190,6 +190,11 @@ abstract class AbstractDataStoreClientBehavior extends ClientActorBehavior data) { + checkReadWrite(); checkNotSealed(); doMerge(path, data); } final void write(final YangInstanceIdentifier path, final NormalizedNode data) { + checkReadWrite(); checkNotSealed(); doWrite(path, data); } @@ -265,6 +268,12 @@ abstract class AbstractProxyTransaction implements Identifiable req) { successfulRequests.add(Verify.verifyNotNull(req)); } @@ -317,6 +326,7 @@ abstract class AbstractProxyTransaction implements Identifiable directCommit() { + checkReadWrite(); checkSealed(); // Precludes startReconnect() from interfering with the fast path @@ -346,6 +356,7 @@ abstract class AbstractProxyTransaction implements Identifiable ret) { + checkReadWrite(); checkSealed(); // Precludes startReconnect() from interfering with the fast path @@ -379,6 +390,7 @@ abstract class AbstractProxyTransaction implements Identifiable ret) { + checkReadWrite(); checkSealed(); final TransactionRequest req = new TransactionPreCommitRequest(getIdentifier(), nextSequence(), @@ -398,6 +410,7 @@ abstract class AbstractProxyTransaction implements Identifiable ret) { + checkReadWrite(); checkSealed(); sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), t -> { @@ -505,6 +518,8 @@ abstract class AbstractProxyTransaction implements Identifiable data); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java index ac1872835a..26b03e39b0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java @@ -39,38 +39,50 @@ public final class ClientLocalHistory extends AbstractClientHistory implements A } } - @Override - ClientTransaction doCreateTransaction() { + private State ensureIdleState() { final State local = state(); Preconditions.checkState(local == State.IDLE, "Local history %s state is %s", this, local); - updateState(local, State.TX_OPEN); + return local; + } + + @Override + ClientSnapshot doCreateSnapshot() { + ensureIdleState(); + return new ClientSnapshot(this, new TransactionIdentifier(getIdentifier(), nextTx())); + } + @Override + ClientTransaction doCreateTransaction() { + updateState(ensureIdleState(), State.TX_OPEN); return new ClientTransaction(this, new TransactionIdentifier(getIdentifier(), nextTx())); } @Override - void onTransactionAbort(final TransactionIdentifier txId) { - final State local = state(); - if (local == State.TX_OPEN) { - updateState(local, State.IDLE); + void onTransactionAbort(final AbstractClientHandle snap) { + if (snap instanceof ClientTransaction) { + final State local = state(); + if (local == State.TX_OPEN) { + updateState(local, State.IDLE); + } } - super.onTransactionAbort(txId); + super.onTransactionAbort(snap); } @Override - AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId, + AbstractTransactionCommitCohort onTransactionReady(final ClientTransaction tx, final AbstractTransactionCommitCohort cohort) { + final State local = state(); switch (local) { case CLOSED: - return super.onTransactionReady(txId, cohort); + return super.onTransactionReady(tx, cohort); case IDLE: throw new IllegalStateException(String.format("Local history %s is idle when readying transaction %s", - this, txId)); + this, tx.getIdentifier())); case TX_OPEN: updateState(local, State.IDLE); - return super.onTransactionReady(txId, cohort); + return super.onTransactionReady(tx, cohort); default: throw new IllegalStateException(String.format("Local history %s in unhandled state %s", this, local)); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientSnapshot.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientSnapshot.java new file mode 100644 index 0000000000..482d1a5b03 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientSnapshot.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import com.google.common.annotations.Beta; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.CheckedFuture; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.mdsal.common.api.ReadFailedException; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +/** + * Snapshot of the datastore state. Note this snapshot is not consistent across shards because sub-shard snapshots are + * created lazily. + * + * @author Robert Varga + */ +@Beta +public class ClientSnapshot extends AbstractClientHandle { + // Hidden to prevent outside instantiation + ClientSnapshot(final AbstractClientHistory parent, final TransactionIdentifier transactionId) { + super(parent, transactionId); + } + + private AbstractProxyTransaction createProxy(final Long shard) { + return parent().createSnapshotProxy(getIdentifier(), shard); + } + + private AbstractProxyTransaction ensureSnapshotProxy(final YangInstanceIdentifier path) { + return ensureProxy(path, this::createProxy); + } + + public final CheckedFuture exists(final YangInstanceIdentifier path) { + return ensureSnapshotProxy(path).exists(path); + } + + public final CheckedFuture>, ReadFailedException> read( + final YangInstanceIdentifier path) { + return ensureSnapshotProxy(path).read(path); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java index abb1345269..334ab71d58 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java @@ -12,20 +12,15 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.CheckedFuture; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.Collection; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort; -import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * Client-side view of a free-standing transaction. + * Client-side view of a transaction. * *

* This interface is used by the world outside of the actor system and in the actor system it is manifested via @@ -55,127 +50,61 @@ import org.slf4j.LoggerFactory; * @author Robert Varga */ @Beta -public final class ClientTransaction extends LocalAbortable implements Identifiable { - private static final Logger LOG = LoggerFactory.getLogger(ClientTransaction.class); - private static final AtomicIntegerFieldUpdater STATE_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(ClientTransaction.class, "state"); - private static final int OPEN_STATE = 0; - private static final int CLOSED_STATE = 1; - - private final Map proxies = new ConcurrentHashMap<>(); - private final TransactionIdentifier transactionId; - private final AbstractClientHistory parent; - - private volatile int state = OPEN_STATE; +public final class ClientTransaction extends AbstractClientHandle { ClientTransaction(final AbstractClientHistory parent, final TransactionIdentifier transactionId) { - this.transactionId = Preconditions.checkNotNull(transactionId); - this.parent = Preconditions.checkNotNull(parent); + super(parent, transactionId); } - private void checkNotClosed() { - Preconditions.checkState(state == OPEN_STATE, "Transaction %s is closed", transactionId); - } private AbstractProxyTransaction createProxy(final Long shard) { - return parent.createTransactionProxy(transactionId, shard); - } - - private AbstractProxyTransaction ensureProxy(final YangInstanceIdentifier path) { - checkNotClosed(); - - final Long shard = parent.resolveShardForPath(path); - return proxies.computeIfAbsent(shard, this::createProxy); + return parent().createTransactionProxy(getIdentifier(), shard); } - @Override - public TransactionIdentifier getIdentifier() { - return transactionId; + private AbstractProxyTransaction ensureTransactionProxy(final YangInstanceIdentifier path) { + return ensureProxy(path, this::createProxy); } public CheckedFuture exists(final YangInstanceIdentifier path) { - return ensureProxy(path).exists(path); + return ensureTransactionProxy(path).exists(path); } - public CheckedFuture>, ReadFailedException> read(final YangInstanceIdentifier path) { - return ensureProxy(path).read(path); + public CheckedFuture>, ReadFailedException> read( + final YangInstanceIdentifier path) { + return ensureTransactionProxy(path).read(path); } public void delete(final YangInstanceIdentifier path) { - ensureProxy(path).delete(path); + ensureTransactionProxy(path).delete(path); } public void merge(final YangInstanceIdentifier path, final NormalizedNode data) { - ensureProxy(path).merge(path, data); + ensureTransactionProxy(path).merge(path, data); } public void write(final YangInstanceIdentifier path, final NormalizedNode data) { - ensureProxy(path).write(path, data); - } - - private boolean ensureClosed() { - final int local = state; - if (local == CLOSED_STATE) { - return false; - } - - final boolean success = STATE_UPDATER.compareAndSet(this, OPEN_STATE, CLOSED_STATE); - Preconditions.checkState(success, "Transaction %s raced during close", this); - return true; + ensureTransactionProxy(path).write(path, data); } public DOMStoreThreePhaseCommitCohort ready() { - Preconditions.checkState(ensureClosed(), "Attempted to submit a closed transaction %s", this); - - for (AbstractProxyTransaction p : proxies.values()) { - p.seal(); - } + final Collection toReady = ensureClosed(); + Preconditions.checkState(toReady != null, "Attempted to submit a closed transaction %s", this); + toReady.forEach(AbstractProxyTransaction::seal); final AbstractTransactionCommitCohort cohort; - switch (proxies.size()) { + switch (toReady.size()) { case 0: - cohort = new EmptyTransactionCommitCohort(parent, transactionId); + cohort = new EmptyTransactionCommitCohort(parent(), getIdentifier()); break; case 1: - cohort = new DirectTransactionCommitCohort(parent, transactionId, - Iterables.getOnlyElement(proxies.values())); + cohort = new DirectTransactionCommitCohort(parent(), getIdentifier(), + Iterables.getOnlyElement(toReady)); break; default: - cohort = new ClientTransactionCommitCohort(parent, transactionId, proxies.values()); + cohort = new ClientTransactionCommitCohort(parent(), getIdentifier(), toReady); break; } - return parent.onTransactionReady(transactionId, cohort); - } - - /** - * Release all state associated with this transaction. - */ - public void abort() { - if (commonAbort()) { - parent.onTransactionAbort(transactionId); - } - } - - private boolean commonAbort() { - if (!ensureClosed()) { - return false; - } - - for (AbstractProxyTransaction proxy : proxies.values()) { - proxy.abort(); - } - proxies.clear(); - return true; - } - - @Override - void localAbort(final Throwable cause) { - LOG.debug("Local abort of transaction {}", getIdentifier(), cause); - commonAbort(); - } - - Map getProxies() { - return proxies; + return parent().onTransactionReady(this, cohort); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DataStoreClient.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DataStoreClient.java index 63dc87b4b5..2032b54984 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DataStoreClient.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DataStoreClient.java @@ -36,6 +36,13 @@ public interface DataStoreClient extends Identifiable, AutoClo */ @Nonnull ClientLocalHistory createLocalHistory(); + /** + * Create a new free-standing snapshot. + * + * @return Client snapshot handle + */ + @Nonnull ClientSnapshot createSnapshot(); + /** * Create a new free-standing transaction. * diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index e9941179c7..f75d443fc7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -9,7 +9,6 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Verify; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import java.util.function.Consumer; @@ -20,17 +19,9 @@ import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactio import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; -import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; -import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; -import org.opendaylight.controller.cluster.access.commands.TransactionDelete; -import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest; -import org.opendaylight.controller.cluster.access.commands.TransactionMerge; -import org.opendaylight.controller.cluster.access.commands.TransactionModification; -import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; -import org.opendaylight.controller.cluster.access.commands.TransactionWrite; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; @@ -38,9 +29,7 @@ import org.opendaylight.mdsal.common.api.ReadFailedException; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,161 +50,56 @@ import org.slf4j.LoggerFactory; * @author Robert Varga */ @NotThreadSafe -final class LocalProxyTransaction extends AbstractProxyTransaction { +abstract class LocalProxyTransaction extends AbstractProxyTransaction { private static final Logger LOG = LoggerFactory.getLogger(LocalProxyTransaction.class); private final TransactionIdentifier identifier; - private CursorAwareDataTreeModification modification; - private CursorAwareDataTreeModification sealedModification; - - LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, - final CursorAwareDataTreeModification modification) { + LocalProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) { super(parent); this.identifier = Preconditions.checkNotNull(identifier); - this.modification = Preconditions.checkNotNull(modification); } @Override - public TransactionIdentifier getIdentifier() { + public final TransactionIdentifier getIdentifier() { return identifier; } - @Override - void doDelete(final YangInstanceIdentifier path) { - modification.delete(path); - } + abstract DataTreeSnapshot readOnlyView(); - @Override - void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) { - modification.merge(path, data); - } + abstract void applyModifyTransactionRequest(ModifyTransactionRequest request, + @Nullable Consumer> callback); @Override - void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) { - modification.write(path, data); + final CheckedFuture doExists(final YangInstanceIdentifier path) { + return Futures.immediateCheckedFuture(readOnlyView().readNode(path).isPresent()); } @Override - CheckedFuture doExists(final YangInstanceIdentifier path) { - return Futures.immediateCheckedFuture(modification.readNode(path).isPresent()); + final CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { + return Futures.immediateCheckedFuture(readOnlyView().readNode(path)); } @Override - CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { - return Futures.immediateCheckedFuture(modification.readNode(path)); - } - - private RuntimeException abortedException() { - return new IllegalStateException("Tracker " + identifier + " has been aborted"); - } - - private RuntimeException submittedException() { - return new IllegalStateException("Tracker " + identifier + " has been submitted"); - } - - @Override - void doAbort() { + final void doAbort() { sendAbort(new AbortLocalTransactionRequest(identifier, localActor()), response -> { LOG.debug("Transaction {} abort completed with {}", identifier, response); }); } - @Override - CommitLocalTransactionRequest commitRequest(final boolean coordinated) { - final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, nextSequence(), - localActor(), modification, coordinated); - modification = new FailedDataTreeModification(this::submittedException); - return ret; - } - - @Override - void doSeal() { - modification.ready(); - sealedModification = modification; - } - - @Override - void flushState(final AbstractProxyTransaction successor) { - sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() { - @Override - public void write(final PathArgument child, final NormalizedNode data) { - successor.write(current().node(child), data); - } - - @Override - public void merge(final PathArgument child, final NormalizedNode data) { - successor.merge(current().node(child), data); - } - - @Override - public void delete(final PathArgument child) { - successor.delete(current().node(child)); - } - }); - } - - DataTreeSnapshot getSnapshot() { - Preconditions.checkState(sealedModification != null, "Proxy %s is not sealed yet", identifier); - return sealedModification; - } - - private void applyModifyTransactionRequest(final ModifyTransactionRequest request, - final @Nullable Consumer> callback) { - for (TransactionModification mod : request.getModifications()) { - if (mod instanceof TransactionWrite) { - modification.write(mod.getPath(), ((TransactionWrite)mod).getData()); - } else if (mod instanceof TransactionMerge) { - modification.merge(mod.getPath(), ((TransactionMerge)mod).getData()); - } else if (mod instanceof TransactionDelete) { - modification.delete(mod.getPath()); - } else { - throw new IllegalArgumentException("Unsupported modification " + mod); - } - } - - final java.util.Optional maybeProtocol = request.getPersistenceProtocol(); - if (maybeProtocol.isPresent()) { - seal(); - Verify.verify(callback != null, "Request {} has null callback", request); - - switch (maybeProtocol.get()) { - case ABORT: - sendAbort(callback); - break; - case SIMPLE: - sendRequest(commitRequest(false), callback); - break; - case THREE_PHASE: - sendRequest(commitRequest(true), callback); - break; - default: - throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get()); - } - } - } - @Override void handleForwardedRemoteRequest(final TransactionRequest request, final @Nullable Consumer> callback) { - LOG.debug("Applying forwarded request {}", request); - if (request instanceof ModifyTransactionRequest) { applyModifyTransactionRequest((ModifyTransactionRequest) request, callback); } else if (request instanceof ReadTransactionRequest) { final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath(); - final Optional> result = modification.readNode(path); + final Optional> result = readOnlyView().readNode(path); callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result)); } else if (request instanceof ExistsTransactionRequest) { final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath(); - final boolean result = modification.readNode(path).isPresent(); + final boolean result = readOnlyView().readNode(path).isPresent(); callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result)); - } else if (request instanceof TransactionPreCommitRequest) { - sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); - } else if (request instanceof TransactionDoCommitRequest) { - sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); - } else if (request instanceof TransactionAbortRequest) { - sendAbort(callback); } else { throw new IllegalArgumentException("Unhandled request " + request); } @@ -263,8 +147,6 @@ final class LocalProxyTransaction extends AbstractProxyTransaction { final Consumer> callback) { if (request instanceof AbortLocalTransactionRequest) { successor.sendAbort(request, callback); - } else if (request instanceof CommitLocalTransactionRequest) { - successor.sendCommit((CommitLocalTransactionRequest)request, callback); } else { throw new IllegalArgumentException("Unhandled request" + request); } @@ -272,18 +154,7 @@ final class LocalProxyTransaction extends AbstractProxyTransaction { LOG.debug("Forwarded request {} to successor {}", request, successor); } - private void sendAbort(final TransactionRequest request, final Consumer> callback) { + void sendAbort(final TransactionRequest request, final Consumer> callback) { sendRequest(request, callback); - modification = new FailedDataTreeModification(this::abortedException); - } - - private void sendCommit(final CommitLocalTransactionRequest request, final Consumer> callback) { - // Rebase old modification on new data tree. - try (DataTreeModificationCursor cursor = modification.createCursor(YangInstanceIdentifier.EMPTY)) { - request.getModification().applyToCursor(cursor); - } - - seal(); - sendRequest(commitRequest(request.isCoordinated()), callback); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java new file mode 100644 index 0000000000..eb2362d88d --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import java.util.function.Consumer; +import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; +import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A read-only specialization of {@link LocalProxyTransaction}. + * + * @author Robert Varga + */ +@NotThreadSafe +final class LocalReadOnlyProxyTransaction extends LocalProxyTransaction { + private static final Logger LOG = LoggerFactory.getLogger(LocalReadOnlyProxyTransaction.class); + + private final DataTreeSnapshot snapshot; + + LocalReadOnlyProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, + final DataTreeSnapshot snapshot) { + super(parent, identifier); + this.snapshot = Preconditions.checkNotNull(snapshot); + } + + @Override + boolean isSnapshotOnly() { + return true; + } + + @Override + DataTreeSnapshot readOnlyView() { + return snapshot; + } + + @Override + void doDelete(final YangInstanceIdentifier path) { + throw new UnsupportedOperationException("Read-only snapshot"); + } + + @Override + void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) { + throw new UnsupportedOperationException("Read-only snapshot"); + } + + @Override + void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) { + throw new UnsupportedOperationException("Read-only snapshot"); + } + + @Override + CommitLocalTransactionRequest commitRequest(final boolean coordinated) { + throw new UnsupportedOperationException("Read-only snapshot"); + } + + @Override + void doSeal() { + // No-op + } + + @Override + void flushState(final AbstractProxyTransaction successor) { + // No-op + } + + @Override + void applyModifyTransactionRequest(final ModifyTransactionRequest request, + final Consumer> callback) { + Verify.verify(request.getModifications().isEmpty()); + + final PersistenceProtocol protocol = request.getPersistenceProtocol().get(); + Verify.verify(protocol == PersistenceProtocol.ABORT); + abort(); + } + + @Override + void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) { + if (request instanceof CommitLocalTransactionRequest) { + final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request; + final DataTreeModification mod = req.getModification(); + + LOG.debug("Applying modification {} to successor {}", mod, successor); + mod.applyToCursor(new AbstractDataTreeModificationCursor() { + @Override + public void write(final PathArgument child, final NormalizedNode data) { + successor.write(current().node(child), data); + } + + @Override + public void merge(final PathArgument child, final NormalizedNode data) { + successor.merge(current().node(child), data); + } + + @Override + public void delete(final PathArgument child) { + successor.delete(current().node(child)); + } + }); + + successor.seal(); + + final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated()); + successor.sendRequest(successorReq, callback); + } else if (request instanceof AbortLocalTransactionRequest) { + LOG.debug("Forwarding abort {} to successor {}", request, successor); + successor.abort(); + } else { + throw new IllegalArgumentException("Unhandled request" + request); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java new file mode 100644 index 0000000000..8195f8dcca --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java @@ -0,0 +1,257 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import java.util.function.Consumer; +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; +import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionDelete; +import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionMerge; +import org.opendaylight.controller.cluster.access.commands.TransactionModification; +import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionWrite; +import org.opendaylight.controller.cluster.access.concepts.Response; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.datastore.util.AbstractDataTreeModificationCursor; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.PathArgument; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeSnapshot; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModificationCursor; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An {@link AbstractProxyTransaction} for dispatching a transaction towards a shard leader which is co-located with + * the client instance. + * + *

+ * It requires a {@link DataTreeSnapshot}, which is used to instantiated a new {@link DataTreeModification}. Operations + * are then performed on this modification and once the transaction is submitted, the modification is sent to the shard + * leader. + * + *

+ * This class is not thread-safe as usual with transactions. Since it does not interact with the backend until the + * transaction is submitted, at which point this class gets out of the picture, this is not a cause for concern. + * + * @author Robert Varga + */ +@NotThreadSafe +final class LocalReadWriteProxyTransaction extends LocalProxyTransaction { + private static final Logger LOG = LoggerFactory.getLogger(LocalReadWriteProxyTransaction.class); + + private CursorAwareDataTreeModification modification; + private CursorAwareDataTreeModification sealedModification; + + LocalReadWriteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, + final DataTreeSnapshot snapshot) { + super(parent, identifier); + this.modification = (CursorAwareDataTreeModification) snapshot.newModification(); + } + + @Override + boolean isSnapshotOnly() { + return false; + } + + @Override + CursorAwareDataTreeSnapshot readOnlyView() { + return modification; + } + + @Override + void doDelete(final YangInstanceIdentifier path) { + modification.delete(path); + } + + @Override + void doMerge(final YangInstanceIdentifier path, final NormalizedNode data) { + modification.merge(path, data); + } + + @Override + void doWrite(final YangInstanceIdentifier path, final NormalizedNode data) { + modification.write(path, data); + } + + private RuntimeException abortedException() { + return new IllegalStateException("Tracker " + getIdentifier() + " has been aborted"); + } + + private RuntimeException submittedException() { + return new IllegalStateException("Tracker " + getIdentifier() + " has been submitted"); + } + + @Override + CommitLocalTransactionRequest commitRequest(final boolean coordinated) { + final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(getIdentifier(), nextSequence(), + localActor(), modification, coordinated); + modification = new FailedDataTreeModification(this::submittedException); + return ret; + } + + @Override + void doSeal() { + modification.ready(); + sealedModification = modification; + } + + @Override + void flushState(final AbstractProxyTransaction successor) { + sealedModification.applyToCursor(new AbstractDataTreeModificationCursor() { + @Override + public void write(final PathArgument child, final NormalizedNode data) { + successor.write(current().node(child), data); + } + + @Override + public void merge(final PathArgument child, final NormalizedNode data) { + successor.merge(current().node(child), data); + } + + @Override + public void delete(final PathArgument child) { + successor.delete(current().node(child)); + } + }); + } + + DataTreeSnapshot getSnapshot() { + Preconditions.checkState(sealedModification != null, "Proxy %s is not sealed yet", getIdentifier()); + return sealedModification; + } + + @Override + void applyModifyTransactionRequest(final ModifyTransactionRequest request, + final @Nullable Consumer> callback) { + for (TransactionModification mod : request.getModifications()) { + if (mod instanceof TransactionWrite) { + modification.write(mod.getPath(), ((TransactionWrite)mod).getData()); + } else if (mod instanceof TransactionMerge) { + modification.merge(mod.getPath(), ((TransactionMerge)mod).getData()); + } else if (mod instanceof TransactionDelete) { + modification.delete(mod.getPath()); + } else { + throw new IllegalArgumentException("Unsupported modification " + mod); + } + } + + final java.util.Optional maybeProtocol = request.getPersistenceProtocol(); + if (maybeProtocol.isPresent()) { + seal(); + Verify.verify(callback != null, "Request {} has null callback", request); + + switch (maybeProtocol.get()) { + case ABORT: + sendAbort(callback); + break; + case SIMPLE: + sendRequest(commitRequest(false), callback); + break; + case THREE_PHASE: + sendRequest(commitRequest(true), callback); + break; + default: + throw new IllegalArgumentException("Unhandled protocol " + maybeProtocol.get()); + } + } + } + + @Override + void handleForwardedRemoteRequest(final TransactionRequest request, + final @Nullable Consumer> callback) { + LOG.debug("Applying forwarded request {}", request); + + if (request instanceof TransactionPreCommitRequest) { + sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); + } else if (request instanceof TransactionDoCommitRequest) { + sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); + } else if (request instanceof TransactionAbortRequest) { + sendAbort(callback); + } else { + super.handleForwardedRemoteRequest(request, callback); + } + } + + @Override + void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) { + if (request instanceof CommitLocalTransactionRequest) { + final CommitLocalTransactionRequest req = (CommitLocalTransactionRequest) request; + final DataTreeModification mod = req.getModification(); + + LOG.debug("Applying modification {} to successor {}", mod, successor); + mod.applyToCursor(new AbstractDataTreeModificationCursor() { + @Override + public void write(final PathArgument child, final NormalizedNode data) { + successor.write(current().node(child), data); + } + + @Override + public void merge(final PathArgument child, final NormalizedNode data) { + successor.merge(current().node(child), data); + } + + @Override + public void delete(final PathArgument child) { + successor.delete(current().node(child)); + } + }); + + successor.seal(); + + final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated()); + successor.sendRequest(successorReq, callback); + } else if (request instanceof AbortLocalTransactionRequest) { + LOG.debug("Forwarding abort {} to successor {}", request, successor); + successor.abort(); + } else { + throw new IllegalArgumentException("Unhandled request" + request); + } + } + + @Override + void forwardToLocal(final LocalProxyTransaction successor, final TransactionRequest request, + final Consumer> callback) { + if (request instanceof CommitLocalTransactionRequest) { + Verify.verify(successor instanceof LocalReadWriteProxyTransaction); + ((LocalReadWriteProxyTransaction) successor).sendCommit((CommitLocalTransactionRequest)request, callback); + LOG.debug("Forwarded request {} to successor {}", request, successor); + } else { + super.forwardToLocal(successor, request, callback); + } + } + + @Override + void sendAbort(final TransactionRequest request, final Consumer> callback) { + super.sendAbort(request, callback); + modification = new FailedDataTreeModification(this::abortedException); + } + + private void sendCommit(final CommitLocalTransactionRequest request, final Consumer> callback) { + // Rebase old modification on new data tree. + try (DataTreeModificationCursor cursor = modification.createCursor(YangInstanceIdentifier.EMPTY)) { + request.getModification().applyToCursor(cursor); + } + + seal(); + sendRequest(commitRequest(request.isCoordinated()), callback); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index b3b604b7f0..846f5c37cf 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -32,7 +32,6 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.concepts.Identifiable; -import org.opendaylight.yangtools.yang.data.api.schema.tree.CursorAwareDataTreeModification; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; import org.slf4j.Logger; @@ -66,20 +65,21 @@ abstract class ProxyHistory implements Identifiable { @Override final AbstractProxyTransaction doCreateTransactionProxy( - final AbstractClientConnection connection, final TransactionIdentifier txId) { - return new RemoteProxyTransaction(this, txId); + final AbstractClientConnection connection, final TransactionIdentifier txId, + final boolean snapshotOnly) { + return new RemoteProxyTransaction(this, txId, snapshotOnly); } } private static final class Local extends AbstractLocal { - private static final AtomicReferenceFieldUpdater LAST_SEALED_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalProxyTransaction.class, "lastSealed"); + private static final AtomicReferenceFieldUpdater LAST_SEALED_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(Local.class, LocalReadWriteProxyTransaction.class, "lastSealed"); // Tracks the last open and last sealed transaction. We need to track both in case the user ends up aborting // the open one and attempts to create a new transaction again. - private LocalProxyTransaction lastOpen; + private LocalReadWriteProxyTransaction lastOpen; - private volatile LocalProxyTransaction lastSealed; + private volatile LocalReadWriteProxyTransaction lastSealed; Local(final AbstractClientConnection connection, final LocalHistoryIdentifier identifier, final DataTree dataTree) { @@ -88,11 +88,11 @@ abstract class ProxyHistory implements Identifiable { @Override AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, - final TransactionIdentifier txId) { + final TransactionIdentifier txId, final boolean snapshotOnly) { Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen); // onTransactionCompleted() runs concurrently - final LocalProxyTransaction localSealed = lastSealed; + final LocalReadWriteProxyTransaction localSealed = lastSealed; final DataTreeSnapshot baseSnapshot; if (localSealed != null) { baseSnapshot = localSealed.getSnapshot(); @@ -100,8 +100,11 @@ abstract class ProxyHistory implements Identifiable { baseSnapshot = takeSnapshot(); } - lastOpen = new LocalProxyTransaction(this, txId, - (CursorAwareDataTreeModification) baseSnapshot.newModification()); + if (snapshotOnly) { + return new LocalReadOnlyProxyTransaction(this, txId, baseSnapshot); + } + + lastOpen = new LocalReadWriteProxyTransaction(this, txId, baseSnapshot); LOG.debug("Proxy {} open transaction {}", this, lastOpen); return lastOpen; } @@ -113,16 +116,18 @@ abstract class ProxyHistory implements Identifiable { @Override void onTransactionAborted(final AbstractProxyTransaction tx) { - Preconditions.checkState(tx.equals(lastOpen)); - lastOpen = null; + if (tx.equals(lastOpen)) { + lastOpen = null; + } } @Override void onTransactionCompleted(final AbstractProxyTransaction tx) { Verify.verify(tx instanceof LocalProxyTransaction); - - if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalProxyTransaction) tx, null)) { - LOG.debug("Completed last sealed transaction {}", tx); + if (tx instanceof LocalReadWriteProxyTransaction) { + if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) { + LOG.debug("Completed last sealed transaction {}", tx); + } } } @@ -142,9 +147,10 @@ abstract class ProxyHistory implements Identifiable { @Override AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, - final TransactionIdentifier txId) { - return new LocalProxyTransaction(this, txId, - (CursorAwareDataTreeModification) takeSnapshot().newModification()); + final TransactionIdentifier txId, final boolean snapshotOnly) { + final DataTreeSnapshot snapshot = takeSnapshot(); + return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) : + new LocalReadWriteProxyTransaction(this, txId, snapshot); } @Override @@ -212,7 +218,8 @@ abstract class ProxyHistory implements Identifiable { for (AbstractProxyTransaction t : proxies.values()) { LOG.debug("{} creating successor transaction proxy for {}", identifier, t); - final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier()); + final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(), + t.isSnapshotOnly()); LOG.debug("{} created successor transaction proxy {}", identifier, newProxy); t.replayMessages(newProxy, previousEntries); } @@ -311,15 +318,16 @@ abstract class ProxyHistory implements Identifiable { return connection.localActor(); } - final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) { + final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, + final boolean snapshotOnly) { lock.lock(); try { if (successor != null) { - return successor.createTransactionProxy(txId); + return successor.createTransactionProxy(txId, snapshotOnly); } final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId()); - final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId); + final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly); proxies.put(proxyId, ret); LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId); return ret; @@ -356,7 +364,7 @@ abstract class ProxyHistory implements Identifiable { @GuardedBy("lock") abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection connection, - TransactionIdentifier txId); + TransactionIdentifier txId, boolean snapshotOnly); abstract ProxyHistory createSuccessor(AbstractClientConnection connection); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 783096b7bf..1429ec5a78 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -63,16 +63,24 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { private static final int REQUEST_MAX_MODIFICATIONS = 1000; private final ModifyTransactionRequestBuilder builder; + private final boolean snapshotOnly; private boolean builderBusy; private volatile Exception operationFailure; - RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier) { + RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier, + final boolean snapshotOnly) { super(parent); + this.snapshotOnly = snapshotOnly; builder = new ModifyTransactionRequestBuilder(identifier, localActor()); } + @Override + boolean isSnapshotOnly() { + return snapshotOnly; + } + @Override public TransactionIdentifier getIdentifier() { return builder.getIdentifier(); @@ -110,15 +118,15 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { @Override CheckedFuture doExists(final YangInstanceIdentifier path) { final SettableFuture future = SettableFuture.create(); - return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path), - t -> completeExists(future, t), future); + return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path, + isSnapshotOnly()), t -> completeExists(future, t), future); } @Override CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { final SettableFuture>> future = SettableFuture.create(); - return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path), - t -> completeRead(future, t), future); + return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path, + isSnapshotOnly()), t -> completeRead(future, t), future); } @Override @@ -302,11 +310,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } else if (request instanceof ReadTransactionRequest) { ensureFlushedBuider(); sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), - ((ReadTransactionRequest) request).getPath()), callback); + ((ReadTransactionRequest) request).getPath(), isSnapshotOnly()), callback); } else if (request instanceof ExistsTransactionRequest) { ensureFlushedBuider(); sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), - ((ExistsTransactionRequest) request).getPath()), callback); + ((ExistsTransactionRequest) request).getPath(), isSnapshotOnly()), callback); } else if (request instanceof TransactionPreCommitRequest) { ensureFlushedBuider(); sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java index c04c9c5071..8220bfaaef 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java @@ -25,6 +25,14 @@ final class SingleClientHistory extends AbstractClientHistory { super(client, identifier); } + @Override + ClientSnapshot doCreateSnapshot() { + final TransactionIdentifier txId = new TransactionIdentifier(getIdentifier(), nextTx()); + LOG.debug("{}: creating a new snapshot {}", this, txId); + + return new ClientSnapshot(this, txId); + } + @Override ClientTransaction doCreateTransaction() { final TransactionIdentifier txId = new TransactionIdentifier(getIdentifier(), nextTx()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java index 1adca56af2..7c2ddb05b2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java @@ -13,6 +13,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; import javax.annotation.Nullable; +import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; @@ -67,14 +68,7 @@ abstract class AbstractFrontendHistory implements Identifiable> maybeReplay = tx.replaySequence(request.getSequence()); @@ -88,6 +82,25 @@ abstract class AbstractFrontendHistory implements Identifiable request, final TransactionIdentifier id) + throws RequestException { + if (request instanceof CommitLocalTransactionRequest) { + LOG.debug("{}: allocating new ready transaction {}", persistenceId(), id); + return createReadyTransaction(id, ((CommitLocalTransactionRequest) request).getModification()); + } + if (request instanceof AbstractReadTransactionRequest) { + if (((AbstractReadTransactionRequest) request).isSnapshotOnly()) { + LOG.debug("{}: allocatint new open snapshot {}", persistenceId(), id); + return createOpenSnapshot(id); + } + } + + LOG.debug("{}: allocating new open transaction {}", persistenceId(), id); + return createOpenTransaction(id); + } + + abstract FrontendTransaction createOpenSnapshot(TransactionIdentifier id) throws RequestException; + abstract FrontendTransaction createOpenTransaction(TransactionIdentifier id) throws RequestException; abstract FrontendTransaction createReadyTransaction(TransactionIdentifier id, DataTreeModification mod) diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java new file mode 100644 index 0000000000..36a876be66 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadOnlyTransaction.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; +import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; +import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +/** + * Read-only frontend transaction state as observed by the shard leader. + * + * @author Robert Varga + */ +@NotThreadSafe +final class FrontendReadOnlyTransaction extends FrontendTransaction { + private final ReadOnlyShardDataTreeTransaction openTransaction; + + private FrontendReadOnlyTransaction(final AbstractFrontendHistory history, + final ReadOnlyShardDataTreeTransaction transaction) { + super(history, transaction.getIdentifier()); + this.openTransaction = Preconditions.checkNotNull(transaction); + } + + static FrontendReadOnlyTransaction create(final AbstractFrontendHistory history, + final ReadOnlyShardDataTreeTransaction transaction) { + return new FrontendReadOnlyTransaction(history, transaction); + } + + // Sequence has already been checked + @Override + @Nullable TransactionSuccess handleRequest(final TransactionRequest request, final RequestEnvelope envelope, + final long now) throws RequestException { + if (request instanceof ExistsTransactionRequest) { + return handleExistsTransaction((ExistsTransactionRequest) request); + } else if (request instanceof ReadTransactionRequest) { + return handleReadTransaction((ReadTransactionRequest) request); + } else if (request instanceof TransactionAbortRequest) { + return handleTransactionAbort((TransactionAbortRequest) request, envelope, now); + } else { + throw new UnsupportedRequestException(request); + } + } + + private TransactionSuccess handleTransactionAbort(final TransactionAbortRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + openTransaction.abort(); + return new TransactionAbortSuccess(openTransaction.getIdentifier(), request.getSequence()); + } + + private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request) + throws RequestException { + final Optional> data = openTransaction.getSnapshot().readNode(request.getPath()); + return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(openTransaction.getIdentifier(), + request.getSequence(), data.isPresent())); + } + + private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) + throws RequestException { + final Optional> data = openTransaction.getSnapshot().readNode(request.getPath()); + return recordSuccess(request.getSequence(), new ReadTransactionSuccess(openTransaction.getIdentifier(), + request.getSequence(), data)); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java new file mode 100644 index 0000000000..79d555c856 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java @@ -0,0 +1,310 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.primitives.UnsignedLong; +import com.google.common.util.concurrent.FutureCallback; +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; +import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ModifyTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; +import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionDelete; +import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionMerge; +import org.opendaylight.controller.cluster.access.commands.TransactionModification; +import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionRequest; +import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionWrite; +import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; +import org.opendaylight.controller.cluster.access.concepts.RequestException; +import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Frontend read-write transaction state as observed by the shard leader. + * + * @author Robert Varga + */ +@NotThreadSafe +final class FrontendReadWriteTransaction extends FrontendTransaction { + private static final Logger LOG = LoggerFactory.getLogger(FrontendReadWriteTransaction.class); + + private ReadWriteShardDataTreeTransaction openTransaction; + private DataTreeModification sealedModification; + private ShardDataTreeCohort readyCohort; + + private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id, + final ReadWriteShardDataTreeTransaction transaction) { + super(history, id); + this.openTransaction = Preconditions.checkNotNull(transaction); + } + + private FrontendReadWriteTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id, + final DataTreeModification mod) { + super(history, id); + this.sealedModification = Preconditions.checkNotNull(mod); + } + + static FrontendReadWriteTransaction createOpen(final AbstractFrontendHistory history, + final ReadWriteShardDataTreeTransaction transaction) { + return new FrontendReadWriteTransaction(history, transaction.getIdentifier(), transaction); + } + + static FrontendReadWriteTransaction createReady(final AbstractFrontendHistory history, + final TransactionIdentifier id, final DataTreeModification mod) { + return new FrontendReadWriteTransaction(history, id, mod); + } + + // Sequence has already been checked + @Override + @Nullable TransactionSuccess handleRequest(final TransactionRequest request, final RequestEnvelope envelope, + final long now) throws RequestException { + if (request instanceof ModifyTransactionRequest) { + return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now); + } else if (request instanceof CommitLocalTransactionRequest) { + handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope, now); + return null; + } else if (request instanceof ExistsTransactionRequest) { + return handleExistsTransaction((ExistsTransactionRequest) request); + } else if (request instanceof ReadTransactionRequest) { + return handleReadTransaction((ReadTransactionRequest) request); + } else if (request instanceof TransactionPreCommitRequest) { + handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope, now); + return null; + } else if (request instanceof TransactionDoCommitRequest) { + handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now); + return null; + } else if (request instanceof TransactionAbortRequest) { + return handleTransactionAbort((TransactionAbortRequest) request, envelope, now); + } else { + throw new UnsupportedRequestException(request); + } + } + + private void handleTransactionPreCommit(final TransactionPreCommitRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + readyCohort.preCommit(new FutureCallback() { + @Override + public void onSuccess(final DataTreeCandidate result) { + recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(readyCohort.getIdentifier(), + request.getSequence())); + } + + @Override + public void onFailure(final Throwable failure) { + recordAndSendFailure(envelope, now, new RuntimeRequestException("Precommit failed", failure)); + readyCohort = null; + } + }); + } + + private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope, + final long now) throws RequestException { + readyCohort.commit(new FutureCallback() { + @Override + public void onSuccess(final UnsignedLong result) { + successfulCommit(envelope, now); + } + + @Override + public void onFailure(final Throwable failure) { + recordAndSendFailure(envelope, now, new RuntimeRequestException("Commit failed", failure)); + readyCohort = null; + } + }); + } + + private TransactionSuccess handleTransactionAbort(final TransactionAbortRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + if (readyCohort == null) { + openTransaction.abort(); + return new TransactionAbortSuccess(getIdentifier(), request.getSequence()); + } + + readyCohort.abort(new FutureCallback() { + @Override + public void onSuccess(final Void result) { + readyCohort = null; + recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(getIdentifier(), + request.getSequence())); + LOG.debug("Transaction {} aborted", getIdentifier()); + } + + @Override + public void onFailure(final Throwable failure) { + readyCohort = null; + LOG.warn("Transaction {} abort failed", getIdentifier(), failure); + recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure)); + } + }); + return null; + } + + private void coordinatedCommit(final RequestEnvelope envelope, final long now) { + readyCohort.canCommit(new FutureCallback() { + @Override + public void onSuccess(final Void result) { + recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(readyCohort.getIdentifier(), + envelope.getMessage().getSequence())); + } + + @Override + public void onFailure(final Throwable failure) { + recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure)); + readyCohort = null; + } + }); + } + + private void directCommit(final RequestEnvelope envelope, final long now) { + readyCohort.canCommit(new FutureCallback() { + @Override + public void onSuccess(final Void result) { + successfulDirectCanCommit(envelope, now); + } + + @Override + public void onFailure(final Throwable failure) { + recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure)); + readyCohort = null; + } + }); + + } + + private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) { + readyCohort.preCommit(new FutureCallback() { + @Override + public void onSuccess(final DataTreeCandidate result) { + successfulDirectPreCommit(envelope, startTime); + } + + @Override + public void onFailure(final Throwable failure) { + recordAndSendFailure(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure)); + readyCohort = null; + } + }); + } + + private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) { + readyCohort.commit(new FutureCallback() { + + @Override + public void onSuccess(final UnsignedLong result) { + successfulCommit(envelope, startTime); + } + + @Override + public void onFailure(final Throwable failure) { + recordAndSendFailure(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure)); + readyCohort = null; + } + }); + } + + private void successfulCommit(final RequestEnvelope envelope, final long startTime) { + recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(), + envelope.getMessage().getSequence())); + readyCohort = null; + } + + private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + if (sealedModification.equals(request.getModification())) { + readyCohort = history().createReadyCohort(getIdentifier(), sealedModification); + + if (request.isCoordinated()) { + coordinatedCommit(envelope, now); + } else { + directCommit(envelope, now); + } + } else { + throw new UnsupportedRequestException(request); + } + } + + private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request) + throws RequestException { + final Optional> data = openTransaction.getSnapshot().readNode(request.getPath()); + return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(getIdentifier(), request.getSequence(), + data.isPresent())); + } + + private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) + throws RequestException { + final Optional> data = openTransaction.getSnapshot().readNode(request.getPath()); + return recordSuccess(request.getSequence(), new ReadTransactionSuccess(getIdentifier(), request.getSequence(), + data)); + } + + private ModifyTransactionSuccess replyModifySuccess(final long sequence) { + return recordSuccess(sequence, new ModifyTransactionSuccess(getIdentifier(), sequence)); + } + + private @Nullable TransactionSuccess handleModifyTransaction(final ModifyTransactionRequest request, + final RequestEnvelope envelope, final long now) throws RequestException { + + final DataTreeModification modification = openTransaction.getSnapshot(); + for (TransactionModification m : request.getModifications()) { + if (m instanceof TransactionDelete) { + modification.delete(m.getPath()); + } else if (m instanceof TransactionWrite) { + modification.write(m.getPath(), ((TransactionWrite) m).getData()); + } else if (m instanceof TransactionMerge) { + modification.merge(m.getPath(), ((TransactionMerge) m).getData()); + } else { + LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m); + } + } + + final java.util.Optional maybeProto = request.getPersistenceProtocol(); + if (!maybeProto.isPresent()) { + return replyModifySuccess(request.getSequence()); + } + + switch (maybeProto.get()) { + case ABORT: + openTransaction.abort(); + openTransaction = null; + return replyModifySuccess(request.getSequence()); + case SIMPLE: + readyCohort = openTransaction.ready(); + openTransaction = null; + directCommit(envelope, now); + return null; + case THREE_PHASE: + readyCohort = openTransaction.ready(); + openTransaction = null; + coordinatedCommit(envelope, now); + return null; + default: + throw new UnsupportedRequestException(request); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java index 9240aab26c..e4dd00b602 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java @@ -7,58 +7,29 @@ */ package org.opendaylight.controller.cluster.datastore; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Verify; -import com.google.common.primitives.UnsignedLong; -import com.google.common.util.concurrent.FutureCallback; import java.util.ArrayDeque; import java.util.Iterator; import java.util.Queue; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; -import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; -import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; -import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; -import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; -import org.opendaylight.controller.cluster.access.commands.ModifyTransactionSuccess; import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException; -import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; -import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; -import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; -import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; -import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; -import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess; -import org.opendaylight.controller.cluster.access.commands.TransactionCommitSuccess; -import org.opendaylight.controller.cluster.access.commands.TransactionDelete; -import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest; -import org.opendaylight.controller.cluster.access.commands.TransactionMerge; -import org.opendaylight.controller.cluster.access.commands.TransactionModification; -import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; -import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; -import org.opendaylight.controller.cluster.access.commands.TransactionWrite; import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope; import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeModification; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.opendaylight.yangtools.concepts.Identifiable; /** - * Frontend transaction state as observed by the shard leader. + * Frontend common transaction state as observed by the shard leader. * * @author Robert Varga */ @NotThreadSafe -final class FrontendTransaction { - private static final Logger LOG = LoggerFactory.getLogger(FrontendTransaction.class); - +abstract class FrontendTransaction implements Identifiable { private final AbstractFrontendHistory history; private final TransactionIdentifier id; @@ -72,35 +43,21 @@ final class FrontendTransaction { private Long lastPurgedSequence; private long expectedSequence; - private ReadWriteShardDataTreeTransaction openTransaction; - private DataTreeModification sealedModification; - private ShardDataTreeCohort readyCohort; - - private FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id, - final ReadWriteShardDataTreeTransaction transaction) { + FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id) { this.history = Preconditions.checkNotNull(history); this.id = Preconditions.checkNotNull(id); - this.openTransaction = Preconditions.checkNotNull(transaction); } - private FrontendTransaction(final AbstractFrontendHistory history, final TransactionIdentifier id, - final DataTreeModification mod) { - this.history = Preconditions.checkNotNull(history); - this.id = Preconditions.checkNotNull(id); - this.sealedModification = Preconditions.checkNotNull(mod); + @Override + public final TransactionIdentifier getIdentifier() { + return id; } - static FrontendTransaction createOpen(final AbstractFrontendHistory history, - final ReadWriteShardDataTreeTransaction transaction) { - return new FrontendTransaction(history, transaction.getIdentifier(), transaction); + final AbstractFrontendHistory history() { + return history; } - static FrontendTransaction createReady(final AbstractFrontendHistory history, final TransactionIdentifier id, - final DataTreeModification mod) { - return new FrontendTransaction(history, id, mod); - } - - java.util.Optional> replaySequence(final long sequence) throws RequestException { + final java.util.Optional> replaySequence(final long sequence) throws RequestException { // Fast path check: if the requested sequence is the next request, bail early if (expectedSequence == sequence) { return java.util.Optional.empty(); @@ -144,36 +101,15 @@ final class FrontendTransaction { return java.util.Optional.empty(); } - void purgeSequencesUpTo(final long sequence) { + final void purgeSequencesUpTo(final long sequence) { // FIXME: implement this lastPurgedSequence = sequence; } // Sequence has already been checked - @Nullable TransactionSuccess handleRequest(final TransactionRequest request, final RequestEnvelope envelope, - final long now) throws RequestException { - if (request instanceof ModifyTransactionRequest) { - return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now); - } else if (request instanceof CommitLocalTransactionRequest) { - handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope, now); - return null; - } else if (request instanceof ExistsTransactionRequest) { - return handleExistsTransaction((ExistsTransactionRequest) request); - } else if (request instanceof ReadTransactionRequest) { - return handleReadTransaction((ReadTransactionRequest) request); - } else if (request instanceof TransactionPreCommitRequest) { - handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope, now); - return null; - } else if (request instanceof TransactionDoCommitRequest) { - handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now); - return null; - } else if (request instanceof TransactionAbortRequest) { - return handleTransactionAbort((TransactionAbortRequest) request, envelope, now); - } else { - throw new UnsupportedRequestException(request); - } - } + abstract @Nullable TransactionSuccess handleRequest(final TransactionRequest request, + final RequestEnvelope envelope, final long now) throws RequestException; private void recordResponse(final long sequence, final Object response) { if (replayQueue.isEmpty()) { @@ -183,7 +119,7 @@ final class FrontendTransaction { expectedSequence++; } - private > T recordSuccess(final long sequence, final T success) { + final > T recordSuccess(final long sequence, final T success) { recordResponse(sequence, success); return success; } @@ -192,215 +128,15 @@ final class FrontendTransaction { return history.readTime() - startTime; } - private void recordAndSendSuccess(final RequestEnvelope envelope, final long startTime, + final void recordAndSendSuccess(final RequestEnvelope envelope, final long startTime, final TransactionSuccess success) { recordResponse(success.getSequence(), success); envelope.sendSuccess(success, executionTime(startTime)); } - private void recordAndSendFailure(final RequestEnvelope envelope, final long startTime, + final void recordAndSendFailure(final RequestEnvelope envelope, final long startTime, final RuntimeRequestException failure) { recordResponse(envelope.getMessage().getSequence(), failure); envelope.sendFailure(failure, executionTime(startTime)); } - - private void handleTransactionPreCommit(final TransactionPreCommitRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - readyCohort.preCommit(new FutureCallback() { - @Override - public void onSuccess(final DataTreeCandidate result) { - recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(readyCohort.getIdentifier(), - request.getSequence())); - } - - @Override - public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, now, new RuntimeRequestException("Precommit failed", failure)); - readyCohort = null; - } - }); - } - - private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope, - final long now) throws RequestException { - readyCohort.commit(new FutureCallback() { - @Override - public void onSuccess(final UnsignedLong result) { - successfulCommit(envelope, now); - } - - @Override - public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, now, new RuntimeRequestException("Commit failed", failure)); - readyCohort = null; - } - }); - } - - private TransactionSuccess handleTransactionAbort(final TransactionAbortRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - if (readyCohort == null) { - openTransaction.abort(); - return new TransactionAbortSuccess(id, request.getSequence()); - } - - readyCohort.abort(new FutureCallback() { - @Override - public void onSuccess(final Void result) { - readyCohort = null; - recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(id, request.getSequence())); - LOG.debug("Transaction {} aborted", id); - } - - @Override - public void onFailure(final Throwable failure) { - readyCohort = null; - LOG.warn("Transaction {} abort failed", id, failure); - recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure)); - } - }); - return null; - } - - private void coordinatedCommit(final RequestEnvelope envelope, final long now) { - readyCohort.canCommit(new FutureCallback() { - @Override - public void onSuccess(final Void result) { - recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(readyCohort.getIdentifier(), - envelope.getMessage().getSequence())); - } - - @Override - public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure)); - readyCohort = null; - } - }); - } - - private void directCommit(final RequestEnvelope envelope, final long now) { - readyCohort.canCommit(new FutureCallback() { - @Override - public void onSuccess(final Void result) { - successfulDirectCanCommit(envelope, now); - } - - @Override - public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure)); - readyCohort = null; - } - }); - - } - - private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) { - readyCohort.preCommit(new FutureCallback() { - @Override - public void onSuccess(final DataTreeCandidate result) { - successfulDirectPreCommit(envelope, startTime); - } - - @Override - public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure)); - readyCohort = null; - } - }); - } - - private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) { - readyCohort.commit(new FutureCallback() { - - @Override - public void onSuccess(final UnsignedLong result) { - successfulCommit(envelope, startTime); - } - - @Override - public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure)); - readyCohort = null; - } - }); - } - - private void successfulCommit(final RequestEnvelope envelope, final long startTime) { - recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(), - envelope.getMessage().getSequence())); - readyCohort = null; - } - - private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - if (sealedModification.equals(request.getModification())) { - readyCohort = history.createReadyCohort(id, sealedModification); - - if (request.isCoordinated()) { - coordinatedCommit(envelope, now); - } else { - directCommit(envelope, now); - } - } else { - throw new UnsupportedRequestException(request); - } - } - - private ExistsTransactionSuccess handleExistsTransaction(final ExistsTransactionRequest request) - throws RequestException { - final Optional> data = openTransaction.getSnapshot().readNode(request.getPath()); - return recordSuccess(request.getSequence(), new ExistsTransactionSuccess(id, request.getSequence(), - data.isPresent())); - } - - private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) - throws RequestException { - final Optional> data = openTransaction.getSnapshot().readNode(request.getPath()); - return recordSuccess(request.getSequence(), new ReadTransactionSuccess(id, request.getSequence(), data)); - } - - private ModifyTransactionSuccess replyModifySuccess(final long sequence) { - return recordSuccess(sequence, new ModifyTransactionSuccess(id, sequence)); - } - - private @Nullable TransactionSuccess handleModifyTransaction(final ModifyTransactionRequest request, - final RequestEnvelope envelope, final long now) throws RequestException { - - final DataTreeModification modification = openTransaction.getSnapshot(); - for (TransactionModification m : request.getModifications()) { - if (m instanceof TransactionDelete) { - modification.delete(m.getPath()); - } else if (m instanceof TransactionWrite) { - modification.write(m.getPath(), ((TransactionWrite) m).getData()); - } else if (m instanceof TransactionMerge) { - modification.merge(m.getPath(), ((TransactionMerge) m).getData()); - } else { - LOG.warn("{}: ignoring unhandled modification {}", history.persistenceId(), m); - } - } - - final java.util.Optional maybeProto = request.getPersistenceProtocol(); - if (!maybeProto.isPresent()) { - return replyModifySuccess(request.getSequence()); - } - - switch (maybeProto.get()) { - case ABORT: - openTransaction.abort(); - openTransaction = null; - return replyModifySuccess(request.getSequence()); - case SIMPLE: - readyCohort = openTransaction.ready(); - openTransaction = null; - directCommit(envelope, now); - return null; - case THREE_PHASE: - readyCohort = openTransaction.ready(); - openTransaction = null; - coordinatedCommit(envelope, now); - return null; - default: - throw new UnsupportedRequestException(request); - } - } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java index 50d1dc5009..cd0cc30a09 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java @@ -47,11 +47,18 @@ final class LocalFrontendHistory extends AbstractFrontendHistory { return chain.getIdentifier(); } + @Override + FrontendTransaction createOpenSnapshot(final TransactionIdentifier id) throws RequestException { + checkDeadTransaction(id); + lastSeenTransaction = id.getTransactionId(); + return FrontendReadOnlyTransaction.create(this, chain.newReadOnlyTransaction(id)); + } + @Override FrontendTransaction createOpenTransaction(final TransactionIdentifier id) throws RequestException { checkDeadTransaction(id); lastSeenTransaction = id.getTransactionId(); - return FrontendTransaction.createOpen(this, chain.newReadWriteTransaction(id)); + return FrontendReadWriteTransaction.createOpen(this, chain.newReadWriteTransaction(id)); } @Override @@ -59,7 +66,7 @@ final class LocalFrontendHistory extends AbstractFrontendHistory { throws RequestException { checkDeadTransaction(id); lastSeenTransaction = id.getTransactionId(); - return FrontendTransaction.createReady(this, id, mod); + return FrontendReadWriteTransaction.createReady(this, id, mod); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java index 14b0eecaa2..fe2588d577 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java @@ -37,15 +37,20 @@ final class StandaloneFrontendHistory extends AbstractFrontendHistory { return identifier; } + @Override + FrontendTransaction createOpenSnapshot(final TransactionIdentifier id) throws RequestException { + return FrontendReadOnlyTransaction.create(this, tree.newReadOnlyTransaction(id)); + } + @Override FrontendTransaction createOpenTransaction(final TransactionIdentifier id) throws RequestException { - return FrontendTransaction.createOpen(this, tree.newReadWriteTransaction(id)); + return FrontendReadWriteTransaction.createOpen(this, tree.newReadWriteTransaction(id)); } @Override FrontendTransaction createReadyTransaction(final TransactionIdentifier id, final DataTreeModification mod) throws RequestException { - return FrontendTransaction.createReady(this, id, mod); + return FrontendReadWriteTransaction.createReady(this, id, mod); } @Override -- 2.36.6