Add DataPersistenceProvider.streamToInstall() 55/116055/22
authorRobert Varga <robert.varga@pantheon.tech>
Tue, 25 Mar 2025 07:31:16 +0000 (08:31 +0100)
committerRobert Varga <robert.varga@pantheon.tech>
Tue, 1 Apr 2025 06:45:29 +0000 (08:45 +0200)
This is the first step in having asynchronous access to a snapshot
bytestream: RaftStorage now exposes streamToInstall() method, which
writes out a snapshot in the background and invokes a callback once
that is completed.

JIRA: CONTROLLER-2134
Change-Id: Ic650dbf4b31c62135dbc7b28fa9682f1ef5bf824
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/SnapshotManager.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/spi/DataPersistenceProvider.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/spi/ForwardingDataPersistenceProvider.java
raft/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/spi/RaftStorage.java
raft/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/TestDataProvider.java

index 897c89260c8555878617af2d0741d5d230afd15f..183ac8be6c36b66f73ae12ea9fbfbc745f8769fc 100644 (file)
@@ -203,7 +203,7 @@ public final class SnapshotManager {
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private boolean captureToInstall(final @NonNull OutputStream outputStream) {
+    private boolean captureToInstall(final @NonNull FileBackedOutputStream outputStream) {
         try {
             snapshotCohort.createSnapshot(context.getActor(), outputStream);
         } catch (Exception e) {
index 2867ea452908d2ca0895a23707358ae29cd714a2..31d5cca10eafa1cc3dc06be123c622cece7affde 100644 (file)
@@ -9,10 +9,13 @@ package org.opendaylight.controller.cluster.raft.spi;
 
 import com.google.common.annotations.Beta;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import org.apache.pekko.persistence.JournalProtocol;
 import org.apache.pekko.persistence.SnapshotProtocol;
 import org.apache.pekko.persistence.SnapshotSelectionCriteria;
+import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
@@ -23,8 +26,15 @@ import org.opendaylight.raft.spi.SnapshotSource;
  */
 // FIXME: find a better name for this interface. It is heavily influenced by Pekko Persistence, most notably the weird
 //        API around snapshots and message deletion -- which assumes the entity requesting it is the subclass itself.
-@NonNullByDefault
 public interface DataPersistenceProvider {
+    @Beta
+    @NonNullByDefault
+    @FunctionalInterface
+    interface WritableSnapshot {
+
+        void writeTo(OutputStream out) throws IOException;
+    }
+
     /**
      * Returns whether or not persistence recovery is applicable/enabled.
      *
@@ -43,8 +53,9 @@ public interface DataPersistenceProvider {
      * @param entry the journal entry to persist
      * @param callback the callback when persistence is complete
      */
-    // FIXME: no callback and throw an IOException
-    <T> void persist(T entry, Consumer<T> callback);
+    // FIXME: replace with:
+    //        void persist(Object entry) throws IOException
+    <T> void persist(@NonNull T entry, @NonNull Consumer<T> callback);
 
     /**
      * Persists an entry to the applicable journal asynchronously.
@@ -53,16 +64,20 @@ public interface DataPersistenceProvider {
      * @param entry the journal entry to persist
      * @param callback the callback when persistence is complete
      */
-    // FIXME: a BiConsumer<? super T, ? super Throwable> callback
-    <T> void persistAsync(T entry, Consumer<T> callback);
+    // FIXME: replace with:
+    //        void persistAsync(T entry, BiConsumer<? super T, ? super Throwable> callback)
+    <T> void persistAsync(@NonNull T entry, @NonNull Consumer<T> callback);
 
     /**
      * Saves a snapshot.
      *
      * @param snapshot the snapshot object to save
      */
-    // FIXME: add a BiConsumer<SnapshotSource, ? super Throwable> callback
-    void saveSnapshot(Snapshot snapshot);
+    // FIXME: replace with the below, combining the save functionality
+    void saveSnapshot(@NonNull Snapshot snapshot);
+
+    void streamToInstall(@NonNull WritableSnapshot snapshot,
+        @NonNull BiConsumer<SnapshotSource, ? super Throwable> callback);
 
     /**
      * Deletes snapshots based on the given criteria.
@@ -71,7 +86,7 @@ public interface DataPersistenceProvider {
      */
     // FIXME: criteria == max size? max snapshots?
     // FIXME: throws IOException
-    void deleteSnapshots(SnapshotSelectionCriteria criteria);
+    void deleteSnapshots(@NonNull SnapshotSelectionCriteria criteria);
 
     /**
      * Deletes journal entries up to the given sequence number.
@@ -94,7 +109,7 @@ public interface DataPersistenceProvider {
      * @param response A {@link JournalProtocol} response
      * @return {@code true} if the response was handled
      */
-    boolean handleJournalResponse(JournalProtocol.Response response);
+    boolean handleJournalResponse(JournalProtocol.@NonNull Response response);
 
     /**
      * Receive and potentially handle a {@link SnapshotProtocol} response.
@@ -102,5 +117,5 @@ public interface DataPersistenceProvider {
      * @param response A {@link SnapshotProtocol} response
      * @return {@code true} if the response was handled
      */
-    boolean handleSnapshotResponse(SnapshotProtocol.Response response);
+    boolean handleSnapshotResponse(SnapshotProtocol.@NonNull Response response);
 }
index 42bce81e0de2207c21e6a5c366019bba777ed41d..b56e929a92eba74a93e23a50fb59b93890aefd46 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.raft.spi;
 
 import com.google.common.base.MoreObjects;
 import java.io.IOException;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import org.apache.pekko.persistence.JournalProtocol;
 import org.apache.pekko.persistence.SnapshotProtocol;
@@ -48,6 +49,12 @@ public abstract class ForwardingDataPersistenceProvider implements DataPersisten
         delegate().saveSnapshot(entry);
     }
 
+    @Override
+    public void streamToInstall(final WritableSnapshot snapshot,
+            final BiConsumer<SnapshotSource, ? super Throwable> callback) {
+        delegate().streamToInstall(snapshot, callback);
+    }
+
     @Override
     public void deleteSnapshots(final SnapshotSelectionCriteria criteria) {
         delegate().deleteSnapshots(criteria);
index c4dde286b9db5aaf3daa43b5df615581e7508146..00729c67dd4fdc218d7b8dc2136b986ed14be7e1 100644 (file)
@@ -12,14 +12,18 @@ import static java.util.Objects.requireNonNull;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.MoreObjects.ToStringHelper;
 import java.io.IOException;
-import java.util.concurrent.Callable;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiConsumer;
 import org.eclipse.jdt.annotation.NonNull;
+import org.opendaylight.raft.spi.FileBackedOutputStream;
 import org.opendaylight.raft.spi.FileBackedOutputStream.Configuration;
 import org.opendaylight.raft.spi.SnapshotFileFormat;
+import org.opendaylight.raft.spi.SnapshotSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,21 +32,67 @@ import org.slf4j.LoggerFactory;
  */
 public abstract sealed class RaftStorage implements DataPersistenceProvider
         permits DisabledRaftStorage, EnabledRaftStorage {
+    private abstract class CancellableTask<T> implements Runnable {
+        private final BiConsumer<? super T, ? super Throwable> callback;
+
+        CancellableTask(final BiConsumer<? super T, ? super Throwable> callback) {
+            this.callback = requireNonNull(callback);
+        }
+
+        @Override
+        @SuppressWarnings("checkstyle:illegalCatch")
+        public final void run() {
+            if (!tasks.remove(this)) {
+                LOG.debug("{}: not executing task {}", memberId(), this);
+                return;
+            }
+
+            final T result;
+            try {
+                result = compute();
+            } catch (Exception e) {
+                callback.accept(null, e);
+                return;
+            }
+            callback.accept(result, null);
+        }
+
+        abstract @NonNull T compute() throws Exception;
+    }
+
+    private final class StreamSnapshotTask extends CancellableTask<SnapshotSource> {
+        private final WritableSnapshot snapshot;
+
+        StreamSnapshotTask(final BiConsumer<SnapshotSource, ? super Throwable> callback,
+                final WritableSnapshot snapshot) {
+            super(callback);
+            this.snapshot = requireNonNull(snapshot);
+        }
+
+        @Override
+        SnapshotSource compute() throws IOException {
+            try (var outer = new FileBackedOutputStream(streamConfig)) {
+                try (var inner = preferredFormat.encodeOutput(outer)) {
+                    snapshot.writeTo(inner);
+                }
+                return preferredFormat.sourceFor(outer.asByteSource()::openStream);
+            }
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(RaftStorage.class);
 
+    private final Set<CancellableTask<?>> tasks = ConcurrentHashMap.newKeySet();
     private final @NonNull SnapshotFileFormat preferredFormat;
     private final @NonNull Configuration streamConfig;
 
+    private ExecutorService executor;
+
     protected RaftStorage(final SnapshotFileFormat preferredFormat, final Configuration streamConfig) {
         this.preferredFormat = requireNonNull(preferredFormat);
         this.streamConfig = requireNonNull(streamConfig);
     }
 
-    private ExecutorService executor;
-
-    // FIXME: we should have the concept of being 'open', when we have a thread pool to perform the asynchronous part
-    //        of RaftActorSnapshotCohort.createSnapshot(), using virtual-thread-per-task
-
     // FIXME: this class should also be tracking the last snapshot bytes -- i.e. what AbstractLeader.SnapshotHolder.
     //        for file-based enabled storage, this means keeping track of the last snapshot we have. For disabled the
     //        case is similar, except we want to have a smarter strategy:
@@ -94,25 +144,29 @@ public abstract sealed class RaftStorage implements DataPersistenceProvider
         } finally {
             executor = null;
             stopExecutor(local);
+            cancelTasks();
         }
     }
 
-    protected abstract void preStop();
-
-    protected final <T> @NonNull Future<T> submit(final @NonNull Callable<T> task) {
-        return doSubmit(requireNonNull(task));
+    private void cancelTasks() {
+        for (var task : tasks) {
+            if (tasks.remove(task)) {
+                task.callback.accept(null, new CancellationException("Storage closed"));
+            } else {
+                LOG.debug("{}: not cancelling task {}", memberId(), task);
+            }
+        }
     }
 
-    protected final @NonNull Future<Void> submit(final @NonNull Runnable task) {
-        return doSubmit(Executors.<Void>callable(task, null));
-    }
+    protected abstract void preStop();
 
-    private <T> @NonNull Future<T> doSubmit(final @NonNull Callable<T> task) {
-        final var local = executor;
-        if (local == null) {
-            throw new IllegalStateException("Storage " + memberId() + " is stopped");
-        }
-        return local.submit(task);
+    @Override
+    public final void streamToInstall(final WritableSnapshot snapshot,
+            final BiConsumer<SnapshotSource, ? super Throwable> callback) {
+        final var local = checkNotClosed();
+        final var task = new StreamSnapshotTask(callback, snapshot);
+        tasks.add(task);
+        local.execute(task);
     }
 
     @Override
@@ -123,4 +177,12 @@ public abstract sealed class RaftStorage implements DataPersistenceProvider
     protected ToStringHelper addToStringAtrributes(final ToStringHelper helper) {
         return helper.add("memberId", memberId()).add("preferredFormat", preferredFormat).add("streams", streamConfig);
     }
+
+    private ExecutorService checkNotClosed() {
+        final var local = executor;
+        if (local == null) {
+            throw new IllegalStateException("Storage " + memberId() + " already stopped");
+        }
+        return local;
+    }
 }
index ea8c3c5a96a5e4e5d931d689e4790160a99303b4..255cc62bedfd38daf4c169fed5729391335a3c2a 100644 (file)
@@ -9,10 +9,16 @@ package org.opendaylight.controller.cluster.raft;
 
 import static java.util.Objects.requireNonNull;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.function.BiConsumer;
 import org.eclipse.jdt.annotation.NonNullByDefault;
 import org.opendaylight.controller.cluster.common.actor.ExecuteInSelfActor;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 import org.opendaylight.controller.cluster.raft.spi.ImmediateDataPersistenceProvider;
+import org.opendaylight.raft.spi.ByteArray;
+import org.opendaylight.raft.spi.PlainSnapshotSource;
+import org.opendaylight.raft.spi.SnapshotSource;
 
 @NonNullByDefault
 final class TestDataProvider implements ImmediateDataPersistenceProvider {
@@ -36,6 +42,20 @@ final class TestDataProvider implements ImmediateDataPersistenceProvider {
         // no-op
     }
 
+    @Override
+    public void streamToInstall(final WritableSnapshot snapshot,
+            final BiConsumer<SnapshotSource, ? super Throwable> callback) {
+        final byte[] bytes;
+        try (var baos = new ByteArrayOutputStream()) {
+            snapshot.writeTo(baos);
+            bytes = baos.toByteArray();
+        } catch (IOException e) {
+            actor.executeInSelf(() -> callback.accept(null, e));
+            return;
+        }
+        actor.executeInSelf(() -> callback.accept(new PlainSnapshotSource(ByteArray.wrap(bytes)), null));
+    }
+
     void setActor(final ExecuteInSelfActor actor) {
         this.actor = requireNonNull(actor);
     }