From: Robert Varga Date: Wed, 31 Aug 2016 09:43:21 +0000 (+0200) Subject: BUG-5280: move transactions keeping to history X-Git-Tag: release/carbon~417 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=cc1ec4a8e2ec99ad7711d0e5e649b34d37d87da0 BUG-5280: move transactions keeping to history Keeping transaction map in directly in DistributedDataStoreClientBehavior is not consistent and will create problems when replaying state during reconnect. This patch moves transaction tracking into AbstractClientHistory, allowing DistributedDataStoreClientBehavior to only track open histories. It also makes locking more consistent, as transaction instantiation is completely encapsulated in the AbstractClientHistory from which it is created. Change-Id: I9fc031437a9d8c33df6f9e7294dd392f58965f3d Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java index b164157982..ce2c164b56 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java @@ -8,9 +8,13 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.base.Preconditions; +import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.concepts.Identifiable; @@ -31,13 +35,24 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia } private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class); + private static final AtomicLongFieldUpdater NEXT_TX_UPDATER = + AtomicLongFieldUpdater.newUpdater(AbstractClientHistory.class, "nextTx"); private static final AtomicReferenceFieldUpdater STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state"); + @GuardedBy("this") + private final Map openTransactions = new HashMap<>(); + @GuardedBy("this") + private final Map readyTransactions = new HashMap<>(); + private final Map histories = new ConcurrentHashMap<>(); private final DistributedDataStoreClientBehavior client; private final LocalHistoryIdentifier identifier; + // Used via NEXT_TX_UPDATER + @SuppressWarnings("unused") + private volatile long nextTx = 0; + private volatile State state = State.IDLE; AbstractClientHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) { @@ -64,29 +79,91 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia return client; } + final long nextTx() { + return NEXT_TX_UPDATER.getAndIncrement(this); + } + @Override final void localAbort(final Throwable cause) { - LOG.debug("Force-closing history {}", getIdentifier(), cause); - state = State.CLOSED; + final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED); + if (oldState != State.CLOSED) { + LOG.debug("Force-closing history {}", getIdentifier(), cause); + + synchronized (this) { + for (ClientTransaction t : openTransactions.values()) { + t.localAbort(cause); + } + openTransactions.clear(); + readyTransactions.clear(); + } + } } private AbstractProxyHistory createHistoryProxy(final Long shard) { - final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(identifier.getClientId(), - identifier.getHistoryId(), shard); - return AbstractProxyHistory.create(client, client.resolver().getFutureBackendInfo(shard), historyId); + return createHistoryProxy(new LocalHistoryIdentifier(identifier.getClientId(), + identifier.getHistoryId(), shard), client.resolver().getFutureBackendInfo(shard)); } + abstract AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId, + final Optional backendInfo); + final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) { final AbstractProxyHistory history = histories.computeIfAbsent(shard, this::createHistoryProxy); return history.createTransactionProxy(transactionId); } + public final ClientTransaction createTransaction() { + Preconditions.checkState(state != State.CLOSED); + + synchronized (this) { + final ClientTransaction ret = doCreateTransaction(); + openTransactions.put(ret.getIdentifier(), ret); + return ret; + } + } + + @GuardedBy("this") + abstract ClientTransaction doCreateTransaction(); + + /** + * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission. + * + * @param txId Transaction identifier + * @param cohort Transaction commit cohort + */ + synchronized AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId, + final AbstractTransactionCommitCohort cohort) { + final ClientTransaction tx = openTransactions.remove(txId); + Preconditions.checkState(tx != null, "Failed to find open transaction for %s", txId); + + final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort); + Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s", + cohort, txId, previous); + + return cohort; + } + + /** + * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching + * backend. + * + * @param txId transaction identifier + */ + synchronized void onTransactionAbort(final TransactionIdentifier txId) { + if (openTransactions.remove(txId) == null) { + LOG.warn("Could not find aborting transaction {}", txId); + } + } + /** - * Callback invoked from {@link ClientTransaction} when a transaction has been submitted. + * Callback invoked from {@link AbstractTransactionCommitCohort} when a child transaction has been completed + * and all its state can be removed. * - * @param transaction Transaction handle + * @param txId transaction identifier */ - void onTransactionReady(final ClientTransaction transaction) { - client.transactionComplete(transaction); + synchronized void onTransactionComplete(final TransactionIdentifier txId) { + if (readyTransactions.remove(txId) == null) { + LOG.warn("Could not find completed transaction {}", txId); + } } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractLocalProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractLocalProxyHistory.java new file mode 100644 index 0000000000..b8493eae82 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractLocalProxyHistory.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; + +abstract class AbstractLocalProxyHistory extends AbstractProxyHistory { + private final DataTree dataTree; + + AbstractLocalProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier, + final DataTree dataTree) { + super(client, identifier); + this.dataTree = Preconditions.checkNotNull(dataTree); + } + + final DataTreeSnapshot takeSnapshot() { + return dataTree.takeSnapshot(); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyHistory.java index f21be06f40..d9f3b5f557 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyHistory.java @@ -16,7 +16,7 @@ import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; /** - * Per-connection representation of a local history. + * Per-connection representation of a local history. This class handles state replication across a single connection. * * @author Robert Varga */ @@ -30,11 +30,18 @@ abstract class AbstractProxyHistory implements Identifiable backendInfo, final LocalHistoryIdentifier identifier) { final Optional dataTree = backendInfo.flatMap(ShardBackendInfo::getDataTree); - return dataTree.isPresent() ? new LocalProxyHistory(client, identifier, dataTree.get()) - : new RemoteProxyHistory(client, identifier); + return dataTree.isPresent() ? new ClientLocalProxyHistory(client, identifier, dataTree.get()) + : new RemoteProxyHistory(client, identifier); + } + + static AbstractProxyHistory createSingle(final DistributedDataStoreClientBehavior client, + final Optional backendInfo, final LocalHistoryIdentifier identifier) { + final Optional dataTree = backendInfo.flatMap(ShardBackendInfo::getDataTree); + return dataTree.isPresent() ? new SingleLocalProxyHistory(client, identifier, dataTree.get()) + : new RemoteProxyHistory(client, identifier); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractTransactionCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractTransactionCommitCohort.java index 51b1700773..bd6cb64352 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractTransactionCommitCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractTransactionCommitCohort.java @@ -7,8 +7,11 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; +import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; /** @@ -21,4 +24,20 @@ abstract class AbstractTransactionCommitCohort implements DOMStoreThreePhaseComm static final ListenableFuture TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE); static final ListenableFuture VOID_FUTURE = Futures.immediateFuture(null); + private final AbstractClientHistory parent; + private final TransactionIdentifier txId; + + AbstractTransactionCommitCohort(final AbstractClientHistory parent, final TransactionIdentifier txId) { + this.parent = Preconditions.checkNotNull(parent); + this.txId = Preconditions.checkNotNull(txId); + } + + final void complete() { + parent.onTransactionComplete(txId); + } + + @Override + public final String toString() { + return MoreObjects.toStringHelper(this).add("txId", txId).toString(); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java index 807cf98cb7..102d050617 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java @@ -10,7 +10,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.annotations.Beta; import com.google.common.base.Preconditions; import com.google.common.base.Verify; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.Optional; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; @@ -27,27 +27,10 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier */ @Beta public final class ClientLocalHistory extends AbstractClientHistory implements AutoCloseable { - - private static final AtomicLongFieldUpdater NEXT_TX_UPDATER = - AtomicLongFieldUpdater.newUpdater(ClientLocalHistory.class, "nextTx"); - - // Used via NEXT_TX_UPDATER - @SuppressWarnings("unused") - private volatile long nextTx = 0; - ClientLocalHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier historyId) { super(client, historyId); } - public ClientTransaction createTransaction() { - final State local = state(); - Preconditions.checkState(local == State.IDLE, "Local history %s state is %s", this, local); - updateState(local, State.TX_OPEN); - - return new ClientTransaction(this, new TransactionIdentifier(getIdentifier(), - NEXT_TX_UPDATER.getAndIncrement(this))); - } - @Override public void close() { final State local = state(); @@ -58,10 +41,28 @@ public final class ClientLocalHistory extends AbstractClientHistory implements A } @Override - void onTransactionReady(final ClientTransaction transaction) { + ClientTransaction doCreateTransaction() { + final State local = state(); + Preconditions.checkState(local == State.IDLE, "Local history %s state is %s", this, local); + updateState(local, State.TX_OPEN); + + return new ClientTransaction(this, new TransactionIdentifier(getIdentifier(), nextTx())); + } + + @Override + AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId, + final AbstractTransactionCommitCohort cohort) { + // FIXME: deal with CLOSED here final State local = state(); Verify.verify(local == State.TX_OPEN, "Local history %s is in unexpected state %s", this, local); updateState(local, State.IDLE); - super.onTransactionReady(transaction); + + return super.onTransactionReady(txId, cohort); + } + + @Override + AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId, + final Optional backendInfo) { + return AbstractProxyHistory.createClient(getClient(), backendInfo, historyId); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalProxyHistory.java similarity index 67% rename from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyHistory.java rename to opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalProxyHistory.java index 8ccc4a6353..f06e57cc36 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalProxyHistory.java @@ -7,17 +7,14 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; -import com.google.common.base.Preconditions; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; -final class LocalProxyHistory extends AbstractProxyHistory { - private final DataTree dataTree; - - LocalProxyHistory(DistributedDataStoreClientBehavior client, LocalHistoryIdentifier identifier, DataTree dataTree) { - super(client, identifier); - this.dataTree = Preconditions.checkNotNull(dataTree); +final class ClientLocalProxyHistory extends AbstractLocalProxyHistory { + ClientLocalProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier, + final DataTree dataTree) { + super(client, identifier, dataTree); } @Override @@ -25,6 +22,6 @@ final class LocalProxyHistory extends AbstractProxyHistory { final TransactionIdentifier txId) { // FIXME: this violates history contract: we should use the last submitted transaction instead to ensure // causality - return new LocalProxyTransaction(client, txId, dataTree.takeSnapshot()); + return new LocalProxyTransaction(client, txId, takeSnapshot()); } } \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java index 0a1c8be247..81d00ee8bc 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java @@ -131,16 +131,22 @@ public final class ClientTransaction extends LocalAbortable implements Identifia for (AbstractProxyTransaction p : proxies.values()) { p.seal(); } - parent.onTransactionReady(this); + final AbstractTransactionCommitCohort cohort; switch (proxies.size()) { case 0: - return EmptyTransactionCommitCohort.INSTANCE; + cohort = new EmptyTransactionCommitCohort(parent, transactionId); + break; case 1: - return new DirectTransactionCommitCohort(Iterables.getOnlyElement(proxies.values())); + cohort = new DirectTransactionCommitCohort(parent, transactionId, + Iterables.getOnlyElement(proxies.values())); + break; default: - return new ClientTransactionCommitCohort(proxies.values()); + cohort = new ClientTransactionCommitCohort(parent, transactionId, proxies.values()); + break; } + + return parent.onTransactionReady(transactionId, cohort); } /** @@ -152,6 +158,8 @@ public final class ClientTransaction extends LocalAbortable implements Identifia proxy.abort(); } proxies.clear(); + + parent.onTransactionAbort(transactionId); } } @@ -160,4 +168,8 @@ public final class ClientTransaction extends LocalAbortable implements Identifia LOG.debug("Aborting transaction {}", getIdentifier(), cause); abort(); } + + Map getProxies() { + return proxies; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohort.java index a7de89aac3..a4eb5e074f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohort.java @@ -9,13 +9,16 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import java.util.Collection; -import java.util.List; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; final class ClientTransactionCommitCohort extends AbstractTransactionCommitCohort { - private final List proxies; + private final Collection proxies; - ClientTransactionCommitCohort(final Collection proxies) { + ClientTransactionCommitCohort(final AbstractClientHistory parent, final TransactionIdentifier txId, + final Collection proxies) { + super(parent, txId); this.proxies = ImmutableList.copyOf(proxies); } @@ -32,6 +35,11 @@ final class ClientTransactionCommitCohort extends AbstractTransactionCommitCohor return ret; } + private ListenableFuture addComplete(final ListenableFuture future) { + future.addListener(this::complete, MoreExecutors.directExecutor()); + return future; + } + @Override public ListenableFuture preCommit() { final VotingFuture ret = new VotingFuture<>(null, proxies.size()); @@ -49,7 +57,7 @@ final class ClientTransactionCommitCohort extends AbstractTransactionCommitCohor proxy.doCommit(ret); } - return ret; + return addComplete(ret); } @Override @@ -59,6 +67,6 @@ final class ClientTransactionCommitCohort extends AbstractTransactionCommitCohor proxy.abort(ret); } - return ret; + return addComplete(ret); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohort.java index 49b281aa3a..1d5d4a70de 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohort.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; /** * An {@link AbstractTransactionCommitCohort} implementation for transactions which contain a single proxy. Since there @@ -19,7 +20,9 @@ import com.google.common.util.concurrent.ListenableFuture; final class DirectTransactionCommitCohort extends AbstractTransactionCommitCohort { private final AbstractProxyTransaction proxy; - DirectTransactionCommitCohort(final AbstractProxyTransaction proxy) { + DirectTransactionCommitCohort(final AbstractClientHistory parent, final TransactionIdentifier txId, + final AbstractProxyTransaction proxy) { + super(parent, txId); this.proxy = Preconditions.checkNotNull(proxy); } @@ -35,11 +38,13 @@ final class DirectTransactionCommitCohort extends AbstractTransactionCommitCohor @Override public ListenableFuture abort() { + complete(); return VOID_FUTURE; } @Override public ListenableFuture commit() { + complete(); return VOID_FUTURE; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java index eb1dd17bfd..9940ae57f3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java @@ -20,7 +20,6 @@ import org.opendaylight.controller.cluster.access.client.ClientActorContext; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.Response; -import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.utils.ActorContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,10 +58,8 @@ import org.slf4j.LoggerFactory; final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient { private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class); - private final Map transactions = new ConcurrentHashMap<>(); private final Map histories = new ConcurrentHashMap<>(); private final AtomicLong nextHistoryId = new AtomicLong(1); - private final AtomicLong nextTransactionId = new AtomicLong(); private final ModuleShardBackendResolver resolver; private final SingleClientHistory singleHistory; @@ -99,11 +96,6 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple h.localAbort(cause); } histories.clear(); - - for (ClientTransaction t : transactions.values()) { - t.localAbort(cause); - } - transactions.clear(); } private DistributedDataStoreClientBehavior shutdown(final ClientActorBehavior currentBehavior) { @@ -158,12 +150,7 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple @Override public ClientTransaction createTransaction() { - final TransactionIdentifier txId = new TransactionIdentifier(singleHistory.getIdentifier(), - nextTransactionId.getAndIncrement()); - final ClientTransaction tx = new ClientTransaction(singleHistory, txId); - LOG.debug("{}: creating a new transaction {}", persistenceId(), tx); - - return returnIfOperational(transactions, txId, tx, aborted); + return singleHistory.createTransaction(); } @Override @@ -176,10 +163,6 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple return resolver; } - void transactionComplete(final ClientTransaction transaction) { - transactions.remove(transaction.getIdentifier()); - } - void sendRequest(final TransactionRequest request, final Consumer> completer) { sendRequest(request, response -> { completer.accept(response); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/EmptyTransactionCommitCohort.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/EmptyTransactionCommitCohort.java index 0884ed4a11..7193dd053f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/EmptyTransactionCommitCohort.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/EmptyTransactionCommitCohort.java @@ -8,7 +8,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds; import com.google.common.util.concurrent.ListenableFuture; -import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; /** * An {@link AbstractTransactionCommitCohort} for use with empty transactions. This relies on the fact that no backends @@ -20,10 +20,8 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh * @author Robert Varga */ final class EmptyTransactionCommitCohort extends AbstractTransactionCommitCohort { - static final DOMStoreThreePhaseCommitCohort INSTANCE = new EmptyTransactionCommitCohort(); - - private EmptyTransactionCommitCohort() { - // Hidden + EmptyTransactionCommitCohort(final AbstractClientHistory parent, final TransactionIdentifier txId) { + super(parent, txId); } @Override @@ -38,11 +36,13 @@ final class EmptyTransactionCommitCohort extends AbstractTransactionCommitCohort @Override public ListenableFuture abort() { + complete(); return VOID_FUTURE; } @Override public ListenableFuture commit() { + complete(); return VOID_FUTURE; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java index b57e9b669e..6fa3cdf2a9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java @@ -7,7 +7,11 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; +import java.util.Optional; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An {@link AbstractClientHistory} which handles free-standing transactions. @@ -15,8 +19,23 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie * @author Robert Varga */ final class SingleClientHistory extends AbstractClientHistory { - protected SingleClientHistory(final DistributedDataStoreClientBehavior client, - final LocalHistoryIdentifier identifier) { + private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class); + + SingleClientHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) { super(client, identifier); } + + @Override + ClientTransaction doCreateTransaction() { + final TransactionIdentifier txId = new TransactionIdentifier(getIdentifier(), nextTx()); + LOG.debug("{}: creating a new transaction {}", this, txId); + + return new ClientTransaction(this, txId); + } + + @Override + AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId, + final Optional backendInfo) { + return AbstractProxyHistory.createSingle(getClient(), backendInfo, historyId); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleLocalProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleLocalProxyHistory.java new file mode 100644 index 0000000000..f32fd59a74 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleLocalProxyHistory.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; + +final class SingleLocalProxyHistory extends AbstractLocalProxyHistory { + SingleLocalProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier, + final DataTree dataTree) { + super(client, identifier, dataTree); + } + + @Override + AbstractProxyTransaction doCreateTransactionProxy(final DistributedDataStoreClientBehavior client, + final TransactionIdentifier txId) { + return new LocalProxyTransaction(client, txId, takeSnapshot()); + } +} \ No newline at end of file