+ private ActorRef createStore() {
+ return kit.childActorOf(Props.create(TestingBucketStore.class,
+ new RemoteRpcProviderConfig(system.settings().config()), "testStore", new T()));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Map<Address, Bucket<T>> getBuckets(final ActorRef store) throws Exception {
+ final GetAllBucketsReply<T> result = (GetAllBucketsReply) Await.result(Patterns.ask(store, new GetAllBuckets(),
+ Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf());
+ return result.getBuckets();
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Map<Address, Long> getVersions(final ActorRef store) throws Exception {
+ return ((GetBucketVersionsReply) Await.result(Patterns.ask(store, new GetBucketVersions(),
+ Timeout.apply(1, TimeUnit.SECONDS)), Duration.Inf())).getVersions();
+ }
+
+ private static final class TestingBucketStore extends BucketStore<T> {
+
+ private final List<ActorRef> toNotify = new ArrayList<>();
+
+ TestingBucketStore(final RemoteRpcProviderConfig config,
+ final String persistenceId,
+ final T initialData) {
+ super(config, persistenceId, initialData);
+ }
+
+ @Override
+ protected void handleCommand(Object message) throws Exception {
+ if (message instanceof WaitUntilDonePersisting) {
+ handlePersistAsk();
+ } else if (message instanceof SaveSnapshotSuccess) {
+ super.handleCommand(message);
+ handleSnapshotSuccess();
+ } else {
+ super.handleCommand(message);
+ }
+ }
+
+ private void handlePersistAsk() {
+ if (isPersisting()) {
+ toNotify.add(getSender());
+ } else {
+ getSender().tell(new Success(null), noSender());
+ }
+ }
+
+ private void handleSnapshotSuccess() {
+ toNotify.forEach(ref -> ref.tell(new Success(null), noSender()));
+ }
+ }
+
+ /**
+ * Message sent to the TestingBucketStore that replies with success once the actor is done persisting.
+ */
+ private static final class WaitUntilDonePersisting {
+