Allow transaction tracking to be disabled
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / LeaderFrontendState.java
index 2a1537bd28ab1ca8f91b23da68ce81581af24cf5..295dbdc0058a129193bb83464f3b40a9b9dc852e 100644 (file)
@@ -10,6 +10,7 @@ package org.opendaylight.controller.cluster.datastore;
 import static java.util.Objects.requireNonNull;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.base.MoreObjects.ToStringHelper;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -41,25 +42,206 @@ import org.slf4j.LoggerFactory;
  *
  * @author Robert Varga
  */
-final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
-    private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class);
+abstract class LeaderFrontendState implements Identifiable<ClientIdentifier> {
+    static final class Disabled extends LeaderFrontendState {
+        Disabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
+            super(persistenceId, clientId, tree);
+        }
+
+        @Override
+        LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
+                final RequestEnvelope envelope, final long now) throws RequestException {
+            throw new UnsupportedRequestException(request);
+        }
+
+        @Override
+        TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
+                final RequestEnvelope envelope, final long now) throws RequestException {
+            throw new UnsupportedRequestException(request);
+        }
+    }
+
+    static final class Enabled extends LeaderFrontendState {
+        // Histories which have not been purged
+        private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories;
+
+        // RangeSet performs automatic merging, hence we keep minimal state tracking information
+        private final UnsignedLongRangeSet purgedHistories;
+
+        // Used for all standalone transactions
+        private final AbstractFrontendHistory standaloneHistory;
+
+        private long expectedTxSequence;
+        private Long lastSeenHistory = null;
+
+        Enabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
+            this(persistenceId, clientId, tree, UnsignedLongRangeSet.create(),
+                StandaloneFrontendHistory.create(persistenceId, clientId, tree), new HashMap<>());
+        }
+
+        Enabled(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree,
+                final UnsignedLongRangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory,
+                final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories) {
+            super(persistenceId, clientId, tree);
+            this.purgedHistories = requireNonNull(purgedHistories);
+            this.standaloneHistory = requireNonNull(standaloneHistory);
+            this.localHistories = requireNonNull(localHistories);
+        }
+
+        @Override
+        @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
+                final RequestEnvelope envelope, final long now) throws RequestException {
+            checkRequestSequence(envelope);
+
+            try {
+                if (request instanceof CreateLocalHistoryRequest) {
+                    return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now);
+                } else if (request instanceof DestroyLocalHistoryRequest) {
+                    return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now);
+                } else if (request instanceof PurgeLocalHistoryRequest) {
+                    return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now);
+                } else {
+                    LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request);
+                    throw new UnsupportedRequestException(request);
+                }
+            } finally {
+                expectNextRequest();
+            }
+        }
+
+        @Override
+        @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
+                final RequestEnvelope envelope, final long now) throws RequestException {
+            checkRequestSequence(envelope);
+
+            try {
+                final LocalHistoryIdentifier lhId = request.getTarget().getHistoryId();
+                final AbstractFrontendHistory history;
+
+                if (lhId.getHistoryId() != 0) {
+                    history = localHistories.get(lhId);
+                    if (history == null) {
+                        if (purgedHistories.contains(lhId.getHistoryId())) {
+                            LOG.warn("{}: rejecting request {} to purged history", persistenceId(), request);
+                            throw new DeadHistoryException(purgedHistories.toImmutable());
+                        }
+
+                        LOG.warn("{}: rejecting unknown history request {}", persistenceId(), request);
+                        throw new UnknownHistoryException(lastSeenHistory);
+                    }
+                } else {
+                    history = standaloneHistory;
+                }
+
+                return history.handleTransactionRequest(request, envelope, now);
+            } finally {
+                expectNextRequest();
+            }
+        }
+
+        @Override
+        void reconnect() {
+            expectedTxSequence = 0;
+            super.reconnect();
+        }
+
+        @Override
+        void retire() {
+            super.retire();
+
+            // Clear out all transaction chains
+            localHistories.values().forEach(AbstractFrontendHistory::retire);
+            localHistories.clear();
+            standaloneHistory.retire();
+        }
+
+        @Override
+        ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+            return super.addToStringAttributes(helper).add("purgedHistories", purgedHistories);
+        }
+
+        private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request,
+                final RequestEnvelope envelope, final long now) throws RequestException {
+            final LocalHistoryIdentifier historyId = request.getTarget();
+            final AbstractFrontendHistory existing = localHistories.get(historyId);
+            if (existing != null) {
+                // History already exists: report success
+                LOG.debug("{}: history {} already exists", persistenceId(), historyId);
+                return new LocalHistorySuccess(historyId, request.getSequence());
+            }
+
+            // We have not found the history. Before we create it we need to check history ID sequencing so that we do
+            // not end up resurrecting a purged history.
+            if (purgedHistories.contains(historyId.getHistoryId())) {
+                LOG.debug("{}: rejecting purged request {}", persistenceId(), request);
+                throw new DeadHistoryException(purgedHistories.toImmutable());
+            }
+
+            // Update last history we have seen
+            if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) {
+                lastSeenHistory = historyId.getHistoryId();
+            }
+
+            // We have to send the response only after persistence has completed
+            final ShardDataTreeTransactionChain chain = tree().ensureTransactionChain(historyId, () -> {
+                LOG.debug("{}: persisted history {}", persistenceId(), historyId);
+                envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()),
+                    tree().readTime() - now);
+            });
+
+            localHistories.put(historyId, LocalFrontendHistory.create(persistenceId(), tree(), chain));
+            LOG.debug("{}: created history {}", persistenceId(), historyId);
+            return null;
+        }
+
+        private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
+                final RequestEnvelope envelope, final long now) {
+            final LocalHistoryIdentifier id = request.getTarget();
+            final LocalFrontendHistory existing = localHistories.get(id);
+            if (existing == null) {
+                // History does not exist: report success
+                LOG.debug("{}: history {} does not exist, nothing to destroy", persistenceId(), id);
+                return new LocalHistorySuccess(id, request.getSequence());
+            }
+
+            existing.destroy(request.getSequence(), envelope, now);
+            return null;
+        }
+
+        private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request,
+                final RequestEnvelope envelope, final long now) {
+            final LocalHistoryIdentifier id = request.getTarget();
+            final LocalFrontendHistory existing = localHistories.remove(id);
+            if (existing == null) {
+                LOG.debug("{}: history {} has already been purged", persistenceId(), id);
+                return new LocalHistorySuccess(id, request.getSequence());
+            }
+
+            LOG.debug("{}: purging history {}", persistenceId(), id);
+            purgedHistories.add(id.getHistoryId());
+            existing.purge(request.getSequence(), envelope, now);
+            return null;
+        }
+
+        private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfSequenceEnvelopeException {
+            if (expectedTxSequence != envelope.getTxSequence()) {
+                throw new OutOfSequenceEnvelopeException(expectedTxSequence);
+            }
+        }
 
