BUG-8618: add pause/unpause mechanics for tell-based protocol
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / LeaderFrontendState.java
index 3c65b799d1831c498c6321df878305fb2c56a737..5a5e42637e6a5a4908a23dd85df8abd88589967b 100644 (file)
@@ -9,11 +9,8 @@ package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Range;
-import com.google.common.collect.RangeSet;
-import com.google.common.collect.TreeRangeSet;
-import com.google.common.primitives.UnsignedLong;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
@@ -22,7 +19,7 @@ import org.opendaylight.controller.cluster.access.commands.DeadHistoryException;
 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
-import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
+import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException;
 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
@@ -32,6 +29,8 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
+import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
+import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongRangeSet;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,10 +46,10 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class);
 
     // Histories which have not been purged
-    private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories = new HashMap<>();
+    private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories;
 
     // RangeSet performs automatic merging, hence we keep minimal state tracking information
-    private final RangeSet<UnsignedLong> purgedHistories = TreeRangeSet.create();
+    private final UnsignedLongRangeSet purgedHistories;
 
     // Used for all standalone transactions
     private final AbstractFrontendHistory standaloneHistory;
@@ -61,7 +60,6 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     private long expectedTxSequence;
     private Long lastSeenHistory = null;
 
-
     // TODO: explicit failover notification
     //       Record the ActorRef for the originating actor and when we switch to being a leader send a notification
     //       to the frontend client -- that way it can immediately start sending requests
@@ -72,10 +70,19 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     // - per-RequestException throw counters
 
     LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
+        this(persistenceId, clientId, tree, UnsignedLongRangeSet.create(),
+            StandaloneFrontendHistory.create(persistenceId, clientId, tree), new HashMap<>());
+    }
+
+    LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree,
+        final UnsignedLongRangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory,
+        final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories) {
         this.persistenceId = Preconditions.checkNotNull(persistenceId);
         this.clientId = Preconditions.checkNotNull(clientId);
         this.tree = Preconditions.checkNotNull(tree);
-        standaloneHistory = new StandaloneFrontendHistory(persistenceId, clientId, tree);
+        this.purgedHistories = Preconditions.checkNotNull(purgedHistories);
+        this.standaloneHistory = Preconditions.checkNotNull(standaloneHistory);
+        this.localHistories = Preconditions.checkNotNull(localHistories);
     }
 
     @Override
@@ -83,9 +90,9 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
         return clientId;
     }
 
-    private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfOrderRequestException {
+    private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfSequenceEnvelopeException {
         if (expectedTxSequence != envelope.getTxSequence()) {
-            throw new OutOfOrderRequestException(expectedTxSequence);
+            throw new OutOfSequenceEnvelopeException(expectedTxSequence);
         }
     }
 
@@ -94,17 +101,18 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     }
 
     @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
-            final RequestEnvelope envelope) throws RequestException {
+            final RequestEnvelope envelope, final long now) throws RequestException {
         checkRequestSequence(envelope);
 
         try {
             if (request instanceof CreateLocalHistoryRequest) {
-                return handleCreateHistory((CreateLocalHistoryRequest) request);
+                return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now);
             } else if (request instanceof DestroyLocalHistoryRequest) {
-                return handleDestroyHistory((DestroyLocalHistoryRequest) request);
+                return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now);
             } else if (request instanceof PurgeLocalHistoryRequest) {
-                return handlePurgeHistory((PurgeLocalHistoryRequest)request);
+                return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now);
             } else {
+                LOG.warn("{}: rejecting unsupported request {}", persistenceId, request);
                 throw new UnsupportedRequestException(request);
             }
         } finally {
@@ -112,33 +120,42 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
         }
     }
 
