Specify initial serialization buffer capacity for Payloads
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / ShardDataTree.java
index e8b41d97474f49951b8b67846d22c7a06b73d185..8b826015c4eaf7df45572dd747b5da90ec3dce6e 100644 (file)
@@ -27,11 +27,14 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
+import java.util.SortedSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
@@ -58,8 +61,6 @@ import org.opendaylight.controller.cluster.datastore.persisted.ShardDataTreeSnap
 import org.opendaylight.controller.cluster.datastore.utils.DataTreeModificationOutput;
 import org.opendaylight.controller.cluster.datastore.utils.PruningDataTreeModification;
 import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker.DataChangeScope;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.mdsal.common.api.OptimisticLockFailedException;
 import org.opendaylight.mdsal.common.api.TransactionCommitFailedException;
 import org.opendaylight.mdsal.dom.api.DOMDataTreeChangeListener;
@@ -82,7 +83,7 @@ import org.opendaylight.yangtools.yang.data.impl.schema.tree.InMemoryDataTreeFac
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Internal shard state, similar to a DOMStore, but optimized for use in the actor system,
@@ -102,9 +103,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             this.cohort = Preconditions.checkNotNull(cohort);
             lastAccess = now;
         }
+
+        @Override
+        public String toString() {
+            return "CommitEntry [tx=" + cohort.getIdentifier() + ", state=" + cohort.getState() + "]";
+        }
     }
 
-    private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(Duration.create(5, TimeUnit.SECONDS));
+    private static final Timeout COMMIT_STEP_TIMEOUT = new Timeout(FiniteDuration.create(5, TimeUnit.SECONDS));
     private static final Logger LOG = LoggerFactory.getLogger(ShardDataTree.class);
 
     /**
@@ -117,7 +123,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     private final Map<LocalHistoryIdentifier, ShardDataTreeTransactionChain> transactionChains = new HashMap<>();
     private final DataTreeCohortActorRegistry cohortRegistry = new DataTreeCohortActorRegistry();
-    private final Queue<CommitEntry> pendingTransactions = new ArrayDeque<>();
+    private final Deque<CommitEntry> pendingTransactions = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingCommits = new ArrayDeque<>();
     private final Queue<CommitEntry> pendingFinishCommits = new ArrayDeque<>();
 
@@ -127,7 +133,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     private final Map<Payload, Runnable> replicationCallbacks = new HashMap<>();
 
     private final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher;
-    private final ShardDataChangeListenerPublisher dataChangeListenerPublisher;
     private final Collection<ShardDataTreeMetadata<?>> metadata;
     private final DataTree dataTree;
     private final String logContext;
@@ -148,14 +153,13 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
     ShardDataTree(final Shard shard, final SchemaContext schemaContext, final DataTree dataTree,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
-            final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
+            final String logContext,
             final ShardDataTreeMetadata<?>... metadata) {
         this.dataTree = Preconditions.checkNotNull(dataTree);
         updateSchemaContext(schemaContext);
 
         this.shard = Preconditions.checkNotNull(shard);
         this.treeChangeListenerPublisher = Preconditions.checkNotNull(treeChangeListenerPublisher);
-        this.dataChangeListenerPublisher = Preconditions.checkNotNull(dataChangeListenerPublisher);
         this.logContext = Preconditions.checkNotNull(logContext);
         this.metadata = ImmutableList.copyOf(metadata);
         tip = dataTree;
@@ -164,10 +168,9 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType,
             final YangInstanceIdentifier root,
             final ShardDataTreeChangeListenerPublisher treeChangeListenerPublisher,
-            final ShardDataChangeListenerPublisher dataChangeListenerPublisher, final String logContext,
+            final String logContext,
             final ShardDataTreeMetadata<?>... metadata) {
-        this(shard, schemaContext, createDataTree(treeType, root), treeChangeListenerPublisher,
-            dataChangeListenerPublisher, logContext, metadata);
+        this(shard, schemaContext, createDataTree(treeType, root), treeChangeListenerPublisher, logContext, metadata);
     }
 
     private static DataTree createDataTree(final TreeType treeType, final YangInstanceIdentifier root) {
@@ -182,8 +185,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     @VisibleForTesting
     public ShardDataTree(final Shard shard, final SchemaContext schemaContext, final TreeType treeType) {
         this(shard, schemaContext, treeType, YangInstanceIdentifier.EMPTY,
-                new DefaultShardDataTreeChangeListenerPublisher(""),
-                new DefaultShardDataChangeListenerPublisher(""), "");
+                new DefaultShardDataTreeChangeListenerPublisher(""), "");
     }
 
     final String logContext() {
@@ -313,7 +315,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private void applyRecoveryCandidate(final DataTreeCandidate candidate) throws DataValidationFailedException {
+    private void applyRecoveryCandidate(final DataTreeCandidate candidate) {
         final PruningDataTreeModification mod = wrapWithPruning(dataTree.takeSnapshot().newModification());
         DataTreeCandidates.applyToModification(mod, candidate);
         mod.ready();
@@ -342,7 +344,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
      * @throws IOException when the snapshot fails to deserialize
      * @throws DataValidationFailedException when the snapshot fails to apply
      */
