Merge (Abstract)TransactionContext 86/97586/8
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 21 Sep 2021 19:01:27 +0000 (21:01 +0200)
committerRobert Varga <nite@hq.sk>
Thu, 21 Oct 2021 07:46:10 +0000 (07:46 +0000)
We have an interface and an abstract base class. Merge the two into
an abstract class, reducing visibility of various methods. Also derive
from AbstractSimpleIdentifiable, to make it more explicit we require
a transaction identifier.

This allows all callers to bind to the same vtable, improving method
dispatch.

Change-Id: I51419c4ac832aa676c8707d9bd459936fd906760
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java
deleted file mode 100644 (file)
index bc8efa6..0000000
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-abstract class AbstractTransactionContext implements TransactionContext {
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContext.class);
-    private final TransactionIdentifier transactionIdentifier;
-    private long modificationCount = 0;
-    private boolean handOffComplete;
-    private final short transactionVersion;
-
-    protected AbstractTransactionContext(TransactionIdentifier transactionIdentifier) {
-        this(transactionIdentifier, DataStoreVersions.CURRENT_VERSION);
-    }
-
-    protected AbstractTransactionContext(TransactionIdentifier transactionIdentifier, short transactionVersion) {
-        // FIXME: requireNonNull()?
-        this.transactionIdentifier = transactionIdentifier;
-        this.transactionVersion = transactionVersion;
-    }
-
-    /**
-     * Get the transaction identifier associated with this context.
-     *
-     * @return Transaction identifier.
-     */
-    // FIXME: does this imply Identifiable?
-    protected final @NonNull TransactionIdentifier getIdentifier() {
-        return transactionIdentifier;
-    }
-
-    protected final void incrementModificationCount() {
-        modificationCount++;
-    }
-
-    protected final void logModificationCount() {
-        LOG.debug("Total modifications on Tx {} = [ {} ]", getIdentifier(), modificationCount);
-    }
-
-    @Override
-    public final void operationHandOffComplete() {
-        handOffComplete = true;
-    }
-
-    protected boolean isOperationHandOffComplete() {
-        return handOffComplete;
-    }
-
-    @Override
-    public boolean usesOperationLimiting() {
-        return false;
-    }
-
-    @Override
-    public short getTransactionVersion() {
-        return transactionVersion;
-    }
-}
index 211ebec63c3074e9199d9c034290928495451b91..29b52f75579b57e2fbad4563585de9b10828d154 100644 (file)
@@ -155,7 +155,7 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
                         contextWrapper);
             }
         } else {
                         contextWrapper);
             }
         } else {
-            findPrimaryFuture.onComplete((result) -> {
+            findPrimaryFuture.onComplete(result -> {
                 if (result.isSuccess()) {
                     onFindPrimaryShardSuccess(result.get(), parent, shardName, contextWrapper);
                 } else {
                 if (result.isSuccess()) {
                     onFindPrimaryShardSuccess(result.get(), parent, shardName, contextWrapper);
                 } else {
@@ -240,12 +240,12 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
                 final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
                 return new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
                     @Override
                 final DOMStoreReadTransaction readOnly = factory.newReadOnlyTransaction(parent.getIdentifier());
                 return new LocalTransactionContext(readOnly, parent.getIdentifier(), factory) {
                     @Override
-                    protected DOMStoreWriteTransaction getWriteDelegate() {
+                    DOMStoreWriteTransaction getWriteDelegate() {
                         throw new UnsupportedOperationException();
                     }
 
                     @Override
                         throw new UnsupportedOperationException();
                     }
 
                     @Override
-                    protected DOMStoreReadTransaction getReadDelegate() {
+                    DOMStoreReadTransaction getReadDelegate() {
                         return readOnly;
                     }
                 };
                         return readOnly;
                     }
                 };
@@ -253,12 +253,12 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
                 final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
                 return new LocalTransactionContext(readWrite, parent.getIdentifier(), factory) {
                     @Override
                 final DOMStoreReadWriteTransaction readWrite = factory.newReadWriteTransaction(parent.getIdentifier());
                 return new LocalTransactionContext(readWrite, parent.getIdentifier(), factory) {
                     @Override
-                    protected DOMStoreWriteTransaction getWriteDelegate() {
+                    DOMStoreWriteTransaction getWriteDelegate() {
                         return readWrite;
                     }
 
                     @Override
                         return readWrite;
                     }
 
                     @Override
-                    protected DOMStoreReadTransaction getReadDelegate() {
+                    DOMStoreReadTransaction getReadDelegate() {
                         return readWrite;
                     }
                 };
                         return readWrite;
                     }
                 };
@@ -266,12 +266,12 @@ abstract class AbstractTransactionContextFactory<F extends LocalTransactionFacto
                 final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
                 return new LocalTransactionContext(writeOnly, parent.getIdentifier(), factory) {
                     @Override
                 final DOMStoreWriteTransaction writeOnly = factory.newWriteOnlyTransaction(parent.getIdentifier());
                 return new LocalTransactionContext(writeOnly, parent.getIdentifier(), factory) {
                     @Override
-                    protected DOMStoreWriteTransaction getWriteDelegate() {
+                    DOMStoreWriteTransaction getWriteDelegate() {
                         return writeOnly;
                     }
 
                     @Override
                         return writeOnly;
                     }
 
                     @Override
-                    protected DOMStoreReadTransaction getReadDelegate() {
+                    DOMStoreReadTransaction getReadDelegate() {
                         throw new UnsupportedOperationException();
                     }
                 };
                         throw new UnsupportedOperationException();
                     }
                 };
index 5435bb548d5c58e91e2f20d07dc7895f97755df8..6b3006941817e05e4a7de6789e67871dda506846 100644 (file)
@@ -33,7 +33,7 @@ import scala.concurrent.Future;
  *
  * @author Thomas Pantelis
  */
  *
  * @author Thomas Pantelis
  */
-abstract class LocalTransactionContext extends AbstractTransactionContext {
+abstract class LocalTransactionContext extends TransactionContext {
     private final DOMStoreTransaction txDelegate;
     private final LocalTransactionReadySupport readySupport;
     private Exception operationError;
     private final DOMStoreTransaction txDelegate;
     private final LocalTransactionReadySupport readySupport;
     private Exception operationError;
@@ -45,9 +45,9 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
         this.readySupport = readySupport;
     }
 
         this.readySupport = readySupport;
     }
 
-    protected abstract DOMStoreWriteTransaction getWriteDelegate();
+    abstract DOMStoreWriteTransaction getWriteDelegate();
 
 
-    protected abstract DOMStoreReadTransaction getReadDelegate();
+    abstract DOMStoreReadTransaction getReadDelegate();
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void executeModification(final Consumer<DOMStoreWriteTransaction> consumer) {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void executeModification(final Consumer<DOMStoreWriteTransaction> consumer) {
@@ -62,22 +62,22 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
     }
 
     @Override
-    public void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) {
+    void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) {
         executeModification(transaction -> transaction.delete(path));
     }
 
     @Override
         executeModification(transaction -> transaction.delete(path));
     }
 
     @Override
-    public void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
+    void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
         executeModification(transaction -> transaction.merge(path, data));
     }
 
     @Override
         executeModification(transaction -> transaction.merge(path, data));
     }
 
     @Override
-    public void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
+    void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
         executeModification(transaction -> transaction.write(path, data));
     }
 
     @Override
         executeModification(transaction -> transaction.write(path, data));
     }
 
     @Override
-    public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture,
+    <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture,
             final Boolean havePermit) {
         Futures.addCallback(readCmd.apply(getReadDelegate()), new FutureCallback<T>() {
             @Override
             final Boolean havePermit) {
         Futures.addCallback(readCmd.apply(getReadDelegate()), new FutureCallback<T>() {
             @Override
@@ -93,26 +93,26 @@ abstract class LocalTransactionContext extends AbstractTransactionContext {
         }, MoreExecutors.directExecutor());
     }
 
         }, MoreExecutors.directExecutor());
     }
 
