Do not allow persistence callbacks to throw Exception 40/81640/6
authorRobert Varga <robert.varga@pantheon.tech>
Sat, 13 Apr 2019 12:55:13 +0000 (14:55 +0200)
committerRobert Varga <nite@hq.sk>
Tue, 16 Apr 2019 08:16:17 +0000 (08:16 +0000)
Since we are indirecting through an executor, we are forced
to wrap any exception -- just do not bother, as the callbacks
are executed in the context of an actor anyway (dealing with
RuntimeExceptions) and users are not throwing checked exceptions.

Change-Id: I6cea19ab7192fa42ad3c346d554411cd0d558a64
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLog.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImpl.java
opendaylight/md-sal/sal-akka-raft/src/main/java/org/opendaylight/controller/cluster/raft/behaviors/Follower.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/AbstractReplicatedLogImplTest.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/MockRaftActorContext.java
opendaylight/md-sal/sal-akka-raft/src/test/java/org/opendaylight/controller/cluster/raft/ReplicatedLogImplTest.java

index a05dbc6..1a0b38f 100644 (file)
@@ -7,8 +7,8 @@
  */
 package org.opendaylight.controller.cluster.raft;
 
-import akka.japi.Procedure;
 import java.util.List;
+import java.util.function.Consumer;
 import org.eclipse.jdt.annotation.NonNull;
 import org.eclipse.jdt.annotation.Nullable;
 
