Add transaction debugs
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardDataTree.java
index c37703c3359b503bf1e65cab8e46d49fdd9b5353..ba831c6c1d1f3104e478f2d380ec70d9054157f5 100644 (file)
@@ -80,10 +80,11 @@ import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeTip;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
 import org.opendaylight.yangtools.yang.data.api.schema.tree.TreeType;
 import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFactory;
+import org.opendaylight.yangtools.yang.data.util.DataSchemaContextTree;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Internal shard state, similar to a DOMStore, but optimized for use in the actor system,
@@ -110,7 +111,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
-    private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
+    private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(FiniteDuration.create(5, TimeUnit.SECONDS));
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
 
     /**
@@ -148,6 +149,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     private DataTreeTip tip;
 
     private SchemaContext schemaContext;
+    private DataSchemaContextTree dataSchemaContext;
 
     private int currentTransactionBatch;
 
@@ -207,6 +209,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     void updateSchemaContext(final SchemaContext newSchemaContext) {
         dataTree.setSchemaContext(newSchemaContext);
         this.schemaContext = Preconditions.checkNotNull(newSchemaContext);
+        this.dataSchemaContext = DataSchemaContextTree.from(newSchemaContext);
     }
 
     void resetTransactionBatch() {
@@ -293,7 +296,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     private PruningDataTreeModification wrapWithPruning(final DataTreeModification delegate) {
-        return new PruningDataTreeModification(delegate, dataTree, schemaContext);
+        return new PruningDataTreeModification(delegate, dataTree, dataSchemaContext);
     }
 
     private static DataTreeModification unwrap(final DataTreeModification modification) {
@@ -365,7 +368,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
-    private void applyReplicatedCandidate(final Identifier identifier, final DataTreeCandidate foreign)
+    private void applyReplicatedCandidate(final TransactionIdentifier identifier, final DataTreeCandidate foreign)
             throws DataValidationFailedException {
         LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
 
@@ -378,6 +381,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final DataTreeCandidate candidate = dataTree.prepare(mod);
         dataTree.commit(candidate);
 
+        allMetadataCommittedTransaction(identifier);
         notifyListeners(candidate);
     }
 
@@ -404,18 +408,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
          * pre-Boron state -- which limits the number of options here.
          */
         if (payload instanceof CommitTransactionPayload) {
-            final TransactionIdentifier txId;
             if (identifier == null) {
                 final Entry<TransactionIdentifier, DataTreeCandidate> e =
                         ((CommitTransactionPayload) payload).getCandidate();
-                txId = e.getKey();
-                applyReplicatedCandidate(txId, e.getValue());
+                applyReplicatedCandidate(e.getKey(), e.getValue());
             } else {
                 Verify.verify(identifier instanceof TransactionIdentifier);
-                txId = (TransactionIdentifier) identifier;
-                payloadReplicationComplete(txId);
+                payloadReplicationComplete((TransactionIdentifier) identifier);
             }
-            allMetadataCommittedTransaction(txId);
         } else if (payload instanceof AbortTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((AbortTransactionPayload) payload);
@@ -467,12 +467,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final CommitEntry current = pendingFinishCommits.peek();
         if (current == null) {
             LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
+            allMetadataCommittedTransaction(txId);
             return;
         }
 
         if (!current.cohort.getIdentifier().equals(txId)) {
             LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
                 current.cohort.getIdentifier(), txId);
+            allMetadataCommittedTransaction(txId);
             return;
         }
 
