BUG-5280: move transactions keeping to history 09/44909/20
authorRobert Varga <rovarga@cisco.com>
Wed, 31 Aug 2016 09:43:21 +0000 (11:43 +0200)
committerTom Pantelis <tpanteli@brocade.com>
Wed, 26 Oct 2016 20:55:11 +0000 (16:55 -0400)
Keeping transaction map in directly in DistributedDataStoreClientBehavior
is not consistent and will create problems when replaying state during
reconnect.

This patch moves transaction tracking into AbstractClientHistory, allowing
DistributedDataStoreClientBehavior to only track open histories. It also
makes locking more consistent, as transaction instantiation is completely
encapsulated in the AbstractClientHistory from which it is created.

Change-Id: I9fc031437a9d8c33df6f9e7294dd392f58965f3d
Signed-off-by: Robert Varga <rovarga@cisco.com>
13 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractLocalProxyHistory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractTransactionCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalProxyHistory.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyHistory.java with 67% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/EmptyTransactionCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleLocalProxyHistory.java [new file with mode: 0644]

index b164157982691cdda8095168b7b6432859b2bb0e..ce2c164b562ade50271cf0f71835803cdb6e3591 100644 (file)
@@ -8,9 +8,13 @@
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.base.Preconditions;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
@@ -31,13 +35,24 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
+    private static final AtomicLongFieldUpdater<AbstractClientHistory> NEXT_TX_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(AbstractClientHistory.class, "nextTx");
     private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
             AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
 
+    @GuardedBy("this")
+    private final Map<TransactionIdentifier, ClientTransaction> openTransactions = new HashMap<>();
+    @GuardedBy("this")
+    private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
+
     private final Map<Long, AbstractProxyHistory> histories = new ConcurrentHashMap<>();
     private final DistributedDataStoreClientBehavior client;
     private final LocalHistoryIdentifier identifier;
 
+    // Used via NEXT_TX_UPDATER
+    @SuppressWarnings("unused")
+    private volatile long nextTx = 0;
+
     private volatile State state = State.IDLE;
 
     AbstractClientHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
@@ -64,29 +79,91 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
         return client;
     }
 
+    final long nextTx() {
+        return NEXT_TX_UPDATER.getAndIncrement(this);
+    }
+
     @Override
     final void localAbort(final Throwable cause) {
-        LOG.debug("Force-closing history {}", getIdentifier(), cause);
-        state = State.CLOSED;
+        final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
+        if (oldState != State.CLOSED) {
+            LOG.debug("Force-closing history {}", getIdentifier(), cause);
+
+            synchronized (this) {
+                for (ClientTransaction t : openTransactions.values()) {
+                    t.localAbort(cause);
+                }
+                openTransactions.clear();
+                readyTransactions.clear();
+            }
+        }
     }
 
     private AbstractProxyHistory createHistoryProxy(final Long shard) {
-        final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(identifier.getClientId(),
-            identifier.getHistoryId(), shard);
-        return AbstractProxyHistory.create(client, client.resolver().getFutureBackendInfo(shard), historyId);
+        return createHistoryProxy(new LocalHistoryIdentifier(identifier.getClientId(),
+            identifier.getHistoryId(), shard), client.resolver().getFutureBackendInfo(shard));
     }
 
+    abstract AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
+            final Optional<ShardBackendInfo> backendInfo);
+
     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
         final AbstractProxyHistory history = histories.computeIfAbsent(shard, this::createHistoryProxy);
         return history.createTransactionProxy(transactionId);
     }
 
+    public final ClientTransaction createTransaction() {
+        Preconditions.checkState(state != State.CLOSED);
+
+        synchronized (this) {
+            final ClientTransaction ret = doCreateTransaction();
+            openTransactions.put(ret.getIdentifier(), ret);
+            return ret;
+        }
+    }
+
+    @GuardedBy("this")
+    abstract ClientTransaction doCreateTransaction();
+
+    /**
+     * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
+     *
+     * @param txId Transaction identifier
+     * @param cohort Transaction commit cohort
+     */
+    synchronized AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
+            final AbstractTransactionCommitCohort cohort) {
+        final ClientTransaction tx = openTransactions.remove(txId);
+        Preconditions.checkState(tx != null, "Failed to find open transaction for %s", txId);
+
+        final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
+        Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
+                cohort, txId, previous);
+
+        return cohort;
+    }
+
+    /**
+     * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
+     * backend.
+     *
+     * @param txId transaction identifier
+     */
+    synchronized void onTransactionAbort(final TransactionIdentifier txId) {
+        if (openTransactions.remove(txId) == null) {
+            LOG.warn("Could not find aborting transaction {}", txId);
+        }
+    }
+
     /**
-     * Callback invoked from {@link ClientTransaction} when a transaction has been submitted.
+     * Callback invoked from {@link AbstractTransactionCommitCohort} when a child transaction has been completed
+     * and all its state can be removed.
      *
-     * @param transaction Transaction handle
+     * @param txId transaction identifier
      */