-    private LocalThreePhaseCommitCohort ready() {
-        logModificationCount();
-        return readySupport.onTransactionReady(getWriteDelegate(), operationError);
-    }
-
     @Override
     @Override
-    public Future<ActorSelection> readyTransaction(final Boolean havePermit,
+    Future<ActorSelection> readyTransaction(final Boolean havePermit,
             final Optional<SortedSet<String>> participatingShardNames) {
         final LocalThreePhaseCommitCohort cohort = ready();
         return cohort.initiateCoordinatedCommit(participatingShardNames);
     }
 
     @Override
             final Optional<SortedSet<String>> participatingShardNames) {
         final LocalThreePhaseCommitCohort cohort = ready();
         return cohort.initiateCoordinatedCommit(participatingShardNames);
     }
 
     @Override
-    public Future<Object> directCommit(final Boolean havePermit) {
+    Future<Object> directCommit(final Boolean havePermit) {
         final LocalThreePhaseCommitCohort cohort = ready();
         return cohort.initiateDirectCommit();
     }
 
     @Override
         final LocalThreePhaseCommitCohort cohort = ready();
         return cohort.initiateDirectCommit();
     }
 
     @Override
-    public void closeTransaction() {
+    void closeTransaction() {
         txDelegate.close();
     }
         txDelegate.close();
     }