-    void applyRecoveryPayload(@Nonnull final Payload payload) throws IOException, DataValidationFailedException {
+    void applyRecoveryPayload(@Nonnull final Payload payload) throws IOException {
         if (payload instanceof CommitTransactionPayload) {
             final Entry<TransactionIdentifier, DataTreeCandidate> e =
                     ((CommitTransactionPayload) payload).getCandidate();
@@ -363,7 +365,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
     }
 
-    private void applyReplicatedCandidate(final Identifier identifier, final DataTreeCandidate foreign)
+    private void applyReplicatedCandidate(final TransactionIdentifier identifier, final DataTreeCandidate foreign)
             throws DataValidationFailedException {
         LOG.debug("{}: Applying foreign transaction {}", logContext, identifier);
 
@@ -376,6 +378,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final DataTreeCandidate candidate = dataTree.prepare(mod);
         dataTree.commit(candidate);
 
+        allMetadataCommittedTransaction(identifier);
         notifyListeners(candidate);
     }
 
@@ -402,18 +405,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
          * pre-Boron state -- which limits the number of options here.
          */
         if (payload instanceof CommitTransactionPayload) {
-            final TransactionIdentifier txId;
             if (identifier == null) {
                 final Entry<TransactionIdentifier, DataTreeCandidate> e =
                         ((CommitTransactionPayload) payload).getCandidate();
-                txId = e.getKey();
-                applyReplicatedCandidate(txId, e.getValue());
+                applyReplicatedCandidate(e.getKey(), e.getValue());
             } else {
                 Verify.verify(identifier instanceof TransactionIdentifier);
-                txId = (TransactionIdentifier) identifier;
-                payloadReplicationComplete(txId);
+                payloadReplicationComplete((TransactionIdentifier) identifier);
             }
-            allMetadataCommittedTransaction(txId);
         } else if (payload instanceof AbortTransactionPayload) {
             if (identifier != null) {
                 payloadReplicationComplete((AbortTransactionPayload) payload);
@@ -465,12 +464,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final CommitEntry current = pendingFinishCommits.peek();
         if (current == null) {
             LOG.warn("{}: No outstanding transactions, ignoring consensus on transaction {}", logContext, txId);
+            allMetadataCommittedTransaction(txId);
             return;
         }
 
         if (!current.cohort.getIdentifier().equals(txId)) {
             LOG.debug("{}: Head of pendingFinishCommits queue is {}, ignoring consensus on transaction {}", logContext,
                 current.cohort.getIdentifier(), txId);
+            allMetadataCommittedTransaction(txId);
             return;
         }
 
@@ -536,7 +537,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         if (chain == null) {
             chain = new ShardDataTreeTransactionChain(historyId, this);
             transactionChains.put(historyId, chain);
-            replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback);
+            replicatePayload(historyId, CreateLocalHistoryPayload.create(
+                    historyId, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
         } else if (callback != null) {
             callback.run();
         }
@@ -564,7 +566,6 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     @VisibleForTesting
     public void notifyListeners(final DataTreeCandidate candidate) {
         treeChangeListenerPublisher.publishChanges(candidate);
-        dataChangeListenerPublisher.publishChanges(candidate);
     }
 
     /**
@@ -597,7 +598,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         }
 
         chain.close();
-        replicatePayload(id, CloseLocalHistoryPayload.create(id), callback);
+        replicatePayload(id, CloseLocalHistoryPayload.create(
+                id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
     /**
@@ -616,15 +618,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
-        replicatePayload(id, PurgeLocalHistoryPayload.create(id), callback);
-    }
-
-    void registerDataChangeListener(final YangInstanceIdentifier path,
-            final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener,
-            final DataChangeScope scope, final Optional<DataTreeCandidate> initialState,
-            final Consumer<ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>>
-                    onRegistration) {
-        dataChangeListenerPublisher.registerDataChangeListener(path, listener, scope, initialState, onRegistration);
+        replicatePayload(id, PurgeLocalHistoryPayload.create(
+                id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
     Optional<DataTreeCandidate> readCurrentData() {
@@ -648,7 +643,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
         final TransactionIdentifier id = transaction.getIdentifier();
         LOG.debug("{}: aborting transaction {}", logContext, id);
-        replicatePayload(id, AbortTransactionPayload.create(id), callback);
+        replicatePayload(id, AbortTransactionPayload.create(
+                id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
     @Override
@@ -658,16 +654,18 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @Override
-    ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
+    ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction,
+            final java.util.Optional<SortedSet<String>> participatingShardNames) {
         final DataTreeModification snapshot = transaction.getSnapshot();
         snapshot.ready();
 
-        return createReadyCohort(transaction.getIdentifier(), snapshot);
+        return createReadyCohort(transaction.getIdentifier(), snapshot, participatingShardNames);
     }
 
     void purgeTransaction(final TransactionIdentifier id, final Runnable callback) {
         LOG.debug("{}: purging transaction {}", logContext, id);
-        replicatePayload(id, PurgeTransactionPayload.create(id), callback);
+        replicatePayload(id, PurgeTransactionPayload.create(
+                id, shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity()), callback);
     }
 
     public Optional<NormalizedNode<?, ?>> readNode(final YangInstanceIdentifier path) {
@@ -744,8 +742,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
 
                 // For debugging purposes, allow dumping of the modification. Coupled with the above
                 // precondition log, it should allow us to understand what went on.
-                LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", cohort.getIdentifier(), modification,
-                        dataTree);
+                LOG.debug("{}: Store Tx {}: modifications: {} tree: {}", logContext, cohort.getIdentifier(),
+                    modification, dataTree);
                 cause = new TransactionCommitFailedException("Data did not pass validation for path " + e.getPath(), e);
             } catch (Exception e) {
                 LOG.warn("{}: Unexpected failure in validation phase", logContext, e);
@@ -801,13 +799,108 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
         if (!cohort.equals(head.cohort)) {
-            LOG.debug("{}: Transaction {} scheduled for canCommit step", logContext, cohort.getIdentifier());
-            return;
+            // The tx isn't at the head of the queue so we can't start canCommit at this point. Here we check if this
+            // tx should be moved ahead of other tx's in the READY state in the pendingTransactions queue. If this tx
+            // has other participating shards, it could deadlock with other tx's accessing the same shards
+            // depending on the order the tx's are readied on each shard
+            // (see https://jira.opendaylight.org/browse/CONTROLLER-1836). Therefore, if the preceding participating
+            // shard names for a preceding pending tx, call it A, in the queue matches that of this tx, then this tx
+            // is allowed to be moved ahead of tx A in the queue so it is processed first to avoid potential deadlock
+            // if tx A is behind this tx in the pendingTransactions queue for a preceding shard. In other words, since
+            // canCommmit for this tx was requested before tx A, honor that request. If this tx is moved to the head of
+            // the queue as a result, then proceed with canCommit.
+
+            Collection<String> precedingShardNames = extractPrecedingShardNames(cohort.getParticipatingShardNames());
+            if (precedingShardNames.isEmpty()) {
+                LOG.debug("{}: Tx {} is scheduled for canCommit step", logContext, cohort.getIdentifier());
+                return;
+            }
+
+            LOG.debug("{}: Evaluating tx {} for canCommit -  preceding participating shard names {}",
+                    logContext, cohort.getIdentifier(), precedingShardNames);
+            final Iterator<CommitEntry> iter = pendingTransactions.iterator();
+            int index = -1;
+            int moveToIndex = -1;
+            while (iter.hasNext()) {
+                final CommitEntry entry = iter.next();
+                ++index;
+
+                if (cohort.equals(entry.cohort)) {
+                    if (moveToIndex < 0) {
+                        LOG.debug("{}: Not moving tx {} - cannot proceed with canCommit",
+                                logContext, cohort.getIdentifier());
+                        return;
+                    }
+
+                    LOG.debug("{}: Moving {} to index {} in the pendingTransactions queue",
+                            logContext, cohort.getIdentifier(), moveToIndex);
+                    iter.remove();
+                    insertEntry(pendingTransactions, entry, moveToIndex);
+
+                    if (!cohort.equals(pendingTransactions.peek().cohort)) {
+                        LOG.debug("{}: Tx {} is not at the head of the queue - cannot proceed with canCommit",
+                                logContext, cohort.getIdentifier());
+                        return;
+                    }
+
+                    LOG.debug("{}: Tx {} is now at the head of the queue - proceeding with canCommit",
+                            logContext, cohort.getIdentifier());
+                    break;
+                }
+
+                if (entry.cohort.getState() != State.READY) {
+                    LOG.debug("{}: Skipping pending transaction {} in state {}",
+                            logContext, entry.cohort.getIdentifier(), entry.cohort.getState());
+                    continue;
+                }
+
+                final Collection<String> pendingPrecedingShardNames = extractPrecedingShardNames(
+                        entry.cohort.getParticipatingShardNames());
+
+                if (precedingShardNames.equals(pendingPrecedingShardNames)) {
+                    if (moveToIndex < 0) {
+                        LOG.debug("{}: Preceding shard names {} for pending tx {} match - saving moveToIndex {}",
+                                logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), index);
+                        moveToIndex = index;
+                    } else {
+                        LOG.debug(
+                            "{}: Preceding shard names {} for pending tx {} match but moveToIndex already set to {}",
+                            logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier(), moveToIndex);
+                    }
+                } else {
+                    LOG.debug("{}: Preceding shard names {} for pending tx {} differ - skipping",
+                        logContext, pendingPrecedingShardNames, entry.cohort.getIdentifier());
+                }
+            }
         }
 
         processNextPendingTransaction();
     }
 
+    private static void insertEntry(final Deque<CommitEntry> queue, final CommitEntry entry, final int atIndex) {
+        if (atIndex == 0) {
+            queue.addFirst(entry);
+            return;
+        }
+
+        LOG.trace("Inserting into Deque at index {}", atIndex);
+
+        Deque<CommitEntry> tempStack = new ArrayDeque<>(atIndex);
+        for (int i = 0; i < atIndex; i++) {
+            tempStack.push(queue.poll());
+        }
+
+        queue.addFirst(entry);
+
+        tempStack.forEach(queue::addFirst);
+    }
+
+    private Collection<String> extractPrecedingShardNames(
+            final java.util.Optional<SortedSet<String>> participatingShardNames) {
+        return participatingShardNames.map((Function<SortedSet<String>, Collection<String>>)
+            set -> set.headSet(shard.getShardName())).orElse(Collections.<String>emptyList());
+    }
+
     private void failPreCommit(final Throwable cause) {
         shard.getShardMBean().incrementFailedTransactionsCount();
         pendingTransactions.poll().cohort.failedPreCommit(cause);
@@ -883,6 +976,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return;
         }
 
+        allMetadataCommittedTransaction(txId);
         shard.getShardMBean().incrementCommittedTransactionCount();
         shard.getShardMBean().setLastCommittedTransactionTime(System.currentTimeMillis());
 
@@ -910,7 +1004,8 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         final TransactionIdentifier txId = cohort.getIdentifier();
         final Payload payload;
         try {
-            payload = CommitTransactionPayload.create(txId, candidate);
+            payload = CommitTransactionPayload.create(txId, candidate,
+                    shard.getDatastoreContext().getInitialPayloadSerializedBufferCapacity());
         } catch (IOException e) {
             LOG.error("{}: Failed to encode transaction {} candidate {}", logContext, txId, candidate, e);
             pendingCommits.poll().cohort.failedCommit(e);
@@ -965,22 +1060,24 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
     }
 
     @Override
-    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+    ShardDataTreeCohort createReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+            final java.util.Optional<SortedSet<String>> participatingShardNames) {
         SimpleShardDataTreeCohort cohort = new SimpleShardDataTreeCohort(this, mod, txId,
-                cohortRegistry.createCohort(schemaContext, txId, runnable -> shard.executeInSelf(runnable),
-                        COMMIT_STEP_TIMEOUT));
+                cohortRegistry.createCohort(schemaContext, txId, shard::executeInSelf,
+                        COMMIT_STEP_TIMEOUT), participatingShardNames);
         pendingTransactions.add(new CommitEntry(cohort, readTime()));
         return cohort;
     }
 
     // Exposed for ShardCommitCoordinator so it does not have deal with local histories (it does not care), this mimics
     // the newReadWriteTransaction()
-    ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod) {
+    ShardDataTreeCohort newReadyCohort(final TransactionIdentifier txId, final DataTreeModification mod,
+            final java.util.Optional<SortedSet<String>> participatingShardNames) {
         if (txId.getHistoryId().getHistoryId() == 0) {
-            return createReadyCohort(txId, mod);
+            return createReadyCohort(txId, mod, participatingShardNames);
         }
 
-        return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod);
+        return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod, participatingShardNames);
     }
 
     @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")