BUG-8618: add pause/unpause mechanics for tell-based protocol
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / AbstractFrontendHistory.java
index 8262afa099154c32ffea89a9ad3896d709556f01..c1d2db309410dd831a31b3ec4845f02534749c9b 100644 (file)
@@ -21,6 +21,7 @@ import org.opendaylight.controller.cluster.access.commands.AbstractReadTransacti
 import org.opendaylight.controller.cluster.access.commands.ClosedTransactionException;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
+import org.opendaylight.controller.cluster.access.commands.IncrementTransactionSequenceRequest;
 import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
 import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
 import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
@@ -69,7 +70,7 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
     }
 
     final long readTime() {
-        return tree.ticker().read();
+        return tree.readTime();
     }
 
     final @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
@@ -93,7 +94,7 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
                         closedTransactions = ImmutableMap.of();
                     }
 
-                    purgedTransactions.add(Range.singleton(ul));
+                    purgedTransactions.add(Range.closedOpen(ul, UnsignedLong.ONE.plus(ul)));
                     LOG.debug("{}: finished purging inherited transaction {}", persistenceId(), id);
                     envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now);
                 });
@@ -106,12 +107,12 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
                 // purged transactions in one go. If it does, we warn about the situation and
                 LOG.warn("{}: transaction {} not tracked in {}, but not present in active transactions", persistenceId,
                     id, purgedTransactions);
-                purgedTransactions.add(Range.singleton(ul));
+                purgedTransactions.add(Range.closedOpen(ul, UnsignedLong.ONE.plus(ul)));
                 return new TransactionPurgeResponse(id, request.getSequence());
             }
 
             tree.purgeTransaction(id, () -> {
-                purgedTransactions.add(Range.singleton(ul));
+                purgedTransactions.add(Range.closedOpen(ul, UnsignedLong.ONE.plus(ul)));
                 transactions.remove(id);
                 LOG.debug("{}: finished purging transaction {}", persistenceId(), id);
                 envelope.sendSuccess(new TransactionPurgeResponse(id, request.getSequence()), readTime() - now);
@@ -135,13 +136,13 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
         if (tx == null) {
             // The transaction does not exist and we are about to create it, check sequence number
             if (request.getSequence() != 0) {
-                LOG.debug("{}: no transaction state present, unexpected request {}", persistenceId(), request);
+                LOG.warn("{}: no transaction state present, unexpected request {}", persistenceId(), request);
                 throw new OutOfOrderRequestException(0);
             }
 
             tx = createTransaction(request, id);
             transactions.put(id, tx);
-        } else {
+        } else if (!(request instanceof IncrementTransactionSequenceRequest)) {
             final Optional<TransactionSuccess<?>> maybeReplay = tx.replaySequence(request.getSequence());
             if (maybeReplay.isPresent()) {
                 final TransactionSuccess<?> replay = maybeReplay.get();
@@ -153,18 +154,23 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
         return tx.handleRequest(request, envelope, now);
     }
 
-    void destroy(final long sequence, final RequestEnvelope envelope, final long now) {
+    final void destroy(final long sequence, final RequestEnvelope envelope, final long now) {
         LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
         tree.closeTransactionChain(getIdentifier(),
             () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now));
     }
 
-    void purge(final long sequence, final RequestEnvelope envelope, final long now) {
+    final void purge(final long sequence, final RequestEnvelope envelope, final long now) {
         LOG.debug("{}: purging history {}", persistenceId(), getIdentifier());
         tree.purgeTransactionChain(getIdentifier(),
             () -> envelope.sendSuccess(new LocalHistorySuccess(getIdentifier(), sequence), readTime() - now));
     }
 
+    final void retire() {
+        transactions.values().forEach(FrontendTransaction::retire);
+        tree.removeTransactionChain(getIdentifier());
+    }
+
     private FrontendTransaction createTransaction(final TransactionRequest<?> request, final TransactionIdentifier id)
             throws RequestException {
         if (request instanceof CommitLocalTransactionRequest) {