Improve LocalProxyTransaction.doExists()
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardCommitCoordinator.java
index 85e1345e0b03e5e765c0640010e59452485d563c..5177f7ed2bcbaee7d03b53a894c5279d17a3f9f2 100644 (file)
@@ -7,11 +7,12 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static java.util.Objects.requireNonNull;
+
 import akka.actor.ActorRef;
 import akka.actor.Status.Failure;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
 import java.util.ArrayDeque;
@@ -37,7 +38,8 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionRe
 import org.opendaylight.controller.cluster.datastore.messages.VersionedExternalizableMessage;
 import org.opendaylight.controller.cluster.datastore.utils.AbstractBatchedModificationsCursor;
 import org.opendaylight.yangtools.concepts.Identifier;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.common.Empty;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
 import org.slf4j.Logger;
 
 /**
@@ -70,7 +72,7 @@ final class ShardCommitCoordinator {
     ShardCommitCoordinator(final ShardDataTree dataTree, final Logger log, final String name) {
         this.log = log;
         this.name = name;
-        this.dataTree = Preconditions.checkNotNull(dataTree);
+        this.dataTree = requireNonNull(dataTree);
     }
 
     int getCohortCacheSize() {
@@ -156,8 +158,8 @@ final class ShardCommitCoordinator {
             }
 
             if (log.isDebugEnabled()) {
-                log.debug("{}: Readying Tx {}, client version {}", name,
-                        batched.getTransactionId(), batched.getVersion());
+                log.debug("{}: Readying Tx {} of {} operations, client version {}", name,
+                        batched.getTransactionId(), cohortEntry.getTotalOperationsProcessed(), batched.getVersion());
             }
 
             cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady());
@@ -235,9 +237,9 @@ final class ShardCommitCoordinator {
     }
 
     private void handleCanCommit(final CohortEntry cohortEntry) {
-        cohortEntry.canCommit(new FutureCallback<Void>() {
+        cohortEntry.canCommit(new FutureCallback<>() {
             @Override
-            public void onSuccess(final Void result) {
+            public void onSuccess(final Empty result) {
                 log.debug("{}: canCommit for {}: success", name, cohortEntry.getTransactionId());
 
                 if (cohortEntry.isDoImmediateCommit()) {
@@ -320,7 +322,6 @@ final class ShardCommitCoordinator {
                 final TransactionIdentifier txId = cohortEntry.getTransactionId();
                 log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result,
                     sender);
-                cohortEntry.getShard().getDataStore().purgeTransaction(txId, null);
 
                 cohortCache.remove(cohortEntry.getTransactionId());
                 sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
@@ -331,7 +332,6 @@ final class ShardCommitCoordinator {
             public void onFailure(final Throwable failure) {
                 final TransactionIdentifier txId = cohortEntry.getTransactionId();
                 log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure);
-                cohortEntry.getShard().getDataStore().purgeTransaction(txId, null);
 
                 cohortCache.remove(cohortEntry.getTransactionId());
                 sender.tell(new Failure(failure), cohortEntry.getShard().self());
@@ -372,11 +372,9 @@ final class ShardCommitCoordinator {
         log.debug("{}: Aborting transaction {}", name, transactionID);
 
         final ActorRef self = shard.getSelf();
-        cohortEntry.abort(new FutureCallback<Void>() {
+        cohortEntry.abort(new FutureCallback<>() {
             @Override
-            public void onSuccess(final Void result) {
-                shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null);
-
+            public void onSuccess(final Empty result) {
                 if (sender != null) {
                     sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
                 }
@@ -385,7 +383,6 @@ final class ShardCommitCoordinator {
             @Override
             public void onFailure(final Throwable failure) {
                 log.error("{}: An exception happened during abort", name, failure);
-                shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null);
 
                 if (sender != null) {
                     sender.tell(new Failure(failure), self);