BUG-5280: move transactions keeping to history 09/44909/20
authorRobert Varga <rovarga@cisco.com>
Wed, 31 Aug 2016 09:43:21 +0000 (11:43 +0200)
committerTom Pantelis <tpanteli@brocade.com>
Wed, 26 Oct 2016 20:55:11 +0000 (16:55 -0400)
Keeping transaction map in directly in DistributedDataStoreClientBehavior
is not consistent and will create problems when replaying state during
reconnect.

This patch moves transaction tracking into AbstractClientHistory, allowing
DistributedDataStoreClientBehavior to only track open histories. It also
makes locking more consistent, as transaction instantiation is completely
encapsulated in the AbstractClientHistory from which it is created.

Change-Id: I9fc031437a9d8c33df6f9e7294dd392f58965f3d
Signed-off-by: Robert Varga <rovarga@cisco.com>
13 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractLocalProxyHistory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractTransactionCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalProxyHistory.java [moved from opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyHistory.java with 67% similarity]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransactionCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DirectTransactionCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/EmptyTransactionCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleClientHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleLocalProxyHistory.java [new file with mode: 0644]

index b164157..ce2c164 100644 (file)
@@ -8,9 +8,13 @@
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.base.Preconditions;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import javax.annotation.concurrent.GuardedBy;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.concepts.Identifiable;
@@ -31,13 +35,24 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
+    private static final AtomicLongFieldUpdater<AbstractClientHistory> NEXT_TX_UPDATER =
+            AtomicLongFieldUpdater.newUpdater(AbstractClientHistory.class, "nextTx");
     private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
             AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
 
+    @GuardedBy("this")
+    private final Map<TransactionIdentifier, ClientTransaction> openTransactions = new HashMap<>();
+    @GuardedBy("this")
+    private final Map<TransactionIdentifier, AbstractTransactionCommitCohort> readyTransactions = new HashMap<>();
+
     private final Map<Long, AbstractProxyHistory> histories = new ConcurrentHashMap<>();
     private final DistributedDataStoreClientBehavior client;
     private final LocalHistoryIdentifier identifier;
 
+    // Used via NEXT_TX_UPDATER
+    @SuppressWarnings("unused")
+    private volatile long nextTx = 0;
+
     private volatile State state = State.IDLE;
 
     AbstractClientHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
@@ -64,29 +79,91 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
         return client;
     }
 
+    final long nextTx() {
+        return NEXT_TX_UPDATER.getAndIncrement(this);
+    }
+
     @Override
     final void localAbort(final Throwable cause) {
-        LOG.debug("Force-closing history {}", getIdentifier(), cause);
-        state = State.CLOSED;
+        final State oldState = STATE_UPDATER.getAndSet(this, State.CLOSED);
+        if (oldState != State.CLOSED) {
+            LOG.debug("Force-closing history {}", getIdentifier(), cause);
+
+            synchronized (this) {
+                for (ClientTransaction t : openTransactions.values()) {
+                    t.localAbort(cause);
+                }
+                openTransactions.clear();
+                readyTransactions.clear();
+            }
+        }
     }
 
     private AbstractProxyHistory createHistoryProxy(final Long shard) {
-        final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(identifier.getClientId(),
-            identifier.getHistoryId(), shard);
-        return AbstractProxyHistory.create(client, client.resolver().getFutureBackendInfo(shard), historyId);
+        return createHistoryProxy(new LocalHistoryIdentifier(identifier.getClientId(),
+            identifier.getHistoryId(), shard), client.resolver().getFutureBackendInfo(shard));
     }
 
+    abstract AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
+            final Optional<ShardBackendInfo> backendInfo);
+
     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
         final AbstractProxyHistory history = histories.computeIfAbsent(shard, this::createHistoryProxy);
         return history.createTransactionProxy(transactionId);
     }
 
