X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardCommitCoordinator.java;h=d9520c5d5c6596ab8c9a9a7a626d3b1ebe657c71;hp=85e1345e0b03e5e765c0640010e59452485d563c;hb=HEAD;hpb=bb04f62f467cc6e6f0922ba8957b09715707940d diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 85e1345e0b..946203b6b7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -7,18 +7,19 @@ */ package org.opendaylight.controller.cluster.datastore; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; import akka.actor.Status.Failure; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Deque; import java.util.HashMap; import java.util.LinkedList; import java.util.Map; @@ -37,7 +38,8 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe import org.opendaylight.controller.cluster.datastore.messages.VersionedExternalizableMessage; import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor; import org.opendaylight.yangtools.concepts.Identifier; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.common.Empty; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate; import org.slf4j.Logger; /** @@ -45,6 +47,7 @@ import org.slf4j.Logger; * * @author Thomas Pantelis */ +@Deprecated(since = "9.0.0", forRemoval = true) final class ShardCommitCoordinator { // Interface hook for unit tests to replace or decorate the ShardDataTreeCohorts. @@ -70,7 +73,7 @@ final class ShardCommitCoordinator { ShardCommitCoordinator(final ShardDataTree dataTree, final Logger log, final String name) { this.log = log; this.name = name; - this.dataTree = Preconditions.checkNotNull(dataTree); + this.dataTree = requireNonNull(dataTree); } int getCohortCacheSize() { @@ -127,6 +130,7 @@ final class ShardCommitCoordinator { * @param batched the BatchedModifications message to process * @param sender the sender of the message */ + @SuppressFBWarnings(value = "THROWS_METHOD_THROWS_RUNTIMEEXCEPTION", justification = "Replay of captured failure") void handleBatchedModifications(final BatchedModifications batched, final ActorRef sender, final Shard shard) { CohortEntry cohortEntry = cohortCache.get(batched.getTransactionId()); if (cohortEntry == null || cohortEntry.isSealed()) { @@ -156,8 +160,8 @@ final class ShardCommitCoordinator { } if (log.isDebugEnabled()) { - log.debug("{}: Readying Tx {}, client version {}", name, - batched.getTransactionId(), batched.getVersion()); + log.debug("{}: Readying Tx {} of {} operations, client version {}", name, + batched.getTransactionId(), cohortEntry.getTotalOperationsProcessed(), batched.getVersion()); } cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady()); @@ -203,6 +207,7 @@ final class ShardCommitCoordinator { } } + @Deprecated(since = "9.0.0", forRemoval = true) Collection createForwardedBatchedModifications(final BatchedModifications from, final int maxModificationsPerBatch) { CohortEntry cohortEntry = cohortCache.remove(from.getTransactionId()); @@ -235,9 +240,9 @@ final class ShardCommitCoordinator { } private void handleCanCommit(final CohortEntry cohortEntry) { - cohortEntry.canCommit(new FutureCallback() { + cohortEntry.canCommit(new FutureCallback<>() { @Override - public void onSuccess(final Void result) { + public void onSuccess(final Empty result) { log.debug("{}: canCommit for {}: success", name, cohortEntry.getTransactionId()); if (cohortEntry.isDoImmediateCommit()) { @@ -320,7 +325,6 @@ final class ShardCommitCoordinator { final TransactionIdentifier txId = cohortEntry.getTransactionId(); log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result, sender); - cohortEntry.getShard().getDataStore().purgeTransaction(txId, null); cohortCache.remove(cohortEntry.getTransactionId()); sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), @@ -331,7 +335,6 @@ final class ShardCommitCoordinator { public void onFailure(final Throwable failure) { final TransactionIdentifier txId = cohortEntry.getTransactionId(); log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure); - cohortEntry.getShard().getDataStore().purgeTransaction(txId, null); cohortCache.remove(cohortEntry.getTransactionId()); sender.tell(new Failure(failure), cohortEntry.getShard().self()); @@ -372,11 +375,9 @@ final class ShardCommitCoordinator { log.debug("{}: Aborting transaction {}", name, transactionID); final ActorRef self = shard.getSelf(); - cohortEntry.abort(new FutureCallback() { + cohortEntry.abort(new FutureCallback<>() { @Override - public void onSuccess(final Void result) { - shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null); - + public void onSuccess(final Empty result) { if (sender != null) { sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self); } @@ -385,7 +386,6 @@ final class ShardCommitCoordinator { @Override public void onFailure(final Throwable failure) { log.error("{}: An exception happened during abort", name, failure); - shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null); if (sender != null) { sender.tell(new Failure(failure), self); @@ -401,19 +401,18 @@ final class ShardCommitCoordinator { } void abortPendingTransactions(final String reason, final Shard shard) { - final Failure failure = new Failure(new RuntimeException(reason)); - Collection pending = dataTree.getAndClearPendingTransactions(); + final var failure = new Failure(new RuntimeException(reason)); + final var pending = dataTree.getAndClearPendingTransactions(); log.debug("{}: Aborting {} pending queued transactions", name, pending.size()); - for (ShardDataTreeCohort cohort : pending) { - CohortEntry cohortEntry = cohortCache.remove(cohort.getIdentifier()); - if (cohortEntry == null) { - continue; - } - - if (cohortEntry.getReplySender() != null) { - cohortEntry.getReplySender().tell(failure, shard.self()); + for (var cohort : pending) { + final var cohortEntry = cohortCache.remove(cohort.transactionId()); + if (cohortEntry != null) { + final var replySender = cohortEntry.getReplySender(); + if (replySender != null) { + replySender.tell(failure, shard.self()); + } } } @@ -421,32 +420,31 @@ final class ShardCommitCoordinator { } Collection convertPendingTransactionsToMessages(final int maxModificationsPerBatch) { - final Collection messages = new ArrayList<>(); - for (ShardDataTreeCohort cohort : dataTree.getAndClearPendingTransactions()) { - CohortEntry cohortEntry = cohortCache.remove(cohort.getIdentifier()); + final var messages = new ArrayList(); + for (var cohort : dataTree.getAndClearPendingTransactions()) { + final var cohortEntry = cohortCache.remove(cohort.transactionId()); if (cohortEntry == null) { continue; } - final Deque newMessages = new ArrayDeque<>(); + final var newMessages = new ArrayDeque(); cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() { @Override protected BatchedModifications getModifications() { - final BatchedModifications lastBatch = newMessages.peekLast(); - + final var lastBatch = newMessages.peekLast(); if (lastBatch != null && lastBatch.getModifications().size() >= maxModificationsPerBatch) { return lastBatch; } // Allocate a new message - final BatchedModifications ret = new BatchedModifications(cohortEntry.getTransactionId(), + final var ret = new BatchedModifications(cohortEntry.getTransactionId(), cohortEntry.getClientVersion()); newMessages.add(ret); return ret; } }); - final BatchedModifications last = newMessages.peekLast(); + final var last = newMessages.peekLast(); if (last != null) { final boolean immediate = cohortEntry.isDoImmediateCommit(); last.setDoCommitOnReady(immediate);