import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.collect.Lists;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.google.common.collect.Lists;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collection;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.CanCommit;
import org.opendaylight.controller.cluster.datastore.DataTreeCohortActor.Success;
* <p/>
* It tracks current operation and list of cohorts which successfuly finished previous phase in
* case, if abort is necessary to invoke it only on cohort steps which are still active.
* <p/>
* It tracks current operation and list of cohorts which successfuly finished previous phase in
* case, if abort is necessary to invoke it only on cohort steps which are still active.
*/
class CompositeDataTreeCohort {
private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class);
*/
class CompositeDataTreeCohort {
private static final Logger LOG = LoggerFactory.getLogger(CompositeDataTreeCohort.class);
private State state = State.IDLE;
CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
final SchemaContext schema, final Executor callbackExecutor, final Timeout timeout) {
this.registry = requireNonNull(registry);
private State state = State.IDLE;
CompositeDataTreeCohort(final DataTreeCohortActorRegistry registry, final TransactionIdentifier transactionID,
final SchemaContext schema, final Executor callbackExecutor, final Timeout timeout) {
this.registry = requireNonNull(registry);
this.schema = requireNonNull(schema);
this.callbackExecutor = requireNonNull(callbackExecutor);
this.timeout = requireNonNull(timeout);
this.schema = requireNonNull(schema);
this.callbackExecutor = requireNonNull(callbackExecutor);
this.timeout = requireNonNull(timeout);
if (LOG.isTraceEnabled()) {
LOG.trace("{}: canCommit - candidate: {}", txId, tip);
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("{}: canCommit - candidate: {}", txId, tip);
} else {
final List<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
LOG.debug("{}: canCommit - messages: {}", txId, messages);
if (messages.isEmpty()) {
final List<CanCommit> messages = registry.createCanCommitMessages(txId, tip, schema);
LOG.debug("{}: canCommit - messages: {}", txId, messages);
if (messages.isEmpty()) {
LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
if (successfulFromPrevious.isEmpty()) {
LOG.debug("{}: preCommit - successfulFromPrevious: {}", txId, successfulFromPrevious);
if (successfulFromPrevious.isEmpty()) {
LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
if (successfulFromPrevious.isEmpty()) {
changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
LOG.debug("{}: commit - successfulFromPrevious: {}", txId, successfulFromPrevious);
if (successfulFromPrevious.isEmpty()) {
changeStateFrom(State.PRE_COMMIT_SUCCESSFUL, State.COMMITED);
- private @NonNull CompletionStage<Void> processResponses(final List<Entry<ActorRef, Future<Object>>> futures,
+ private @NonNull CompletionStage<Empty> processResponses(final List<Entry<ActorRef, Future<Object>>> futures,
final State currentState, final State afterState) {
LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
final State currentState, final State afterState) {
LOG.debug("{}: processResponses - currentState: {}, afterState: {}", txId, currentState, afterState);
Future<Iterable<Object>> aggregateFuture = Futures.sequence(Lists.transform(futures, Entry::getValue),
ExecutionContexts.global());
Future<Iterable<Object>> aggregateFuture = Futures.sequence(Lists.transform(futures, Entry::getValue),
ExecutionContexts.global());
resultFuture.completeExceptionally(firstEx);
} else {
successfulFromPrevious = successful;
changeStateFrom(currentState, afterState);
resultFuture.completeExceptionally(firstEx);
} else {
successfulFromPrevious = successful;
changeStateFrom(currentState, afterState);