-    private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request) throws RequestException {
-        final LocalHistoryIdentifier id = request.getTarget();
-        final AbstractFrontendHistory existing = localHistories.get(id);
+    private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request,
+            final RequestEnvelope envelope, final long now) throws RequestException {
+        final LocalHistoryIdentifier historyId = request.getTarget();
+        final AbstractFrontendHistory existing = localHistories.get(historyId);
         if (existing != null) {
             // History already exists: report success
-            LOG.debug("{}: history {} already exists", persistenceId, id);
-            return new LocalHistorySuccess(id, request.getSequence());
+            LOG.debug("{}: history {} already exists", persistenceId, historyId);
+            return new LocalHistorySuccess(historyId, request.getSequence());
         }
 
         // We have not found the history. Before we create it we need to check history ID sequencing so that we do not
         // end up resurrecting a purged history.
-        if (purgedHistories.contains(UnsignedLong.fromLongBits(id.getHistoryId()))) {
+        if (purgedHistories.contains(historyId.getHistoryId())) {
             LOG.debug("{}: rejecting purged request {}", persistenceId, request);
-            throw new DeadHistoryException(lastSeenHistory.longValue());
+            throw new DeadHistoryException(purgedHistories.toImmutable());
         }
 
         // Update last history we have seen
-        if (lastSeenHistory != null && Long.compareUnsigned(lastSeenHistory, id.getHistoryId()) < 0) {
-            lastSeenHistory = id.getHistoryId();
+        if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) {
+            lastSeenHistory = historyId.getHistoryId();
         }
 
-        localHistories.put(id, new LocalFrontendHistory(persistenceId, tree.ensureTransactionChain(id)));
-        LOG.debug("{}: created history {}", persistenceId, id);
-        return new LocalHistorySuccess(id, request.getSequence());
+        // We have to send the response only after persistence has completed
+        final ShardDataTreeTransactionChain chain = tree.ensureTransactionChain(historyId, () -> {
+            LOG.debug("{}: persisted history {}", persistenceId, historyId);
+            envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.readTime() - now);
+        });
+
+        localHistories.put(historyId, LocalFrontendHistory.create(persistenceId, tree, chain));
+        LOG.debug("{}: created history {}", persistenceId, historyId);
+        return null;
     }
 
-    private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request) throws RequestException {
+    private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
+            final RequestEnvelope envelope, final long now)
+            throws RequestException {
         final LocalHistoryIdentifier id = request.getTarget();
         final LocalFrontendHistory existing = localHistories.get(id);
         if (existing == null) {
@@ -147,32 +164,27 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
             return new LocalHistorySuccess(id, request.getSequence());
         }
 
-        return existing.destroy(request.getSequence());
+        existing.destroy(request.getSequence(), envelope, now);
+        return null;
     }
 
-    private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request) throws RequestException {
+    private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request,
+            final RequestEnvelope envelope, final long now) throws RequestException {
         final LocalHistoryIdentifier id = request.getTarget();
         final LocalFrontendHistory existing = localHistories.remove(id);
-        if (existing != null) {
-            purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId())));
-
-            if (!existing.isDestroyed()) {
-                LOG.warn("{}: purging undestroyed history {}", persistenceId, id);
-                existing.destroy(request.getSequence());
-            }
-
-            // FIXME: record a PURGE tombstone in the journal
-
-            LOG.debug("{}: purged history {}", persistenceId, id);
-        } else {
+        if (existing == null) {
             LOG.debug("{}: history {} has already been purged", persistenceId, id);
+            return new LocalHistorySuccess(id, request.getSequence());
         }
 
-        return new LocalHistorySuccess(id, request.getSequence());
+        LOG.debug("{}: purging history {}", persistenceId, id);
+        purgedHistories.add(id.getHistoryId());
+        existing.purge(request.getSequence(), envelope, now);
+        return null;
     }
 
     @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
-            final RequestEnvelope envelope) throws RequestException {
+            final RequestEnvelope envelope, final long now) throws RequestException {
         checkRequestSequence(envelope);
 
         try {
@@ -182,14 +194,19 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
             if (lhId.getHistoryId() != 0) {
                 history = localHistories.get(lhId);
                 if (history == null) {
-                    LOG.debug("{}: rejecting unknown history request {}", persistenceId, request);
+                    if (purgedHistories.contains(lhId.getHistoryId())) {
+                        LOG.warn("{}: rejecting request {} to purged history", persistenceId, request);
+                        throw new DeadHistoryException(purgedHistories.toImmutable());
+                    }
+
+                    LOG.warn("{}: rejecting unknown history request {}", persistenceId, request);
                     throw new UnknownHistoryException(lastSeenHistory);
                 }
             } else {
                 history = standaloneHistory;
             }
 
-            return history.handleTransactionRequest(request, envelope);
+            return history.handleTransactionRequest(request, envelope, now);
         } finally {
             expectNextRequest();
         }
@@ -200,7 +217,25 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     }
 
     void retire() {
-        // FIXME: flush all state
+        // Hunt down any transactions associated with this frontend
+        final Iterator<SimpleShardDataTreeCohort> it = tree.cohortIterator();
+        while (it.hasNext()) {
+            final SimpleShardDataTreeCohort cohort = it.next();
+            if (clientId.equals(cohort.getIdentifier().getHistoryId().getClientId())) {
+                if (cohort.getState() != State.COMMIT_PENDING) {
+                    LOG.debug("{}: Retiring transaction {}", persistenceId, cohort.getIdentifier());
+                    it.remove();
+                } else {
+                    LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId,
+                        cohort.getIdentifier());
+                }
+            }
+        }
+
+        // Clear out all transaction chains
+        localHistories.values().forEach(AbstractFrontendHistory::retire);
+        localHistories.clear();
+        standaloneHistory.retire();
     }
 
     @Override