Allow transaction tracking to be disabled 83/81983/13
authorRobert Varga <robert.varga@pantheon.tech>
Fri, 10 May 2019 10:25:31 +0000 (12:25 +0200)
committerTom Pantelis <tompantelis@gmail.com>
Mon, 20 May 2019 14:23:51 +0000 (14:23 +0000)
Ask-based protocol does not need tracking of transactions and
histories, as it is not retransmitting requests. It also does
not inform backend about purely-local aborted transactions
(read-write and read-only), which leads to transaction tracking
rangesets having holes where those IDs are used.

This adds the prerequisite handling of disabling from the leader
without adding the actual mechanics.

JIRA: CONTROLLER-1879
Change-Id: I133e7688b492336937f394f0f6c3f080a05a820f
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
Signed-off-by: Tomas Cere <tomas.cere@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendClientMetadataBuilder.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendMetadata.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DisableTrackingPayload.java [new file with mode: 0644]

index fbaf76fbc5098b6ea2ca07929de5acb00ed9e587..7e6eced779bdf4026dca36134b2b13e7509f1bbb 100644 (file)
@@ -11,7 +11,13 @@ import static com.google.common.base.Verify.verify;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
 import com.google.common.collect.Collections2;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableRangeSet;
+import com.google.common.collect.RangeSet;
+import com.google.common.primitives.UnsignedLong;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import org.eclipse.jdt.annotation.NonNull;
@@ -29,173 +35,271 @@ import org.slf4j.LoggerFactory;
 /**
  * This class is NOT thread-safe.
  */
