*/
package org.opendaylight.controller.cluster.datastore;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
import akka.actor.Status.Failure;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.primitives.UnsignedLong;
import com.google.common.util.concurrent.FutureCallback;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
-import javax.annotation.Nonnull;
+import org.eclipse.jdt.annotation.NonNull;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
import org.opendaylight.controller.cluster.datastore.messages.VersionedExternalizableMessage;
import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
import org.opendaylight.yangtools.concepts.Identifier;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.common.Empty;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
import org.slf4j.Logger;
/**
ShardCommitCoordinator(final ShardDataTree dataTree, final Logger log, final String name) {
this.log = log;
this.name = name;
- this.dataTree = Preconditions.checkNotNull(dataTree);
+ this.dataTree = requireNonNull(dataTree);
}
int getCohortCacheSize() {
*/
void handleBatchedModifications(final BatchedModifications batched, final ActorRef sender, final Shard shard) {
CohortEntry cohortEntry = cohortCache.get(batched.getTransactionId());
- if (cohortEntry == null) {
+ if (cohortEntry == null || cohortEntry.isSealed()) {
cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionId()),
batched.getVersion());
cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
}
if (log.isDebugEnabled()) {
- log.debug("{}: Readying Tx {}, client version {}", name,
- batched.getTransactionId(), batched.getVersion());
+ log.debug("{}: Readying Tx {} of {} operations, client version {}", name,
+ batched.getTransactionId(), cohortEntry.getTotalOperationsProcessed(), batched.getVersion());
}
cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady());
}
private void handleCanCommit(final CohortEntry cohortEntry) {
- cohortEntry.canCommit(new FutureCallback<Void>() {
+ cohortEntry.canCommit(new FutureCallback<>() {
@Override
- public void onSuccess(final Void result) {
+ public void onSuccess(final Empty result) {
log.debug("{}: canCommit for {}: success", name, cohortEntry.getTransactionId());
if (cohortEntry.isDoImmediateCommit()) {
});
}
- void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) {
+ void finishCommit(final @NonNull ActorRef sender, final @NonNull CohortEntry cohortEntry) {
log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionId());
cohortEntry.commit(new FutureCallback<UnsignedLong>() {
final TransactionIdentifier txId = cohortEntry.getTransactionId();
log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result,
sender);
- cohortEntry.getShard().getDataStore().purgeTransaction(txId, null);
cohortCache.remove(cohortEntry.getTransactionId());
sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
public void onFailure(final Throwable failure) {
final TransactionIdentifier txId = cohortEntry.getTransactionId();
log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure);
- cohortEntry.getShard().getDataStore().purgeTransaction(txId, null);
cohortCache.remove(cohortEntry.getTransactionId());
sender.tell(new Failure(failure), cohortEntry.getShard().self());
log.debug("{}: Aborting transaction {}", name, transactionID);
final ActorRef self = shard.getSelf();
- cohortEntry.abort(new FutureCallback<Void>() {
+ cohortEntry.abort(new FutureCallback<>() {
@Override
- public void onSuccess(final Void result) {
- shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null);
-
+ public void onSuccess(final Empty result) {
if (sender != null) {
sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
}
@Override
public void onFailure(final Throwable failure) {
log.error("{}: An exception happened during abort", name, failure);
- shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null);
if (sender != null) {
sender.tell(new Failure(failure), self);