/* * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, * and is available at http://www.eclipse.org/legal/epl-v10.html */ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.Status; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.Executor; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; import org.opendaylight.mdsal.common.api.PostCanCommitStep; import org.opendaylight.mdsal.common.api.PostPreCommitStep; import org.opendaylight.mdsal.common.api.ThreePhaseCommitStep; import org.opendaylight.mdsal.dom.api.DOMDataTreeCandidate; import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.model.api.SchemaContext; /** * Proxy actor which acts as a facade to the user-provided commit cohort. Responsible for * decapsulating DataTreeChanged messages and dispatching their context to the user. */ final class DataTreeCohortActor extends AbstractUntypedActor { private final Idle idleState = new Idle(); private final DOMDataTreeCommitCohort cohort; private final YangInstanceIdentifier registeredPath; private final Map> currentStateMap = new HashMap<>(); private DataTreeCohortActor(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) { this.cohort = Objects.requireNonNull(cohort); this.registeredPath = Objects.requireNonNull(registeredPath); } @Override protected void handleReceive(final Object message) { if (!(message instanceof CommitProtocolCommand)) { unknownMessage(message); return; } CommitProtocolCommand command = (CommitProtocolCommand)message; CohortBehaviour currentState = currentStateMap.computeIfAbsent(command.getTxId(), key -> idleState); LOG.debug("handleReceive for cohort {} - currentState: {}, message: {}", cohort.getClass().getName(), currentState, message); currentState.handle(command); } /** * Abstract message base for messages handled by {@link DataTreeCohortActor}. * * @param Reply message type */ abstract static class CommitProtocolCommand { private final TransactionIdentifier txId; final TransactionIdentifier getTxId() { return txId; } protected CommitProtocolCommand(TransactionIdentifier txId) { this.txId = Objects.requireNonNull(txId); } @Override public String toString() { return getClass().getSimpleName() + " [txId=" + txId + "]"; } } static final class CanCommit extends CommitProtocolCommand { private final Collection candidates; private final ActorRef cohort; private final SchemaContext schema; CanCommit(TransactionIdentifier txId, Collection candidates, SchemaContext schema, ActorRef cohort) { super(txId); this.cohort = Objects.requireNonNull(cohort); this.candidates = Objects.requireNonNull(candidates); this.schema = Objects.requireNonNull(schema); } Collection getCandidates() { return candidates; } SchemaContext getSchema() { return schema; } ActorRef getCohort() { return cohort; } @Override public String toString() { return "CanCommit [txId=" + getTxId() + ", candidates=" + candidates + ", cohort=" + cohort + "]"; } } abstract static class CommitReply { private final ActorRef cohortRef; private final TransactionIdentifier txId; protected CommitReply(ActorRef cohortRef, TransactionIdentifier txId) { this.cohortRef = Objects.requireNonNull(cohortRef); this.txId = Objects.requireNonNull(txId); } ActorRef getCohort() { return cohortRef; } final TransactionIdentifier getTxId() { return txId; } @Override public String toString() { return getClass().getSimpleName() + " [txId=" + txId + ", cohortRef=" + cohortRef + "]"; } } static final class Success extends CommitReply { Success(ActorRef cohortRef, TransactionIdentifier txId) { super(cohortRef, txId); } } static final class PreCommit extends CommitProtocolCommand { PreCommit(TransactionIdentifier txId) { super(txId); } } static final class Abort extends CommitProtocolCommand { Abort(TransactionIdentifier txId) { super(txId); } } static final class Commit extends CommitProtocolCommand { Commit(TransactionIdentifier txId) { super(txId); } } private abstract class CohortBehaviour, S extends ThreePhaseCommitStep> { private final Class handledMessageType; CohortBehaviour(Class handledMessageType) { this.handledMessageType = Objects.requireNonNull(handledMessageType); } void handle(CommitProtocolCommand command) { if (handledMessageType.isInstance(command)) { onMessage(command); } else if (command instanceof Abort) { onAbort(((Abort)command).getTxId()); } else { getSender().tell(new Status.Failure(new IllegalArgumentException(String.format( "Unexpected message %s in cohort behavior %s", command.getClass(), getClass().getSimpleName()))), getSelf()); } } private void onMessage(CommitProtocolCommand message) { final ActorRef sender = getSender(); TransactionIdentifier txId = message.getTxId(); ListenableFuture future = process(handledMessageType.cast(message)); Executor callbackExecutor = future.isDone() ? MoreExecutors.directExecutor() : DataTreeCohortActor.this::executeInSelf; Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(S nextStep) { success(txId, sender, nextStep); } @Override public void onFailure(Throwable failure) { failed(txId, sender, failure); } }, callbackExecutor); } private void failed(TransactionIdentifier txId, ActorRef sender, Throwable failure) { currentStateMap.remove(txId); sender.tell(new Status.Failure(failure), getSelf()); } private void success(TransactionIdentifier txId, ActorRef sender, S nextStep) { currentStateMap.computeIfPresent(txId, (key, behaviour) -> nextBehaviour(txId, nextStep)); sender.tell(new Success(getSelf(), txId), getSelf()); } private void onAbort(TransactionIdentifier txId) { currentStateMap.remove(txId); final ActorRef sender = getSender(); Futures.addCallback(abort(), new FutureCallback() { @Override public void onSuccess(Object noop) { sender.tell(new Success(getSelf(), txId), getSelf()); } @Override public void onFailure(Throwable failure) { LOG.warn("Abort of transaction {} failed for cohort {}", txId, cohort, failure); sender.tell(new Status.Failure(failure), getSelf()); } }, MoreExecutors.directExecutor()); } @Nullable abstract CohortBehaviour nextBehaviour(TransactionIdentifier txId, S nextStep); @Nonnull abstract ListenableFuture process(M command); abstract ListenableFuture abort(); @Override public String toString() { return getClass().getSimpleName(); } } private class Idle extends CohortBehaviour { Idle() { super(CanCommit.class); } @Override ListenableFuture process(CanCommit message) { return cohort.canCommit(message.getTxId(), message.getSchema(), message.getCandidates()); } @Override CohortBehaviour nextBehaviour(TransactionIdentifier txId, PostCanCommitStep nextStep) { return new PostCanCommit(txId, nextStep); } @Override ListenableFuture abort() { return ThreePhaseCommitStep.NOOP_ABORT_FUTURE; } } private abstract class CohortStateWithStep, S extends ThreePhaseCommitStep, N extends ThreePhaseCommitStep> extends CohortBehaviour { private final S step; private final TransactionIdentifier txId; CohortStateWithStep(Class handledMessageType, TransactionIdentifier txId, S step) { super(handledMessageType); this.txId = Objects.requireNonNull(txId); this.step = Objects.requireNonNull(step); } final S getStep() { return step; } @Override ListenableFuture abort() { return getStep().abort(); } @Override public String toString() { return getClass().getSimpleName() + " [txId=" + txId + ", step=" + step + "]"; } } private class PostCanCommit extends CohortStateWithStep { PostCanCommit(TransactionIdentifier txId, PostCanCommitStep nextStep) { super(PreCommit.class, txId, nextStep); } @SuppressWarnings("unchecked") @Override ListenableFuture process(PreCommit message) { return (ListenableFuture) getStep().preCommit(); } @Override CohortBehaviour nextBehaviour(TransactionIdentifier txId, PostPreCommitStep nextStep) { return new PostPreCommit(txId, nextStep); } } private class PostPreCommit extends CohortStateWithStep { PostPreCommit(TransactionIdentifier txId, PostPreCommitStep step) { super(Commit.class, txId, step); } @SuppressWarnings("unchecked") @Override ListenableFuture process(Commit message) { return (ListenableFuture) getStep().commit(); } @Override CohortBehaviour nextBehaviour(TransactionIdentifier txId, NoopThreePhaseCommitStep nextStep) { return null; } } private interface NoopThreePhaseCommitStep extends ThreePhaseCommitStep { } static Props props(final DOMDataTreeCommitCohort cohort, final YangInstanceIdentifier registeredPath) { return Props.create(DataTreeCohortActor.class, cohort, registeredPath); } }