BUG-8618: add pause/unpause mechanics for tell-based protocol
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / LeaderFrontendState.java
index 7e5addaefd5b8f8180ad4f39300af7b2f9b1e549..061e035c6647c883c84916005e0c4b81b512820f 100644 (file)
@@ -14,6 +14,7 @@ import com.google.common.collect.RangeSet;
 import com.google.common.collect.TreeRangeSet;
 import com.google.common.primitives.UnsignedLong;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
@@ -32,6 +33,7 @@ import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifie
 import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.UnsupportedRequestException;
+import org.opendaylight.controller.cluster.datastore.ShardDataTreeCohort.State;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -146,7 +148,7 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
         // We have to send the response only after persistence has completed
         final ShardDataTreeTransactionChain chain = tree.ensureTransactionChain(historyId, () -> {
             LOG.debug("{}: persisted history {}", persistenceId, historyId);
-            envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.ticker().read() - now);
+            envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.readTime() - now);
         });
 
         localHistories.put(historyId, LocalFrontendHistory.create(persistenceId, tree, chain));
@@ -179,7 +181,8 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
         }
 
         LOG.debug("{}: purging history {}", persistenceId, id);
-        purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId())));
+        final UnsignedLong ul = UnsignedLong.fromLongBits(id.getHistoryId());
+        purgedHistories.add(Range.closedOpen(ul, UnsignedLong.ONE.plus(ul)));
         existing.purge(request.getSequence(), envelope, now);
         return null;
     }
@@ -218,7 +221,25 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     }
 
     void retire() {
-        // FIXME: flush all state
+        // Hunt down any transactions associated with this frontend
+        final Iterator<SimpleShardDataTreeCohort> it = tree.cohortIterator();
+        while (it.hasNext()) {
+            final SimpleShardDataTreeCohort cohort = it.next();
+            if (clientId.equals(cohort.getIdentifier().getHistoryId().getClientId())) {
+                if (cohort.getState() != State.COMMIT_PENDING) {
+                    LOG.debug("{}: Retiring transaction {}", persistenceId, cohort.getIdentifier());
+                    it.remove();
+                } else {
+                    LOG.debug("{}: Transaction {} already committing, not retiring it", persistenceId,
+                        cohort.getIdentifier());
+                }
+            }
+        }
+
+        // Clear out all transaction chains
+        localHistories.values().forEach(AbstractFrontendHistory::retire);
+        localHistories.clear();
+        standaloneHistory.retire();
     }
 
     @Override