Eliminate ask-based support from Shard 20/114220/33
authorRobert Varga <robert.varga@pantheon.tech>
Mon, 28 Oct 2024 00:19:38 +0000 (01:19 +0100)
committerRobert Varga <nite@hq.sk>
Fri, 1 Nov 2024 15:49:35 +0000 (15:49 +0000)
This eliminates all the moving pieces in production code, leaving only
messages.

We also remove the remaining tests, notably ShardCommitCoordinationTest,
ShardTransactionFailureTest and ShardTransactionTest.

Replacement for ShardCommitCoordinationTest will come via a separate
issue, as we need to revisit how ShardDataTree.pendingTransactions work,
now that we only have properly-coordinated can-commit/pre-commit stages.

Replacement for ShardTransaction(Failure)Test will be submitted in a
separate patch.

JIRA: CONTROLLER-2129
Change-Id: I0161f0291fc5b27779c8304456cac800ac7c9e90
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
18 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DefaultShardStatsMXBean.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionActorFactory.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionMessageRetrySupport.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/AbstractTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinationTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java [deleted file]

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/CohortEntry.java
deleted file mode 100644 (file)
index 129ef26..0000000
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.primitives.UnsignedLong;
-import com.google.common.util.concurrent.FutureCallback;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.List;
-import java.util.Optional;
-import java.util.SortedSet;
-import org.apache.pekko.actor.ActorRef;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortDecorator;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.yangtools.yang.common.Empty;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
-
-@Deprecated(since = "9.0.0", forRemoval = true)
-final class CohortEntry {
-    private final ReadWriteShardDataTreeTransaction transaction;
-    private final TransactionIdentifier transactionId;
-    private final short clientVersion;
-
-    private RuntimeException lastBatchedModificationsException;
-    private int totalBatchedModificationsReceived;
-    private int totalOperationsProcessed;
-    private ShardDataTreeCohort cohort;
-    private boolean doImmediateCommit;
-    private ActorRef replySender;
-    private Shard shard;
-
-    private CohortEntry(final ReadWriteShardDataTreeTransaction transaction, final short clientVersion) {
-        cohort = null;
-        this.transaction = requireNonNull(transaction);
-        transactionId = transaction.getIdentifier();
-        this.clientVersion = clientVersion;
-    }
-
-    private CohortEntry(final ShardDataTreeCohort cohort, final short clientVersion) {
-        this.cohort = requireNonNull(cohort);
-        transactionId = cohort.transactionId();
-        transaction = null;
-        this.clientVersion = clientVersion;
-    }
-
-    static CohortEntry createOpen(final ReadWriteShardDataTreeTransaction transaction, final short clientVersion) {
-        return new CohortEntry(transaction, clientVersion);
-    }
-
-    static CohortEntry createReady(final ShardDataTreeCohort cohort, final short clientVersion) {
-        return new CohortEntry(cohort, clientVersion);
-    }
-
-    TransactionIdentifier getTransactionId() {
-        return transactionId;
-    }
-
-    short getClientVersion() {
-        return clientVersion;
-    }
-
-    boolean isFailed() {
-        return cohort != null && cohort.isFailed();
-    }
-
-    DataTreeModification getDataTreeModification() {
-        return cohort.getDataTreeModification();
-    }
-
-    ReadWriteShardDataTreeTransaction getTransaction() {
-        return transaction;
-    }
-
-    int getTotalBatchedModificationsReceived() {
-        return totalBatchedModificationsReceived;
-    }
-
-    int getTotalOperationsProcessed() {
-        return totalOperationsProcessed;
-    }
-
-    RuntimeException getLastBatchedModificationsException() {
-        return lastBatchedModificationsException;
-    }
-
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    @SuppressFBWarnings(value = "THROWS_METHOD_THROWS_RUNTIMEEXCEPTION", justification = "Re-thrown")
-    void applyModifications(final List<Modification> modifications) {
-        totalBatchedModificationsReceived++;
-        if (lastBatchedModificationsException == null) {
-            totalOperationsProcessed += modifications.size();
-            for (Modification modification : modifications) {
-                try {
-                    modification.apply(transaction.getSnapshot());
-                } catch (RuntimeException e) {
-                    lastBatchedModificationsException = e;
-                    throw e;
-                }
-            }
-        }
-    }
-
-    void canCommit(final FutureCallback<Empty> callback) {
-        cohort.canCommit(callback);
-    }
-
-    void preCommit(final FutureCallback<DataTreeCandidate> callback) {
-        cohort.preCommit(callback);
-    }
-
-    void commit(final FutureCallback<UnsignedLong> callback) {
-        cohort.commit(callback);
-    }
-
-    void abort(final FutureCallback<Empty> callback) {
-        cohort.abort(callback);
-    }
-
-    void ready(final Optional<SortedSet<String>> participatingShardNames, final CohortDecorator cohortDecorator) {
-        checkState(cohort == null, "cohort was already set");
-
-        cohort = transaction.ready(participatingShardNames);
-
-        if (cohortDecorator != null) {
-            // Call the hook for unit tests.
-            cohort = cohortDecorator.decorate(transactionId, cohort);
-        }
-    }
-
-    boolean isSealed() {
-        return cohort != null;
-    }
-
-    Optional<SortedSet<String>> getParticipatingShardNames() {
-        return cohort != null ? cohort.getParticipatingShardNames() : Optional.empty();
-    }
-
-    boolean isDoImmediateCommit() {
-        return doImmediateCommit;
-    }
-
-    void setDoImmediateCommit(final boolean doImmediateCommit) {
-        this.doImmediateCommit = doImmediateCommit;
-    }
-
-    ActorRef getReplySender() {
-        return replySender;
-    }
-
-    void setReplySender(final ActorRef replySender) {
-        this.replySender = replySender;
-    }
-
-    Shard getShard() {
-        return shard;
-    }
-
-    void setShard(final Shard shard) {
-        this.shard = shard;
-    }
-
-    @Override
-    public String toString() {
-        final StringBuilder builder = new StringBuilder();
-        builder.append("CohortEntry [transactionId=").append(transactionId).append(", doImmediateCommit=")
-                .append(doImmediateCommit).append("]");
-        return builder.toString();
-    }
-}
index d477deaec8a31c0b39b42e42b5f96cf6f67e4415..8c46a1685aeb000711b700a41446daf4da7a027d 100644 (file)
@@ -26,8 +26,6 @@ import org.opendaylight.controller.cluster.raft.PeerAddressResolver;
 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.config.distributed.datastore.provider.rev231229.DataStoreProperties.ExportOnRecovery;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -72,8 +70,6 @@ public class DatastoreContext implements ClientActorConfig {
 
     public static final long DEFAULT_SYNC_INDEX_THRESHOLD = 10;
 
-    private static final Logger LOG = LoggerFactory.getLogger(DatastoreContext.class);
-
     private static final Set<String> GLOBAL_DATASTORE_NAMES = ConcurrentHashMap.newKeySet();
 
     private final DefaultConfigParamsImpl raftConfig = new DefaultConfigParamsImpl();
index e68b400c4eea51eb1dfb426468a01ae45fb28182..cdfc30e80628732326d958a76598ada996faf90b 100644 (file)
@@ -257,7 +257,8 @@ final class DefaultShardStatsMXBean extends AbstractMXBean implements ShardStats
 
     @Override
     public int getTxCohortCacheSize() {
-        return shard != null ? shard.getCohortCacheSize() : -1;
+        // FIXME: deprecate this?
+        return shard != null ? 0 : -1;
     }
 
     @Override
index a3ae949e05fbbc0ae134e9bd174612a2120621e0..6353b4797adc43ec198369e1b9c9e433d90ea20e 100644 (file)
@@ -24,7 +24,6 @@ import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
@@ -41,7 +40,6 @@ import org.apache.pekko.actor.Status.Failure;
 import org.apache.pekko.persistence.RecoveryCompleted;
 import org.apache.pekko.persistence.SnapshotOffer;
 import org.apache.pekko.serialization.JavaSerializer;
-import org.apache.pekko.serialization.Serialization;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.ABIVersion;
@@ -59,7 +57,6 @@ import org.opendaylight.controller.cluster.access.concepts.RequestSuccess;
 import org.opendaylight.controller.cluster.access.concepts.RetiredGenerationException;
 import org.opendaylight.controller.cluster.access.concepts.RuntimeRequestException;
 import org.opendaylight.controller.cluster.access.concepts.SliceableMessage;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
 import org.opendaylight.controller.cluster.common.actor.CommonConfig;
 import org.opendaylight.controller.cluster.common.actor.Dispatchers;
@@ -67,26 +64,16 @@ import org.opendaylight.controller.cluster.common.actor.Dispatchers.DispatcherTy
 import org.opendaylight.controller.cluster.common.actor.MessageTracker;
 import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
 import org.opendaylight.controller.cluster.datastore.actors.JsonExportActor;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStatsMXBean;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.GetKnownClients;
 import org.opendaylight.controller.cluster.datastore.messages.GetKnownClientsReply;
 import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
 import org.opendaylight.controller.cluster.datastore.messages.MakeLeaderLocal;
 import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
 import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
@@ -180,9 +167,6 @@ public class Shard extends RaftActor {
 
     private DatastoreContext datastoreContext;
 
-    @Deprecated(since = "9.0.0", forRemoval = true)
-    private final ShardCommitCoordinator commitCoordinator;
-
     private long transactionCommitTimeout;
 
     private Cancellable txCommitTimeoutCheckSchedule;
@@ -191,18 +175,12 @@ public class Shard extends RaftActor {
 
     private final MessageTracker appendEntriesReplyTracker;
 
-    @Deprecated(since = "9.0.0", forRemoval = true)
-    private final ShardTransactionActorFactory transactionActorFactory;
-
     private final ShardSnapshotCohort snapshotCohort;
 
     private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
 
     private ShardSnapshot restoreFromSnapshot;
 
-    @Deprecated(since = "9.0.0", forRemoval = true)
-    private final ShardTransactionMessageRetrySupport messageRetrySupport;
-
     @VisibleForTesting
     final FrontendMetadata frontendMetadata;
 
@@ -258,8 +236,6 @@ public class Shard extends RaftActor {
             getContext().become(new MeteringBehavior(this));
         }
 
-        commitCoordinator = new ShardCommitCoordinator(store, LOG, name);
-
         setTransactionCommitTimeout();
 
         // create a notifier actor for each cluster member
@@ -269,15 +245,10 @@ public class Shard extends RaftActor {
                 getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
 
         dispatchers = new Dispatchers(context().system().dispatchers());
-        transactionActorFactory = new ShardTransactionActorFactory(store, datastoreContext,
-            dispatchers.getDispatcherPath(Dispatchers.DispatcherType.Transaction),
-                self(), getContext(), shardMBean.shardStats(), builder.getId().getShardName());
 
         snapshotCohort = ShardSnapshotCohort.create(getContext(), builder.getId().getMemberName(), store, LOG,
             name, datastoreContext);
 
-        messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
-
         responseMessageSlicer = MessageSlicer.builder().logContext(name)
                 .messageSliceSize(datastoreContext.getMaximumMessageSliceSize())
                 .fileBackedStreamFactory(getRaftActorContext().getFileBackedOutputStreamFactory())
@@ -304,14 +275,10 @@ public class Shard extends RaftActor {
 
         super.postStop();
 
-        messageRetrySupport.close();
-
         if (txCommitTimeoutCheckSchedule != null) {
             txCommitTimeoutCheckSchedule.cancel();
         }
 
-        commitCoordinator.abortPendingTransactions("Transaction aborted due to shutdown.", this);
-
         shardMBean.unregisterMBean();
         listenerInfoMXBean.unregister();
     }
@@ -395,28 +362,7 @@ public class Shard extends RaftActor {
             } else if (GetKnownClients.INSTANCE.equals(message)) {
                 handleGetKnownClients();
             } else if (!responseMessageSlicer.handleMessage(message)) {
-                // Ask-based protocol messages
-                if (CreateTransaction.isSerializedType(message)) {
-                    handleCreateTransaction(message);
-                } else if (message instanceof BatchedModifications request) {
-                    handleBatchedModifications(request);
-                } else if (message instanceof ForwardedReadyTransaction request) {
-                    handleForwardedReadyTransaction(request);
-                } else if (message instanceof ReadyLocalTransaction request) {
-                    handleReadyLocalTransaction(request);
-                } else if (CanCommitTransaction.isSerializedType(message)) {
-                    handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
-                } else if (CommitTransaction.isSerializedType(message)) {
-                    handleCommitTransaction(CommitTransaction.fromSerializable(message));
-                } else if (AbortTransaction.isSerializedType(message)) {
-                    handleAbortTransaction(AbortTransaction.fromSerializable(message));
-                } else if (CloseTransactionChain.isSerializedType(message)) {
-                    closeTransactionChain(CloseTransactionChain.fromSerializable(message));
-                } else if (ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) {
-                    messageRetrySupport.onTimerMessage(message);
-                } else {
-                    super.handleNonRaftCommand(message);
-                }
+                super.handleNonRaftCommand(message);
             }
         }
     }
@@ -457,7 +403,6 @@ public class Shard extends RaftActor {
 
     private void commitTimeoutCheck() {
         store.checkForExpiredTransactions(transactionCommitTimeout, this::updateAccess);
-        commitCoordinator.checkForExpiredTransactions(transactionCommitTimeout, this);
         requestMessageAssembler.checkExpiredAssembledMessageState();
     }
 
@@ -659,10 +604,6 @@ public class Shard extends RaftActor {
         return store.getQueueSize();
     }
 
-    final int getCohortCacheSize() {
-        return commitCoordinator.getCohortCacheSize();
-    }
-
     @Override
     protected final ActorRef roleChangeNotifier() {
         return roleChangeNotifier;
@@ -701,253 +642,10 @@ public class Shard extends RaftActor {
         }
     }
 
-    @Deprecated(since = "9.0.0", forRemoval = true)
-    private void handleCommitTransaction(final CommitTransaction commit) {
-        final var txId = commit.getTransactionId();
-        if (isLeader()) {
-            askProtocolEncountered(txId);
-            commitCoordinator.handleCommit(txId, getSender(), this);
-        } else {
-            final var leader = getLeader();
-            if (leader == null) {
-                messageRetrySupport.addMessageToRetry(commit, getSender(), "Could not commit transaction " + txId);
-            } else {
-                LOG.debug("{}: Forwarding CommitTransaction to leader {}", persistenceId(), leader);
-                leader.forward(commit, getContext());
-            }
-        }
-    }
-
-    @Deprecated(since = "9.0.0", forRemoval = true)
-    private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
-        final var txId = canCommit.getTransactionId();
-        LOG.debug("{}: Can committing transaction {}", persistenceId(), txId);
-
-        if (isLeader()) {
-            askProtocolEncountered(txId);
-            commitCoordinator.handleCanCommit(txId, getSender(), this);
-        } else {
-            final var leader = getLeader();
-            if (leader == null) {
-                messageRetrySupport.addMessageToRetry(canCommit, getSender(),
-                        "Could not canCommit transaction " + txId);
-            } else {
-                LOG.debug("{}: Forwarding CanCommitTransaction to leader {}", persistenceId(), leader);
-                leader.forward(canCommit, getContext());
-            }
-        }
-    }
-
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    @Deprecated(since = "9.0.0", forRemoval = true)
-    private void handleBatchedModificationsLocal(final BatchedModifications batched, final ActorRef sender) {
-        askProtocolEncountered(batched.getTransactionId());
-
-        try {
-            commitCoordinator.handleBatchedModifications(batched, sender, this);
-        } catch (Exception e) {
-            LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
-                    batched.getTransactionId(), e);
-            sender.tell(new Failure(e), getSelf());
-        }
-    }
-
-    @Deprecated(since = "9.0.0", forRemoval = true)
-    private void handleBatchedModifications(final BatchedModifications batched) {
-        // This message is sent to prepare the modifications transaction directly on the Shard as an
-        // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
-        // BatchedModifications message, the caller sets the ready flag in the message indicating
-        // modifications are complete. The reply contains the cohort actor path (this actor) for the caller
-        // to initiate the 3-phase commit. This also avoids the overhead of sending an additional
-        // ReadyTransaction message.
-
-        // If we're not the leader then forward to the leader. This is a safety measure - we shouldn't
-        // normally get here if we're not the leader as the front-end (TransactionProxy) should determine
-        // the primary/leader shard. However with timing and caching on the front-end, there's a small
-        // window where it could have a stale leader during leadership transitions.
-        //
-        boolean isLeaderActive = isLeaderActive();
-        if (isLeader() && isLeaderActive) {
-            handleBatchedModificationsLocal(batched, getSender());
-        } else {
-            final var leader = getLeader();
-            if (!isLeaderActive || leader == null) {
-                messageRetrySupport.addMessageToRetry(batched, getSender(),
-                        "Could not process BatchedModifications " + batched.getTransactionId());
-            } else {
-                // If this is not the first batch and leadership changed in between batched messages,
-                // we need to reconstruct previous BatchedModifications from the transaction
-                // DataTreeModification, honoring the max batched modification count, and forward all the
-                // previous BatchedModifications to the new leader.
-                final var newModifications = commitCoordinator.createForwardedBatchedModifications(batched,
-                    datastoreContext.getShardBatchedModificationCount());
-
-                LOG.debug("{}: Forwarding {} BatchedModifications to leader {}", persistenceId(),
-                        newModifications.size(), leader);
-
-                for (BatchedModifications bm : newModifications) {
-                    leader.forward(bm, getContext());
-                }
-            }
-        }
-    }
-
-    private boolean failIfIsolatedLeader(final ActorRef sender) {
-        if (isIsolatedLeader()) {
-            sender.tell(new Failure(new NoShardLeaderException(String.format(
-                    "Shard %s was the leader but has lost contact with all of its followers. Either all"
-                    + " other follower nodes are down or this node is isolated by a network partition.",
-                    persistenceId()))), getSelf());
-            return true;
-        }
-
-        return false;
-    }
-
     protected boolean isIsolatedLeader() {
         return getRaftState() == RaftState.IsolatedLeader;
     }
 
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    @Deprecated(since = "9.0.0", forRemoval = true)
-   private void handleReadyLocalTransaction(final ReadyLocalTransaction message) {
-        final var txId = message.getTransactionId();
-        LOG.debug("{}: handleReadyLocalTransaction for {}", persistenceId(), txId);
-
-        final var isLeaderActive = isLeaderActive();
-        if (isLeader() && isLeaderActive) {
-            askProtocolEncountered(txId);
-            try {
-                commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
-            } catch (Exception e) {
-                LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(), txId, e);
-                getSender().tell(new Failure(e), getSelf());
-            }
-        } else {
-            final var leader = getLeader();
-            if (!isLeaderActive || leader == null) {
-                messageRetrySupport.addMessageToRetry(message, getSender(),
-                        "Could not process ready local transaction " + txId);
-            } else {
-                LOG.debug("{}: Forwarding ReadyLocalTransaction to leader {}", persistenceId(), leader);
-                message.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
-                leader.forward(message, getContext());
-            }
-        }
-    }
-
-    @Deprecated(since = "9.0.0", forRemoval = true)
-    private void handleForwardedReadyTransaction(final ForwardedReadyTransaction forwardedReady) {
-        LOG.debug("{}: handleForwardedReadyTransaction for {}", persistenceId(), forwardedReady.getTransactionId());
-
-        final var isLeaderActive = isLeaderActive();
-        if (isLeader() && isLeaderActive) {
-            askProtocolEncountered(forwardedReady.getTransactionId());
-            commitCoordinator.handleForwardedReadyTransaction(forwardedReady, getSender(), this);
-        } else {
-            final var leader = getLeader();
-            if (!isLeaderActive || leader == null) {
-                messageRetrySupport.addMessageToRetry(forwardedReady, getSender(),
-                        "Could not process forwarded ready transaction " + forwardedReady.getTransactionId());
-            } else {
-                LOG.debug("{}: Forwarding ForwardedReadyTransaction to leader {}", persistenceId(), leader);
-
-                final var readyLocal = new ReadyLocalTransaction(forwardedReady.getTransactionId(),
-                        forwardedReady.getTransaction().getSnapshot(), forwardedReady.isDoImmediateCommit(),
-                        forwardedReady.getParticipatingShardNames());
-                readyLocal.setRemoteVersion(getCurrentBehavior().getLeaderPayloadVersion());
-                leader.forward(readyLocal, getContext());
-            }
-        }
-    }
-
-    @Deprecated(since = "9.0.0", forRemoval = true)
-    private void handleAbortTransaction(final AbortTransaction abort) {
-        final var transactionId = abort.getTransactionId();
-        askProtocolEncountered(transactionId);
-        doAbortTransaction(transactionId, getSender());
-    }
-
-    final void doAbortTransaction(final Identifier transactionID, final ActorRef sender) {
-        commitCoordinator.handleAbort(transactionID, sender, this);
-    }
-
-    @Deprecated(since = "9.0.0", forRemoval = true)
-    private void handleCreateTransaction(final Object message) {
-        if (isLeader()) {
-            createTransaction(CreateTransaction.fromSerializable(message));
-        } else if (getLeader() != null) {
-            getLeader().forward(message, getContext());
-        } else {
-            getSender().tell(new Failure(new NoShardLeaderException(
-                    "Could not create a shard transaction", persistenceId())), getSelf());
-        }
-    }
-
-    @Deprecated(since = "9.0.0", forRemoval = true)
-    private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
-        if (isLeader()) {
-            final var id = closeTransactionChain.getIdentifier();
-            askProtocolEncountered(id.getClientId());
-            store.closeTransactionChain(id);
-        } else if (getLeader() != null) {
-            getLeader().forward(closeTransactionChain, getContext());
-        } else {
-            LOG.warn("{}: Could not close transaction {}", persistenceId(), closeTransactionChain.getIdentifier());
-        }
-    }
-
-    @Deprecated(since = "9.0.0", forRemoval = true)
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    private void createTransaction(final CreateTransaction createTransaction) {
-        askProtocolEncountered(createTransaction.getTransactionId());
-
-        try {
-            if (TransactionType.fromInt(createTransaction.getTransactionType()) != TransactionType.READ_ONLY
-                    && failIfIsolatedLeader(getSender())) {
-                return;
-            }
-
-            final var transactionActor = createTransaction(createTransaction.getTransactionType(),
-                createTransaction.getTransactionId());
-
-            getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
-                    createTransaction.getTransactionId(), createTransaction.getVersion()).toSerializable(), getSelf());
-        } catch (Exception e) {
-            getSender().tell(new Failure(e), getSelf());
-        }
-    }
-
-    @Deprecated(since = "9.0.0", forRemoval = true)
-    private ActorRef createTransaction(final int transactionType, final TransactionIdentifier transactionId) {
-        LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
-        return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
-            transactionId);
-    }
-
-    // Called on leader only
-    @Deprecated(since = "9.0.0", forRemoval = true)
-    private void askProtocolEncountered(final TransactionIdentifier transactionId) {
-        askProtocolEncountered(transactionId.getHistoryId().getClientId());
-    }
-
-    // Called on leader only
-    @Deprecated(since = "9.0.0", forRemoval = true)
-    private void askProtocolEncountered(final ClientIdentifier clientId) {
-        final var frontend = clientId.getFrontendId();
-        final var state = knownFrontends.get(frontend);
-        if (!(state instanceof LeaderFrontendState.Disabled)) {
-            LOG.debug("{}: encountered ask-based client {}, disabling transaction tracking", persistenceId(), clientId);
-            if (knownFrontends.isEmpty()) {
-                knownFrontends = new HashMap<>();
-            }
-            knownFrontends.put(frontend, new LeaderFrontendState.Disabled(persistenceId(), clientId, getDataStore()));
-
-            persistPayload(clientId, DisableTrackingPayload.create(clientId,
-                datastoreContext.getInitialPayloadSerializedBufferCapacity()), false);
-        }
-    }
-
     private void updateSchemaContext(final UpdateSchemaContext message) {
         updateSchemaContext(message.modelContext());
     }
