Migrate DatastoreContextIntrospectorFactory to OSGi DS
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
index 403a96819f392726d7b24ef715827e1187f29841..d9520c5d5c6596ab8c9a9a7a626d3b1ebe657c71 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardCommitCoordinator.java
@@ -7,11 +7,12 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static java.util.Objects.requireNonNull;
+
 import akka.actor.ActorRef;
 import akka.actor.Status.Failure;
 import akka.serialization.Serialization;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
 import java.util.ArrayDeque;
@@ -22,7 +23,7 @@ import java.util.Deque;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
-import javax.annotation.Nonnull;
+import org.eclipse.jdt.annotation.NonNull;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
@@ -70,7 +71,7 @@ final class ShardCommitCoordinator {
     ShardCommitCoordinator(final ShardDataTree dataTree, final Logger log, final String name) {
         this.log = log;
         this.name = name;
-        this.dataTree = Preconditions.checkNotNull(dataTree);
+        this.dataTree = requireNonNull(dataTree);
     }
 
     int getCohortCacheSize() {
@@ -129,7 +130,7 @@ final class ShardCommitCoordinator {
      */
     void handleBatchedModifications(final BatchedModifications batched, final ActorRef sender, final Shard shard) {
         CohortEntry cohortEntry = cohortCache.get(batched.getTransactionId());
-        if (cohortEntry == null) {
+        if (cohortEntry == null || cohortEntry.isSealed()) {
             cohortEntry = CohortEntry.createOpen(dataTree.newReadWriteTransaction(batched.getTransactionId()),
                 batched.getVersion());
             cohortCache.put(cohortEntry.getTransactionId(), cohortEntry);
@@ -156,8 +157,8 @@ final class ShardCommitCoordinator {
             }
 
             if (log.isDebugEnabled()) {
-                log.debug("{}: Readying Tx {}, client version {}", name,
-                        batched.getTransactionId(), batched.getVersion());
+                log.debug("{}: Readying Tx {} of {} operations, client version {}", name,
+                        batched.getTransactionId(), cohortEntry.getTotalOperationsProcessed(), batched.getVersion());
             }
 
             cohortEntry.setDoImmediateCommit(batched.isDoCommitOnReady());
@@ -311,7 +312,7 @@ final class ShardCommitCoordinator {
         });
     }
 
-    void finishCommit(@Nonnull final ActorRef sender, @Nonnull final CohortEntry cohortEntry) {
+    void finishCommit(final @NonNull ActorRef sender, final @NonNull CohortEntry cohortEntry) {
         log.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionId());
 
         cohortEntry.commit(new FutureCallback<UnsignedLong>() {
@@ -320,7 +321,6 @@ final class ShardCommitCoordinator {
                 final TransactionIdentifier txId = cohortEntry.getTransactionId();
                 log.debug("{}: Transaction {} committed as {}, sending response to {}", persistenceId(), txId, result,
                     sender);
-                cohortEntry.getShard().getDataStore().purgeTransaction(txId, null);
 
                 cohortCache.remove(cohortEntry.getTransactionId());
                 sender.tell(CommitTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(),
@@ -331,7 +331,6 @@ final class ShardCommitCoordinator {
             public void onFailure(final Throwable failure) {
                 final TransactionIdentifier txId = cohortEntry.getTransactionId();
                 log.error("{}, An exception occurred while committing transaction {}", persistenceId(), txId, failure);
-                cohortEntry.getShard().getDataStore().purgeTransaction(txId, null);
 
                 cohortCache.remove(cohortEntry.getTransactionId());
                 sender.tell(new Failure(failure), cohortEntry.getShard().self());
@@ -375,8 +374,6 @@ final class ShardCommitCoordinator {
         cohortEntry.abort(new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
-                shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null);
-
                 if (sender != null) {
                     sender.tell(AbortTransactionReply.instance(cohortEntry.getClientVersion()).toSerializable(), self);
                 }
@@ -385,7 +382,6 @@ final class ShardCommitCoordinator {
             @Override
             public void onFailure(final Throwable failure) {
                 log.error("{}: An exception happened during abort", name, failure);
-                shard.getDataStore().purgeTransaction(cohortEntry.getTransactionId(), null);
 
                 if (sender != null) {
                     sender.tell(new Failure(failure), self);
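
Note on the first hunk: the new "cohortEntry == null || cohortEntry.isSealed()" check means that when BatchedModifications arrive for a transaction whose cached cohort entry has already been sealed (readied), the coordinator replaces that entry with a freshly opened read-write transaction instead of appending to the sealed one, presumably to cope with retransmitted or retried batches. The sketch below is a minimal, self-contained illustration of that "replace a sealed entry on retry" pattern only; the Entry class, the plain HashMap cache, and the entryFor() method are hypothetical stand-ins, not the actual CohortEntry/ShardDataTree API.

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the retry-handling pattern in handleBatchedModifications().
// All names here are hypothetical; they do not mirror the real ODL classes.
final class SealedEntrySketch {
    static final class Entry {
        private final String txId;
        private boolean sealed;

        Entry(final String txId) {
            this.txId = txId;
        }

        void seal() {
            sealed = true;
        }

        boolean isSealed() {
            return sealed;
        }
    }

    private final Map<String, Entry> cache = new HashMap<>();

    Entry entryFor(final String txId) {
        Entry entry = cache.get(txId);
        // A missing entry, or one sealed by an earlier attempt, is not reusable:
        // allocate a fresh open entry and let it replace the stale one in the cache.
        if (entry == null || entry.isSealed()) {
            entry = new Entry(txId);
            cache.put(txId, entry);
        }
        return entry;
    }

    public static void main(final String[] args) {
        final SealedEntrySketch coordinator = new SealedEntrySketch();
        final Entry first = coordinator.entryFor("tx-1");
        first.seal();                                    // transaction readied
        final Entry retry = coordinator.entryFor("tx-1");
        System.out.println(first == retry);              // false: sealed entry was replaced
    }
}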