- return Futures.future(new Callable<Iterable<Optional<Exception>>>() {
- @Override
- public Iterable<Optional<Exception>> call() throws Exception {
- for (AtomicWrite write : messages) {
- // Copy to array - workaround for eclipse "ambiguous method" errors for toIterator, toIterable etc
- PersistentRepr[] array = new PersistentRepr[write.payload().size()];
- write.payload().copyToArray(array);
- for(PersistentRepr repr: array) {
- LOG.trace("doAsyncWriteMessages: id: {}: seqNr: {}, payload: {}", repr.persistenceId(),
- repr.sequenceNr(), repr.payload());
-
- addEntry(repr.persistenceId(), repr.sequenceNr(), repr.payload());
-
- WriteMessagesComplete complete = writeMessagesComplete.get(repr.persistenceId());
- if(complete != null) {
- if(complete.ofType == null || complete.ofType.equals(repr.payload().getClass())) {
- complete.latch.countDown();
- }
+ return Futures.future(() -> {
+ for (AtomicWrite write : messages) {
+ for (PersistentRepr repr : CollectionConverters.asJava(write.payload())) {
+ LOG.trace("doAsyncWriteMessages: id: {}: seqNr: {}, payload: {}", repr.persistenceId(),
+ repr.sequenceNr(), repr.payload());
+
+ addEntry(repr.persistenceId(), repr.sequenceNr(), repr.payload());
+
+ WriteMessagesComplete complete = WRITE_MESSAGES_COMPLETE.get(repr.persistenceId());
+ if (complete != null) {
+ if (complete.ofType == null || complete.ofType.equals(repr.payload().getClass())) {
+ complete.latch.countDown();