-final class FrontendClientMetadataBuilder implements Builder<FrontendClientMetadata>, Identifiable<ClientIdentifier> {
-    private static final Logger LOG = LoggerFactory.getLogger(FrontendClientMetadataBuilder.class);
+abstract class FrontendClientMetadataBuilder implements Builder<FrontendClientMetadata>,
+        Identifiable<ClientIdentifier> {
+    static final class Disabled extends FrontendClientMetadataBuilder {
+        Disabled(final String shardName, final ClientIdentifier identifier) {
+            super(shardName, identifier);
+        }
 
-    private final Map<LocalHistoryIdentifier, FrontendHistoryMetadataBuilder> currentHistories = new HashMap<>();
-    private final UnsignedLongRangeSet purgedHistories;
-    private final LocalHistoryIdentifier standaloneId;
-    private final ClientIdentifier identifier;
-    private final String shardName;
+        @Override
+        public FrontendClientMetadata build() {
+            return new FrontendClientMetadata(getIdentifier(), ImmutableRangeSet.of(), ImmutableList.of());
+        }
 
-    FrontendClientMetadataBuilder(final String shardName, final ClientIdentifier identifier) {
-        this.shardName = requireNonNull(shardName);
-        this.identifier = requireNonNull(identifier);
-        purgedHistories = UnsignedLongRangeSet.create();
+        @Override
+        void onHistoryCreated(final LocalHistoryIdentifier historyId) {
+            // No-op
+        }
 
-        // History for stand-alone transactions is always present
-        standaloneId = standaloneHistoryId();
-        currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId));
-    }
+        @Override
+        void onHistoryClosed(final LocalHistoryIdentifier historyId) {
+            // No-op
+        }
 
-    FrontendClientMetadataBuilder(final String shardName, final FrontendClientMetadata meta) {
-        this.shardName = requireNonNull(shardName);
-        this.identifier = meta.getIdentifier();
-        purgedHistories = UnsignedLongRangeSet.create(meta.getPurgedHistories());
+        @Override
+        void onHistoryPurged(final LocalHistoryIdentifier historyId) {
+            // No-op
+        }
 
-        for (FrontendHistoryMetadata h : meta.getCurrentHistories()) {
-            final FrontendHistoryMetadataBuilder b = new FrontendHistoryMetadataBuilder(identifier, h);
-            currentHistories.put(b.getIdentifier(), b);
+        @Override
+        void onTransactionAborted(final TransactionIdentifier txId) {
+            // No-op
         }
 
-        // Sanity check and recovery
-        standaloneId = standaloneHistoryId();
-        if (!currentHistories.containsKey(standaloneId)) {
-            LOG.warn("{}: Client {} recovered histories {} do not contain stand-alone history, attempting recovery",
-                shardName, identifier, currentHistories);
-            currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId));
+        @Override
+        void onTransactionCommitted(final TransactionIdentifier txId) {
+            // No-op
         }
-    }
 
-    private LocalHistoryIdentifier standaloneHistoryId() {
-        return new LocalHistoryIdentifier(identifier, 0);
-    }
+        @Override
+        void onTransactionPurged(final TransactionIdentifier txId) {
+            // No-op
+        }
 
-    @Override
-    public FrontendClientMetadata build() {
-        return new FrontendClientMetadata(identifier, purgedHistories.toImmutable(),
-            Collections2.transform(currentHistories.values(), FrontendHistoryMetadataBuilder::build));
+        @Override
+        LeaderFrontendState toLeaderState(final Shard shard) {
+            return new LeaderFrontendState.Disabled(shard.persistenceId(), getIdentifier(), shard.getDataStore());
+        }
     }
 
-    @Override
-    public ClientIdentifier getIdentifier() {
-        return identifier;
-    }
+    static final class Enabled extends FrontendClientMetadataBuilder {
+
+        private final Map<LocalHistoryIdentifier, FrontendHistoryMetadataBuilder> currentHistories = new HashMap<>();
+        private final UnsignedLongRangeSet purgedHistories;
+        private final LocalHistoryIdentifier standaloneId;
+
+        Enabled(final String shardName, final ClientIdentifier identifier) {
+            super(shardName, identifier);
+
+            purgedHistories = UnsignedLongRangeSet.create();
 
-    void onHistoryCreated(final LocalHistoryIdentifier historyId) {
-        final FrontendHistoryMetadataBuilder newMeta = new FrontendHistoryMetadataBuilder(historyId);
-        final FrontendHistoryMetadataBuilder oldMeta = currentHistories.putIfAbsent(historyId, newMeta);
-        if (oldMeta != null) {
-            // This should not be happening, warn about it
-            LOG.warn("{}: Reused local history {}", shardName, historyId);
-        } else {
-            LOG.debug("{}: Created local history {}", shardName, historyId);
+            // History for stand-alone transactions is always present
+            standaloneId = standaloneHistoryId();
+            currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId));
         }
-    }
 
-    void onHistoryClosed(final LocalHistoryIdentifier historyId) {
-        final FrontendHistoryMetadataBuilder builder = currentHistories.get(historyId);
-        if (builder != null) {
-            builder.onHistoryClosed();
-            LOG.debug("{}: Closed history {}", shardName, historyId);
-        } else {
-            LOG.warn("{}: Closed unknown history {}, ignoring", shardName, historyId);
+        Enabled(final String shardName, final FrontendClientMetadata meta) {
+            super(shardName, meta.getIdentifier());
+
+            purgedHistories = UnsignedLongRangeSet.create(meta.getPurgedHistories());
+            for (FrontendHistoryMetadata h : meta.getCurrentHistories()) {
+                final FrontendHistoryMetadataBuilder b = new FrontendHistoryMetadataBuilder(getIdentifier(), h);
+                currentHistories.put(b.getIdentifier(), b);
+            }
+
+            // Sanity check and recovery
+            standaloneId = standaloneHistoryId();
+            if (!currentHistories.containsKey(standaloneId)) {
+                LOG.warn("{}: Client {} recovered histories {} do not contain stand-alone history, attempting recovery",
+                    shardName, getIdentifier(), currentHistories);
+                currentHistories.put(standaloneId, new FrontendHistoryMetadataBuilder(standaloneId));
+            }
+        }
+
+        @Override
+        public FrontendClientMetadata build() {
+            return new FrontendClientMetadata(getIdentifier(), purgedHistories.toImmutable(),
+                Collections2.transform(currentHistories.values(), FrontendHistoryMetadataBuilder::build));
+        }
+
+        @Override
+        void onHistoryCreated(final LocalHistoryIdentifier historyId) {
+            final FrontendHistoryMetadataBuilder newMeta = new FrontendHistoryMetadataBuilder(historyId);
+            final FrontendHistoryMetadataBuilder oldMeta = currentHistories.putIfAbsent(historyId, newMeta);
+            if (oldMeta != null) {
+                // This should not be happening, warn about it
+                LOG.warn("{}: Reused local history {}", shardName(), historyId);
+            } else {
+                LOG.debug("{}: Created local history {}", shardName(), historyId);
+            }
         }
-    }
 
-    void onHistoryPurged(final LocalHistoryIdentifier historyId) {
-        final FrontendHistoryMetadataBuilder history = currentHistories.remove(historyId);
-        final long historyBits = historyId.getHistoryId();
-        if (history == null) {
-            if (!purgedHistories.contains(historyBits)) {
+        @Override
+        void onHistoryClosed(final LocalHistoryIdentifier historyId) {
+            final FrontendHistoryMetadataBuilder builder = currentHistories.get(historyId);
+            if (builder != null) {
+                builder.onHistoryClosed();
+                LOG.debug("{}: Closed history {}", shardName(), historyId);
+            } else {
+                LOG.warn("{}: Closed unknown history {}, ignoring", shardName(), historyId);
+            }
+        }
+
+        @Override
+        void onHistoryPurged(final LocalHistoryIdentifier historyId) {
+            final FrontendHistoryMetadataBuilder history = currentHistories.remove(historyId);
+            final long historyBits = historyId.getHistoryId();
+            if (history == null) {
+                if (!purgedHistories.contains(historyBits)) {
+                    purgedHistories.add(historyBits);
+                    LOG.warn("{}: Purging unknown history {}", shardName(), historyId);
+                } else {
+                    LOG.warn("{}: Duplicate purge of history {}", shardName(), historyId);
+                }
+            } else {
                 purgedHistories.add(historyBits);
-                LOG.warn("{}: Purging unknown history {}", shardName, historyId);
+                LOG.debug("{}: Purged history {}", shardName(), historyId);
+            }
+        }
+
+        @Override
+        void onTransactionAborted(final TransactionIdentifier txId) {
+            final FrontendHistoryMetadataBuilder history = getHistory(txId);
+            if (history != null) {
+                history.onTransactionAborted(txId);
+                LOG.debug("{}: Aborted transaction {}", shardName(), txId);
             } else {
-                LOG.warn("{}: Duplicate purge of history {}", shardName, historyId);
+                LOG.warn("{}: Unknown history for aborted transaction {}, ignoring", shardName(), txId);
             }
-        } else {
-            purgedHistories.add(historyBits);
-            LOG.debug("{}: Purged history {}", shardName, historyId);
         }
-    }
 
-    void onTransactionAborted(final TransactionIdentifier txId) {
-        final FrontendHistoryMetadataBuilder history = getHistory(txId);
-        if (history != null) {
-            history.onTransactionAborted(txId);
-            LOG.debug("{}: Aborted transaction {}", shardName, txId);
-        } else {
-            LOG.warn("{}: Unknown history for aborted transaction {}, ignoring", shardName, txId);
+        @Override
+        void onTransactionCommitted(final TransactionIdentifier txId) {
+            final FrontendHistoryMetadataBuilder history = getHistory(txId);
+            if (history != null) {
+                history.onTransactionCommitted(txId);
+                LOG.debug("{}: Committed transaction {}", shardName(), txId);
+            } else {
+                LOG.warn("{}: Unknown history for commited transaction {}, ignoring", shardName(), txId);
+            }
         }
-    }
 
-    void onTransactionCommitted(final TransactionIdentifier txId) {
-        final FrontendHistoryMetadataBuilder history = getHistory(txId);
-        if (history != null) {
-            history.onTransactionCommitted(txId);
-            LOG.debug("{}: Committed transaction {}", shardName, txId);
-        } else {
-            LOG.warn("{}: Unknown history for commited transaction {}, ignoring", shardName, txId);
+        @Override
+        void onTransactionPurged(final TransactionIdentifier txId) {
+            final FrontendHistoryMetadataBuilder history = getHistory(txId);
+            if (history != null) {
+                history.onTransactionPurged(txId);
+                LOG.debug("{}: Purged transaction {}", shardName(), txId);
+            } else {
+                LOG.warn("{}: Unknown history for purged transaction {}, ignoring", shardName(), txId);
+            }
         }
-    }
 
-    void onTransactionPurged(final TransactionIdentifier txId) {
-        final FrontendHistoryMetadataBuilder history = getHistory(txId);
-        if (history != null) {
-            history.onTransactionPurged(txId);
-            LOG.debug("{}: Purged transaction {}", shardName, txId);
-        } else {
-            LOG.warn("{}: Unknown history for purged transaction {}, ignoring", shardName, txId);
+        @Override
+        LeaderFrontendState toLeaderState(final Shard shard) {
+            // Note: we have to make sure to *copy* all current state and not leak any views, otherwise leader/follower
+            //       interactions would get intertwined leading to inconsistencies.
+            final Map<LocalHistoryIdentifier, LocalFrontendHistory> histories = new HashMap<>();
+            for (FrontendHistoryMetadataBuilder e : currentHistories.values()) {
+                if (e.getIdentifier().getHistoryId() != 0) {
+                    final AbstractFrontendHistory state = e.toLeaderState(shard);
+                    verify(state instanceof LocalFrontendHistory, "Unexpected state %s", state);
+                    histories.put(e.getIdentifier(), (LocalFrontendHistory) state);
+                }
+            }
+
+            final AbstractFrontendHistory singleHistory;
+            final FrontendHistoryMetadataBuilder singleHistoryMeta = currentHistories.get(
+                new LocalHistoryIdentifier(getIdentifier(), 0));
+            if (singleHistoryMeta == null) {
+                final ShardDataTree tree = shard.getDataStore();
+                singleHistory = StandaloneFrontendHistory.create(shard.persistenceId(), getIdentifier(), tree);
+            } else {
+                singleHistory = singleHistoryMeta.toLeaderState(shard);
+            }
+
+            return new LeaderFrontendState.Enabled(shard.persistenceId(), getIdentifier(), shard.getDataStore(),
+                purgedHistories.copy(), singleHistory, histories);
         }
-    }
 
-    /**
-     * Transform frontend metadata for a particular client into its {@link LeaderFrontendState} counterpart.
-     *
-     * @param shard parent shard
-     * @return Leader frontend state
-     */
-    @NonNull LeaderFrontendState toLeaderState(final @NonNull Shard shard) {
-        // Note: we have to make sure to *copy* all current state and not leak any views, otherwise leader/follower
-        //       interactions would get intertwined leading to inconsistencies.
-        final Map<LocalHistoryIdentifier, LocalFrontendHistory> histories = new HashMap<>();
-        for (FrontendHistoryMetadataBuilder e : currentHistories.values()) {
-            if (e.getIdentifier().getHistoryId() != 0) {
-                final AbstractFrontendHistory state = e.toLeaderState(shard);
-                verify(state instanceof LocalFrontendHistory, "Unexpected state %s", state);
-                histories.put(e.getIdentifier(), (LocalFrontendHistory) state);
+        @Override
+        ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+            return super.addToStringAttributes(helper).add("current", currentHistories).add("purged", purgedHistories);
+        }
+
+        private FrontendHistoryMetadataBuilder getHistory(final TransactionIdentifier txId) {
+            LocalHistoryIdentifier historyId = txId.getHistoryId();
+            if (historyId.getHistoryId() == 0 && historyId.getCookie() != 0) {
+                // We are pre-creating the history for free-standing transactions with a zero cookie, hence our lookup
+                // needs to account for that.
+                LOG.debug("{}: looking up {} instead of {}", shardName(), standaloneId, historyId);
+                historyId = standaloneId;
             }
+
+            return currentHistories.get(historyId);
         }
 
-        final AbstractFrontendHistory singleHistory;
-        final FrontendHistoryMetadataBuilder singleHistoryMeta = currentHistories.get(
-            new LocalHistoryIdentifier(identifier, 0));
-        if (singleHistoryMeta == null) {
-            final ShardDataTree tree = shard.getDataStore();
-            singleHistory = StandaloneFrontendHistory.create(shard.persistenceId(), getIdentifier(), tree);
-        } else {
-            singleHistory = singleHistoryMeta.toLeaderState(shard);
+        private LocalHistoryIdentifier standaloneHistoryId() {
+            return new LocalHistoryIdentifier(getIdentifier(), 0);
         }
+    }
 
-        return new LeaderFrontendState(shard.persistenceId(), getIdentifier(), shard.getDataStore(),
-            purgedHistories.copy(), singleHistory, histories);
+    private static final Logger LOG = LoggerFactory.getLogger(FrontendClientMetadataBuilder.class);
+
+    private final ClientIdentifier identifier;
+    private final String shardName;
+
+    FrontendClientMetadataBuilder(final String shardName, final ClientIdentifier identifier) {
+        this.shardName = requireNonNull(shardName);
+        this.identifier = requireNonNull(identifier);
     }
 
-    private FrontendHistoryMetadataBuilder getHistory(final TransactionIdentifier txId) {
-        LocalHistoryIdentifier historyId = txId.getHistoryId();
-        if (historyId.getHistoryId() == 0 && historyId.getCookie() != 0) {
-            // We are pre-creating the history for free-standing transactions with a zero cookie, hence our lookup
-            // needs to account for that.
-            LOG.debug("{}: looking up {} instead of {}", shardName, standaloneId, historyId);
-            historyId = standaloneId;
-        }
+    static FrontendClientMetadataBuilder of(final String shardName, final FrontendClientMetadata meta) {
+        final Collection<FrontendHistoryMetadata> current = meta.getCurrentHistories();
+        final RangeSet<UnsignedLong> purged = meta.getPurgedHistories();
+
+        // Completely empty histories imply disabled state, as otherwise we'd have a record of the single history --
+        // either purged or active
+        return current.isEmpty() && purged.isEmpty() ? new Disabled(shardName, meta.getIdentifier())
+                : new Enabled(shardName, meta);
+    }
+
+    @Override
+    public final ClientIdentifier getIdentifier() {
+        return identifier;
+    }
 
-        return currentHistories.get(historyId);
+    final String shardName() {
+        return shardName;
     }
 
+    abstract void onHistoryCreated(LocalHistoryIdentifier historyId);
+
+    abstract void onHistoryClosed(LocalHistoryIdentifier historyId);
+
+    abstract void onHistoryPurged(LocalHistoryIdentifier historyId);
+
+    abstract void onTransactionAborted(TransactionIdentifier txId);
+
+    abstract void onTransactionCommitted(TransactionIdentifier txId);
+
+    abstract void onTransactionPurged(TransactionIdentifier txId);
+
+    /**
+     * Transform frontend metadata for a particular client into its {@link LeaderFrontendState} counterpart.
+     *
+     * @param shard parent shard
+     * @return Leader frontend state
+     */
+    abstract @NonNull LeaderFrontendState toLeaderState(@NonNull Shard shard);
+
     @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this).add("identifier", identifier).add("current", currentHistories)
-                .add("purged", purgedHistories).toString();
+    public final String toString() {
+        return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
+    }
+
+    ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+        return helper.add("identifier", identifier);
     }
 }
index 969accd583c130160177be9316992da98704d559..f651efa3fab3c14933aa3a386ea7b8c8b8477576 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static com.google.common.base.Verify.verify;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Maps;
@@ -57,7 +59,7 @@ final class FrontendMetadata extends ShardDataTreeMetadata<FrontendShardDataTree
 
         for (FrontendClientMetadata m : snapshot.getClients()) {
             LOG.debug("{}: applying metadata {}", shardName, m);
-            final FrontendClientMetadataBuilder b = new FrontendClientMetadataBuilder(shardName, m);
+            final FrontendClientMetadataBuilder b = FrontendClientMetadataBuilder.of(shardName, m);
             final FrontendIdentifier client = m.getIdentifier().getFrontendId();
 
             LOG.debug("{}: client {} updated to {}", shardName, client, b);
@@ -77,7 +79,7 @@ final class FrontendMetadata extends ShardDataTreeMetadata<FrontendShardDataTree
             return existing;
         }
 
-        final FrontendClientMetadataBuilder client = new FrontendClientMetadataBuilder(shardName, id);
+        final FrontendClientMetadataBuilder client = new FrontendClientMetadataBuilder.Enabled(shardName, id);
         final FrontendClientMetadataBuilder previous = clients.put(id.getFrontendId(), client);
         if (previous != null) {
             LOG.debug("{}: Replaced client {} with {}", shardName, previous, client);
@@ -125,4 +127,23 @@ final class FrontendMetadata extends ShardDataTreeMetadata<FrontendShardDataTree
     @NonNull Map<FrontendIdentifier, LeaderFrontendState> toLeaderState(final @NonNull Shard shard) {
         return new HashMap<>(Maps.transformValues(clients, meta -> meta.toLeaderState(shard)));
     }
+
+    void disableTracking(final ClientIdentifier clientId) {
+        final FrontendIdentifier frontendId = clientId.getFrontendId();
+        final FrontendClientMetadataBuilder client = clients.get(frontendId);
+        if (client == null) {
+            LOG.debug("{}: disableTracking {} does not match any client, ignoring", shardName, clientId);
+            return;
+        }
+        if (!clientId.equals(client.getIdentifier())) {
+            LOG.debug("{}: disableTracking {} does not match client {}, ignoring", shardName, clientId, client);
+            return;
+        }
+        if (client instanceof FrontendClientMetadataBuilder.Disabled) {
+            LOG.debug("{}: client {} is has already disabled tracking", shardName, client);
+            return;
+        }
+
+        verify(clients.replace(frontendId, client, new FrontendClientMetadataBuilder.Disabled(shardName, clientId)));
+    }
 }
index 2a1537bd28ab1ca8f91b23da68ce81581af24cf5..295dbdc0058a129193bb83464f3b40a9b9dc852e 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -41,25 +42,206 @@ import org.slf4j.LoggerFactory;
  *
  * @author Robert Varga
  */
-final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
-    private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class);
+abstract class LeaderFrontendState implements Identifiable<ClientIdentifier> {
+    static final class Disabled extends LeaderFrontendState {
+        Disabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
+            super(persistenceId, clientId, tree);
+        }
+
+        @Override
+        LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
+                final RequestEnvelope envelope, final long now) throws RequestException {
+            throw new UnsupportedRequestException(request);
+        }
+
+        @Override
+        TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
+                final RequestEnvelope envelope, final long now) throws RequestException {
+            throw new UnsupportedRequestException(request);
+        }
+    }
+
+    static final class Enabled extends LeaderFrontendState {
+        // Histories which have not been purged
+        private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories;
+
+        // RangeSet performs automatic merging, hence we keep minimal state tracking information
+        private final UnsignedLongRangeSet purgedHistories;
+
+        // Used for all standalone transactions
+        private final AbstractFrontendHistory standaloneHistory;
+
+        private long expectedTxSequence;
+        private Long lastSeenHistory = null;
+
+        Enabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
+            this(persistenceId, clientId, tree, UnsignedLongRangeSet.create(),
+                StandaloneFrontendHistory.create(persistenceId, clientId, tree), new HashMap<>());
+        }
+
+        Enabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree,
+                final UnsignedLongRangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory,
+                final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories) {
+            super(persistenceId, clientId, tree);
+            this.purgedHistories = requireNonNull(purgedHistories);
+            this.standaloneHistory = requireNonNull(standaloneHistory);
+            this.localHistories = requireNonNull(localHistories);
+        }
+
+        @Override
+        @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
+                final RequestEnvelope envelope, final long now) throws RequestException {
+            checkRequestSequence(envelope);
+
+            try {
+                if (request instanceof CreateLocalHistoryRequest) {
+                    return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now);
+                } else if (request instanceof DestroyLocalHistoryRequest) {
+                    return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now);
+                } else if (request instanceof PurgeLocalHistoryRequest) {
+                    return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now);
+                } else {
+                    LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request);
+                    throw new UnsupportedRequestException(request);
+                }
+            } finally {
+                expectNextRequest();
+            }
+        }
+
+        @Override
+        @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
+                final RequestEnvelope envelope, final long now) throws RequestException {
+            checkRequestSequence(envelope);
+
+            try {
+                final LocalHistoryIdentifier lhId = request.getTarget().getHistoryId();
+                final AbstractFrontendHistory history;
+
+                if (lhId.getHistoryId() != 0) {
+                    history = localHistories.get(lhId);
+                    if (history == null) {
+                        if (purgedHistories.contains(lhId.getHistoryId())) {
+                            LOG.warn("{}: rejecting request {} to purged history", persistenceId(), request);
+                            throw new DeadHistoryException(purgedHistories.toImmutable());
+                        }
+
+                        LOG.warn("{}: rejecting unknown history request {}", persistenceId(), request);
+                        throw new UnknownHistoryException(lastSeenHistory);
+                    }
+                } else {
+                    history = standaloneHistory;
+                }
+
+                return history.handleTransactionRequest(request, envelope, now);
+            } finally {
+                expectNextRequest();
+            }
+        }
+
+        @Override
+        void reconnect() {
+            expectedTxSequence = 0;
+            super.reconnect();
+        }
+
+        @Override
+        void retire() {
+            super.retire();
+
+            // Clear out all transaction chains
+            localHistories.values().forEach(AbstractFrontendHistory::retire);
+            localHistories.clear();
+            standaloneHistory.retire();
+        }
+
+        @Override
+        ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+            return super.addToStringAttributes(helper).add("purgedHistories", purgedHistories);
+        }
+
+        private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request,
+                final RequestEnvelope envelope, final long now) throws RequestException {
+            final LocalHistoryIdentifier historyId = request.getTarget();
+            final AbstractFrontendHistory existing = localHistories.get(historyId);
+            if (existing != null) {
+                // History already exists: report success
+                LOG.debug("{}: history {} already exists", persistenceId(), historyId);
+                return new LocalHistorySuccess(historyId, request.getSequence());
+            }
+
+            // We have not found the history. Before we create it we need to check history ID sequencing so that we do
+            // not end up resurrecting a purged history.
+            if (purgedHistories.contains(historyId.getHistoryId())) {
+                LOG.debug("{}: rejecting purged request {}", persistenceId(), request);
+                throw new DeadHistoryException(purgedHistories.toImmutable());
+            }
+
+            // Update last history we have seen
+            if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) {
+                lastSeenHistory = historyId.getHistoryId();
+            }
+
+            // We have to send the response only after persistence has completed
+            final ShardDataTreeTransactionChain chain = tree().ensureTransactionChain(historyId, () -> {
+                LOG.debug("{}: persisted history {}", persistenceId(), historyId);
+                envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()),
+                    tree().readTime() - now);
+            });
+
+            localHistories.put(historyId, LocalFrontendHistory.create(persistenceId(), tree(), chain));
+            LOG.debug("{}: created history {}", persistenceId(), historyId);
+            return null;
+        }
+
+        private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
+                final RequestEnvelope envelope, final long now) {
+            final LocalHistoryIdentifier id = request.getTarget();
+            final LocalFrontendHistory existing = localHistories.get(id);
+            if (existing == null) {
+                // History does not exist: report success
+                LOG.debug("{}: history {} does not exist, nothing to destroy", persistenceId(), id);
+                return new LocalHistorySuccess(id, request.getSequence());
+            }
+
+            existing.destroy(request.getSequence(), envelope, now);
+            return null;
+        }
+
+        private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request,
+                final RequestEnvelope envelope, final long now) {
+            final LocalHistoryIdentifier id = request.getTarget();
+            final LocalFrontendHistory existing = localHistories.remove(id);
+            if (existing == null) {
+                LOG.debug("{}: history {} has already been purged", persistenceId(), id);
+                return new LocalHistorySuccess(id, request.getSequence());
+            }
+
+            LOG.debug("{}: purging history {}", persistenceId(), id);
+            purgedHistories.add(id.getHistoryId());
+            existing.purge(request.getSequence(), envelope, now);
+            return null;
+        }
+
+        private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfSequenceEnvelopeException {
+            if (expectedTxSequence != envelope.getTxSequence()) {
+                throw new OutOfSequenceEnvelopeException(expectedTxSequence);
+            }
+        }
 