-    void onTransactionReady(final ClientTransaction transaction) {
-        client.transactionComplete(transaction);
+    synchronized void onTransactionComplete(final TransactionIdentifier txId) {
+        if (readyTransactions.remove(txId) == null) {
+            LOG.warn("Could not find completed transaction {}", txId);
+        }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractLocalProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractLocalProxyHistory.java
new file mode 100644 (file)
index 0000000..b8493ea
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+
+abstract class AbstractLocalProxyHistory extends AbstractProxyHistory {
+    private final DataTree dataTree;
+
+    AbstractLocalProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier,
+        final DataTree dataTree) {
+        super(client, identifier);
+        this.dataTree = Preconditions.checkNotNull(dataTree);
+    }
+
+    final DataTreeSnapshot takeSnapshot() {
+        return dataTree.takeSnapshot();
+    }
+}
index f21be06f40f754a0e39a3a94eeb818c53a247411..d9f3b5f557c09f5786c103cd2bf4c553ed74e60f 100644 (file)
@@ -16,7 +16,7 @@ import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 
 /**
- * Per-connection representation of a local history.
+ * Per-connection representation of a local history. This class handles state replication across a single connection.
  *
  * @author Robert Varga
  */
@@ -30,11 +30,18 @@ abstract class AbstractProxyHistory implements Identifiable<LocalHistoryIdentifi
         this.identifier = Preconditions.checkNotNull(identifier);
     }
 