-    // Histories which have not been purged
-    private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories;
+        private void expectNextRequest() {
+            expectedTxSequence++;
+        }
+    }
 
-    // RangeSet performs automatic merging, hence we keep minimal state tracking information
-    private final UnsignedLongRangeSet purgedHistories;
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class);
 
-    // Used for all standalone transactions
-    private final AbstractFrontendHistory standaloneHistory;
     private final ShardDataTree tree;
     private final ClientIdentifier clientId;
     private final String persistenceId;
 
     private long lastConnectTicks;
     private long lastSeenTicks;
-    private long expectedTxSequence;
-    private Long lastSeenHistory = null;
 
     // TODO: explicit failover notification
     //       Record the ActorRef for the originating actor and when we switch to being a leader send a notification
@@ -71,150 +253,44 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     // - per-RequestException throw counters
 
     LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
-        this(persistenceId, clientId, tree, UnsignedLongRangeSet.create(),
-            StandaloneFrontendHistory.create(persistenceId, clientId, tree), new HashMap<>());
-    }
-
-    LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree,
-        final UnsignedLongRangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory,
-        final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories) {
         this.persistenceId = requireNonNull(persistenceId);
         this.clientId = requireNonNull(clientId);
         this.tree = requireNonNull(tree);
-        this.purgedHistories = requireNonNull(purgedHistories);
-        this.standaloneHistory = requireNonNull(standaloneHistory);
-        this.localHistories = requireNonNull(localHistories);
         this.lastSeenTicks = tree.readTime();
     }
 
     @Override
-    public ClientIdentifier getIdentifier() {
+    public final ClientIdentifier getIdentifier() {
         return clientId;
     }
 
-    private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfSequenceEnvelopeException {
-        if (expectedTxSequence != envelope.getTxSequence()) {
-            throw new OutOfSequenceEnvelopeException(expectedTxSequence);
-        }
-    }
-
-    private void expectNextRequest() {
-        expectedTxSequence++;
+    final String persistenceId() {
+        return persistenceId;
     }
 
-    @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
-        checkRequestSequence(envelope);
-
-        try {
-            if (request instanceof CreateLocalHistoryRequest) {
-                return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now);
-            } else if (request instanceof DestroyLocalHistoryRequest) {
-                return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now);
-            } else if (request instanceof PurgeLocalHistoryRequest) {
-                return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now);
-            } else {
-                LOG.warn("{}: rejecting unsupported request {}", persistenceId, request);
-                throw new UnsupportedRequestException(request);
-            }
-        } finally {
-            expectNextRequest();
-        }
+    final long getLastConnectTicks() {
+        return lastConnectTicks;
     }
 
