}
@SuppressWarnings("checkstyle:IllegalCatch")
- private boolean captureToInstall(final @NonNull OutputStream outputStream) {
+ private boolean captureToInstall(final @NonNull FileBackedOutputStream outputStream) {
try {
snapshotCohort.createSnapshot(context.getActor(), outputStream);
} catch (Exception e) {
import com.google.common.annotations.Beta;
import java.io.IOException;
+import java.io.OutputStream;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.pekko.persistence.JournalProtocol;
import org.apache.pekko.persistence.SnapshotProtocol;
import org.apache.pekko.persistence.SnapshotSelectionCriteria;
+import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
*/
// FIXME: find a better name for this interface. It is heavily influenced by Pekko Persistence, most notably the weird
// API around snapshots and message deletion -- which assumes the entity requesting it is the subclass itself.
-@NonNullByDefault
public interface DataPersistenceProvider {
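+ /**
+  * A snapshot which can write itself to an {@link OutputStream}.
+  */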
+ @Beta
+ @NonNullByDefault
+ @FunctionalInterface
+ interface WritableSnapshot {
+
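+ /**
+  * Write this snapshot to the specified output stream.
+  *
+  * @param out the output stream
+  * @throws IOException if an I/O error occurs
+  */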
+ void writeTo(OutputStream out) throws IOException;
+ }
+
/**
* Persists an entry to the applicable journal synchronously.
*
* @param entry the journal entry to persist
* @param callback the callback when persistence is complete
*/
- // FIXME: no callback and throw an IOException
- <T> void persist(T entry, Consumer<T> callback);
+ // FIXME: replace with:
+ // void persist(Object entry) throws IOException
+ <T> void persist(@NonNull T entry, @NonNull Consumer<T> callback);
/**
* Persists an entry to the applicable journal asynchronously.
* @param entry the journal entry to persist
* @param callback the callback when persistence is complete
*/
- // FIXME: a BiConsumer<? super T, ? super Throwable> callback
- <T> void persistAsync(T entry, Consumer<T> callback);
+ // FIXME: replace with:
+ // void persistAsync(T entry, BiConsumer<? super T, ? super Throwable> callback)
+ <T> void persistAsync(@NonNull T entry, @NonNull Consumer<T> callback);
/**
* Saves a snapshot.
*
* @param snapshot the snapshot object to save
*/
- // FIXME: add a BiConsumer<SnapshotSource, ? super Throwable> callback
- void saveSnapshot(Snapshot snapshot);
+ // FIXME: replace with the below, combining the save functionality
+ void saveSnapshot(@NonNull Snapshot snapshot);
+
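+ /**
+  * Asynchronously stream a {@link WritableSnapshot} into a {@link SnapshotSource} suitable for installation,
+  * reporting the outcome to the supplied callback.
+  *
+  * @param snapshot the snapshot to stream
+  * @param callback invoked with the resulting source on success, or with the failure cause otherwise
+  */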
+ void streamToInstall(@NonNull WritableSnapshot snapshot,
+ @NonNull BiConsumer<SnapshotSource, ? super Throwable> callback);
/**
* Deletes snapshots based on the given criteria.
*/
// FIXME: criteria == max size? max snapshots?
// FIXME: throws IOException
- void deleteSnapshots(SnapshotSelectionCriteria criteria);
+ void deleteSnapshots(@NonNull SnapshotSelectionCriteria criteria);
/**
* Receive and potentially handle a {@link JournalProtocol} response.
* @param response A {@link JournalProtocol} response
* @return {@code true} if the response was handled
*/
- boolean handleJournalResponse(JournalProtocol.Response response);
+ boolean handleJournalResponse(JournalProtocol.@NonNull Response response);
/**
* Receive and potentially handle a {@link SnapshotProtocol} response.
* @param response A {@link SnapshotProtocol} response
* @return {@code true} if the response was handled
*/
- boolean handleSnapshotResponse(SnapshotProtocol.Response response);
+ boolean handleSnapshotResponse(SnapshotProtocol.@NonNull Response response);
}
import com.google.common.base.MoreObjects;
import java.io.IOException;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.pekko.persistence.JournalProtocol;
import org.apache.pekko.persistence.SnapshotProtocol;
delegate().saveSnapshot(entry);
}
+ @Override
+ public void streamToInstall(final WritableSnapshot snapshot,
+ final BiConsumer<SnapshotSource, ? super Throwable> callback) {
+ delegate().streamToInstall(snapshot, callback);
+ }
+
@Override
public void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
delegate().deleteSnapshots(criteria);
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
import java.io.IOException;
-import java.util.concurrent.Callable;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiConsumer;
import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.raft.spi.FileBackedOutputStream;
import org.opendaylight.raft.spi.FileBackedOutputStream.Configuration;
import org.opendaylight.raft.spi.SnapshotFileFormat;
+import org.opendaylight.raft.spi.SnapshotSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
*/
public abstract sealed class RaftStorage implements DataPersistenceProvider
permits DisabledRaftStorage, EnabledRaftStorage {
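+ /**
+  * A unit of work tracked in {@link #tasks}. Its callback is invoked exactly once: with the computed result,
+  * with the exception thrown by {@link #compute()}, or with a {@link CancellationException} when the storage is
+  * stopped before the task gets to run.
+  */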
+ private abstract class CancellableTask<T> implements Runnable {
+ private final BiConsumer<? super T, ? super Throwable> callback;
+
+ CancellableTask(final BiConsumer<? super T, ? super Throwable> callback) {
+ this.callback = requireNonNull(callback);
+ }
+
+ @Override
+ @SuppressWarnings("checkstyle:illegalCatch")
+ public final void run() {
+ if (!tasks.remove(this)) {
+ LOG.debug("{}: not executing task {}", memberId(), this);
+ return;
+ }
+
+ final T result;
+ try {
+ result = compute();
+ } catch (Exception e) {
+ callback.accept(null, e);
+ return;
+ }
+ callback.accept(result, null);
+ }
+
+ abstract @NonNull T compute() throws Exception;
+ }
+
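+ /**
+  * Task which writes a {@link WritableSnapshot} through the preferred format into a {@link FileBackedOutputStream}
+  * and produces the corresponding {@link SnapshotSource}.
+  */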
+ private final class StreamSnapshotTask extends CancellableTask<SnapshotSource> {
+ private final WritableSnapshot snapshot;
+
+ StreamSnapshotTask(final BiConsumer<SnapshotSource, ? super Throwable> callback,
+ final WritableSnapshot snapshot) {
+ super(callback);
+ this.snapshot = requireNonNull(snapshot);
+ }
+
+ @Override
+ SnapshotSource compute() throws IOException {
+ try (var outer = new FileBackedOutputStream(streamConfig)) {
+ try (var inner = preferredFormat.encodeOutput(outer)) {
+ snapshot.writeTo(inner);
+ }
+ return preferredFormat.sourceFor(outer.asByteSource()::openStream);
+ }
+ }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(RaftStorage.class);
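+ // Tasks which have been accepted but have not started executing: a task removes itself before running, and
+ // cancelTasks() removes (and notifies) whatever is left when the storage is stopped.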
+ private final Set<CancellableTask<?>> tasks = ConcurrentHashMap.newKeySet();
private final @NonNull SnapshotFileFormat preferredFormat;
private final @NonNull Configuration streamConfig;
+ private ExecutorService executor;
+
protected RaftStorage(final SnapshotFileFormat preferredFormat, final Configuration streamConfig) {
this.preferredFormat = requireNonNull(preferredFormat);
this.streamConfig = requireNonNull(streamConfig);
}
- private ExecutorService executor;
-
- // FIXME: we should have the concept of being 'open', when we have a thread pool to perform the asynchronous part
- // of RaftActorSnapshotCohort.createSnapshot(), using virtual-thread-per-task
-
// FIXME: this class should also be tracking the last snapshot bytes -- i.e. what AbstractLeader.SnapshotHolder does.
// For file-based enabled storage, this means keeping track of the last snapshot we have. For disabled storage the
// case is similar, except we want to have a smarter strategy:
} finally {
executor = null;
stopExecutor(local);
+ cancelTasks();
}
}
- protected abstract void preStop();
-
- protected final <T> @NonNull Future<T> submit(final @NonNull Callable<T> task) {
- return doSubmit(requireNonNull(task));
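+ // Notify the callbacks of all tasks which have not started executing that the storage has been stopped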
+ private void cancelTasks() {
+ for (var task : tasks) {
+ if (tasks.remove(task)) {
+ task.callback.accept(null, new CancellationException("Storage closed"));
+ } else {
+ LOG.debug("{}: not cancelling task {}", memberId(), task);
+ }
+ }
}
- protected final @NonNull Future<Void> submit(final @NonNull Runnable task) {
- return doSubmit(Executors.<Void>callable(task, null));
- }
+ protected abstract void preStop();
- private <T> @NonNull Future<T> doSubmit(final @NonNull Callable<T> task) {
- final var local = executor;
- if (local == null) {
- throw new IllegalStateException("Storage " + memberId() + " is stopped");
- }
- return local.submit(task);
+ @Override
+ public final void streamToInstall(final WritableSnapshot snapshot,
+ final BiConsumer<SnapshotSource, ? super Throwable> callback) {
+ final var local = checkNotClosed();
+ final var task = new StreamSnapshotTask(callback, snapshot);
+ tasks.add(task);
+ local.execute(task);
}
@Override
protected ToStringHelper addToStringAtrributes(final ToStringHelper helper) {
return helper.add("memberId", memberId()).add("preferredFormat", preferredFormat).add("streams", streamConfig);
}
+
+ private ExecutorService checkNotClosed() {
+ final var local = executor;
+ if (local == null) {
+ throw new IllegalStateException("Storage " + memberId() + " already stopped");
+ }
+ return local;
+ }
}
import static java.util.Objects.requireNonNull;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.function.BiConsumer;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.opendaylight.controller.cluster.common.actor.ExecuteInSelfActor;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.spi.ImmediateDataPersistenceProvider;
+import org.opendaylight.raft.spi.ByteArray;
+import org.opendaylight.raft.spi.PlainSnapshotSource;
+import org.opendaylight.raft.spi.SnapshotSource;
@NonNullByDefault
final class TestDataProvider implements ImmediateDataPersistenceProvider {
// no-op
}
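+ // Capture the snapshot in memory and complete the callback through the actor, preserving asynchronous semantics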
+ @Override
+ public void streamToInstall(final WritableSnapshot snapshot,
+ final BiConsumer<SnapshotSource, ? super Throwable> callback) {
+ final byte[] bytes;
+ try (var baos = new ByteArrayOutputStream()) {
+ snapshot.writeTo(baos);
+ bytes = baos.toByteArray();
+ } catch (IOException e) {
+ actor.executeInSelf(() -> callback.accept(null, e));
+ return;
+ }
+ actor.executeInSelf(() -> callback.accept(new PlainSnapshotSource(ByteArray.wrap(bytes)), null));
+ }
+
void setActor(final ExecuteInSelfActor actor) {
this.actor = requireNonNull(actor);
}