@Override
@SuppressWarnings("checkstyle:illegalCatch")
- void handleWriteMessages(final WriteMessages message) {
+ long handleWriteMessages(final WriteMessages message) {
final int count = message.size();
final SegmentedJournalWriter<DataJournalEntry> writer = entries.writer();
+ long bytes = 0;
for (int i = 0; i < count; ++i) {
final long mark = writer.getLastIndex();
final List<PersistentRepr> reprs = CollectionConverters.asJava(request.payload());
LOG.trace("{}: append {}/{}: {} items at mark {}", persistenceId, i, count, reprs.size(), mark);
try {
- for (PersistentRepr repr : reprs) {
- final Object payload = repr.payload();
- if (!(payload instanceof Serializable)) {
- throw new UnsupportedOperationException("Non-serializable payload encountered "
- + payload.getClass());
- }
-
- LOG.trace("{}: starting append of {}", persistenceId, payload);
- final Indexed<ToPersistence> entry = writer.append(new ToPersistence(repr));
- final int size = entry.size();
- LOG.trace("{}: finished append of {} with {} bytes at {}", persistenceId, payload, size,
- entry.index());
- recordMessageSize(size);
- }
+ bytes += writePayload(writer, reprs);
} catch (Exception e) {
- LOG.warn("{}: failed to write out request", persistenceId, e);
+ LOG.warn("{}: failed to write out request {}/{} reverting to {}", persistenceId, i, count, mark, e);
message.setFailure(i, e);
writer.truncate(mark);
continue;
message.setSuccess(i);
}
writer.flush();
+ return bytes;
+ }
+
+ private long writePayload(final SegmentedJournalWriter<DataJournalEntry> writer, final List<PersistentRepr> reprs) {
+ long bytes = 0;
+ for (PersistentRepr repr : reprs) {
+ final Object payload = repr.payload();
+ if (!(payload instanceof Serializable)) {
+ throw new UnsupportedOperationException("Non-serializable payload encountered "
+ + payload.getClass());
+ }
+
+ LOG.trace("{}: starting append of {}", persistenceId, payload);
+ final Indexed<ToPersistence> entry = writer.append(new ToPersistence(repr));
+ final int size = entry.size();
+ LOG.trace("{}: finished append of {} with {} bytes at {}", persistenceId, payload, size, entry.index());
+ recordMessageSize(size);
+ bytes += size;
+ }
+ return bytes;
}
}