BUG 2187 - Removal of old snapshots in ShardManager 50/29850/5
authorkalaiselvik <Kalaiselvi_K@Dell.com>
Mon, 23 Nov 2015 08:49:30 +0000 (14:19 +0530)
committerTom Pantelis <tpanteli@brocade.com>
Tue, 1 Dec 2015 15:53:55 +0000 (10:53 -0500)
In the ShardManager, when persisting the shard information as
snapshots, old snapshots are still retained. These are removed
when the latest snapshot persistence is successful.

Change-Id: I6f138ae364bcaa577fa2e5e3758b3ca75486a469
Signed-off-by: kalaiselvik <Kalaiselvi_K@Dell.com>
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java

index 130c707..81b2d78 100644 (file)
@@ -39,6 +39,7 @@ public class InMemorySnapshotStore extends SnapshotStore {
 
     private static Map<String, List<StoredSnapshot>> snapshots = new ConcurrentHashMap<>();
     private static final Map<String, CountDownLatch> snapshotSavedLatches = new ConcurrentHashMap<>();
+    private static final Map<String, CountDownLatch> snapshotDeletedLatches = new ConcurrentHashMap<>();
 
     public static void addSnapshot(String persistentId, Object snapshot) {
         List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
@@ -82,6 +83,10 @@ public class InMemorySnapshotStore extends SnapshotStore {
         snapshotSavedLatches.put(persistenceId, new CountDownLatch(1));
     }
 
+    public static void addSnapshotDeletedLatch(String persistenceId) {
+        snapshotDeletedLatches.put(persistenceId, new CountDownLatch(1));
+    }
+
     public static <T> T waitForSavedSnapshot(String persistenceId, Class<T> type) {
         if(!Uninterruptibles.awaitUninterruptibly(snapshotSavedLatches.get(persistenceId), 5, TimeUnit.SECONDS)) {
             throw new AssertionError("Snapshot was not saved");
@@ -90,6 +95,12 @@ public class InMemorySnapshotStore extends SnapshotStore {
         return getSnapshots(persistenceId, type).get(0);
     }
 
+    public static void waitForDeletedSnapshot(String persistenceId) {
+        if(!Uninterruptibles.awaitUninterruptibly(snapshotDeletedLatches.get(persistenceId), 5, TimeUnit.SECONDS)) {
+            throw new AssertionError("Snapshot was not deleted");
+        }
+    }
+
     @Override
     public Future<Option<SelectedSnapshot>> doLoadAsync(String persistenceId,
             SnapshotSelectionCriteria snapshotSelectionCriteria) {
@@ -120,6 +131,9 @@ public class InMemorySnapshotStore extends SnapshotStore {
     public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
         List<StoredSnapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
 
+        LOG.trace("doSaveAsync: persistentId {}: sequenceNr: {}: timestamp {}: {}", snapshotMetadata.persistenceId(),
+                snapshotMetadata.sequenceNr(), snapshotMetadata.timestamp(), o);
+
         if(snapshotList == null){
             snapshotList = new ArrayList<>();
             snapshots.put(snapshotMetadata.persistenceId(), snapshotList);
@@ -166,22 +180,25 @@ public class InMemorySnapshotStore extends SnapshotStore {
                 snapshotSelectionCriteria.maxSequenceNr(), snapshotSelectionCriteria.maxTimestamp());
 
         List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
-        if(snapshotList == null){
-            return;
-        }
-
-        synchronized (snapshotList) {
-            Iterator<StoredSnapshot> iter = snapshotList.iterator();
-            while(iter.hasNext()) {
-                StoredSnapshot s = iter.next();
-                if(matches(s, snapshotSelectionCriteria)) {
-                    LOG.trace("Deleting snapshot for sequenceNr: {}, timestamp: {}",
-                            s.metadata.sequenceNr(), s.metadata.timestamp());
-
-                    iter.remove();
+        if(snapshotList != null){
+            synchronized (snapshotList) {
+                Iterator<StoredSnapshot> iter = snapshotList.iterator();
+                while(iter.hasNext()) {
+                    StoredSnapshot s = iter.next();
+                    if(matches(s, snapshotSelectionCriteria)) {
+                        LOG.trace("Deleting snapshot for sequenceNr: {}, timestamp: {}: {}",
+                                s.metadata.sequenceNr(), s.metadata.timestamp(), s.data);
+
+                        iter.remove();
+                    }
                 }
             }
         }
+
+        CountDownLatch latch = snapshotDeletedLatches.get(persistentId);
+        if(latch != null) {
+            latch.countDown();
+        }
     }
 
     private static class StoredSnapshot {
index ce473bb..d61e12e 100644 (file)
@@ -24,6 +24,7 @@ import akka.persistence.RecoveryCompleted;
 import akka.persistence.SaveSnapshotFailure;
 import akka.persistence.SaveSnapshotSuccess;
 import akka.persistence.SnapshotOffer;
+import akka.persistence.SnapshotSelectionCriteria;
 import akka.serialization.Serialization;
 import akka.util.Timeout;
 import com.google.common.annotations.VisibleForTesting;
@@ -238,7 +239,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         } else if(message instanceof ServerRemoved){
             onShardReplicaRemoved((ServerRemoved) message);
         } else if (message instanceof SaveSnapshotSuccess) {
-            LOG.debug("{} saved ShardManager snapshot successfully", persistenceId());
+            onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
         } else if (message instanceof SaveSnapshotFailure) {
             LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
                     persistenceId(), ((SaveSnapshotFailure) message).cause());
@@ -1125,6 +1126,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering {
         }
     }
 
+    private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
+        LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
+            persistenceId());
+        deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), (successMessage.metadata().timestamp() - 1)));
+    }
+
     private static class ForwardedAddServerReply {
         ShardInformation shardInfo;
         AddServerReply addServerReply;
index 2525ba7..5bccee1 100644 (file)
@@ -1377,14 +1377,13 @@ public class ShardManagerTest extends AbstractActorTest {
 
     @Test
     public void testAddShardReplica() throws Exception {
-        LOG.info("testAddShardReplica starting");
-
         MockConfiguration mockConfig =
                 new MockConfiguration(ImmutableMap.<String, List<String>>builder().
                    put("default", Arrays.asList("member-1", "member-2")).
                    put("astronauts", Arrays.asList("member-2")).build());
 
-        String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+        final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+        datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
 
         // Create an ActorSystem ShardManager actor for member-1.
         final ActorSystem system1 = newActorSystem("Member1");
@@ -1422,6 +1421,15 @@ public class ShardManagerTest extends AbstractActorTest {
             newReplicaShardManager.underlyingActor().waitForMemberUp();
             leaderShardManager.underlyingActor().waitForMemberUp();
 
+            //Have a dummy snapshot to be overwritten by the new data persisted.
+            String[] restoredShards = {"default", "people"};
+            ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
+            InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
+            Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
+
+            InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
+            InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
+
             //construct a mock response message
             AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2);
             mockShardLeaderActor.underlyingActor().updateResponse(response);
@@ -1430,9 +1438,16 @@ public class ShardManagerTest extends AbstractActorTest {
                 AddServer.class);
             String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
             assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
-            newReplicaShardManager.underlyingActor()
-                .verifySnapshotPersisted(Sets.newHashSet("default", "astronauts"));
             expectMsgClass(duration("5 seconds"), Status.Success.class);
+
+            InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
+            InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
+            List<ShardManagerSnapshot> persistedSnapshots =
+                InMemorySnapshotStore.getSnapshots(shardManagerID, ShardManagerSnapshot.class);
+            assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
+            ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
+            assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
+                    Sets.newHashSet(shardManagerSnapshot.getShardList()));
         }};
 
         LOG.info("testAddShardReplica ending");
@@ -1867,6 +1882,7 @@ public class ShardManagerTest extends AbstractActorTest {
         public void saveSnapshot(Object obj) {
             snapshot = (ShardManagerSnapshot) obj;
             snapshotPersist.countDown();
+            super.saveSnapshot(obj);
         }
 
         void verifySnapshotPersisted(Set<String> shardList) {