/*
* Copyright 2017-2022 Open Networking Foundation and others. All rights reserved.
* Copyright (c) 2024 PANTHEON.tech, s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.atomix.storage.journal;
import static io.atomix.storage.journal.SegmentEntry.HEADER_BYTES;
import io.atomix.storage.journal.index.JournalIndex;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
/**
* Segment writer.
*
* The format of an entry in the log is as follows:
*
* - 64-bit index
* - 8-bit boolean indicating whether a term change is contained in the entry
* - 64-bit optional term
* - 32-bit signed entry length, including the entry type ID
* - 8-bit signed entry type ID
* - n-bit entry bytes
*
*
* @author Jordan Halterman
*/
final class DiskJournalSegmentWriter extends JournalSegmentWriter {
private static final ByteBuffer ZERO_ENTRY_HEADER = ByteBuffer.wrap(new byte[HEADER_BYTES]);
private final JournalSegmentReader reader;
private final ByteBuffer buffer;
DiskJournalSegmentWriter(final FileChannel channel, final JournalSegment segment, final int maxEntrySize,
final JournalIndex index) {
super(channel, segment, maxEntrySize, index);
buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
reader = new JournalSegmentReader(segment,
new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize);
reset(0);
}
DiskJournalSegmentWriter(final JournalSegmentWriter previous) {
super(previous);
buffer = DiskFileReader.allocateBuffer(maxSegmentSize, maxEntrySize);
reader = new JournalSegmentReader(segment,
new DiskFileReader(segment.file().file().toPath(), channel, buffer), maxEntrySize);
}
@Override
MappedByteBuffer buffer() {
return null;
}
@Override
MappedJournalSegmentWriter toMapped() {
return new MappedJournalSegmentWriter(this);
}
@Override
DiskJournalSegmentWriter toFileChannel() {
return this;
}
@Override
JournalSegmentReader reader() {
return reader;
}
@Override
ByteBuffer startWrite(final int position, final int size) {
return buffer.clear().slice(0, size);
}
@Override
void commitWrite(final int position, final ByteBuffer entry) {
try {
channel.write(entry, position);
} catch (IOException e) {
throw new StorageException(e);
}
}
@Override
void writeEmptyHeader(final int position) {
try {
channel.write(ZERO_ENTRY_HEADER.asReadOnlyBuffer(), position);
} catch (IOException e) {
throw new StorageException(e);
}
}
@Override
void flush() {
try {
if (channel.isOpen()) {
channel.force(true);
}
} catch (IOException e) {
throw new StorageException(e);
}
}
@Override
void close() {
flush();
}
}