- public Future<Void> doAsyncWriteMessages(final Iterable<PersistentRepr> messages) {
- return Futures.future(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
- for (PersistentRepr repr : messages) {
- Map<Long, Object> journal = journals.get(repr.persistenceId());
- if(journal == null) {
- journal = Maps.newLinkedHashMap();
- journals.put(repr.persistenceId(), journal);
- }
-
- synchronized (journal) {
- LOG.trace("doAsyncWriteMessages: id: {}: seqNr: {}, payload: {}", repr.persistenceId(),
- repr.sequenceNr(), repr.payload());
- journal.put(repr.sequenceNr(), repr.payload());
- }
-
- CountDownLatch latch = writeMessagesCompleteLatches.get(repr.persistenceId());
- if(latch != null) {
- latch.countDown();
+ public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(final Iterable<AtomicWrite> messages) {
+ return Futures.future(() -> {
+ for (AtomicWrite write : messages) {
+ // Copy to array - workaround for eclipse "ambiguous method" errors for toIterator, toIterable etc
+ PersistentRepr[] array = new PersistentRepr[write.payload().size()];
+ write.payload().copyToArray(array);
+ for (PersistentRepr repr: array) {
+ LOG.trace("doAsyncWriteMessages: id: {}: seqNr: {}, payload: {}", repr.persistenceId(),
+ repr.sequenceNr(), repr.payload());
+
+ addEntry(repr.persistenceId(), repr.sequenceNr(), repr.payload());
+
+ WriteMessagesComplete complete = WRITE_MESSAGES_COMPLETE.get(repr.persistenceId());
+ if (complete != null) {
+ if (complete.ofType == null || complete.ofType.equals(repr.payload().getClass())) {
+ complete.latch.countDown();
+ }