*/
package org.opendaylight.controller.cluster.datastore;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
import akka.actor.Status.Failure;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
-import javax.annotation.Nonnull;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
ShardCommitCoordinator(final ShardDataTree dataTree, final Logger log, final String name) {
this.log = log;
this.name = name;
- this.dataTree = Preconditions.checkNotNull(dataTree);
+ this.dataTree = requireNonNull(dataTree);
}
int getCohortCacheSize() {
log.debug("{}: Readying transaction {}, client version {}", name,
ready.getTransactionId(), ready.getTxnClientVersion());
- final ShardDataTreeCohort cohort = ready.getTransaction().ready();
+ final ShardDataTreeCohort cohort = ready.getTransaction().ready(ready.getParticipatingShardNames());
final CohortEntry cohortEntry = CohortEntry.createReady(cohort, ready.getTxnClientVersion());
cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
*/
void handleBatchedModifications(final BatchedModifications batched, final ActorRef sender, final Shard shard) {
CohortEntry cohortEntry = cohortCache.get(batched.getTransactionId());
- if (cohortEntry == null) {
+ if (cohortEntry == null || cohortEntry.isSealed()) {
cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionId()),
batched.getVersion());
cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
}
if (log.isDebugEnabled()) {
- log.debug("{}: Readying Tx {}, client version {}", name,
- batched.getTransactionId(), batched.getVersion());
+ log.debug("{}: Readying Tx {} of {} operations, client version {}", name,
+ batched.getTransactionId(), cohortEntry.getTotalOperationsProcessed(), batched.getVersion());
}
cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady());
- cohortEntry.ready(cohortDecorator);
+ cohortEntry.ready(batched.getParticipatingShardNames(), cohortDecorator);
if (batched.isDoCommitOnReady()) {
cohortEntry.setReplySender(sender);
* @param shard the transaction's shard actor
*/
void handleReadyLocalTransaction(final ReadyLocalTransaction message, final ActorRef sender, final Shard shard) {
- final ShardDataTreeCohort cohort = dataTree.createReadyCohort(message.getTransactionId(),
- message.getModification());
+ final TransactionIdentifier txId = message.getTransactionId();
+ final ShardDataTreeCohort cohort = dataTree.newReadyCohort(txId, message.getModification(),
+ message.getParticipatingShardNames());
final CohortEntry cohortEntry = CohortEntry.createReady(cohort, DataStoreVersions.CURRENT_VERSION);
cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
cohortEntry.setDoImmediateCommit(message.isDoCommitOnReady());
- log.debug("{}: Applying local modifications for Tx {}", name, message.getTransactionId());
+ log.debug("{}: Applying local modifications for Tx {}", name, txId);
if (message.isDoCommitOnReady()) {
cohortEntry.setReplySender(sender);
BatchedModifications last = newModifications.getLast();
last.setDoCommitOnReady(from.isDoCommitOnReady());
- last.setReady(from.isReady());
+ if (from.isReady()) {
+ last.setReady(from.getParticipatingShardNames());
+ }
last.setTotalMessagesSent(newModifications.size());
return newModifications;
}
@Override
public void onFailure(final Throwable failure) {
- log.debug("{}: An exception occurred during canCommit for {}: {}", name,
- cohortEntry.getTransactionId(), failure);
+ log.debug("{}: An exception occurred during canCommit for {}", name, cohortEntry.getTransactionId(),
+ failure);
cohortCache.remove(cohortEntry.getTransactionId());
cohortEntry.getReplySender().tell(new Failure(failure), cohortEntry.getShard().self());
// between canCommit and ready and the entry was expired from the cache or it was aborted.
IllegalStateException ex = new IllegalStateException(
String.format("%s: Cannot canCommit transaction %s - no cohort entry found", name, transactionID));
- log.error(ex.getMessage());
+ log.error("{}: Inconsistency during transaction {} canCommit", name, transactionID, ex);
sender.tell(new Failure(ex), shard.self());
return;
}
handleCanCommit(cohortEntry);
}
- private void doCommit(final CohortEntry cohortEntry) {
+ void doCommit(final CohortEntry cohortEntry) {
log.debug("{}: Committing transaction {}", name, cohortEntry.getTransactionId());
// We perform the preCommit phase here atomically with the commit phase. This is an
});
}
- private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) {
+ void finishCommit(final @NonNull ActorRef sender, final @NonNull CohortEntry cohortEntry) {
log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionId());
cohortEntry.commit(new FutureCallback<UnsignedLong>() {
@Override
public void onFailure(final Throwable failure) {
- log.error("{}, An exception occurred while committing transaction {}", persistenceId(),
- cohortEntry.getTransactionId(), failure);
+ final TransactionIdentifier txId = cohortEntry.getTransactionId();
+ log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure);
cohortCache.remove(cohortEntry.getTransactionId());
sender.tell(new Failure(failure), cohortEntry.getShard().self());
// or it was aborted.
IllegalStateException ex = new IllegalStateException(
String.format("%s: Cannot commit transaction %s - no cohort entry found", name, transactionID));
- log.error(ex.getMessage());
+ log.error("{}: Inconsistency during transaction {} commit", name, transactionID, ex);
sender.tell(new Failure(ex), shard.self());
return;
}
}
void checkForExpiredTransactions(final long timeout, final Shard shard) {
- Iterator<CohortEntry> iter = cohortCache.values().iterator();
- while (iter.hasNext()) {
- CohortEntry cohortEntry = iter.next();
- if (cohortEntry.isFailed()) {
- iter.remove();
- }
- }
+ cohortCache.values().removeIf(CohortEntry::isFailed);
}
void abortPendingTransactions(final String reason, final Shard shard) {
if (last != null) {
final boolean immediate = cohortEntry.isDoImmediateCommit();
last.setDoCommitOnReady(immediate);
- last.setReady(true);
+ last.setReady(cohortEntry.getParticipatingShardNames());
last.setTotalMessagesSent(newMessages.size());
messages.addAll(newMessages);