@@ -538,7 +540,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         if (chain == null) {
             chain = new ShardDataTreeTransactionChain(historyId, this);
             transactionChains.put(historyId, chain);
-            replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback);
+            replicatePayload(historyId, CreateLocalHistoryPayload.create(
+                    historyId, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
         } else if (callback != null) {
             callback.run();
         }
@@ -588,17 +591,33 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @param callback Callback to invoke upon completion, may be null
      */
     void closeTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) {
+        if (commonCloseTransactionChain(id, callback)) {
+            replicatePayload(id, CloseLocalHistoryPayload.create(id,
+                shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
+        }
+    }
+
+    /**
+     * Close a single transaction chain which is received through ask-based protocol. It does not keep a commit record.
+     *
+     * @param id History identifier
+     */
+    void closeTransactionChain(final LocalHistoryIdentifier id) {
+        commonCloseTransactionChain(id, null);
+    }
+
+    private boolean commonCloseTransactionChain(final LocalHistoryIdentifier id, @Nullable final Runnable callback) {
         final ShardDataTreeTransactionChain chain = transactionChains.get(id);
         if (chain == null) {
             LOG.debug("{}: Closing non-existent transaction chain {}", logContext, id);
             if (callback != null) {
                 callback.run();
             }
-            return;
+            return false;
         }
 
         chain.close();
-        replicatePayload(id, CloseLocalHistoryPayload.create(id), callback);
+        return true;
     }
 
     /**
@@ -617,7 +636,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
-        replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
+        replicatePayload(id, PurgeLocalHistoryPayload.create(
+                id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
     Optional<DataTreeCandidate> readCurrentData() {
@@ -641,7 +661,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
         final TransactionIdentifier id = transaction.getIdentifier();
         LOG.debug("{}: aborting transaction {}", logContext, id);
-        replicatePayload(id, AbortTransactionPayload.create(id), callback);
+        replicatePayload(id, AbortTransactionPayload.create(
+                id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
     @Override
@@ -654,14 +675,18 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction,
             final java.util.Optional<SortedSet<String>> participatingShardNames) {
         final DataTreeModification snapshot = transaction.getSnapshot();
+        final TransactionIdentifier id = transaction.getIdentifier();
+        LOG.debug("{}: readying transaction {}", logContext, id);
         snapshot.ready();
+        LOG.debug("{}: transaction {} ready", logContext, id);
 
         return createReadyCohort(transaction.getIdentifier(), snapshot, participatingShardNames);
     }
 
     void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
         LOG.debug("{}: purging transaction {}", logContext, id);
-        replicatePayload(id, PurgeTransactionPayload.create(id), callback);
+        replicatePayload(id, PurgeTransactionPayload.create(
+                id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
     public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
@@ -873,7 +898,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         processNextPendingTransaction();
     }
 
-    private void insertEntry(final Deque<CommitEntry> queue, final CommitEntry entry, final int atIndex) {
+    private static void insertEntry(final Deque<CommitEntry> queue, final CommitEntry entry, final int atIndex) {
         if (atIndex == 0) {
             queue.addFirst(entry);
             return;
@@ -911,11 +936,13 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final SimpleShardDataTreeCohort current = entry.cohort;
         Verify.verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
 
-        LOG.debug("{}: Preparing transaction {}", logContext, current.getIdentifier());
+        final TransactionIdentifier currentId = current.getIdentifier();
+        LOG.debug("{}: Preparing transaction {}", logContext, currentId);
 
         final DataTreeCandidateTip candidate;
         try {
             candidate = tip.prepare(cohort.getDataTreeModification());
+            LOG.debug("{}: Transaction {} candidate ready", logContext, currentId);
         } catch (RuntimeException e) {
             failPreCommit(e);
             return;
@@ -932,7 +959,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 pendingTransactions.remove();
                 pendingCommits.add(entry);
 
-                LOG.debug("{}: Transaction {} prepared", logContext, current.getIdentifier());
+                LOG.debug("{}: Transaction {} prepared", logContext, currentId);
 
                 cohort.successfulPreCommit(candidate);
 
@@ -972,6 +999,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
+        allMetadataCommittedTransaction(txId);
         shard.getShardMBean().incrementCommittedTransactionCount();
         shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
 
@@ -999,7 +1027,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final TransactionIdentifier txId = cohort.getIdentifier();
         final Payload payload;
         try {
-            payload = CommitTransactionPayload.create(txId, candidate);
+            payload = CommitTransactionPayload.create(txId, candidate,
+                    shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity());
         } catch (IOException e) {
             LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
             pendingCommits.poll().cohort.failedCommit(e);