BUG-8618: add pause/unpause mechanics for tell-based protocol
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / LeaderFrontendState.java
index 7e5addaefd5b8f8180ad4f39300af7b2f9b1e549..5a5e42637e6a5a4908a23dd85df8abd88589967b 100644 (file)
@@ -9,11 +9,8 @@ package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Range;
-import com.google.common.collect.RangeSet;
-import com.google.common.collect.TreeRangeSet;
-import com.google.common.primitives.UnsignedLong;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
@@ -32,6 +29,8 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
+import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
+import org.opendaylight.controller.cluster.datastore.utils.UnsignedLongRangeSet;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +49,7 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories;
 
     // RangeSet performs automatic merging, hence we keep minimal state tracking information
-    private final RangeSet<UnsignedLong> purgedHistories;
+    private final UnsignedLongRangeSet purgedHistories;
 
     // Used for all standalone transactions
     private final AbstractFrontendHistory standaloneHistory;
@@ -71,12 +70,12 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     // - per-RequestException throw counters
 
     LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
-        this(persistenceId, clientId, tree, TreeRangeSet.create(), StandaloneFrontendHistory.create(persistenceId,
-            clientId, tree), new HashMap<>());
+        this(persistenceId, clientId, tree, UnsignedLongRangeSet.create(),
+            StandaloneFrontendHistory.create(persistenceId, clientId, tree), new HashMap<>());
     }
 
     LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree,
-        final RangeSet<UnsignedLong> purgedHistories, final AbstractFrontendHistory standaloneHistory,
+        final UnsignedLongRangeSet purgedHistories, final AbstractFrontendHistory standaloneHistory,
         final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories) {
         this.persistenceId = Preconditions.checkNotNull(persistenceId);
         this.clientId = Preconditions.checkNotNull(clientId);
@@ -133,9 +132,9 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
 
         // We have not found the history. Before we create it we need to check history ID sequencing so that we do not
         // end up resurrecting a purged history.
-        if (purgedHistories.contains(UnsignedLong.fromLongBits(historyId.getHistoryId()))) {
+        if (purgedHistories.contains(historyId.getHistoryId())) {
             LOG.debug("{}: rejecting purged request {}", persistenceId, request);
-            throw new DeadHistoryException(purgedHistories);
+            throw new DeadHistoryException(purgedHistories.toImmutable());
         }
 
         // Update last history we have seen
@@ -146,7 +145,7 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
         // We have to send the response only after persistence has completed
         final ShardDataTreeTransactionChain chain = tree.ensureTransactionChain(historyId, () -> {
             LOG.debug("{}: persisted history {}", persistenceId, historyId);
-            envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.ticker().read() - now);
+            envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.readTime() - now);
         });
 
         localHistories.put(historyId, LocalFrontendHistory.create(persistenceId, tree, chain));
@@ -179,7 +178,7 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
         }
 
         LOG.debug("{}: purging history {}", persistenceId, id);
-        purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId())));
+        purgedHistories.add(id.getHistoryId());
         existing.purge(request.getSequence(), envelope, now);
         return null;
     }
@@ -195,9 +194,9 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
             if (lhId.getHistoryId() != 0) {
                 history = localHistories.get(lhId);
                 if (history == null) {
-                    if (purgedHistories.contains(UnsignedLong.fromLongBits(lhId.getHistoryId()))) {
+                    if (purgedHistories.contains(lhId.getHistoryId())) {
                         LOG.warn("{}: rejecting request {} to purged history", persistenceId, request);
-                        throw new DeadHistoryException(purgedHistories);
+                        throw new DeadHistoryException(purgedHistories.toImmutable());
                     }
 
                     LOG.warn("{}: rejecting unknown history request {}", persistenceId, request);
@@ -218,7 +217,25 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     }
 
     void retire() {
-        // FIXME: flush all state
+        // Hunt down any transactions associated with this frontend
+        final Iterator<SimpleShardDataTreeCohort> it = tree.cohortIterator();
+        while (it.hasNext()) {
+            final SimpleShardDataTreeCohort cohort = it.next();
+            if (clientId.equals(cohort.getIdentifier().getHistoryId().getClientId())) {
+                if (cohort.getState() != State.COMMIT_PENDING) {
+                    LOG.debug("{}: Retiring transaction {}", persistenceId, cohort.getIdentifier());
+                    it.remove();
+                } else {
+                    LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId,
+                        cohort.getIdentifier());
+                }
+            }
+        }
+
+        // Clear out all transaction chains
+        localHistories.values().forEach(AbstractFrontendHistory::retire);
+        localHistories.clear();
+        standaloneHistory.retire();
     }
 
     @Override