X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=third-party%2Fatomix%2Fstorage%2Fsrc%2Fmain%2Fjava%2Fio%2Fatomix%2Fstorage%2Fjournal%2FSegmentedJournal.java;h=a452587b7691247aacd193fa220b575cd9ba560f;hb=refs%2Fchanges%2F15%2F104715%2F1;hp=07496c5dc1855e79bd7b4599db7adcb63aae2f86;hpb=5417df1af08aa5eba93a7fc602c73e81e6767ab8;p=controller.git diff --git a/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java b/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java index 07496c5dc1..a452587b76 100644 --- a/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java +++ b/third-party/atomix/storage/src/main/java/io/atomix/storage/journal/SegmentedJournal.java @@ -24,26 +24,24 @@ import java.nio.file.StandardOpenOption; import java.util.Collection; import java.util.Iterator; import java.util.Map; -import java.util.NavigableMap; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import com.google.common.collect.Sets; -import io.atomix.storage.StorageException; -import io.atomix.storage.StorageLevel; import io.atomix.utils.serializer.Namespace; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; /** * Segmented journal. */ -public class SegmentedJournal implements Journal { +public final class SegmentedJournal implements Journal { /** * Returns a new Raft log builder. @@ -69,8 +67,8 @@ public class SegmentedJournal implements Journal { private final SegmentedJournalWriter writer; private volatile long commitIndex; - private final NavigableMap> segments = new ConcurrentSkipListMap<>(); - private final Collection readers = Sets.newConcurrentHashSet(); + private final ConcurrentNavigableMap> segments = new ConcurrentSkipListMap<>(); + private final Collection> readers = Sets.newConcurrentHashSet(); private JournalSegment currentSegment; private volatile boolean open = true; @@ -85,10 +83,10 @@ public class SegmentedJournal implements Journal { int maxEntriesPerSegment, double indexDensity, boolean flushOnCommit) { - this.name = checkNotNull(name, "name cannot be null"); - this.storageLevel = checkNotNull(storageLevel, "storageLevel cannot be null"); - this.directory = checkNotNull(directory, "directory cannot be null"); - this.namespace = checkNotNull(namespace, "namespace cannot be null"); + this.name = requireNonNull(name, "name cannot be null"); + this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null"); + this.directory = requireNonNull(directory, "directory cannot be null"); + this.namespace = requireNonNull(namespace, "namespace cannot be null"); this.maxSegmentSize = maxSegmentSize; this.maxEntrySize = maxEntrySize; this.maxEntriesPerSegment = maxEntriesPerSegment; @@ -232,7 +230,7 @@ public class SegmentedJournal implements Journal { /** * Opens the segments. */ - private void open() { + private synchronized void open() { // Load existing log segments from disk. for (JournalSegment segment : loadSegments()) { segments.put(segment.descriptor().index(), segment); @@ -359,7 +357,7 @@ public class SegmentedJournal implements Journal { assertOpen(); assertDiskSpace(); - JournalSegment lastSegment = getLastSegment(); + JournalSegment lastSegment = getLastSegment(); JournalSegmentDescriptor descriptor = JournalSegmentDescriptor.builder() .withId(lastSegment != null ? lastSegment.descriptor().id() + 1 : 1) .withIndex(currentSegment.lastIndex() + 1) @@ -410,7 +408,7 @@ public class SegmentedJournal implements Journal { * * @param segment The segment to remove. */ - synchronized void removeSegment(JournalSegment segment) { + synchronized void removeSegment(JournalSegment segment) { segments.remove(segment.index()); segment.close(); segment.delete(); @@ -552,7 +550,7 @@ public class SegmentedJournal implements Journal { * @param index The index at which to reset readers. */ void resetHead(long index) { - for (SegmentedJournalReader reader : readers) { + for (SegmentedJournalReader reader : readers) { if (reader.getNextIndex() < index) { reader.reset(index); } @@ -565,14 +563,14 @@ public class SegmentedJournal implements Journal { * @param index The index at which to reset readers. */ void resetTail(long index) { - for (SegmentedJournalReader reader : readers) { + for (SegmentedJournalReader reader : readers) { if (reader.getNextIndex() >= index) { reader.reset(index); } } } - void closeReader(SegmentedJournalReader reader) { + void closeReader(SegmentedJournalReader reader) { readers.remove(reader); } @@ -616,7 +614,7 @@ public class SegmentedJournal implements Journal { SortedMap> compactSegments = segments.headMap(segmentEntry.getValue().index()); if (!compactSegments.isEmpty()) { log.debug("{} - Compacting {} segment(s)", name, compactSegments.size()); - for (JournalSegment segment : compactSegments.values()) { + for (JournalSegment segment : compactSegments.values()) { log.trace("Deleting segment: {}", segment); segment.close(); segment.delete(); @@ -667,7 +665,7 @@ public class SegmentedJournal implements Journal { /** * Raft log builder. */ - public static class Builder implements io.atomix.utils.Builder> { + public static final class Builder { private static final boolean DEFAULT_FLUSH_ON_COMMIT = false; private static final String DEFAULT_NAME = "atomix"; private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir"); @@ -675,17 +673,15 @@ public class SegmentedJournal implements Journal { private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024; private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024; private static final double DEFAULT_INDEX_DENSITY = .005; - private static final int DEFAULT_CACHE_SIZE = 1024; - - protected String name = DEFAULT_NAME; - protected StorageLevel storageLevel = StorageLevel.DISK; - protected File directory = new File(DEFAULT_DIRECTORY); - protected Namespace namespace; - protected int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE; - protected int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE; - protected int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT; - protected double indexDensity = DEFAULT_INDEX_DENSITY; - protected int cacheSize = DEFAULT_CACHE_SIZE; + + private String name = DEFAULT_NAME; + private StorageLevel storageLevel = StorageLevel.DISK; + private File directory = new File(DEFAULT_DIRECTORY); + private Namespace namespace; + private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE; + private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE; + private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT; + private double indexDensity = DEFAULT_INDEX_DENSITY; private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT; protected Builder() { @@ -698,7 +694,7 @@ public class SegmentedJournal implements Journal { * @return The storage builder. */ public Builder withName(String name) { - this.name = checkNotNull(name, "name cannot be null"); + this.name = requireNonNull(name, "name cannot be null"); return this; } @@ -711,7 +707,7 @@ public class SegmentedJournal implements Journal { * @return The storage builder. */ public Builder withStorageLevel(StorageLevel storageLevel) { - this.storageLevel = checkNotNull(storageLevel, "storageLevel cannot be null"); + this.storageLevel = requireNonNull(storageLevel, "storageLevel cannot be null"); return this; } @@ -725,7 +721,7 @@ public class SegmentedJournal implements Journal { * @throws NullPointerException If the {@code directory} is {@code null} */ public Builder withDirectory(String directory) { - return withDirectory(new File(checkNotNull(directory, "directory cannot be null"))); + return withDirectory(new File(requireNonNull(directory, "directory cannot be null"))); } /** @@ -738,7 +734,7 @@ public class SegmentedJournal implements Journal { * @throws NullPointerException If the {@code directory} is {@code null} */ public Builder withDirectory(File directory) { - this.directory = checkNotNull(directory, "directory cannot be null"); + this.directory = requireNonNull(directory, "directory cannot be null"); return this; } @@ -749,7 +745,7 @@ public class SegmentedJournal implements Journal { * @return The journal builder. */ public Builder withNamespace(Namespace namespace) { - this.namespace = checkNotNull(namespace, "namespace cannot be null"); + this.namespace = requireNonNull(namespace, "namespace cannot be null"); return this; } @@ -836,7 +832,6 @@ public class SegmentedJournal implements Journal { @Deprecated public Builder withCacheSize(int cacheSize) { checkArgument(cacheSize >= 0, "cacheSize must be positive"); - this.cacheSize = cacheSize; return this; } @@ -868,7 +863,11 @@ public class SegmentedJournal implements Journal { return this; } - @Override + /** + * Build the {@link SegmentedJournal}. + * + * @return A new {@link SegmentedJournal}. + */ public SegmentedJournal build() { return new SegmentedJournal<>( name,