@@ -1032,10 +730,6 @@ public class Shard extends RaftActor {
             paused = false;
             store.purgeLeaderState();
         }
-
-        if (hasLeader && !isIsolatedLeader()) {
-            messageRetrySupport.retryMessages();
-        }
     }
 
     @Override
@@ -1050,43 +744,11 @@ public class Shard extends RaftActor {
             }
 
             requestMessageAssembler.close();
-
-            if (!hasLeader()) {
-                // No leader anywhere, nothing else to do
-                return;
-            }
-
-            // Another leader was elected. If we were the previous leader and had pending transactions, convert
-            // them to transaction messages and send to the new leader.
-            ActorSelection leader = getLeader();
-            if (leader != null) {
-                // Clears all pending transactions and converts them to messages to be forwarded to a new leader.
-                Collection<?> messagesToForward = commitCoordinator.convertPendingTransactionsToMessages(
-                    datastoreContext.getShardBatchedModificationCount());
-
-                if (!messagesToForward.isEmpty()) {
-                    LOG.debug("{}: Forwarding {} pending transaction messages to leader {}", persistenceId(),
-                            messagesToForward.size(), leader);
-
-                    for (Object message : messagesToForward) {
-                        LOG.debug("{}: Forwarding pending transaction message {}", persistenceId(), message);
-
-                        leader.tell(message, self());
-                    }
-                }
-            } else {
-                commitCoordinator.abortPendingTransactions("The transacton was aborted due to inflight leadership "
-                        + "change and the leader address isn't available.", this);
-            }
         } else {
             // We have become the leader, we need to reconstruct frontend state
             knownFrontends = verifyNotNull(frontendMetadata.toLeaderState(this));
             LOG.debug("{}: became leader with frontend state for {}", persistenceId(), knownFrontends.keySet());
         }
-
-        if (!isIsolatedLeader()) {
-            messageRetrySupport.retryMessages();
-        }
     }
 
     @Override
@@ -1134,11 +796,6 @@ public class Shard extends RaftActor {
         return super.journalPluginId();
     }
 
