*/
package org.opendaylight.controller.cluster.databroker.actors.dds;
+import akka.actor.ActorRef;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
+import java.util.function.Consumer;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
/**
* Class translating transaction operations towards a particular backend shard.
this.client = Preconditions.checkNotNull(client);
}
- /**
- * Instantiate a new tracker for a transaction. This method bases its decision on which implementation to use
- * based on provided {@link ShardBackendInfo}. If no information is present, it will choose the remote
- * implementation, which is fine, as the queueing logic in ClientActorBehavior will hold on to the requests until
- * the backend is located.
- *
- * @param client Client behavior
- * @param historyId Local history identifier
- * @param transactionId Transaction identifier
- * @param backend Optional backend identifier
- * @return A new state tracker
- */
- static AbstractProxyTransaction create(final DistributedDataStoreClientBehavior client,
- final LocalHistoryIdentifier historyId, final long transactionId,
- final java.util.Optional<ShardBackendInfo> backend) {
-
- final java.util.Optional<DataTree> dataTree = backend.flatMap(ShardBackendInfo::getDataTree);
- final TransactionIdentifier identifier = new TransactionIdentifier(historyId, transactionId);
- if (dataTree.isPresent()) {
- return new LocalProxyTransaction(client, identifier, dataTree.get().takeSnapshot());
- } else {
- return new RemoteProxyTransaction(client, identifier);
- }
- }
-
- final DistributedDataStoreClientBehavior client() {
- return client;
+ final ActorRef localActor() {
+ return client.self();
}
final long nextSequence() {
return doRead(path);
}
+ final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
+ client.sendRequest(request, completer);
+ }
+
/**
* Seal this transaction before it is either
*/
checkSealed();
final SettableFuture<Boolean> ret = SettableFuture.create();
- client().sendRequest(Verify.verifyNotNull(doCommit(false)), t -> {
+ sendRequest(Verify.verifyNotNull(doCommit(false)), t -> {
if (t instanceof TransactionCommitSuccess) {
ret.set(Boolean.TRUE);
} else if (t instanceof RequestFailure) {
void abort(final VotingFuture<Void> ret) {
checkSealed();
- client.sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), client().self()), t -> {
+ sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), t -> {
if (t instanceof TransactionAbortSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
void canCommit(final VotingFuture<?> ret) {
checkSealed();
- client.sendRequest(Verify.verifyNotNull(doCommit(true)), t -> {
+ sendRequest(Verify.verifyNotNull(doCommit(true)), t -> {
if (t instanceof TransactionCanCommitSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
void preCommit(final VotingFuture<?> ret) {
checkSealed();
- client.sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), client().self()), t-> {
+ sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), t-> {
if (t instanceof TransactionPreCommitSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
void doCommit(final VotingFuture<?> ret) {
checkSealed();
- client.sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), client().self()), t-> {
+ sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), t-> {
if (t instanceof TransactionCommitSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {