import static akka.actor.ActorRef.noSender;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
-import static com.google.common.base.Verify.verify;
import akka.actor.ActorRef;
import akka.dispatch.Futures;
import akka.persistence.PersistentRepr;
import akka.persistence.journal.japi.AsyncWriteJournal;
import com.typesafe.config.Config;
-import com.typesafe.config.ConfigMemorySize;
-import io.atomix.storage.StorageLevel;
import io.atomix.storage.journal.SegmentedJournal;
+import io.atomix.storage.journal.StorageLevel;
import java.io.File;
+import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Base64;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
* An Akka persistence journal implementation on top of {@link SegmentedJournal}. This actor represents aggregation
 * of multiple journals and performs a receptionist job between Akka and individual per-persistenceId actors. See
* {@link SegmentedJournalActor} for details on how the persistence works.
- *
- * @author Robert Varga
*/
public class SegmentedFileJournal extends AsyncWriteJournal {
public static final String STORAGE_ROOT_DIRECTORY = "root-directory";
public static final int STORAGE_MAX_ENTRY_SIZE_DEFAULT = 16 * 1024 * 1024;
public static final String STORAGE_MAX_SEGMENT_SIZE = "max-segment-size";
public static final int STORAGE_MAX_SEGMENT_SIZE_DEFAULT = STORAGE_MAX_ENTRY_SIZE_DEFAULT * 8;
+ public static final String STORAGE_MAX_UNFLUSHED_BYTES = "max-unflushed-bytes";
public static final String STORAGE_MEMORY_MAPPED = "memory-mapped";
private static final Logger LOG = LoggerFactory.getLogger(SegmentedFileJournal.class);
private final StorageLevel storage;
private final int maxEntrySize;
private final int maxSegmentSize;
+ private final int maxUnflushedBytes;
public SegmentedFileJournal(final Config config) {
rootDir = new File(config.getString(STORAGE_ROOT_DIRECTORY));
maxEntrySize = getBytes(config, STORAGE_MAX_ENTRY_SIZE, STORAGE_MAX_ENTRY_SIZE_DEFAULT);
maxSegmentSize = getBytes(config, STORAGE_MAX_SEGMENT_SIZE, STORAGE_MAX_SEGMENT_SIZE_DEFAULT);
+ maxUnflushedBytes = getBytes(config, STORAGE_MAX_UNFLUSHED_BYTES, maxEntrySize);
if (config.hasPath(STORAGE_MEMORY_MAPPED)) {
storage = config.getBoolean(STORAGE_MEMORY_MAPPED) ? StorageLevel.MAPPED : StorageLevel.DISK;
@Override
public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(final Iterable<AtomicWrite> messages) {
- final Map<ActorRef, WriteMessages> map = new HashMap<>();
- final List<Future<Optional<Exception>>> result = new ArrayList<>();
+ final var map = new HashMap<ActorRef, WriteMessages>();
+ final var result = new ArrayList<Future<Optional<Exception>>>();
- for (AtomicWrite message : messages) {
- final String persistenceId = message.persistenceId();
- final ActorRef handler = handlers.computeIfAbsent(persistenceId, this::createHandler);
+ for (var message : messages) {
+ final var persistenceId = message.persistenceId();
+ final var handler = handlers.computeIfAbsent(persistenceId, this::createHandler);
result.add(map.computeIfAbsent(handler, key -> new WriteMessages()).add(message));
}
}
private ActorRef createHandler(final String persistenceId) {
- final String directoryName = Base64.getUrlEncoder().encodeToString(persistenceId.getBytes(
- StandardCharsets.UTF_8));
- final File directory = new File(rootDir, directoryName);
+ final var directoryName = URLEncoder.encode(persistenceId, StandardCharsets.UTF_8);
+ final var directory = new File(rootDir, directoryName);
LOG.debug("Creating handler for {} in directory {}", persistenceId, directory);
- final ActorRef handler = context().actorOf(SegmentedJournalActor.props(persistenceId, directory, storage,
- maxEntrySize, maxSegmentSize));
+ final var handler = context().actorOf(SegmentedJournalActor.props(persistenceId, directory, storage,
+ maxEntrySize, maxSegmentSize, maxUnflushedBytes));
LOG.debug("Directory {} handled by {}", directory, handler);
-
- final ActorRef prev = handlers.putIfAbsent(persistenceId, handler);
- verify(prev == null, "Duplicate handler for %s, already handled by %s", persistenceId, prev);
return handler;
}
private <T> Future<T> delegateMessage(final String persistenceId, final AsyncMessage<T> message) {
- final ActorRef handler = handlers.get(persistenceId);
+ final var handler = handlers.get(persistenceId);
if (handler == null) {
return Futures.failed(new IllegalStateException("Cannot find handler for " + persistenceId));
}
if (!config.hasPath(path)) {
return defaultValue;
}
- final ConfigMemorySize value = config.getMemorySize(path);
- final long result = value.toBytes();
- checkArgument(result <= Integer.MAX_VALUE, "Size %s exceeds maximum allowed %s", Integer.MAX_VALUE);
- return (int) result;
+ final long value = config.getBytes(path);
+ checkArgument(value <= Integer.MAX_VALUE, "Size %s exceeds maximum allowed %s", Integer.MAX_VALUE);
+ return (int) value;
}
}