@Override
@SuppressWarnings("checkstyle:illegalCatch")
- void handleWriteMessages(final WriteMessages message) {
+ long handleWriteMessages(final WriteMessages message) {
final int count = message.size();
final SegmentedJournalWriter<DataJournalEntry> writer = entries.writer();
+ long bytes = 0;
for (int i = 0; i < count; ++i) {
final long mark = writer.getLastIndex();
final AtomicWrite request = message.getRequest(i);
final List<PersistentRepr> reprs = CollectionConverters.asJava(request.payload());
LOG.trace("{}: append {}/{}: {} items at mark {}", persistenceId, i, count, reprs.size(), mark);
try {
- for (PersistentRepr repr : reprs) {
- final Object payload = repr.payload();
- if (!(payload instanceof Serializable)) {
- throw new UnsupportedOperationException("Non-serializable payload encountered "
- + payload.getClass());
- }
-
- LOG.trace("{}: starting append of {}", persistenceId, payload);
- final Indexed<ToPersistence> entry = writer.append(new ToPersistence(repr));
- final int size = entry.size();
- LOG.trace("{}: finished append of {} with {} bytes at {}", persistenceId, payload, size,
- entry.index());
- recordMessageSize(size);
- }
+ bytes += writePayload(writer, reprs);
} catch (Exception e) {
- LOG.warn("{}: failed to write out request", persistenceId, e);
+ LOG.warn("{}: failed to write out request {}/{} reverting to {}", persistenceId, i, count, mark, e);
message.setFailure(i, e);
writer.truncate(mark);
continue;
}

message.setSuccess(i);
}
writer.flush();
+ return bytes;
+ }
+
+ private long writePayload(final SegmentedJournalWriter<DataJournalEntry> writer, final List<PersistentRepr> reprs) {
+ long bytes = 0;
+ for (PersistentRepr repr : reprs) {
+ final Object payload = repr.payload();
+ if (!(payload instanceof Serializable)) {
+ throw new UnsupportedOperationException("Non-serializable payload encountered "
+ + payload.getClass());
+ }
+
+ LOG.trace("{}: starting append of {}", persistenceId, payload);
+ final Indexed<ToPersistence> entry = writer.append(new ToPersistence(repr));
+ final int size = entry.size();
+ LOG.trace("{}: finished append of {} with {} bytes at {}", persistenceId, payload, size, entry.index());
+ recordMessageSize(size);
+ bytes += size;
+ }
+ return bytes;
}
}
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.MoreObjects;
+import com.google.common.base.Stopwatch;
import io.atomix.storage.StorageLevel;
import io.atomix.storage.journal.Indexed;
import io.atomix.storage.journal.SegmentedJournal;
private void handleWriteMessages(final WriteMessages message) {
ensureOpen();
- final long startTicks = System.nanoTime();
+ final Stopwatch sw = Stopwatch.createStarted();
final long start = dataJournal.lastWrittenSequenceNr();
+ final long bytes = dataJournal.handleWriteMessages(message);
+ sw.stop();
- dataJournal.handleWriteMessages(message);
-
- batchWriteTime.update(System.nanoTime() - startTicks, TimeUnit.NANOSECONDS);
+ batchWriteTime.update(sw.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
messageWriteCount.mark(dataJournal.lastWrittenSequenceNr() - start);
+
+ // log message after statistics are updated
+ LOG.debug("{}: write of {} bytes completed in {}", persistenceId, bytes, sw);
}
private void handleUnknown(final Object message) {
return;
}
+ final Stopwatch sw = Stopwatch.createStarted();
deleteJournal = SegmentedJournal.<Long>builder().withDirectory(directory).withName("delete")
.withNamespace(DELETE_NAMESPACE).withMaxSegmentSize(DELETE_SEGMENT_SIZE).build();
final Indexed<Long> lastEntry = deleteJournal.writer().getLastEntry();
dataJournal = new DataJournalV0(persistenceId, messageSize, context().system(), storage, directory,
maxEntrySize, maxSegmentSize);
dataJournal.deleteTo(lastDelete);
- LOG.debug("{}: journal open with last index {}, deleted to {}", persistenceId,
+ LOG.debug("{}: journal open in {} with last index {}, deleted to {}", persistenceId, sw,
dataJournal.lastWrittenSequenceNr(), lastDelete);
}
}