import io.atomix.storage.journal.index.JournalIndex;
import io.atomix.storage.journal.index.SparseJournalIndex;
import java.io.IOException;
-import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
private final JournalIndex index;
private final JournalSerdes namespace;
private final MappableJournalSegmentWriter<E> writer;
- private final Set<MappableJournalSegmentReader<E>> readers = ConcurrentHashMap.newKeySet();
+ private final Set<JournalSegmentReader<E>> readers = ConcurrentHashMap.newKeySet();
private final AtomicInteger references = new AtomicInteger();
private final FileChannel channel;
private boolean open = true;
* Acquires a reference to the log segment.
*/
void acquire() {
- if (references.getAndIncrement() == 0 && open) {
- map();
+ if (references.getAndIncrement() == 0 && storageLevel == StorageLevel.MAPPED) {
+ writer.map();
}
}
* Releases a reference to the log segment.
*/
void release() {
- if (references.decrementAndGet() == 0 && open) {
- unmap();
- }
- }
-
- /**
- * Maps the log segment into memory.
- */
- private void map() {
- if (storageLevel == StorageLevel.MAPPED) {
- MappedByteBuffer buffer = writer.map();
- readers.forEach(reader -> reader.map(buffer));
- }
- }
-
- /**
- * Unmaps the log segment from memory.
- */
- private void unmap() {
- if (storageLevel == StorageLevel.MAPPED) {
- writer.unmap();
- readers.forEach(reader -> reader.unmap());
+ if (references.decrementAndGet() == 0) {
+ if (storageLevel == StorageLevel.MAPPED) {
+ writer.unmap();
+ }
+ if (!open) {
+ finishClose();
+ }
}
}
*
* @return A new segment reader.
*/
- MappableJournalSegmentReader<E> createReader() {
+ JournalSegmentReader<E> createReader() {
checkOpen();
- MappableJournalSegmentReader<E> reader = new MappableJournalSegmentReader<>(channel, this, maxEntrySize, index,
- namespace);
- MappedByteBuffer buffer = writer.buffer();
- if (buffer != null) {
- reader.map(buffer);
- }
+ acquire();
+
+ final var buffer = writer.buffer();
+ final var reader = buffer == null
+ ? new FileChannelJournalSegmentReader<>(channel, this, maxEntrySize, index, namespace)
+ : new MappedJournalSegmentReader<>(buffer, this, maxEntrySize, index, namespace);
readers.add(reader);
return reader;
}
*
* @param reader the closed segment reader
*/
- void closeReader(MappableJournalSegmentReader<E> reader) {
- readers.remove(reader);
+ void closeReader(JournalSegmentReader<E> reader) {
+ if (readers.remove(reader)) {
+ release();
+ }
}
/**
*/
@Override
public void close() {
- unmap();
- writer.close();
- readers.forEach(reader -> reader.close());
+ if (!open) {
+ return;
+ }
+
open = false;
+ readers.forEach(JournalSegmentReader::close);
+ if (references.get() == 0) {
+ finishClose();
+ }
+ }
+
+ private void finishClose() {
+ writer.close();
try {
channel.close();
} catch (IOException e) {