-    static AbstractProxyHistory create(final DistributedDataStoreClientBehavior client,
+    static AbstractProxyHistory createClient(final DistributedDataStoreClientBehavior client,
             final Optional<ShardBackendInfo> backendInfo, final LocalHistoryIdentifier identifier) {
         final Optional<DataTree> dataTree = backendInfo.flatMap(ShardBackendInfo::getDataTree);
-        return dataTree.isPresent() ? new LocalProxyHistory(client, identifier, dataTree.get())
-                : new RemoteProxyHistory(client, identifier);
+        return dataTree.isPresent() ? new ClientLocalProxyHistory(client, identifier, dataTree.get())
+             : new RemoteProxyHistory(client, identifier);
+    }
+
+    static AbstractProxyHistory createSingle(final DistributedDataStoreClientBehavior client,
+            final Optional<ShardBackendInfo> backendInfo, final LocalHistoryIdentifier identifier) {
+        final Optional<DataTree> dataTree = backendInfo.flatMap(ShardBackendInfo::getDataTree);
+        return dataTree.isPresent() ? new SingleLocalProxyHistory(client, identifier, dataTree.get())
+             : new RemoteProxyHistory(client, identifier);
     }
 
     @Override
index 51b17007738d5ee0ace135ef9b4d509c229870e2..bd6cb64352642dc9d892d2e92fb2e01a4c8dcc3e 100644 (file)
@@ -7,8 +7,11 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 
 /**
@@ -21,4 +24,20 @@ abstract class AbstractTransactionCommitCohort implements DOMStoreThreePhaseComm
     static final ListenableFuture<Boolean> TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE);
     static final ListenableFuture<Void> VOID_FUTURE = Futures.immediateFuture(null);
 
+    private final AbstractClientHistory parent;
+    private final TransactionIdentifier txId;
+
+    AbstractTransactionCommitCohort(final AbstractClientHistory parent, final TransactionIdentifier txId) {
+        this.parent = Preconditions.checkNotNull(parent);
+        this.txId = Preconditions.checkNotNull(txId);
+    }
+
+    final void complete() {
+        parent.onTransactionComplete(txId);
+    }
+
+    @Override
+    public final String toString() {
+        return MoreObjects.toStringHelper(this).add("txId", txId).toString();
+    }
 }
index 807cf98cb73b3dcf5e4841eaf0f4d5b171ac025f..102d0506173a24bfc01bb6239e7fa4af98cc9698 100644 (file)
@@ -10,7 +10,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 import com.google.common.annotations.Beta;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.Optional;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 
@@ -27,27 +27,10 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
  */
 @Beta
 public final class ClientLocalHistory extends AbstractClientHistory implements AutoCloseable {
-
-    private static final AtomicLongFieldUpdater<ClientLocalHistory> NEXT_TX_UPDATER =
-            AtomicLongFieldUpdater.newUpdater(ClientLocalHistory.class, "nextTx");
-
-    // Used via NEXT_TX_UPDATER
-    @SuppressWarnings("unused")
-    private volatile long nextTx = 0;
-
     ClientLocalHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier historyId) {
         super(client, historyId);
     }
 
-    public ClientTransaction createTransaction() {
-        final State local = state();
-        Preconditions.checkState(local == State.IDLE, "Local history %s state is %s", this, local);
-        updateState(local, State.TX_OPEN);
-
-        return new ClientTransaction(this, new TransactionIdentifier(getIdentifier(),
-            NEXT_TX_UPDATER.getAndIncrement(this)));
-    }
-
     @Override
     public void close() {
         final State local = state();
@@ -58,10 +41,28 @@ public final class ClientLocalHistory extends AbstractClientHistory implements A
     }
 
     @Override
-    void onTransactionReady(final ClientTransaction transaction) {
+    ClientTransaction doCreateTransaction() {
+        final State local = state();
+        Preconditions.checkState(local == State.IDLE, "Local history %s state is %s", this, local);
+        updateState(local, State.TX_OPEN);
+
+        return new ClientTransaction(this, new TransactionIdentifier(getIdentifier(), nextTx()));
+    }
+
+    @Override
+    AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
+            final AbstractTransactionCommitCohort cohort) {
+        // FIXME: deal with CLOSED here
         final State local = state();
         Verify.verify(local == State.TX_OPEN, "Local history %s is in unexpected state %s", this, local);
         updateState(local, State.IDLE);
-        super.onTransactionReady(transaction);
+
+        return super.onTransactionReady(txId, cohort);
+    }
+
+    @Override
+    AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
+            final Optional<ShardBackendInfo> backendInfo) {
+        return AbstractProxyHistory.createClient(getClient(), backendInfo, historyId);
     }
 }
