+ public static Map<Long, Object> get(String persistenceId) {
+ Map<Long, Object> journal = journals.get(persistenceId);
+ return journal != null ? journal : Collections.<Long, Object>emptyMap();
+ }
+
+ public static void waitForDeleteMessagesComplete(String persistenceId) {
+ assertEquals("Recovery complete", true, Uninterruptibles.awaitUninterruptibly(
+ deleteMessagesCompleteLatches.get(persistenceId), 5, TimeUnit.SECONDS));
+ }
+
+ public static void addDeleteMessagesCompleteLatch(String persistenceId) {
+ deleteMessagesCompleteLatches.put(persistenceId, new CountDownLatch(1));
+ }
+
+ public static void addBlockReadMessagesLatch(String persistenceId, CountDownLatch latch) {
+ blockReadMessagesLatches.put(persistenceId, latch);
+ }
+