-    // Histories which have not been purged
-    private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories;
+        private void expectNextRequest() {
+            expectedTxSequence++;
+        }
+    }
 
-    // RangeSet performs automatic merging, hence we keep minimal state tracking information
-    private final UnsignedLongRangeSet purgedHistories;
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class);
 
-    // Used for all standalone transactions
-    private final AbstractFrontendHistory standaloneHistory;
     private final ShardDataTree tree;
     private final ClientIdentifier clientId;
     private final String persistenceId;
 
     private long lastConnectTicks;
     private long lastSeenTicks;
-    private long expectedTxSequence;
-    private Long lastSeenHistory = null;
 
     // TODO: explicit failover notification
     //       Record the ActorRef for the originating actor and when we switch to being a leader send a notification
@@ -71,150 +253,44 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     // - per-RequestException throw counters
 
     LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
-        this(persistenceId, clientId, tree, UnsignedLongRangeSet.create(),
-            StandaloneFrontendHistory.create(persistenceId, clientId, tree), new HashMap<>());
-    }
-
-    LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree,
-        final UnsignedLongRangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory,
-        final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories) {
         this.persistenceId = requireNonNull(persistenceId);
         this.clientId = requireNonNull(clientId);
         this.tree = requireNonNull(tree);
-        this.purgedHistories = requireNonNull(purgedHistories);
-        this.standaloneHistory = requireNonNull(standaloneHistory);
-        this.localHistories = requireNonNull(localHistories);
         this.lastSeenTicks = tree.readTime();
     }
 
     @Override