@@ -7,17 +7,14 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
-import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 
-final class LocalProxyHistory extends AbstractProxyHistory {
-    private final DataTree dataTree;
-
-    LocalProxyHistory(DistributedDataStoreClientBehavior client, LocalHistoryIdentifier identifier, DataTree dataTree) {
-        super(client, identifier);
-        this.dataTree = Preconditions.checkNotNull(dataTree);
+final class ClientLocalProxyHistory extends AbstractLocalProxyHistory {
+    ClientLocalProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier,
+        final DataTree dataTree) {
+        super(client, identifier, dataTree);
     }
 
     @Override
@@ -25,6 +22,6 @@ final class LocalProxyHistory extends AbstractProxyHistory {
             final TransactionIdentifier txId) {
         // FIXME: this violates history contract: we should use the last submitted transaction instead to ensure
         //        causality
-        return new LocalProxyTransaction(client, txId, dataTree.takeSnapshot());
+        return new LocalProxyTransaction(client, txId, takeSnapshot());
     }
 }
\ No newline at end of file
index 0a1c8be2471102db655b80359f63ebb5faa2cfd2..81d00ee8bce3279842696f6a912d4744423a2e47 100644 (file)
@@ -131,16 +131,22 @@ public final class ClientTransaction extends LocalAbortable implements Identifia
         for (AbstractProxyTransaction p : proxies.values()) {
             p.seal();
         }
-        parent.onTransactionReady(this);
 
+        final AbstractTransactionCommitCohort cohort;
         switch (proxies.size()) {
             case 0:
-                return EmptyTransactionCommitCohort.INSTANCE;
+                cohort = new EmptyTransactionCommitCohort(parent, transactionId);
+                break;
             case 1:
-                return new DirectTransactionCommitCohort(Iterables.getOnlyElement(proxies.values()));
+                cohort = new DirectTransactionCommitCohort(parent, transactionId,
+                    Iterables.getOnlyElement(proxies.values()));
+                break;
             default:
-                return new ClientTransactionCommitCohort(proxies.values());
+                cohort = new ClientTransactionCommitCohort(parent, transactionId, proxies.values());
+                break;
         }
+
+        return parent.onTransactionReady(transactionId, cohort);
     }
 
     /**
@@ -152,6 +158,8 @@ public final class ClientTransaction extends LocalAbortable implements Identifia
                 proxy.abort();
             }
             proxies.clear();
+
+            parent.onTransactionAbort(transactionId);
         }
     }
 
@@ -160,4 +168,8 @@ public final class ClientTransaction extends LocalAbortable implements Identifia
         LOG.debug("Aborting transaction {}", getIdentifier(), cause);
         abort();
     }
+
+    Map<Long, AbstractProxyTransaction> getProxies() {
+        return proxies;
+    }
 }
index a7de89aac3e7da504f4010a01eedcda598981597..a4eb5e074f421ffc3e8bb718f02e747540f3839a 100644 (file)
@@ -9,13 +9,16 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Collection;
-import java.util.List;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 
 final class ClientTransactionCommitCohort extends AbstractTransactionCommitCohort {
-    private final List<AbstractProxyTransaction> proxies;
+    private final Collection<AbstractProxyTransaction> proxies;
 
-    ClientTransactionCommitCohort(final Collection<AbstractProxyTransaction> proxies) {
+    ClientTransactionCommitCohort(final AbstractClientHistory parent, final TransactionIdentifier txId,
+            final Collection<AbstractProxyTransaction> proxies) {
+        super(parent, txId);
         this.proxies = ImmutableList.copyOf(proxies);
     }
 
@@ -32,6 +35,11 @@ final class ClientTransactionCommitCohort extends AbstractTransactionCommitCohor
         return ret;
     }
 
+    private ListenableFuture<Void> addComplete(final ListenableFuture<Void> future) {
+        future.addListener(this::complete, MoreExecutors.directExecutor());
+        return future;
+    }
+
     @Override
     public ListenableFuture<Void> preCommit() {
         final VotingFuture<Void> ret = new VotingFuture<>(null, proxies.size());
@@ -49,7 +57,7 @@ final class ClientTransactionCommitCohort extends AbstractTransactionCommitCohor
             proxy.doCommit(ret);
         }
 
-        return ret;
+        return addComplete(ret);
     }
 
     @Override
@@ -59,6 +67,6 @@ final class ClientTransactionCommitCohort extends AbstractTransactionCommitCohor
             proxy.abort(ret);
         }
 
-        return ret;
+        return addComplete(ret);
     }
 }
index 49b281aa3a511bef3b56811ea2f73da2e9a0a443..1d5d4a70dedac1c10122ad991ac3c3ae3f9f1507 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 
 /**
  * An {@link AbstractTransactionCommitCohort} implementation for transactions which contain a single proxy. Since there
@@ -19,7 +20,9 @@ import com.google.common.util.concurrent.ListenableFuture;
 final class DirectTransactionCommitCohort extends AbstractTransactionCommitCohort {
     private final AbstractProxyTransaction proxy;
 
-    DirectTransactionCommitCohort(final AbstractProxyTransaction proxy) {
+    DirectTransactionCommitCohort(final AbstractClientHistory parent, final TransactionIdentifier txId,
+        final AbstractProxyTransaction proxy) {
+        super(parent, txId);
         this.proxy = Preconditions.checkNotNull(proxy);
     }
 
@@ -35,11 +38,13 @@ final class DirectTransactionCommitCohort extends AbstractTransactionCommitCohor
 
     @Override
     public ListenableFuture<Void> abort() {
+        complete();
         return VOID_FUTURE;
     }
 
     @Override
     public ListenableFuture<Void> commit() {
+        complete();
         return VOID_FUTURE;
     }
 }
index eb1dd17bfd9438fc6adff5351db71527d90b3453..9940ae57f32858f042084440341d56a660dcb89b 100644 (file)
@@ -20,7 +20,6 @@ import org.opendaylight.controller.cluster.access.client.ClientActorContext;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.Response;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,10 +58,8 @@ import org.slf4j.LoggerFactory;
 final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
 
-    private final Map<TransactionIdentifier, ClientTransaction> transactions = new ConcurrentHashMap<>();
     private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
     private final AtomicLong nextHistoryId = new AtomicLong(1);
-    private final AtomicLong nextTransactionId = new AtomicLong();
     private final ModuleShardBackendResolver resolver;
     private final SingleClientHistory singleHistory;
 
@@ -99,11 +96,6 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple
             h.localAbort(cause);
         }
         histories.clear();
-
-        for (ClientTransaction t : transactions.values()) {
-            t.localAbort(cause);
-        }
-        transactions.clear();
     }
 
     private DistributedDataStoreClientBehavior shutdown(final ClientActorBehavior currentBehavior) {
@@ -158,12 +150,7 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple
 
     @Override
     public ClientTransaction createTransaction() {
-        final TransactionIdentifier txId = new TransactionIdentifier(singleHistory.getIdentifier(),
-            nextTransactionId.getAndIncrement());
-        final ClientTransaction tx = new ClientTransaction(singleHistory, txId);
-        LOG.debug("{}: creating a new transaction {}", persistenceId(), tx);
-
-        return returnIfOperational(transactions, txId, tx, aborted);
+        return singleHistory.createTransaction();
     }
 
     @Override
@@ -176,10 +163,6 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple
         return resolver;
     }
 
-    void transactionComplete(final ClientTransaction transaction) {
-        transactions.remove(transaction.getIdentifier());
-    }
-
     void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
         sendRequest(request, response -> {
             completer.accept(response);
index 0884ed4a11c7c15476031750a43a933a43d03a4f..7193dd053f762cb37c4cf701afe1efd89fb33919 100644 (file)
@@ -8,7 +8,7 @@
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.util.concurrent.ListenableFuture;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 
 /**
  * An {@link AbstractTransactionCommitCohort} for use with empty transactions. This relies on the fact that no backends
@@ -20,10 +20,8 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh
  * @author Robert Varga
  */
 final class EmptyTransactionCommitCohort extends AbstractTransactionCommitCohort {
-    static final DOMStoreThreePhaseCommitCohort INSTANCE = new EmptyTransactionCommitCohort();
-
-    private EmptyTransactionCommitCohort() {
-        // Hidden
+    EmptyTransactionCommitCohort(final AbstractClientHistory parent, final TransactionIdentifier txId) {
+        super(parent, txId);
     }
 
     @Override
@@ -38,11 +36,13 @@ final class EmptyTransactionCommitCohort extends AbstractTransactionCommitCohort
 
     @Override
     public ListenableFuture<Void> abort() {
+        complete();
         return VOID_FUTURE;
     }
 
     @Override
     public ListenableFuture<Void> commit() {
+        complete();
         return VOID_FUTURE;
     }
 }
index b57e9b669ea6770622dd2f5709b2ad4b5cf44782..6fa3cdf2a9088c4c19288c6a0f075b7e3f18418e 100644 (file)
@@ -7,7 +7,11 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
+import java.util.Optional;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An {@link AbstractClientHistory} which handles free-standing transactions.
@@ -15,8 +19,23 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
  * @author Robert Varga
  */
 final class SingleClientHistory extends AbstractClientHistory {
-    protected SingleClientHistory(final DistributedDataStoreClientBehavior client,
-            final LocalHistoryIdentifier identifier) {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
+
+    SingleClientHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
         super(client, identifier);
     }
+
+    @Override
+    ClientTransaction doCreateTransaction() {
+        final TransactionIdentifier txId = new TransactionIdentifier(getIdentifier(), nextTx());
+        LOG.debug("{}: creating a new transaction {}", this, txId);
+
+        return new ClientTransaction(this, txId);
+    }
+
+    @Override
+    AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
+            final Optional<ShardBackendInfo> backendInfo) {
+        return AbstractProxyHistory.createSingle(getClient(), backendInfo, historyId);
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleLocalProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleLocalProxyHistory.java
new file mode 100644 (file)
index 0000000..f32fd59
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+
+final class SingleLocalProxyHistory extends AbstractLocalProxyHistory {
+    SingleLocalProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier,
+        final DataTree dataTree) {
+        super(client, identifier, dataTree);
+    }
+
+    @Override
+    AbstractProxyTransaction doCreateTransactionProxy(final DistributedDataStoreClientBehavior client,
+            final TransactionIdentifier txId) {
+        return new LocalProxyTransaction(client, txId, takeSnapshot());
+    }
+}
\ No newline at end of file