+    public final ClientTransaction createTransaction() {
+        Preconditions.checkState(state != State.CLOSED);
+
+        synchronized (this) {
+            final ClientTransaction ret = doCreateTransaction();
+            openTransactions.put(ret.getIdentifier(), ret);
+            return ret;
+        }
+    }
+
+    @GuardedBy("this")
+    abstract ClientTransaction doCreateTransaction();
+
+    /**
+     * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
+     *
+     * @param txId Transaction identifier
+     * @param cohort Transaction commit cohort
+     */
+    synchronized AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
+            final AbstractTransactionCommitCohort cohort) {
+        final ClientTransaction tx = openTransactions.remove(txId);
+        Preconditions.checkState(tx != null, "Failed to find open transaction for %s", txId);
+
+        final AbstractTransactionCommitCohort previous = readyTransactions.putIfAbsent(txId, cohort);
+        Preconditions.checkState(previous == null, "Duplicate cohort %s for transaction %s, already have %s",
+                cohort, txId, previous);
+
+        return cohort;
+    }
+
+    /**
+     * Callback invoked from {@link ClientTransaction} when a child transaction has been aborted without touching
+     * backend.
+     *
+     * @param txId transaction identifier
+     */
+    synchronized void onTransactionAbort(final TransactionIdentifier txId) {
+        if (openTransactions.remove(txId) == null) {
+            LOG.warn("Could not find aborting transaction {}", txId);
+        }
+    }
+
     /**
-     * Callback invoked from {@link ClientTransaction} when a transaction has been submitted.
+     * Callback invoked from {@link AbstractTransactionCommitCohort} when a child transaction has been completed
+     * and all its state can be removed.
      *
-     * @param transaction Transaction handle
+     * @param txId transaction identifier
      */
-    void onTransactionReady(final ClientTransaction transaction) {
-        client.transactionComplete(transaction);
+    synchronized void onTransactionComplete(final TransactionIdentifier txId) {
+        if (readyTransactions.remove(txId) == null) {
+            LOG.warn("Could not find completed transaction {}", txId);
+        }
     }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractLocalProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractLocalProxyHistory.java
new file mode 100644 (file)
index 0000000..b8493ea
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
+
+abstract class AbstractLocalProxyHistory extends AbstractProxyHistory {
+    private final DataTree dataTree;
+
+    AbstractLocalProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier,
+        final DataTree dataTree) {
+        super(client, identifier);
+        this.dataTree = Preconditions.checkNotNull(dataTree);
+    }
+
+    final DataTreeSnapshot takeSnapshot() {
+        return dataTree.takeSnapshot();
+    }
+}
index f21be06..d9f3b5f 100644 (file)
@@ -16,7 +16,7 @@ import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 
 /**
- * Per-connection representation of a local history.
+ * Per-connection representation of a local history. This class handles state replication across a single connection.
  *
  * @author Robert Varga
  */
@@ -30,11 +30,18 @@ abstract class AbstractProxyHistory implements Identifiable<LocalHistoryIdentifi
         this.identifier = Preconditions.checkNotNull(identifier);
     }
 