-    public ClientIdentifier getIdentifier() {
+    public final ClientIdentifier getIdentifier() {
         return clientId;
     }
 
-    private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfSequenceEnvelopeException {
-        if (expectedTxSequence != envelope.getTxSequence()) {
-            throw new OutOfSequenceEnvelopeException(expectedTxSequence);
-        }
-    }
-
-    private void expectNextRequest() {
-        expectedTxSequence++;
+    final String persistenceId() {
+        return persistenceId;
     }
 
-    @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
-        checkRequestSequence(envelope);
-
-        try {
-            if (request instanceof CreateLocalHistoryRequest) {
-                return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now);
-            } else if (request instanceof DestroyLocalHistoryRequest) {
-                return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now);
-            } else if (request instanceof PurgeLocalHistoryRequest) {
-                return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now);
-            } else {
-                LOG.warn("{}: rejecting unsupported request {}", persistenceId, request);
-                throw new UnsupportedRequestException(request);
-            }
-        } finally {
-            expectNextRequest();
-        }
+    final long getLastConnectTicks() {
+        return lastConnectTicks;
     }
 
-    private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
-        final LocalHistoryIdentifier historyId = request.getTarget();
-        final AbstractFrontendHistory existing = localHistories.get(historyId);
-        if (existing != null) {
-            // History already exists: report success
-            LOG.debug("{}: history {} already exists", persistenceId, historyId);
-            return new LocalHistorySuccess(historyId, request.getSequence());
-        }
-
-        // We have not found the history. Before we create it we need to check history ID sequencing so that we do not
-        // end up resurrecting a purged history.
-        if (purgedHistories.contains(historyId.getHistoryId())) {
-            LOG.debug("{}: rejecting purged request {}", persistenceId, request);
-            throw new DeadHistoryException(purgedHistories.toImmutable());
-        }
-
-        // Update last history we have seen
-        if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) {
-            lastSeenHistory = historyId.getHistoryId();
-        }
-
-        // We have to send the response only after persistence has completed
-        final ShardDataTreeTransactionChain chain = tree.ensureTransactionChain(historyId, () -> {
-            LOG.debug("{}: persisted history {}", persistenceId, historyId);
-            envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.readTime() - now);
-        });
-
-        localHistories.put(historyId, LocalFrontendHistory.create(persistenceId, tree, chain));
-        LOG.debug("{}: created history {}", persistenceId, historyId);
-        return null;
+    final long getLastSeenTicks() {
+        return lastSeenTicks;
     }
 
