*/
package org.opendaylight.controller.cluster.datastore;
+import static java.util.Objects.requireNonNull;
+
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.Status.Failure;
private final Set<MessageInfo> messagesToRetry = new LinkedHashSet<>();
private final Shard shard;
- ShardTransactionMessageRetrySupport(Shard shard) {
+ ShardTransactionMessageRetrySupport(final Shard shard) {
this.shard = shard;
}
- void addMessageToRetry(Object message, ActorRef replyTo, String failureMessage) {
+ void addMessageToRetry(final Object message, final ActorRef replyTo, final String failureMessage) {
LOG.debug("{}: Adding message {} to retry", shard.persistenceId(), message);
MessageInfo messageInfo = new MessageInfo(message, replyTo, failureMessage);
}
void retryMessages() {
- if(messagesToRetry.isEmpty()) {
+ if (messagesToRetry.isEmpty()) {
return;
}
MessageInfo[] copy = messagesToRetry.toArray(new MessageInfo[messagesToRetry.size()]);
messagesToRetry.clear();
- for(MessageInfo info: copy) {
+ for (MessageInfo info: copy) {
LOG.debug("{}: Retrying message {}", shard.persistenceId(), info.message);
info.retry(shard);
}
}
- void onTimerMessage(Object message) {
+ void onTimerMessage(final Object message) {
MessageInfo messageInfo = (MessageInfo)message;
LOG.debug("{}: Timer expired for message {}", shard.persistenceId(), messageInfo.message);
@Override
public void close() {
- for(MessageInfo info: messagesToRetry) {
+ for (MessageInfo info: messagesToRetry) {
info.timedOut(shard);
}
final String failureMessage;
Cancellable timer;
- MessageInfo(Object message, ActorRef replyTo, String failureMessage) {
+ MessageInfo(final Object message, final ActorRef replyTo, final String failureMessage) {
this.message = message;
this.replyTo = replyTo;
- this.failureMessage = failureMessage;
+ this.failureMessage = requireNonNull(failureMessage);
}
- void retry(Shard shard) {
+ void retry(final Shard shard) {
timer.cancel();
shard.getSelf().tell(message, replyTo);
}
- void timedOut(Shard shard) {
- replyTo.tell(new Failure(new NoShardLeaderException(failureMessage, shard.persistenceId())), shard.getSelf());
+ void timedOut(final Shard shard) {
+ replyTo.tell(new Failure(new NoShardLeaderException(failureMessage, shard.persistenceId())),
+ shard.getSelf());
}
}
}