From: Robert Varga Date: Tue, 21 Sep 2021 19:01:27 +0000 (+0200) Subject: Merge (Abstract)TransactionContext X-Git-Tag: v4.0.4~1 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=82be660900ab9bdd84941a0c3498c1ae36982aba Merge (Abstract)TransactionContext We have an interface and an abstract base class. Merge the two into an abstract class, reducing visibility of various methods. Also derive from AbstractSimpleIdentifiable, to make it more explicit we require a transaction identifier. This allows all callers to bind to the same vtable, improving method dispatch. Change-Id: I51419c4ac832aa676c8707d9bd459936fd906760 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java deleted file mode 100644 index bc8efa6c5a..0000000000 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContext.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License v1.0 which accompanies this distribution, - * and is available at http://www.eclipse.org/legal/epl-v10.html - */ -package org.opendaylight.controller.cluster.datastore; - -import org.eclipse.jdt.annotation.NonNull; -import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -abstract class AbstractTransactionContext implements TransactionContext { - private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionContext.class); - private final TransactionIdentifier transactionIdentifier; - private long modificationCount = 0; - private boolean handOffComplete; - private final short transactionVersion; - - protected AbstractTransactionContext(TransactionIdentifier transactionIdentifier) { - this(transactionIdentifier, DataStoreVersions.CURRENT_VERSION); - } - - protected AbstractTransactionContext(TransactionIdentifier transactionIdentifier, short transactionVersion) { - // FIXME: requireNonNull()? - this.transactionIdentifier = transactionIdentifier; - this.transactionVersion = transactionVersion; - } - - /** - * Get the transaction identifier associated with this context. - * - * @return Transaction identifier. - */ - // FIXME: does this imply Identifiable? - protected final @NonNull TransactionIdentifier getIdentifier() { - return transactionIdentifier; - } - - protected final void incrementModificationCount() { - modificationCount++; - } - - protected final void logModificationCount() { - LOG.debug("Total modifications on Tx {} = [ {} ]", getIdentifier(), modificationCount); - } - - @Override - public final void operationHandOffComplete() { - handOffComplete = true; - } - - protected boolean isOperationHandOffComplete() { - return handOffComplete; - } - - @Override - public boolean usesOperationLimiting() { - return false; - } - - @Override - public short getTransactionVersion() { - return transactionVersion; - } -} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java index 211ebec63c..29b52f7557 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractTransactionContextFactory.java @@ -155,7 +155,7 @@ abstract class AbstractTransactionContextFactory { + findPrimaryFuture.onComplete(result -> { if (result.isSuccess()) { onFindPrimaryShardSuccess(result.get(), parent, shardName, contextWrapper); } else { @@ -240,12 +240,12 @@ abstract class AbstractTransactionContextFactory consumer) { @@ -62,22 +62,22 @@ abstract class LocalTransactionContext extends AbstractTransactionContext { } @Override - public void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) { + void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) { executeModification(transaction -> transaction.delete(path)); } @Override - public void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) { + void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) { executeModification(transaction -> transaction.merge(path, data)); } @Override - public void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) { + void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) { executeModification(transaction -> transaction.write(path, data)); } @Override - public void executeRead(final AbstractRead readCmd, final SettableFuture proxyFuture, + void executeRead(final AbstractRead readCmd, final SettableFuture proxyFuture, final Boolean havePermit) { Futures.addCallback(readCmd.apply(getReadDelegate()), new FutureCallback() { @Override @@ -93,26 +93,26 @@ abstract class LocalTransactionContext extends AbstractTransactionContext { }, MoreExecutors.directExecutor()); } - private LocalThreePhaseCommitCohort ready() { - logModificationCount(); - return readySupport.onTransactionReady(getWriteDelegate(), operationError); - } - @Override - public Future readyTransaction(final Boolean havePermit, + Future readyTransaction(final Boolean havePermit, final Optional> participatingShardNames) { final LocalThreePhaseCommitCohort cohort = ready(); return cohort.initiateCoordinatedCommit(participatingShardNames); } @Override - public Future directCommit(final Boolean havePermit) { + Future directCommit(final Boolean havePermit) { final LocalThreePhaseCommitCohort cohort = ready(); return cohort.initiateDirectCommit(); } @Override - public void closeTransaction() { + void closeTransaction() { txDelegate.close(); } + + private LocalThreePhaseCommitCohort ready() { + logModificationCount(); + return readySupport.onTransactionReady(getWriteDelegate(), operationError); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java index d98a1b4046..bfb0046ba0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/NoOpTransactionContext.java @@ -22,7 +22,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.Future; -final class NoOpTransactionContext extends AbstractTransactionContext { +final class NoOpTransactionContext extends TransactionContext { private static final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class); private final Throwable failure; @@ -33,26 +33,25 @@ final class NoOpTransactionContext extends AbstractTransactionContext { } @Override - public void closeTransaction() { + void closeTransaction() { LOG.debug("NoOpTransactionContext {} closeTransaction called", getIdentifier()); } @Override - public Future directCommit(final Boolean havePermit) { + Future directCommit(final Boolean havePermit) { LOG.debug("Tx {} directCommit called, failure", getIdentifier(), failure); return akka.dispatch.Futures.failed(failure); } @Override - public Future readyTransaction(final Boolean havePermit, + Future readyTransaction(final Boolean havePermit, final Optional> participatingShardNamess) { LOG.debug("Tx {} readyTransaction called, failure", getIdentifier(), failure); return akka.dispatch.Futures.failed(failure); } @Override - public void executeRead(final AbstractRead readCmd, final SettableFuture proxyFuture, - final Boolean havePermit) { + void executeRead(final AbstractRead readCmd, final SettableFuture proxyFuture, final Boolean havePermit) { LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath()); @@ -67,17 +66,17 @@ final class NoOpTransactionContext extends AbstractTransactionContext { } @Override - public void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) { + void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) { LOG.debug("Tx {} executeDelete called path = {}", getIdentifier(), path); } @Override - public void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) { + void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) { LOG.debug("Tx {} executeMerge called path = {}", getIdentifier(), path); } @Override - public void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) { + void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) { LOG.debug("Tx {} executeWrite called path = {}", getIdentifier(), path); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java index a7e76ed436..ade9c375e5 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/RemoteTransactionContext.java @@ -14,6 +14,7 @@ import static java.util.Objects.requireNonNull; import akka.actor.ActorSelection; import akka.dispatch.Futures; import akka.dispatch.OnComplete; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.SettableFuture; import java.util.Optional; import java.util.SortedSet; @@ -40,7 +41,7 @@ import scala.concurrent.Future; * * @author Thomas Pantelis */ -public class RemoteTransactionContext extends AbstractTransactionContext { +final class RemoteTransactionContext extends TransactionContext { private static final Logger LOG = LoggerFactory.getLogger(RemoteTransactionContext.class); private final ActorUtils actorUtils; @@ -59,7 +60,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { */ private volatile Throwable failedModification; - protected RemoteTransactionContext(final TransactionIdentifier identifier, final ActorSelection actor, + RemoteTransactionContext(final TransactionIdentifier identifier, final ActorSelection actor, final ActorUtils actorUtils, final short remoteTransactionVersion, final OperationLimiter limiter) { super(identifier, remoteTransactionVersion); this.limiter = requireNonNull(limiter); @@ -76,7 +77,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } @Override - public void closeTransaction() { + void closeTransaction() { LOG.debug("Tx {} closeTransaction called", getIdentifier()); TransactionContextCleanup.untrack(this); @@ -84,7 +85,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } @Override - public Future directCommit(final Boolean havePermit) { + Future directCommit(final Boolean havePermit) { LOG.debug("Tx {} directCommit called", getIdentifier()); // Send the remaining batched modifications, if any, with the ready flag set. @@ -93,7 +94,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } @Override - public Future readyTransaction(final Boolean havePermit, + Future readyTransaction(final Boolean havePermit, final Optional> participatingShardNames) { logModificationCount(); @@ -104,7 +105,9 @@ public class RemoteTransactionContext extends AbstractTransactionContext { bumpPermits(havePermit); Future lastModificationsFuture = sendBatchedModifications(true, false, participatingShardNames); - return transformReadyReply(lastModificationsFuture); + // Transform the last reply Future into a Future that returns the cohort actor path from + // the last reply message. That's the end result of the ready operation. + return TransactionReadyReplyMapper.transform(lastModificationsFuture, actorUtils, getIdentifier()); } private void bumpPermits(final Boolean havePermit) { @@ -113,13 +116,6 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } } - protected Future transformReadyReply(final Future readyReplyFuture) { - // Transform the last reply Future into a Future that returns the cohort actor path from - // the last reply message. That's the end result of the ready operation. - - return TransactionReadyReplyMapper.transform(readyReplyFuture, actorUtils, getIdentifier()); - } - private BatchedModifications newBatchedModifications() { return new BatchedModifications(getIdentifier(), getTransactionVersion()); } @@ -142,11 +138,12 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } } - protected Future sendBatchedModifications() { + @VisibleForTesting + Future sendBatchedModifications() { return sendBatchedModifications(false, false, Optional.empty()); } - protected Future sendBatchedModifications(final boolean ready, final boolean doCommitOnReady, + private Future sendBatchedModifications(final boolean ready, final boolean doCommitOnReady, final Optional> participatingShardNames) { Future sent = null; if (ready || batchedModifications != null && !batchedModifications.getModifications().isEmpty()) { @@ -202,19 +199,19 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } @Override - public void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) { + void executeDelete(final YangInstanceIdentifier path, final Boolean havePermit) { LOG.debug("Tx {} executeDelete called path = {}", getIdentifier(), path); executeModification(new DeleteModification(path), havePermit); } @Override - public void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) { + void executeMerge(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) { LOG.debug("Tx {} executeMerge called path = {}", getIdentifier(), path); executeModification(new MergeModification(path, data), havePermit); } @Override - public void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) { + void executeWrite(final YangInstanceIdentifier path, final NormalizedNode data, final Boolean havePermit) { LOG.debug("Tx {} executeWrite called path = {}", getIdentifier(), path); executeModification(new WriteModification(path, data), havePermit); } @@ -231,7 +228,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } @Override - public void executeRead(final AbstractRead readCmd, final SettableFuture returnFuture, + void executeRead(final AbstractRead readCmd, final SettableFuture returnFuture, final Boolean havePermit) { LOG.debug("Tx {} executeRead {} called path = {}", getIdentifier(), readCmd.getClass().getSimpleName(), readCmd.getPath()); @@ -298,7 +295,7 @@ public class RemoteTransactionContext extends AbstractTransactionContext { } @Override - public boolean usesOperationLimiting() { + boolean usesOperationLimiting() { return true; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java index c96233cceb..549136b589 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionContext.java @@ -11,29 +11,43 @@ import akka.actor.ActorSelection; import com.google.common.util.concurrent.SettableFuture; import java.util.Optional; import java.util.SortedSet; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbstractRead; +import org.opendaylight.yangtools.concepts.AbstractSimpleIdentifiable; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.concurrent.Future; -/* - * FIXME: why do we need this interface? It should be possible to integrate it with - * AbstractTransactionContext, which is the only implementation anyway. - */ -interface TransactionContext { - void closeTransaction(); +abstract class TransactionContext extends AbstractSimpleIdentifiable { + private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class); + + private final short transactionVersion; - Future readyTransaction(Boolean havePermit, Optional> participatingShardNames); + private long modificationCount = 0; + private boolean handOffComplete; - void executeRead(AbstractRead readCmd, SettableFuture promise, Boolean havePermit); + TransactionContext(final TransactionIdentifier transactionIdentifier) { + this(transactionIdentifier, DataStoreVersions.CURRENT_VERSION); + } - void executeDelete(YangInstanceIdentifier path, Boolean havePermit); + TransactionContext(final TransactionIdentifier transactionIdentifier, final short transactionVersion) { + super(transactionIdentifier); + this.transactionVersion = transactionVersion; + } - void executeMerge(YangInstanceIdentifier path, NormalizedNode data, Boolean havePermit); + final short getTransactionVersion() { + return transactionVersion; + } - void executeWrite(YangInstanceIdentifier path, NormalizedNode data, Boolean havePermit); + final void incrementModificationCount() { + modificationCount++; + } - Future directCommit(Boolean havePermit); + final void logModificationCount() { + LOG.debug("Total modifications on Tx {} = [ {} ]", getIdentifier(), modificationCount); + } /** * Invoked by {@link AbstractTransactionContextWrapper} when it has finished handing @@ -44,14 +58,35 @@ interface TransactionContext { * Implementations can rely on the wrapper calling this operation in a synchronized * block, so they do not need to ensure visibility of this state transition themselves. */ - void operationHandOffComplete(); + final void operationHandOffComplete() { + handOffComplete = true; + } + + final boolean isOperationHandOffComplete() { + return handOffComplete; + } /** * A TransactionContext that uses operation limiting should return true else false. * * @return true if operation limiting is used, false otherwise */ - boolean usesOperationLimiting(); + boolean usesOperationLimiting() { + return false; + } + + abstract void executeDelete(YangInstanceIdentifier path, Boolean havePermit); + + abstract void executeMerge(YangInstanceIdentifier path, NormalizedNode data, Boolean havePermit); + + abstract void executeWrite(YangInstanceIdentifier path, NormalizedNode data, Boolean havePermit); + + abstract void executeRead(AbstractRead readCmd, SettableFuture proxyFuture, Boolean havePermit); + + abstract Future readyTransaction(Boolean havePermit, + Optional> participatingShardNames); + + abstract Future directCommit(Boolean havePermit); - short getTransactionVersion(); + abstract void closeTransaction(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java index 128d08a4d9..48e4017984 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/LocalTransactionContextTest.java @@ -21,6 +21,12 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier; +import org.opendaylight.controller.cluster.access.concepts.FrontendType; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.MemberName; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.DataExists; import org.opendaylight.controller.cluster.datastore.messages.ReadData; import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction; @@ -33,13 +39,8 @@ import scala.concurrent.Future; @RunWith(MockitoJUnitRunner.StrictStubs.class) public class LocalTransactionContextTest { - - @Mock - private OperationLimiter limiter; - @Mock private DOMStoreReadWriteTransaction readWriteTransaction; - @Mock private LocalTransactionReadySupport mockReadySupport; @@ -47,15 +48,17 @@ public class LocalTransactionContextTest { @Before public void setUp() { - localTransactionContext = new LocalTransactionContext(readWriteTransaction, limiter.getIdentifier(), - mockReadySupport) { + final TransactionIdentifier txId = new TransactionIdentifier(new LocalHistoryIdentifier(ClientIdentifier.create( + FrontendIdentifier.create(MemberName.forName("member"), FrontendType.forName("type")), 0), 0), 0); + + localTransactionContext = new LocalTransactionContext(readWriteTransaction, txId, mockReadySupport) { @Override - protected DOMStoreWriteTransaction getWriteDelegate() { + DOMStoreWriteTransaction getWriteDelegate() { return readWriteTransaction; } @Override - protected DOMStoreReadTransaction getReadDelegate() { + DOMStoreReadTransaction getReadDelegate() { return readWriteTransaction; } };