Implement scatter/gather on module shards
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / FrontendClientMetadataBuilder.java
index 475d2e62f2faeaf8a8b52b58b8ddaa878e5d7181..7f281ab0f34eee7240a06ea1ac7f23b40dccaa73 100644 (file)
@@ -14,10 +14,6 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableRangeSet;
-import com.google.common.collect.RangeSet;
-import com.google.common.primitives.UnsignedLong;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import org.eclipse.jdt.annotation.NonNull;
@@ -26,7 +22,8 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.persisted.FrontendClientMetadata;
 import org.opendaylight.controller.cluster.datastore.persisted.FrontendHistoryMetadata;
-import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongRangeSet;
+import org.opendaylight.controller.cluster.datastore.utils.ImmutableUnsignedLongSet;
+import org.opendaylight.controller.cluster.datastore.utils.MutableUnsignedLongSet;
 import org.opendaylight.yangtools.concepts.Builder;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.slf4j.Logger;
@@ -44,7 +41,7 @@ abstract class FrontendClientMetadataBuilder implements Builder<FrontendClientMe
 
         @Override
         public FrontendClientMetadata build() {
-            return new FrontendClientMetadata(getIdentifier(), ImmutableRangeSet.of(), ImmutableList.of());
+            return new FrontendClientMetadata(getIdentifier(), ImmutableUnsignedLongSet.of(), ImmutableList.of());
         }
 
         @Override
@@ -77,6 +74,11 @@ abstract class FrontendClientMetadataBuilder implements Builder<FrontendClientMe
             // No-op
         }
 
+        @Override
+        void onTransactionsSkipped(final LocalHistoryIdentifier historyId, final ImmutableUnsignedLongSet txIds) {
+            // No-op
+        }
+
         @Override
         LeaderFrontendState toLeaderState(final Shard shard) {
             return new LeaderFrontendState.Disabled(shard.persistenceId(), getIdentifier(), shard.getDataStore());
@@ -84,15 +86,14 @@ abstract class FrontendClientMetadataBuilder implements Builder<FrontendClientMe
     }
 
     static final class Enabled extends FrontendClientMetadataBuilder {
-
         private final Map<LocalHistoryIdentifier, FrontendHistoryMetadataBuilder> currentHistories = new HashMap<>();
-        private final UnsignedLongRangeSet purgedHistories;
+        private final MutableUnsignedLongSet purgedHistories;
         private final LocalHistoryIdentifier standaloneId;
 
         Enabled(final String shardName, final ClientIdentifier identifier) {
             super(shardName, identifier);
 
-            purgedHistories = UnsignedLongRangeSet.create();
+            purgedHistories = MutableUnsignedLongSet.of();
 
             // History for stand-alone transactions is always present
             standaloneId = standaloneHistoryId();
@@ -102,7 +103,7 @@ abstract class FrontendClientMetadataBuilder implements Builder<FrontendClientMe
         Enabled(final String shardName, final FrontendClientMetadata meta) {
             super(shardName, meta.getIdentifier());
 
-            purgedHistories = UnsignedLongRangeSet.create(meta.getPurgedHistories());
+            purgedHistories = meta.getPurgedHistories().mutableCopy();
             for (FrontendHistoryMetadata h : meta.getCurrentHistories()) {
                 final FrontendHistoryMetadataBuilder b = new FrontendHistoryMetadataBuilder(getIdentifier(), h);
                 currentHistories.put(b.getIdentifier(), b);
@@ -119,7 +120,7 @@ abstract class FrontendClientMetadataBuilder implements Builder<FrontendClientMe
 
         @Override
         public FrontendClientMetadata build() {
-            return new FrontendClientMetadata(getIdentifier(), purgedHistories.toImmutable(),
+            return new FrontendClientMetadata(getIdentifier(), purgedHistories.immutableCopy(),
                 Collections2.transform(currentHistories.values(), FrontendHistoryMetadataBuilder::build));
         }
 
@@ -196,6 +197,17 @@ abstract class FrontendClientMetadataBuilder implements Builder<FrontendClientMe
             }
         }
 
+        @Override
+        void onTransactionsSkipped(final LocalHistoryIdentifier historyId, final ImmutableUnsignedLongSet txIds) {
+            final FrontendHistoryMetadataBuilder history = getHistory(historyId);
+            if (history != null) {
+                history.onTransactionsSkipped(txIds);
+                LOG.debug("{}: History {} skipped transactions {}", shardName(), historyId, txIds);
+            } else {
+                LOG.warn("{}: Unknown history {} for skipped transactions, ignoring", shardName(), historyId);
+            }
+        }
+
         @Override
         LeaderFrontendState toLeaderState(final Shard shard) {
             // Note: we have to make sure to *copy* all current state and not leak any views, otherwise leader/follower
@@ -220,7 +232,7 @@ abstract class FrontendClientMetadataBuilder implements Builder<FrontendClientMe
             }
 
             return new LeaderFrontendState.Enabled(shard.persistenceId(), getIdentifier(), shard.getDataStore(),
-                purgedHistories.copy(), singleHistory, histories);
+                purgedHistories.mutableCopy(), singleHistory, histories);
         }
 
         @Override
@@ -229,15 +241,21 @@ abstract class FrontendClientMetadataBuilder implements Builder<FrontendClientMe
         }
 
         private FrontendHistoryMetadataBuilder getHistory(final TransactionIdentifier txId) {
-            LocalHistoryIdentifier historyId = txId.getHistoryId();
+            return getHistory(txId.getHistoryId());
+        }
+
+        private FrontendHistoryMetadataBuilder getHistory(final LocalHistoryIdentifier historyId) {
+            final LocalHistoryIdentifier local;
             if (historyId.getHistoryId() == 0 && historyId.getCookie() != 0) {
                 // We are pre-creating the history for free-standing transactions with a zero cookie, hence our lookup
                 // needs to account for that.
                 LOG.debug("{}: looking up {} instead of {}", shardName(), standaloneId, historyId);
-                historyId = standaloneId;
+                local = standaloneId;
+            } else {
+                local = historyId;
             }
 
-            return currentHistories.get(historyId);
+            return currentHistories.get(local);
         }
 
         private LocalHistoryIdentifier standaloneHistoryId() {
@@ -256,13 +274,10 @@ abstract class FrontendClientMetadataBuilder implements Builder<FrontendClientMe
     }
 
     static FrontendClientMetadataBuilder of(final String shardName, final FrontendClientMetadata meta) {
-        final Collection<FrontendHistoryMetadata> current = meta.getCurrentHistories();
-        final RangeSet<UnsignedLong> purged = meta.getPurgedHistories();
-
         // Completely empty histories imply disabled state, as otherwise we'd have a record of the single history --
         // either purged or active
-        return current.isEmpty() && purged.isEmpty() ? new Disabled(shardName, meta.getIdentifier())
-                : new Enabled(shardName, meta);
+        return meta.getCurrentHistories().isEmpty() && meta.getPurgedHistories().isEmpty()
+            ? new Disabled(shardName, meta.getIdentifier()) : new Enabled(shardName, meta);
     }
 
     @Override
@@ -286,6 +301,8 @@ abstract class FrontendClientMetadataBuilder implements Builder<FrontendClientMe
 
     abstract void onTransactionPurged(TransactionIdentifier txId);
 
+    abstract void onTransactionsSkipped(LocalHistoryIdentifier historyId, ImmutableUnsignedLongSet txIds);
+
     /**
      * Transform frontend metadata for a particular client into its {@link LeaderFrontendState} counterpart.
      *