Improve segmented journal actor metrics
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardDataTree.java
index 9c1e856d7667a7fe6454b8990c74e62d3f5cca6e..72e7a545a7e36a41ea28f79e2b10e5dbfbb6c8e7 100644 (file)
@@ -35,7 +35,6 @@ import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Queue;
@@ -57,7 +56,6 @@ import org.opendaylight.controller.cluster.datastore.persisted.AbstractIdentifia
 import org.opendaylight.controller.cluster.datastore.persisted.CloseLocalHistoryPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.CreateLocalHistoryPayload;
-import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateInputOutput.DataTreeCandidateWithVersion;
 import org.opendaylight.controller.cluster.datastore.persisted.MetadataShardDataTreeSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.PayloadVersion;
 import org.opendaylight.controller.cluster.datastore.persisted.PurgeLocalHistoryPayload;
@@ -75,7 +73,7 @@ import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.yangtools.concepts.Identifier;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.concepts.Registration;
 import org.opendaylight.yangtools.yang.common.Empty;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -120,7 +118,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
         @Override
         public String toString() {
-            return "CommitEntry [tx=" + cohort.getIdentifier() + ", state=" + cohort.getState() + "]";
+            return "CommitEntry [tx=" + cohort.transactionId() + ", state=" + cohort.getState() + "]";
         }
     }
 
@@ -333,35 +331,35 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void applyRecoveryCandidate(final CommitTransactionPayload payload) throws IOException {
-        final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
-        final DataTreeModification unwrapped = newModification();
-        final PruningDataTreeModification mod = createPruningModification(unwrapped,
-            NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.getValue().version()) > 0);
+        final var entry = payload.acquireCandidate();
+        final var unwrapped = newModification();
+        final var pruningMod = createPruningModification(unwrapped,
+            NormalizedNodeStreamVersion.MAGNESIUM.compareTo(entry.streamVersion()) > 0);
 
-        DataTreeCandidates.applyToModification(mod, entry.getValue().candidate());
-        mod.ready();
+        DataTreeCandidates.applyToModification(pruningMod, entry.candidate());
+        pruningMod.ready();
         LOG.trace("{}: Applying recovery modification {}", logContext, unwrapped);
 
         try {
             dataTree.validate(unwrapped);
             dataTree.commit(dataTree.prepare(unwrapped));
         } catch (Exception e) {
-            File file = new File(System.getProperty("karaf.data", "."),
+            final var file = new File(System.getProperty("karaf.data", "."),
                     "failed-recovery-payload-" + logContext + ".out");
             DataTreeModificationOutput.toFile(file, unwrapped);
-            throw new IllegalStateException(String.format(
-                    "%s: Failed to apply recovery payload. Modification data was written to file %s",
-                    logContext, file), e);
+            throw new IllegalStateException(
+                "%s: Failed to apply recovery payload. Modification data was written to file %s".formatted(
+                    logContext, file),
+                e);
         }
 
-        allMetadataCommittedTransaction(entry.getKey());
+        allMetadataCommittedTransaction(entry.transactionId());
     }
 
     private PruningDataTreeModification createPruningModification(final DataTreeModification unwrapped,
             final boolean uintAdapting) {
         // TODO: we should be able to reuse the pruner, provided we are not reentrant
-        final ReusableNormalizedNodePruner pruner = ReusableNormalizedNodePruner.forDataSchemaContext(
-            dataSchemaContext);
+        final var pruner = ReusableNormalizedNodePruner.forDataSchemaContext(dataSchemaContext);
         return uintAdapting ? new PruningDataTreeModification.Proactive(unwrapped, dataTree, pruner.withUintAdaption())
                 : new PruningDataTreeModification.Reactive(unwrapped, dataTree, pruner);
     }
@@ -396,21 +394,21 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     private void applyReplicatedCandidate(final CommitTransactionPayload payload)
             throws DataValidationFailedException, IOException {
-        final Entry<TransactionIdentifier, DataTreeCandidateWithVersion> entry = payload.acquireCandidate();
-        final TransactionIdentifier identifier = entry.getKey();
-        LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
+        final var payloadCandidate = payload.acquireCandidate();
+        final var transactionId = payloadCandidate.transactionId();
+        LOG.debug("{}: Applying foreign transaction {}", logContext, transactionId);
 
-        final DataTreeModification mod = newModification();
+        final var mod = newModification();
         // TODO: check version here, which will enable us to perform forward-compatibility transformations
-        DataTreeCandidates.applyToModification(mod, entry.getValue().candidate());
+        DataTreeCandidates.applyToModification(mod, payloadCandidate.candidate());
         mod.ready();
 
         LOG.trace("{}: Applying foreign modification {}", logContext, mod);
         dataTree.validate(mod);
-        final DataTreeCandidate candidate = dataTree.prepare(mod);
+        final var candidate = dataTree.prepare(mod);
         dataTree.commit(candidate);
 
-        allMetadataCommittedTransaction(identifier);
+        allMetadataCommittedTransaction(transactionId);
         notifyListeners(candidate);
     }
 
