import io.atomix.storage.journal.index.JournalIndex;
import io.atomix.storage.journal.index.SparseJournalIndex;
-import java.io.File;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
private final MappableJournalSegmentWriter<E> writer;
private final Set<MappableJournalSegmentReader<E>> readers = ConcurrentHashMap.newKeySet();
private final AtomicInteger references = new AtomicInteger();
+ private final FileChannel channel;
private boolean open = true;
public JournalSegment(
this.descriptor = descriptor;
this.storageLevel = storageLevel;
this.maxEntrySize = maxEntrySize;
- this.index = new SparseJournalIndex(indexDensity);
this.namespace = namespace;
- this.writer = new MappableJournalSegmentWriter<>(openChannel(file.file()), this, maxEntrySize, index, namespace);
- }
-
- private FileChannel openChannel(File file) {
+ index = new SparseJournalIndex(indexDensity);
try {
- return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
+ channel = FileChannel.open(file.file().toPath(),
+ StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
} catch (IOException e) {
throw new StorageException(e);
}
+ writer = new MappableJournalSegmentWriter<>(channel, this, maxEntrySize, index, namespace);
}
/**
* @return the size of the segment
*/
public int size() {
- return writer.size();
+ try {
+ return (int) channel.size();
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
}
/**
*/
MappableJournalSegmentReader<E> createReader() {
checkOpen();
- MappableJournalSegmentReader<E> reader = new MappableJournalSegmentReader<>(
- openChannel(file.file()), this, maxEntrySize, index, namespace);
+ MappableJournalSegmentReader<E> reader = new MappableJournalSegmentReader<>(channel, this, maxEntrySize, index,
+ namespace);
MappedByteBuffer buffer = writer.buffer();
if (buffer != null) {
reader.map(buffer);
writer.close();
readers.forEach(reader -> reader.close());
open = false;
+ try {
+ channel.close();
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
}
/**
package io.atomix.storage.journal;
import io.atomix.storage.journal.index.JournalIndex;
-import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
* Mappable log segment writer.
*/
final class MappableJournalSegmentWriter<E> implements JournalWriter<E> {
- private final FileChannel channel;
private final JournalSegment<E> segment;
private JournalSegmentWriter<E> writer;
int maxEntrySize,
JournalIndex index,
JournalSerdes namespace) {
- this.channel = channel;
this.segment = segment;
this.writer = new FileChannelJournalSegmentWriter<>(channel, segment, maxEntrySize, index, namespace);
}
return segment.index();
}
- /**
- * Returns the size of the segment.
- *
- * @return the size of the segment
- */
- public int size() {
- try {
- return (int) channel.size();
- } catch (IOException e) {
- throw new StorageException(e);
- }
- }
-
@Override
public long getLastIndex() {
return writer.getLastIndex();
@Override
public void close() {
writer.close();
- try {
- channel.close();
- } catch (IOException e) {
- throw new StorageException(e);
- }
}
}