*/
package io.atomix.storage.journal;
-import com.google.common.annotations.VisibleForTesting;
+import static java.util.Objects.requireNonNull;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import java.io.IOException;
import java.nio.ByteBuffer;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-import static java.util.Objects.requireNonNull;
+import java.nio.channels.FileChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import org.eclipse.jdt.annotation.NonNull;
/**
* Stores information about a {@link JournalSegment} of the log.
this.locked = buffer.get() == 1;
}
+ /**
+ * Read a JournalSegmentDescriptor from a {@link Path}.
+ *
+ * @param path path to read from
+ * @return A {@link JournalSegmentDescriptor}
+ * @throws IOException if an I/O error occurs or there is not enough data
+ */
+ public static @NonNull JournalSegmentDescriptor readFrom(final Path path) throws IOException {
+ try (var channel = FileChannel.open(path, StandardOpenOption.READ)) {
+ final var buffer = ByteBuffer.allocate(BYTES);
+ final var readBytes = channel.read(buffer);
+ if (readBytes != BYTES) {
+ throw new IOException("Need " + BYTES + " bytes, only " + readBytes + " available");
+ }
+ return new JournalSegmentDescriptor(buffer.flip());
+ }
+ }
+
/**
* Returns the segment version.
* <p>
@Override
public String toString() {
- return toStringHelper(this)
+ return MoreObjects.toStringHelper(this)
.add("version", version)
.add("id", id)
.add("index", index)
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.Collection;
-import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
return new JournalSegment(segmentFile, descriptor, storageLevel, maxEntrySize, indexDensity);
}
- /**
- * Loads a segment.
- */
- private JournalSegment loadSegment(long segmentId) {
- File segmentFile = JournalSegmentFile.createSegmentFile(name, directory, segmentId);
- ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
- try (FileChannel channel = openChannel(segmentFile)) {
- channel.read(buffer);
- buffer.flip();
- JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
- JournalSegment segment = newSegment(new JournalSegmentFile(segmentFile), descriptor);
- LOG.debug("Loaded disk segment: {} ({})", descriptor.id(), segmentFile.getName());
- return segment;
- } catch (IOException e) {
- throw new StorageException(e);
- }
- }
-
- private FileChannel openChannel(File file) {
- try {
- return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
- } catch (IOException e) {
- throw new StorageException(e);
- }
- }
-
/**
* Loads all segments from disk.
*
// Ensure log directories are created.
directory.mkdirs();
- TreeMap<Long, JournalSegment> segments = new TreeMap<>();
+ final var segments = new TreeMap<Long, JournalSegment>();
// Iterate through all files in the log directory.
- for (File file : directory.listFiles(File::isFile)) {
+ for (var file : directory.listFiles(File::isFile)) {
// If the file looks like a segment file, attempt to load the segment.
if (JournalSegmentFile.isSegmentFile(name, file)) {
- JournalSegmentFile segmentFile = new JournalSegmentFile(file);
- ByteBuffer buffer = ByteBuffer.allocate(JournalSegmentDescriptor.BYTES);
- try (FileChannel channel = openChannel(file)) {
- channel.read(buffer);
- buffer.flip();
+ // read the descriptor
+ final JournalSegmentDescriptor descriptor;
+ try {
+ descriptor = JournalSegmentDescriptor.readFrom(file.toPath());
} catch (IOException e) {
throw new StorageException(e);
}
- JournalSegmentDescriptor descriptor = new JournalSegmentDescriptor(buffer);
-
// Load the segment.
- JournalSegment segment = loadSegment(descriptor.id());
+ final var segmentFile = new JournalSegmentFile(file);
+ final var segment = newSegment(segmentFile, descriptor);
+ LOG.debug("Loaded disk segment: {} ({})", descriptor.id(), file.getName());
// Add the segment to the segments list.
- LOG.debug("Found segment: {} ({})", segment.descriptor().id(), segmentFile.file().getName());
segments.put(segment.firstIndex(), segment);
}
}
// Verify that all the segments in the log align with one another.
JournalSegment previousSegment = null;
boolean corrupted = false;
- Iterator<Map.Entry<Long, JournalSegment>> iterator = segments.entrySet().iterator();
+ final var iterator = segments.entrySet().iterator();
while (iterator.hasNext()) {
- JournalSegment segment = iterator.next().getValue();
+ final var segment = iterator.next().getValue();
if (previousSegment != null && previousSegment.lastIndex() != segment.firstIndex() - 1) {
- LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(), previousSegment.file().file());
+ LOG.warn("Journal is inconsistent. {} is not aligned with prior segment {}", segment.file().file(),
+ previousSegment.file().file());
corrupted = true;
}
if (corrupted) {