@@ -450,7 +448,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             }
 
             // make sure acquireCandidate() is the last call touching the payload data as we want it to be GC-ed.
-            checkRootOverwrite(((CommitTransactionPayload) payload).acquireCandidate().getValue().candidate());
+            checkRootOverwrite(commit.acquireCandidate().candidate());
         } else if (payload instanceof AbortTransactionPayload abort) {
             if (identifier != null) {
                 payloadReplicationComplete(abort);
@@ -522,16 +520,17 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     private boolean payloadReplicationComplete(final TransactionIdentifier txId) {
-        final CommitEntry current = pendingFinishCommits.peek();
+        final var current = pendingFinishCommits.peek();
         if (current == null) {
             LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
             allMetadataCommittedTransaction(txId);
             return false;
         }
 
-        if (!current.cohort.getIdentifier().equals(txId)) {
+        final var cohortTxId = current.cohort.transactionId();
+        if (!cohortTxId.equals(txId)) {
             LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
-                current.cohort.getIdentifier(), txId);
+                cohortTxId, txId);
             allMetadataCommittedTransaction(txId);
             return false;
         }
@@ -735,8 +734,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     final void registerTreeChangeListener(final YangInstanceIdentifier path, final DOMDataTreeChangeListener listener,
-            final Optional<DataTreeCandidate> initialState,
-            final Consumer<ListenerRegistration<DOMDataTreeChangeListener>> onRegistration) {
+            final Optional<DataTreeCandidate> initialState, final Consumer<Registration> onRegistration) {
         treeChangeListenerPublisher.registerTreeChangeListener(path, listener, initialState, onRegistration);
     }
 
@@ -832,25 +830,25 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             final SimpleShardDataTreeCohort cohort = entry.cohort;
             final DataTreeModification modification = cohort.getDataTreeModification();
 
-            LOG.debug("{}: Validating transaction {}", logContext, cohort.getIdentifier());
+            LOG.debug("{}: Validating transaction {}", logContext, cohort.transactionId());
             Exception cause;
             try {
                 tip.validate(modification);
-                LOG.debug("{}: Transaction {} validated", logContext, cohort.getIdentifier());
+                LOG.debug("{}: Transaction {} validated", logContext, cohort.transactionId());
                 cohort.successfulCanCommit();
                 entry.lastAccess = readTime();
                 return;
             } catch (ConflictingModificationAppliedException e) {
-                LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.getIdentifier(),
+                LOG.warn("{}: Store Tx {}: Conflicting modification for path {}.", logContext, cohort.transactionId(),
                     e.getPath());
                 cause = new OptimisticLockFailedException("Optimistic lock failed for path " + e.getPath(), e);
             } catch (DataValidationFailedException e) {
-                LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.getIdentifier(),
+                LOG.warn("{}: Store Tx {}: Data validation failed for path {}.", logContext, cohort.transactionId(),
                     e.getPath(), e);
 
                 // For debugging purposes, allow dumping of the modification. Coupled with the above
                 // precondition log, it should allow us to understand what went on.
-                LOG.debug("{}: Store Tx {}: modifications: {}", logContext, cohort.getIdentifier(), modification);
+                LOG.debug("{}: Store Tx {}: modifications: {}", logContext, cohort.transactionId(), modification);
                 LOG.trace("{}: Current tree: {}", logContext, dataTree);
                 cause = new TransactionCommitFailedException("Data did not pass validation for path " + e.getPath(), e);
             } catch (Exception e) {
@@ -875,7 +873,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             final SimpleShardDataTreeCohort cohort = entry.cohort;
 
             if (cohort.isFailed()) {
-                LOG.debug("{}: Removing failed transaction {}", logContext, cohort.getIdentifier());
+                LOG.debug("{}: Removing failed transaction {}", logContext, cohort.transactionId());
                 queue.remove();
                 continue;
             }
@@ -921,12 +919,12 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
             Collection<String> precedingShardNames = extractPrecedingShardNames(cohort.getParticipatingShardNames());
             if (precedingShardNames.isEmpty()) {
-                LOG.debug("{}: Tx {} is scheduled for canCommit step", logContext, cohort.getIdentifier());
+                LOG.debug("{}: Tx {} is scheduled for canCommit step", logContext, cohort.transactionId());
                 return;
             }
 
             LOG.debug("{}: Evaluating tx {} for canCommit -  preceding participating shard names {}",
-                    logContext, cohort.getIdentifier(), precedingShardNames);
+                    logContext, cohort.transactionId(), precedingShardNames);
             final Iterator<CommitEntry> iter = pendingTransactions.iterator();
             int index = -1;
             int moveToIndex = -1;
@@ -937,29 +935,29 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 if (cohort.equals(entry.cohort)) {
                     if (moveToIndex < 0) {
                         LOG.debug("{}: Not moving tx {} - cannot proceed with canCommit",
-                                logContext, cohort.getIdentifier());
+                                logContext, cohort.transactionId());
                         return;
                     }
 
                     LOG.debug("{}: Moving {} to index {} in the pendingTransactions queue",
-                            logContext, cohort.getIdentifier(), moveToIndex);
+                            logContext, cohort.transactionId(), moveToIndex);
                     iter.remove();
                     insertEntry(pendingTransactions, entry, moveToIndex);
 
                     if (!cohort.equals(pendingTransactions.peek().cohort)) {
                         LOG.debug("{}: Tx {} is not at the head of the queue - cannot proceed with canCommit",
-                                logContext, cohort.getIdentifier());
+                                logContext, cohort.transactionId());
                         return;
                     }
 
                     LOG.debug("{}: Tx {} is now at the head of the queue - proceeding with canCommit",
-                            logContext, cohort.getIdentifier());
+                            logContext, cohort.transactionId());
                     break;
                 }
 
                 if (entry.cohort.getState() != State.READY) {
                     LOG.debug("{}: Skipping pending transaction {} in state {}",
-                            logContext, entry.cohort.getIdentifier(), entry.cohort.getState());
+                            logContext, entry.cohort.transactionId(), entry.cohort.getState());
                     continue;
                 }
 
