CDS: Fix deleteSnapshots criteria in SnapshotManager 00/29900/4
authorTom Pantelis <tpanteli@brocade.com>
Wed, 18 Nov 2015 22:35:50 +0000 (17:35 -0500)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 20 Nov 2015 02:52:13 +0000 (02:52 +0000)
The SnapshotManager specifies a magic number 43200000 as the timestamp
for the criteria passed to deleteSnapshots. It's unclear where this
number came from but it prevents prior snapshots from getting deleted
as stored snapshot timestamps will be greater than this value (unless
one was created back in the 70's or 80's :)). Since the SnapshotManager
passes a valid upper bounds for the criteria's maxSequenceNr, I changed
it to pass Long.MAX_VALUE for the timestamp.

The ReplicationAndSnapshotsIntegrationTest actually verifies prior
snapshots were deleted by checking for size 1 when querying the
InMemorySnapshotStore. However this only passes b/c the
InMemorySnapshotStore::doDelete is incorrect in that it doesn't compare
the stored snapshot timestamp against the criteria timestamp. So I
changed the InMemorySnapshotStore to correctly compare the timestamps as
well. I found the source for an InMemorySnapshotStore on line and that's
what it does.

Change-Id: Ie7d5eec14f684a469f4b6ff84732c9a9c6042360
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java