-    private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
-            final RequestEnvelope envelope, final long now) {
-        final LocalHistoryIdentifier id = request.getTarget();
-        final LocalFrontendHistory existing = localHistories.get(id);
-        if (existing == null) {
-            // History does not exist: report success
-            LOG.debug("{}: history {} does not exist, nothing to destroy", persistenceId, id);
-            return new LocalHistorySuccess(id, request.getSequence());
-        }
-
-        existing.destroy(request.getSequence(), envelope, now);
-        return null;
+    final ShardDataTree tree() {
+        return tree;
     }
 
-    private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request,
-            final RequestEnvelope envelope, final long now) {
-        final LocalHistoryIdentifier id = request.getTarget();
-        final LocalFrontendHistory existing = localHistories.remove(id);
-        if (existing == null) {
-            LOG.debug("{}: history {} has already been purged", persistenceId, id);
-            return new LocalHistorySuccess(id, request.getSequence());
-        }
-
-        LOG.debug("{}: purging history {}", persistenceId, id);
-        purgedHistories.add(id.getHistoryId());
-        existing.purge(request.getSequence(), envelope, now);
-        return null;
+    final void touch() {
+        this.lastSeenTicks = tree.readTime();
     }
 
-    @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
-        checkRequestSequence(envelope);
-
-        try {
-            final LocalHistoryIdentifier lhId = request.getTarget().getHistoryId();
-            final AbstractFrontendHistory history;
-
-            if (lhId.getHistoryId() != 0) {
-                history = localHistories.get(lhId);
-                if (history == null) {
-                    if (purgedHistories.contains(lhId.getHistoryId())) {
-                        LOG.warn("{}: rejecting request {} to purged history", persistenceId, request);
-                        throw new DeadHistoryException(purgedHistories.toImmutable());
-                    }
-
-                    LOG.warn("{}: rejecting unknown history request {}", persistenceId, request);
-                    throw new UnknownHistoryException(lastSeenHistory);
-                }
-            } else {
-                history = standaloneHistory;
-            }
+    abstract @Nullable LocalHistorySuccess handleLocalHistoryRequest(LocalHistoryRequest<?> request,
+            RequestEnvelope envelope, long now) throws RequestException;
 
-            return history.handleTransactionRequest(request, envelope, now);
-        } finally {
-            expectNextRequest();
-        }
-    }
+    abstract @Nullable TransactionSuccess<?> handleTransactionRequest(TransactionRequest<?> request,
+            RequestEnvelope envelope, long now) throws RequestException;
 
     void reconnect() {
-        expectedTxSequence = 0;
         lastConnectTicks = tree.readTime();
     }
 