@@ -969,16 +967,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 if (precedingShardNames.equals(pendingPrecedingShardNames)) {
                     if (moveToIndex < 0) {
                         LOG.debug("{}: Preceding shard names {} for pending tx {} match - saving moveToIndex {}",
-                                logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), index);
+                                logContext, pendingPrecedingShardNames, entry.cohort.transactionId(), index);
                         moveToIndex = index;
                     } else {
                         LOG.debug(
                             "{}: Preceding shard names {} for pending tx {} match but moveToIndex already set to {}",
-                            logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), moveToIndex);
+                            logContext, pendingPrecedingShardNames, entry.cohort.transactionId(), moveToIndex);
                     }
                 } else {
                     LOG.debug("{}: Preceding shard names {} for pending tx {} differ - skipping",
-                        logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier());
+                        logContext, pendingPrecedingShardNames, entry.cohort.transactionId());
                 }
             }
         }
@@ -1024,7 +1022,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final SimpleShardDataTreeCohort current = entry.cohort;
         verify(cohort.equals(current), "Attempted to pre-commit %s while %s is pending", cohort, current);
 
-        final TransactionIdentifier currentId = current.getIdentifier();
+        final TransactionIdentifier currentId = current.transactionId();
         LOG.debug("{}: Preparing transaction {}", logContext, currentId);
 
         final DataTreeCandidateTip candidate;