-    @VisibleForTesting
-    final ShardCommitCoordinator getCommitCoordinator() {
-        return commitCoordinator;
-    }
-
     // non-final for mocking
     DatastoreContext getDatastoreContext() {
         return datastoreContext;
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
deleted file mode 100644 (file)
index 663c332..0000000
+++ /dev/null
@@ -1,480 +0,0 @@
-/*
- * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.primitives.UnsignedLong;
-import com.google.common.util.concurrent.FutureCallback;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import org.apache.pekko.actor.ActorRef;
-import org.apache.pekko.actor.Status.Failure;
-import org.apache.pekko.serialization.Serialization;
-import org.eclipse.jdt.annotation.NonNull;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.VersionedExternalizableMessage;
-import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
-import org.opendaylight.yangtools.concepts.Identifier;
-import org.opendaylight.yangtools.yang.common.Empty;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
-import org.slf4j.Logger;
-
-/**
- * Coordinates commits for a shard ensuring only one concurrent 3-phase commit.
- *
- * @author Thomas Pantelis
- */
-@Deprecated(since = "9.0.0", forRemoval = true)
-final class ShardCommitCoordinator {
-
-    // Interface hook for unit tests to replace or decorate the ShardDataTreeCohorts.
-    @VisibleForTesting
-    public interface CohortDecorator {
-        ShardDataTreeCohort decorate(Identifier transactionID, ShardDataTreeCohort actual);
-    }
-
-    private final Map<Identifier, CohortEntry> cohortCache = new HashMap<>();
-
-    private final ShardDataTree dataTree;
-
-    private final Logger log;
-
-    private final String name;
-
-    // This is a hook for unit tests to replace or decorate the ShardDataTreeCohorts.
-    @VisibleForTesting
-    private CohortDecorator cohortDecorator;
-
-    private ReadyTransactionReply readyTransactionReply;
-
-    ShardCommitCoordinator(final ShardDataTree dataTree, final Logger log, final String name) {
-        this.log = log;
-        this.name = name;
-        this.dataTree = requireNonNull(dataTree);
-    }
-
-    int getCohortCacheSize() {
-        return cohortCache.size();
-    }
-
-    private String persistenceId() {
-        return dataTree.logContext();
-    }
-
-    private ReadyTransactionReply readyTransactionReply(final ActorRef cohort) {
-        if (readyTransactionReply == null) {
-            readyTransactionReply = new ReadyTransactionReply(Serialization.serializedActorPath(cohort));
-        }
-
-        return readyTransactionReply;
-    }
-
-    /**
-     * This method is called to ready a transaction that was prepared by ShardTransaction actor. It caches
-     * the prepared cohort entry for the given transactions ID in preparation for the subsequent 3-phase commit.
-     *
-     * @param ready the ForwardedReadyTransaction message to process
-     * @param sender the sender of the message
-     * @param shard the transaction's shard actor
-     */
-    void handleForwardedReadyTransaction(final ForwardedReadyTransaction ready, final ActorRef sender,
-            final Shard shard) {
-        log.debug("{}: Readying transaction {}, client version {}", name,
-                ready.getTransactionId(), ready.getTxnClientVersion());
-
-        final ShardDataTreeCohort cohort = ready.getTransaction().ready(ready.getParticipatingShardNames());
-        final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion());
-        cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
-
-        if (ready.isDoImmediateCommit()) {
-            cohortEntry.setDoImmediateCommit(true);
-            cohortEntry.setReplySender(sender);
-            cohortEntry.setShard(shard);
-            handleCanCommit(cohortEntry);
-        } else {
-            // The caller does not want immediate commit - the 3-phase commit will be coordinated by the
-            // front-end so send back a ReadyTransactionReply with our actor path.
-            sender.tell(readyTransactionReply(shard.self()), shard.self());
-        }
-    }
-
-    /**
-     * This method handles a BatchedModifications message for a transaction being prepared directly on the
-     * Shard actor instead of via a ShardTransaction actor. If there's no currently cached
-     * DOMStoreWriteTransaction, one is created. The batched modifications are applied to the write Tx. If
-     * the BatchedModifications is ready to commit then a DOMStoreThreePhaseCommitCohort is created.
-     *
-     * @param batched the BatchedModifications message to process
-     * @param sender the sender of the message
-     */
-    @SuppressFBWarnings(value = "THROWS_METHOD_THROWS_RUNTIMEEXCEPTION", justification = "Replay of captured failure")
-    void handleBatchedModifications(final BatchedModifications batched, final ActorRef sender, final Shard shard) {
-        CohortEntry cohortEntry = cohortCache.get(batched.getTransactionId());
-        if (cohortEntry == null || cohortEntry.isSealed()) {
-            cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionId()),
-                batched.getVersion());
-            cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
-        }
-
-        if (log.isDebugEnabled()) {
-            log.debug("{}: Applying {} batched modifications for Tx {}", name,
-                    batched.getModifications().size(), batched.getTransactionId());
-        }
-
-        cohortEntry.applyModifications(batched.getModifications());
-
-        if (batched.isReady()) {
-            if (cohortEntry.getLastBatchedModificationsException() != null) {
-                cohortCache.remove(cohortEntry.getTransactionId());
-                throw cohortEntry.getLastBatchedModificationsException();
-            }
-
-            if (cohortEntry.getTotalBatchedModificationsReceived() != batched.getTotalMessagesSent()) {
-                cohortCache.remove(cohortEntry.getTransactionId());
-                throw new IllegalStateException(String.format(
-                        "The total number of batched messages received %d does not match the number sent %d",
-                        cohortEntry.getTotalBatchedModificationsReceived(), batched.getTotalMessagesSent()));
-            }
-
-            if (log.isDebugEnabled()) {
-                log.debug("{}: Readying Tx {} of {} operations, client version {}", name,
-                        batched.getTransactionId(), cohortEntry.getTotalOperationsProcessed(), batched.getVersion());
-            }
-
-            cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady());
-            cohortEntry.ready(batched.getParticipatingShardNames(), cohortDecorator);
-
-            if (batched.isDoCommitOnReady()) {
-                cohortEntry.setReplySender(sender);
-                cohortEntry.setShard(shard);
-                handleCanCommit(cohortEntry);
-            } else {
-                sender.tell(readyTransactionReply(shard.self()), shard.self());
-            }
-        } else {
-            sender.tell(new BatchedModificationsReply(batched.getModifications().size()), shard.self());
-        }
-    }
-
-    /**
-     * This method handles {@link ReadyLocalTransaction} message. All transaction modifications have
-     * been prepared beforehand by the sender and we just need to drive them through into the
-     * dataTree.
-     *
-     * @param message the ReadyLocalTransaction message to process
-     * @param sender the sender of the message
-     * @param shard the transaction's shard actor
-     */
-    void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) {
-        final TransactionIdentifier txId = message.getTransactionId();
-        final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification(),
-                message.getParticipatingShardNames());
-        final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION);
-        cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
-        cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
-
-        log.debug("{}: Applying local modifications for Tx {}", name, txId);
-
-        if (message.isDoCommitOnReady()) {
-            cohortEntry.setReplySender(sender);
-            cohortEntry.setShard(shard);
-            handleCanCommit(cohortEntry);
-        } else {
-            sender.tell(readyTransactionReply(shard.self()), shard.self());
-        }
-    }
-
-    @Deprecated(since = "9.0.0", forRemoval = true)
-    Collection<BatchedModifications> createForwardedBatchedModifications(final BatchedModifications from,
-            final int maxModificationsPerBatch) {
-        CohortEntry cohortEntry = cohortCache.remove(from.getTransactionId());
-        if (cohortEntry == null || cohortEntry.getTransaction() == null) {
-            return Collections.singletonList(from);
-        }
-
-        cohortEntry.applyModifications(from.getModifications());
-
-        final LinkedList<BatchedModifications> newModifications = new LinkedList<>();
-        cohortEntry.getTransaction().getSnapshot().applyToCursor(new AbstractBatchedModificationsCursor() {
-            @Override
-            protected BatchedModifications getModifications() {
-                if (newModifications.isEmpty()
-                        || newModifications.getLast().getModifications().size() >= maxModificationsPerBatch) {
-                    newModifications.add(new BatchedModifications(from.getTransactionId(), from.getVersion()));
-                }
-
-                return newModifications.getLast();
-            }
-        });
-
-        BatchedModifications last = newModifications.getLast();
-        last.setDoCommitOnReady(from.isDoCommitOnReady());
-        if (from.isReady()) {
-            last.setReady(from.getParticipatingShardNames());
-        }
-        last.setTotalMessagesSent(newModifications.size());
-        return newModifications;
-    }
-
-    private void handleCanCommit(final CohortEntry cohortEntry) {
-        cohortEntry.canCommit(new FutureCallback<>() {
-            @Override
-            public void onSuccess(final Empty result) {
-                log.debug("{}: canCommit for {}: success", name, cohortEntry.getTransactionId());
-
-                if (cohortEntry.isDoImmediateCommit()) {
-                    doCommit(cohortEntry);
-                } else {
-                    cohortEntry.getReplySender().tell(
-                        CanCommitTransactionReply.yes(cohortEntry.getClientVersion()).toSerializable(),
-                        cohortEntry.getShard().self());
-                }
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                log.debug("{}: An exception occurred during canCommit for {}", name, cohortEntry.getTransactionId(),
-                    failure);
-
-                cohortCache.remove(cohortEntry.getTransactionId());
-                cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
-            }
-        });
-    }
-
-    /**
-     * This method handles the canCommit phase for a transaction.
-     *
-     * @param transactionID the ID of the transaction to canCommit
-     * @param sender the actor to which to send the response
-     * @param shard the transaction's shard actor
-     */
-    void handleCanCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) {
-        // Lookup the cohort entry that was cached previously (or should have been) by
-        // transactionReady (via the ForwardedReadyTransaction message).
-        final CohortEntry cohortEntry = cohortCache.get(transactionID);
-        if (cohortEntry == null) {
-            // Either canCommit was invoked before ready (shouldn't happen) or a long time passed
-            // between canCommit and ready and the entry was expired from the cache or it was aborted.
-            IllegalStateException ex = new IllegalStateException(
-                    String.format("%s: Cannot canCommit transaction %s - no cohort entry found", name, transactionID));
-            log.error("{}: Inconsistency during transaction {} canCommit", name, transactionID, ex);
-            sender.tell(new Failure(ex), shard.self());
-            return;
-        }
-
-        cohortEntry.setReplySender(sender);
-        cohortEntry.setShard(shard);
-
-        handleCanCommit(cohortEntry);
-    }
-
-    void doCommit(final CohortEntry cohortEntry) {
-        log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionId());
-
-        // We perform the preCommit phase here atomically with the commit phase. This is an
-        // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
-        // coordination of preCommit across shards in case of failure but preCommit should not
-        // normally fail since we ensure only one concurrent 3-phase commit.
-        cohortEntry.preCommit(new FutureCallback<DataTreeCandidate>() {
-            @Override
-            public void onSuccess(final DataTreeCandidate candidate) {
-                finishCommit(cohortEntry.getReplySender(), cohortEntry);
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                log.error("{} An exception occurred while preCommitting transaction {}", name,
-                        cohortEntry.getTransactionId(), failure);
-
-                cohortCache.remove(cohortEntry.getTransactionId());
-                cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
-            }
-        });
-    }
-
-    void finishCommit(final @NonNull ActorRef sender, final @NonNull CohortEntry cohortEntry) {
-        log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionId());
-
-        cohortEntry.commit(new FutureCallback<UnsignedLong>() {
-            @Override
-            public void onSuccess(final UnsignedLong result) {
-                final TransactionIdentifier txId = cohortEntry.getTransactionId();
-                log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result,
-                    sender);
-
-                cohortCache.remove(cohortEntry.getTransactionId());
-                sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
-                    cohortEntry.getShard().self());
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                final TransactionIdentifier txId = cohortEntry.getTransactionId();
-                log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure);
-
-                cohortCache.remove(cohortEntry.getTransactionId());
-                sender.tell(new Failure(failure), cohortEntry.getShard().self());
-            }
-        });
-    }
-
-    /**
-     * This method handles the preCommit and commit phases for a transaction.
-     *
-     * @param transactionID the ID of the transaction to commit
-     * @param sender the actor to which to send the response
-     * @param shard the transaction's shard actor
-     */
-    void handleCommit(final Identifier transactionID, final ActorRef sender, final Shard shard) {
-        final CohortEntry cohortEntry = cohortCache.get(transactionID);
-        if (cohortEntry == null) {
-            // Either a long time passed between canCommit and commit and the entry was expired from the cache
-            // or it was aborted.
-            IllegalStateException ex = new IllegalStateException(
-                    String.format("%s: Cannot commit transaction %s - no cohort entry found", name, transactionID));
-            log.error("{}: Inconsistency during transaction {} commit", name, transactionID, ex);
-            sender.tell(new Failure(ex), shard.self());
-            return;
-        }
-
-        cohortEntry.setReplySender(sender);
-        doCommit(cohortEntry);
-    }
-
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    void handleAbort(final Identifier transactionID, final ActorRef sender, final Shard shard) {
-        CohortEntry cohortEntry = cohortCache.remove(transactionID);
-        if (cohortEntry == null) {
-            return;
-        }
-
-        log.debug("{}: Aborting transaction {}", name, transactionID);
-
-        final ActorRef self = shard.getSelf();
-        cohortEntry.abort(new FutureCallback<>() {
-            @Override
-            public void onSuccess(final Empty result) {
-                if (sender != null) {
-                    sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
-                }
-            }
-
-            @Override
-            public void onFailure(final Throwable failure) {
-                log.error("{}: An exception happened during abort", name, failure);
-
-                if (sender != null) {
-                    sender.tell(new Failure(failure), self);
-                }
-            }
-        });
-    }
-
-    void checkForExpiredTransactions(final long timeout, final Shard shard) {
-        cohortCache.values().removeIf(CohortEntry::isFailed);
-    }
-
-    void abortPendingTransactions(final String reason, final Shard shard) {
-        final var failure = new Failure(new RuntimeException(reason));
-        final var pending = dataTree.getAndClearPendingTransactions();
-
-        log.debug("{}: Aborting {} pending queued transactions", name, pending.size());
-
-        for (var cohort : pending) {
-            final var cohortEntry = cohortCache.remove(cohort.transactionId());
-            if (cohortEntry != null) {
-                final var replySender = cohortEntry.getReplySender();
-                if (replySender != null) {
-                    replySender.tell(failure, shard.self());
-                }
-            }
-        }
-
-        cohortCache.clear();
-    }
-
-    Collection<?> convertPendingTransactionsToMessages(final int maxModificationsPerBatch) {
-        final var messages = new ArrayList<VersionedExternalizableMessage>();
-        for (var cohort : dataTree.getAndClearPendingTransactions()) {
-            final var cohortEntry = cohortCache.remove(cohort.transactionId());
-            if (cohortEntry == null) {
-                continue;
-            }
-
-            final var newMessages = new ArrayDeque<BatchedModifications>();
-            cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() {
-                @Override
-                protected BatchedModifications getModifications() {
-                    final var lastBatch = newMessages.peekLast();
-                    if (lastBatch != null && lastBatch.getModifications().size() >= maxModificationsPerBatch) {
-                        return lastBatch;
-                    }
-
-                    // Allocate a new message
-                    final var ret = new BatchedModifications(cohortEntry.getTransactionId(),
-                        cohortEntry.getClientVersion());
-                    newMessages.add(ret);
-                    return ret;
-                }
-            });
-
-            final var last = newMessages.peekLast();
-            if (last != null) {
-                final boolean immediate = cohortEntry.isDoImmediateCommit();
-                last.setDoCommitOnReady(immediate);
-                last.setReady(cohortEntry.getParticipatingShardNames());
-                last.setTotalMessagesSent(newMessages.size());
-
-                messages.addAll(newMessages);
-
-                if (!immediate) {
-                    switch (cohort.getState()) {
-                        case CAN_COMMIT_COMPLETE:
-                        case CAN_COMMIT_PENDING:
-                            messages.add(new CanCommitTransaction(cohortEntry.getTransactionId(),
-                                cohortEntry.getClientVersion()));
-                            break;
-                        case PRE_COMMIT_COMPLETE:
-                        case PRE_COMMIT_PENDING:
-                            messages.add(new CommitTransaction(cohortEntry.getTransactionId(),
-                                cohortEntry.getClientVersion()));
-                            break;
-                        default:
-                            break;
-                    }
-                }
-            }
-        }
-
-        return messages;
-    }
-
-    @VisibleForTesting
-    void setCohortDecorator(final CohortDecorator cohortDecorator) {
-        this.cohortDecorator = cohortDecorator;
-    }
-}
index 2d2a0727e922597735746612d7bb694d032081bb..996ae38d6d02597003a486a5b7969f7ae95c59dd 100644 (file)
@@ -1170,17 +1170,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return cohort;
     }
 
-    // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics
-    // the newReadWriteTransaction()
-    final ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
-            final Optional<SortedSet<String>> participatingShardNames) {
-        final var historyId = txId.getHistoryId();
-        if (historyId.getHistoryId() == 0) {
-            return createReadyCohort(txId, mod, participatingShardNames);
-        }
-        return ensureTransactionChain(historyId, null).createReadyCohort(txId, mod, participatingShardNames);
-    }
-
     @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
     final void checkForExpiredTransactions(final long transactionCommitTimeoutMillis,
             final Function<SimpleShardDataTreeCohort, OptionalLong> accessTimeUpdater) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