-    static AbstractProxyHistory create(final DistributedDataStoreClientBehavior client,
+    static AbstractProxyHistory createClient(final DistributedDataStoreClientBehavior client,
             final Optional<ShardBackendInfo> backendInfo, final LocalHistoryIdentifier identifier) {
         final Optional<DataTree> dataTree = backendInfo.flatMap(ShardBackendInfo::getDataTree);
-        return dataTree.isPresent() ? new LocalProxyHistory(client, identifier, dataTree.get())
-                : new RemoteProxyHistory(client, identifier);
+        return dataTree.isPresent() ? new ClientLocalProxyHistory(client, identifier, dataTree.get())
+             : new RemoteProxyHistory(client, identifier);
+    }
+
+    static AbstractProxyHistory createSingle(final DistributedDataStoreClientBehavior client,
+            final Optional<ShardBackendInfo> backendInfo, final LocalHistoryIdentifier identifier) {
+        final Optional<DataTree> dataTree = backendInfo.flatMap(ShardBackendInfo::getDataTree);
+        return dataTree.isPresent() ? new SingleLocalProxyHistory(client, identifier, dataTree.get())
+             : new RemoteProxyHistory(client, identifier);
     }
 
     @Override
index 51b1700..bd6cb64 100644 (file)
@@ -7,8 +7,11 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 
 /**
@@ -21,4 +24,20 @@ abstract class AbstractTransactionCommitCohort implements DOMStoreThreePhaseComm
     static final ListenableFuture<Boolean> TRUE_FUTURE = Futures.immediateFuture(Boolean.TRUE);
     static final ListenableFuture<Void> VOID_FUTURE = Futures.immediateFuture(null);
 
+    private final AbstractClientHistory parent;
+    private final TransactionIdentifier txId;
+
+    AbstractTransactionCommitCohort(final AbstractClientHistory parent, final TransactionIdentifier txId) {
+        this.parent = Preconditions.checkNotNull(parent);
+        this.txId = Preconditions.checkNotNull(txId);
+    }
+
+    final void complete() {
+        parent.onTransactionComplete(txId);
+    }
+
+    @Override
+    public final String toString() {
+        return MoreObjects.toStringHelper(this).add("txId", txId).toString();
+    }
 }
index 807cf98..102d050 100644 (file)
@@ -10,7 +10,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 import com.google.common.annotations.Beta;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.Optional;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 
@@ -27,27 +27,10 @@ import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier
  */
 @Beta
 public final class ClientLocalHistory extends AbstractClientHistory implements AutoCloseable {
-
-    private static final AtomicLongFieldUpdater<ClientLocalHistory> NEXT_TX_UPDATER =
-            AtomicLongFieldUpdater.newUpdater(ClientLocalHistory.class, "nextTx");
-
-    // Used via NEXT_TX_UPDATER
-    @SuppressWarnings("unused")
-    private volatile long nextTx = 0;
-
     ClientLocalHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier historyId) {
         super(client, historyId);
     }
 
-    public ClientTransaction createTransaction() {
-        final State local = state();
-        Preconditions.checkState(local == State.IDLE, "Local history %s state is %s", this, local);
-        updateState(local, State.TX_OPEN);
-
-        return new ClientTransaction(this, new TransactionIdentifier(getIdentifier(),
-            NEXT_TX_UPDATER.getAndIncrement(this)));
-    }
-
     @Override
     public void close() {
         final State local = state();
@@ -58,10 +41,28 @@ public final class ClientLocalHistory extends AbstractClientHistory implements A
     }
 
     @Override
-    void onTransactionReady(final ClientTransaction transaction) {
+    ClientTransaction doCreateTransaction() {
+        final State local = state();
+        Preconditions.checkState(local == State.IDLE, "Local history %s state is %s", this, local);
+        updateState(local, State.TX_OPEN);
+
+        return new ClientTransaction(this, new TransactionIdentifier(getIdentifier(), nextTx()));
+    }
+
+    @Override
+    AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
+            final AbstractTransactionCommitCohort cohort) {
+        // FIXME: deal with CLOSED here
         final State local = state();
         Verify.verify(local == State.TX_OPEN, "Local history %s is in unexpected state %s", this, local);
         updateState(local, State.IDLE);
-        super.onTransactionReady(transaction);
+
+        return super.onTransactionReady(txId, cohort);
+    }
+
+    @Override
+    AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
+            final Optional<ShardBackendInfo> backendInfo) {
+        return AbstractProxyHistory.createClient(getClient(), backendInfo, historyId);
     }
 }
