private static Map<String, List<StoredSnapshot>> snapshots = new ConcurrentHashMap<>();
private static final Map<String, CountDownLatch> snapshotSavedLatches = new ConcurrentHashMap<>();
+ private static final Map<String, CountDownLatch> snapshotDeletedLatches = new ConcurrentHashMap<>();
public static void addSnapshot(String persistentId, Object snapshot) {
List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
snapshotSavedLatches.put(persistenceId, new CountDownLatch(1));
}
+ public static void addSnapshotDeletedLatch(String persistenceId) {
+ snapshotDeletedLatches.put(persistenceId, new CountDownLatch(1));
+ }
+
public static <T> T waitForSavedSnapshot(String persistenceId, Class<T> type) {
if(!Uninterruptibles.awaitUninterruptibly(snapshotSavedLatches.get(persistenceId), 5, TimeUnit.SECONDS)) {
throw new AssertionError("Snapshot was not saved");
return getSnapshots(persistenceId, type).get(0);
}
+ public static void waitForDeletedSnapshot(String persistenceId) {
+ if(!Uninterruptibles.awaitUninterruptibly(snapshotDeletedLatches.get(persistenceId), 5, TimeUnit.SECONDS)) {
+ throw new AssertionError("Snapshot was not deleted");
+ }
+ }
+
@Override
public Future<Option<SelectedSnapshot>> doLoadAsync(String persistenceId,
SnapshotSelectionCriteria snapshotSelectionCriteria) {
public Future<Void> doSaveAsync(SnapshotMetadata snapshotMetadata, Object o) {
List<StoredSnapshot> snapshotList = snapshots.get(snapshotMetadata.persistenceId());
+ LOG.trace("doSaveAsync: persistentId {}: sequenceNr: {}: timestamp {}: {}", snapshotMetadata.persistenceId(),
+ snapshotMetadata.sequenceNr(), snapshotMetadata.timestamp(), o);
+
if(snapshotList == null){
snapshotList = new ArrayList<>();
snapshots.put(snapshotMetadata.persistenceId(), snapshotList);
snapshotSelectionCriteria.maxSequenceNr(), snapshotSelectionCriteria.maxTimestamp());
List<StoredSnapshot> snapshotList = snapshots.get(persistentId);
- if(snapshotList == null){
- return;
- }
-
- synchronized (snapshotList) {
- Iterator<StoredSnapshot> iter = snapshotList.iterator();
- while(iter.hasNext()) {
- StoredSnapshot s = iter.next();
- if(matches(s, snapshotSelectionCriteria)) {
- LOG.trace("Deleting snapshot for sequenceNr: {}, timestamp: {}",
- s.metadata.sequenceNr(), s.metadata.timestamp());
-
- iter.remove();
+ if(snapshotList != null){
+ synchronized (snapshotList) {
+ Iterator<StoredSnapshot> iter = snapshotList.iterator();
+ while(iter.hasNext()) {
+ StoredSnapshot s = iter.next();
+ if(matches(s, snapshotSelectionCriteria)) {
+ LOG.trace("Deleting snapshot for sequenceNr: {}, timestamp: {}: {}",
+ s.metadata.sequenceNr(), s.metadata.timestamp(), s.data);
+
+ iter.remove();
+ }
}
}
}
+
+ CountDownLatch latch = snapshotDeletedLatches.get(persistentId);
+ if(latch != null) {
+ latch.countDown();
+ }
}
private static class StoredSnapshot {
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SnapshotOffer;
+import akka.persistence.SnapshotSelectionCriteria;
import akka.serialization.Serialization;
import akka.util.Timeout;
import com.google.common.annotations.VisibleForTesting;
} else if(message instanceof ServerRemoved){
onShardReplicaRemoved((ServerRemoved) message);
} else if (message instanceof SaveSnapshotSuccess) {
- LOG.debug("{} saved ShardManager snapshot successfully", persistenceId());
+ onSaveSnapshotSuccess((SaveSnapshotSuccess)message);
} else if (message instanceof SaveSnapshotFailure) {
LOG.error("{}: SaveSnapshotFailure received for saving snapshot of shards",
persistenceId(), ((SaveSnapshotFailure) message).cause());
}
}
+ private void onSaveSnapshotSuccess (SaveSnapshotSuccess successMessage) {
+ LOG.debug ("{} saved ShardManager snapshot successfully. Deleting the prev snapshot if available",
+ persistenceId());
+ deleteSnapshots(new SnapshotSelectionCriteria(scala.Long.MaxValue(), (successMessage.metadata().timestamp() - 1)));
+ }
+
private static class ForwardedAddServerReply {
ShardInformation shardInfo;
AddServerReply addServerReply;
@Test
public void testAddShardReplica() throws Exception {
- LOG.info("testAddShardReplica starting");
-
MockConfiguration mockConfig =
new MockConfiguration(ImmutableMap.<String, List<String>>builder().
put("default", Arrays.asList("member-1", "member-2")).
put("astronauts", Arrays.asList("member-2")).build());
- String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+ final String shardManagerID = ShardManagerIdentifier.builder().type(shardMrgIDSuffix).build().toString();
+ datastoreContextBuilder.shardManagerPersistenceId(shardManagerID);
// Create an ActorSystem ShardManager actor for member-1.
final ActorSystem system1 = newActorSystem("Member1");
newReplicaShardManager.underlyingActor().waitForMemberUp();
leaderShardManager.underlyingActor().waitForMemberUp();
+ //Have a dummy snapshot to be overwritten by the new data persisted.
+ String[] restoredShards = {"default", "people"};
+ ShardManagerSnapshot snapshot = new ShardManagerSnapshot(Arrays.asList(restoredShards));
+ InMemorySnapshotStore.addSnapshot(shardManagerID, snapshot);
+ Uninterruptibles.sleepUninterruptibly(2, TimeUnit.MILLISECONDS);
+
+ InMemorySnapshotStore.addSnapshotSavedLatch(shardManagerID);
+ InMemorySnapshotStore.addSnapshotDeletedLatch(shardManagerID);
+
//construct a mock response message
AddServerReply response = new AddServerReply(ServerChangeStatus.OK, memberId2);
mockShardLeaderActor.underlyingActor().updateResponse(response);
AddServer.class);
String addServerId = "member-1-shard-astronauts-" + shardMrgIDSuffix;
assertEquals("AddServer serverId", addServerId, addServerMsg.getNewServerId());
- newReplicaShardManager.underlyingActor()
- .verifySnapshotPersisted(Sets.newHashSet("default", "astronauts"));
expectMsgClass(duration("5 seconds"), Status.Success.class);
+
+ InMemorySnapshotStore.waitForSavedSnapshot(shardManagerID, ShardManagerSnapshot.class);
+ InMemorySnapshotStore.waitForDeletedSnapshot(shardManagerID);
+ List<ShardManagerSnapshot> persistedSnapshots =
+ InMemorySnapshotStore.getSnapshots(shardManagerID, ShardManagerSnapshot.class);
+ assertEquals("Number of snapshots persisted", 1, persistedSnapshots.size());
+ ShardManagerSnapshot shardManagerSnapshot = persistedSnapshots.get(0);
+ assertEquals("Persisted local shards", Sets.newHashSet("default", "astronauts"),
+ Sets.newHashSet(shardManagerSnapshot.getShardList()));
}};
LOG.info("testAddShardReplica ending");
public void saveSnapshot(Object obj) {
snapshot = (ShardManagerSnapshot) obj;
snapshotPersist.countDown();
+ super.saveSnapshot(obj);
}
void verifySnapshotPersisted(Set<String> shardList) {