import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
return txId;
}
- protected CommitProtocolCommand(TransactionIdentifier txId) {
+ protected CommitProtocolCommand(final TransactionIdentifier txId) {
this.txId = Objects.requireNonNull(txId);
}
private final ActorRef cohort;
private final SchemaContext schema;
- CanCommit(TransactionIdentifier txId, Collection<DOMDataTreeCandidate> candidates, SchemaContext schema,
- ActorRef cohort) {
+ CanCommit(final TransactionIdentifier txId, final Collection<DOMDataTreeCandidate> candidates,
+ final SchemaContext schema, final ActorRef cohort) {
super(txId);
this.cohort = Objects.requireNonNull(cohort);
this.candidates = Objects.requireNonNull(candidates);
private final ActorRef cohortRef;
private final TransactionIdentifier txId;
- protected CommitReply(ActorRef cohortRef, TransactionIdentifier txId) {
+ protected CommitReply(final ActorRef cohortRef, final TransactionIdentifier txId) {
this.cohortRef = Objects.requireNonNull(cohortRef);
this.txId = Objects.requireNonNull(txId);
}
static final class Success extends CommitReply {
- Success(ActorRef cohortRef, TransactionIdentifier txId) {
+ Success(final ActorRef cohortRef, final TransactionIdentifier txId) {
super(cohortRef, txId);
}
}
static final class PreCommit extends CommitProtocolCommand<Success> {
- PreCommit(TransactionIdentifier txId) {
+ PreCommit(final TransactionIdentifier txId) {
super(txId);
}
}
static final class Abort extends CommitProtocolCommand<Success> {
- Abort(TransactionIdentifier txId) {
+ Abort(final TransactionIdentifier txId) {
super(txId);
}
}
static final class Commit extends CommitProtocolCommand<Success> {
- Commit(TransactionIdentifier txId) {
+ Commit(final TransactionIdentifier txId) {
super(txId);
}
}
private abstract class CohortBehaviour<M extends CommitProtocolCommand<?>, S extends ThreePhaseCommitStep> {
private final Class<M> handledMessageType;
- CohortBehaviour(Class<M> handledMessageType) {
+ CohortBehaviour(final Class<M> handledMessageType) {
this.handledMessageType = Objects.requireNonNull(handledMessageType);
}
- void handle(CommitProtocolCommand<?> command) {
+ void handle(final CommitProtocolCommand<?> command) {
if (handledMessageType.isInstance(command)) {
onMessage(command);
} else if (command instanceof Abort) {
}
}
- private void onMessage(CommitProtocolCommand<?> message) {
+ private void onMessage(final CommitProtocolCommand<?> message) {
final ActorRef sender = getSender();
TransactionIdentifier txId = message.getTxId();
ListenableFuture<S> future = process(handledMessageType.cast(message));
: DataTreeCohortActor.this::executeInSelf;
Futures.addCallback(future, new FutureCallback<S>() {
@Override
- public void onSuccess(S nextStep) {
+ public void onSuccess(final S nextStep) {
success(txId, sender, nextStep);
}
@Override
- public void onFailure(Throwable failure) {
+ public void onFailure(final Throwable failure) {
failed(txId, sender, failure);
}
}, callbackExecutor);
}
- private void failed(TransactionIdentifier txId, ActorRef sender, Throwable failure) {
+ @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+ justification = "https://github.com/spotbugs/spotbugs/issues/811")
+ private void failed(final TransactionIdentifier txId, final ActorRef sender, final Throwable failure) {
currentStateMap.remove(txId);
sender.tell(new Status.Failure(failure), getSelf());
}
- private void success(TransactionIdentifier txId, ActorRef sender, S nextStep) {
+ @SuppressFBWarnings(value = "UPM_UNCALLED_PRIVATE_METHOD",
+ justification = "https://github.com/spotbugs/spotbugs/issues/811")
+ private void success(final TransactionIdentifier txId, final ActorRef sender, final S nextStep) {
currentStateMap.computeIfPresent(txId, (key, behaviour) -> nextBehaviour(txId, nextStep));
sender.tell(new Success(getSelf(), txId), getSelf());
}
- private void onAbort(TransactionIdentifier txId) {
+ private void onAbort(final TransactionIdentifier txId) {
currentStateMap.remove(txId);
final ActorRef sender = getSender();
Futures.addCallback(abort(), new FutureCallback<Object>() {
@Override
- public void onSuccess(Object noop) {
+ public void onSuccess(final Object noop) {
sender.tell(new Success(getSelf(), txId), getSelf());
}
@Override
- public void onFailure(Throwable failure) {
+ public void onFailure(final Throwable failure) {
LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, failure);
sender.tell(new Status.Failure(failure), getSelf());
}
}
@Override
- ListenableFuture<PostCanCommitStep> process(CanCommit message) {
+ ListenableFuture<PostCanCommitStep> process(final CanCommit message) {
return cohort.canCommit(message.getTxId(), message.getSchema(), message.getCandidates());
}
@Override
- CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, PostCanCommitStep nextStep) {
+ CohortBehaviour<?, ?> nextBehaviour(final TransactionIdentifier txId, final PostCanCommitStep nextStep) {
return new PostCanCommit(txId, nextStep);
}
private final S step;
private final TransactionIdentifier txId;
- CohortStateWithStep(Class<M> handledMessageType, TransactionIdentifier txId, S step) {
+ CohortStateWithStep(final Class<M> handledMessageType, final TransactionIdentifier txId, final S step) {
super(handledMessageType);
this.txId = Objects.requireNonNull(txId);
this.step = Objects.requireNonNull(step);
private class PostCanCommit extends CohortStateWithStep<PreCommit, PostCanCommitStep, PostPreCommitStep> {
- PostCanCommit(TransactionIdentifier txId, PostCanCommitStep nextStep) {
+ PostCanCommit(final TransactionIdentifier txId, final PostCanCommitStep nextStep) {
super(PreCommit.class, txId, nextStep);
}
@SuppressWarnings("unchecked")
@Override
- ListenableFuture<PostPreCommitStep> process(PreCommit message) {
+ ListenableFuture<PostPreCommitStep> process(final PreCommit message) {
return (ListenableFuture<PostPreCommitStep>) getStep().preCommit();
}
@Override
- CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, PostPreCommitStep nextStep) {
+ CohortBehaviour<?, ?> nextBehaviour(final TransactionIdentifier txId, final PostPreCommitStep nextStep) {
return new PostPreCommit(txId, nextStep);
}
private class PostPreCommit extends CohortStateWithStep<Commit, PostPreCommitStep, NoopThreePhaseCommitStep> {
- PostPreCommit(TransactionIdentifier txId, PostPreCommitStep step) {
+ PostPreCommit(final TransactionIdentifier txId, final PostPreCommitStep step) {
super(Commit.class, txId, step);
}
@SuppressWarnings("unchecked")
@Override
- ListenableFuture<NoopThreePhaseCommitStep> process(Commit message) {
+ ListenableFuture<NoopThreePhaseCommitStep> process(final Commit message) {
return (ListenableFuture<NoopThreePhaseCommitStep>) getStep().commit();
}
@Override
- CohortBehaviour<?, ?> nextBehaviour(TransactionIdentifier txId, NoopThreePhaseCommitStep nextStep) {
+ CohortBehaviour<?, ?> nextBehaviour(final TransactionIdentifier txId, final NoopThreePhaseCommitStep nextStep) {
return null;
}
}