@@ -88,7 +88,7 @@ public interface ReplicatedLog {
      * Appends an entry to the in-memory log and persists it as well.
      *
      * @param replicatedLogEntry the entry to append
-     * @param callback the Procedure to be notified when persistence is complete (optional).
+     * @param callback the callback to be notified when persistence is complete (optional).
      * @param doAsync if true, the persistent actor can receive subsequent messages to process in between the persist
      *        call and the execution of the associated callback. If false, subsequent messages are stashed and get
      *        delivered after persistence is complete and the associated callback is executed. In either case the
@@ -96,7 +96,7 @@ public interface ReplicatedLog {
      * @return true if the entry was successfully appended, false otherwise.
      */
     boolean appendAndPersist(@NonNull ReplicatedLogEntry replicatedLogEntry,
-            @Nullable Procedure<ReplicatedLogEntry> callback, boolean doAsync);
+            @Nullable Consumer<ReplicatedLogEntry> callback, boolean doAsync);
 
     /**
      * Returns a list of log entries starting from the given index to the end of the log.
index c53cad6..c22f6f4 100644 (file)
@@ -9,9 +9,9 @@ package org.opendaylight.controller.cluster.raft;
 
 import static java.util.Objects.requireNonNull;
 
-import akka.japi.Procedure;
 import java.util.Collections;
 import java.util.List;
+import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
 import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
 
@@ -93,7 +93,7 @@ final class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
 
     @Override
     public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
-            final Procedure<ReplicatedLogEntry> callback, final boolean doAsync)  {
+            final Consumer<ReplicatedLogEntry> callback, final boolean doAsync)  {
 
         context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry);
 
@@ -112,24 +112,18 @@ final class ReplicatedLogImpl extends AbstractReplicatedLogImpl {
     }
 
     private void persistCallback(final ReplicatedLogEntry persistedLogEntry,
-            final Procedure<ReplicatedLogEntry> callback) {
+            final Consumer<ReplicatedLogEntry> callback) {
         context.getExecutor().execute(() -> syncPersistCallback(persistedLogEntry, callback));
     }
 
-    @SuppressWarnings("checkstyle:illegalCatch")
     private void syncPersistCallback(final ReplicatedLogEntry persistedLogEntry,
-            final Procedure<ReplicatedLogEntry> callback) {
+            final Consumer<ReplicatedLogEntry> callback) {
         context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry);
 
         dataSizeSinceLastSnapshot += persistedLogEntry.size();
 
         if (callback != null) {
-            try {
-                callback.apply(persistedLogEntry);
-            } catch (Exception e) {
-                context.getLogger().error("{}: persist callback failed", context.getId(), e);
-                throw new IllegalStateException("Persist callback failed", e);
-            }
+            callback.accept(persistedLogEntry);
         }
     }
 }
index 4fa4dbf..b193a54 100644 (file)
@@ -14,7 +14,6 @@ import akka.cluster.Cluster;
 import akka.cluster.ClusterEvent.CurrentClusterState;
 import akka.cluster.Member;
 import akka.cluster.MemberStatus;
-import akka.japi.Procedure;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import java.io.IOException;
@@ -24,6 +23,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 import org.eclipse.jdt.annotation.Nullable;
 import org.opendaylight.controller.cluster.messaging.MessageAssembler;
 import org.opendaylight.controller.cluster.raft.RaftActorContext;
@@ -303,7 +303,7 @@ public class Follower extends AbstractRaftActorBehavior {
         // applied to the state already, as the persistence callback occurs async, and we want those entries
         // purged from the persisted log as well.
         final AtomicBoolean shouldCaptureSnapshot = new AtomicBoolean(false);
-        final Procedure<ReplicatedLogEntry> appendAndPersistCallback = logEntry -> {
+        final Consumer<ReplicatedLogEntry> appendAndPersistCallback = logEntry -> {
             final List<ReplicatedLogEntry> entries = appendEntries.getEntries();
             final ReplicatedLogEntry lastEntryToAppend = entries.get(entries.size() - 1);
             if (shouldCaptureSnapshot.get() && logEntry == lastEntryToAppend) {
index 8ff6831..5b72cb3 100644 (file)
@@ -13,10 +13,10 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import akka.japi.Procedure;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -325,8 +325,11 @@ public class AbstractReplicatedLogImplTest {
         }
 
         @Override
-        public boolean appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback,
+        public boolean appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Consumer<ReplicatedLogEntry> callback,
                 boolean doAsync) {
+            if (callback != null) {
+                callback.accept(replicatedLogEntry);
+            }
             return true;
         }
 
index 1935e26..8c17e1e 100644 (file)
@@ -12,7 +12,6 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
-import akka.japi.Procedure;
 import com.google.common.io.ByteSource;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.io.IOException;
@@ -190,17 +189,11 @@ public class MockRaftActorContext extends RaftActorContextImpl {
         @Override
         @SuppressWarnings("checkstyle:IllegalCatch")
         public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
-                final Procedure<ReplicatedLogEntry> callback, final boolean doAsync) {
+                final Consumer<ReplicatedLogEntry> callback, final boolean doAsync) {
             append(replicatedLogEntry);
 
             if (callback != null) {
-                try {
-                    callback.apply(replicatedLogEntry);
-                } catch (RuntimeException e) {
-                    throw e;
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
+                callback.accept(replicatedLogEntry);
             }
 
             return true;
index 276ace1..8e9c6c9 100644 (file)
@@ -19,6 +19,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
 import akka.japi.Procedure;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.util.Collections;
+import java.util.function.Consumer;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -92,12 +93,12 @@ public class ReplicatedLogImplTest {
         reset(mockPersistence);
 
         ReplicatedLogEntry logEntry2 = new SimpleReplicatedLogEntry(2, 1, new MockPayload("2"));
-        Procedure<ReplicatedLogEntry> mockCallback = mock(Procedure.class);
+        Consumer<ReplicatedLogEntry> mockCallback = mock(Consumer.class);
         log.appendAndPersist(logEntry2, mockCallback, true);
 
         verifyPersist(logEntry2);
 
-        verify(mockCallback).apply(same(logEntry2));
+        verify(mockCallback).accept(same(logEntry2));
 
         assertEquals("size", 2, log.size());
     }
@@ -107,7 +108,7 @@ public class ReplicatedLogImplTest {
     public void testAppendAndPersisWithDuplicateEntry() throws Exception {
         ReplicatedLog log = ReplicatedLogImpl.newInstance(context);
 
-        Procedure<ReplicatedLogEntry> mockCallback = mock(Procedure.class);
+        Consumer<ReplicatedLogEntry> mockCallback = mock(Consumer.class);
         ReplicatedLogEntry logEntry = new SimpleReplicatedLogEntry(1, 1, new MockPayload("1"));
 
         log.appendAndPersist(logEntry, mockCallback, true);

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.