@@ -233,31 +309,14 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
                 }
             }
         }
-
-        // Clear out all transaction chains
-        localHistories.values().forEach(AbstractFrontendHistory::retire);
-        localHistories.clear();
-        standaloneHistory.retire();
-    }
-
-    long getLastConnectTicks() {
-        return lastConnectTicks;
     }
 
-    long getLastSeenTicks() {
-        return lastSeenTicks;
-    }
-
-    void touch() {
-        this.lastSeenTicks = tree.readTime();
+    @Override
+    public final String toString() {
+        return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
     }
 
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(LeaderFrontendState.class)
-                .add("clientId", clientId)
-                .add("nanosAgo", tree.readTime() - lastSeenTicks)
-                .add("purgedHistories", purgedHistories)
-                .toString();
+    ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+        return helper.add("clientId", clientId).add("nanosAgo", tree.readTime() - lastSeenTicks);
     }
 }
index 97b4a48f1023e3959d2056bc2f110cdc25f60e24..fb6b0142fe440905bbdd55c81c3ff23772ee8c9b 100644 (file)
@@ -7,6 +7,8 @@
  */
 package org.opendaylight.controller.cluster.datastore;
 
+import static com.google.common.base.Verify.verify;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.Cancellable;
@@ -83,6 +85,7 @@ import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContex
 import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
 import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.DisableTrackingPayload;
 import org.opendaylight.controller.cluster.datastore.persisted.PurgeTransactionPayload;
 import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.controller.cluster.messaging.MessageSlicer;