@@ -1069,7 +1067,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     @SuppressWarnings("checkstyle:IllegalCatch")
     private void finishCommit(final SimpleShardDataTreeCohort cohort) {
-        final TransactionIdentifier txId = cohort.getIdentifier();
+        final TransactionIdentifier txId = cohort.transactionId();
         final DataTreeCandidate candidate = cohort.getCandidate();
 
         LOG.debug("{}: Resuming commit of transaction {}", logContext, txId);
@@ -1107,13 +1105,13 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
         final SimpleShardDataTreeCohort current = entry.cohort;
         if (!cohort.equals(current)) {
-            LOG.debug("{}: Transaction {} scheduled for commit step", logContext, cohort.getIdentifier());
+            LOG.debug("{}: Transaction {} scheduled for commit step", logContext, cohort.transactionId());
             return;
         }
 
-        LOG.debug("{}: Starting commit for transaction {}", logContext, current.getIdentifier());
+        LOG.debug("{}: Starting commit for transaction {}", logContext, current.transactionId());
 
-        final TransactionIdentifier txId = cohort.getIdentifier();
+        final TransactionIdentifier txId = cohort.transactionId();
         final Payload payload;
         try {
             payload = CommitTransactionPayload.create(txId, candidate, PayloadVersion.current(),
@@ -1218,7 +1216,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             final long newDelta = now - newAccess;
             if (newDelta < delta) {
                 LOG.debug("{}: Updated current transaction {} access time", logContext,
-                    currentTx.cohort.getIdentifier());
+                    currentTx.cohort.transactionId());
                 currentTx.lastAccess = newAccess;
                 delta = newDelta;
             }
@@ -1233,7 +1231,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final State state = currentTx.cohort.getState();
 
         LOG.warn("{}: Current transaction {} has timed out after {} ms in state {}", logContext,
-            currentTx.cohort.getIdentifier(), deltaMillis, state);
+            currentTx.cohort.transactionId(), deltaMillis, state);
         boolean processNext = true;
         final TimeoutException cohortFailure = new TimeoutException("Backend timeout in state " + state + " after "
                 + deltaMillis + "ms");
@@ -1273,7 +1271,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 break;
             case COMMIT_PENDING:
                 LOG.warn("{}: Transaction {} is still committing, cannot abort", logContext,
-                    currentTx.cohort.getIdentifier());
+                    currentTx.cohort.transactionId());
                 currentTx.lastAccess = now;
                 processNext = false;
                 return;
@@ -1297,7 +1295,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final Iterator<CommitEntry> it = Iterables.concat(pendingFinishCommits, pendingCommits,
                 pendingTransactions).iterator();
         if (!it.hasNext()) {
-            LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.getIdentifier());
+            LOG.debug("{}: no open transaction while attempting to abort {}", logContext, cohort.transactionId());
             return true;
         }
 
@@ -1305,8 +1303,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final CommitEntry first = it.next();
         if (cohort.equals(first.cohort)) {
             if (cohort.getState() != State.COMMIT_PENDING) {
-                LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.getIdentifier(),
-                    cohort.getIdentifier());
+                LOG.debug("{}: aborting head of queue {} in state {}", logContext, cohort.transactionId(),
+                    cohort.transactionId());
 
                 it.remove();
                 if (cohort.getCandidate() != null) {
@@ -1317,7 +1315,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 return true;
             }
 
-            LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.getIdentifier());
+            LOG.warn("{}: transaction {} is committing, skipping abort", logContext, cohort.transactionId());
             return false;
         }
 
@@ -1325,7 +1323,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         while (it.hasNext()) {
             final CommitEntry e = it.next();
             if (cohort.equals(e.cohort)) {
-                LOG.debug("{}: aborting queued transaction {}", logContext, cohort.getIdentifier());
+                LOG.debug("{}: aborting queued transaction {}", logContext, cohort.transactionId());
 
                 it.remove();
                 if (cohort.getCandidate() != null) {
@@ -1338,7 +1336,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             newTip = requireNonNullElse(e.cohort.getCandidate(), newTip);
         }
 
-        LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.getIdentifier());
+        LOG.debug("{}: aborted transaction {} not found in the queue", logContext, cohort.transactionId());
         return true;
     }
 
@@ -1348,16 +1346,16 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         while (iter.hasNext()) {
             final SimpleShardDataTreeCohort cohort = iter.next().cohort;
             if (cohort.getState() == State.CAN_COMMIT_COMPLETE) {
-                LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.getIdentifier());
+                LOG.debug("{}: Revalidating queued transaction {}", logContext, cohort.transactionId());
 
                 try {
                     tip.validate(cohort.getDataTreeModification());
                 } catch (DataValidationFailedException | RuntimeException e) {
-                    LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.getIdentifier(), e);
+                    LOG.debug("{}: Failed to revalidate queued transaction {}", logContext, cohort.transactionId(), e);
                     cohort.reportFailure(e);
                 }
             } else if (cohort.getState() == State.PRE_COMMIT_COMPLETE) {
-                LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.getIdentifier());
+                LOG.debug("{}: Repreparing queued transaction {}", logContext, cohort.transactionId());
 
                 try {
                     tip.validate(cohort.getDataTreeModification());
@@ -1366,7 +1364,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                     cohort.setNewCandidate(candidate);
                     tip = candidate;
                 } catch (RuntimeException | DataValidationFailedException e) {
-                    LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.getIdentifier(), e);
+                    LOG.debug("{}: Failed to reprepare queued transaction {}", logContext, cohort.transactionId(), e);
                     cohort.reportFailure(e);
                 }
             }