From: kalaiselvik Date: Mon, 23 Nov 2015 08:49:30 +0000 (+0530) Subject: BUG 2187 - Removal of old snapshots in ShardManager X-Git-Tag: release/beryllium~115 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=06af5e406a6698a7211bf4e4435c6aa2e8e3f628 BUG 2187 - Removal of old snapshots in ShardManager In the ShardManager, when persisting the shard information as snapshots, old snapshots are still retained. These are removed when the latest snapshot persistence is successful. Change-Id: I6f138ae364bcaa577fa2e5e3758b3ca75486a469 Signed-off-by: kalaiselvik --- diff --git a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java index 130c707e3a..81b2d7844f 100644 --- a/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java +++ b/opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/utils/InMemorySnapshotStore.java @@ -39,6 +39,7 @@ public class InMemorySnapshotStore extends SnapshotStore { private static Map> snapshots = new ConcurrentHashMap<>(); private static final Map snapshotSavedLatches = new ConcurrentHashMap<>(); + private static final Map snapshotDeletedLatches = new ConcurrentHashMap<>(); public static void addSnapshot(String persistentId, Object snapshot) { List snapshotList = snapshots.get(persistentId); @@ -82,6 +83,10 @@ public class InMemorySnapshotStore extends SnapshotStore { snapshotSavedLatches.put(persistenceId, new CountDownLatch(1)); } + public static void addSnapshotDeletedLatch(String persistenceId) { + snapshotDeletedLatches.put(persistenceId, new CountDownLatch(1)); + } + public static T waitForSavedSnapshot(String persistenceId, Class type) { if(!Uninterruptibles.awaitUninterruptibly(snapshotSavedLatches.get(persistenceId), 5, TimeUnit.SECONDS)) { throw new AssertionError("Snapshot was not saved"); @@ -90,6 +95,12 @@ public class InMemorySnapshotStore extends SnapshotStore { return getSnapshots(persistenceId, type).get(0); } + public static void waitForDeletedSnapshot(String persistenceId) { + if(!Uninterruptibles.awaitUninterruptibly(snapshotDeletedLatches.get(persistenceId), 5, TimeUnit.SECONDS)) { + throw new AssertionError("Snapshot was not deleted"); + } + } + @Override public Future> doLoadAsync(String persistenceId, SnapshotSelectionCriteria snapshotSelectionCriteria) { @@ -120,6 +131,9 @@ public class InMemorySnapshotStore extends SnapshotStore { public Future doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) { List snapshotList = snapshots.get(snapshotMetadata.persistenceId()); + LOG.trace("doSaveAsync: persistentId {}: sequenceNr: {}: timestamp {}: {}", snapshotMetadata.persistenceId(), + snapshotMetadata.sequenceNr(), snapshotMetadata.timestamp(), o); + if(snapshotList == null){ snapshotList = new ArrayList<>(); snapshots.put(snapshotMetadata.persistenceId(), snapshotList); @@ -166,22 +180,25 @@ public class InMemorySnapshotStore extends SnapshotStore { snapshotSelectionCriteria.maxSequenceNr(), snapshotSelectionCriteria.maxTimestamp()); List snapshotList = snapshots.get(persistentId); - if(snapshotList == null){ - return; - } - - synchronized (snapshotList) { - Iterator iter = snapshotList.iterator(); - while(iter.hasNext()) { - StoredSnapshot s = iter.next(); - if(matches(s, snapshotSelectionCriteria)) { - LOG.trace("Deleting snapshot for sequenceNr: {}, timestamp: {}", - s.metadata.sequenceNr(), s.metadata.timestamp()); - - iter.remove(); + if(snapshotList != null){ + synchronized (snapshotList) { + Iterator iter = snapshotList.iterator(); + while(iter.hasNext()) { + StoredSnapshot s = iter.next(); + if(matches(s, snapshotSelectionCriteria)) { + LOG.trace("Deleting snapshot for sequenceNr: {}, timestamp: {}: {}", + s.metadata.sequenceNr(), s.metadata.timestamp(), s.data); + + iter.remove(); + } } } } + + CountDownLatch latch = snapshotDeletedLatches.get(persistentId); + if(latch != null) { + latch.countDown(); + } } private static class StoredSnapshot { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index ce473bbc49..d61e12e1cb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -24,6 +24,7 @@ import akka.persistence.RecoveryCompleted; import akka.persistence.SaveSnapshotFailure; import akka.persistence.SaveSnapshotSuccess; import akka.persistence.SnapshotOffer; +import akka.persistence.SnapshotSelectionCriteria; import akka.serialization.Serialization; import akka.util.Timeout; import com.google.common.annotations.VisibleForTesting; @@ -238,7 +239,7 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } else if(message instanceof ServerRemoved){ onShardReplicaRemoved((ServerRemoved) message); } else if (message instanceof SaveSnapshotSuccess) { - LOG.debug("{} saved ShardManager snapshot successfully", persistenceId()); + onSaveSnapshotSuccess((SaveSnapshotSuccess)message); } else if (message instanceof SaveSnapshotFailure) { LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards", persistenceId(), ((SaveSnapshotFailure) message).cause()); @@ -1125,6 +1126,12 @@ public class ShardManager extends AbstractUntypedPersistentActorWithMetering { } } + private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) { + LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available", + persistenceId()); + deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), (successMessage.metadata().timestamp() - 1))); + } + private static class ForwardedAddServerReply { ShardInformation shardInfo; AddServerReply addServerReply; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java index 2525ba78b9..5bccee198d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java @@ -1377,14 +1377,13 @@ public class ShardManagerTest extends AbstractActorTest { @Test public void testAddShardReplica() throws Exception { - LOG.info("testAddShardReplica starting"); - MockConfiguration mockConfig = new MockConfiguration(ImmutableMap.>builder(). put("default", Arrays.asList("member-1", "member-2")). put("astronauts", Arrays.asList("member-2")).build()); - String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString(); + datastoreContextBuilder.shardManagerPersistenceId(shardManagerID); // Create an ActorSystem ShardManager actor for member-1. final ActorSystem system1 = newActorSystem("Member1"); @@ -1422,6 +1421,15 @@ public class ShardManagerTest extends AbstractActorTest { newReplicaShardManager.underlyingActor().waitForMemberUp(); leaderShardManager.underlyingActor().waitForMemberUp(); + //Have a dummy snapshot to be overwritten by the new data persisted. + String[] restoredShards = {"default", "people"}; + ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards)); + InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot); + Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS); + + InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID); + InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID); + //construct a mock response message AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2); mockShardLeaderActor.underlyingActor().updateResponse(response); @@ -1430,9 +1438,16 @@ public class ShardManagerTest extends AbstractActorTest { AddServer.class); String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix; assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId()); - newReplicaShardManager.underlyingActor() - .verifySnapshotPersisted(Sets.newHashSet("default", "astronauts")); expectMsgClass(duration("5 seconds"), Status.Success.class); + + InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class); + InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID); + List persistedSnapshots = + InMemorySnapshotStore.getSnapshots(shardManagerID, ShardManagerSnapshot.class); + assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size()); + ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0); + assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"), + Sets.newHashSet(shardManagerSnapshot.getShardList())); }}; LOG.info("testAddShardReplica ending"); @@ -1867,6 +1882,7 @@ public class ShardManagerTest extends AbstractActorTest { public void saveSnapshot(Object obj) { snapshot = (ShardManagerSnapshot) obj; snapshotPersist.countDown(); + super.saveSnapshot(obj); } void verifySnapshotPersisted(Set shardList) {