This migrates most of sal-akka-raft to use java.util.Optional.
Change-Id: Iaccc760101762dc9d4d647ded80de9a76f1f067b
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.example;
import akka.actor.ActorRef;
import akka.actor.Props;
-import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.io.ByteSource;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import org.apache.commons.lang3.SerializationUtils;
import org.opendaylight.controller.cluster.example.messages.KeyValue;
import org.opendaylight.controller.cluster.example.messages.KeyValueSaved;
private long persistIdentifier = 1;
- public ExampleActor(String id, Map<String, String> peerAddresses,
- Optional<ConfigParams> configParams) {
+ public ExampleActor(final String id, final Map<String, String> peerAddresses,
+ final Optional<ConfigParams> configParams) {
super(id, peerAddresses, configParams, (short)0);
setPersistence(true);
roleChangeNotifier = createRoleChangeNotifier(id);
}
@Override
- protected void handleNonRaftCommand(Object message) {
+ protected void handleNonRaftCommand(final Object message) {
if (message instanceof KeyValue) {
if (isLeader()) {
persistData(getSender(), new PayloadIdentifier(persistIdentifier++), (Payload) message, false);
+ ", im-mem journal size=" + getRaftActorContext().getReplicatedLog().size();
}
- public Optional<ActorRef> createRoleChangeNotifier(String actorId) {
+ public Optional<ActorRef> createRoleChangeNotifier(final String actorId) {
ActorRef exampleRoleChangeNotifier = this.getContext().actorOf(
RoleChangeNotifier.getProps(actorId), actorId + "-notifier");
return Optional.<ActorRef>of(exampleRoleChangeNotifier);
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
- public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+ public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
try {
if (installSnapshotStream.isPresent()) {
SerializationUtils.serialize((Serializable) state, installSnapshotStream.get());
}
@Override
- public void applySnapshot(Snapshot.State snapshotState) {
+ public void applySnapshot(final Snapshot.State snapshotState) {
state.clear();
state.putAll(((MapState)snapshotState).state);
}
@Override
- public void startLogRecoveryBatch(int maxBatchSize) {
+ public void startLogRecoveryBatch(final int maxBatchSize) {
}
@Override
- public void appendRecoveredLogEntry(Payload data) {
+ public void appendRecoveredLogEntry(final Payload data) {
}
@Override
}
@Override
- public void applyRecoverySnapshot(Snapshot.State snapshotState) {
+ public void applyRecoverySnapshot(final Snapshot.State snapshotState) {
}
@Override
@SuppressWarnings("unchecked")
@Override
- public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) {
+ public Snapshot.State deserializeSnapshot(final ByteSource snapshotBytes) {
try {
return new MapState((Map<String, String>) SerializationUtils.deserialize(snapshotBytes.read()));
} catch (IOException e) {
Map<String, String> state;
- MapState(Map<String, String> state) {
+ MapState(final Map<String, String> state) {
this.state = state;
}
}
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.example;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
-import com.google.common.base.Optional;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import org.opendaylight.controller.cluster.example.messages.KeyValue;
-import org.opendaylight.controller.cluster.raft.ConfigParams;
public final class Main {
private static final ActorSystem ACTOR_SYSTEM = ActorSystem.create();
}
@SuppressWarnings("checkstyle:RegexpSingleLineJava")
- public static void main(String[] args) throws Exception {
+ public static void main(final String[] args) throws Exception {
ActorRef example1Actor =
ACTOR_SYSTEM.actorOf(ExampleActor.props("example-1",
- withoutPeer("example-1"), Optional.<ConfigParams>absent()), "example-1");
+ withoutPeer("example-1"), Optional.empty()), "example-1");
ActorRef example2Actor =
ACTOR_SYSTEM.actorOf(ExampleActor.props("example-2",
- withoutPeer("example-2"), Optional.<ConfigParams>absent()), "example-2");
+ withoutPeer("example-2"), Optional.empty()), "example-2");
ActorRef example3Actor =
ACTOR_SYSTEM.actorOf(ExampleActor.props("example-3",
- withoutPeer("example-3"), Optional.<ConfigParams>absent()), "example-3");
+ withoutPeer("example-3"), Optional.empty()), "example-3");
List<ActorRef> examples = Arrays.asList(example1Actor, example2Actor, example3Actor);
String actorName = "example-" + num;
examples.add(num - 1,
ACTOR_SYSTEM.actorOf(ExampleActor.props(actorName,
- withoutPeer(actorName), Optional.<ConfigParams>absent()),
- actorName));
+ withoutPeer(actorName), Optional.empty()), actorName));
System.out.println("Created actor : " + actorName);
continue;
}
}
}
- private static Map<String, String> withoutPeer(String peerId) {
+ private static Map<String, String> withoutPeer(final String peerId) {
Map<String, String> without = new HashMap<>(allPeers);
without.remove(peerId);
return without;
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.example;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
-import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.typesafe.config.ConfigFactory;
import java.io.BufferedReader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.opendaylight.controller.cluster.example.messages.PrintRole;
import org.opendaylight.controller.cluster.example.messages.PrintState;
* AbstractUptypedActor and AbstractUptypedPersistentActor would need to be commented out.
* Also RaftActor handleCommand(), debug log which prints for every command other than AE/AER
*/
- public static void main(String[] args) throws Exception {
+ public static void main(final String[] args) throws Exception {
actorSystem = ActorSystem.create("raft-test", ConfigFactory
.load().getConfig("raft-test"));
}
// create the listener using a separate actor system for each example actor
- private static void createClusterRoleChangeListener(List<String> memberIds) {
+ private static void createClusterRoleChangeListener(final List<String> memberIds) {
System.out.println("memberIds=" + memberIds);
for (String memberId : memberIds) {
ActorRef listenerActor = listenerActorSystem.actorOf(
}
}
- public static ActorRef createExampleActor(String name) {
+ public static ActorRef createExampleActor(final String name) {
return actorSystem.actorOf(ExampleActor.props(name, withoutPeer(name),
Optional.of(configParams)), name);
}
- public void createNodes(int num) {
+ public void createNodes(final int num) {
for (int i = 0; i < num; i++) {
nameCounter = nameCounter + 1;
allPeers.put("example-" + nameCounter, "akka://raft-test/user/example-" + nameCounter);
}
// add num clients to all nodes in the system
- public void addClients(int num) {
+ public void addClients(final int num) {
for (Map.Entry<String, ActorRef> actorRefEntry : actorRefs.entrySet()) {
for (int i = 0; i < num; i++) {
String clientName = "client-" + i + "-" + actorRefEntry.getKey();
}
// add num clients to a node
- public void addClientsToNode(String actorName, int num) {
+ public void addClientsToNode(final String actorName, final int num) {
ActorRef actorRef = actorRefs.get(actorName);
for (int i = 0; i < num; i++) {
String clientName = "client-" + i + "-" + actorName;
}
}
- public void stopNode(String actorName) {
+ public void stopNode(final String actorName) {
ActorRef actorRef = actorRefs.get(actorName);
for (Map.Entry<String,ActorRef> entry : clientActorRefs.entrySet()) {
allPeers.remove(actorName);
}
- public void reinstateNode(String actorName) {
+ public void reinstateNode(final String actorName) {
String address = "akka://default/user/" + actorName;
allPeers.put(actorName, address);
}
}
- public void startLoggingForClient(ActorRef client) {
+ public void startLoggingForClient(final ActorRef client) {
logGenerator.startLoggingForClient(client);
}
}
}
- public void stopLoggingForClient(ActorRef client) {
+ public void stopLoggingForClient(final ActorRef client) {
logGenerator.stopLoggingForClient(client);
}
}
- private static Map<String, String> withoutPeer(String peerId) {
+ private static Map<String, String> withoutPeer(final String peerId) {
Map<String, String> without = new ConcurrentHashMap<>(allPeers);
without.remove(peerId);
import akka.actor.PoisonPill;
import akka.actor.Status;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.eclipse.jdt.annotation.NonNull;
import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.Nullable;
import org.opendaylight.controller.cluster.raft.base.messages.LeaderTransitioning;
}
public Optional<String> getRequestedFollowerId() {
- return Optional.fromNullable(requestedFollowerId);
+ return Optional.ofNullable(requestedFollowerId);
}
interface OnComplete {
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.NonNull;
import org.eclipse.jdt.annotation.Nullable;
private final Stopwatch isolatedLeaderCheck = Stopwatch.createStarted();
private @Nullable LeadershipTransferContext leadershipTransferContext;
- Leader(RaftActorContext context, @Nullable AbstractLeader initializeFromLeader) {
+ Leader(final RaftActorContext context, @Nullable final AbstractLeader initializeFromLeader) {
super(context, RaftState.Leader, initializeFromLeader);
}
- public Leader(RaftActorContext context) {
+ public Leader(final RaftActorContext context) {
this(context, null);
}
@Override
- public RaftActorBehavior handleMessage(ActorRef sender, Object originalMessage) {
+ public RaftActorBehavior handleMessage(final ActorRef sender, final Object originalMessage) {
requireNonNull(sender, "sender should not be null");
if (ISOLATED_LEADER_CHECK.equals(originalMessage)) {
}
@Override
- protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
+ protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
+ final AppendEntriesReply appendEntriesReply) {
RaftActorBehavior returnBehavior = super.handleAppendEntriesReply(sender, appendEntriesReply);
tryToCompleteLeadershipTransfer(appendEntriesReply.getFollowerId());
return returnBehavior;
*
* @param leadershipTransferCohort the cohort participating in the leadership transfer
*/
- public void transferLeadership(@NonNull RaftActorLeadershipTransferCohort leadershipTransferCohort) {
+ public void transferLeadership(@NonNull final RaftActorLeadershipTransferCohort leadershipTransferCohort) {
log.debug("{}: Attempting to transfer leadership", logName());
leadershipTransferContext = new LeadershipTransferContext(leadershipTransferCohort);
sendAppendEntries(0, false);
}
- private void tryToCompleteLeadershipTransfer(String followerId) {
+ private void tryToCompleteLeadershipTransfer(final String followerId) {
if (leadershipTransferContext == null) {
return;
}
}
@VisibleForTesting
- void markFollowerActive(String followerId) {
+ void markFollowerActive(final String followerId) {
getFollower(followerId).markFollowerActive();
}
@VisibleForTesting
- void markFollowerInActive(String followerId) {
+ void markFollowerInActive(final String followerId) {
getFollower(followerId).markFollowerInActive();
}
RaftActorLeadershipTransferCohort transferCohort;
Stopwatch timer = Stopwatch.createStarted();
- LeadershipTransferContext(RaftActorLeadershipTransferCohort transferCohort) {
+ LeadershipTransferContext(final RaftActorLeadershipTransferCohort transferCohort) {
this.transferCohort = transferCohort;
}
- boolean isExpired(long timeout) {
+ boolean isExpired(final long timeout) {
if (timer.elapsed(TimeUnit.MILLISECONDS) >= timeout) {
transferCohort.abortTransfer();
return true;
import akka.actor.ActorRef;
import akka.dispatch.Dispatchers;
import akka.testkit.TestActorRef;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.OutputStream;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.junit.After;
RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
@Override
- public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+ public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
}
@Override
- public void applySnapshot(Snapshot.State snapshotState) {
+ public void applySnapshot(final Snapshot.State snapshotState) {
}
@Override
- public State deserializeSnapshot(ByteSource snapshotBytes) {
+ public State deserializeSnapshot(final ByteSource snapshotBytes) {
throw new UnsupportedOperationException();
}
};
}
@SuppressWarnings("checkstyle:IllegalCatch")
- private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedMessage(String id, boolean persistent,
- Consumer<Snapshot> snapshotVerifier, final State snapshotState) {
+ private TestActorRef<MockRaftActor> doTestSnapshotAfterStartupWithMigratedMessage(final String id,
+ final boolean persistent, final Consumer<Snapshot> snapshotVerifier, final State snapshotState) {
InMemorySnapshotStore.addSnapshotSavedLatch(id);
InMemoryJournal.addDeleteMessagesCompleteLatch(id);
DefaultConfigParamsImpl config = new DefaultConfigParamsImpl();
RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
@Override
- public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+ public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
}
@Override
- public void applySnapshot(State newState) {
+ public void applySnapshot(final State newState) {
}
@Override
- public State deserializeSnapshot(ByteSource snapshotBytes) {
+ public State deserializeSnapshot(final ByteSource snapshotBytes) {
throw new UnsupportedOperationException();
}
};
import akka.actor.ActorRef;
import akka.actor.Props;
import com.google.common.base.Function;
-import com.google.common.base.Optional;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.SerializationUtils;
protected MockRaftActor(final AbstractBuilder<?, ?> builder) {
super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
- Collections.<String, String>emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION);
+ Collections.emptyMap(), Optional.ofNullable(builder.config), PAYLOAD_VERSION);
state = Collections.synchronizedList(new ArrayList<>());
this.actorDelegate = mock(RaftActor.class);
this.recoveryCohortDelegate = mock(RaftActorRecoveryCohort.class);
}
@Override
- public void createSnapshot(final ActorRef actorRef, final java.util.Optional<OutputStream> installSnapshotStream) {
+ public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
LOG.info("{}: createSnapshot called", persistenceId());
snapshotCohortDelegate.createSnapshot(actorRef, installSnapshotStream);
}
@Override
protected Optional<ActorRef> getRoleChangeNotifier() {
- return Optional.fromNullable(roleChangeNotifier);
+ return Optional.ofNullable(roleChangeNotifier);
}
@Override public String persistenceId() {
private ActorRef roleChangeNotifier;
private RaftActorSnapshotMessageSupport snapshotMessageSupport;
private Snapshot restoreFromSnapshot;
- private Optional<Boolean> persistent = Optional.absent();
+ private Optional<Boolean> persistent = Optional.empty();
private final Class<A> actorClass;
private Function<Runnable, Void> pauseLeaderFunction;
private RaftActorSnapshotCohort snapshotCohort;
import static org.junit.Assert.assertEquals;
import akka.actor.ActorRef;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import java.util.Arrays;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import akka.dispatch.Dispatchers;
import akka.testkit.TestActorRef;
import akka.testkit.javadsl.TestKit;
-import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.SerializationUtils;
import org.junit.After;
addServerReply = testKit.expectMsgClass(Duration.ofSeconds(5), AddServerReply.class);
assertEquals("getStatus", ServerChangeStatus.OK, addServerReply.getStatus());
- assertEquals("getLeaderHint", java.util.Optional.of(LEADER_ID), addServerReply.getLeaderHint());
+ assertEquals("getLeaderHint", Optional.of(LEADER_ID), addServerReply.getLeaderHint());
expectFirstMatching(leaderCollectorActor, ApplyState.class);
assertEquals("Leader journal last index", 1, leaderActorContext.getReplicatedLog().lastIndex());
LOG.info("testChangeToVotingWithNoLeaderAndOtherLeaderElected ending");
}
- private static void verifyRaftState(RaftState expState, RaftActor... raftActors) {
+ private static void verifyRaftState(final RaftState expState, final RaftActor... raftActors) {
Stopwatch sw = Stopwatch.createStarted();
while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
for (RaftActor raftActor : raftActors) {
fail("None of the RaftActors have state " + expState);
}
- private static ServerInfo votingServer(String id) {
+ private static ServerInfo votingServer(final String id) {
return new ServerInfo(id, true);
}
- private static ServerInfo nonVotingServer(String id) {
+ private static ServerInfo nonVotingServer(final String id) {
return new ServerInfo(id, false);
}
- private ActorRef newLeaderCollectorActor(MockLeaderRaftActor leaderRaftActor) {
+ private ActorRef newLeaderCollectorActor(final MockLeaderRaftActor leaderRaftActor) {
return newCollectorActor(leaderRaftActor, LEADER_ID);
}
- private ActorRef newCollectorActor(AbstractMockRaftActor raftActor, String id) {
+ private ActorRef newCollectorActor(final AbstractMockRaftActor raftActor, final String id) {
ActorRef collectorActor = actorFactory.createTestActor(
MessageCollectorActor.props(), actorFactory.generateActorId(id + "Collector"));
raftActor.setCollectorActor(collectorActor);
return collectorActor;
}
- private static void verifyServerConfigurationPayloadEntry(ReplicatedLog log, ServerInfo... expected) {
+ private static void verifyServerConfigurationPayloadEntry(final ReplicatedLog log, final ServerInfo... expected) {
ReplicatedLogEntry logEntry = log.get(log.lastIndex());
assertEquals("Last log entry payload class", ServerConfigurationPayload.class, logEntry.getData().getClass());
ServerConfigurationPayload payload = (ServerConfigurationPayload)logEntry.getData();
assertEquals("Server config", Sets.newHashSet(expected), Sets.newHashSet(payload.getServerConfig()));
}
- private static RaftActorContextImpl newFollowerContext(String id, TestActorRef<? extends AbstractActor> actor) {
+ private static RaftActorContextImpl newFollowerContext(final String id,
+ final TestActorRef<? extends AbstractActor> actor) {
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
configParams.setElectionTimeoutFactor(100000);
private volatile ActorRef collectorActor;
private volatile Class<?> dropMessageOfType;
- AbstractMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
- boolean persistent, ActorRef collectorActor) {
+ AbstractMockRaftActor(final String id, final Map<String, String> peerAddresses,
+ final Optional<ConfigParams> config, final boolean persistent, final ActorRef collectorActor) {
super(builder().id(id).peerAddresses(peerAddresses).config(config.get())
.persistent(Optional.of(persistent)));
this.collectorActor = collectorActor;
}
- void setDropMessageOfType(Class<?> dropMessageOfType) {
+ void setDropMessageOfType(final Class<?> dropMessageOfType) {
this.dropMessageOfType = dropMessageOfType;
}
- void setCollectorActor(ActorRef collectorActor) {
+ void setCollectorActor(final ActorRef collectorActor) {
this.collectorActor = collectorActor;
}
@Override
- public void handleCommand(Object message) {
+ public void handleCommand(final Object message) {
if (dropMessageOfType == null || !dropMessageOfType.equals(message.getClass())) {
super.handleCommand(message);
}
public static class CollectingMockRaftActor extends AbstractMockRaftActor {
- CollectingMockRaftActor(String id, Map<String, String> peerAddresses, Optional<ConfigParams> config,
- boolean persistent, ActorRef collectorActor) {
+ CollectingMockRaftActor(final String id, final Map<String, String> peerAddresses,
+ final Optional<ConfigParams> config, final boolean persistent, final ActorRef collectorActor) {
super(id, peerAddresses, config, persistent, collectorActor);
snapshotCohortDelegate = new RaftActorSnapshotCohort() {
@Override
- public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+ public void createSnapshot(final ActorRef actorRef,
+ final Optional<OutputStream> installSnapshotStream) {
actorRef.tell(new CaptureSnapshotReply(ByteState.empty(), installSnapshotStream), actorRef);
}
@Override
public void applySnapshot(
- org.opendaylight.controller.cluster.raft.persisted.Snapshot.State snapshotState) {
+ final org.opendaylight.controller.cluster.raft.persisted.Snapshot.State snapshotState) {
}
@Override
public org.opendaylight.controller.cluster.raft.persisted.Snapshot.State deserializeSnapshot(
- ByteSource snapshotBytes) {
+ final ByteSource snapshotBytes) {
throw new UnsupportedOperationException();
}
};
}
public static Props props(final String id, final Map<String, String> peerAddresses,
- ConfigParams config, boolean persistent, ActorRef collectorActor) {
+ final ConfigParams config, final boolean persistent, final ActorRef collectorActor) {
return Props.create(CollectingMockRaftActor.class, id, peerAddresses, Optional.of(config),
persistent, collectorActor);
}
public static class MockLeaderRaftActor extends AbstractMockRaftActor {
- public MockLeaderRaftActor(Map<String, String> peerAddresses, ConfigParams config,
- RaftActorContext fromContext) {
+ public MockLeaderRaftActor(final Map<String, String> peerAddresses, final ConfigParams config,
+ final RaftActorContext fromContext) {
super(LEADER_ID, peerAddresses, Optional.of(config), NO_PERSISTENCE, null);
setPersistence(false);
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
- public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+ public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
MockSnapshotState snapshotState = new MockSnapshotState(new ArrayList<>(getState()));
if (installSnapshotStream.isPresent()) {
SerializationUtils.serialize(snapshotState, installSnapshotStream.get());
actorRef.tell(new CaptureSnapshotReply(snapshotState, installSnapshotStream), actorRef);
}
- static Props props(Map<String, String> peerAddresses, RaftActorContext fromContext) {
+ static Props props(final Map<String, String> peerAddresses, final RaftActorContext fromContext) {
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(100, TimeUnit.MILLISECONDS));
configParams.setElectionTimeoutFactor(10);
}
public static class MockNewFollowerRaftActor extends AbstractMockRaftActor {
- public MockNewFollowerRaftActor(ConfigParams config, ActorRef collectorActor) {
+ public MockNewFollowerRaftActor(final ConfigParams config, final ActorRef collectorActor) {
super(NEW_SERVER_ID, Maps.<String, String>newHashMap(), Optional.of(config), NO_PERSISTENCE,
collectorActor);
setPersistence(false);
}
- static Props props(ConfigParams config, ActorRef collectorActor) {
+ static Props props(final ConfigParams config, final ActorRef collectorActor) {
return Props.create(MockNewFollowerRaftActor.class, config, collectorActor);
}
}
import akka.protobuf.ByteString;
import akka.testkit.TestActorRef;
import akka.testkit.javadsl.TestKit;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
doReturn(true).when(mockSupport).handleSnapshotMessage(same(applySnapshot), any(ActorRef.class));
mockRaftActor.handleCommand(applySnapshot);
- CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(ByteState.empty(),
- java.util.Optional.empty());
+ CaptureSnapshotReply captureSnapshotReply = new CaptureSnapshotReply(ByteState.empty(), Optional.empty());
doReturn(true).when(mockSupport).handleSnapshotMessage(same(captureSnapshotReply), any(ActorRef.class));
mockRaftActor.handleCommand(captureSnapshotReply);
new MockRaftActorContext.MockPayload("foo-3"),
new MockRaftActorContext.MockPayload("foo-4")));
- leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotState, java.util.Optional.empty(),
+ leaderActor.getRaftActorContext().getSnapshotManager().persist(snapshotState, Optional.empty(),
Runtime.getRuntime().totalMemory());
assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
new MockRaftActorContext.MockPayload("foo-3"),
new MockRaftActorContext.MockPayload("foo-4")));
followerActor.handleCommand(new CaptureSnapshotReply(ByteState.of(snapshotBytes.toByteArray()),
- java.util.Optional.empty()));
+ Optional.empty()));
assertTrue(followerActor.getRaftActorContext().getSnapshotManager().isCapturing());
// The commit is needed to complete the snapshot creation process
new MockRaftActorContext.MockPayload("foo-3"),
new MockRaftActorContext.MockPayload("foo-4")));
leaderActor.handleCommand(new CaptureSnapshotReply(ByteState.of(snapshotBytes.toByteArray()),
- java.util.Optional.empty()));
+ Optional.empty()));
assertTrue(leaderActor.getRaftActorContext().getSnapshotManager().isCapturing());
assertEquals("Real snapshot didn't clear the log till replicatedToAllIndex", 0,
// Now send a CaptureSnapshotReply
mockActorRef.tell(new CaptureSnapshotReply(ByteState.of(fromObject("foo").toByteArray()),
- java.util.Optional.empty()), mockActorRef);
+ Optional.empty()), mockActorRef);
// Trimming log in this scenario is a no-op
assertEquals(-1, leaderActor.getReplicatedLog().getSnapshotIndex());
// Now send a CaptureSnapshotReply
mockActorRef.tell(new CaptureSnapshotReply(ByteState.of(fromObject("foo").toByteArray()),
- java.util.Optional.empty()), mockActorRef);
+ Optional.empty()), mockActorRef);
// Trimming log in this scenario is a no-op
assertEquals(3, leaderActor.getReplicatedLog().getSnapshotIndex());
ArgumentCaptor<ActorRef> replyActor = ArgumentCaptor.forClass(ActorRef.class);
verify(mockRaftActor.snapshotCohortDelegate, timeout(5000)).createSnapshot(replyActor.capture(),
- eq(java.util.Optional.empty()));
+ eq(Optional.empty()));
byte[] stateSnapshot = new byte[]{1,2,3};
- replyActor.getValue().tell(new CaptureSnapshotReply(ByteState.of(stateSnapshot), java.util.Optional.empty()),
+ replyActor.getValue().tell(new CaptureSnapshotReply(ByteState.of(stateSnapshot), Optional.empty()),
ActorRef.noSender());
GetSnapshotReply reply = kit.expectMsgClass(GetSnapshotReply.class);
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft.behaviors;
import static org.junit.Assert.assertEquals;
import akka.protobuf.ByteString;
import akka.testkit.TestActorRef;
import akka.testkit.javadsl.TestKit;
-import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
final AtomicReference<MockRaftActor> followerRaftActor) {
RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
@Override
- public void createSnapshot(final ActorRef actorRef,
- final java.util.Optional<OutputStream> installSnapshotStream) {
+ public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
try {
actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()),
installSnapshotStream), actorRef);
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
* and is available at http://www.eclipse.org/legal/epl-v10.html
*/
-
package org.opendaylight.controller.cluster.raft.behaviors;
import static org.junit.Assert.assertEquals;
import akka.protobuf.ByteString;
import akka.testkit.TestActorRef;
import akka.testkit.javadsl.TestKit;
-import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.SerializationUtils;
actorContext.getReplicatedLog().removeFrom(0);
- AtomicReference<java.util.Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
+ AtomicReference<Optional<OutputStream>> installSnapshotStream = new AtomicReference<>();
actorContext.setCreateSnapshotProcedure(installSnapshotStream::set);
leader = new Leader(actorContext);
leader.transferLeadership(mockTransferCohort);
verify(mockTransferCohort, never()).transferComplete();
- doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
+ doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
MessageCollectorActor.expectFirstMatching(followerActor, AppendEntries.class);
leader.handleMessage(leaderActor, new AppendEntriesReply(FOLLOWER_ID, 1, true, 0, 1, (short)0));
MessageCollectorActor.clearMessages(followerActor);
RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
- doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
+ doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
leader.transferLeadership(mockTransferCohort);
verify(mockTransferCohort, never()).transferComplete();
MessageCollectorActor.clearMessages(followerActor);
RaftActorLeadershipTransferCohort mockTransferCohort = mock(RaftActorLeadershipTransferCohort.class);
- doReturn(Optional.absent()).when(mockTransferCohort).getRequestedFollowerId();
+ doReturn(Optional.empty()).when(mockTransferCohort).getRequestedFollowerId();
leader.transferLeadership(mockTransferCohort);
verify(mockTransferCohort, never()).transferComplete();
import akka.serialization.JavaSerializer;
import akka.serialization.Serialization;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import com.google.common.base.Verify;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import org.eclipse.jdt.annotation.NonNull;
@Override
protected void handleNonRaftCommand(final Object message) {
try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
- final java.util.Optional<Error> maybeError = context.error();
+ final Optional<Error> maybeError = context.error();
if (maybeError.isPresent()) {
LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
maybeError.get());