+
+    private LocalThreePhaseCommitCohort ready() {
+        logModificationCount();
+        return readySupport.onTransactionReady(getWriteDelegate(), operationError);
+    }
 }
 }
index d98a1b4046a33f456213c8528b88aa5387b70e85..bfb0046ba027b809be6d53dd507d4d90eb491365 100644 (file)
@@ -22,7 +22,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 
 import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 
-final class NoOpTransactionContext extends AbstractTransactionContext {
+final class NoOpTransactionContext extends TransactionContext {
     private static final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
 
     private final Throwable failure;
     private static final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
 
     private final Throwable failure;
@@ -33,26 +33,25 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
     }
 
     @Override
-    public void closeTransaction() {
+    void closeTransaction() {
         LOG.debug("NoOpTransactionContext {} closeTransaction called", getIdentifier());
     }
 
     @Override
         LOG.debug("NoOpTransactionContext {} closeTransaction called", getIdentifier());
     }
 
     @Override
-    public Future<Object> directCommit(final Boolean havePermit) {
+    Future<Object> directCommit(final Boolean havePermit) {
         LOG.debug("Tx {} directCommit called, failure", getIdentifier(), failure);
         return akka.dispatch.Futures.failed(failure);
     }
 
     @Override
         LOG.debug("Tx {} directCommit called, failure", getIdentifier(), failure);
         return akka.dispatch.Futures.failed(failure);
     }
 
     @Override
-    public Future<ActorSelection> readyTransaction(final Boolean havePermit,
+    Future<ActorSelection> readyTransaction(final Boolean havePermit,
             final Optional<SortedSet<String>> participatingShardNamess) {
         LOG.debug("Tx {} readyTransaction called, failure", getIdentifier(), failure);
         return akka.dispatch.Futures.failed(failure);
     }
 
     @Override
             final Optional<SortedSet<String>> participatingShardNamess) {
         LOG.debug("Tx {} readyTransaction called, failure", getIdentifier(), failure);
         return akka.dispatch.Futures.failed(failure);
     }
 
     @Override
-    public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture,
-            final Boolean havePermit) {
+    <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> proxyFuture, final Boolean havePermit) {
         LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
                 readCmd.getPath());
 
         LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
                 readCmd.getPath());
 
@@ -67,17 +66,17 @@ final class NoOpTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
     }
 
     @Override
-    public void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) {
+    void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) {
         LOG.debug("Tx {} executeDelete called path = {}", getIdentifier(), path);
     }
 
     @Override
         LOG.debug("Tx {} executeDelete called path = {}", getIdentifier(), path);
     }
 
     @Override
-    public void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
+    void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
         LOG.debug("Tx {} executeMerge called path = {}", getIdentifier(), path);
     }
 
     @Override
         LOG.debug("Tx {} executeMerge called path = {}", getIdentifier(), path);
     }
 
     @Override
-    public void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
+    void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
         LOG.debug("Tx {} executeWrite called path = {}", getIdentifier(), path);
     }
 }
         LOG.debug("Tx {} executeWrite called path = {}", getIdentifier(), path);
     }
 }
index a7e76ed436af511f74427372868cdedf82d9464e..ade9c375e5cfe93735c712fb8f1650125d9948d9 100644 (file)
@@ -14,6 +14,7 @@ import static java.util.Objects.requireNonNull;
 import akka.actor.ActorSelection;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
 import akka.actor.ActorSelection;
 import akka.dispatch.Futures;
 import akka.dispatch.OnComplete;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.Optional;
 import java.util.SortedSet;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.Optional;
 import java.util.SortedSet;
