Since we are indirecting through an executor, we are forced
to wrap any exception -- just do not bother, as the callbacks
are executed in the context of an actor anyway (dealing with
RuntimeExceptions) and users are not throwing checked exceptions.
Change-Id: I6cea19ab7192fa42ad3c346d554411cd0d558a64
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
*/
package org.opendaylight.controller.cluster.raft;
*/
package org.opendaylight.controller.cluster.raft;
-import akka.japi.Procedure;
+import java.util.function.Consumer;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
* Appends an entry to the in-memory log and persists it as well.
*
* @param replicatedLogEntry the entry to append
* Appends an entry to the in-memory log and persists it as well.
*
* @param replicatedLogEntry the entry to append
- * @param callback the Procedure to be notified when persistence is complete (optional).
+ * @param callback the callback to be notified when persistence is complete (optional).
* @param doAsync if true, the persistent actor can receive subsequent messages to process in between the persist
* call and the execution of the associated callback. If false, subsequent messages are stashed and get
* delivered after persistence is complete and the associated callback is executed. In either case the
* @param doAsync if true, the persistent actor can receive subsequent messages to process in between the persist
* call and the execution of the associated callback. If false, subsequent messages are stashed and get
* delivered after persistence is complete and the associated callback is executed. In either case the
* @return true if the entry was successfully appended, false otherwise.
*/
boolean appendAndPersist(@NonNull ReplicatedLogEntry replicatedLogEntry,
* @return true if the entry was successfully appended, false otherwise.
*/
boolean appendAndPersist(@NonNull ReplicatedLogEntry replicatedLogEntry,
- @Nullable Procedure<ReplicatedLogEntry> callback, boolean doAsync);
+ @Nullable Consumer<ReplicatedLogEntry> callback, boolean doAsync);
/**
* Returns a list of log entries starting from the given index to the end of the log.
/**
* Returns a list of log entries starting from the given index to the end of the log.
import static java.util.Objects.requireNonNull;
import static java.util.Objects.requireNonNull;
-import akka.japi.Procedure;
import java.util.Collections;
import java.util.List;
import java.util.Collections;
import java.util.List;
+import java.util.function.Consumer;
import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
import org.opendaylight.controller.cluster.raft.persisted.DeleteEntries;
import org.opendaylight.controller.cluster.raft.persisted.Snapshot;
@Override
public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
@Override
public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
- final Procedure<ReplicatedLogEntry> callback, final boolean doAsync) {
+ final Consumer<ReplicatedLogEntry> callback, final boolean doAsync) {
context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry);
context.getLogger().debug("{}: Append log entry and persist {} ", context.getId(), replicatedLogEntry);
}
private void persistCallback(final ReplicatedLogEntry persistedLogEntry,
}
private void persistCallback(final ReplicatedLogEntry persistedLogEntry,
- final Procedure<ReplicatedLogEntry> callback) {
+ final Consumer<ReplicatedLogEntry> callback) {
context.getExecutor().execute(() -> syncPersistCallback(persistedLogEntry, callback));
}
context.getExecutor().execute(() -> syncPersistCallback(persistedLogEntry, callback));
}
- @SuppressWarnings("checkstyle:illegalCatch")
private void syncPersistCallback(final ReplicatedLogEntry persistedLogEntry,
private void syncPersistCallback(final ReplicatedLogEntry persistedLogEntry,
- final Procedure<ReplicatedLogEntry> callback) {
+ final Consumer<ReplicatedLogEntry> callback) {
context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry);
dataSizeSinceLastSnapshot += persistedLogEntry.size();
if (callback != null) {
context.getLogger().debug("{}: persist complete {}", context.getId(), persistedLogEntry);
dataSizeSinceLastSnapshot += persistedLogEntry.size();
if (callback != null) {
- try {
- callback.apply(persistedLogEntry);
- } catch (Exception e) {
- context.getLogger().error("{}: persist callback failed", context.getId(), e);
- throw new IllegalStateException("Persist callback failed", e);
- }
+ callback.accept(persistedLogEntry);
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.Member;
import akka.cluster.MemberStatus;
-import akka.japi.Procedure;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import java.io.IOException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.messaging.MessageAssembler;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.messaging.MessageAssembler;
import org.opendaylight.controller.cluster.raft.RaftActorContext;
// applied to the state already, as the persistence callback occurs async, and we want those entries
// purged from the persisted log as well.
final AtomicBoolean shouldCaptureSnapshot = new AtomicBoolean(false);
// applied to the state already, as the persistence callback occurs async, and we want those entries
// purged from the persisted log as well.
final AtomicBoolean shouldCaptureSnapshot = new AtomicBoolean(false);
- final Procedure<ReplicatedLogEntry> appendAndPersistCallback = logEntry -> {
+ final Consumer<ReplicatedLogEntry> appendAndPersistCallback = logEntry -> {
final List<ReplicatedLogEntry> entries = appendEntries.getEntries();
final ReplicatedLogEntry lastEntryToAppend = entries.get(entries.size() - 1);
if (shouldCaptureSnapshot.get() && logEntry == lastEntryToAppend) {
final List<ReplicatedLogEntry> entries = appendEntries.getEntries();
final ReplicatedLogEntry lastEntryToAppend = entries.get(entries.size() - 1);
if (shouldCaptureSnapshot.get() && logEntry == lastEntryToAppend) {
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import akka.japi.Procedure;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
- public boolean appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback,
+ public boolean appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Consumer<ReplicatedLogEntry> callback,
+ if (callback != null) {
+ callback.accept(replicatedLogEntry);
+ }
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.actor.Props;
-import akka.japi.Procedure;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
- final Procedure<ReplicatedLogEntry> callback, final boolean doAsync) {
+ final Consumer<ReplicatedLogEntry> callback, final boolean doAsync) {
append(replicatedLogEntry);
if (callback != null) {
append(replicatedLogEntry);
if (callback != null) {
- try {
- callback.apply(replicatedLogEntry);
- } catch (RuntimeException e) {
- throw e;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ callback.accept(replicatedLogEntry);
import akka.japi.Procedure;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collections;
import akka.japi.Procedure;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collections;
+import java.util.function.Consumer;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
reset(mockPersistence);
ReplicatedLogEntry logEntry2 = new SimpleReplicatedLogEntry(2, 1, new MockPayload("2"));
reset(mockPersistence);
ReplicatedLogEntry logEntry2 = new SimpleReplicatedLogEntry(2, 1, new MockPayload("2"));
- Procedure<ReplicatedLogEntry> mockCallback = mock(Procedure.class);
+ Consumer<ReplicatedLogEntry> mockCallback = mock(Consumer.class);
log.appendAndPersist(logEntry2, mockCallback, true);
verifyPersist(logEntry2);
log.appendAndPersist(logEntry2, mockCallback, true);
verifyPersist(logEntry2);
- verify(mockCallback).apply(same(logEntry2));
+ verify(mockCallback).accept(same(logEntry2));
assertEquals("size", 2, log.size());
}
assertEquals("size", 2, log.size());
}
public void testAppendAndPersisWithDuplicateEntry() throws Exception {
ReplicatedLog log = ReplicatedLogImpl.newInstance(context);
public void testAppendAndPersisWithDuplicateEntry() throws Exception {
ReplicatedLog log = ReplicatedLogImpl.newInstance(context);
- Procedure<ReplicatedLogEntry> mockCallback = mock(Procedure.class);
+ Consumer<ReplicatedLogEntry> mockCallback = mock(Consumer.class);
ReplicatedLogEntry logEntry = new SimpleReplicatedLogEntry(1, 1, new MockPayload("1"));
log.appendAndPersist(logEntry, mockCallback, true);
ReplicatedLogEntry logEntry = new SimpleReplicatedLogEntry(1, 1, new MockPayload("1"));
log.appendAndPersist(logEntry, mockCallback, true);