import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.io.ByteSource;
import java.io.IOException;
import java.io.ObjectOutputStream;
private Optional<SnapshotHolder> snapshotHolder = Optional.absent();
private int minReplicationCount;
- protected AbstractLeader(RaftActorContext context, RaftState state,
- @Nullable AbstractLeader initializeFromLeader) {
+ protected AbstractLeader(final RaftActorContext context, final RaftState state,
+ @Nullable final AbstractLeader initializeFromLeader) {
super(context, state);
appendEntriesMessageSlicer = MessageSlicer.builder().logContext(logName())
scheduleHeartBeat(context.getConfigParams().getHeartBeatInterval());
}
- protected AbstractLeader(RaftActorContext context, RaftState state) {
+ protected AbstractLeader(final RaftActorContext context, final RaftState state) {
this(context, state, null);
}
return followerToLog.keySet();
}
- public void addFollower(String followerId) {
+ public void addFollower(final String followerId) {
FollowerLogInformation followerLogInformation = new FollowerLogInformationImpl(
context.getPeerInfo(followerId), -1, context);
followerToLog.put(followerId, followerLogInformation);
}
}
- public void removeFollower(String followerId) {
+ public void removeFollower(final String followerId) {
followerToLog.remove(followerId);
}
}
@VisibleForTesting
- void setSnapshot(@Nullable SnapshotHolder snapshotHolder) {
+ void setSnapshot(@Nullable final SnapshotHolder snapshotHolder) {
this.snapshotHolder = Optional.fromNullable(snapshotHolder);
}
}
@Override
- protected RaftActorBehavior handleAppendEntries(ActorRef sender,
- AppendEntries appendEntries) {
+ protected RaftActorBehavior handleAppendEntries(final ActorRef sender,
+ final AppendEntries appendEntries) {
log.debug("{}: handleAppendEntries: {}", logName(), appendEntries);
}
@Override
- protected RaftActorBehavior handleAppendEntriesReply(ActorRef sender, AppendEntriesReply appendEntriesReply) {
+ protected RaftActorBehavior handleAppendEntriesReply(final ActorRef sender,
+ final AppendEntriesReply appendEntriesReply) {
log.trace("{}: handleAppendEntriesReply: {}", logName(), appendEntriesReply);
// Update the FollowerLogInformation
}
}
- private boolean updateFollowerLogInformation(FollowerLogInformation followerLogInformation,
- AppendEntriesReply appendEntriesReply) {
+ private boolean updateFollowerLogInformation(final FollowerLogInformation followerLogInformation,
+ final AppendEntriesReply appendEntriesReply) {
boolean updated = followerLogInformation.setMatchIndex(appendEntriesReply.getLogLastIndex());
updated = followerLogInformation.setNextIndex(appendEntriesReply.getLogLastIndex() + 1) || updated;
}
@Override
- protected ClientRequestTracker removeClientRequestTracker(long logIndex) {
+ protected ClientRequestTracker removeClientRequestTracker(final long logIndex) {
final Iterator<ClientRequestTracker> it = trackers.iterator();
while (it.hasNext()) {
final ClientRequestTracker t = it.next();
}
@Override
- protected RaftActorBehavior handleRequestVoteReply(ActorRef sender,
- RequestVoteReply requestVoteReply) {
+ protected RaftActorBehavior handleRequestVoteReply(final ActorRef sender,
+ final RequestVoteReply requestVoteReply) {
return this;
}
protected void beforeSendHeartbeat(){}
@Override
- public RaftActorBehavior handleMessage(ActorRef sender, Object message) {
+ public RaftActorBehavior handleMessage(final ActorRef sender, final Object message) {
Preconditions.checkNotNull(sender, "sender should not be null");
if (appendEntriesMessageSlicer.handleMessage(message)) {
return this;
}
- private void handleInstallSnapshotReply(InstallSnapshotReply reply) {
+ private void handleInstallSnapshotReply(final InstallSnapshotReply reply) {
log.debug("{}: handleInstallSnapshotReply: {}", logName(), reply);
String followerId = reply.getFollowerId();
return false;
}
- private void replicate(Replicate replicate) {
+ private void replicate(final Replicate replicate) {
long logIndex = replicate.getReplicatedLogEntry().getIndex();
log.debug("{}: Replicate message: identifier: {}, logIndex: {}, payload: {}, isSendImmediate: {}", logName(),
}
}
- protected void sendAppendEntries(long timeSinceLastActivityInterval, boolean isHeartbeat) {
+ protected void sendAppendEntries(final long timeSinceLastActivityInterval, final boolean isHeartbeat) {
// Send an AppendEntries to all followers
for (Entry<String, FollowerLogInformation> e : followerToLog.entrySet()) {
final String followerId = e.getKey();
* This method checks if any update needs to be sent to the given follower. This includes append log entries,
* sending next snapshot chunk, and initiating a snapshot.
*/
- private void sendUpdatesToFollower(String followerId, FollowerLogInformation followerLogInformation,
- boolean sendHeartbeat, boolean isHeartbeat) {
+ private void sendUpdatesToFollower(final String followerId, final FollowerLogInformation followerLogInformation,
+ final boolean sendHeartbeat, final boolean isHeartbeat) {
ActorSelection followerActor = context.getPeerActorSelection(followerId);
if (followerActor != null) {
* @param followerId the id of the follower.
* @return true if capture was initiated, false otherwise.
*/
- public boolean initiateCaptureSnapshot(String followerId) {
+ public boolean initiateCaptureSnapshot(final String followerId) {
FollowerLogInformation followerLogInfo = followerToLog.get(followerId);
if (snapshotHolder.isPresent()) {
// If a snapshot is present in the memory, most likely another install is in progress no need to capture
return captureInitiated;
}
- private boolean canInstallSnapshot(long nextIndex) {
+ private boolean canInstallSnapshot(final long nextIndex) {
// If the follower's nextIndex is -1 then we might as well send it a snapshot
// Otherwise send it a snapshot only if the nextIndex is not present in the log but is present
// in the snapshot
* Sends a snapshot chunk to a given follower
* InstallSnapshot should qualify as a heartbeat too.
*/
- private void sendSnapshotChunk(ActorSelection followerActor, FollowerLogInformation followerLogInfo) {
+ private void sendSnapshotChunk(final ActorSelection followerActor, final FollowerLogInformation followerLogInfo) {
if (snapshotHolder.isPresent()) {
LeaderInstallSnapshotState installSnapshotState = followerLogInfo.getInstallSnapshotState();
if (installSnapshotState == null) {
actor()
);
- log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
- installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
} catch (IOException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
+
+ log.debug("{}: InstallSnapshot sent to follower {}, Chunk: {}/{}", logName(), followerActor.path(),
+ installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks());
}
}
}
}
- private void scheduleHeartBeat(FiniteDuration interval) {
+ private void scheduleHeartBeat(final FiniteDuration interval) {
if (followerToLog.isEmpty()) {
// Optimization - do not bother scheduling a heartbeat as there are
// no followers
}
@VisibleForTesting
- public FollowerLogInformation getFollower(String followerId) {
+ public FollowerLogInformation getFollower(final String followerId) {
return followerToLog.get(followerId);
}
private final long lastIncludedIndex;
private final ByteSource snapshotBytes;
- SnapshotHolder(Snapshot snapshot, ByteSource snapshotBytes) {
+ SnapshotHolder(final Snapshot snapshot, final ByteSource snapshotBytes) {
this.lastIncludedTerm = snapshot.getLastAppliedTerm();
this.lastIncludedIndex = snapshot.getLastAppliedIndex();
this.snapshotBytes = snapshotBytes;
*/
package org.opendaylight.controller.cluster.raft.behaviors;
-import com.google.common.base.Throwables;
import com.google.common.io.ByteSource;
import java.io.IOException;
import java.io.InputStream;
private long snapshotSize;
private InputStream snapshotInputStream;
- LeaderInstallSnapshotState(int snapshotChunkSize, String logName) {
+ LeaderInstallSnapshotState(final int snapshotChunkSize, final String logName) {
this.snapshotChunkSize = snapshotChunkSize;
this.logName = logName;
}
- void setSnapshotBytes(ByteSource snapshotBytes) throws IOException {
+ void setSnapshotBytes(final ByteSource snapshotBytes) throws IOException {
if (this.snapshotBytes != null) {
return;
}
|| replyReceivedForOffset == offset);
}
- boolean isLastChunk(int index) {
+ boolean isLastChunk(final int index) {
return totalChunks == index;
}
- void markSendStatus(boolean success) {
+ void markSendStatus(final boolean success) {
if (success) {
// if the chunk sent was successful
replyReceivedForOffset = offset;
try {
snapshotInputStream = snapshotBytes.openStream();
} catch (IOException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
import akka.actor.Props;
import com.google.common.base.Function;
import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
final CountDownLatch snapshotCommitted = new CountDownLatch(1);
private final Function<Runnable, Void> pauseLeaderFunction;
- protected MockRaftActor(AbstractBuilder<?, ?> builder) {
+ protected MockRaftActor(final AbstractBuilder<?, ?> builder) {
super(builder.id, builder.peerAddresses != null ? builder.peerAddresses :
Collections.<String, String>emptyMap(), Optional.fromNullable(builder.config), PAYLOAD_VERSION);
state = new ArrayList<>();
pauseLeaderFunction = builder.pauseLeaderFunction;
}
- public void setRaftActorRecoverySupport(RaftActorRecoverySupport support) {
+ public void setRaftActorRecoverySupport(final RaftActorRecoverySupport support) {
raftActorRecoverySupport = support;
}
try {
assertEquals("Recovery complete", true, recoveryComplete.await(5, TimeUnit.SECONDS));
} catch (InterruptedException e) {
- Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
try {
assertEquals("Behavior initialized", true, initializeBehaviorComplete.await(5, TimeUnit.SECONDS));
} catch (InterruptedException e) {
- Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
}
@Override
- protected void applyState(ActorRef clientActor, Identifier identifier, Object data) {
+ protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
actorDelegate.applyState(clientActor, identifier, data);
LOG.info("{}: applyState called: {}", persistenceId(), data);
}
@Override
- public void startLogRecoveryBatch(int maxBatchSize) {
+ public void startLogRecoveryBatch(final int maxBatchSize) {
}
@Override
- public void appendRecoveredLogEntry(Payload data) {
+ public void appendRecoveredLogEntry(final Payload data) {
state.add(data);
}
}
@Override
- public void applyRecoverySnapshot(Snapshot.State newState) {
+ public void applyRecoverySnapshot(final Snapshot.State newState) {
recoveryCohortDelegate.applyRecoverySnapshot(newState);
applySnapshotState(newState);
}
- private void applySnapshotState(Snapshot.State newState) {
+ private void applySnapshotState(final Snapshot.State newState) {
if (newState instanceof MockSnapshotState) {
state.clear();
state.addAll(((MockSnapshotState)newState).getState());
}
@Override
- public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+ public void createSnapshot(final ActorRef actorRef, final java.util.Optional<OutputStream> installSnapshotStream) {
LOG.info("{}: createSnapshot called", persistenceId());
snapshotCohortDelegate.createSnapshot(actorRef, installSnapshotStream);
}
@Override
- public void applySnapshot(Snapshot.State newState) {
+ public void applySnapshot(final Snapshot.State newState) {
LOG.info("{}: applySnapshot called", persistenceId());
applySnapshotState(newState);
snapshotCohortDelegate.applySnapshot(newState);
}
@Override
- public Snapshot.State deserializeSnapshot(ByteSource snapshotBytes) {
+ public Snapshot.State deserializeSnapshot(final ByteSource snapshotBytes) {
try {
return (Snapshot.State) SerializationUtils.deserialize(snapshotBytes.read());
} catch (IOException e) {
return this.getId();
}
- protected void newBehavior(RaftActorBehavior newBehavior) {
+ protected void newBehavior(final RaftActorBehavior newBehavior) {
self().tell(newBehavior, ActorRef.noSender());
}
}
@Override
- protected void pauseLeader(Runnable operation) {
+ protected void pauseLeader(final Runnable operation) {
if (pauseLeaderFunction != null) {
pauseLeaderFunction.apply(operation);
} else {
}
}
- public static List<Object> fromState(Snapshot.State from) {
+ public static List<Object> fromState(final Snapshot.State from) {
if (from instanceof MockSnapshotState) {
return ((MockSnapshotState)from).getState();
}
return restoreFromSnapshot;
}
- public static Props props(final String id, final Map<String, String> peerAddresses, ConfigParams config) {
+ public static Props props(final String id, final Map<String, String> peerAddresses, final ConfigParams config) {
return builder().id(id).peerAddresses(peerAddresses).config(config).props();
}
public static Props props(final String id, final Map<String, String> peerAddresses,
- ConfigParams config, DataPersistenceProvider dataPersistenceProvider) {
+ final ConfigParams config, final DataPersistenceProvider dataPersistenceProvider) {
return builder().id(id).peerAddresses(peerAddresses).config(config)
.dataPersistenceProvider(dataPersistenceProvider).props();
}
private Function<Runnable, Void> pauseLeaderFunction;
private RaftActorSnapshotCohort snapshotCohort;
- protected AbstractBuilder(Class<A> actorClass) {
+ protected AbstractBuilder(final Class<A> actorClass) {
this.actorClass = actorClass;
}
return (T) this;
}
- public T id(String newId) {
+ public T id(final String newId) {
this.id = newId;
return self();
}
- public T peerAddresses(Map<String, String> newPeerAddresses) {
+ public T peerAddresses(final Map<String, String> newPeerAddresses) {
this.peerAddresses = newPeerAddresses;
return self();
}
- public T config(ConfigParams newConfig) {
+ public T config(final ConfigParams newConfig) {
this.config = newConfig;
return self();
}
- public T dataPersistenceProvider(DataPersistenceProvider newDataPersistenceProvider) {
+ public T dataPersistenceProvider(final DataPersistenceProvider newDataPersistenceProvider) {
this.dataPersistenceProvider = newDataPersistenceProvider;
return self();
}
- public T roleChangeNotifier(ActorRef newRoleChangeNotifier) {
+ public T roleChangeNotifier(final ActorRef newRoleChangeNotifier) {
this.roleChangeNotifier = newRoleChangeNotifier;
return self();
}
- public T snapshotMessageSupport(RaftActorSnapshotMessageSupport newSnapshotMessageSupport) {
+ public T snapshotMessageSupport(final RaftActorSnapshotMessageSupport newSnapshotMessageSupport) {
this.snapshotMessageSupport = newSnapshotMessageSupport;
return self();
}
- public T restoreFromSnapshot(Snapshot newRestoreFromSnapshot) {
+ public T restoreFromSnapshot(final Snapshot newRestoreFromSnapshot) {
this.restoreFromSnapshot = newRestoreFromSnapshot;
return self();
}
- public T persistent(Optional<Boolean> newPersistent) {
+ public T persistent(final Optional<Boolean> newPersistent) {
this.persistent = newPersistent;
return self();
}
- public T pauseLeaderFunction(Function<Runnable, Void> newPauseLeaderFunction) {
+ public T pauseLeaderFunction(final Function<Runnable, Void> newPauseLeaderFunction) {
this.pauseLeaderFunction = newPauseLeaderFunction;
return self();
}
- public T snapshotCohort(RaftActorSnapshotCohort newSnapshotCohort) {
+ public T snapshotCohort(final RaftActorSnapshotCohort newSnapshotCohort) {
this.snapshotCohort = newSnapshotCohort;
return self();
}
private final List<Object> state;
- public MockSnapshotState(List<Object> state) {
+ public MockSnapshotState(final List<Object> state) {
this.state = state;
}
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.Procedure;
-import com.google.common.base.Throwables;
import com.google.common.io.ByteSource;
import java.io.IOException;
import java.io.OutputStream;
}
@Override
- public void update(long newTerm, String newVotedFor) {
+ public void update(final long newTerm, final String newVotedFor) {
this.currentTerm = newTerm;
this.votedFor = newVotedFor;
// TODO : Write to some persistent state
}
- @Override public void updateAndPersist(long newTerm, String newVotedFor) {
+ @Override public void updateAndPersist(final long newTerm, final String newVotedFor) {
update(newTerm, newVotedFor);
}
};
setReplicatedLog(new MockReplicatedLogBuilder().build());
}
- public MockRaftActorContext(String id, ActorSystem system, ActorRef actor) {
+ public MockRaftActorContext(final String id, final ActorSystem system, final ActorRef actor) {
super(actor, null, id, newElectionTerm(), -1, -1, new HashMap<>(),
new DefaultConfigParamsImpl(), new NonPersistentDataProvider(),
applyState -> actor.tell(applyState, actor), LOG);
setLastApplied(replicatedLog.lastIndex());
}
- @Override public ActorRef actorOf(Props props) {
+ @Override public ActorRef actorOf(final Props props) {
return system.actorOf(props);
}
- @Override public ActorSelection actorSelection(String path) {
+ @Override public ActorSelection actorSelection(final String path) {
return system.actorSelection(path);
}
return this.system;
}
- @Override public ActorSelection getPeerActorSelection(String peerId) {
+ @Override public ActorSelection getPeerActorSelection(final String peerId) {
String peerAddress = getPeerAddress(peerId);
if (peerAddress != null) {
return actorSelection(peerAddress);
return null;
}
- public void setPeerAddresses(Map<String, String> peerAddresses) {
+ public void setPeerAddresses(final Map<String, String> peerAddresses) {
for (String id: getPeerIds()) {
removePeer(id);
}
snapshotManager.setSnapshotCohort(new RaftActorSnapshotCohort() {
@Override
- public State deserializeSnapshot(ByteSource snapshotBytes) throws IOException {
+ public State deserializeSnapshot(final ByteSource snapshotBytes) throws IOException {
return ByteState.of(snapshotBytes.read());
}
@Override
- public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+ public void createSnapshot(final ActorRef actorRef, final Optional<OutputStream> installSnapshotStream) {
}
@Override
- public void applySnapshot(State snapshotState) {
+ public void applySnapshot(final State snapshotState) {
}
});
return snapshotManager;
}
- public void setCreateSnapshotProcedure(Consumer<Optional<OutputStream>> createSnapshotProcedure) {
+ public void setCreateSnapshotProcedure(final Consumer<Optional<OutputStream>> createSnapshotProcedure) {
this.createSnapshotProcedure = createSnapshotProcedure;
}
return raftPolicy != null ? raftPolicy : super.getRaftPolicy();
}
- public void setRaftPolicy(RaftPolicy raftPolicy) {
+ public void setRaftPolicy(final RaftPolicy raftPolicy) {
this.raftPolicy = raftPolicy;
}
}
@Override
- public void captureSnapshotIfReady(ReplicatedLogEntry replicatedLogEntry) {
+ public void captureSnapshotIfReady(final ReplicatedLogEntry replicatedLogEntry) {
}
@Override
- public boolean shouldCaptureSnapshot(long logIndex) {
+ public boolean shouldCaptureSnapshot(final long logIndex) {
return false;
}
@Override
- public boolean removeFromAndPersist(long index) {
+ public boolean removeFromAndPersist(final long index) {
return removeFrom(index) >= 0;
}
@Override
@SuppressWarnings("checkstyle:IllegalCatch")
- public boolean appendAndPersist(ReplicatedLogEntry replicatedLogEntry, Procedure<ReplicatedLogEntry> callback,
- boolean doAsync) {
+ public boolean appendAndPersist(final ReplicatedLogEntry replicatedLogEntry,
+ final Procedure<ReplicatedLogEntry> callback, final boolean doAsync) {
append(replicatedLogEntry);
if (callback != null) {
try {
callback.apply(replicatedLogEntry);
+ } catch (RuntimeException e) {
+ throw e;
} catch (Exception e) {
- Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
public MockPayload() {
}
- public MockPayload(String data) {
+ public MockPayload(final String data) {
this.value = data;
size = value.length();
}
- public MockPayload(String data, int size) {
+ public MockPayload(final String data, final int size) {
this(data);
this.size = size;
}
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
public static class MockReplicatedLogBuilder {
private final ReplicatedLog mockLog = new SimpleReplicatedLog();
- public MockReplicatedLogBuilder createEntries(int start, int end, int term) {
+ public MockReplicatedLogBuilder createEntries(final int start, final int end, final int term) {
for (int i = start; i < end; i++) {
this.mockLog.append(new SimpleReplicatedLogEntry(i, term,
new MockRaftActorContext.MockPayload(Integer.toString(i))));
return this;
}
- public MockReplicatedLogBuilder addEntry(int index, int term, MockPayload payload) {
+ public MockReplicatedLogBuilder addEntry(final int index, final int term, final MockPayload payload) {
this.mockLog.append(new SimpleReplicatedLogEntry(index, term, payload));
return this;
}
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
-import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.junit.After;
import org.junit.Before;
import org.opendaylight.controller.cluster.raft.DefaultConfigParamsImpl;
Uninterruptibles.awaitUninterruptibly(behaviorStateChangeLatch, 5, TimeUnit.SECONDS));
}
- void expectMessageClass(Class<?> expClass, int expCount) {
+ void expectMessageClass(final Class<?> expClass, final int expCount) {
messagesReceivedLatches.put(expClass, new CountDownLatch(expCount));
}
- void waitForExpectedMessages(Class<?> expClass) {
+ void waitForExpectedMessages(final Class<?> expClass) {
CountDownLatch latch = messagesReceivedLatches.get(expClass);
assertNotNull("No messages received for " + expClass, latch);
assertTrue("Missing messages of type " + expClass,
Uninterruptibles.awaitUninterruptibly(latch, 5, TimeUnit.SECONDS));
}
- void dropMessagesToBehavior(Class<?> msgClass) {
+ void dropMessagesToBehavior(final Class<?> msgClass) {
dropMessagesToBehavior(msgClass, 1);
}
- void dropMessagesToBehavior(Class<?> msgClass, int expCount) {
+ void dropMessagesToBehavior(final Class<?> msgClass, final int expCount) {
expectMessageClass(msgClass, expCount);
dropMessagesToBehavior.put(msgClass, Boolean.TRUE);
}
super.clear();
}
- void forwardCapturedMessageToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
+ void forwardCapturedMessageToBehavior(final Class<?> msgClass, final ActorRef sender) {
Object message = getFirstMatching(getSelf(), msgClass);
assertNotNull("Message of type " + msgClass + " not received", message);
getSelf().tell(message, sender);
}
- void forwardCapturedMessagesToBehavior(Class<?> msgClass, ActorRef sender) throws Exception {
+ void forwardCapturedMessagesToBehavior(final Class<?> msgClass, final ActorRef sender) {
for (Object m: getAllMatching(getSelf(), msgClass)) {
getSelf().tell(m, sender);
}
}
- <T> T getCapturedMessage(Class<T> msgClass) throws Exception {
+ <T> T getCapturedMessage(final Class<T> msgClass) {
T message = getFirstMatching(getSelf(), msgClass);
assertNotNull("Message of type " + msgClass + " not received", message);
return message;
RaftActorBehavior behavior;
MockRaftActorContext context;
- SetBehavior(RaftActorBehavior behavior, MockRaftActorContext context) {
+ SetBehavior(final RaftActorBehavior behavior, final MockRaftActorContext context) {
this.behavior = behavior;
this.context = context;
}
return configParams;
}
- MockRaftActorContext newRaftActorContext(String id, ActorRef actor,
- Map<String, String> peerAddresses) {
+ MockRaftActorContext newRaftActorContext(final String id, final ActorRef actor,
+ final Map<String, String> peerAddresses) {
MockRaftActorContext context = new MockRaftActorContext(id, system, actor);
context.setPeerAddresses(peerAddresses);
context.getTermInformation().updateAndPersist(1, "");
}
@SuppressWarnings("checkstyle:IllegalCatch")
- void verifyBehaviorState(String name, MemberActor actor, RaftState expState) {
+ void verifyBehaviorState(final String name, final MemberActor actor, final RaftState expState) {
+ RaftState actualState;
try {
- RaftState actualState = (RaftState) Await.result(Patterns.ask(actor.self(), GetBehaviorState.INSTANCE,
- Timeout.apply(5, TimeUnit.SECONDS)), Duration.apply(5, TimeUnit.SECONDS));
- assertEquals(name + " behavior state", expState, actualState);
+ actualState = (RaftState) Await.result(Patterns.ask(actor.self(), GetBehaviorState.INSTANCE,
+ Timeout.apply(5, TimeUnit.SECONDS)), Duration.apply(5, TimeUnit.SECONDS));
+ } catch (RuntimeException e) {
+ throw e;
} catch (Exception e) {
- Throwables.propagate(e);
+ throw new RuntimeException(e);
}
+ assertEquals(name + " behavior state", expState, actualState);
}
- void initializeLeaderBehavior(MemberActor actor, MockRaftActorContext context, int numActiveFollowers)
- throws Exception {
+ void initializeLeaderBehavior(final MemberActor actor, final MockRaftActorContext context,
+ final int numActiveFollowers) {
// Leader sends immediate heartbeats - we don't care about it so ignore it.
// Sometimes the initial AppendEntries messages go to dead letters, probably b/c the follower actors
// haven't been fully created/initialized by akka. So we try up to 3 times to create the Leader as
}
- TestActorRef<MemberActor> newMemberActor(String name) throws Exception {
+ TestActorRef<MemberActor> newMemberActor(final String name) throws TimeoutException, InterruptedException {
TestActorRef<MemberActor> actor = factory.createTestActor(MemberActor.props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), name);
MessageCollectorActor.waitUntilReady(actor);
return actor;
}
- void sendHeartbeat(TestActorRef<MemberActor> leaderActor) {
+ void sendHeartbeat(final TestActorRef<MemberActor> leaderActor) {
Uninterruptibles.sleepUninterruptibly(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
leaderActor.tell(SendImmediateHeartBeat.INSTANCE, ActorRef.noSender());
}
RaftActorBehavior behavior;
@After
- public void tearDown() throws Exception {
+ public void tearDown() {
if (behavior != null) {
behavior.close();
}
* term the RaftActor gets into the Follower state.
*/
@Test
- public void testHandleRaftRPCWithNewerTerm() throws Exception {
+ public void testHandleRaftRPCWithNewerTerm() {
MockRaftActorContext actorContext = createActorContext();
assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, behaviorActor,
* change it's state and it responds back with a failure.
*/
@Test
- public void testHandleAppendEntriesSenderTermLessThanReceiverTerm() throws Exception {
+ public void testHandleAppendEntriesSenderTermLessThanReceiverTerm() {
MockRaftActorContext context = createActorContext();
short payloadVersion = 5;
context.setPayloadVersion(payloadVersion);
@Test
- public void testHandleAppendEntriesAddSameEntryToLog() throws Exception {
+ public void testHandleAppendEntriesAddSameEntryToLog() {
MockRaftActorContext context = createActorContext();
context.getTermInformation().update(2, "test");
handleAppendEntriesAddSameEntryToLogReply(behaviorActor);
}
- protected void handleAppendEntriesAddSameEntryToLogReply(TestActorRef<MessageCollectorActor> replyActor)
- throws Exception {
+ protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor) {
AppendEntriesReply reply = MessageCollectorActor.getFirstMatching(replyActor, AppendEntriesReply.class);
Assert.assertNull("Expected no AppendEntriesReply", reply);
}
}
- protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
- ActorRef actorRef, RaftRPC rpc) throws Exception {
+ protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
+ final ActorRef actorRef, final RaftRPC rpc) {
Payload payload = new MockRaftActorContext.MockPayload("");
setLastLogEntry(actorContext, 1, 0, payload);
}
protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry(
- MockRaftActorContext actorContext, long term, long index, Payload data) {
+ final MockRaftActorContext actorContext, final long term, final long index, final Payload data) {
return setLastLogEntry(actorContext, new SimpleReplicatedLogEntry(index, term, data));
}
- protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry(MockRaftActorContext actorContext,
- ReplicatedLogEntry logEntry) {
+ protected MockRaftActorContext.SimpleReplicatedLog setLastLogEntry(final MockRaftActorContext actorContext,
+ final ReplicatedLogEntry logEntry) {
MockRaftActorContext.SimpleReplicatedLog log = new MockRaftActorContext.SimpleReplicatedLog();
log.append(logEntry);
actorContext.setReplicatedLog(log);
protected abstract T createBehavior(RaftActorContext actorContext);
- protected final T createBehavior(MockRaftActorContext actorContext) {
+ protected final T createBehavior(final MockRaftActorContext actorContext) {
T ret = createBehavior((RaftActorContext)actorContext);
actorContext.setCurrentBehavior(ret);
return ret;
return new MockRaftActorContext();
}
- protected MockRaftActorContext createActorContext(ActorRef actor) {
+ protected MockRaftActorContext createActorContext(final ActorRef actor) {
return new MockRaftActorContext("test", getSystem(), actor);
}
return new RequestVoteReply(100, false);
}
- protected ByteString toByteString(Map<String, String> state) {
+ protected ByteString toByteString(final Map<String, String> state) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
oos.writeObject(state);
}
}
- protected void logStart(String name) {
+ protected void logStart(final String name) {
LoggerFactory.getLogger(getClass()).info("Starting " + name);
}
@Override
@After
- public void tearDown() throws Exception {
+ public void tearDown() {
if (candidate != null) {
candidate.close();
}
@Test
@Override
- public void testHandleAppendEntriesAddSameEntryToLog() throws Exception {
+ public void testHandleAppendEntriesAddSameEntryToLog() {
MockRaftActorContext context = createActorContext();
context.getTermInformation().update(2, "test");
return new Candidate(actorContext);
}
- @Override protected MockRaftActorContext createActorContext() {
+ @Override
+ protected MockRaftActorContext createActorContext() {
return new MockRaftActorContext("candidate", getSystem(), candidateActor);
}
@Override
protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
- final ActorRef actorRef, final RaftRPC rpc) throws Exception {
+ final ActorRef actorRef, final RaftRPC rpc) {
super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
if (rpc instanceof RequestVote) {
assertEquals("New votedFor", ((RequestVote)rpc).getCandidateId(),
public class DelayedMessagesElectionScenarioTest extends AbstractLeaderElectionScenarioTest {
@Test
- public void runTest() throws Exception {
+ public void runTest() {
testLog.info("DelayedMessagesElectionScenarioTest starting");
setupInitialMemberBehaviors();
testLog.info("DelayedMessagesElectionScenarioTest ending");
}
- private void forwardDelayedRequestVoteReplyFromOriginalFollowerMember3ToMember2() throws Exception {
+ private void forwardDelayedRequestVoteReplyFromOriginalFollowerMember3ToMember2() {
testLog.info("forwardDelayedRequestVoteReplyFromOriginalFollowerMember3ToMember2 starting");
// Now forward the original delayed RequestVoteReply from member 3 to member 2 that granted
testLog.info("forwardDelayedRequestVoteReplyFromOriginalFollowerMember3ToMember2 ending");
}
- private void sendElectionTimeoutToFollowerMember3() throws Exception {
+ private void sendElectionTimeoutToFollowerMember3() {
testLog.info("sendElectionTimeoutToFollowerMember3 starting");
// Send ElectionTimeout to member 3 to simulate missing heartbeat from a Leader. member 3
testLog.info("sendElectionTimeoutToFollowerMember3 ending");
}
- private void forwardDelayedRequestVotesToLeaderMember1AndFollowerMember3() throws Exception {
+ private void forwardDelayedRequestVotesToLeaderMember1AndFollowerMember3() {
testLog.info("forwardDelayedRequestVotesToLeaderMember1AndFollowerMember3 starting");
// At this point member 1 and 3 actors have captured the RequestVote messages. First
testLog.info("sendInitialElectionTimeoutToFollowerMember2 ending");
}
- private void setupInitialMemberBehaviors() throws Exception {
+ private void setupInitialMemberBehaviors() {
testLog.info("setupInitialMemberBehaviors starting");
// Create member 2's behavior initially as Follower
member3Actor.clear();
testLog.info("setupInitialMemberBehaviors ending");
-
}
}
import akka.testkit.TestActorRef;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
-import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSource;
@Override
@After
- public void tearDown() throws Exception {
+ public void tearDown() {
if (follower != null) {
follower.close();
}
}
@Override
- protected Follower createBehavior(RaftActorContext actorContext) {
+ protected Follower createBehavior(final RaftActorContext actorContext) {
return spy(new Follower(actorContext));
}
}
@Override
- protected MockRaftActorContext createActorContext(ActorRef actorRef) {
+ protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
MockRaftActorContext context = new MockRaftActorContext("follower", getSystem(), actorRef);
context.setPayloadVersion(payloadVersion);
return context;
@Test
- public void testHandleFirstAppendEntries() throws Exception {
+ public void testHandleFirstAppendEntries() {
logStart("testHandleFirstAppendEntries");
MockRaftActorContext context = createActorContext();
}
@Test
- public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() throws Exception {
+ public void testHandleFirstAppendEntriesWithPrevIndexMinusOne() {
logStart("testHandleFirstAppendEntries");
MockRaftActorContext context = createActorContext();
}
@Test
- public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog()
- throws Exception {
+ public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInLog() {
logStart("testHandleFirstAppendEntries");
MockRaftActorContext context = createActorContext();
}
@Test
- public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot()
- throws Exception {
+ public void testHandleFirstAppendEntriesWithPrevIndexMinusOneAndReplicatedToAllIndexPresentInSnapshot() {
logStart("testHandleFirstAppendEntries");
MockRaftActorContext context = createActorContext();
}
@Test
- public void testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing()
- throws Exception {
+ public void testFirstAppendEntriesWithNoPrevIndexAndReplToAllPresentInSnapshotButCalculatedPrevEntryMissing() {
logStart(
"testFirstAppendEntriesWithNoPrevIndexAndReplicatedToAllPresentInSnapshotButCalculatedPrevEntryMissing");
}
@Test
- public void testHandleSyncUpAppendEntries() throws Exception {
+ public void testHandleSyncUpAppendEntries() {
logStart("testHandleSyncUpAppendEntries");
MockRaftActorContext context = createActorContext();
context.setLastApplied(101);
context.setCommitIndex(101);
- setLastLogEntry(context, 1, 101,
- new MockRaftActorContext.MockPayload(""));
+ setLastLogEntry(context, 1, 101, new MockRaftActorContext.MockPayload(""));
- entries = Arrays.asList(
- newReplicatedLogEntry(2, 101, "foo"));
+ entries = Arrays.asList(newReplicatedLogEntry(2, 101, "foo"));
// The new commitIndex is 101
appendEntries = new AppendEntries(2, "leader-1", 101, 1, entries, 102, 101, (short)0);
syncStatus = MessageCollectorActor.getFirstMatching(followerActor, FollowerInitialSyncUpStatus.class);
assertNull(syncStatus);
-
}
@Test
- public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() throws Exception {
+ public void testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete() {
logStart("testHandleAppendEntriesLeaderChangedBeforeSyncUpComplete");
MockRaftActorContext context = createActorContext();
// We get a new message saying initial status is not done
assertFalse(syncStatus.isInitialSyncDone());
-
}
-
@Test
- public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() throws Exception {
+ public void testHandleAppendEntriesLeaderChangedAfterSyncUpComplete() {
logStart("testHandleAppendEntriesLeaderChangedAfterSyncUpComplete");
MockRaftActorContext context = createActorContext();
// We get a new message saying initial status is not done
assertFalse(syncStatus.isInitialSyncDone());
-
}
-
/**
* This test verifies that when an AppendEntries RPC is received by a RaftActor
* with a commitIndex that is greater than what has been applied to the
* sets it current applied state to the commitIndex of the sender.
*/
@Test
- public void testHandleAppendEntriesWithNewerCommitIndex() throws Exception {
+ public void testHandleAppendEntriesWithNewerCommitIndex() {
logStart("testHandleAppendEntriesWithNewerCommitIndex");
MockRaftActorContext context = createActorContext();
* the follower its applied correctly.
*/
@Test
- public void testHandleInstallSnapshot() throws Exception {
+ public void testHandleInstallSnapshot() {
logStart("testHandleInstallSnapshot");
MockRaftActorContext context = createActorContext();
assertNull("Expected null SnapshotTracker", follower.getSnapshotTracker());
}
-
/**
* Verify that when an AppendEntries is sent to a follower during a snapshot install
* the Follower short-circuits the processing of the AppendEntries message.
*/
@Test
- public void testReceivingAppendEntriesDuringInstallSnapshot() throws Exception {
+ public void testReceivingAppendEntriesDuringInstallSnapshot() {
logStart("testReceivingAppendEntriesDuringInstallSnapshot");
MockRaftActorContext context = createActorContext();
}
@Test
- public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() throws Exception {
+ public void testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader() {
logStart("testReceivingAppendEntriesDuringInstallSnapshotFromDifferentLeader");
MockRaftActorContext context = createActorContext();
}
@Test
- public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() throws Exception {
+ public void testInitialSyncUpWithHandleInstallSnapshotFollowedByAppendEntries() {
logStart("testInitialSyncUpWithHandleInstallSnapshot");
MockRaftActorContext context = createActorContext();
}
@Test
- public void testHandleOutOfSequenceInstallSnapshot() throws Exception {
+ public void testHandleOutOfSequenceInstallSnapshot() {
logStart("testHandleOutOfSequenceInstallSnapshot");
MockRaftActorContext context = createActorContext();
}
@Test
- public void testCaptureSnapshotOnLastEntryInAppendEntries() throws Exception {
+ public void testCaptureSnapshotOnLastEntryInAppendEntries() {
String id = "testCaptureSnapshotOnLastEntryInAppendEntries";
logStart(id);
}
@Test
- public void testCaptureSnapshotOnMiddleEntryInAppendEntries() throws Exception {
+ public void testCaptureSnapshotOnMiddleEntryInAppendEntries() {
String id = "testCaptureSnapshotOnMiddleEntryInAppendEntries";
logStart(id);
}
@Test
- public void testCaptureSnapshotOnAppendEntriesWithUnapplied() throws Exception {
+ public void testCaptureSnapshotOnAppendEntriesWithUnapplied() {
String id = "testCaptureSnapshotOnAppendEntriesWithUnapplied";
logStart(id);
}
@SuppressWarnings("checkstyle:IllegalCatch")
- private RaftActorSnapshotCohort newRaftActorSnapshotCohort(final AtomicReference<MockRaftActor> followerRaftActor) {
+ private static RaftActorSnapshotCohort newRaftActorSnapshotCohort(
+ final AtomicReference<MockRaftActor> followerRaftActor) {
RaftActorSnapshotCohort snapshotCohort = new RaftActorSnapshotCohort() {
@Override
- public void createSnapshot(ActorRef actorRef, java.util.Optional<OutputStream> installSnapshotStream) {
+ public void createSnapshot(final ActorRef actorRef,
+ final java.util.Optional<OutputStream> installSnapshotStream) {
try {
actorRef.tell(new CaptureSnapshotReply(new MockSnapshotState(followerRaftActor.get().getState()),
installSnapshotStream), actorRef);
+ } catch (RuntimeException e) {
+ throw e;
} catch (Exception e) {
- Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
@Override
- public void applySnapshot(State snapshotState) {
+ public void applySnapshot(final State snapshotState) {
}
@Override
- public State deserializeSnapshot(ByteSource snapshotBytes) {
+ public State deserializeSnapshot(final ByteSource snapshotBytes) {
throw new UnsupportedOperationException();
}
};
return snapshotCohort;
}
- public byte[] getNextChunk(ByteString bs, int offset, int chunkSize) {
+ public byte[] getNextChunk(final ByteString bs, final int offset, final int chunkSize) {
int snapshotLength = bs.size();
int start = offset;
int size = chunkSize;
return nextChunk;
}
- private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
- String expFollowerId, long expLogLastTerm, long expLogLastIndex) {
+ private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
+ final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex) {
expectAndVerifyAppendEntriesReply(expTerm, expSuccess, expFollowerId, expLogLastTerm, expLogLastIndex, false);
}
- private void expectAndVerifyAppendEntriesReply(int expTerm, boolean expSuccess,
- String expFollowerId, long expLogLastTerm, long expLogLastIndex,
- boolean expForceInstallSnapshot) {
+ private void expectAndVerifyAppendEntriesReply(final int expTerm, final boolean expSuccess,
+ final String expFollowerId, final long expLogLastTerm, final long expLogLastIndex,
+ final boolean expForceInstallSnapshot) {
AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(leaderActor,
AppendEntriesReply.class);
}
- private static ReplicatedLogEntry newReplicatedLogEntry(long term, long index, String data) {
+ private static ReplicatedLogEntry newReplicatedLogEntry(final long term, final long index, final String data) {
return new SimpleReplicatedLogEntry(index, term,
new MockRaftActorContext.MockPayload(data));
}
}
@Override
- protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
- ActorRef actorRef, RaftRPC rpc) throws Exception {
+ protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
+ final ActorRef actorRef, final RaftRPC rpc) {
super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
String expVotedFor = rpc instanceof RequestVote ? ((RequestVote)rpc).getCandidateId() : null;
}
@Override
- protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor)
- throws Exception {
+ protected void handleAppendEntriesAddSameEntryToLogReply(final TestActorRef<MessageCollectorActor> replyActor) {
AppendEntriesReply reply = MessageCollectorActor.expectFirstMatching(replyActor, AppendEntriesReply.class);
assertEquals("isSuccess", true, reply.isSuccess());
}
@Override
@After
- public void tearDown() throws Exception {
+ public void tearDown() {
if (isolatedLeader != null) {
isolatedLeader.close();
}
}
@Override
- protected IsolatedLeader createBehavior(RaftActorContext actorContext) {
+ protected IsolatedLeader createBehavior(final RaftActorContext actorContext) {
return new IsolatedLeader(actorContext);
}
}
@Override
- protected MockRaftActorContext createActorContext(ActorRef actor) {
+ protected MockRaftActorContext createActorContext(final ActorRef actor) {
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setElectionTimeoutFactor(100000);
MockRaftActorContext context = new MockRaftActorContext("isolated-leader", getSystem(), actor);
@Override
@After
- public void tearDown() throws Exception {
+ public void tearDown() {
if (leader != null) {
leader.close();
}
}
- private RaftActorBehavior sendReplicate(MockRaftActorContext actorContext, long index) {
+ private RaftActorBehavior sendReplicate(final MockRaftActorContext actorContext, final long index) {
return sendReplicate(actorContext, 1, index);
}
}
@Override
- protected MockRaftActorContext createActorContext(ActorRef actorRef) {
+ protected MockRaftActorContext createActorContext(final ActorRef actorRef) {
return createActorContext(LEADER_ID, actorRef);
}
- private MockRaftActorContext createActorContext(String id, ActorRef actorRef) {
+ private MockRaftActorContext createActorContext(final String id, final ActorRef actorRef) {
DefaultConfigParamsImpl configParams = new DefaultConfigParamsImpl();
configParams.setHeartBeatInterval(new FiniteDuration(50, TimeUnit.MILLISECONDS));
configParams.setElectionTimeoutFactor(100000);
assertTrue("Expected Leader", newBehavior instanceof Leader);
}
- private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(RaftPolicy raftPolicy) {
+ private RaftActorBehavior setupIsolatedLeaderCheckTestWithTwoFollowers(final RaftPolicy raftPolicy) {
ActorRef followerActor1 = getSystem().actorOf(MessageCollectorActor.props(), "follower-1");
ActorRef followerActor2 = getSystem().actorOf(MessageCollectorActor.props(), "follower-2");
}
@Override
- protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(MockRaftActorContext actorContext,
- ActorRef actorRef, RaftRPC rpc) throws Exception {
+ protected void assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(final MockRaftActorContext actorContext,
+ final ActorRef actorRef, final RaftRPC rpc) {
super.assertStateChangesToFollowerWhenRaftRPCHasNewerTerm(actorContext, actorRef, rpc);
assertEquals("New votedFor", null, actorContext.getTermInformation().getVotedFor());
}
private final long electionTimeOutIntervalMillis;
private final int snapshotChunkSize;
- MockConfigParamsImpl(long electionTimeOutIntervalMillis, int snapshotChunkSize) {
+ MockConfigParamsImpl(final long electionTimeOutIntervalMillis, final int snapshotChunkSize) {
super();
this.electionTimeOutIntervalMillis = electionTimeOutIntervalMillis;
this.snapshotChunkSize = snapshotChunkSize;
private long candidateElectionTerm;
@Test
- public void runTest() throws Exception {
+ public void runTest() {
testLog.info("PartitionedCandidateOnStartupElectionScenarioTest starting");
setupInitialMember1AndMember2Behaviors();
testLog.info("PartitionedCandidateOnStartupElectionScenarioTest ending");
}
- private void sendElectionTimeoutToFollowerMember1() throws Exception {
+ private void sendElectionTimeoutToFollowerMember1() {
testLog.info("sendElectionTimeoutToFollowerMember1 starting");
// At this point we have no leader. Candidate member 3 would continue to start new elections
testLog.info("sendElectionTimeoutToFollowerMember1 ending");
}
- private void resolvePartitionAndSendElectionTimeoutsToCandidateMember3() throws Exception {
+ private void resolvePartitionAndSendElectionTimeoutsToCandidateMember3() {
testLog.info("resolvePartitionAndSendElectionTimeoutsToCandidateMember3 starting");
// Now send a couple more ElectionTimeouts to Candidate member 3 with the partition resolved.
testLog.info("setupPartitionedCandidateMember3AndSendElectionTimeouts ending");
}
- private void setupInitialMember1AndMember2Behaviors() throws Exception {
+ private void setupInitialMember1AndMember2Behaviors() {
testLog.info("setupInitialMember1AndMember2Behaviors starting");
// Initialize the ReplicatedLog and election term info for member 1 and 2. The current term
member3Actor.clear();
testLog.info("setupInitialMember1AndMember2Behaviors ending");
-
}
}
testLog.info("resolvePartitionedLeadersWithLeaderMember2SendingHeartbeatFirst ending");
}
- private void resolvePartitionedLeadersWithLeaderMember3SendingHeartbeatFirst() throws Exception {
+ private void resolvePartitionedLeadersWithLeaderMember3SendingHeartbeatFirst() {
testLog.info("resolvePartitionedLeadersWithLeaderMember3SendingHeartbeatFirst starting");
// Re-establish connectivity between member 2 and 3, ie stop dropping messages between
testLog.info("resolvePartitionedLeadersWithLeaderMember3SendingHeartbeatFirst ending");
}
- private void sendElectionTimeoutToNowCandidateMember2() throws Exception {
+ private void sendElectionTimeoutToNowCandidateMember2() {
testLog.info("sendElectionTimeoutToNowCandidateMember2 starting");
// member 2, now a candidate, is partitioned from the Leader (now member 3) and hasn't received any
testLog.info("sendElectionTimeoutToNowCandidateMember2 ending");
}
- private void sendInitialElectionTimeoutToFollowerMember3() throws Exception {
+ private void sendInitialElectionTimeoutToFollowerMember3() {
testLog.info("sendInitialElectionTimeoutToFollowerMember3 starting");
// Send ElectionTimeout to member 3 to simulate no heartbeat from a Leader (originally member 1).
testLog.info("sendInitialElectionTimeoutToFollowerMember3 ending");
}
- private void sendInitialElectionTimeoutToFollowerMember2() {
+ private void sendInitialElectionTimeoutToFollowerMember2() throws Exception {
testLog.info("sendInitialElectionTimeoutToFollowerMember2 starting");
// Send ElectionTimeout to member 2 to simulate no heartbeat from the Leader (member 1).
}
}
- private byte[] getNextChunk(ByteString bs, int offset, int size) {
+ private static byte[] getNextChunk(final ByteString bs, final int offset, int size) {
int snapshotLength = bs.size();
int start = offset;
if (size > snapshotLength) {
public class SnapshotTest {
@Test
- public void testSerialization() throws Exception {
+ public void testSerialization() {
testSerialization(new byte[]{1, 2, 3, 4, 5, 6, 7}, Arrays.asList(
new SimpleReplicatedLogEntry(6, 2, new MockPayload("payload"))));
testSerialization(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9}, Collections.emptyList());
}
- private void testSerialization(byte[] state, List<ReplicatedLogEntry> unapplied) throws Exception {
+ private static void testSerialization(final byte[] state, final List<ReplicatedLogEntry> unapplied) {
long lastIndex = 6;
long lastTerm = 2;
long lastAppliedIndex = 5;
import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.ArrayList;
import java.util.Collections;
private final List<Object> messages = new ArrayList<>();
- @Override public void onReceive(Object message) throws Exception {
+ @Override public void onReceive(final Object message) throws Exception {
if (ARE_YOU_READY.equals(message)) {
getSender().tell("yes", getSelf());
} else if (GET_ALL_MESSAGES.equals(message)) {
messages.clear();
}
- @SuppressWarnings("unchecked")
- private static List<Object> getAllMessages(ActorRef actor) throws Exception {
+ @SuppressWarnings({"unchecked", "checkstyle:illegalCatch"})
+ private static List<Object> getAllMessages(final ActorRef actor) {
FiniteDuration operationDuration = Duration.create(5, TimeUnit.SECONDS);
Timeout operationTimeout = new Timeout(operationDuration);
Future<Object> future = Patterns.ask(actor, GET_ALL_MESSAGES, operationTimeout);
- return (List<Object>) Await.result(future, operationDuration);
+ try {
+ return (List<Object>) Await.result(future, operationDuration);
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
- public static void clearMessages(ActorRef actor) {
+ public static void clearMessages(final ActorRef actor) {
actor.tell(CLEAR_MESSAGES, ActorRef.noSender());
}
* @param clazz the class to match
* @return the first matching message
*/
- public static <T> T getFirstMatching(ActorRef actor, Class<T> clazz) throws Exception {
+ public static <T> T getFirstMatching(final ActorRef actor, final Class<T> clazz) {
List<Object> allMessages = getAllMessages(actor);
for (Object message : allMessages) {
}
@SuppressWarnings("checkstyle:IllegalCatch")
- public static <T> List<T> expectMatching(ActorRef actor, Class<T> clazz, int count) {
+ public static <T> List<T> expectMatching(final ActorRef actor, final Class<T> clazz, final int count) {
return expectMatching(actor, clazz, count, msg -> true);
}
@SuppressWarnings("checkstyle:IllegalCatch")
- public static <T> List<T> expectMatching(ActorRef actor, Class<T> clazz, int count,
- Predicate<T> matcher) {
+ public static <T> List<T> expectMatching(final ActorRef actor, final Class<T> clazz, final int count,
+ final Predicate<T> matcher) {
int timeout = 5000;
Exception lastEx = null;
List<T> messages = Collections.emptyList();
clazz, messages.size(), messages), lastEx);
}
- public static <T> T expectFirstMatching(ActorRef actor, Class<T> clazz) {
+ public static <T> T expectFirstMatching(final ActorRef actor, final Class<T> clazz) {
return expectFirstMatching(actor, clazz, 5000);
}
@SuppressWarnings("checkstyle:IllegalCatch")
- public static <T> T expectFirstMatching(ActorRef actor, Class<T> clazz, long timeout) {
+ public static <T> T expectFirstMatching(final ActorRef actor, final Class<T> clazz, final long timeout) {
Exception lastEx = null;
int count = (int) (timeout / 50);
for (int i = 0; i < count; i++) {
}
@SuppressWarnings("checkstyle:IllegalCatch")
- public static <T> T expectFirstMatching(ActorRef actor, Class<T> clazz, Predicate<T> matcher) {
+ public static <T> T expectFirstMatching(final ActorRef actor, final Class<T> clazz, final Predicate<T> matcher) {
int timeout = 5000;
Exception lastEx = null;
T lastMessage = null;
clazz, lastMessage), lastEx);
}
- public static <T> void assertNoneMatching(ActorRef actor, Class<T> clazz) {
+ public static <T> void assertNoneMatching(final ActorRef actor, final Class<T> clazz) {
assertNoneMatching(actor, clazz, 5000);
}
@SuppressWarnings("checkstyle:IllegalCatch")
- public static <T> void assertNoneMatching(ActorRef actor, Class<T> clazz, long timeout) {
+ public static <T> void assertNoneMatching(final ActorRef actor, final Class<T> clazz, final long timeout) {
Exception lastEx = null;
int count = (int) (timeout / 50);
for (int i = 0; i < count; i++) {
}
if (lastEx != null) {
- Throwables.propagate(lastEx);
+ Throwables.throwIfUnchecked(lastEx);
+ throw new RuntimeException(lastEx);
}
return;
}
- public static <T> List<T> getAllMatching(ActorRef actor, Class<T> clazz) throws Exception {
+ public static <T> List<T> getAllMatching(final ActorRef actor, final Class<T> clazz) {
List<Object> allMessages = getAllMessages(actor);
- List<T> output = Lists.newArrayList();
+ List<T> output = new ArrayList<>();
for (Object message : allMessages) {
if (message.getClass().equals(clazz)) {
return output;
}
- public static void waitUntilReady(ActorRef actor) throws Exception {
+ public static void waitUntilReady(final ActorRef actor) throws TimeoutException, InterruptedException {
long timeout = 500;
FiniteDuration duration = Duration.create(timeout, TimeUnit.MILLISECONDS);
for (int i = 0; i < 10; i++) {
<relativePath>../../config/config-parent</relativePath>
</parent>
- <groupId>org.opendaylight.controller</groupId>
<artifactId>sal-clustering-commons</artifactId>
<version>1.6.0-SNAPSHOT</version>
<packaging>bundle</packaging>
private final Integer capacity;
private final FiniteDuration pushTimeOut;
- public MeteredBoundedMailbox(ActorSystem.Settings settings, Config config) {
+ public MeteredBoundedMailbox(final ActorSystem.Settings settings, final Config config) {
CommonConfig commonConfig = new CommonConfig(settings.config());
this.capacity = commonConfig.getMailBoxCapacity();
@Override
- public MeteredMessageQueue create(final scala.Option<ActorRef> owner, scala.Option<ActorSystem> system) {
+ public MeteredMessageQueue create(final scala.Option<ActorRef> owner, final scala.Option<ActorSystem> system) {
final MeteredMessageQueue queue = new MeteredMessageQueue(this.capacity, this.pushTimeOut);
monitorQueueSize(owner, queue);
return queue;
}
- private void monitorQueueSize(scala.Option<ActorRef> owner, final MeteredMessageQueue monitoredQueue) {
+ private static void monitorQueueSize(final scala.Option<ActorRef> owner, final MeteredMessageQueue monitoredQueue) {
registerMetric(owner, QUEUE_SIZE, getQueueSizeGuage(monitoredQueue));
}
return monitoredQueue::size;
}
- static <T extends Metric> void registerMetric(scala.Option<ActorRef> owner, String metricName, T metric) {
+ static <T extends Metric> void registerMetric(final scala.Option<ActorRef> owner, final String metricName,
+ final T metric) {
if (owner.isEmpty()) {
// there's no actor to monitor
return;
public static class MeteredMessageQueue extends BoundedDequeBasedMailbox.MessageQueue {
private static final long serialVersionUID = 1L;
- public MeteredMessageQueue(int capacity, FiniteDuration pushTimeOut) {
+ public MeteredMessageQueue(final int capacity, final FiniteDuration pushTimeOut) {
super(capacity, pushTimeOut);
}
}
meteredActor.onReceive(message);
} catch (Throwable e) {
Throwables.propagateIfPossible(e, Exception.class);
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
} finally {
//stop timers
contextByMsgType.stop();
private final BiConsumer<Object, ActorRef> assembledMessageCallback;
private final String logContext;
- private MessageAssembler(Builder builder) {
+ private MessageAssembler(final Builder builder) {
this.fileBackedStreamFactory = Preconditions.checkNotNull(builder.fileBackedStreamFactory,
"FiledBackedStreamFactory cannot be null");
this.assembledMessageCallback = Preconditions.checkNotNull(builder.assembledMessageCallback,
* @param message the message to check
* @return true if handled, false otherwise
*/
- public static boolean isHandledMessage(Object message) {
+ public static boolean isHandledMessage(final Object message) {
return message instanceof MessageSlice || message instanceof AbortSlicing;
}
messageSlice.getSliceIndex()), true);
}
- private void processMessageSliceForState(final MessageSlice messageSlice, AssembledMessageState state,
+ private void processMessageSliceForState(final MessageSlice messageSlice, final AssembledMessageState state,
final ActorRef sendTo) {
final Identifier identifier = messageSlice.getIdentifier();
final ActorRef replyTo = messageSlice.getReplyTo();
}
}
- private Object reAssembleMessage(final AssembledMessageState state) throws MessageSliceException {
+ private static Object reAssembleMessage(final AssembledMessageState state) throws MessageSliceException {
try {
final ByteSource assembledBytes = state.getAssembledBytes();
try (ObjectInputStream in = new ObjectInputStream(assembledBytes.openStream())) {
}
}
- private void onAbortSlicing(AbortSlicing message) {
+ private void onAbortSlicing(final AbortSlicing message) {
removeState(message.getIdentifier());
}
stateCache.invalidate(identifier);
}
- private void stateRemoved(RemovalNotification<Identifier, AssembledMessageState> notification) {
+ private void stateRemoved(final RemovalNotification<Identifier, AssembledMessageState> notification) {
if (notification.wasEvicted()) {
LOG.warn("{}: AssembledMessageState for {} was expired from the cache", logContext, notification.getKey());
} else {
}
@VisibleForTesting
- boolean hasState(Identifier forIdentifier) {
+ boolean hasState(final Identifier forIdentifier) {
boolean exists = stateCache.getIfPresent(forIdentifier) != null;
stateCache.cleanUp();
return exists;
private final String logContext;
private final long id;
- private MessageSlicer(Builder builder) {
+ private MessageSlicer(final Builder builder) {
this.fileBackedStreamFactory = builder.fileBackedStreamFactory;
this.messageSliceSize = builder.messageSliceSize;
this.maxSlicingTries = builder.maxSlicingTries;
* @param message the message to check
* @return true if handled, false otherwise
*/
- public static boolean isHandledMessage(Object message) {
+ public static boolean isHandledMessage(final Object message) {
return message instanceof MessageSliceReply;
}
*
* @param options the SliceOptions
*/
- public void slice(SliceOptions options) {
+ public void slice(final SliceOptions options) {
final Identifier identifier = options.getIdentifier();
final Serializable message = options.getMessage();
final FileBackedOutputStream fileBackedStream;
}
}
- private void sendTo(SliceOptions options, Object message, ActorRef sender) {
+ private static void sendTo(final SliceOptions options, final Object message, final ActorRef sender) {
if (options.getSendToRef() != null) {
options.getSendToRef().tell(message, sender);
} else {
stateCache.invalidateAll();
}
- private MessageSlice getNextSliceMessage(SlicedMessageState<ActorRef> state) throws IOException {
+ private static MessageSlice getNextSliceMessage(final SlicedMessageState<ActorRef> state) throws IOException {
final byte[] firstSliceBytes = state.getNextSlice();
return new MessageSlice(state.getIdentifier(), firstSliceBytes, state.getCurrentSliceIndex(),
state.getTotalSlices(), state.getLastSliceHashCode(), state.getReplyTarget());
stateCache.invalidate(identifier);
}
- private void stateRemoved(RemovalNotification<Identifier, SlicedMessageState<ActorRef>> notification) {
+ private void stateRemoved(final RemovalNotification<Identifier, SlicedMessageState<ActorRef>> notification) {
final SlicedMessageState<ActorRef> state = notification.getValue();
state.close();
if (notification.wasEvicted()) {
}
@VisibleForTesting
- boolean hasState(Identifier forIdentifier) {
+ boolean hasState(final Identifier forIdentifier) {
boolean exists = stateCache.getIfPresent(forIdentifier) != null;
stateCache.cleanUp();
return exists;
package org.opendaylight.controller.cluster.schema.provider.impl;
import com.google.common.annotations.Beta;
-import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.util.Set;
import javax.annotation.Nonnull;
private final SchemaRepository repository;
private final Set<SourceIdentifier> providedSources;
- public RemoteYangTextSourceProviderImpl(SchemaRepository repository, Set<SourceIdentifier> providedSources) {
- this.repository = repository;
+ public RemoteYangTextSourceProviderImpl(final SchemaRepository repository,
+ final Set<SourceIdentifier> providedSources) {
+ this.repository = Preconditions.checkNotNull(repository);
this.providedSources = providedSources;
}
}
@Override
- public Future<YangTextSchemaSourceSerializationProxy> getYangTextSchemaSource(SourceIdentifier identifier) {
+ public Future<YangTextSchemaSourceSerializationProxy> getYangTextSchemaSource(final SourceIdentifier identifier) {
LOG.trace("Sending yang schema source for {}", identifier);
final Promise<YangTextSchemaSourceSerializationProxy> promise = akka.dispatch.Futures.promise();
- CheckedFuture<YangTextSchemaSource, ?> future =
+ ListenableFuture<YangTextSchemaSource> future =
repository.getSchemaSource(identifier, YangTextSchemaSource.class);
Futures.addCallback(future, new FutureCallback<YangTextSchemaSource>() {
@Override
- public void onSuccess(@Nonnull YangTextSchemaSource result) {
+ public void onSuccess(@Nonnull final YangTextSchemaSource result) {
try {
promise.success(new YangTextSchemaSourceSerializationProxy(result));
} catch (IOException e) {
}
@Override
- public void onFailure(Throwable failure) {
+ public void onFailure(final Throwable failure) {
LOG.warn("Unable to retrieve schema source from provider", failure);
promise.failure(failure);
}
- });
+ }, MoreExecutors.directExecutor());
return promise.future();
}
}
@SuppressWarnings("unchecked")
- private void testSlicing(String logContext, int messageSliceSize, int expTotalSlices, byte[] messageData) {
+ private void testSlicing(final String logContext, final int messageSliceSize, final int expTotalSlices,
+ final byte[] messageData) {
reset(mockAssembledMessageCallback);
final BytesMessage message = new BytesMessage(messageData);
assertEquals("Sender ActorRef", sender, senderActorRefCaptor.getValue());
}
- static void assertSuccessfulMessageSliceReply(MessageSliceReply reply, Identifier identifier, int sliceIndex) {
+ static void assertSuccessfulMessageSliceReply(final MessageSliceReply reply, final Identifier identifier,
+ final int sliceIndex) {
assertEquals("Identifier", identifier, ((MessageSliceIdentifier)reply.getIdentifier())
.getClientIdentifier());
assertEquals("SliceIndex", sliceIndex, reply.getSliceIndex());
}
- static void assertFailedMessageSliceReply(MessageSliceReply reply, Identifier identifier, boolean isRetriable) {
+ static void assertFailedMessageSliceReply(final MessageSliceReply reply, final Identifier identifier,
+ final boolean isRetriable) {
assertEquals("Identifier", identifier, ((MessageSliceIdentifier)reply.getIdentifier())
.getClientIdentifier());
assertEquals("Failure present", Boolean.TRUE, reply.getFailure().isPresent());
assertEquals("isRetriable", isRetriable, reply.getFailure().get().isRetriable());
}
- static void assertMessageSlice(MessageSlice sliceMessage, Identifier identifier, int sliceIndex, int totalSlices,
- int lastSliceHashCode, ActorRef replyTo) {
+ static void assertMessageSlice(final MessageSlice sliceMessage, final Identifier identifier, final int sliceIndex,
+ final int totalSlices, final int lastSliceHashCode, final ActorRef replyTo) {
assertEquals("Identifier", identifier, ((MessageSliceIdentifier)sliceMessage.getIdentifier())
.getClientIdentifier());
assertEquals("SliceIndex", sliceIndex, sliceMessage.getSliceIndex());
}
}
- private MessageSlicer newMessageSlicer(String logContext, final int messageSliceSize) {
+ private static MessageSlicer newMessageSlicer(final String logContext, final int messageSliceSize) {
return MessageSlicer.builder().messageSliceSize(messageSliceSize).logContext(logContext)
.fileBackedStreamFactory(FILE_BACKED_STREAM_FACTORY).build();
}
import akka.persistence.serialization.Snapshot;
import akka.persistence.serialization.SnapshotSerializer;
import akka.testkit.JavaTestKit;
-import com.google.common.base.Throwables;
import com.typesafe.config.ConfigFactory;
import java.io.File;
import java.io.FileOutputStream;
assertEquals("SelectedSnapshot snapshot", "one", possibleSnapshot.get().snapshot());
}
+ @SuppressWarnings("checkstyle:illegalThrows")
@Test(expected = IOException.class)
- public void testDoLoadAsyncWithFailure() throws IOException {
+ public void testDoLoadAsyncWithFailure() throws Throwable {
createSnapshotFile(PERSISTENCE_ID, null, 1, 2000);
JavaTestKit probe = new JavaTestKit(system);
snapshotStore.tell(new SnapshotProtocol.LoadSnapshot(PERSISTENCE_ID,
SnapshotSelectionCriteria.latest(), Long.MAX_VALUE), probe.getRef());
LoadSnapshotFailed failed = probe.expectMsgClass(LoadSnapshotFailed.class);
- Throwables.propagateIfInstanceOf(failed.cause(), IOException.class);
+ throw failed.cause();
}
@Test
assertEquals("SelectedSnapshot snapshot", "one", possibleSnapshot.get().snapshot());
}
- private void createSnapshotFile(String persistenceId, String payload, int seqNr, int timestamp) throws IOException {
+ private static void createSnapshotFile(final String persistenceId, final String payload, final int seqNr,
+ final int timestamp) throws IOException {
String name = toSnapshotName(persistenceId, seqNr, timestamp);
try (FileOutputStream fos = new FileOutputStream(new File(SNAPSHOT_DIR, name))) {
if (payload != null) {
}
}
- private static String toSnapshotName(String persistenceId, int seqNr, int timestamp)
+ private static String toSnapshotName(final String persistenceId, final int seqNr, final int timestamp)
throws UnsupportedEncodingException {
final String encodedPersistenceId = URLEncoder.encode(persistenceId, StandardCharsets.UTF_8.name());
return "snapshot-" + encodedPersistenceId + "-" + seqNr + "-" + timestamp;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
* If any of arguments is null.
*/
DOMBrokerTransactionChain(final long chainId, final Map<LogicalDatastoreType, DOMStoreTransactionChain> chains,
- AbstractDOMBroker broker, final TransactionChainListener listener) {
+ final AbstractDOMBroker broker, final TransactionChainListener listener) {
super(chains);
this.chainId = chainId;
this.broker = Preconditions.checkNotNull(broker);
public void onFailure(final Throwable failure) {
transactionFailed(transaction, failure);
}
- });
+ }, MoreExecutors.directExecutor());
return ret;
}
import akka.actor.ActorRef;
import akka.util.Timeout;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.base.Verify;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
try {
return (DataStoreClient) Await.result(ExplicitAsk.ask(actor, GET_CLIENT_FACTORY,
Timeout.apply(timeout, unit)), Duration.Inf());
+ } catch (RuntimeException e) {
+ throw e;
} catch (Exception e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
}
final long stamp = lock.readLock();
try {
if (aborted != null) {
- throw Throwables.propagate(aborted);
+ Throwables.throwIfUnchecked(aborted);
+ throw new RuntimeException(aborted);
}
final ClientLocalHistory history = new ClientLocalHistory(this, historyId);
import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.base.Verify;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.CheckedFuture;
latch.await();
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for latch of {}", successor);
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
return successor;
}
} catch (Exception e) {
LOG.error("Failed to get actor for {}", clientProps, e);
clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- throw Throwables.propagate(e);
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
}
identifier = client.getIdentifier();
while (it.hasNext()) {
firstEx.addSuppressed(it.next().cause());
}
- Throwables.propagateIfInstanceOf(firstEx, ExecutionException.class);
- Throwables.propagateIfInstanceOf(firstEx, TimeoutException.class);
+ Throwables.throwIfInstanceOf(firstEx, ExecutionException.class);
+ Throwables.throwIfInstanceOf(firstEx, TimeoutException.class);
throw new ExecutionException(firstEx);
}
changeStateFrom(currentState, afterState);
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Iterator;
private static final MessageSupplier COMMIT_MESSAGE_SUPPLIER = new MessageSupplier() {
@Override
- public Object newMessage(TransactionIdentifier transactionId, short version) {
+ public Object newMessage(final TransactionIdentifier transactionId, final short version) {
return new CommitTransaction(transactionId, version).toSerializable();
}
@Override
- public boolean isSerializedReplyType(Object reply) {
+ public boolean isSerializedReplyType(final Object reply) {
return CommitTransactionReply.isSerializedType(reply);
}
};
private static final MessageSupplier ABORT_MESSAGE_SUPPLIER = new MessageSupplier() {
@Override
- public Object newMessage(TransactionIdentifier transactionId, short version) {
+ public Object newMessage(final TransactionIdentifier transactionId, final short version) {
return new AbortTransaction(transactionId, version).toSerializable();
}
@Override
- public boolean isSerializedReplyType(Object reply) {
+ public boolean isSerializedReplyType(final Object reply) {
return AbortTransactionReply.isSerializedType(reply);
}
};
private final TransactionIdentifier transactionId;
private volatile OperationCallback commitOperationCallback;
- public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<CohortInfo> cohorts,
- TransactionIdentifier transactionId) {
+ public ThreePhaseCommitCohortProxy(final ActorContext actorContext, final List<CohortInfo> cohorts,
+ final TransactionIdentifier transactionId) {
this.actorContext = actorContext;
this.cohorts = cohorts;
this.transactionId = Preconditions.checkNotNull(transactionId);
for (final CohortInfo info: cohorts) {
info.getActorFuture().onComplete(new OnComplete<ActorSelection>() {
@Override
- public void onComplete(Throwable failure, ActorSelection actor) {
+ public void onComplete(final Throwable failure, final ActorSelection actor) {
synchronized (lock) {
boolean done = completed.decrementAndGet() == 0;
if (failure != null) {
Futures.addCallback(resolveCohorts(), new FutureCallback<Void>() {
@Override
- public void onSuccess(Void notUsed) {
+ public void onSuccess(final Void notUsed) {
finishCanCommit(returnFuture);
}
@Override
- public void onFailure(Throwable failure) {
+ public void onFailure(final Throwable failure) {
returnFuture.setException(failure);
}
- });
+ }, MoreExecutors.directExecutor());
return returnFuture;
}
final OnComplete<Object> onComplete = new OnComplete<Object>() {
@Override
- public void onComplete(Throwable failure, Object response) {
+ public void onComplete(final Throwable failure, final Object response) {
if (failure != null) {
LOG.debug("Tx {}: a canCommit cohort Future failed", transactionId, failure);
sendCanCommitTransaction(iterator.next(), onComplete);
}
- private void sendCanCommitTransaction(CohortInfo toCohortInfo, OnComplete<Object> onComplete) {
+ private void sendCanCommitTransaction(final CohortInfo toCohortInfo, final OnComplete<Object> onComplete) {
CanCommitTransaction message = new CanCommitTransaction(transactionId, toCohortInfo.getActorVersion());
LOG.debug("Tx {}: sending {} to {}", transactionId, message, toCohortInfo.getResolvedActor());
future.onComplete(onComplete, actorContext.getClientDispatcher());
}
- private Future<Iterable<Object>> invokeCohorts(MessageSupplier messageSupplier) {
+ private Future<Iterable<Object>> invokeCohorts(final MessageSupplier messageSupplier) {
List<Future<Object>> futureList = Lists.newArrayListWithCapacity(cohorts.size());
for (CohortInfo cohort : cohorts) {
Object message = messageSupplier.newMessage(transactionId, cohort.getActorVersion());
}
@SuppressWarnings("checkstyle:IllegalCatch")
- private static boolean successfulFuture(ListenableFuture<Void> future) {
+ private static boolean successfulFuture(final ListenableFuture<Void> future) {
if (!future.isDone()) {
return false;
}
} else {
Futures.addCallback(future, new FutureCallback<Void>() {
@Override
- public void onSuccess(Void notUsed) {
+ public void onSuccess(final Void notUsed) {
finishVoidOperation(operationName, messageSupplier, expectedResponseClass,
propagateException, returnFuture, callback);
}
@Override
- public void onFailure(Throwable failure) {
+ public void onFailure(final Throwable failure) {
LOG.debug("Tx {}: a {} cohort path Future failed: {}", transactionId, operationName, failure);
if (propagateException) {
returnFuture.set(null);
}
}
- });
+ }, MoreExecutors.directExecutor());
}
return returnFuture;
}
- private void finishVoidOperation(final String operationName, MessageSupplier messageSupplier,
+ private void finishVoidOperation(final String operationName, final MessageSupplier messageSupplier,
final Class<?> expectedResponseClass, final boolean propagateException,
final SettableFuture<Void> returnFuture, final OperationCallback callback) {
LOG.debug("Tx {} finish {}", transactionId, operationName);
combinedFuture.onComplete(new OnComplete<Iterable<Object>>() {
@Override
- public void onComplete(Throwable failure, Iterable<Object> responses) throws Throwable {
+ public void onComplete(final Throwable failure, final Iterable<Object> responses) throws Throwable {
Throwable exceptionToPropagate = failure;
if (exceptionToPropagate == null) {
for (Object response: responses) {
private volatile ActorSelection resolvedActor;
private final Supplier<Short> actorVersionSupplier;
- CohortInfo(Future<ActorSelection> actorFuture, Supplier<Short> actorVersionSupplier) {
+ CohortInfo(final Future<ActorSelection> actorFuture, final Supplier<Short> actorVersionSupplier) {
this.actorFuture = actorFuture;
this.actorVersionSupplier = actorVersionSupplier;
}
return resolvedActor;
}
- void setResolvedActor(ActorSelection resolvedActor) {
+ void setResolvedActor(final ActorSelection resolvedActor) {
this.resolvedActor = resolvedActor;
}
*/
package org.opendaylight.controller.cluster.datastore.persisted;
-import com.google.common.base.Throwables;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import java.io.DataInput;
} catch (IOException e) {
// This should never happen
LOG.error("Failed to serialize {}", transactionId, e);
- throw Throwables.propagate(e);
+ throw new RuntimeException("Failed to serialized " + transactionId, e);
}
return new AbortTransactionPayload(transactionId, out.toByteArray());
}
*/
package org.opendaylight.controller.cluster.datastore.persisted;
-import com.google.common.base.Throwables;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import java.io.DataInput;
} catch (IOException e) {
// This should never happen
LOG.error("Failed to serialize {}", historyId, e);
- throw Throwables.propagate(e);
+ throw new RuntimeException("Failed to serialize " + historyId, e);
}
return new CloseLocalHistoryPayload(historyId, out.toByteArray());
}
*/
package org.opendaylight.controller.cluster.datastore.persisted;
-import com.google.common.base.Throwables;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import java.io.DataInput;
} catch (IOException e) {
// This should never happen
LOG.error("Failed to serialize {}", historyId, e);
- throw Throwables.propagate(e);
+ throw new RuntimeException("Failed to serialize " + historyId, e);
}
return new CreateLocalHistoryPayload(historyId, out.toByteArray());
}
*/
package org.opendaylight.controller.cluster.datastore.persisted;
-import com.google.common.base.Throwables;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import java.io.DataInput;
} catch (IOException e) {
// This should never happen
LOG.error("Failed to serialize {}", historyId, e);
- throw Throwables.propagate(e);
+ throw new RuntimeException("Failed to serialize " + historyId, e);
}
return new PurgeLocalHistoryPayload(historyId, out.toByteArray());
}
*/
package org.opendaylight.controller.cluster.datastore.persisted;
-import com.google.common.base.Throwables;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import java.io.DataInput;
} catch (IOException e) {
// This should never happen
LOG.error("Failed to serialize {}", transactionId, e);
- throw Throwables.propagate(e);
+ throw new RuntimeException("Failed to serialize " + transactionId, e);
}
return new PurgeTransactionPayload(transactionId, out.toByteArray());
}
import akka.actor.ActorRef;
import akka.pattern.Patterns;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import java.util.List;
import org.opendaylight.controller.cluster.access.concepts.MemberName;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
try {
return (List<String>) Await.result(
Patterns.ask(shardManager, GetLocalShardIds.INSTANCE, ASK_TIMEOUT_MILLIS), Duration.Inf());
+ } catch (RuntimeException e) {
+ throw e;
} catch (Exception e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
return syncStatus;
}
- void setSyncStatus(boolean syncStatus) {
+ void setSyncStatus(final boolean syncStatus) {
this.syncStatus = syncStatus;
}
try {
Await.result(Patterns.ask(shardManager, new SwitchShardBehavior(shardId, state, term),
ASK_TIMEOUT_MILLIS), Duration.Inf());
+ } catch (RuntimeException e) {
+ throw e;
} catch (Exception e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
break;
case Candidate:
}
@Override
- public void switchAllLocalShardsState(String newState, long term) {
+ public void switchAllLocalShardsState(final String newState, final long term) {
LOG.info("switchAllLocalShardsState called newState = {}, term = {}", newState, term);
requestSwitchShardState(null, newState, term);
}
@Override
- public void switchShardState(String shardId, String newState, long term) {
+ public void switchShardState(final String shardId, final String newState, final long term) {
final ShardIdentifier identifier = ShardIdentifier.fromShardIdString(shardId);
LOG.info("switchShardState called shardName = {}, newState = {}, term = {}", shardId, newState, term);
requestSwitchShardState(identifier, newState, term);
*/
package org.opendaylight.controller.cluster.datastore.utils;
-import com.google.common.base.Throwables;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
}
@SuppressWarnings("checkstyle:IllegalCatch")
- public static void toFile(File file, DataTreeModification modification) {
+ public static void toFile(final File file, final DataTreeModification modification) {
try (FileOutputStream outStream = new FileOutputStream(file)) {
modification.applyToCursor(new DataTreeModificationOutputCursor(new DataOutputStream(outStream)));
} catch (IOException | RuntimeException e) {
private static class DataTreeModificationOutputCursor extends AbstractDataTreeModificationCursor {
private final DataOutputStream output;
- DataTreeModificationOutputCursor(DataOutputStream output) {
+ DataTreeModificationOutputCursor(final DataOutputStream output) {
this.output = output;
}
@Override
- public void delete(PathArgument child) {
+ public void delete(final PathArgument child) {
try {
output.write("\nDELETE -> ".getBytes(StandardCharsets.UTF_8));
output.write(current().node(child).toString().getBytes(StandardCharsets.UTF_8));
output.writeByte('\n');
} catch (IOException e) {
- Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
@Override
- public void merge(PathArgument child, NormalizedNode<?, ?> data) {
+ public void merge(final PathArgument child, final NormalizedNode<?, ?> data) {
outputPathAndNode("MERGE", child, data);
}
@Override
- public void write(PathArgument child, NormalizedNode<?, ?> data) {
+ public void write(final PathArgument child, final NormalizedNode<?, ?> data) {
outputPathAndNode("WRITE", child, data);
}
- private void outputPathAndNode(String name, PathArgument child, NormalizedNode<?, ?> data) {
+ private void outputPathAndNode(final String name, final PathArgument child, final NormalizedNode<?, ?> data) {
try {
output.writeByte('\n');
output.write(name.getBytes(StandardCharsets.UTF_8));
NormalizedNodeXMLOutput.toStream(output, data);
output.writeByte('\n');
} catch (IOException | XMLStreamException e) {
- Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
}
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.AbstractMap.SimpleEntry;
distributedConfigDatastore.getActorContext().getClusterWrapper().getCurrentMemberName(), subtrees);
return new ProxyProducer(producer, subtrees, shardedDataTreeActor,
distributedConfigDatastore.getActorContext(), shards);
- } else if (response instanceof Exception) {
- closeProducer(producer);
- throw Throwables.propagate((Exception) response);
- } else {
- closeProducer(producer);
- throw new RuntimeException("Unexpected response to create producer received." + response);
}
+
+ closeProducer(producer);
+
+ if (response instanceof Throwable) {
+ Throwables.throwIfUnchecked((Throwable) response);
+ throw new RuntimeException((Throwable) response);
+ }
+ throw new RuntimeException("Unexpected response to create producer received." + response);
}
@Override
shardRegistrationPromise.failure(
new DOMDataTreeShardCreationFailedException("Unable to create a cds shard.", throwable));
}
- });
+ }, MoreExecutors.directExecutor());
return FutureConverters.toJava(shardRegistrationPromise.future());
}
public void onFailure(final Throwable throwable) {
LOG.error("Removal of shard {} from configuration failed.", prefix, throwable);
}
- });
+ }, MoreExecutors.directExecutor());
}
DOMDataTreePrefixTableEntry<DOMDataTreeShardRegistration<DOMDataTreeShard>> lookupShardFrontend(
assertEquals("Submit complete", true, doneLatch.await(5, TimeUnit.SECONDS));
if (caughtEx.get() != null) {
- Throwables.propagate(caughtEx.get());
+ Throwables.throwIfUnchecked(caughtEx.get());
+ throw new RuntimeException(caughtEx.get());
}
assertEquals("Task count", doAsync ? 1 : 0, futureExecutor.getTaskCount());
}
public static void writeToStore(final ShardDataTree store, final YangInstanceIdentifier id,
- final NormalizedNode<?,?> node) throws Exception {
+ final NormalizedNode<?,?> node) throws DataValidationFailedException {
BatchedModifications batched = newBatchedModifications(nextTransactionId(), id, node, true, true, 1);
DataTreeModification modification = store.getDataTree().takeSnapshot().newModification();
batched.apply(modification);
}
public void mergeToStore(final ShardDataTree store, final YangInstanceIdentifier id,
- final NormalizedNode<?,?> node) throws Exception {
+ final NormalizedNode<?,?> node) throws DataValidationFailedException {
final BatchedModifications batched = new BatchedModifications(nextTransactionId(), CURRENT_VERSION);
batched.addModification(new MergeModification(id, node));
batched.setReady(true);
Map<String, ShardStrategy> strategyMap = ImmutableMap.<String, ShardStrategy>builder().put(
"junk", new ShardStrategy() {
@Override
- public String findShard(YangInstanceIdentifier path) {
+ public String findShard(final YangInstanceIdentifier path) {
return "junk";
}
@Override
- public YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path) {
+ public YangInstanceIdentifier getPrefixForPath(final YangInstanceIdentifier path) {
return YangInstanceIdentifier.EMPTY;
}
}).put(
"cars", new ShardStrategy() {
@Override
- public String findShard(YangInstanceIdentifier path) {
+ public String findShard(final YangInstanceIdentifier path) {
return "cars";
}
@Override
- public YangInstanceIdentifier getPrefixForPath(YangInstanceIdentifier path) {
+ public YangInstanceIdentifier getPrefixForPath(final YangInstanceIdentifier path) {
return YangInstanceIdentifier.EMPTY;
}
}).build();
@Override
- public ShardStrategy getStrategyForModule(String moduleName) {
+ public ShardStrategy getStrategyForModule(final String moduleName) {
return strategyMap.get(moduleName);
}
@Override
- public String getModuleNameFromNameSpace(String nameSpace) {
+ public String getModuleNameFromNameSpace(final String nameSpace) {
if (TestModel.JUNK_QNAME.getNamespace().toASCIIString().equals(nameSpace)) {
return "junk";
} else if (CarsModel.BASE_QNAME.getNamespace().toASCIIString().equals(nameSpace)) {
final TransactionType type) {
ArgumentMatcher<CreateTransaction> matcher = new ArgumentMatcher<CreateTransaction>() {
@Override
- public boolean matches(Object argument) {
+ public boolean matches(final Object argument) {
if (CreateTransaction.class.equals(argument.getClass())) {
CreateTransaction obj = CreateTransaction.fromSerializable(argument);
return obj.getTransactionId().getHistoryId().getClientId().getFrontendId().getMemberName()
protected DataExists eqDataExists() {
ArgumentMatcher<DataExists> matcher = new ArgumentMatcher<DataExists>() {
@Override
- public boolean matches(Object argument) {
+ public boolean matches(final Object argument) {
return argument instanceof DataExists && ((DataExists)argument).getPath().equals(TestModel.TEST_PATH);
}
};
protected ReadData eqReadData(final YangInstanceIdentifier path) {
ArgumentMatcher<ReadData> matcher = new ArgumentMatcher<ReadData>() {
@Override
- public boolean matches(Object argument) {
+ public boolean matches(final Object argument) {
return argument instanceof ReadData && ((ReadData)argument).getPath().equals(path);
}
};
return argThat(matcher);
}
- protected Future<Object> readyTxReply(String path) {
+ protected Future<Object> readyTxReply(final String path) {
return Futures.successful((Object)new ReadyTransactionReply(path));
}
- protected Future<ReadDataReply> readDataReply(NormalizedNode<?, ?> data) {
+ protected Future<ReadDataReply> readDataReply(final NormalizedNode<?, ?> data) {
return Futures.successful(new ReadDataReply(data, DataStoreVersions.CURRENT_VERSION));
}
- protected Future<DataExistsReply> dataExistsReply(boolean exists) {
+ protected Future<DataExistsReply> dataExistsReply(final boolean exists) {
return Futures.successful(new DataExistsReply(exists, DataStoreVersions.CURRENT_VERSION));
}
- protected Future<BatchedModificationsReply> batchedModificationsReply(int count) {
+ protected Future<BatchedModificationsReply> batchedModificationsReply(final int count) {
return Futures.successful(new BatchedModificationsReply(count));
}
return mock(Future.class);
}
- protected ActorSelection actorSelection(ActorRef actorRef) {
+ protected ActorSelection actorSelection(final ActorRef actorRef) {
return getSystem().actorSelection(actorRef.path());
}
- protected void expectBatchedModifications(ActorRef actorRef, int count) {
+ protected void expectBatchedModifications(final ActorRef actorRef, final int count) {
doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
}
- protected void expectBatchedModifications(int count) {
+ protected void expectBatchedModifications(final int count) {
doReturn(batchedModificationsReply(count)).when(mockActorContext).executeOperationAsync(
any(ActorSelection.class), isA(BatchedModifications.class), any(Timeout.class));
}
- protected void expectBatchedModificationsReady(ActorRef actorRef) {
+ protected void expectBatchedModificationsReady(final ActorRef actorRef) {
expectBatchedModificationsReady(actorRef, false);
}
- protected void expectBatchedModificationsReady(ActorRef actorRef, boolean doCommitOnReady) {
+ protected void expectBatchedModificationsReady(final ActorRef actorRef, final boolean doCommitOnReady) {
doReturn(doCommitOnReady ? Futures.successful(new CommitTransactionReply().toSerializable()) :
readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
any(ActorSelection.class), isA(BatchedModifications.class), any(Timeout.class));
}
- protected void expectFailedBatchedModifications(ActorRef actorRef) {
+ protected void expectFailedBatchedModifications(final ActorRef actorRef) {
doReturn(Futures.failed(new TestException())).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(BatchedModifications.class), any(Timeout.class));
}
- protected void expectReadyLocalTransaction(ActorRef actorRef, boolean doCommitOnReady) {
+ protected void expectReadyLocalTransaction(final ActorRef actorRef, final boolean doCommitOnReady) {
doReturn(doCommitOnReady ? Futures.successful(new CommitTransactionReply().toSerializable()) :
readyTxReply(actorRef.path().toString())).when(mockActorContext).executeOperationAsync(
eq(actorSelection(actorRef)), isA(ReadyLocalTransaction.class), any(Timeout.class));
}
- protected CreateTransactionReply createTransactionReply(ActorRef actorRef, short transactionVersion) {
+ protected CreateTransactionReply createTransactionReply(final ActorRef actorRef, final short transactionVersion) {
return new CreateTransactionReply(actorRef.path().toString(), nextTransactionId(), transactionVersion);
}
- protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem) {
+ protected ActorRef setupActorContextWithoutInitialCreateTransaction(final ActorSystem actorSystem) {
return setupActorContextWithoutInitialCreateTransaction(actorSystem, DefaultShardStrategy.DEFAULT_SHARD);
}
- protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem, String shardName) {
+ protected ActorRef setupActorContextWithoutInitialCreateTransaction(final ActorSystem actorSystem,
+ final String shardName) {
return setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName,
DataStoreVersions.CURRENT_VERSION);
}
- protected ActorRef setupActorContextWithoutInitialCreateTransaction(ActorSystem actorSystem, String shardName,
- short transactionVersion) {
+ protected ActorRef setupActorContextWithoutInitialCreateTransaction(final ActorSystem actorSystem,
+ final String shardName, final short transactionVersion) {
ActorRef actorRef = actorSystem.actorOf(Props.create(DoNothingActor.class));
log.info("Created mock shard actor {}", actorRef);
return actorRef;
}
- protected Future<PrimaryShardInfo> primaryShardInfoReply(ActorSystem actorSystem, ActorRef actorRef) {
+ protected Future<PrimaryShardInfo> primaryShardInfoReply(final ActorSystem actorSystem, final ActorRef actorRef) {
return primaryShardInfoReply(actorSystem, actorRef, DataStoreVersions.CURRENT_VERSION);
}
- protected Future<PrimaryShardInfo> primaryShardInfoReply(ActorSystem actorSystem, ActorRef actorRef,
- short transactionVersion) {
+ protected Future<PrimaryShardInfo> primaryShardInfoReply(final ActorSystem actorSystem, final ActorRef actorRef,
+ final short transactionVersion) {
return Futures.successful(new PrimaryShardInfo(actorSystem.actorSelection(actorRef.path()),
transactionVersion));
}
- protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
- TransactionType type, short transactionVersion, String shardName) {
+ protected ActorRef setupActorContextWithInitialCreateTransaction(final ActorSystem actorSystem,
+ final TransactionType type, final short transactionVersion, final String shardName) {
ActorRef shardActorRef = setupActorContextWithoutInitialCreateTransaction(actorSystem, shardName,
transactionVersion);
memberName, shardActorRef);
}
- protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem,
- TransactionType type, short transactionVersion, String prefix, ActorRef shardActorRef) {
+ protected ActorRef setupActorContextWithInitialCreateTransaction(final ActorSystem actorSystem,
+ final TransactionType type, final short transactionVersion, final String prefix,
+ final ActorRef shardActorRef) {
ActorRef txActorRef;
if (type == TransactionType.WRITE_ONLY
return txActorRef;
}
- protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type) {
+ protected ActorRef setupActorContextWithInitialCreateTransaction(final ActorSystem actorSystem,
+ final TransactionType type) {
return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION,
DefaultShardStrategy.DEFAULT_SHARD);
}
- protected ActorRef setupActorContextWithInitialCreateTransaction(ActorSystem actorSystem, TransactionType type,
- String shardName) {
+ protected ActorRef setupActorContextWithInitialCreateTransaction(final ActorSystem actorSystem,
+ final TransactionType type,
+ final String shardName) {
return setupActorContextWithInitialCreateTransaction(actorSystem, type, DataStoreVersions.CURRENT_VERSION,
shardName);
}
- protected void propagateReadFailedExceptionCause(CheckedFuture<?, ReadFailedException> future) throws Exception {
+ protected void propagateReadFailedExceptionCause(final CheckedFuture<?, ReadFailedException> future)
+ throws Exception {
try {
future.checkedGet(5, TimeUnit.SECONDS);
fail("Expected ReadFailedException");
cause = e.getCause();
}
- Throwables.throwIfInstanceOf(cause, Exception.class);
- Throwables.propagate(cause);
+ Throwables.propagateIfPossible(cause, Exception.class);
+ throw new RuntimeException(cause);
}
}
- protected List<BatchedModifications> captureBatchedModifications(ActorRef actorRef) {
+ protected List<BatchedModifications> captureBatchedModifications(final ActorRef actorRef) {
ArgumentCaptor<BatchedModifications> batchedModificationsCaptor =
ArgumentCaptor.forClass(BatchedModifications.class);
verify(mockActorContext, Mockito.atLeastOnce()).executeOperationAsync(
return batchedModifications;
}
- protected <T> List<T> filterCaptured(ArgumentCaptor<T> captor, Class<T> type) {
+ protected <T> List<T> filterCaptured(final ArgumentCaptor<T> captor, final Class<T> type) {
List<T> captured = new ArrayList<>();
for (T c: captor.getAllValues()) {
if (type.isInstance(c)) {
return captured;
}
- protected void verifyOneBatchedModification(ActorRef actorRef, Modification expected, boolean expIsReady) {
+ protected void verifyOneBatchedModification(final ActorRef actorRef, final Modification expected,
+ final boolean expIsReady) {
List<BatchedModifications> batchedModifications = captureBatchedModifications(actorRef);
assertEquals("Captured BatchedModifications count", 1, batchedModifications.size());
verifyBatchedModifications(batchedModifications.get(0), expIsReady, expIsReady, expected);
}
- protected void verifyBatchedModifications(Object message, boolean expIsReady, Modification... expected) {
+ protected void verifyBatchedModifications(final Object message, final boolean expIsReady,
+ final Modification... expected) {
verifyBatchedModifications(message, expIsReady, false, expected);
}
- protected void verifyBatchedModifications(Object message, boolean expIsReady, boolean expIsDoCommitOnReady,
- Modification... expected) {
+ protected void verifyBatchedModifications(final Object message, final boolean expIsReady,
+ final boolean expIsDoCommitOnReady, final Modification... expected) {
assertEquals("Message type", BatchedModifications.class, message.getClass());
BatchedModifications batchedModifications = (BatchedModifications)message;
assertEquals("BatchedModifications size", expected.length, batchedModifications.getModifications().size());
}
@SuppressWarnings("checkstyle:IllegalCatch")
- protected void verifyCohortFutures(AbstractThreePhaseCommitCohort<?> proxy,
- Object... expReplies) throws Exception {
+ protected void verifyCohortFutures(final AbstractThreePhaseCommitCohort<?> proxy,
+ final Object... expReplies) {
assertEquals("getReadyOperationFutures size", expReplies.length,
proxy.getCohortFutures().size());
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import akka.util.Timeout;
-import com.google.common.base.Throwables;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.opendaylight.controller.cluster.datastore.utils.MockDataTreeChangeListener;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
}
@Test
- public void testChangeListenerWithNoInitialData() throws Exception {
+ public void testChangeListenerWithNoInitialData() {
MockDataTreeChangeListener listener = registerChangeListener(TEST_PATH, 0).getKey();
listener.expectNoMoreChanges("Unexpected initial change event");
}
@Test
- public void testInitialChangeListenerEventWithContainerPath() throws Exception {
+ public void testInitialChangeListenerEventWithContainerPath() throws DataValidationFailedException {
writeToStore(shard.getDataStore(), TEST_PATH, ImmutableNodes.containerNode(TEST_QNAME));
Entry<MockDataTreeChangeListener, ActorSelection> entry = registerChangeListener(TEST_PATH, 1);
}
@Test
- public void testInitialChangeListenerEventWithListPath() throws Exception {
+ public void testInitialChangeListenerEventWithListPath() throws DataValidationFailedException {
mergeToStore(shard.getDataStore(), TEST_PATH, testNodeWithOuter(1, 2));
MockDataTreeChangeListener listener = registerChangeListener(OUTER_LIST_PATH, 1).getKey();
}
@Test
- public void testInitialChangeListenerEventWithWildcardedListPath() throws Exception {
+ public void testInitialChangeListenerEventWithWildcardedListPath() throws DataValidationFailedException {
mergeToStore(shard.getDataStore(), TEST_PATH, testNodeWithOuter(1, 2));
MockDataTreeChangeListener listener =
}
@Test
- public void testInitialChangeListenerEventWithNestedWildcardedListsPath() throws Exception {
+ public void testInitialChangeListenerEventWithNestedWildcardedListsPath() throws DataValidationFailedException {
mergeToStore(shard.getDataStore(), TEST_PATH, testNodeWithOuter(outerNode(
outerNodeEntry(1, innerNode("one", "two")), outerNodeEntry(2, innerNode("three", "four")))));
MockDataTreeChangeListener listener = new MockDataTreeChangeListener(expectedEvents);
ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener, TestModel.TEST_PATH));
+ RegisterDataTreeNotificationListenerReply reply;
try {
- RegisterDataTreeNotificationListenerReply reply = (RegisterDataTreeNotificationListenerReply)
- Await.result(Patterns.ask(shardActor, new RegisterDataTreeChangeListener(path, dclActor, false),
- new Timeout(5, TimeUnit.SECONDS)), Duration.create(5, TimeUnit.SECONDS));
- return new SimpleEntry<>(listener, getSystem().actorSelection(reply.getListenerRegistrationPath()));
-
+ reply = (RegisterDataTreeNotificationListenerReply)
+ Await.result(Patterns.ask(shardActor, new RegisterDataTreeChangeListener(path, dclActor, false),
+ new Timeout(5, TimeUnit.SECONDS)), Duration.create(5, TimeUnit.SECONDS));
+ } catch (RuntimeException e) {
+ throw e;
} catch (Exception e) {
- Throwables.propagate(e);
- return null;
+ throw new RuntimeException(e);
}
+ return new SimpleEntry<>(listener, getSystem().actorSelection(reply.getListenerRegistrationPath()));
}
private void createShard() {
txCohort.get().canCommit().get(5, TimeUnit.SECONDS);
fail("Expected NotInitializedException");
} catch (final Exception e) {
- Throwables.propagate(Throwables.getRootCause(e));
+ final Throwable root = Throwables.getRootCause(e);
+ Throwables.throwIfUnchecked(root);
+ throw new RuntimeException(root);
} finally {
blockRecoveryLatch.countDown();
}
txReadFuture.get().checkedGet(5, TimeUnit.SECONDS);
fail("Expected NotInitializedException");
} catch (final ReadFailedException e) {
- Throwables.propagate(Throwables.getRootCause(e));
+ final Throwable root = Throwables.getRootCause(e);
+ Throwables.throwIfUnchecked(root);
+ throw new RuntimeException(root);
} finally {
blockRecoveryLatch.countDown();
}
final boolean done = commitLatch.await(timeoutSec, TimeUnit.SECONDS);
- if (caughtEx.get() != null) {
- Throwables.propagateIfInstanceOf(caughtEx.get(), Exception.class);
- Throwables.propagate(caughtEx.get());
+ final Throwable t = caughtEx.get();
+ if (t != null) {
+ Throwables.propagateIfPossible(t, Exception.class);
+ throw new RuntimeException(t);
}
assertEquals("Commits complete", true, done);
final Failure failure = expectMsgClass(duration("5 seconds"), akka.actor.Status.Failure.class);
if (failure != null) {
- Throwables.throwIfInstanceOf(failure.cause(), Exception.class);
- Throwables.propagate(failure.cause());
+ Throwables.propagateIfPossible(failure.cause(), Exception.class);
+ throw new RuntimeException(failure.cause());
}
}
};
watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
if (failure != null) {
- Throwables.propagateIfInstanceOf(failure.cause(), Exception.class);
- Throwables.propagate(failure.cause());
+ Throwables.propagateIfPossible(failure.cause(), Exception.class);
+ throw new RuntimeException(failure.cause());
}
}
};
watcher.expectMsgClass(duration("5 seconds"), Terminated.class);
if (failure != null) {
- Throwables.propagateIfInstanceOf(failure.cause(), Exception.class);
- Throwables.propagate(failure.cause());
+ Throwables.throwIfInstanceOf(failure.cause(), Exception.class);
+ Throwables.throwIfUnchecked(failure.cause());
+ throw new RuntimeException(failure.cause());
}
}
};
new MockClusterWrapper(), new MockConfiguration(), DatastoreContext.newBuilder().build(),
new PrimaryShardInfoFutureCache()) {
@Override
- public Timer getOperationTimer(String operationName) {
+ public Timer getOperationTimer(final String operationName) {
return commitTimer;
}
verifyCohortActors();
}
- private void propagateExecutionExceptionCause(ListenableFuture<?> future) throws Exception {
+ private void propagateExecutionExceptionCause(final ListenableFuture<?> future) throws Exception {
try {
future.get(5, TimeUnit.SECONDS);
fail("Expected ExecutionException");
} catch (ExecutionException e) {
verifyCohortActors();
- Throwables.propagateIfInstanceOf(e.getCause(), Exception.class);
- Throwables.propagate(e.getCause());
+ Throwables.propagateIfPossible(e.getCause(), Exception.class);
+ throw new RuntimeException(e.getCause());
}
}
- private CohortInfo newCohortInfo(CohortActor.Builder builder, final short version) {
+ private CohortInfo newCohortInfo(final CohortActor.Builder builder, final short version) {
TestActorRef<CohortActor> actor = actorFactory.createTestActor(builder.props()
.withDispatcher(Dispatchers.DefaultDispatcherId()), actorFactory.generateActorId("cohort"));
cohortActors.add(actor);
return new CohortInfo(Futures.successful(getSystem().actorSelection(actor.path())), () -> version);
}
- private CohortInfo newCohortInfo(CohortActor.Builder builder) {
+ private CohortInfo newCohortInfo(final CohortActor.Builder builder) {
return newCohortInfo(builder, CURRENT_VERSION);
}
- private static CohortInfo newCohortInfoWithFailedFuture(Exception failure) {
+ private static CohortInfo newCohortInfoWithFailedFuture(final Exception failure) {
return new CohortInfo(Futures.<ActorSelection>failed(failure), () -> CURRENT_VERSION);
}
}
@SuppressWarnings("checkstyle:IllegalCatch")
- private <T> T verifySuccessfulFuture(ListenableFuture<T> future) throws Exception {
+ private <T> T verifySuccessfulFuture(final ListenableFuture<T> future) throws Exception {
try {
return future.get(5, TimeUnit.SECONDS);
} catch (Exception e) {
}
}
- private void verifyCanCommit(ListenableFuture<Boolean> future, boolean expected) throws Exception {
+ private void verifyCanCommit(final ListenableFuture<Boolean> future, final boolean expected) throws Exception {
Boolean actual = verifySuccessfulFuture(future);
assertEquals("canCommit", expected, actual);
}
private final AtomicInteger abortCount = new AtomicInteger();
private volatile AssertionError assertionError;
- private CohortActor(Builder builder) {
+ private CohortActor(final Builder builder) {
this.builder = builder;
}
@Override
- public void onReceive(Object message) {
+ public void onReceive(final Object message) {
if (CanCommitTransaction.isSerializedType(message)) {
canCommitCount.incrementAndGet();
onMessage("CanCommitTransaction", message, CanCommitTransaction.fromSerializable(message),
}
}
- private void onMessage(String name, Object rawMessage, AbstractThreePhaseCommitMessage actualMessage,
- Class<?> expType, Object reply) {
+ private void onMessage(final String name, final Object rawMessage,
+ final AbstractThreePhaseCommitMessage actualMessage, final Class<?> expType, final Object reply) {
try {
assertNotNull("Unexpected " + name, expType);
assertEquals(name + " type", expType, rawMessage.getClass());
private Object abortReply;
private final TransactionIdentifier transactionId;
- Builder(TransactionIdentifier transactionId) {
+ Builder(final TransactionIdentifier transactionId) {
this.transactionId = Preconditions.checkNotNull(transactionId);
}
- Builder expectCanCommit(Class<?> newExpCanCommitType, Object newCanCommitReply) {
+ Builder expectCanCommit(final Class<?> newExpCanCommitType, final Object newCanCommitReply) {
this.expCanCommitType = newExpCanCommitType;
this.canCommitReply = newCanCommitReply;
return this;
}
- Builder expectCanCommit(Object newCanCommitReply) {
+ Builder expectCanCommit(final Object newCanCommitReply) {
return expectCanCommit(CanCommitTransaction.class, newCanCommitReply);
}
- Builder expectCommit(Class<?> newExpCommitType, Object newCommitReply) {
+ Builder expectCommit(final Class<?> newExpCommitType, final Object newCommitReply) {
this.expCommitType = newExpCommitType;
this.commitReply = newCommitReply;
return this;
}
- Builder expectCommit(Object newCommitReply) {
+ Builder expectCommit(final Object newCommitReply) {
return expectCommit(CommitTransaction.class, newCommitReply);
}
- Builder expectAbort(Class<?> newExpAbortType, Object newAbortReply) {
+ Builder expectAbort(final Class<?> newExpAbortType, final Object newAbortReply) {
this.expAbortType = newExpAbortType;
this.abortReply = newAbortReply;
return this;
}
- Builder expectAbort(Object newAbortReply) {
+ Builder expectAbort(final Object newAbortReply) {
return expectAbort(AbortTransaction.class, newAbortReply);
}
Uninterruptibles.awaitUninterruptibly(readComplete, 5, TimeUnit.SECONDS);
- if (caughtEx.get() != null) {
- Throwables.propagateIfInstanceOf(caughtEx.get(), Exception.class);
- Throwables.propagate(caughtEx.get());
+ final Throwable t = caughtEx.get();
+ if (t != null) {
+ Throwables.propagateIfPossible(t, Exception.class);
+ throw new RuntimeException(t);
}
// This sends the batched modification.
package org.opendaylight.controller.md.cluster.datastore.model;
-import com.google.common.base.Throwables;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
try {
return YangParserTestUtils.parseYangSources(new File("src/main/yang/entity-owners.yang"));
} catch (IOException | ReactorException e) {
- throw Throwables.propagate(e);
+ throw new RuntimeException(e);
}
}
}