BUG-8665: fix memory leak around RangeSets
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / LeaderFrontendState.java
index 2b18123..d860bfa 100644 (file)
@@ -22,7 +22,7 @@ import org.opendaylight.controller.cluster.access.commands.DeadHistoryException;
 import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
-import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
+import org.opendaylight.controller.cluster.access.commands.OutOfSequenceEnvelopeException;
 import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
@@ -91,9 +91,9 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
         return clientId;
     }
 
-    private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfOrderRequestException {
+    private void checkRequestSequence(final RequestEnvelope envelope) throws OutOfSequenceEnvelopeException {
         if (expectedTxSequence != envelope.getTxSequence()) {
-            throw new OutOfOrderRequestException(expectedTxSequence);
+            throw new OutOfSequenceEnvelopeException(expectedTxSequence);
         }
     }
 
@@ -107,12 +107,13 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
 
         try {
             if (request instanceof CreateLocalHistoryRequest) {
-                return handleCreateHistory((CreateLocalHistoryRequest) request);
+                return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now);
             } else if (request instanceof DestroyLocalHistoryRequest) {
                 return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now);
             } else if (request instanceof PurgeLocalHistoryRequest) {
                 return handlePurgeHistory((PurgeLocalHistoryRequest)request, envelope, now);
             } else {
+                LOG.warn("{}: rejecting unsupported request {}", persistenceId, request);
                 throw new UnsupportedRequestException(request);
             }
         } finally {
@@ -120,30 +121,37 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
         }
     }
 
-    private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request) throws RequestException {
-        final LocalHistoryIdentifier id = request.getTarget();
-        final AbstractFrontendHistory existing = localHistories.get(id);
+    private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request,
+            final RequestEnvelope envelope, final long now) throws RequestException {
+        final LocalHistoryIdentifier historyId = request.getTarget();
+        final AbstractFrontendHistory existing = localHistories.get(historyId);
         if (existing != null) {
             // History already exists: report success
-            LOG.debug("{}: history {} already exists", persistenceId, id);
-            return new LocalHistorySuccess(id, request.getSequence());
+            LOG.debug("{}: history {} already exists", persistenceId, historyId);
+            return new LocalHistorySuccess(historyId, request.getSequence());
         }
 
         // We have not found the history. Before we create it we need to check history ID sequencing so that we do not
         // end up resurrecting a purged history.
-        if (purgedHistories.contains(UnsignedLong.fromLongBits(id.getHistoryId()))) {
+        if (purgedHistories.contains(UnsignedLong.fromLongBits(historyId.getHistoryId()))) {
             LOG.debug("{}: rejecting purged request {}", persistenceId, request);
-            throw new DeadHistoryException(lastSeenHistory.longValue());
+            throw new DeadHistoryException(purgedHistories);
         }
 
         // Update last history we have seen
-        if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, id.getHistoryId()) < 0) {
-            lastSeenHistory = id.getHistoryId();
+        if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) {
+            lastSeenHistory = historyId.getHistoryId();
         }
 
-        localHistories.put(id, LocalFrontendHistory.create(persistenceId, tree, id));
-        LOG.debug("{}: created history {}", persistenceId, id);
-        return new LocalHistorySuccess(id, request.getSequence());
+        // We have to send the response only after persistence has completed
+        final ShardDataTreeTransactionChain chain = tree.ensureTransactionChain(historyId, () -> {
+            LOG.debug("{}: persisted history {}", persistenceId, historyId);
+            envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.readTime() - now);
+        });
+
+        localHistories.put(historyId, LocalFrontendHistory.create(persistenceId, tree, chain));
+        LOG.debug("{}: created history {}", persistenceId, historyId);
+        return null;
     }
 
     private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
@@ -171,7 +179,8 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
         }
 
         LOG.debug("{}: purging history {}", persistenceId, id);
-        purgedHistories.add(Range.singleton(UnsignedLong.fromLongBits(id.getHistoryId())));
+        final UnsignedLong ul = UnsignedLong.fromLongBits(id.getHistoryId());
+        purgedHistories.add(Range.closedOpen(ul, UnsignedLong.ONE.plus(ul)));
         existing.purge(request.getSequence(), envelope, now);
         return null;
     }
@@ -187,7 +196,12 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
             if (lhId.getHistoryId() != 0) {
                 history = localHistories.get(lhId);
                 if (history == null) {
-                    LOG.debug("{}: rejecting unknown history request {}", persistenceId, request);
+                    if (purgedHistories.contains(UnsignedLong.fromLongBits(lhId.getHistoryId()))) {
+                        LOG.warn("{}: rejecting request {} to purged history", persistenceId, request);
+                        throw new DeadHistoryException(purgedHistories);
+                    }
+
+                    LOG.warn("{}: rejecting unknown history request {}", persistenceId, request);
                     throw new UnknownHistoryException(lastSeenHistory);
                 }
             } else {

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.