-    private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
-        final LocalHistoryIdentifier historyId = request.getTarget();
-        final AbstractFrontendHistory existing = localHistories.get(historyId);
-        if (existing != null) {
-            // History already exists: report success
-            LOG.debug("{}: history {} already exists", persistenceId, historyId);
-            return new LocalHistorySuccess(historyId, request.getSequence());
-        }
-
-        // We have not found the history. Before we create it we need to check history ID sequencing so that we do not
-        // end up resurrecting a purged history.
-        if (purgedHistories.contains(historyId.getHistoryId())) {
-            LOG.debug("{}: rejecting purged request {}", persistenceId, request);
-            throw new DeadHistoryException(purgedHistories.toImmutable());
-        }
-
-        // Update last history we have seen
-        if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) {
-            lastSeenHistory = historyId.getHistoryId();
-        }
-
-        // We have to send the response only after persistence has completed
-        final ShardDataTreeTransactionChain chain = tree.ensureTransactionChain(historyId, () -> {
-            LOG.debug("{}: persisted history {}", persistenceId, historyId);
-            envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.readTime() - now);
-        });
-
-        localHistories.put(historyId, LocalFrontendHistory.create(persistenceId, tree, chain));
-        LOG.debug("{}: created history {}", persistenceId, historyId);
-        return null;
+    final long getLastSeenTicks() {
+        return lastSeenTicks;
     }
 
-    private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
-            final RequestEnvelope envelope, final long now) {
-        final LocalHistoryIdentifier id = request.getTarget();
-        final LocalFrontendHistory existing = localHistories.get(id);
-        if (existing == null) {
-            // History does not exist: report success
-            LOG.debug("{}: history {} does not exist, nothing to destroy", persistenceId, id);
-            return new LocalHistorySuccess(id, request.getSequence());
-        }
-
-        existing.destroy(request.getSequence(), envelope, now);
-        return null;
+    final ShardDataTree tree() {
+        return tree;
     }
 
-    private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request,
-            final RequestEnvelope envelope, final long now) {
-        final LocalHistoryIdentifier id = request.getTarget();
-        final LocalFrontendHistory existing = localHistories.remove(id);
-        if (existing == null) {
-            LOG.debug("{}: history {} has already been purged", persistenceId, id);
-            return new LocalHistorySuccess(id, request.getSequence());
-        }
-
-        LOG.debug("{}: purging history {}", persistenceId, id);
-        purgedHistories.add(id.getHistoryId());
-        existing.purge(request.getSequence(), envelope, now);
-        return null;
+    final void touch() {
+        this.lastSeenTicks = tree.readTime();
     }
 
-    @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
-            final RequestEnvelope envelope, final long now) throws RequestException {
-        checkRequestSequence(envelope);
-
-        try {
-            final LocalHistoryIdentifier lhId = request.getTarget().getHistoryId();
-            final AbstractFrontendHistory history;
-
-            if (lhId.getHistoryId() != 0) {
-                history = localHistories.get(lhId);
-                if (history == null) {
-                    if (purgedHistories.contains(lhId.getHistoryId())) {
-                        LOG.warn("{}: rejecting request {} to purged history", persistenceId, request);
-                        throw new DeadHistoryException(purgedHistories.toImmutable());
-                    }
-
-                    LOG.warn("{}: rejecting unknown history request {}", persistenceId, request);
-                    throw new UnknownHistoryException(lastSeenHistory);
-                }
-            } else {
-                history = standaloneHistory;
-            }
+    abstract @Nullable LocalHistorySuccess handleLocalHistoryRequest(LocalHistoryRequest<?> request,
+            RequestEnvelope envelope, long now) throws RequestException;
 
-            return history.handleTransactionRequest(request, envelope, now);
-        } finally {
-            expectNextRequest();
-        }
-    }
+    abstract @Nullable TransactionSuccess<?> handleTransactionRequest(TransactionRequest<?> request,
+            RequestEnvelope envelope, long now) throws RequestException;
 
     void reconnect() {
-        expectedTxSequence = 0;
         lastConnectTicks = tree.readTime();
     }
 
@@ -233,31 +309,14 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
                 }
             }
         }
-
-        // Clear out all transaction chains
-        localHistories.values().forEach(AbstractFrontendHistory::retire);
-        localHistories.clear();
-        standaloneHistory.retire();
-    }
-
-    long getLastConnectTicks() {
-        return lastConnectTicks;
     }
 
-    long getLastSeenTicks() {
-        return lastSeenTicks;
-    }
-
-    void touch() {
-        this.lastSeenTicks = tree.readTime();
+    @Override
+    public final String toString() {
+        return addToStringAttributes(MoreObjects.toStringHelper(this)).toString();
     }
 
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(LeaderFrontendState.class)
-                .add("clientId", clientId)
-                .add("nanosAgo", tree.readTime() - lastSeenTicks)
-                .add("purgedHistories", purgedHistories)
-                .toString();
+    ToStringHelper addToStringAttributes(final ToStringHelper helper) {
+        return helper.add("clientId", clientId).add("nanosAgo", tree.readTime() - lastSeenTicks);
     }
 }