deleted file mode 100644 (file)
index e9a3356..0000000
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import org.apache.pekko.actor.ActorRef;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-
-/**
- * Actor for a shard read transaction.
- *
- * @author syedbahm
- */
-@Deprecated(since = "9.0.0", forRemoval = true)
-public final class ShardReadTransaction extends ShardTransaction {
-    private final AbstractShardDataTreeTransaction<?> transaction;
-
-    public ShardReadTransaction(final AbstractShardDataTreeTransaction<?> transaction, final ActorRef shardActor,
-            final ShardStats shardStats) {
-        super(shardActor, shardStats, transaction.getIdentifier());
-        this.transaction = requireNonNull(transaction);
-    }
-
-    @Override
-    public void handleReceive(final Object message) {
-        if (ReadData.isSerializedType(message)) {
-            readData(transaction, ReadData.fromSerializable(message));
-        } else if (DataExists.isSerializedType(message)) {
-            dataExists(transaction, DataExists.fromSerializable(message));
-        } else {
-            super.handleReceive(message);
-        }
-    }
-
-    @Override
-    protected AbstractShardDataTreeTransaction<?> getDOMStoreTransaction() {
-        return transaction;
-    }
-
-    @Override
-    protected boolean returnCloseTransactionReply() {
-        return false;
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
deleted file mode 100644 (file)
index 2cca705..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import org.apache.pekko.actor.ActorRef;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-
-/**
- * Actor for a shard read/write transaction.
- *
- * @author syedbahm
- */
-@Deprecated(since = "9.0.0", forRemoval = true)
-public class ShardReadWriteTransaction extends ShardWriteTransaction {
-    public ShardReadWriteTransaction(final ReadWriteShardDataTreeTransaction transaction, final ActorRef shardActor,
-            final ShardStats shardStats) {
-        super(transaction, shardActor, shardStats);
-    }
-
-    @Override
-    public void handleReceive(final Object message) {
-        if (ReadData.isSerializedType(message)) {
-            readData(ReadData.fromSerializable(message));
-        } else if (DataExists.isSerializedType(message)) {
-            dataExists((DataExists) message);
-        } else {
-            super.handleReceive(message);
-        }
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
deleted file mode 100644 (file)
index 33c1dfa..0000000
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import org.apache.pekko.actor.ActorRef;
-import org.apache.pekko.actor.PoisonPill;
-import org.apache.pekko.actor.Props;
-import org.apache.pekko.actor.ReceiveTimeout;
-import org.apache.pekko.actor.Status.Failure;
-import org.apache.pekko.japi.Creator;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActorWithMetering;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.mdsal.common.api.ReadFailedException;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-
-/**
- * The ShardTransaction Actor represents a remote transaction that delegates all actions to DOMDataReadWriteTransaction.
- */
-@Deprecated(since = "9.0.0", forRemoval = true)
-public abstract class ShardTransaction extends AbstractUntypedActorWithMetering {
-    private final ActorRef shardActor;
-    private final ShardStats shardStats;
-    private final TransactionIdentifier transactionId;
-
-    protected ShardTransaction(final ActorRef shardActor, final ShardStats shardStats,
-            final TransactionIdentifier transactionId) {
-        // actor name override used for metering. This does not change the "real" actor name
-        super("shard-tx");
-        this.shardActor = shardActor;
-        this.shardStats = shardStats;
-        this.transactionId = requireNonNull(transactionId);
-    }
-
-    public static Props props(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction,
-            final ActorRef shardActor, final DatastoreContext datastoreContext, final ShardStats shardStats) {
-        return Props.create(ShardTransaction.class,
-            new ShardTransactionCreator(type, transaction, shardActor, datastoreContext, shardStats));
-    }
-
-    protected abstract AbstractShardDataTreeTransaction<?> getDOMStoreTransaction();
-
-    protected ActorRef getShardActor() {
-        return shardActor;
-    }
-
-    protected final TransactionIdentifier getTransactionId() {
-        return transactionId;
-    }
-
-    @Override
-    @Deprecated(since = "11.0.0", forRemoval = true)
-    public final ActorRef getSender() {
-        return super.getSender();
-    }
-
-    @Override
-    public void handleReceive(final Object message) {
-        if (CloseTransaction.isSerializedType(message)) {
-            closeTransaction(true);
-        } else if (message instanceof ReceiveTimeout) {
-            LOG.debug("Got ReceiveTimeout for inactivity - closing transaction {}", transactionId);
-            closeTransaction(false);
-        } else {
-            unknownMessage(message);
-        }
-    }
-
-    protected boolean returnCloseTransactionReply() {
-        return true;
-    }
-
-    private void closeTransaction(final boolean sendReply) {
-        getDOMStoreTransaction().abortFromTransactionActor();
-
-        if (sendReply && returnCloseTransactionReply()) {
-            getSender().tell(new CloseTransactionReply(), getSelf());
-        }
-
-        getSelf().tell(PoisonPill.getInstance(), getSelf());
-    }
-
-    private boolean checkClosed(final AbstractShardDataTreeTransaction<?> transaction) {
-        final boolean ret = transaction.isClosed();
-        if (ret) {
-            shardStats.incrementFailedReadTransactionsCount();
-            getSender().tell(new Failure(new ReadFailedException("Transaction is closed")), getSelf());
-        }
-        return ret;
-    }
-
-    protected void readData(final AbstractShardDataTreeTransaction<?> transaction, final ReadData message) {
-        if (checkClosed(transaction)) {
-            return;
-        }
-
-        final YangInstanceIdentifier path = message.getPath();
-        ReadDataReply readDataReply = new ReadDataReply(transaction.getSnapshot().readNode(path).orElse(null),
-            message.getVersion());
-        getSender().tell(readDataReply.toSerializable(), self());
-    }
-
-    protected void dataExists(final AbstractShardDataTreeTransaction<?> transaction, final DataExists message) {
-        if (checkClosed(transaction)) {
-            return;
-        }
-
-        final YangInstanceIdentifier path = message.getPath();
-        boolean exists = transaction.getSnapshot().readNode(path).isPresent();
-        getSender().tell(new DataExistsReply(exists, message.getVersion()).toSerializable(), getSelf());
-    }
-
-    @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "Some fields are not Serializable but we don't "
-            + "create remote instances of this actor and thus don't need it to be Serializable.")
-    private static class ShardTransactionCreator implements Creator<ShardTransaction> {
-        @java.io.Serial
-        private static final long serialVersionUID = 1L;
-
-        final AbstractShardDataTreeTransaction<?> transaction;
-        final ActorRef shardActor;
-        final DatastoreContext datastoreContext;
-        final ShardStats shardStats;
-        final TransactionType type;
-
-        ShardTransactionCreator(final TransactionType type, final AbstractShardDataTreeTransaction<?> transaction,
-                final ActorRef shardActor, final DatastoreContext datastoreContext, final ShardStats shardStats) {
-            this.transaction = requireNonNull(transaction);
-            this.shardActor = shardActor;
-            this.shardStats = shardStats;
-            this.datastoreContext = datastoreContext;
-            this.type = type;
-        }
-
-        @Override
-        public ShardTransaction create() {
-            final var tx = switch (type) {
-                case READ_ONLY -> new ShardReadTransaction(transaction, shardActor, shardStats);
-                case READ_WRITE -> new ShardReadWriteTransaction((ReadWriteShardDataTreeTransaction) transaction,
-                    shardActor, shardStats);
-                case WRITE_ONLY -> new ShardWriteTransaction((ReadWriteShardDataTreeTransaction) transaction,
-                    shardActor, shardStats);
-                default -> throw new IllegalArgumentException("Unhandled transaction type " + type);
-            };
-            tx.getContext().setReceiveTimeout(datastoreContext.getShardTransactionIdleTimeout());
-            return tx;
-        }
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionActorFactory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionActorFactory.java
deleted file mode 100644 (file)
index 77b4654..0000000
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.pekko.actor.AbstractActor.ActorContext;
-import org.apache.pekko.actor.ActorRef;
-import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-
-/**
- * A factory for creating ShardTransaction actors.
- *
- * @author Thomas Pantelis
- */
-@Deprecated(since = "9.0.0", forRemoval = true)
-final class ShardTransactionActorFactory {
-    private static final AtomicLong ACTOR_NAME_COUNTER = new AtomicLong();
-
-    private final ShardDataTree dataTree;
-    private final DatastoreContext datastoreContext;
-    private final String txnDispatcherPath;
-    private final ShardStats shardMBean;
-    private final ActorContext actorContext;
-    private final ActorRef shardActor;
-    private final String shardName;
-
-    ShardTransactionActorFactory(final ShardDataTree dataTree, final DatastoreContext datastoreContext,
-            final String txnDispatcherPath, final ActorRef shardActor, final ActorContext actorContext,
-            final ShardStats shardMBean, final String shardName) {
-        this.dataTree = requireNonNull(dataTree);
-        this.datastoreContext = requireNonNull(datastoreContext);
-        this.txnDispatcherPath = requireNonNull(txnDispatcherPath);
-        this.shardMBean = requireNonNull(shardMBean);
-        this.actorContext = requireNonNull(actorContext);
-        this.shardActor = requireNonNull(shardActor);
-        this.shardName = requireNonNull(shardName);
-    }
-
-    private String actorNameFor(final TransactionIdentifier txId) {
-        final LocalHistoryIdentifier historyId = txId.getHistoryId();
-        final ClientIdentifier clientId = historyId.getClientId();
-        final FrontendIdentifier frontendId = clientId.getFrontendId();
-
-        final StringBuilder sb = new StringBuilder("shard-");
-        sb.append(shardName).append('-')
-            .append(frontendId.getMemberName().getName()).append(':')
-            .append(frontendId.getClientType().getName()).append('@')
-            .append(clientId.getGeneration()).append(':');
-        if (historyId.getHistoryId() != 0) {
-            sb.append(historyId.getHistoryId()).append('-');
-        }
-
-        return sb.append(txId.getTransactionId()).append('_').append(ACTOR_NAME_COUNTER.incrementAndGet()).toString();
-    }
-
-    ActorRef newShardTransaction(final TransactionType type, final TransactionIdentifier transactionID) {
-        final AbstractShardDataTreeTransaction<?> transaction = switch (type) {
-            case READ_ONLY -> dataTree.newReadOnlyTransaction(transactionID);
-            case READ_WRITE, WRITE_ONLY -> dataTree.newReadWriteTransaction(transactionID);
-            default -> throw new IllegalArgumentException("Unsupported transaction type " + type);
-        };
-        return actorContext.actorOf(ShardTransaction.props(type, transaction, shardActor, datastoreContext, shardMBean)
-            .withDispatcher(txnDispatcherPath), actorNameFor(transactionID));
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionMessageRetrySupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionMessageRetrySupport.java
deleted file mode 100644 (file)
index 9938c94..0000000
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static java.util.Objects.requireNonNull;
-
-import java.io.Closeable;
-import java.util.LinkedHashSet;
-import java.util.Set;
-import org.apache.pekko.actor.ActorRef;
-import org.apache.pekko.actor.Cancellable;
-import org.apache.pekko.actor.Status.Failure;
-import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Supporting class for Shard that maintains state for retrying transaction messages when there is no leader.
- *
- * @author Thomas Pantelis
- */
-@Deprecated(since = "9.0.0", forRemoval = true)
-class ShardTransactionMessageRetrySupport implements Closeable {
-    private static final Logger LOG = LoggerFactory.getLogger(ShardTransactionMessageRetrySupport.class);
-
-    static final Class<?> TIMER_MESSAGE_CLASS = MessageInfo.class;
-
-    private final Set<MessageInfo> messagesToRetry = new LinkedHashSet<>();
-    private final Shard shard;
-
-    ShardTransactionMessageRetrySupport(final Shard shard) {
-        this.shard = shard;
-    }
-
-    void addMessageToRetry(final Object message, final ActorRef replyTo, final String failureMessage) {
-        LOG.debug("{}: Adding message {} to retry", shard.persistenceId(), message);
-
-        MessageInfo messageInfo = new MessageInfo(message, replyTo, failureMessage);
-
-        FiniteDuration period = shard.getDatastoreContext().getShardRaftConfig().getElectionTimeOutInterval().$times(2);
-        messageInfo.timer = shard.getContext().system().scheduler().scheduleOnce(period, shard.getSelf(),
-                messageInfo, shard.getContext().dispatcher(), ActorRef.noSender());
-
-        messagesToRetry.add(messageInfo);
-    }
-
-    void retryMessages() {
-        if (messagesToRetry.isEmpty()) {
-            return;
-        }
-
-        MessageInfo[] copy = messagesToRetry.toArray(new MessageInfo[messagesToRetry.size()]);
-        messagesToRetry.clear();
-
-        for (MessageInfo info: copy) {
-            LOG.debug("{}: Retrying message {}", shard.persistenceId(), info.message);
-            info.retry(shard);
-        }
-    }
-
-    void onTimerMessage(final Object message) {
-        MessageInfo messageInfo = (MessageInfo)message;
-
-        LOG.debug("{}: Timer expired for message {}", shard.persistenceId(), messageInfo.message);
-
-        messagesToRetry.remove(messageInfo);
-        messageInfo.timedOut(shard);
-    }
-
-    @Override
-    public void close() {
-        for (MessageInfo info: messagesToRetry) {
-            info.timedOut(shard);
-        }
-
-        messagesToRetry.clear();
-    }
-
-    private static final class MessageInfo {
-        final Object message;
-        final ActorRef replyTo;
-        final String failureMessage;
-        Cancellable timer;
-
-        MessageInfo(final Object message, final ActorRef replyTo, final String failureMessage) {
-            this.message = message;
-            this.replyTo = replyTo;
-            this.failureMessage = requireNonNull(failureMessage);
-        }
-
-        void retry(final Shard shard) {
-            timer.cancel();
-            shard.getSelf().tell(message, replyTo);
-        }
-
-        void timedOut(final Shard shard) {
-            replyTo.tell(new Failure(new NoShardLeaderException(failureMessage, shard.persistenceId())),
-                    shard.getSelf());
-        }
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
deleted file mode 100644 (file)
index 2f0dc9b..0000000
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
- * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import org.apache.pekko.actor.ActorRef;
-import org.apache.pekko.actor.PoisonPill;
-import org.apache.pekko.actor.Status.Failure;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
-
-/**
- * Actor for a shard write-only transaction.
- *
- * @author syedbahm
- */
-@Deprecated(since = "9.0.0", forRemoval = true)
-public class ShardWriteTransaction extends ShardTransaction {
-    private int totalBatchedModificationsReceived;
-    private Exception lastBatchedModificationsException;
-    private final ReadWriteShardDataTreeTransaction transaction;
-
-    public ShardWriteTransaction(final ReadWriteShardDataTreeTransaction transaction, final ActorRef shardActor,
-            final ShardStats shardStats) {
-        super(shardActor, shardStats, transaction.getIdentifier());
-        this.transaction = transaction;
-    }
-
-    @Override
-    protected ReadWriteShardDataTreeTransaction getDOMStoreTransaction() {
-        return transaction;
-    }
-
-    @Override
-    public void handleReceive(final Object message) {
-        if (message instanceof BatchedModifications) {
-            batchedModifications((BatchedModifications)message);
-        } else {
-            super.handleReceive(message);
-        }
-    }
-
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    private void batchedModifications(final BatchedModifications batched) {
-        if (checkClosed()) {
-            if (batched.isReady()) {
-                getSelf().tell(PoisonPill.getInstance(), getSelf());
-            }
-            return;
-        }
-
-        try {
-            for (Modification modification: batched.getModifications()) {
-                modification.apply(transaction.getSnapshot());
-            }
-
-            totalBatchedModificationsReceived++;
-            if (batched.isReady()) {
-                if (lastBatchedModificationsException != null) {
-                    throw lastBatchedModificationsException;
-                }
-
-                if (totalBatchedModificationsReceived != batched.getTotalMessagesSent()) {
-                    throw new IllegalStateException(String.format(
-                            "The total number of batched messages received %d does not match the number sent %d",
-                            totalBatchedModificationsReceived, batched.getTotalMessagesSent()));
-                }
-
-                readyTransaction(batched);
-            } else {
-                getSender().tell(new BatchedModificationsReply(batched.getModifications().size()), getSelf());
-            }
-        } catch (Exception e) {
-            lastBatchedModificationsException = e;
-            getSender().tell(new Failure(e), getSelf());
-
-            if (batched.isReady()) {
-                getSelf().tell(PoisonPill.getInstance(), getSelf());
-            }
-        }
-    }
-
-    protected final void dataExists(final DataExists message) {
-        super.dataExists(transaction, message);
-    }
-
-    protected final void readData(final ReadData message) {
-        super.readData(transaction, message);
-    }
-
-    private boolean checkClosed() {
-        final boolean ret = transaction.isClosed();
-        if (ret) {
-            getSender().tell(new Failure(new IllegalStateException("Transaction is closed, no modifications allowed")),
-                getSelf());
-        }
-        return ret;
-    }
-
-    private void readyTransaction(final BatchedModifications batched) {
-        TransactionIdentifier transactionID = getTransactionId();
-
-        LOG.debug("readyTransaction : {}", transactionID);
-
-        getShardActor().forward(new ForwardedReadyTransaction(transactionID, batched.getVersion(),
-                transaction, batched.isDoCommitOnReady(), batched.getParticipatingShardNames()), getContext());
-
-        // The shard will handle the commit from here so we're no longer needed - self-destruct.
-        getSelf().tell(PoisonPill.getInstance(), getSelf());
-    }
-}
index d6bbc727bb9a5a675a599a17a776541469ebb675..8e10c9b48a75f632294ac54db815b4c45cd4606e 100644 (file)
@@ -14,7 +14,6 @@ import static org.junit.Assert.fail;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -22,16 +21,11 @@ import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulCommit;
 import static org.opendaylight.controller.cluster.datastore.ShardDataTreeMocking.successfulPreCommit;
 
-import com.google.common.primitives.UnsignedLong;
-import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.SortedSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -58,7 +52,6 @@ import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
 import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
 import org.opendaylight.controller.md.cluster.datastore.model.CarsModel;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.yangtools.yang.common.Empty;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.DataContainerChild;
@@ -232,23 +225,6 @@ public abstract class AbstractShardTest extends AbstractActorTest {
         return cohort;
     }
 
-    @Deprecated(since = "11.0.0", forRemoval = true)
-    protected static Map<TransactionIdentifier, CapturingShardDataTreeCohort> setupCohortDecorator(final Shard shard,
-            final TransactionIdentifier... transactionIDs) {
-        final Map<TransactionIdentifier, CapturingShardDataTreeCohort> cohortMap = new HashMap<>();
-        for (TransactionIdentifier id: transactionIDs) {
-            cohortMap.put(id, new CapturingShardDataTreeCohort());
-        }
-
-        shard.getCommitCoordinator().setCohortDecorator((transactionID, actual) -> {
-            CapturingShardDataTreeCohort cohort = cohortMap.get(transactionID);
-            cohort.setDelegate(actual);
-            return cohort;
-        });
-
-        return cohortMap;
-    }
-
     public static NormalizedNode readStore(final TestActorRef<? extends Shard> shard,
             final YangInstanceIdentifier id) {
         return shard.underlyingActor().getDataStore().readNode(id).orElse(null);
@@ -345,100 +321,4 @@ public abstract class AbstractShardTest extends AbstractActorTest {
             return delegate.create();
         }
     }
-
-    @Deprecated(since = "11.0.0", forRemoval = true)
-    public static class CapturingShardDataTreeCohort extends ShardDataTreeCohort {
-        private volatile ShardDataTreeCohort delegate;
-        private FutureCallback<Empty> canCommit;
-        private FutureCallback<DataTreeCandidate> preCommit;
-        private FutureCallback<UnsignedLong> commit;
-
-        public void setDelegate(final ShardDataTreeCohort delegate) {
-            this.delegate = delegate;
-        }
-
-        public FutureCallback<Empty> getCanCommit() {
-            assertNotNull("canCommit was not invoked", canCommit);
-            return canCommit;
-        }
-
-        public FutureCallback<DataTreeCandidate> getPreCommit() {
-            assertNotNull("preCommit was not invoked", preCommit);
-            return preCommit;
-        }
-
-        public FutureCallback<UnsignedLong> getCommit() {
-            assertNotNull("commit was not invoked", commit);
-            return commit;
-        }
-
-        @Override
-        TransactionIdentifier transactionId() {
-            return delegate.transactionId();
-        }
-
-        @Override
-        DataTreeCandidateTip getCandidate() {
-            return delegate.getCandidate();
-        }
-
-        @Override
-        DataTreeModification getDataTreeModification() {
-            return delegate.getDataTreeModification();
-        }
-
-        @Override
-        public void canCommit(final FutureCallback<Empty> callback) {
-            canCommit = mockFutureCallback(callback);
-            delegate.canCommit(canCommit);
-        }
-
-        @Override
-        public void preCommit(final FutureCallback<DataTreeCandidate> callback) {
-            preCommit = mockFutureCallback(callback);
-            delegate.preCommit(preCommit);
-        }
-
-        @Override
-        public void commit(final FutureCallback<UnsignedLong> callback) {
-            commit = mockFutureCallback(callback);
-            delegate.commit(commit);
-        }
-
-        @SuppressWarnings("unchecked")
-        private static <T> FutureCallback<T> mockFutureCallback(final FutureCallback<T> actual) {
-            FutureCallback<T> mock = mock(FutureCallback.class);
-            doAnswer(invocation -> {
-                actual.onFailure(invocation.getArgument(0));
-                return null;
-            }).when(mock).onFailure(any(Throwable.class));
-
-            doAnswer(invocation -> {
-                actual.onSuccess(invocation.getArgument(0));
-                return null;
-            }).when(mock).onSuccess((T) nullable(Object.class));
-
-            return mock;
-        }
-
-        @Override
-        public void abort(final FutureCallback<Empty> callback) {
-            delegate.abort(callback);
-        }
-
-        @Override
-        public boolean isFailed() {
-            return delegate.isFailed();
-        }
-
-        @Override
-        public State getState() {
-            return delegate.getState();
-        }
-
-        @Override
-        Optional<SortedSet<String>> getParticipatingShardNames() {
-            return delegate.getParticipatingShardNames();
-        }
-    }
 }
index fca1f767b305ba3c8bbd39f9fc15de9a092023df..93986a82a252a768d7775f72b2c6336159a455e9 100644 (file)
@@ -26,7 +26,8 @@ public abstract class AbstractTest {
     protected static final MemberName MEMBER_NAME = MemberName.forName("member-1");
     protected static final MemberName MEMBER_2_NAME = MemberName.forName("member-2");
 
-    private static final FrontendType FRONTEND_TYPE = FrontendType.forName(ShardTransactionTest.class.getSimpleName());
+    // FIXME: use a different name
+    private static final FrontendType FRONTEND_TYPE = FrontendType.forName("ShardTransactionTest");
 
     protected static final FrontendIdentifier FRONTEND_ID = FrontendIdentifier.create(MEMBER_NAME, FRONTEND_TYPE);
 
index 68deb562d53eb9bf6233d120a7bbe2854ab7f40b..2aa3e22a66304be176074ff56561d5105f1420d1 100644 (file)
@@ -48,14 +48,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.pekko.actor.ActorRef;
-import org.apache.pekko.actor.ActorSelection;
 import org.apache.pekko.actor.ActorSystem;
 import org.apache.pekko.actor.Address;
 import org.apache.pekko.actor.AddressFromURIString;
-import org.apache.pekko.actor.Status.Failure;
 import org.apache.pekko.cluster.Cluster;
 import org.apache.pekko.cluster.Member;
-import org.apache.pekko.dispatch.Futures;
 import org.apache.pekko.pattern.Patterns;
 import org.apache.pekko.testkit.javadsl.TestKit;
 import org.junit.After;
@@ -75,11 +72,6 @@ import org.opendaylight.controller.cluster.datastore.DatastoreContext.Builder;
 import org.opendaylight.controller.cluster.datastore.TestShard.RequestFrontendMetadata;
 import org.opendaylight.controller.cluster.datastore.TestShard.StartDropMessages;
 import org.opendaylight.controller.cluster.datastore.TestShard.StopDropMessages;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
 import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.FrontendShardDataTreeSnapshotMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
@@ -121,7 +113,6 @@ import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes;
 import org.opendaylight.yangtools.yang.data.tree.api.ConflictingModificationAppliedException;
 import org.opendaylight.yangtools.yang.data.tree.api.DataTree;
 import org.opendaylight.yangtools.yang.data.tree.api.DataTreeConfiguration;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
 import org.opendaylight.yangtools.yang.data.tree.impl.di.InMemoryDataTreeFactory;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import scala.collection.Set;
@@ -720,137 +711,6 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         }
     }
 
-    @Test
-    public void testReadyLocalTransactionForwardedToLeader() throws Exception {
-        initDatastoresWithCars("testReadyLocalTransactionForwardedToLeader");
-        followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
-
-        final Optional<ActorRef> carsFollowerShard =
-                followerDistributedDataStore.getActorUtils().findLocalShard("cars");
-        assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
-
-        final DataTree dataTree = new InMemoryDataTreeFactory().create(
-            DataTreeConfiguration.DEFAULT_OPERATIONAL, SchemaContextHelper.full());
-
-        // Send a tx with immediate commit.
-
-        DataTreeModification modification = dataTree.takeSnapshot().newModification();
-        modification.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
-        modification.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
-
-        final var car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
-        modification.write(CarsModel.newCarPath("optima"), car1);
-        modification.ready();
-
-        ReadyLocalTransaction readyLocal = new ReadyLocalTransaction(tx1 , modification, true, Optional.empty());
-
-        carsFollowerShard.orElseThrow().tell(readyLocal, followerTestKit.getRef());
-        Object resp = followerTestKit.expectMsgClass(Object.class);
-        if (resp instanceof Failure failure) {
-            throw new AssertionError("Unexpected failure response", failure.cause());
-        }
-
-        assertEquals("Response type", CommitTransactionReply.class, resp.getClass());
-
-        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
-
-        // Send another tx without immediate commit.
-
-        final var car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000));
-        modification = dataTree.takeSnapshot().newModification();
-        modification.write(CarsModel.newCarPath("sportage"), car2);
-        modification.ready();
-
-        readyLocal = new ReadyLocalTransaction(tx2 , modification, false, Optional.empty());
-
-        carsFollowerShard.orElseThrow().tell(readyLocal, followerTestKit.getRef());
-        resp = followerTestKit.expectMsgClass(Object.class);
-        if (resp instanceof Failure failure) {
-            throw new AssertionError("Unexpected failure response", failure.cause());
-        }
-
-        assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
-
-        final ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection(
-                ((ReadyTransactionReply)resp).getCohortPath());
-
-        ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(leaderDistributedDataStore.getActorUtils(),
-            List.of(new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor),
-                () -> DataStoreVersions.CURRENT_VERSION)), tx2);
-        cohort.canCommit().get(5, TimeUnit.SECONDS);
-        cohort.preCommit().get(5, TimeUnit.SECONDS);
-        cohort.commit().get(5, TimeUnit.SECONDS);
-
-        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
-    }
-
-    @Test
-    public void testForwardedReadyTransactionForwardedToLeader() throws Exception {
-        initDatastoresWithCars("testForwardedReadyTransactionForwardedToLeader");
-        followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorUtils(), "cars");
-
-        final Optional<ActorRef> carsFollowerShard =
-                followerDistributedDataStore.getActorUtils().findLocalShard("cars");
-        assertTrue("Cars follower shard found", carsFollowerShard.isPresent());
-
-        carsFollowerShard.orElseThrow().tell(GetShardDataTree.INSTANCE, followerTestKit.getRef());
-        final DataTree dataTree = followerTestKit.expectMsgClass(DataTree.class);
-
-        // Send a tx with immediate commit.
-
-        DataTreeModification modification = dataTree.takeSnapshot().newModification();
-        modification.write(CarsModel.BASE_PATH, CarsModel.emptyContainer());
-        modification.merge(CarsModel.CAR_LIST_PATH, CarsModel.newCarMapNode());
-
-        final var car1 = CarsModel.newCarEntry("optima", Uint64.valueOf(20000));
-        modification.write(CarsModel.newCarPath("optima"), car1);
-
-        ForwardedReadyTransaction forwardedReady = new ForwardedReadyTransaction(tx1, DataStoreVersions.CURRENT_VERSION,
-            new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx1, modification),
-            true, Optional.empty());
-
-        carsFollowerShard.orElseThrow().tell(forwardedReady, followerTestKit.getRef());
-        Object resp = followerTestKit.expectMsgClass(Object.class);
-        if (resp instanceof Failure failure) {
-            throw new AssertionError("Unexpected failure response", failure.cause());
-        }
-
-        assertEquals("Response type", CommitTransactionReply.class, resp.getClass());
-
-        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1);
-
-        // Send another tx without immediate commit.
-
-        final var car2 = CarsModel.newCarEntry("sportage", Uint64.valueOf(30000));
-        modification = dataTree.takeSnapshot().newModification();
-        modification.write(CarsModel.newCarPath("sportage"), car2);
-
-        forwardedReady = new ForwardedReadyTransaction(tx2, DataStoreVersions.CURRENT_VERSION,
-            new ReadWriteShardDataTreeTransaction(mock(ShardDataTreeTransactionParent.class), tx2, modification),
-            false, Optional.empty());
-
-        carsFollowerShard.orElseThrow().tell(forwardedReady, followerTestKit.getRef());
-        resp = followerTestKit.expectMsgClass(Object.class);
-        if (resp instanceof Failure failure) {
-            throw new AssertionError("Unexpected failure response", failure.cause());
-        }
-
-        assertEquals("Response type", ReadyTransactionReply.class, resp.getClass());
-
-        ActorSelection txActor = leaderDistributedDataStore.getActorUtils().actorSelection(
-                ((ReadyTransactionReply)resp).getCohortPath());
-
-        final ThreePhaseCommitCohortProxy cohort = new ThreePhaseCommitCohortProxy(
-            leaderDistributedDataStore.getActorUtils(), List.of(
-                new ThreePhaseCommitCohortProxy.CohortInfo(Futures.successful(txActor),
-                    () -> DataStoreVersions.CURRENT_VERSION)), tx2);
-        cohort.canCommit().get(5, TimeUnit.SECONDS);
-        cohort.preCommit().get(5, TimeUnit.SECONDS);
-        cohort.commit().get(5, TimeUnit.SECONDS);
-
-        verifyCars(leaderDistributedDataStore.newReadOnlyTransaction(), car1, car2);
-    }
-
     @Test
     public void testTransactionForwardedToLeaderAfterRetry() throws Exception {
         followerDatastoreContextBuilder.shardBatchedModificationCount(2);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinationTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinationTest.java
deleted file mode 100644 (file)
index 11cd2b7..0000000
+++ /dev/null
@@ -1,572 +0,0 @@
-/*
- * Copyright (c) 2018 Inocybe Technologies and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertNotNull;
-import static org.opendaylight.controller.cluster.datastore.DataStoreVersions.CURRENT_VERSION;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.EMPTY_OUTER_LIST;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.EMPTY_TEST;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.INNER_LIST_QNAME;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.NAME_QNAME;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.OUTER_LIST_PATH;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.TEST_PATH;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerEntryPath;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerMapPath;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.innerNode;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerEntry;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerEntryPath;
-import static org.opendaylight.controller.md.cluster.datastore.model.TestModel.outerNode;
-
-import com.google.common.collect.ImmutableSortedSet;
-import java.time.Duration;
-import java.util.Optional;
-import java.util.SortedSet;
-import org.apache.pekko.dispatch.Dispatchers;
-import org.apache.pekko.testkit.TestActorRef;
-import org.apache.pekko.testkit.javadsl.TestKit;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Unit tests for various 3PC coordination scenarios.
- *
- * @author Thomas Pantelis
- */
-@Deprecated(since = "9.0.0", forRemoval = true)
-public class ShardCommitCoordinationTest extends AbstractShardTest {
-    private static final Logger LOG = LoggerFactory.getLogger(ShardCommitCoordinationTest.class);
-
-    /**
-     * Test 2 tx's accessing the same shards.
-     * <pre>
-     *   tx1 -> shard A, shard B
-     *   tx2 -> shard A, shard B
-     * </pre>
-     * The tx's are readied such the pendingTransactions queue are as follows:
-     * <pre>
-     *   Queue for shard A -> tx1, tx2
-     *   Queue for shard B -> tx2, tx1
-     * </pre>
-     * This is a potential deadlock scenario (ABBA) which should be avoided by allowing tx1 to proceed on shard B
-     * even though it isn't at the head of the queues.
-     */
-    @Test
-    public void testTwoTransactionsWithSameTwoParticipatingShards() {
-        final String testName = "testTwoTransactionsWithSameTwoParticipatingShards";
-        LOG.info("{} starting", testName);
-
-        final TestKit kit1 = new TestKit(getSystem());
-        final TestKit kit2 = new TestKit(getSystem());
-
-        final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
-        final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
-
-        final TestActorRef<Shard> shardA = actorFactory.createTestActor(
-                newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
-        ShardTestKit.waitUntilLeader(shardA);
-
-        final TestActorRef<Shard> shardB = actorFactory.createTestActor(
-                newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
-        ShardTestKit.waitUntilLeader(shardB);
-
-        final TransactionIdentifier txId1 = nextTransactionId();
-        final TransactionIdentifier txId2 = nextTransactionId();
-
-        SortedSet<String> participatingShardNames = ImmutableSortedSet.of(shardAId.getShardName(),
-                shardBId.getShardName());
-
-        // Ready [tx1, tx2] on shard A.
-
-        shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH, EMPTY_TEST, participatingShardNames), kit1.getRef());
-        kit1.expectMsgClass(ReadyTransactionReply.class);
-
-        shardA.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerNode(1),
-                participatingShardNames), kit2.getRef());
-        kit2.expectMsgClass(ReadyTransactionReply.class);
-
-        // Ready [tx2, tx1] on shard B.
-
-        shardB.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, outerNode(1),
-                participatingShardNames), kit2.getRef());
-        kit2.expectMsgClass(ReadyTransactionReply.class);
-
-        shardB.tell(newReadyBatchedModifications(txId1, TEST_PATH, EMPTY_TEST, participatingShardNames), kit1.getRef());
-        kit1.expectMsgClass(ReadyTransactionReply.class);
-
-        // Send tx2 CanCommit to A - tx1 is at the head of the queue so tx2 should not proceed as A is the first shard
-        // in the participating shard list.
-
-        shardA.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
-        kit2.expectNoMessage(Duration.ofMillis(100));
-
-        // Send tx1 CanCommit to A - it's at the head of the queue so should proceed.
-
-        shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
-        kit1.expectMsgClass(CanCommitTransactionReply.class);
-
-        // Send tx1 CanCommit to B - tx2 is at the head of the queue but the preceding shards in tx1's participating
-        // shard list [A] matches that of tx2 [A] so tx1 should be de-queued and allowed to proceed.
-
-        shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
-        kit1.expectMsgClass(CanCommitTransactionReply.class);
-
-        // Send tx2 CanCommit to B - tx1 should now be at the head of he queue.
-
-        shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
-        kit2.expectNoMessage(Duration.ofMillis(100));
-
-        // Finish commit of tx1.
-
-        shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
-        kit1.expectMsgClass(CommitTransactionReply.class);
-
-        shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
-        kit1.expectMsgClass(CommitTransactionReply.class);
-
-        // Finish commit of tx2.
-
-        kit2.expectMsgClass(CanCommitTransactionReply.class);
-        kit2.expectMsgClass(CanCommitTransactionReply.class);
-
-        shardA.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
-        kit2.expectMsgClass(CommitTransactionReply.class);
-
-        shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
-        kit2.expectMsgClass(CommitTransactionReply.class);
-
-        // Verify data in the data store.
-
-        verifyOuterListEntry(shardA, 1);
-        verifyOuterListEntry(shardB, 1);
-
-        LOG.info("{} ending", testName);
-    }
-
-    /**
-     * Test multiple tx's accessing a mix of same and differing shards.
-     * <pre>
-     *   tx1 -> shard X, shard B
-     *   tx2 -> shard X, shard B
-     *   tx3 -> shard A, shard B
-     *   tx4 -> shard A, shard B
-     *   tx5 -> shard A, shard B
-     * </pre>
-     * The tx's are readied such the pendingTransactions queue are as follows:
-     * <pre>
-     *   Queue for shard A -> tx3, tx4, tx5
-     *   Queue for shard B -> tx1, tx2, tx5, tx4, tx3
-     * </pre>
-     * Note: shard X means any other shard which isn't relevant for the test.
-     * This is a potential deadlock scenario (ABBA) which should be avoided by moving tx3 ahead of tx5 on shard B when
-     * CanCommit is requested.
-     */
-    @Test
-    public void testMultipleTransactionsWithMixedParticipatingShards() {
-        final String testName = "testMultipleTransactionsWithMixedParticipatingShards";
-        LOG.info("{} starting", testName);
-
-        final TestKit kit1 = new TestKit(getSystem());
-        final TestKit kit2 = new TestKit(getSystem());
-        final TestKit kit3 = new TestKit(getSystem());
-        final TestKit kit4 = new TestKit(getSystem());
-        final TestKit kit5 = new TestKit(getSystem());
-
-        final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
-        final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
-
-        final TestActorRef<Shard> shardA = actorFactory.createTestActor(
-                newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
-        ShardTestKit.waitUntilLeader(shardA);
-
-        final TestActorRef<Shard> shardB = actorFactory.createTestActor(
-                newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
-        ShardTestKit.waitUntilLeader(shardB);
-
-        final TransactionIdentifier txId1 = nextTransactionId();
-        final TransactionIdentifier txId2 = nextTransactionId();
-        final TransactionIdentifier txId3 = nextTransactionId();
-        final TransactionIdentifier txId4 = nextTransactionId();
-        final TransactionIdentifier txId5 = nextTransactionId();
-
-        final SortedSet<String> participatingShardNames1 = ImmutableSortedSet.of(shardAId.getShardName(),
-                shardBId.getShardName());
-        final SortedSet<String> participatingShardNames2 = ImmutableSortedSet.of("shardX", shardBId.getShardName());
-
-        // Ready [tx3, tx4, tx5] on shard A.
-
-        shardA.tell(newReadyBatchedModifications(txId3, TEST_PATH, EMPTY_TEST, participatingShardNames1),
-            kit3.getRef());
-        kit3.expectMsgClass(ReadyTransactionReply.class);
-
-        shardA.tell(newReadyBatchedModifications(txId4, OUTER_LIST_PATH, EMPTY_OUTER_LIST,
-                participatingShardNames1), kit4.getRef());
-        kit4.expectMsgClass(ReadyTransactionReply.class);
-
-        shardA.tell(newReadyBatchedModifications(txId5, outerEntryPath(1), outerEntry(1), participatingShardNames1),
-            kit5.getRef());
-        kit5.expectMsgClass(ReadyTransactionReply.class);
-
-        // Ready [tx1, tx2, tx5, tx4, tx3] on shard B.
-
-        shardB.tell(newReadyBatchedModifications(txId1, TEST_PATH, EMPTY_TEST, participatingShardNames2),
-            kit1.getRef());
-        kit1.expectMsgClass(ReadyTransactionReply.class);
-
-        shardB.tell(newReadyBatchedModifications(txId2, OUTER_LIST_PATH, EMPTY_OUTER_LIST,
-                participatingShardNames2), kit2.getRef());
-        kit2.expectMsgClass(ReadyTransactionReply.class);
-
-        shardB.tell(newReadyBatchedModifications(txId5, innerEntryPath(1, "one"),
-                ImmutableNodes.mapEntry(INNER_LIST_QNAME, NAME_QNAME, "one"), participatingShardNames1), kit5.getRef());
-        kit5.expectMsgClass(ReadyTransactionReply.class);
-
-        shardB.tell(newReadyBatchedModifications(txId4, innerMapPath(1), innerNode(),
-                participatingShardNames1), kit4.getRef());
-        kit4.expectMsgClass(ReadyTransactionReply.class);
-
-        shardB.tell(newReadyBatchedModifications(txId3, outerEntryPath(1), outerEntry(1), participatingShardNames1),
-            kit3.getRef());
-        kit3.expectMsgClass(ReadyTransactionReply.class);
-
-        // Send tx3 CanCommit to A - it's at the head of the queue so should proceed.
-
-        shardA.tell(new CanCommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
-        kit3.expectMsgClass(CanCommitTransactionReply.class);
-
-        // Send tx1 CanCommit to B - it's at the head of the queue so should proceed.
-
-        shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
-        kit1.expectMsgClass(CanCommitTransactionReply.class);
-
-        // Send tx3 CanCommit to B - tx1 is at the head of the queue but the preceding shards in tx3's participating
-        // shard list [A] matches that of tx5 so tx3 should be moved ahead of tx5 in the queue.
-
-        shardB.tell(new CanCommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
-        kit3.expectNoMessage(Duration.ofMillis(100));
-
-        // Send tx4 CanCommit to B - tx4's participating shard list [A] matches that of tx3 and tx5 - so tx4 should
-        // be moved ahead of tx5 in the queue but not tx3 since should be in the CAN_COMMIT_PENDING state.
-
-        shardB.tell(new CanCommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
-        kit4.expectNoMessage(Duration.ofMillis(100));
-
-        // Send tx5 CanCommit to B - it's position in the queue should remain the same.
-
-        shardB.tell(new CanCommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
-        kit5.expectNoMessage(Duration.ofMillis(100));
-
-        // Finish commit of tx1.
-
-        shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
-        kit1.expectMsgClass(CommitTransactionReply.class);
-
-        // Finish commit of tx2.
-
-        shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
-        kit2.expectMsgClass(CanCommitTransactionReply.class);
-
-        shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
-        kit2.expectMsgClass(CommitTransactionReply.class);
-
-        // Finish commit of tx3.
-
-        // From shard B
-        kit3.expectMsgClass(CanCommitTransactionReply.class);
-
-        shardA.tell(new CommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
-        kit3.expectMsgClass(CommitTransactionReply.class);
-
-        shardB.tell(new CommitTransaction(txId3, CURRENT_VERSION).toSerializable(), kit3.getRef());
-        kit3.expectMsgClass(CommitTransactionReply.class);
-
-        // Finish commit of tx4.
-
-        // From shard B
-        kit4.expectMsgClass(CanCommitTransactionReply.class);
-
-        shardA.tell(new CanCommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
-        kit4.expectMsgClass(CanCommitTransactionReply.class);
-        shardA.tell(new CommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
-        kit4.expectMsgClass(CommitTransactionReply.class);
-
-        shardB.tell(new CommitTransaction(txId4, CURRENT_VERSION).toSerializable(), kit4.getRef());
-        kit4.expectMsgClass(CommitTransactionReply.class);
-
-        // Finish commit of tx5.
-
-        // From shard B
-        kit5.expectMsgClass(CanCommitTransactionReply.class);
-
-        shardA.tell(new CanCommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
-        kit5.expectMsgClass(CanCommitTransactionReply.class);
-        shardA.tell(new CommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
-        kit5.expectMsgClass(CommitTransactionReply.class);
-
-        shardB.tell(new CommitTransaction(txId5, CURRENT_VERSION).toSerializable(), kit5.getRef());
-        kit5.expectMsgClass(CommitTransactionReply.class);
-
-        verifyOuterListEntry(shardA, 1);
-        verifyInnerListEntry(shardB, 1, "one");
-
-        LOG.info("{} ending", testName);
-    }
-
-    /**
-     * Test 2 tx's accessing 2 shards, the second in common.
-     * <pre>
-     *   tx1 -> shard A, shard C
-     *   tx2 -> shard B, shard C
-     * </pre>
-     * The tx's are readied such the pendingTransactions queue are as follows:
-     * <pre>
-     *   Queue for shard A -> tx1
-     *   Queue for shard B -> tx2
-     *   Queue for shard C -> tx2, tx1
-     * </pre>
-     * When the tx's re committed verify the ready order is preserved.
-     */
-    @Test
-    public void testTwoTransactionsWithOneCommonParticipatingShard1() {
-        final String testName = "testTwoTransactionsWithOneCommonParticipatingShard1";
-        LOG.info("{} starting", testName);
-
-        final TestKit kit1 = new TestKit(getSystem());
-        final TestKit kit2 = new TestKit(getSystem());
-
-        final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
-        final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
-        final ShardIdentifier shardCId = ShardIdentifier.create("shardC", MemberName.forName(testName), "config");
-
-        final TestActorRef<Shard> shardA = actorFactory.createTestActor(
-                newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
-        ShardTestKit.waitUntilLeader(shardA);
-
-        final TestActorRef<Shard> shardB = actorFactory.createTestActor(
-                newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
-        ShardTestKit.waitUntilLeader(shardB);
-
-        final TestActorRef<Shard> shardC = actorFactory.createTestActor(
-                newShardBuilder().id(shardCId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
-        ShardTestKit.waitUntilLeader(shardC);
-
-        final TransactionIdentifier txId1 = nextTransactionId();
-        final TransactionIdentifier txId2 = nextTransactionId();
-
-        SortedSet<String> participatingShardNames1 =
-                ImmutableSortedSet.of(shardAId.getShardName(), shardCId.getShardName());
-        SortedSet<String> participatingShardNames2 =
-                ImmutableSortedSet.of(shardBId.getShardName(), shardCId.getShardName());
-
-        // Ready [tx1] on shard A.
-
-        shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH, EMPTY_TEST, participatingShardNames1),
-            kit1.getRef());
-        kit1.expectMsgClass(ReadyTransactionReply.class);
-
-        // Ready [tx2] on shard B.
-
-        shardB.tell(newReadyBatchedModifications(txId2, TEST_PATH, EMPTY_TEST, participatingShardNames2),
-            kit2.getRef());
-        kit2.expectMsgClass(ReadyTransactionReply.class);
-
-        // Ready [tx2, tx1] on shard C.
-
-        shardC.tell(newReadyBatchedModifications(txId2, TEST_PATH, EMPTY_TEST, participatingShardNames2),
-            kit2.getRef());
-        kit2.expectMsgClass(ReadyTransactionReply.class);
-
-        shardC.tell(newReadyBatchedModifications(txId1, OUTER_LIST_PATH, outerNode(1),
-                participatingShardNames1), kit1.getRef());
-        kit1.expectMsgClass(ReadyTransactionReply.class);
-
-        // Send tx1 CanCommit to A - should succeed.
-
-        shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
-        kit1.expectMsgClass(CanCommitTransactionReply.class);
-
-        // Send tx2 CanCommit to B - should succeed.
-
-        shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
-        kit2.expectMsgClass(CanCommitTransactionReply.class);
-
-        // Send tx1 CanCommit to C - tx2 is at the head of the queue but the preceding shards in tx1's participating
-        // shard list [A] do not match that of tx2 [B] so tx1 should not be allowed to proceed.
-
-        shardC.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
-        kit1.expectNoMessage(Duration.ofMillis(100));
-
-        // Send tx2 CanCommit to C - it's at the head of the queue so should proceed.
-
-        shardC.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
-        kit2.expectMsgClass(CanCommitTransactionReply.class);
-
-        // Finish commit of tx2.
-
-        shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
-        kit2.expectMsgClass(CommitTransactionReply.class);
-
-        shardC.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
-        kit2.expectMsgClass(CommitTransactionReply.class);
-
-        // Finish commit of tx1.
-
-        kit1.expectMsgClass(CanCommitTransactionReply.class);
-        shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
-        kit1.expectMsgClass(CommitTransactionReply.class);
-
-        shardC.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
-        kit1.expectMsgClass(CommitTransactionReply.class);
-
-        // Verify data in the data store.
-
-        verifyOuterListEntry(shardC, 1);
-
-        LOG.info("{} ending", testName);
-    }
-
-    /**
-     * Test 2 tx's accessing 2 shards, the first for one and the second for the other in common.
-     * <pre>
-     *   tx1 -> shard A, shard B
-     *   tx2 -> shard B, shard C
-     * </pre>
-     * The tx's are readied such the pendingTransactions queue are as follows:
-     * <pre>
-     *   Queue for shard A -> tx1
-     *   Queue for shard B -> tx2, tx1
-     *   Queue for shard C -> tx2
-     * </pre>
-     * When the tx's re committed verify the ready order is preserved.
-     */
-    @Test
-    public void testTwoTransactionsWithOneCommonParticipatingShard2() {
-        final String testName = "testTwoTransactionsWithOneCommonParticipatingShard2";
-        LOG.info("{} starting", testName);
-
-        final TestKit kit1 = new TestKit(getSystem());
-        final TestKit kit2 = new TestKit(getSystem());
-
-        final ShardIdentifier shardAId = ShardIdentifier.create("shardA", MemberName.forName(testName), "config");
-        final ShardIdentifier shardBId = ShardIdentifier.create("shardB", MemberName.forName(testName), "config");
-        final ShardIdentifier shardCId = ShardIdentifier.create("shardC", MemberName.forName(testName), "config");
-
-        final TestActorRef<Shard> shardA = actorFactory.createTestActor(
-                newShardBuilder().id(shardAId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
-        ShardTestKit.waitUntilLeader(shardA);
-
-        final TestActorRef<Shard> shardB = actorFactory.createTestActor(
-                newShardBuilder().id(shardBId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
-        ShardTestKit.waitUntilLeader(shardB);
-
-        final TestActorRef<Shard> shardC = actorFactory.createTestActor(
-                newShardBuilder().id(shardCId).props().withDispatcher(Dispatchers.DefaultDispatcherId()));
-        ShardTestKit.waitUntilLeader(shardC);
-
-        final TransactionIdentifier txId1 = nextTransactionId();
-        final TransactionIdentifier txId2 = nextTransactionId();
-
-        SortedSet<String> participatingShardNames1 =
-                ImmutableSortedSet.of(shardAId.getShardName(), shardBId.getShardName());
-        SortedSet<String> participatingShardNames2 =
-                ImmutableSortedSet.of(shardBId.getShardName(), shardCId.getShardName());
-
-        // Ready [tx1] on shard A.
-
-        shardA.tell(newReadyBatchedModifications(txId1, TEST_PATH, EMPTY_TEST, participatingShardNames1),
-            kit1.getRef());
-        kit1.expectMsgClass(ReadyTransactionReply.class);
-
-        // Ready [tx2, tx1] on shard B.
-
-        shardB.tell(newReadyBatchedModifications(txId2, TEST_PATH, EMPTY_TEST, participatingShardNames2),
-            kit2.getRef());
-        kit2.expectMsgClass(ReadyTransactionReply.class);
-
-        shardB.tell(newReadyBatchedModifications(txId1, OUTER_LIST_PATH, outerNode(1),
-                participatingShardNames1), kit1.getRef());
-        kit1.expectMsgClass(ReadyTransactionReply.class);
-
-        // Ready [tx2] on shard C.
-
-        shardC.tell(newReadyBatchedModifications(txId2, TEST_PATH, EMPTY_TEST, participatingShardNames2),
-            kit2.getRef());
-        kit2.expectMsgClass(ReadyTransactionReply.class);
-
-        // Send tx1 CanCommit to A - should succeed.
-
-        shardA.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
-        kit1.expectMsgClass(CanCommitTransactionReply.class);
-
-        // Send tx1 CanCommit to B - tx2 is at the head of the queue but the preceding shards in tx1's participating
-        // shard list [A] do not match that of tx2 [] so tx1 should not be allowed to proceed.
-
-        shardB.tell(new CanCommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
-        kit1.expectNoMessage(Duration.ofMillis(100));
-
-        // Send tx2 CanCommit to B - it's at the head of the queue so should proceed.
-
-        shardB.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
-        kit2.expectMsgClass(CanCommitTransactionReply.class);
-
-        // Finish commit of tx2.
-
-        shardC.tell(new CanCommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
-        kit2.expectMsgClass(CanCommitTransactionReply.class);
-
-        shardB.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
-        kit2.expectMsgClass(CommitTransactionReply.class);
-
-        shardC.tell(new CommitTransaction(txId2, CURRENT_VERSION).toSerializable(), kit2.getRef());
-        kit2.expectMsgClass(CommitTransactionReply.class);
-
-        // Finish commit of tx1.
-
-        kit1.expectMsgClass(CanCommitTransactionReply.class);
-        shardA.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
-        kit1.expectMsgClass(CommitTransactionReply.class);
-
-        shardB.tell(new CommitTransaction(txId1, CURRENT_VERSION).toSerializable(), kit1.getRef());
-        kit1.expectMsgClass(CommitTransactionReply.class);
-
-        // Verify data in the data store.
-
-        verifyOuterListEntry(shardB, 1);
-
-        LOG.info("{} ending", testName);
-    }
-
-    static void verifyInnerListEntry(final TestActorRef<Shard> shard, final int outerID, final String innerID) {
-        final YangInstanceIdentifier path = innerEntryPath(outerID, innerID);
-        final NormalizedNode innerListEntry = readStore(shard, path);
-        assertNotNull(path + " not found", innerListEntry);
-    }
-
-    private static BatchedModifications newReadyBatchedModifications(final TransactionIdentifier transactionID,
-            final YangInstanceIdentifier path, final NormalizedNode data,
-            final SortedSet<String> participatingShardNames) {
-        final BatchedModifications batched = new BatchedModifications(transactionID, CURRENT_VERSION);
-        batched.addModification(new WriteModification(path, data));
-        batched.setReady(Optional.of(participatingShardNames));
-        batched.setTotalMessagesSent(1);
-        return batched;
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
deleted file mode 100644 (file)
index ca237e7..0000000
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-
-import java.util.concurrent.TimeUnit;
-import org.apache.pekko.actor.ActorRef;
-import org.apache.pekko.actor.Props;
-import org.apache.pekko.testkit.TestActorRef;
-import org.junit.Before;
-import org.junit.Test;
-import org.opendaylight.controller.cluster.access.concepts.MemberName;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.mdsal.common.api.ReadFailedException;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.tree.api.TreeType;
-import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-/**
- * Covers negative test cases.
- *
- * @author Basheeruddin Ahmed
- */
-@Deprecated(since = "9.0.0", forRemoval = true)
-public class ShardTransactionFailureTest extends AbstractActorTest {
-    private static final EffectiveModelContext TEST_SCHEMA_CONTEXT = TestModel.createTestContext();
-    private static final TransactionType RO = TransactionType.READ_ONLY;
-    private static final TransactionType RW = TransactionType.READ_WRITE;
-
-    private static final Shard MOCK_SHARD = mock(Shard.class);
-
-    private static final ShardDataTree STORE = new ShardDataTree(MOCK_SHARD, TEST_SCHEMA_CONTEXT, TreeType.OPERATIONAL);
-
-    private static final ShardIdentifier SHARD_IDENTIFIER =
-        ShardIdentifier.create("inventory", MemberName.forName("member-1"), "operational");
-
-    private final DatastoreContext datastoreContext = DatastoreContext.newBuilder().build();
-
-    private final ShardStats shardStats = new DefaultShardStatsMXBean(SHARD_IDENTIFIER.toString(), "DataStore", null)
-        .shardStats();
-
-    private ActorRef createShard() {
-        ActorRef shard = getSystem().actorOf(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext)
-                .schemaContextProvider(() -> TEST_SCHEMA_CONTEXT).props());
-        ShardTestKit.waitUntilLeader(shard);
-        return shard;
-    }
-
-    @Before
-    public void setup() {
-        doReturn(new ShardStats()).when(MOCK_SHARD).shardStats();
-    }
-
-    @Test(expected = ReadFailedException.class)
-    public void testNegativeReadWithReadOnlyTransactionClosed() throws Exception {
-
-        final ActorRef shard = createShard();
-        final Props props = ShardTransaction.props(RO, STORE.newReadOnlyTransaction(nextTransactionId()), shard,
-                datastoreContext, shardStats);
-
-        final TestActorRef<ShardTransaction> subject = TestActorRef.create(getSystem(), props,
-                "testNegativeReadWithReadOnlyTransactionClosed");
-
-        Future<Object> future = org.apache.pekko.pattern.Patterns.ask(subject,
-                new ReadData(YangInstanceIdentifier.of(), DataStoreVersions.CURRENT_VERSION), 3000);
-        Await.result(future, FiniteDuration.create(3, TimeUnit.SECONDS));
-
-        subject.underlyingActor().getDOMStoreTransaction().abortFromTransactionActor();
-
-        future = org.apache.pekko.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.of(),
-                DataStoreVersions.CURRENT_VERSION), 3000);
-        Await.result(future, FiniteDuration.create(3, TimeUnit.SECONDS));
-    }
-
-
-    @Test(expected = ReadFailedException.class)
-    public void testNegativeReadWithReadWriteTransactionClosed() throws Exception {
-
-        final ActorRef shard = createShard();
-        final Props props = ShardTransaction.props(RW, STORE.newReadWriteTransaction(nextTransactionId()), shard,
-                datastoreContext, shardStats);
-
-        final TestActorRef<ShardTransaction> subject = TestActorRef.create(getSystem(), props,
-                "testNegativeReadWithReadWriteTransactionClosed");
-
-        Future<Object> future = org.apache.pekko.pattern.Patterns.ask(subject,
-                new ReadData(YangInstanceIdentifier.of(), DataStoreVersions.CURRENT_VERSION), 3000);
-        Await.result(future, FiniteDuration.create(3, TimeUnit.SECONDS));
-
-        subject.underlyingActor().getDOMStoreTransaction().abortFromTransactionActor();
-
-        future = org.apache.pekko.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.of(),
-                DataStoreVersions.CURRENT_VERSION), 3000);
-        Await.result(future, FiniteDuration.create(3, TimeUnit.SECONDS));
-    }
-
-    @Test(expected = ReadFailedException.class)
-    public void testNegativeExistsWithReadWriteTransactionClosed() throws Exception {
-
-        final ActorRef shard = createShard();
-        final Props props = ShardTransaction.props(RW, STORE.newReadWriteTransaction(nextTransactionId()), shard,
-                datastoreContext, shardStats);
-
-        final TestActorRef<ShardTransaction> subject = TestActorRef.create(getSystem(), props,
-                "testNegativeExistsWithReadWriteTransactionClosed");
-
-        Future<Object> future = org.apache.pekko.pattern.Patterns.ask(subject,
-                new DataExists(YangInstanceIdentifier.of(), DataStoreVersions.CURRENT_VERSION), 3000);
-        Await.result(future, FiniteDuration.create(3, TimeUnit.SECONDS));
-
-        subject.underlyingActor().getDOMStoreTransaction().abortFromTransactionActor();
-
-        future = org.apache.pekko.pattern.Patterns.ask(subject,
-                new DataExists(YangInstanceIdentifier.of(), DataStoreVersions.CURRENT_VERSION), 3000);
-        Await.result(future, FiniteDuration.create(3, TimeUnit.SECONDS));
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
deleted file mode 100644 (file)
index 3660c17..0000000
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Copyright (c) 2014, 2015 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.datastore;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
-
-import com.google.common.base.Throwables;
-import java.time.Duration;
-import java.util.concurrent.TimeUnit;
-import org.apache.pekko.actor.ActorRef;
-import org.apache.pekko.actor.Props;
-import org.apache.pekko.actor.Status.Failure;
-import org.apache.pekko.actor.Terminated;
-import org.apache.pekko.dispatch.Dispatchers;
-import org.apache.pekko.testkit.TestActorRef;
-import org.apache.pekko.testkit.javadsl.TestKit;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.InOrder;
-import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.DataExists;
-import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadData;
-import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
-import org.opendaylight.controller.cluster.datastore.modification.DeleteModification;
-import org.opendaylight.controller.cluster.datastore.modification.MergeModification;
-import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
-import org.opendaylight.controller.cluster.raft.TestActorFactory;
-import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier.NodeIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.ContainerNode;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.spi.node.ImmutableNodes;
-import org.opendaylight.yangtools.yang.data.tree.api.DataTreeModification;
-import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
-
-@Deprecated(since = "9.0.0", forRemoval = true)
-public class ShardTransactionTest extends AbstractActorTest {
-
-    private static final TransactionType RO = TransactionType.READ_ONLY;
-    private static final TransactionType RW = TransactionType.READ_WRITE;
-    private static final TransactionType WO = TransactionType.WRITE_ONLY;
-
-    private static final ShardIdentifier SHARD_IDENTIFIER =
-        ShardIdentifier.create("inventory", MEMBER_NAME, "config");
-    private static final EffectiveModelContext TEST_MODEL = TestModel.createTestContext();
-
-    private DatastoreContext datastoreContext = DatastoreContext.newBuilder().persistent(false).build();
-
-    private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
-
-    private TestActorRef<Shard> shard;
-    private ShardDataTree store;
-    private TestKit testKit;
-
-    @Before
-    public void setUp() {
-        shard = actorFactory.createTestActor(Shard.builder().id(SHARD_IDENTIFIER).datastoreContext(datastoreContext)
-                .schemaContextProvider(() -> TEST_MODEL).props()
-                .withDispatcher(Dispatchers.DefaultDispatcherId()));
-        ShardTestKit.waitUntilLeader(shard);
-        store = shard.underlyingActor().getDataStore();
-        testKit = new TestKit(getSystem());
-    }
-
-    private ActorRef newTransactionActor(final TransactionType type,
-            final AbstractShardDataTreeTransaction<?> transaction, final String name) {
-        Props props = ShardTransaction.props(type, transaction, shard, datastoreContext,
-                shard.underlyingActor().shardStats());
-        return actorFactory.createActorNoVerify(props, name);
-    }
-
-    private ReadOnlyShardDataTreeTransaction readOnlyTransaction() {
-        return store.newReadOnlyTransaction(nextTransactionId());
-    }
-
-    private ReadWriteShardDataTreeTransaction readWriteTransaction() {
-        return store.newReadWriteTransaction(nextTransactionId());
-    }
-
-    @Test
-    public void testOnReceiveReadData() {
-        testOnReceiveReadData(newTransactionActor(RO, readOnlyTransaction(), "testReadDataRO"));
-        testOnReceiveReadData(newTransactionActor(RW, readWriteTransaction(), "testReadDataRW"));
-    }
-
-    private void testOnReceiveReadData(final ActorRef transaction) {
-        transaction.tell(new ReadData(YangInstanceIdentifier.of(), DataStoreVersions.CURRENT_VERSION),
-            testKit.getRef());
-
-        ReadDataReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ReadDataReply.class);
-
-        assertNotNull(reply.getNormalizedNode());
-    }
-
-    @Test
-    public void testOnReceiveReadDataWhenDataNotFound() {
-        testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RO, readOnlyTransaction(),
-            "testReadDataWhenDataNotFoundRO"));
-        testOnReceiveReadDataWhenDataNotFound(newTransactionActor(RW, readWriteTransaction(),
-            "testReadDataWhenDataNotFoundRW"));
-    }
-
-    private void testOnReceiveReadDataWhenDataNotFound(final ActorRef transaction) {
-        transaction.tell(new ReadData(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
-
-        ReadDataReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), ReadDataReply.class);
-
-        assertNull(reply.getNormalizedNode());
-    }
-
-    @Test
-    public void testOnReceiveDataExistsPositive() {
-        testOnReceiveDataExistsPositive(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsPositiveRO"));
-        testOnReceiveDataExistsPositive(newTransactionActor(RW, readWriteTransaction(), "testDataExistsPositiveRW"));
-    }
-
-    private void testOnReceiveDataExistsPositive(final ActorRef transaction) {
-        transaction.tell(new DataExists(YangInstanceIdentifier.of(), DataStoreVersions.CURRENT_VERSION),
-            testKit.getRef());
-
-        DataExistsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), DataExistsReply.class);
-
-        assertTrue(reply.exists());
-    }
-
-    @Test
-    public void testOnReceiveDataExistsNegative() {
-        testOnReceiveDataExistsNegative(newTransactionActor(RO, readOnlyTransaction(), "testDataExistsNegativeRO"));
-        testOnReceiveDataExistsNegative(newTransactionActor(RW, readWriteTransaction(), "testDataExistsNegativeRW"));
-    }
-
-    private void testOnReceiveDataExistsNegative(final ActorRef transaction) {
-        transaction.tell(new DataExists(TestModel.TEST_PATH, DataStoreVersions.CURRENT_VERSION), testKit.getRef());
-
-        DataExistsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5), DataExistsReply.class);
-
-        assertFalse(reply.exists());
-    }
-
-    @Test
-    public void testOnReceiveBatchedModifications() {
-        ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
-        DataTreeModification mockModification = mock(DataTreeModification.class);
-        ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
-            nextTransactionId(), mockModification);
-        final ActorRef transaction = newTransactionActor(RW, mockWriteTx, "testOnReceiveBatchedModifications");
-
-        YangInstanceIdentifier writePath = TestModel.TEST_PATH;
-        NormalizedNode writeData = ImmutableNodes.newContainerBuilder()
-            .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
-            .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo"))
-            .build();
-
-        YangInstanceIdentifier mergePath = TestModel.OUTER_LIST_PATH;
-        NormalizedNode mergeData = TestModel.EMPTY_OUTER_LIST;
-
-        YangInstanceIdentifier deletePath = TestModel.TEST_PATH;
-
-        BatchedModifications batched = new BatchedModifications(nextTransactionId(),
-            DataStoreVersions.CURRENT_VERSION);
-        batched.addModification(new WriteModification(writePath, writeData));
-        batched.addModification(new MergeModification(mergePath, mergeData));
-        batched.addModification(new DeleteModification(deletePath));
-
-        transaction.tell(batched, testKit.getRef());
-
-        BatchedModificationsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
-            BatchedModificationsReply.class);
-        assertEquals("getNumBatched", 3, reply.getNumBatched());
-
-        InOrder inOrder = inOrder(mockModification);
-        inOrder.verify(mockModification).write(writePath, writeData);
-        inOrder.verify(mockModification).merge(mergePath, mergeData);
-        inOrder.verify(mockModification).delete(deletePath);
-    }
-
-    @Test
-    public void testOnReceiveBatchedModificationsReadyWithoutImmediateCommit() {
-        final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
-                "testOnReceiveBatchedModificationsReadyWithoutImmediateCommit");
-
-        TestKit watcher = new TestKit(getSystem());
-        watcher.watch(transaction);
-
-        YangInstanceIdentifier writePath = TestModel.TEST_PATH;
-        NormalizedNode writeData = ImmutableNodes.newContainerBuilder()
-            .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
-            .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo"))
-            .build();
-
-        final TransactionIdentifier tx1 = nextTransactionId();
-        BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
-        batched.addModification(new WriteModification(writePath, writeData));
-
-        transaction.tell(batched, testKit.getRef());
-        BatchedModificationsReply reply = testKit.expectMsgClass(Duration.ofSeconds(5),
-            BatchedModificationsReply.class);
-        assertEquals("getNumBatched", 1, reply.getNumBatched());
-
-        batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
-        batched.setReady();
-        batched.setTotalMessagesSent(2);
-
-        transaction.tell(batched, testKit.getRef());
-        testKit.expectMsgClass(Duration.ofSeconds(5), ReadyTransactionReply.class);
-        watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
-    }
-
-    @Test
-    public void testOnReceiveBatchedModificationsReadyWithImmediateCommit() {
-        final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
-                "testOnReceiveBatchedModificationsReadyWithImmediateCommit");
-
-        TestKit watcher = new TestKit(getSystem());
-        watcher.watch(transaction);
-
-        YangInstanceIdentifier writePath = TestModel.TEST_PATH;
-        NormalizedNode writeData = ImmutableNodes.newContainerBuilder()
-            .withNodeIdentifier(new NodeIdentifier(TestModel.TEST_QNAME))
-            .withChild(ImmutableNodes.leafNode(TestModel.DESC_QNAME, "foo"))
-            .build();
-
-        BatchedModifications batched = new BatchedModifications(nextTransactionId(),
-            DataStoreVersions.CURRENT_VERSION);
-        batched.addModification(new WriteModification(writePath, writeData));
-        batched.setReady();
-        batched.setDoCommitOnReady(true);
-        batched.setTotalMessagesSent(1);
-
-        transaction.tell(batched, testKit.getRef());
-        testKit.expectMsgClass(Duration.ofSeconds(5), CommitTransactionReply.class);
-        watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
-    }
-
-    @Test(expected = TestException.class)
-    public void testOnReceiveBatchedModificationsFailure() throws Exception {
-        ShardDataTreeTransactionParent parent = mock(ShardDataTreeTransactionParent.class);
-        DataTreeModification mockModification = mock(DataTreeModification.class);
-        ReadWriteShardDataTreeTransaction mockWriteTx = new ReadWriteShardDataTreeTransaction(parent,
-            nextTransactionId(), mockModification);
-        final ActorRef transaction = newTransactionActor(RW, mockWriteTx,
-            "testOnReceiveBatchedModificationsFailure");
-
-        TestKit watcher = new TestKit(getSystem());
-        watcher.watch(transaction);
-
-        YangInstanceIdentifier path = TestModel.TEST_PATH;
-        ContainerNode node = TestModel.EMPTY_TEST;
-
-        doThrow(new TestException()).when(mockModification).write(path, node);
-
-        final TransactionIdentifier tx1 = nextTransactionId();
-        BatchedModifications batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
-        batched.addModification(new WriteModification(path, node));
-
-        transaction.tell(batched, testKit.getRef());
-        testKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
-
-        batched = new BatchedModifications(tx1, DataStoreVersions.CURRENT_VERSION);
-        batched.setReady();
-        batched.setTotalMessagesSent(2);
-
-        transaction.tell(batched, testKit.getRef());
-        Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
-        watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
-
-        if (failure != null) {
-            Throwables.propagateIfPossible(failure.cause(), Exception.class);
-            throw new RuntimeException(failure.cause());
-        }
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount() throws Exception {
-        final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
-                "testOnReceiveBatchedModificationsReadyWithIncorrectTotalMessageCount");
-
-        TestKit watcher = new TestKit(getSystem());
-        watcher.watch(transaction);
-
-        BatchedModifications batched = new BatchedModifications(nextTransactionId(),
-            DataStoreVersions.CURRENT_VERSION);
-        batched.setReady();
-        batched.setTotalMessagesSent(2);
-
-        transaction.tell(batched, testKit.getRef());
-
-        Failure failure = testKit.expectMsgClass(Duration.ofSeconds(5), Failure.class);
-        watcher.expectMsgClass(Duration.ofSeconds(5), Terminated.class);
-
-        if (failure != null) {
-            Throwables.throwIfInstanceOf(failure.cause(), Exception.class);
-            Throwables.throwIfUnchecked(failure.cause());
-            throw new RuntimeException(failure.cause());
-        }
-    }
-
-    @Test
-    public void testReadWriteTxOnReceiveCloseTransaction() {
-        final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
-                "testReadWriteTxOnReceiveCloseTransaction");
-
-        testKit.watch(transaction);
-
-        transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
-
-        testKit.expectMsgClass(Duration.ofSeconds(3), CloseTransactionReply.class);
-        testKit.expectTerminated(Duration.ofSeconds(3), transaction);
-    }
-
-    @Test
-    public void testWriteOnlyTxOnReceiveCloseTransaction() {
-        final ActorRef transaction = newTransactionActor(WO, readWriteTransaction(),
-                "testWriteTxOnReceiveCloseTransaction");
-
-        testKit.watch(transaction);
-
-        transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
-
-        testKit.expectMsgClass(Duration.ofSeconds(3), CloseTransactionReply.class);
-        testKit.expectTerminated(Duration.ofSeconds(3), transaction);
-    }
-
-    @Test
-    public void testReadOnlyTxOnReceiveCloseTransaction() {
-        final ActorRef transaction = newTransactionActor(TransactionType.READ_ONLY, readOnlyTransaction(),
-                "testReadOnlyTxOnReceiveCloseTransaction");
-
-        testKit.watch(transaction);
-
-        transaction.tell(new CloseTransaction().toSerializable(), testKit.getRef());
-
-        testKit.expectMsgClass(Duration.ofSeconds(3), Terminated.class);
-    }
-
-    @Test
-    public void testShardTransactionInactivity() {
-        datastoreContext = DatastoreContext.newBuilder().shardTransactionIdleTimeout(
-                500, TimeUnit.MILLISECONDS).build();
-
-        final ActorRef transaction = newTransactionActor(RW, readWriteTransaction(),
-            "testShardTransactionInactivity");
-
-        testKit.watch(transaction);
-
-        testKit.expectMsgClass(Duration.ofSeconds(3), Terminated.class);
-    }
-
-    public static class TestException extends RuntimeException {
-        private static final long serialVersionUID = 1L;
-    }
-}