@@ -436,6 +439,32 @@ public class Shard extends RaftActor {
         return Optional.of(state.getLastConnectTicks());
     }
 
+    private void disableTracking(final DisableTrackingPayload payload) {
+        final ClientIdentifier clientId = payload.getIdentifier();
+        LOG.debug("{}: disabling tracking of {}", persistenceId(), clientId);
+        frontendMetadata.disableTracking(clientId);
+
+        if (isLeader()) {
+            final FrontendIdentifier frontendId = clientId.getFrontendId();
+            final LeaderFrontendState frontend = knownFrontends.get(frontendId);
+            if (frontend != null) {
+                if (clientId.equals(frontend.getIdentifier())) {
+                    if (!(frontend instanceof LeaderFrontendState.Disabled)) {
+                        verify(knownFrontends.replace(frontendId, frontend,
+                            new LeaderFrontendState.Disabled(persistenceId(), clientId, store)));
+                        LOG.debug("{}: leader state for {} disabled", persistenceId(), clientId);
+                    } else {
+                        LOG.debug("{}: leader state {} is already disabled", persistenceId(), frontend);
+                    }
+                } else {
+                    LOG.debug("{}: leader state {} does not match {}", persistenceId(), frontend, clientId);
+                }
+            } else {
+                LOG.debug("{}: leader state for {} not found", persistenceId(), clientId);
+            }
+        }
+    }
+
     private void onMakeLeaderLocal() {
         LOG.debug("{}: onMakeLeaderLocal received", persistenceId());
         if (isLeader()) {
@@ -529,7 +558,7 @@ public class Shard extends RaftActor {
             final ABIVersion selectedVersion = selectVersion(message);
             final LeaderFrontendState frontend;
             if (existing == null) {
-                frontend = new LeaderFrontendState(persistenceId(), clientId, store);
+                frontend = new LeaderFrontendState.Enabled(persistenceId(), clientId, store);
                 knownFrontends.put(clientId.getFrontendId(), frontend);
                 LOG.debug("{}: created state {} for client {}", persistenceId(), frontend, clientId);
             } else {
@@ -873,6 +902,11 @@ public class Shard extends RaftActor {
     @Override
     protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
         if (data instanceof Payload) {
+            if (data instanceof DisableTrackingPayload) {
+                disableTracking((DisableTrackingPayload) data);
+                return;
+            }
+
             try {
                 store.applyReplicatedPayload(identifier, (Payload)data);
             } catch (DataValidationFailedException | IOException e) {
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DisableTrackingPayload.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/persisted/DisableTrackingPayload.java
new file mode 100644 (file)
index 0000000..29dd072
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2019 PANTHEON.tech, s.r.o. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.persisted;
+
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import java.io.DataInput;
+import java.io.IOException;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class DisableTrackingPayload extends AbstractIdentifiablePayload<ClientIdentifier> {
+    private static final class Proxy extends AbstractProxy<ClientIdentifier> {
+        @SuppressWarnings("checkstyle:RedundantModifier")
+        public Proxy() {
+            // For Externalizable
+        }
+
+        Proxy(final byte[] serialized) {
+            super(serialized);
+        }
+
+        @Override
+        protected ClientIdentifier readIdentifier(final DataInput in) throws IOException {
+            return ClientIdentifier.readFrom(in);
+        }
+
+        @Override
+        protected DisableTrackingPayload createObject(final ClientIdentifier identifier,
+                final byte[] serialized) {
+            return new DisableTrackingPayload(identifier, serialized);
+        }
+    }
+
+    private static final Logger LOG = LoggerFactory.getLogger(DisableTrackingPayload.class);
+    private static final long serialVersionUID = 1L;
+
+    DisableTrackingPayload(final ClientIdentifier clientId, final byte[] serialized) {
+        super(clientId, serialized);
+    }
+
+    public static DisableTrackingPayload create(final ClientIdentifier clientId,
+            final int initialSerializedBufferCapacity) {
+        final ByteArrayDataOutput out = ByteStreams.newDataOutput(initialSerializedBufferCapacity);
+        try {
+            clientId.writeTo(out);
+        } catch (IOException e) {
+            // This should never happen
+            LOG.error("Failed to serialize {}", clientId, e);
+            throw new RuntimeException("Failed to serialize " + clientId, e);
+        }
+        return new DisableTrackingPayload(clientId, out.toByteArray());
+    }
+
+    @Override
+    protected Proxy externalizableProxy(final byte[] serialized) {
+        return new Proxy(serialized);
+    }
+}