@@ -7,17 +7,14 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
-import com.google.common.base.Preconditions;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 
-final class LocalProxyHistory extends AbstractProxyHistory {
-    private final DataTree dataTree;
-
-    LocalProxyHistory(DistributedDataStoreClientBehavior client, LocalHistoryIdentifier identifier, DataTree dataTree) {
-        super(client, identifier);
-        this.dataTree = Preconditions.checkNotNull(dataTree);
+final class ClientLocalProxyHistory extends AbstractLocalProxyHistory {
+    ClientLocalProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier,
+        final DataTree dataTree) {
+        super(client, identifier, dataTree);
     }
 
     @Override
@@ -25,6 +22,6 @@ final class LocalProxyHistory extends AbstractProxyHistory {
             final TransactionIdentifier txId) {
         // FIXME: this violates history contract: we should use the last submitted transaction instead to ensure
         //        causality
-        return new LocalProxyTransaction(client, txId, dataTree.takeSnapshot());
+        return new LocalProxyTransaction(client, txId, takeSnapshot());
     }
 }
\ No newline at end of file
index 0a1c8be..81d00ee 100644 (file)
@@ -131,16 +131,22 @@ public final class ClientTransaction extends LocalAbortable implements Identifia
         for (AbstractProxyTransaction p : proxies.values()) {
             p.seal();
         }
-        parent.onTransactionReady(this);
 
+        final AbstractTransactionCommitCohort cohort;
         switch (proxies.size()) {
             case 0:
-                return EmptyTransactionCommitCohort.INSTANCE;
+                cohort = new EmptyTransactionCommitCohort(parent, transactionId);
+                break;
             case 1:
-                return new DirectTransactionCommitCohort(Iterables.getOnlyElement(proxies.values()));
+                cohort = new DirectTransactionCommitCohort(parent, transactionId,
+                    Iterables.getOnlyElement(proxies.values()));
+                break;
             default:
-                return new ClientTransactionCommitCohort(proxies.values());
+                cohort = new ClientTransactionCommitCohort(parent, transactionId, proxies.values());
+                break;
         }
+
+        return parent.onTransactionReady(transactionId, cohort);
     }
 
     /**
@@ -152,6 +158,8 @@ public final class ClientTransaction extends LocalAbortable implements Identifia
                 proxy.abort();
             }
             proxies.clear();
+
+            parent.onTransactionAbort(transactionId);
         }
     }
 
@@ -160,4 +168,8 @@ public final class ClientTransaction extends LocalAbortable implements Identifia
         LOG.debug("Aborting transaction {}", getIdentifier(), cause);
         abort();
     }
+
+    Map<Long, AbstractProxyTransaction> getProxies() {
+        return proxies;
+    }
 }
index a7de89a..a4eb5e0 100644 (file)
@@ -9,13 +9,16 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Collection;
-import java.util.List;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 
 final class ClientTransactionCommitCohort extends AbstractTransactionCommitCohort {
-    private final List<AbstractProxyTransaction> proxies;
+    private final Collection<AbstractProxyTransaction> proxies;
 
-    ClientTransactionCommitCohort(final Collection<AbstractProxyTransaction> proxies) {
+    ClientTransactionCommitCohort(final AbstractClientHistory parent, final TransactionIdentifier txId,
+            final Collection<AbstractProxyTransaction> proxies) {
+        super(parent, txId);
         this.proxies = ImmutableList.copyOf(proxies);
     }
 
@@ -32,6 +35,11 @@ final class ClientTransactionCommitCohort extends AbstractTransactionCommitCohor
         return ret;
     }
 
+    private ListenableFuture<Void> addComplete(final ListenableFuture<Void> future) {
+        future.addListener(this::complete, MoreExecutors.directExecutor());
+        return future;
+    }
+
     @Override
     public ListenableFuture<Void> preCommit() {
         final VotingFuture<Void> ret = new VotingFuture<>(null, proxies.size());
@@ -49,7 +57,7 @@ final class ClientTransactionCommitCohort extends AbstractTransactionCommitCohor
             proxy.doCommit(ret);
         }
 
-        return ret;
+        return addComplete(ret);
     }
 
     @Override
@@ -59,6 +67,6 @@ final class ClientTransactionCommitCohort extends AbstractTransactionCommitCohor
             proxy.abort(ret);
         }
 
-        return ret;
+        return addComplete(ret);
     }
 }
index 49b281a..1d5d4a7 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 
 /**
  * An {@link AbstractTransactionCommitCohort} implementation for transactions which contain a single proxy. Since there
@@ -19,7 +20,9 @@ import com.google.common.util.concurrent.ListenableFuture;
 final class DirectTransactionCommitCohort extends AbstractTransactionCommitCohort {
     private final AbstractProxyTransaction proxy;
 
-    DirectTransactionCommitCohort(final AbstractProxyTransaction proxy) {
+    DirectTransactionCommitCohort(final AbstractClientHistory parent, final TransactionIdentifier txId,
+        final AbstractProxyTransaction proxy) {
+        super(parent, txId);
         this.proxy = Preconditions.checkNotNull(proxy);
     }
 
@@ -35,11 +38,13 @@ final class DirectTransactionCommitCohort extends AbstractTransactionCommitCohor
 
     @Override
     public ListenableFuture<Void> abort() {
+        complete();
         return VOID_FUTURE;
     }
 
     @Override
     public ListenableFuture<Void> commit() {
+        complete();
         return VOID_FUTURE;
     }
 }
index eb1dd17..9940ae5 100644 (file)
@@ -20,7 +20,6 @@ import org.opendaylight.controller.cluster.access.client.ClientActorContext;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.Response;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -59,10 +58,8 @@ import org.slf4j.LoggerFactory;
 final class DistributedDataStoreClientBehavior extends ClientActorBehavior implements DistributedDataStoreClient {
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreClientBehavior.class);
 
-    private final Map<TransactionIdentifier, ClientTransaction> transactions = new ConcurrentHashMap<>();
     private final Map<LocalHistoryIdentifier, ClientLocalHistory> histories = new ConcurrentHashMap<>();
     private final AtomicLong nextHistoryId = new AtomicLong(1);
-    private final AtomicLong nextTransactionId = new AtomicLong();
     private final ModuleShardBackendResolver resolver;
     private final SingleClientHistory singleHistory;
 
@@ -99,11 +96,6 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple
             h.localAbort(cause);
         }
         histories.clear();
-
-        for (ClientTransaction t : transactions.values()) {
-            t.localAbort(cause);
-        }
-        transactions.clear();
     }
 
     private DistributedDataStoreClientBehavior shutdown(final ClientActorBehavior currentBehavior) {
@@ -158,12 +150,7 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple
 
     @Override
     public ClientTransaction createTransaction() {
-        final TransactionIdentifier txId = new TransactionIdentifier(singleHistory.getIdentifier(),
-            nextTransactionId.getAndIncrement());
-        final ClientTransaction tx = new ClientTransaction(singleHistory, txId);
-        LOG.debug("{}: creating a new transaction {}", persistenceId(), tx);
-
-        return returnIfOperational(transactions, txId, tx, aborted);
+        return singleHistory.createTransaction();
     }
 
     @Override
@@ -176,10 +163,6 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple
         return resolver;
     }
 
-    void transactionComplete(final ClientTransaction transaction) {
-        transactions.remove(transaction.getIdentifier());
-    }
-
     void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
         sendRequest(request, response -> {
             completer.accept(response);
index 0884ed4..7193dd0 100644 (file)
@@ -8,7 +8,7 @@
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
 import com.google.common.util.concurrent.ListenableFuture;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 
 /**
  * An {@link AbstractTransactionCommitCohort} for use with empty transactions. This relies on the fact that no backends
@@ -20,10 +20,8 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCoh
  * @author Robert Varga
  */
 final class EmptyTransactionCommitCohort extends AbstractTransactionCommitCohort {
-    static final DOMStoreThreePhaseCommitCohort INSTANCE = new EmptyTransactionCommitCohort();
-
-    private EmptyTransactionCommitCohort() {
-        // Hidden
+    EmptyTransactionCommitCohort(final AbstractClientHistory parent, final TransactionIdentifier txId) {
+        super(parent, txId);
     }
 
     @Override
@@ -38,11 +36,13 @@ final class EmptyTransactionCommitCohort extends AbstractTransactionCommitCohort
 
     @Override
     public ListenableFuture<Void> abort() {
+        complete();
         return VOID_FUTURE;
     }
 
     @Override
     public ListenableFuture<Void> commit() {
+        complete();
         return VOID_FUTURE;
     }
 }
index b57e9b6..6fa3cdf 100644 (file)
@@ -7,7 +7,11 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
+import java.util.Optional;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An {@link AbstractClientHistory} which handles free-standing transactions.
@@ -15,8 +19,23 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
  * @author Robert Varga
  */
 final class SingleClientHistory extends AbstractClientHistory {
-    protected SingleClientHistory(final DistributedDataStoreClientBehavior client,
-            final LocalHistoryIdentifier identifier) {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractClientHistory.class);
+
+    SingleClientHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
         super(client, identifier);
     }
+
+    @Override
+    ClientTransaction doCreateTransaction() {
+        final TransactionIdentifier txId = new TransactionIdentifier(getIdentifier(), nextTx());
+        LOG.debug("{}: creating a new transaction {}", this, txId);
+
+        return new ClientTransaction(this, txId);
+    }
+
+    @Override
+    AbstractProxyHistory createHistoryProxy(final LocalHistoryIdentifier historyId,
+            final Optional<ShardBackendInfo> backendInfo) {
+        return AbstractProxyHistory.createSingle(getClient(), backendInfo, historyId);
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleLocalProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/SingleLocalProxyHistory.java
new file mode 100644 (file)
index 0000000..f32fd59
--- /dev/null
@@ -0,0 +1,25 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+
+final class SingleLocalProxyHistory extends AbstractLocalProxyHistory {
+    SingleLocalProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier,
+        final DataTree dataTree) {
+        super(client, identifier, dataTree);
+    }
+
+    @Override
+    AbstractProxyTransaction doCreateTransactionProxy(final DistributedDataStoreClientBehavior client,
+            final TransactionIdentifier txId) {
+        return new LocalProxyTransaction(client, txId, takeSnapshot());
+    }
+}
\ No newline at end of file