@@ -40,7 +41,7 @@ import scala.concurrent.Future;
  *
  * @author Thomas Pantelis
  */
  *
  * @author Thomas Pantelis
  */
-public class RemoteTransactionContext extends AbstractTransactionContext {
+final class RemoteTransactionContext extends TransactionContext {
     private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContext.class);
 
     private final ActorUtils actorUtils;
     private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContext.class);
 
     private final ActorUtils actorUtils;
@@ -59,7 +60,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
      */
     private volatile Throwable failedModification;
 
      */
     private volatile Throwable failedModification;
 
-    protected RemoteTransactionContext(final TransactionIdentifier identifier, final ActorSelection actor,
+    RemoteTransactionContext(final TransactionIdentifier identifier, final ActorSelection actor,
             final ActorUtils actorUtils, final short remoteTransactionVersion, final OperationLimiter limiter) {
         super(identifier, remoteTransactionVersion);
         this.limiter = requireNonNull(limiter);
             final ActorUtils actorUtils, final short remoteTransactionVersion, final OperationLimiter limiter) {
         super(identifier, remoteTransactionVersion);
         this.limiter = requireNonNull(limiter);
@@ -76,7 +77,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
     }
 
     @Override
-    public void closeTransaction() {
+    void closeTransaction() {
         LOG.debug("Tx {} closeTransaction called", getIdentifier());
         TransactionContextCleanup.untrack(this);
 
         LOG.debug("Tx {} closeTransaction called", getIdentifier());
         TransactionContextCleanup.untrack(this);
 
@@ -84,7 +85,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
     }
 
     @Override
-    public Future<Object> directCommit(final Boolean havePermit) {
+    Future<Object> directCommit(final Boolean havePermit) {
         LOG.debug("Tx {} directCommit called", getIdentifier());
 
         // Send the remaining batched modifications, if any, with the ready flag set.
         LOG.debug("Tx {} directCommit called", getIdentifier());
 
         // Send the remaining batched modifications, if any, with the ready flag set.
@@ -93,7 +94,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
     }
 
     @Override
-    public Future<ActorSelection> readyTransaction(final Boolean havePermit,
+    Future<ActorSelection> readyTransaction(final Boolean havePermit,
             final Optional<SortedSet<String>> participatingShardNames) {
         logModificationCount();
 
             final Optional<SortedSet<String>> participatingShardNames) {
         logModificationCount();
 
@@ -104,7 +105,9 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         bumpPermits(havePermit);
         Future<Object> lastModificationsFuture = sendBatchedModifications(true, false, participatingShardNames);
 
         bumpPermits(havePermit);
         Future<Object> lastModificationsFuture = sendBatchedModifications(true, false, participatingShardNames);
 
-        return transformReadyReply(lastModificationsFuture);
+        // Transform the last reply Future into a Future that returns the cohort actor path from
+        // the last reply message. That's the end result of the ready operation.
+        return TransactionReadyReplyMapper.transform(lastModificationsFuture, actorUtils, getIdentifier());
     }
 
     private void bumpPermits(final Boolean havePermit) {
     }
 
     private void bumpPermits(final Boolean havePermit) {
@@ -113,13 +116,6 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         }
     }
 
         }
     }
 
-    protected Future<ActorSelection> transformReadyReply(final Future<Object> readyReplyFuture) {
-        // Transform the last reply Future into a Future that returns the cohort actor path from
-        // the last reply message. That's the end result of the ready operation.
-
-        return TransactionReadyReplyMapper.transform(readyReplyFuture, actorUtils, getIdentifier());
-    }
-
     private BatchedModifications newBatchedModifications() {
         return new BatchedModifications(getIdentifier(), getTransactionVersion());
     }
     private BatchedModifications newBatchedModifications() {
         return new BatchedModifications(getIdentifier(), getTransactionVersion());
     }
@@ -142,11 +138,12 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
         }
     }
 
         }
     }
 
