Specify initial serialization buffer capacity for Payloads
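
The hunks below thread an initial serialized-buffer capacity, read from
shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity(),
into each Payload create() call (abort, purge, commit, and local-history
payloads) so the payload's output buffer can be pre-sized instead of
growing from a small default; they also move the
allMetadataCommittedTransaction() bookkeeping next to the points where a
replicated commit actually completes. As a minimal, self-contained sketch
of the pre-sizing idea only (JDK classes, hypothetical names and a made-up
512 KiB capacity; the real Payload classes and the DatastoreContext
plumbing are in the diff, not here):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Hypothetical illustration: pre-sizing the serialization buffer avoids
    // repeated array growth and copying while a large candidate is encoded.
    final class PayloadSketch {
        static byte[] serialize(final byte[] candidateBytes, final int initialCapacity) throws IOException {
            // Start from the configured capacity rather than the 32-byte default.
            final ByteArrayOutputStream bos = new ByteArrayOutputStream(initialCapacity);
            try (DataOutputStream out = new DataOutputStream(bos)) {
                out.writeInt(candidateBytes.length);
                out.write(candidateBytes);
            }
            return bos.toByteArray();
        }

        public static void main(final String[] args) throws IOException {
            // e.g. a 512 KiB initial capacity, analogous to a datastore-level setting
            System.out.println(serialize(new byte[1024], 512 * 1024).length);
        }
    }
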
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
index 5f1e67e988e60ccca5f434f2e04358cd484ba8b9..04af7f60fce1adeb7c01dba530c64f45d2d5c747 100644
--- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
+++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
@@ -315,7 +315,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void applyRecoveryCandidate(final DataTreeCandidate candidate) throws DataValidationFailedException {
+    private void applyRecoveryCandidate(final DataTreeCandidate candidate) {
         final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification());
         DataTreeCandidates.applyToModification(mod, candidate);
         mod.ready();
@@ -344,7 +344,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @throws IOException when the snapshot fails to deserialize
      * @throws DataValidationFailedException when the snapshot fails to apply
      */
-    void applyRecoveryPayload(@Nonnull final Payload payload) throws IOException, DataValidationFailedException {
+    void applyRecoveryPayload(@Nonnull final Payload payload) throws IOException {
         if (payload instanceof CommitTransactionPayload) {
             final Entry<TransactionIdentifier, DataTreeCandidate> e =
                     ((CommitTransactionPayload) payload).getCandidate();
@@ -365,7 +365,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
-    private void applyReplicatedCandidate(final Identifier identifier, final DataTreeCandidate foreign)
+    private void applyReplicatedCandidate(final TransactionIdentifier identifier, final DataTreeCandidate foreign)
             throws DataValidationFailedException {
         LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
 
@@ -378,6 +378,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final DataTreeCandidate candidate = dataTree.prepare(mod);
         dataTree.commit(candidate);
 
+        allMetadataCommittedTransaction(identifier);
         notifyListeners(candidate);
     }
 
@@ -404,18 +405,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
          * pre-Boron state -- which limits the number of options here.
          */
         if (payload instanceof CommitTransactionPayload) {
-            final TransactionIdentifier txId;
             if (identifier == null) {
                 final Entry<TransactionIdentifier, DataTreeCandidate> e =
                         ((CommitTransactionPayload) payload).getCandidate();
-                txId = e.getKey();
-                applyReplicatedCandidate(txId, e.getValue());
+                applyReplicatedCandidate(e.getKey(), e.getValue());
             } else {
                 Verify.verify(identifier instanceof TransactionIdentifier);
-                txId = (TransactionIdentifier) identifier;
-                payloadReplicationComplete(txId);
+                payloadReplicationComplete((TransactionIdentifier) identifier);
             }
-            allMetadataCommittedTransaction(txId);
         } else if (payload instanceof AbortTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((AbortTransactionPayload) payload);
@@ -467,12 +464,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final CommitEntry current = pendingFinishCommits.peek();
         if (current == null) {
             LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
+            allMetadataCommittedTransaction(txId);
             return;
         }
 
         if (!current.cohort.getIdentifier().equals(txId)) {
             LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
                 current.cohort.getIdentifier(), txId);
+            allMetadataCommittedTransaction(txId);
             return;
         }
 
@@ -538,7 +537,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         if (chain == null) {
             chain = new ShardDataTreeTransactionChain(historyId, this);
             transactionChains.put(historyId, chain);
-            replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback);
+            replicatePayload(historyId, CreateLocalHistoryPayload.create(
+                    historyId, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
         } else if (callback != null) {
             callback.run();
         }
@@ -598,7 +598,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
 
         chain.close();
-        replicatePayload(id, CloseLocalHistoryPayload.create(id), callback);
+        replicatePayload(id, CloseLocalHistoryPayload.create(
+                id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
     /**
@@ -617,7 +618,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
-        replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
+        replicatePayload(id, PurgeLocalHistoryPayload.create(
+                id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
     Optional<DataTreeCandidate> readCurrentData() {
@@ -641,7 +643,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
         final TransactionIdentifier id = transaction.getIdentifier();
         LOG.debug("{}: aborting transaction {}", logContext, id);
-        replicatePayload(id, AbortTransactionPayload.create(id), callback);
+        replicatePayload(id, AbortTransactionPayload.create(
+                id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
     @Override
@@ -661,7 +664,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
         LOG.debug("{}: purging transaction {}", logContext, id);
-        replicatePayload(id, PurgeTransactionPayload.create(id), callback);
+        replicatePayload(id, PurgeTransactionPayload.create(
+                id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
     public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
@@ -972,6 +976,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
+        allMetadataCommittedTransaction(txId);
         shard.getShardMBean().incrementCommittedTransactionCount();
         shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
 
@@ -999,7 +1004,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final TransactionIdentifier txId = cohort.getIdentifier();
         final Payload payload;
         try {
-            payload = CommitTransactionPayload.create(txId, candidate);
+            payload = CommitTransactionPayload.create(txId, candidate,
+                    shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity());
         } catch (IOException e) {
             LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
             pendingCommits.poll().cohort.failedCommit(e);
@@ -1057,7 +1063,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
             final java.util.Optional<SortedSet<String>> participatingShardNames) {
         SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
-                cohortRegistry.createCohort(schemaContext, txId, runnable -> shard.executeInSelf(runnable),
+                cohortRegistry.createCohort(schemaContext, txId, shard::executeInSelf,
                         COMMIT_STEP_TIMEOUT), participatingShardNames);
         pendingTransactions.add(new CommitEntry(cohort, readTime()));
         return cohort;