Cleanup warnings
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / LeaderFrontendState.java
index deca605a5b97ec18d041d050d5fe0c467feb436e..d860bfa289e39f7b9a144aa652a5a030ec4c4546 100644 (file)
@@ -22,7 +22,7 @@ import org.opendaylight.controller.cluster.access.commands.DeadHistoryException;
 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
-import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
+import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException;
 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
@@ -47,10 +47,10 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     private static final Logger LOG = LoggerFactory.getLogger(LeaderFrontendState.class);
 
     // Histories which have not been purged
-    private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories = new HashMap<>();
+    private final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories;
 
     // RangeSet performs automatic merging, hence we keep minimal state tracking information
-    private final RangeSet<UnsignedLong> purgedHistories = TreeRangeSet.create();
+    private final RangeSet<UnsignedLong> purgedHistories;
 
     // Used for all standalone transactions
     private final AbstractFrontendHistory standaloneHistory;
@@ -61,7 +61,6 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     private long expectedTxSequence;
     private Long lastSeenHistory = null;
 
-
     // TODO: explicit failover notification
     //       Record the ActorRef for the originating actor and when we switch to being a leader send a notification
     //       to the frontend client -- that way it can immediately start sending requests
@@ -72,10 +71,19 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     // - per-RequestException throw counters
 
     LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
+        this(persistenceId, clientId, tree, TreeRangeSet.create(), StandaloneFrontendHistory.create(persistenceId,
+            clientId, tree), new HashMap<>());
+    }
+
+    LeaderFrontendState(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree,
+        final RangeSet<UnsignedLong> purgedHistories, final AbstractFrontendHistory standaloneHistory,
+        final Map<LocalHistoryIdentifier, LocalFrontendHistory> localHistories) {
         this.persistenceId = Preconditions.checkNotNull(persistenceId);
         this.clientId = Preconditions.checkNotNull(clientId);
         this.tree = Preconditions.checkNotNull(tree);
-        standaloneHistory = new StandaloneFrontendHistory(persistenceId, tree.ticker(), clientId, tree);
+        this.purgedHistories = Preconditions.checkNotNull(purgedHistories);
+        this.standaloneHistory = Preconditions.checkNotNull(standaloneHistory);
+        this.localHistories = Preconditions.checkNotNull(localHistories);
     }
 
     @Override
@@ -83,9 +91,9 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
         return clientId;
     }
 
-    private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfOrderRequestException {
+    private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfSequenceEnvelopeException {
         if (expectedTxSequence != envelope.getTxSequence()) {
-            throw new OutOfOrderRequestException(expectedTxSequence);
+            throw new OutOfSequenceEnvelopeException(expectedTxSequence);
         }
     }
 
@@ -99,12 +107,13 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
 
         try {
             if (request instanceof CreateLocalHistoryRequest) {
-                return handleCreateHistory((CreateLocalHistoryRequest) request);
+                return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now);
             } else if (request instanceof DestroyLocalHistoryRequest) {
                 return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now);
             } else if (request instanceof PurgeLocalHistoryRequest) {
                 return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now);
             } else {
+                LOG.warn("{}: rejecting unsupported request {}", persistenceId, request);
                 throw new UnsupportedRequestException(request);
             }
         } finally {
@@ -112,30 +121,37 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
         }
     }
 
-    private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request) throws RequestException {
-        final LocalHistoryIdentifier id = request.getTarget();
-        final AbstractFrontendHistory existing = localHistories.get(id);
+    private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request,
+            final RequestEnvelope envelope, final long now) throws RequestException {
+        final LocalHistoryIdentifier historyId = request.getTarget();
+        final AbstractFrontendHistory existing = localHistories.get(historyId);
         if (existing != null) {
             // History already exists: report success
-            LOG.debug("{}: history {} already exists", persistenceId, id);
-            return new LocalHistorySuccess(id, request.getSequence());
+            LOG.debug("{}: history {} already exists", persistenceId, historyId);
+            return new LocalHistorySuccess(historyId, request.getSequence());
         }
 
         // We have not found the history. Before we create it we need to check history ID sequencing so that we do not
         // end up resurrecting a purged history.
-        if (purgedHistories.contains(UnsignedLong.fromLongBits(id.getHistoryId()))) {
+        if (purgedHistories.contains(UnsignedLong.fromLongBits(historyId.getHistoryId()))) {
             LOG.debug("{}: rejecting purged request {}", persistenceId, request);
-            throw new DeadHistoryException(lastSeenHistory.longValue());
+            throw new DeadHistoryException(purgedHistories);
         }
 
         // Update last history we have seen
-        if (lastSeenHistory != null && Long.compareUnsigned(lastSeenHistory, id.getHistoryId()) < 0) {
-            lastSeenHistory = id.getHistoryId();
+        if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) {
+            lastSeenHistory = historyId.getHistoryId();
         }
 
-        localHistories.put(id, new LocalFrontendHistory(persistenceId, tree, tree.ensureTransactionChain(id)));
-        LOG.debug("{}: created history {}", persistenceId, id);
-        return new LocalHistorySuccess(id, request.getSequence());
+        // We have to send the response only after persistence has completed
+        final ShardDataTreeTransactionChain chain = tree.ensureTransactionChain(historyId, () -> {
+            LOG.debug("{}: persisted history {}", persistenceId, historyId);
+            envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.readTime() - now);
+        });
+
+        localHistories.put(historyId, LocalFrontendHistory.create(persistenceId, tree, chain));
+        LOG.debug("{}: created history {}", persistenceId, historyId);
+        return null;
     }
 
     private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
@@ -163,7 +179,8 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
         }
 
         LOG.debug("{}: purging history {}", persistenceId, id);
-        purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId())));
+        final UnsignedLong ul = UnsignedLong.fromLongBits(id.getHistoryId());
+        purgedHistories.add(Range.closedOpen(ul, UnsignedLong.ONE.plus(ul)));
         existing.purge(request.getSequence(), envelope, now);
         return null;
     }
@@ -179,7 +196,12 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
             if (lhId.getHistoryId() != 0) {
                 history = localHistories.get(lhId);
                 if (history == null) {
-                    LOG.debug("{}: rejecting unknown history request {}", persistenceId, request);
+                    if (purgedHistories.contains(UnsignedLong.fromLongBits(lhId.getHistoryId()))) {
+                        LOG.warn("{}: rejecting request {} to purged history", persistenceId, request);
+                        throw new DeadHistoryException(purgedHistories);
+                    }
+
+                    LOG.warn("{}: rejecting unknown history request {}", persistenceId, request);
                     throw new UnknownHistoryException(lastSeenHistory);
                 }
             } else {