X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShardCommitCoordinator.java;h=d9520c5d5c6596ab8c9a9a7a626d3b1ebe657c71;hp=33634b1d6c3e322642e9b458e05d59feb74a577e;hb=HEAD;hpb=5fd8e6506248cc34da72281a1662612f6c2b2f9a diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java index 33634b1d6c..946203b6b7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java @@ -7,23 +7,23 @@ */ package org.opendaylight.controller.cluster.datastore; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; import akka.actor.Status.Failure; import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.primitives.UnsignedLong; import com.google.common.util.concurrent.FutureCallback; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Deque; import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedList; import java.util.Map; -import javax.annotation.Nonnull; +import org.eclipse.jdt.annotation.NonNull; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications; @@ -38,7 +38,8 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe import org.opendaylight.controller.cluster.datastore.messages.VersionedExternalizableMessage; import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor; import org.opendaylight.yangtools.concepts.Identifier; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate; +import org.opendaylight.yangtools.yang.common.Empty; +import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate; import org.slf4j.Logger; /** @@ -46,6 +47,7 @@ import org.slf4j.Logger; * * @author Thomas Pantelis */ +@Deprecated(since = "9.0.0", forRemoval = true) final class ShardCommitCoordinator { // Interface hook for unit tests to replace or decorate the ShardDataTreeCohorts. @@ -71,7 +73,7 @@ final class ShardCommitCoordinator { ShardCommitCoordinator(final ShardDataTree dataTree, final Logger log, final String name) { this.log = log; this.name = name; - this.dataTree = Preconditions.checkNotNull(dataTree); + this.dataTree = requireNonNull(dataTree); } int getCohortCacheSize() { @@ -103,7 +105,7 @@ final class ShardCommitCoordinator { log.debug("{}: Readying transaction {}, client version {}", name, ready.getTransactionId(), ready.getTxnClientVersion()); - final ShardDataTreeCohort cohort = ready.getTransaction().ready(); + final ShardDataTreeCohort cohort = ready.getTransaction().ready(ready.getParticipatingShardNames()); final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion()); cohortCache.put(cohortEntry.getTransactionId(), cohortEntry); @@ -128,9 +130,10 @@ final class ShardCommitCoordinator { * @param batched the BatchedModifications message to process * @param sender the sender of the message */ + @SuppressFBWarnings(value = "THROWS_METHOD_THROWS_RUNTIMEEXCEPTION", justification = "Replay of captured failure") void handleBatchedModifications(final BatchedModifications batched, final ActorRef sender, final Shard shard) { CohortEntry cohortEntry = cohortCache.get(batched.getTransactionId()); - if (cohortEntry == null) { + if (cohortEntry == null || cohortEntry.isSealed()) { cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionId()), batched.getVersion()); cohortCache.put(cohortEntry.getTransactionId(), cohortEntry); @@ -157,12 +160,12 @@ final class ShardCommitCoordinator { } if (log.isDebugEnabled()) { - log.debug("{}: Readying Tx {}, client version {}", name, - batched.getTransactionId(), batched.getVersion()); + log.debug("{}: Readying Tx {} of {} operations, client version {}", name, + batched.getTransactionId(), cohortEntry.getTotalOperationsProcessed(), batched.getVersion()); } cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady()); - cohortEntry.ready(cohortDecorator); + cohortEntry.ready(batched.getParticipatingShardNames(), cohortDecorator); if (batched.isDoCommitOnReady()) { cohortEntry.setReplySender(sender); @@ -186,13 +189,14 @@ final class ShardCommitCoordinator { * @param shard the transaction's shard actor */ void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) { - final ShardDataTreeCohort cohort = dataTree.createReadyCohort(message.getTransactionId(), - message.getModification()); + final TransactionIdentifier txId = message.getTransactionId(); + final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification(), + message.getParticipatingShardNames()); final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION); cohortCache.put(cohortEntry.getTransactionId(), cohortEntry); cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady()); - log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionId()); + log.debug("{}: Applying local modifications for Tx {}", name, txId); if (message.isDoCommitOnReady()) { cohortEntry.setReplySender(sender); @@ -203,6 +207,7 @@ final class ShardCommitCoordinator { } } + @Deprecated(since = "9.0.0", forRemoval = true) Collection createForwardedBatchedModifications(final BatchedModifications from, final int maxModificationsPerBatch) { CohortEntry cohortEntry = cohortCache.remove(from.getTransactionId()); @@ -227,15 +232,17 @@ final class ShardCommitCoordinator { BatchedModifications last = newModifications.getLast(); last.setDoCommitOnReady(from.isDoCommitOnReady()); - last.setReady(from.isReady()); + if (from.isReady()) { + last.setReady(from.getParticipatingShardNames()); + } last.setTotalMessagesSent(newModifications.size()); return newModifications; } private void handleCanCommit(final CohortEntry cohortEntry) { - cohortEntry.canCommit(new FutureCallback() { + cohortEntry.canCommit(new FutureCallback<>() { @Override - public void onSuccess(final Void result) { + public void onSuccess(final Empty result) { log.debug("{}: canCommit for {}: success", name, cohortEntry.getTransactionId()); if (cohortEntry.isDoImmediateCommit()) { @@ -249,8 +256,8 @@ final class ShardCommitCoordinator { @Override public void onFailure(final Throwable failure) { - log.debug("{}: An exception occurred during canCommit for {}: {}", name, - cohortEntry.getTransactionId(), failure); + log.debug("{}: An exception occurred during canCommit for {}", name, cohortEntry.getTransactionId(), + failure); cohortCache.remove(cohortEntry.getTransactionId()); cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self()); @@ -274,7 +281,7 @@ final class ShardCommitCoordinator { // between canCommit and ready and the entry was expired from the cache or it was aborted. IllegalStateException ex = new IllegalStateException( String.format("%s: Cannot canCommit transaction %s - no cohort entry found", name, transactionID)); - log.error(ex.getMessage()); + log.error("{}: Inconsistency during transaction {} canCommit", name, transactionID, ex); sender.tell(new Failure(ex), shard.self()); return; } @@ -285,7 +292,7 @@ final class ShardCommitCoordinator { handleCanCommit(cohortEntry); } - private void doCommit(final CohortEntry cohortEntry) { + void doCommit(final CohortEntry cohortEntry) { log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionId()); // We perform the preCommit phase here atomically with the commit phase. This is an @@ -309,7 +316,7 @@ final class ShardCommitCoordinator { }); } - private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) { + void finishCommit(final @NonNull ActorRef sender, final @NonNull CohortEntry cohortEntry) { log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionId()); cohortEntry.commit(new FutureCallback() { @@ -326,8 +333,8 @@ final class ShardCommitCoordinator { @Override public void onFailure(final Throwable failure) { - log.error("{}, An exception occurred while committing transaction {}", persistenceId(), - cohortEntry.getTransactionId(), failure); + final TransactionIdentifier txId = cohortEntry.getTransactionId(); + log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure); cohortCache.remove(cohortEntry.getTransactionId()); sender.tell(new Failure(failure), cohortEntry.getShard().self()); @@ -349,7 +356,7 @@ final class ShardCommitCoordinator { // or it was aborted. IllegalStateException ex = new IllegalStateException( String.format("%s: Cannot commit transaction %s - no cohort entry found", name, transactionID)); - log.error(ex.getMessage()); + log.error("{}: Inconsistency during transaction {} commit", name, transactionID, ex); sender.tell(new Failure(ex), shard.self()); return; } @@ -368,9 +375,9 @@ final class ShardCommitCoordinator { log.debug("{}: Aborting transaction {}", name, transactionID); final ActorRef self = shard.getSelf(); - cohortEntry.abort(new FutureCallback() { + cohortEntry.abort(new FutureCallback<>() { @Override - public void onSuccess(final Void result) { + public void onSuccess(final Empty result) { if (sender != null) { sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self); } @@ -390,29 +397,22 @@ final class ShardCommitCoordinator { } void checkForExpiredTransactions(final long timeout, final Shard shard) { - Iterator iter = cohortCache.values().iterator(); - while (iter.hasNext()) { - CohortEntry cohortEntry = iter.next(); - if (cohortEntry.isFailed()) { - iter.remove(); - } - } + cohortCache.values().removeIf(CohortEntry::isFailed); } void abortPendingTransactions(final String reason, final Shard shard) { - final Failure failure = new Failure(new RuntimeException(reason)); - Collection pending = dataTree.getAndClearPendingTransactions(); + final var failure = new Failure(new RuntimeException(reason)); + final var pending = dataTree.getAndClearPendingTransactions(); log.debug("{}: Aborting {} pending queued transactions", name, pending.size()); - for (ShardDataTreeCohort cohort : pending) { - CohortEntry cohortEntry = cohortCache.remove(cohort.getIdentifier()); - if (cohortEntry == null) { - continue; - } - - if (cohortEntry.getReplySender() != null) { - cohortEntry.getReplySender().tell(failure, shard.self()); + for (var cohort : pending) { + final var cohortEntry = cohortCache.remove(cohort.transactionId()); + if (cohortEntry != null) { + final var replySender = cohortEntry.getReplySender(); + if (replySender != null) { + replySender.tell(failure, shard.self()); + } } } @@ -420,36 +420,35 @@ final class ShardCommitCoordinator { } Collection convertPendingTransactionsToMessages(final int maxModificationsPerBatch) { - final Collection messages = new ArrayList<>(); - for (ShardDataTreeCohort cohort : dataTree.getAndClearPendingTransactions()) { - CohortEntry cohortEntry = cohortCache.remove(cohort.getIdentifier()); + final var messages = new ArrayList(); + for (var cohort : dataTree.getAndClearPendingTransactions()) { + final var cohortEntry = cohortCache.remove(cohort.transactionId()); if (cohortEntry == null) { continue; } - final Deque newMessages = new ArrayDeque<>(); + final var newMessages = new ArrayDeque(); cohortEntry.getDataTreeModification().applyToCursor(new AbstractBatchedModificationsCursor() { @Override protected BatchedModifications getModifications() { - final BatchedModifications lastBatch = newMessages.peekLast(); - + final var lastBatch = newMessages.peekLast(); if (lastBatch != null && lastBatch.getModifications().size() >= maxModificationsPerBatch) { return lastBatch; } // Allocate a new message - final BatchedModifications ret = new BatchedModifications(cohortEntry.getTransactionId(), + final var ret = new BatchedModifications(cohortEntry.getTransactionId(), cohortEntry.getClientVersion()); newMessages.add(ret); return ret; } }); - final BatchedModifications last = newMessages.peekLast(); + final var last = newMessages.peekLast(); if (last != null) { final boolean immediate = cohortEntry.isDoImmediateCommit(); last.setDoCommitOnReady(immediate); - last.setReady(true); + last.setReady(cohortEntry.getParticipatingShardNames()); last.setTotalMessagesSent(newMessages.size()); messages.addAll(newMessages);