Ignored exceptions, mergeable if statements and similar.
Change-Id: Iea262b7c410cfde16fd4f101d3c7c8195ff1ea1f
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
package ntfbenchmark.impl;
import org.opendaylight.mdsal.binding.api.NotificationPublishService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class NtfbenchBlockingProducer extends AbstractNtfbenchProducer {
+ private static final Logger LOG = LoggerFactory.getLogger(NtfbenchBlockingProducer.class);
public NtfbenchBlockingProducer(final NotificationPublishService publishService, final int iterations,
final int payloadSize) {
int ntfOk = 0;
int ntfError = 0;
- for (int i = 0; i < this.iterations; i++) {
+ for (int i = 0; i < iterations; i++) {
try {
- this.publishService.putNotification(this.ntf);
+ publishService.putNotification(ntf);
ntfOk++;
} catch (final Exception e) {
ntfError++;
+ LOG.debug("Failed to push notification", e);
}
}
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Optional;
-import javax.xml.parsers.ParserConfigurationException;
import javax.xml.stream.XMLStreamException;
import org.opendaylight.mdsal.binding.dom.codec.api.BindingNormalizedNodeSerializer;
import org.opendaylight.mdsal.dom.api.DOMSchemaService;
return Resources.getResource(testClass, defaultAppConfigFileName);
}
- public T createDefaultInstance() throws ConfigXMLReaderException, ParserConfigurationException, XMLStreamException,
- IOException, SAXException, URISyntaxException {
+ public T createDefaultInstance() throws ConfigXMLReaderException, XMLStreamException, IOException, SAXException,
+ URISyntaxException {
return createDefaultInstance(dataSchema -> {
throw new IllegalArgumentException(
"Failed to read XML (not creating model from defaults as runtime would, for better clarity in tests)");
// Our own clock implementation so we do not have to rely on synchronized clocks. This basically functions as an
// increasing counter which is fine for our needs as we only ever have a single writer since the supervisor is
// running in a cluster-singleton
- private final LWWRegister.Clock<String> clock = (currentTimestamp, value) -> currentTimestamp + 1;
+ private static final LWWRegister.Clock<String> CLOCK = (currentTimestamp, value) -> currentTimestamp + 1;
private final Cluster cluster;
private final SelfUniqueAddress node;
new LWWRegister<>(node.uniqueAddress(), candidate, 0),
Replicator.writeLocal(),
askReplyTo,
- register -> register.withValue(node, candidate, clock)),
+ register -> register.withValue(node, candidate, CLOCK)),
OwnerChanged::new);
}
* </ul>
*/
public abstract class RaftActor extends AbstractUntypedPersistentActor {
-
- private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50L); // 50 millis
+ private static final long APPLY_STATE_DELAY_THRESHOLD_IN_NANOS = TimeUnit.MILLISECONDS.toNanos(50);
/**
* This context should NOT be passed directly to any other actor it is
return this;
}
- if (message instanceof RaftRPC rpc) {
- // If RPC request or response contains term T > currentTerm:
- // set currentTerm = T, convert to follower (§5.1)
- // This applies to all RPC messages and responses
- if (rpc.getTerm() > context.getTermInformation().getCurrentTerm() && shouldUpdateTerm(rpc)) {
- log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
- logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
-
- context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
-
- // This is a special case. Normally when stepping down as leader we don't process and reply to the
- // RaftRPC as per raft. But if we're in the process of transferring leadership and we get a
- // RequestVote, process the RequestVote before switching to Follower. This enables the requesting
- // candidate node to be elected the leader faster and avoids us possibly timing out in the Follower
- // state and starting a new election and grabbing leadership back before the other candidate node can
- // start a new election due to lack of responses. This case would only occur if there isn't a majority
- // of other nodes available that can elect the requesting candidate. Since we're transferring
- // leadership, we should make every effort to get the requesting node elected.
- if (rpc instanceof RequestVote requestVote && context.getRaftActorLeadershipTransferCohort() != null) {
- log.debug("{}: Leadership transfer in progress - processing RequestVote", logName());
- requestVote(sender, requestVote);
- }
-
- return internalSwitchBehavior(RaftState.Follower);
+ // If RPC request or response contains term T > currentTerm:
+ // set currentTerm = T, convert to follower (§5.1)
+ // This applies to all RPC messages and responses
+ if (message instanceof RaftRPC rpc && rpc.getTerm() > context.getTermInformation().getCurrentTerm()
+ && shouldUpdateTerm(rpc)) {
+
+ log.info("{}: Term {} in \"{}\" message is greater than leader's term {} - switching to Follower",
+ logName(), rpc.getTerm(), rpc, context.getTermInformation().getCurrentTerm());
+
+ context.getTermInformation().updateAndPersist(rpc.getTerm(), null);
+
+ // This is a special case. Normally when stepping down as leader we don't process and reply to the
+ // RaftRPC as per raft. But if we're in the process of transferring leadership and we get a
+ // RequestVote, process the RequestVote before switching to Follower. This enables the requesting
+ // candidate node to be elected the leader faster and avoids us possibly timing out in the Follower
+ // state and starting a new election and grabbing leadership back before the other candidate node can
+ // start a new election due to lack of responses. This case would only occur if there isn't a majority
+ // of other nodes available that can elect the requesting candidate. Since we're transferring
+ // leadership, we should make every effort to get the requesting node elected.
+ if (rpc instanceof RequestVote requestVote && context.getRaftActorLeadershipTransferCohort() != null) {
+ log.debug("{}: Leadership transfer in progress - processing RequestVote", logName());
+ requestVote(sender, requestVote);
}
+
+ return internalSwitchBehavior(RaftState.Follower);
}
if (message instanceof SendHeartBeat) {
} catch (IOException e) {
log.warn("{}: Unable to send chunk: {}/{}. Reseting snapshot progress. Snapshot state: {}", logName(),
installSnapshotState.getChunkIndex(), installSnapshotState.getTotalChunks(),
- installSnapshotState);
+ installSnapshotState, e);
installSnapshotState.reset();
}
}
} catch (IOException e) {
log.debug("{}: Exception in InstallSnapshot of follower", logName(), e);
- sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(),
- -1, false), actor());
+ sender.tell(new InstallSnapshotReply(currentTerm(), context.getId(), -1, false), actor());
closeSnapshotTracker();
}
private int nextChunkHashCode = INITIAL_LAST_CHUNK_HASH_CODE;
private long snapshotSize;
private InputStream snapshotInputStream;
- private Stopwatch chunkTimer = Stopwatch.createUnstarted();
+ private final Stopwatch chunkTimer = Stopwatch.createUnstarted();
private byte[] currentChunk = null;
LeaderInstallSnapshotState(final int snapshotChunkSize, final String logName) {
try {
snapshotInputStream.close();
} catch (IOException e) {
- LOG.warn("{}: Error closing snapshot stream", logName);
+ LOG.warn("{}: Error closing snapshot stream", logName, e);
}
snapshotInputStream = null;
* @param lastChunkHashCode the optional hash code for the chunk
* @return true if the last chunk has been received
* @throws InvalidChunkException if the chunk index is invalid or out of order
+ * @throws IOException if there is a problem writing to the stream
*/
boolean addChunk(final int chunkIndex, final byte[] chunk, final OptionalInt maybeLastChunkHashCode)
- throws InvalidChunkException, IOException {
+ throws IOException {
log.debug("addChunk: chunkIndex={}, lastChunkIndex={}, collectedChunks.size={}, lastChunkHashCode={}",
- chunkIndex, lastChunkIndex, count, this.lastChunkHashCode);
+ chunkIndex, lastChunkIndex, count, lastChunkHashCode);
if (sealed) {
throw new InvalidChunkException("Invalid chunk received with chunkIndex " + chunkIndex
throw new InvalidChunkException("Expected chunkIndex " + (lastChunkIndex + 1) + " got " + chunkIndex);
}
- if (maybeLastChunkHashCode.isPresent() && maybeLastChunkHashCode.getAsInt() != this.lastChunkHashCode) {
+ if (maybeLastChunkHashCode.isPresent() && maybeLastChunkHashCode.getAsInt() != lastChunkHashCode) {
throw new InvalidChunkException("The hash code of the recorded last chunk does not match "
- + "the senders hash code, expected " + this.lastChunkHashCode + " was "
+ + "the senders hash code, expected " + lastChunkHashCode + " was "
+ maybeLastChunkHashCode.getAsInt());
}
count += chunk.length;
sealed = chunkIndex == totalChunks;
lastChunkIndex = chunkIndex;
- this.lastChunkHashCode = Arrays.hashCode(chunk);
+ lastChunkHashCode = Arrays.hashCode(chunk);
return sealed;
}
}
}
- Object translateScalar(final DataSchemaContextNode<?> context, final Object value) throws IOException {
+ Object translateScalar(final DataSchemaContextNode<?> context, final Object value) {
// Default is pass-through
return value;
}
}
@Override
- Object translateScalar(final DataSchemaContextNode<?> context, final Object value) throws IOException {
+ Object translateScalar(final DataSchemaContextNode<?> context, final Object value) {
final DataSchemaNode schema = context.getDataSchemaNode();
return schema instanceof TypedDataSchemaNode ? adaptValue(((TypedDataSchemaNode) schema).getType(), value)
: value;