import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.collect.Lists;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.common.Empty;
+import org.opendaylight.yangtools.yang.data.tree.api.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.model.api.EffectiveModelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.compat.java8.FutureConverters;
* <p/>
* It tracks current operation and list of cohorts which successfuly finished previous phase in
* case, if abort is necessary to invoke it only on cohort steps which are still active.
- *
*/
class CompositeDataTreeCohort {
private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class);
ABORTED
}
- static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<Object>() {
+ static final Recover<Object> EXCEPTION_TO_MESSAGE = new Recover<>() {
@Override
public Failure recover(final Throwable error) {
return new Failure(error);
private final DataTreeCohortActorRegistry registry;
private final TransactionIdentifier txId;
- private final SchemaContext schema;
+ private final EffectiveModelContext schema;
private final Executor callbackExecutor;
private final Timeout timeout;
- private @NonNull List<Success> successfulFromPrevious = Collections.emptyList();
+ private @NonNull List<Success> successfulFromPrevious = List.of();
private State state = State.IDLE;
CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
- final SchemaContext schema, final Executor callbackExecutor, final Timeout timeout) {
+ final EffectiveModelContext schema, final Executor callbackExecutor, final Timeout timeout) {
this.registry = requireNonNull(registry);
- this.txId = requireNonNull(transactionID);
+ txId = requireNonNull(transactionID);
this.schema = requireNonNull(schema);
this.callbackExecutor = requireNonNull(callbackExecutor);
this.timeout = requireNonNull(timeout);
throw new IllegalStateException("Unhandled state " + state);
}
- successfulFromPrevious = Collections.emptyList();
+ successfulFromPrevious = List.of();
state = State.IDLE;
}
- Optional<CompletionStage<Void>> canCommit(final DataTreeCandidate tip) {
+ Optional<CompletionStage<Empty>> canCommit(final DataTreeCandidate tip) {
if (LOG.isTraceEnabled()) {
LOG.trace("{}: canCommit - candidate: {}", txId, tip);
} else {
final List<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
LOG.debug("{}: canCommit - messages: {}", txId, messages);
if (messages.isEmpty()) {
- successfulFromPrevious = Collections.emptyList();
+ successfulFromPrevious = List.of();
changeStateFrom(State.IDLE, State.CAN_COMMIT_SUCCESSFUL);
return Optional.empty();
}
return Optional.of(processResponses(futures, State.CAN_COMMIT_SENT, State.CAN_COMMIT_SUCCESSFUL));
}
- Optional<CompletionStage<Void>> preCommit() {
+ Optional<CompletionStage<Empty>> preCommit() {
LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
if (successfulFromPrevious.isEmpty()) {
return Optional.of(processResponses(futures, State.PRE_COMMIT_SENT, State.PRE_COMMIT_SUCCESSFUL));
}
- Optional<CompletionStage<Void>> commit() {
+ Optional<CompletionStage<Empty>> commit() {
LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
if (successfulFromPrevious.isEmpty()) {
changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
return ret;
}
- private @NonNull CompletionStage<Void> processResponses(final List<Entry<ActorRef, Future<Object>>> futures,
+ private @NonNull CompletionStage<Empty> processResponses(final List<Entry<ActorRef, Future<Object>>> futures,
final State currentState, final State afterState) {
LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
- final CompletableFuture<Void> returnFuture = new CompletableFuture<>();
+ final CompletableFuture<Empty> returnFuture = new CompletableFuture<>();
Future<Iterable<Object>> aggregateFuture = Futures.sequence(Lists.transform(futures, Entry::getValue),
ExecutionContexts.global());
return returnFuture;
}
- // FB issues violation for passing null to CompletableFuture#complete but it is valid and necessary when the
- // generic type is Void.
- @SuppressFBWarnings(value = { "NP_NONNULL_PARAM_VIOLATION", "UPM_UNCALLED_PRIVATE_METHOD" },
- justification = "https://github.com/spotbugs/spotbugs/issues/811")
private void processResponses(final Throwable failure, final Iterable<Object> results,
- final State currentState, final State afterState, final CompletableFuture<Void> resultFuture) {
+ final State currentState, final State afterState, final CompletableFuture<Empty> resultFuture) {
if (failure != null) {
- successfulFromPrevious = Collections.emptyList();
+ successfulFromPrevious = List.of();
resultFuture.completeExceptionally(failure);
return;
}
firstEx.addSuppressed(it.next().cause());
}
- successfulFromPrevious = Collections.emptyList();
+ successfulFromPrevious = List.of();
resultFuture.completeExceptionally(firstEx);
} else {
successfulFromPrevious = successful;
changeStateFrom(currentState, afterState);
- resultFuture.complete(null);
+ resultFuture.complete(Empty.value());
}
}