+ private static final class TestingBucketStore extends BucketStore<T> {
+
+ private final List<ActorRef> toNotify = new ArrayList<>();
+
+ TestingBucketStore(final RemoteRpcProviderConfig config,
+ final String persistenceId,
+ final T initialData) {
+ super(config, persistenceId, initialData);
+ }
+
+ @Override
+ protected void handleCommand(Object message) throws Exception {
+ if (message instanceof WaitUntilDonePersisting) {
+ handlePersistAsk();
+ } else if (message instanceof SaveSnapshotSuccess) {
+ super.handleCommand(message);
+ handleSnapshotSuccess();
+ } else {
+ super.handleCommand(message);
+ }
+ }
+
+ private void handlePersistAsk() {
+ if (isPersisting()) {
+ toNotify.add(getSender());
+ } else {
+ getSender().tell(new Success(null), noSender());
+ }
+ }
+
+ private void handleSnapshotSuccess() {
+ toNotify.forEach(ref -> ref.tell(new Success(null), noSender()));
+ }
+ }
+
+ /**
+ * Message sent to the TestingBucketStore that replies with success once the actor is done persisting.
+ */
+ private static final class WaitUntilDonePersisting {
+
+ }