index 10574e3a02cf881c689a88e6f81de0f021ffe70b..856c8f5ba8fbd1f21f8a6b9cbc33208013db1eb1 100644 (file)
@@ -139,8 +139,8 @@ public class SnapshotManager implements SnapshotState {
             lastLogEntryIndex = lastLogEntry.getIndex();
             lastLogEntryTerm = lastLogEntry.getTerm();
         } else {
-            LOG.debug("Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
-                lastAppliedIndex, lastAppliedTerm);
+            LOG.debug("{}: Capturing Snapshot : lastLogEntry is null. Using lastAppliedIndex {} and lastAppliedTerm {} instead.",
+                    persistenceId(), lastAppliedIndex, lastAppliedTerm);
         }
 
         return new CaptureSnapshot(lastLogEntryIndex, lastLogEntryTerm, lastAppliedIndex, lastAppliedTerm,
@@ -242,7 +242,7 @@ public class SnapshotManager implements SnapshotState {
 
             lastSequenceNumber = context.getPersistenceProvider().getLastSequenceNumber();
 
-            LOG.debug("lastSequenceNumber prior to capture: {}", lastSequenceNumber);
+            LOG.debug("{}: lastSequenceNumber prior to capture: {}", persistenceId(), lastSequenceNumber);
 
             SnapshotManager.this.currentState = CREATING;
 
@@ -382,7 +382,7 @@ public class SnapshotManager implements SnapshotState {
 
         @Override
         public void commit(long sequenceNumber, RaftActorBehavior currentBehavior) {
-            LOG.debug("Snapshot success sequence number: {}", sequenceNumber);
+            LOG.debug("{}: Snapshot success -  sequence number: {}", persistenceId(), sequenceNumber);
 
             if(applySnapshot != null) {
                 try {
@@ -400,14 +400,14 @@ public class SnapshotManager implements SnapshotState {
 
                     applySnapshot.getCallback().onSuccess();
                 } catch (Exception e) {
-                    LOG.error("Error applying snapshot", e);
+                    LOG.error("{}: Error applying snapshot", context.getId(), e);
                 }
             } else {
                 context.getReplicatedLog().snapshotCommit();
             }
 
             context.getPersistenceProvider().deleteSnapshots(new SnapshotSelectionCriteria(
-                    sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), 43200000));
+                    sequenceNumber - context.getConfigParams().getSnapshotBatchCount(), Long.MAX_VALUE));
 
             context.getPersistenceProvider().deleteMessages(lastSequenceNumber);
 
index e16d7949745004ca91dbf20e4d186beb8d84cf50..130c707e3a1f06d6b2c51b112eb4ba316e69d0ae 100644 (file)
@@ -14,7 +14,6 @@ import akka.persistence.SelectedSnapshot;
 import akka.persistence.SnapshotMetadata;
 import akka.persistence.SnapshotSelectionCriteria;
 import akka.persistence.snapshot.japi.SnapshotStore;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.Uninterruptibles;
 import java.util.ArrayList;
@@ -66,8 +65,8 @@ public class InMemorySnapshotStore extends SnapshotStore {
         synchronized (stored) {
             retList = Lists.newArrayListWithCapacity(stored.size());
             for(StoredSnapshot s: stored) {
-                if(type.isInstance(s.getData())) {
-                    retList.add((T) s.getData());
+                if(type.isInstance(s.data)) {
+                    retList.add((T) s.data);
                 }
             }
         }
@@ -92,17 +91,29 @@ public class InMemorySnapshotStore extends SnapshotStore {
     }
 
     @Override
-    public Future<Option<SelectedSnapshot>> doLoadAsync(String s,
-        SnapshotSelectionCriteria snapshotSelectionCriteria) {
-        List<StoredSnapshot> snapshotList = snapshots.get(s);
+    public Future<Option<SelectedSnapshot>> doLoadAsync(String persistenceId,
+            SnapshotSelectionCriteria snapshotSelectionCriteria) {
+        List<StoredSnapshot> snapshotList = snapshots.get(persistenceId);
         if(snapshotList == null){
             return Futures.successful(Option.<SelectedSnapshot>none());
         }
 
-        StoredSnapshot snapshot = Iterables.getLast(snapshotList);
-        SelectedSnapshot selectedSnapshot =
-            new SelectedSnapshot(snapshot.getMetadata(), snapshot.getData());
-        return Futures.successful(Option.some(selectedSnapshot));
+        synchronized(snapshotList) {
+            for(int i = snapshotList.size() - 1; i >= 0; i--) {
+                StoredSnapshot snapshot = snapshotList.get(i);
+                if(matches(snapshot, snapshotSelectionCriteria)) {
+                    return Futures.successful(Option.some(new SelectedSnapshot(snapshot.metadata,
+                            snapshot.data)));
+                }
+            }
+        }
+
+        return Futures.successful(Option.<SelectedSnapshot>none());
+    }
+
+    private boolean matches(StoredSnapshot snapshot, SnapshotSelectionCriteria criteria) {
+        return snapshot.metadata.sequenceNr() <= criteria.maxSequenceNr() &&
+                snapshot.metadata.timestamp() <= criteria.maxTimestamp();
     }
 
     @Override
@@ -137,28 +148,24 @@ public class InMemorySnapshotStore extends SnapshotStore {
             return;
         }
 
-        int deleteIndex = -1;
-
         synchronized (snapshotList) {
             for(int i=0;i<snapshotList.size(); i++){
                 StoredSnapshot snapshot = snapshotList.get(i);
-                if(snapshotMetadata.equals(snapshot.getMetadata())){
-                    deleteIndex = i;
+                if(snapshotMetadata.equals(snapshot.metadata)){
+                    snapshotList.remove(i);
                     break;
                 }
             }
-
-            if(deleteIndex != -1){
-                snapshotList.remove(deleteIndex);
-            }
         }
     }
 
     @Override
     public void doDelete(String persistentId, SnapshotSelectionCriteria snapshotSelectionCriteria)
-        throws Exception {
-        List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
+            throws Exception {
+        LOG.trace("doDelete: persistentId {}: maxSequenceNr: {}: maxTimestamp {}", persistentId,
+                snapshotSelectionCriteria.maxSequenceNr(), snapshotSelectionCriteria.maxTimestamp());
 
+        List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
         if(snapshotList == null){
             return;
         }
@@ -167,10 +174,10 @@ public class InMemorySnapshotStore extends SnapshotStore {
             Iterator<StoredSnapshot> iter = snapshotList.iterator();
             while(iter.hasNext()) {
                 StoredSnapshot s = iter.next();
-                LOG.trace("doDelete: sequenceNr: {}, maxSequenceNr: {}", s.getMetadata().sequenceNr(),
-                        snapshotSelectionCriteria.maxSequenceNr());
+                if(matches(s, snapshotSelectionCriteria)) {
+                    LOG.trace("Deleting snapshot for sequenceNr: {}, timestamp: {}",
+                            s.metadata.sequenceNr(), s.metadata.timestamp());
 
-                if(s.getMetadata().sequenceNr() <= snapshotSelectionCriteria.maxSequenceNr()) {
                     iter.remove();
                 }
             }
@@ -185,13 +192,5 @@ public class InMemorySnapshotStore extends SnapshotStore {
             this.metadata = metadata;
             this.data = data;
         }
-
-        public SnapshotMetadata getMetadata() {
-            return metadata;
-        }
-
-        public Object getData() {
-            return data;
-        }
     }
 }