Track skipped transactions
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / databroker / actors / dds / AbstractClientHistory.java
index d306d13e2e29f50063b89f42136a5dffd4fc03ed..95552b382e21e4c25cf63d8db25ab3e9caec99d7 100644 (file)
@@ -15,6 +15,7 @@ import static java.util.Objects.requireNonNull;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -239,6 +240,28 @@ public abstract class AbstractClientHistory extends LocalAbortable implements Id
     @Holding("this")
     abstract ClientTransaction doCreateTransaction();
 
+    /**
+     * Callback invoked from {@link AbstractClientHandle}'s lifecycle to inform that a particular transaction is
+     * completing with a set of participating shards.
+     *
+     * @param txId Transaction identifier
+     * @param participatingShards Participating shard cookies
+     */
+    final void onTransactionShardsBound(final TransactionIdentifier txId, final Set<Long> participatingShards) {
+        // Guard against startReconnect() kicking in. It is okay to connect new participants concurrently, as those
+        // will not see the holes caused by this.
+        final long stamp = lock.readLock();
+        try {
+            for (var entry : histories.entrySet()) {
+                if (!participatingShards.contains(entry.getKey())) {
+                    entry.getValue().skipTransaction(txId);
+                }
+            }
+        } finally {
+            lock.unlockRead(stamp);
+        }
+    }
+
     /**
      * Callback invoked from {@link ClientTransaction} when a child transaction readied for submission.
      *