-    protected Future<Object> sendBatchedModifications() {
+    @VisibleForTesting
+    Future<Object> sendBatchedModifications() {
         return sendBatchedModifications(false, false, Optional.empty());
     }
 
         return sendBatchedModifications(false, false, Optional.empty());
     }
 
-    protected Future<Object> sendBatchedModifications(final boolean ready, final boolean doCommitOnReady,
+    private Future<Object> sendBatchedModifications(final boolean ready, final boolean doCommitOnReady,
             final Optional<SortedSet<String>> participatingShardNames) {
         Future<Object> sent = null;
         if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) {
             final Optional<SortedSet<String>> participatingShardNames) {
         Future<Object> sent = null;
         if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) {
@@ -202,19 +199,19 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
     }
 
     @Override
-    public void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) {
+    void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) {
         LOG.debug("Tx {} executeDelete called path = {}", getIdentifier(), path);
         executeModification(new DeleteModification(path), havePermit);
     }
 
     @Override
         LOG.debug("Tx {} executeDelete called path = {}", getIdentifier(), path);
         executeModification(new DeleteModification(path), havePermit);
     }
 
     @Override
-    public void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
+    void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
         LOG.debug("Tx {} executeMerge called path = {}", getIdentifier(), path);
         executeModification(new MergeModification(path, data), havePermit);
     }
 
     @Override
         LOG.debug("Tx {} executeMerge called path = {}", getIdentifier(), path);
         executeModification(new MergeModification(path, data), havePermit);
     }
 
     @Override
-    public void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
+    void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) {
         LOG.debug("Tx {} executeWrite called path = {}", getIdentifier(), path);
         executeModification(new WriteModification(path, data), havePermit);
     }
         LOG.debug("Tx {} executeWrite called path = {}", getIdentifier(), path);
         executeModification(new WriteModification(path, data), havePermit);
     }
@@ -231,7 +228,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
     }
 
     @Override
-    public <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> returnFuture,
+    <T> void executeRead(final AbstractRead<T> readCmd, final SettableFuture<T> returnFuture,
             final Boolean havePermit) {
         LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
                 readCmd.getPath());
             final Boolean havePermit) {
         LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(),
                 readCmd.getPath());
@@ -298,7 +295,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext {
     }
 
     @Override
     }
 
     @Override
-    public boolean usesOperationLimiting() {
+    boolean usesOperationLimiting() {
         return true;
     }
 }
         return true;
     }
 }
index c96233ccebbff6f04f4b59b18e771ea75aabac43..549136b589fb1db11842ce1b9a2b7d5a83820844 100644 (file)
@@ -11,29 +11,43 @@ import akka.actor.ActorSelection;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.Optional;
 import java.util.SortedSet;
 import com.google.common.util.concurrent.SettableFuture;
 import java.util.Optional;
 import java.util.SortedSet;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
 import org.opendaylight.controller.cluster.datastore.messages.AbstractRead;
+import org.opendaylight.yangtools.concepts.AbstractSimpleIdentifiable;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import scala.concurrent.Future;
 
 import scala.concurrent.Future;
 
