Do not leak Kryo from atomix.storage
[controller.git] / third-party / atomix / storage / src / main / java / io / atomix / storage / journal / SegmentedJournal.java
index ea5944aabd03f354389d42d7d758b95b0e935094..f2d99eec533a094f36ef82476d8094f7f2ddd5cb 100644 (file)
@@ -24,13 +24,11 @@ import java.nio.file.StandardOpenOption;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.NavigableMap;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-
-import com.google.common.collect.Sets;
-import io.atomix.utils.serializer.Namespace;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,7 +56,7 @@ public final class SegmentedJournal<E> implements Journal<E> {
   private final String name;
   private final StorageLevel storageLevel;
   private final File directory;
-  private final Namespace namespace;
+  private final JournalSerdes namespace;
   private final int maxSegmentSize;
   private final int maxEntrySize;
   private final int maxEntriesPerSegment;
@@ -67,8 +65,8 @@ public final class SegmentedJournal<E> implements Journal<E> {
   private final SegmentedJournalWriter<E> writer;
   private volatile long commitIndex;
 
-  private final NavigableMap<Long, JournalSegment<E>> segments = new ConcurrentSkipListMap<>();
-  private final Collection<SegmentedJournalReader<E>> readers = Sets.newConcurrentHashSet();
+  private final ConcurrentNavigableMap<Long, JournalSegment<E>> segments = new ConcurrentSkipListMap<>();
+  private final Collection<SegmentedJournalReader<E>> readers = ConcurrentHashMap.newKeySet();
   private JournalSegment<E> currentSegment;
 
   private volatile boolean open = true;
@@ -77,7 +75,7 @@ public final class SegmentedJournal<E> implements Journal<E> {
       String name,
       StorageLevel storageLevel,
       File directory,
-      Namespace namespace,
+      JournalSerdes namespace,
       int maxSegmentSize,
       int maxEntrySize,
       int maxEntriesPerSegment,
@@ -230,7 +228,7 @@ public final class SegmentedJournal<E> implements Journal<E> {
   /**
    * Opens the segments.
    */
-  private void open() {
+  private synchronized void open() {
     // Load existing log segments from disk.
     for (JournalSegment<E> segment : loadSegments()) {
       segments.put(segment.descriptor().index(), segment);
@@ -665,7 +663,7 @@ public final class SegmentedJournal<E> implements Journal<E> {
   /**
    * Raft log builder.
    */
-  public static class Builder<E> {
+  public static final class Builder<E> {
     private static final boolean DEFAULT_FLUSH_ON_COMMIT = false;
     private static final String DEFAULT_NAME = "atomix";
     private static final String DEFAULT_DIRECTORY = System.getProperty("user.dir");
@@ -673,17 +671,15 @@ public final class SegmentedJournal<E> implements Journal<E> {
     private static final int DEFAULT_MAX_ENTRY_SIZE = 1024 * 1024;
     private static final int DEFAULT_MAX_ENTRIES_PER_SEGMENT = 1024 * 1024;
     private static final double DEFAULT_INDEX_DENSITY = .005;
-    private static final int DEFAULT_CACHE_SIZE = 1024;
-
-    protected String name = DEFAULT_NAME;
-    protected StorageLevel storageLevel = StorageLevel.DISK;
-    protected File directory = new File(DEFAULT_DIRECTORY);
-    protected Namespace namespace;
-    protected int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
-    protected int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
-    protected int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
-    protected double indexDensity = DEFAULT_INDEX_DENSITY;
-    protected int cacheSize = DEFAULT_CACHE_SIZE;
+
+    private String name = DEFAULT_NAME;
+    private StorageLevel storageLevel = StorageLevel.DISK;
+    private File directory = new File(DEFAULT_DIRECTORY);
+    private JournalSerdes namespace;
+    private int maxSegmentSize = DEFAULT_MAX_SEGMENT_SIZE;
+    private int maxEntrySize = DEFAULT_MAX_ENTRY_SIZE;
+    private int maxEntriesPerSegment = DEFAULT_MAX_ENTRIES_PER_SEGMENT;
+    private double indexDensity = DEFAULT_INDEX_DENSITY;
     private boolean flushOnCommit = DEFAULT_FLUSH_ON_COMMIT;
 
     protected Builder() {
@@ -746,7 +742,7 @@ public final class SegmentedJournal<E> implements Journal<E> {
      * @param namespace The journal serializer.
      * @return The journal builder.
      */
-    public Builder<E> withNamespace(Namespace namespace) {
+    public Builder<E> withNamespace(JournalSerdes namespace) {
       this.namespace = requireNonNull(namespace, "namespace cannot be null");
       return this;
     }
@@ -823,21 +819,6 @@ public final class SegmentedJournal<E> implements Journal<E> {
       return this;
     }
 
-    /**
-     * Sets the journal cache size.
-     *
-     * @param cacheSize the journal cache size
-     * @return the journal builder
-     * @throws IllegalArgumentException if the cache size is not positive
-     * @deprecated since 3.0.4
-     */
-    @Deprecated
-    public Builder<E> withCacheSize(int cacheSize) {
-      checkArgument(cacheSize >= 0, "cacheSize must be positive");
-      this.cacheSize = cacheSize;
-      return this;
-    }
-
     /**
      * Enables flushing buffers to disk when entries are committed to a segment, returning the builder for method
      * chaining.