import akka.dispatch.OnComplete;
import akka.japi.Creator;
import akka.pattern.Patterns;
+import akka.persistence.SaveSnapshotSuccess;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
import com.google.common.base.Function;
@SuppressWarnings("serial")
public void testCreateSnapshot(final boolean persistent, final String shardActorName) throws Exception{
+ final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
+
final AtomicReference<Object> savedSnapshot = new AtomicReference<>();
class TestPersistentDataProvider extends DelegatingPersistentDataProvider {
TestPersistentDataProvider(DataPersistenceProvider delegate) {
dataStoreContextBuilder.persistent(persistent);
new ShardTestKit(getSystem()) {{
- final AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
-
class TestShard extends Shard {
protected TestShard(ShardIdentifier name, Map<String, String> peerAddresses,
}
@Override
- protected void commitSnapshot(final long sequenceNumber) {
- super.commitSnapshot(sequenceNumber);
- latch.get().countDown();
+ public void handleCommand(Object message) {
+ super.handleCommand(message);
+
+ if (message instanceof SaveSnapshotSuccess || message.equals("commit_snapshot")) {
+ latch.get().countDown();
+ }
}
@Override