-/*
- * FIXME: why do we need this interface? It should be possible to integrate it with
- *        AbstractTransactionContext, which is the only implementation anyway.
- */
-interface TransactionContext {
-    void closeTransaction();
+abstract class TransactionContext extends AbstractSimpleIdentifiable<TransactionIdentifier> {
+    private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
+
+    private final short transactionVersion;
 
 
-    Future<ActorSelection> readyTransaction(Boolean havePermit, Optional<SortedSet<String>> participatingShardNames);
+    private long modificationCount = 0;
+    private boolean handOffComplete;
 
 
-    <T> void executeRead(AbstractRead<T> readCmd, SettableFuture<T> promise, Boolean havePermit);
+    TransactionContext(final TransactionIdentifier transactionIdentifier) {
+        this(transactionIdentifier, DataStoreVersions.CURRENT_VERSION);
+    }
 
 
-    void executeDelete(YangInstanceIdentifier path, Boolean havePermit);
+    TransactionContext(final TransactionIdentifier transactionIdentifier, final short transactionVersion) {
+        super(transactionIdentifier);
+        this.transactionVersion = transactionVersion;
+    }
 
 
-    void executeMerge(YangInstanceIdentifier path, NormalizedNode data, Boolean havePermit);
+    final short getTransactionVersion() {
+        return transactionVersion;
+    }
 
 
-    void executeWrite(YangInstanceIdentifier path, NormalizedNode data, Boolean havePermit);
+    final void incrementModificationCount() {
+        modificationCount++;
+    }
 
 
-    Future<Object> directCommit(Boolean havePermit);
+    final void logModificationCount() {
+        LOG.debug("Total modifications on Tx {} = [ {} ]", getIdentifier(), modificationCount);
+    }
 
     /**
      * Invoked by {@link AbstractTransactionContextWrapper} when it has finished handing
 
     /**
      * Invoked by {@link AbstractTransactionContextWrapper} when it has finished handing
@@ -44,14 +58,35 @@ interface TransactionContext {
      * Implementations can rely on the wrapper calling this operation in a synchronized
      * block, so they do not need to ensure visibility of this state transition themselves.
      */
      * Implementations can rely on the wrapper calling this operation in a synchronized
      * block, so they do not need to ensure visibility of this state transition themselves.
      */
-    void operationHandOffComplete();
+    final void operationHandOffComplete() {
+        handOffComplete = true;
+    }
+
+    final boolean isOperationHandOffComplete() {
+        return handOffComplete;
+    }
 
     /**
      * A TransactionContext that uses operation limiting should return true else false.
      *
      * @return true if operation limiting is used, false otherwise
      */
 
     /**
      * A TransactionContext that uses operation limiting should return true else false.
      *
      * @return true if operation limiting is used, false otherwise
      */
-    boolean usesOperationLimiting();
+    boolean usesOperationLimiting() {
+        return false;
+    }
+
+    abstract void executeDelete(YangInstanceIdentifier path, Boolean havePermit);
+
+    abstract void executeMerge(YangInstanceIdentifier path, NormalizedNode data, Boolean havePermit);
+
+    abstract void executeWrite(YangInstanceIdentifier path, NormalizedNode data, Boolean havePermit);
+
+    abstract <T> void executeRead(AbstractRead<T> readCmd, SettableFuture<T> proxyFuture, Boolean havePermit);
+
+    abstract Future<ActorSelection> readyTransaction(Boolean havePermit,
+            Optional<SortedSet<String>> participatingShardNames);
+
+    abstract Future<Object> directCommit(Boolean havePermit);
 
 
-    short getTransactionVersion();
+    abstract void closeTransaction();
 }
 }
index 128d08a4d97b6db3739d1f6935a26bacff21765b..48e40179845ded6ede7a9780194a015e169abdfa 100644 (file)
@@ -21,6 +21,12 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.FrontendType;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
@@ -33,13 +39,8 @@ import scala.concurrent.Future;
 
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class LocalTransactionContextTest {
 
 @RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class LocalTransactionContextTest {
-
-    @Mock
-    private OperationLimiter limiter;
-
     @Mock
     private DOMStoreReadWriteTransaction readWriteTransaction;
     @Mock
     private DOMStoreReadWriteTransaction readWriteTransaction;
-
     @Mock
     private LocalTransactionReadySupport mockReadySupport;
 
     @Mock
     private LocalTransactionReadySupport mockReadySupport;
 
@@ -47,15 +48,17 @@ public class LocalTransactionContextTest {
 
     @Before
     public void setUp() {
 
     @Before
     public void setUp() {
-        localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter.getIdentifier(),
-                mockReadySupport) {
+        final TransactionIdentifier txId = new TransactionIdentifier(new LocalHistoryIdentifier(ClientIdentifier.create(
+            FrontendIdentifier.create(MemberName.forName("member"), FrontendType.forName("type")), 0), 0), 0);
+
+        localTransactionContext = new LocalTransactionContext(readWriteTransaction, txId, mockReadySupport) {
             @Override
             @Override
-            protected DOMStoreWriteTransaction getWriteDelegate() {
+            DOMStoreWriteTransaction getWriteDelegate() {
                 return readWriteTransaction;
             }
 
             @Override
                 return readWriteTransaction;
             }
 
             @Override
-            protected DOMStoreReadTransaction getReadDelegate() {
+            DOMStoreReadTransaction getReadDelegate() {
                 return readWriteTransaction;
             }
         };
                 return readWriteTransaction;
             }
         };