import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.function.Consumer;
+import javax.annotation.Nullable;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
+import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
import org.opendaylight.controller.cluster.access.concepts.Response;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
-import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.mdsal.common.api.ReadFailedException;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Class translating transaction operations towards a particular backend shard.
* @author Robert Varga
*/
abstract class AbstractProxyTransaction implements Identifiable<TransactionIdentifier> {
- private final DistributedDataStoreClientBehavior client;
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
+ private final ProxyHistory parent;
+
+ private AbstractProxyTransaction successor;
private long sequence;
private boolean sealed;
- AbstractProxyTransaction(final DistributedDataStoreClientBehavior client) {
- this.client = Preconditions.checkNotNull(client);
+ AbstractProxyTransaction(final ProxyHistory parent) {
+ this.parent = Preconditions.checkNotNull(parent);
}
final ActorRef localActor() {
- return client.self();
+ return parent.localActor();
}
final long nextSequence() {
return doRead(path);
}
- final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
- client.sendRequest(request, completer);
+ final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback) {
+ LOG.debug("Transaction proxy {} sending request {} callback {}", this, request, callback);
+ parent.sendRequest(request, callback);
}
/**
- * Seals this transaction when ready.
+ * Seal this transaction before it is either committed or aborted.
*/
final void seal() {
checkNotSealed();
doSeal();
sealed = true;
+ parent.onTransactionSealed(this);
}
private void checkNotSealed() {
final void abort() {
checkNotSealed();
doAbort();
+ parent.abortTransaction(this);
}
- void abort(final VotingFuture<Void> ret) {
+ final void abort(final VotingFuture<Void> ret) {
checkSealed();
- sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), t -> {
+ sendAbort(t -> {
if (t instanceof TransactionAbortSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
} else {
ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
}
+
+ parent.completeTransaction(this);
});
}
+ final void sendAbort(final Consumer<Response<?, ?>> callback) {
+ sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), callback);
+ }
+
/**
* Commit this transaction, possibly in a coordinated fashion.
*
checkSealed();
final SettableFuture<Boolean> ret = SettableFuture.create();
- sendRequest(Verify.verifyNotNull(doCommit(false)), t -> {
+ sendRequest(Verify.verifyNotNull(commitRequest(false)), t -> {
if (t instanceof TransactionCommitSuccess) {
ret.set(Boolean.TRUE);
} else if (t instanceof RequestFailure) {
} else {
ret.setException(new IllegalStateException("Unhandled response " + t.getClass()));
}
+
+ parent.completeTransaction(this);
});
return ret;
}
+
void canCommit(final VotingFuture<?> ret) {
checkSealed();
- sendRequest(Verify.verifyNotNull(doCommit(true)), t -> {
+ sendRequest(Verify.verifyNotNull(commitRequest(true)), t -> {
if (t instanceof TransactionCanCommitSuccess) {
ret.voteYes();
} else if (t instanceof RequestFailure) {
} else {
ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
}
+
+ parent.completeTransaction(this);
});
}
- abstract TransactionRequest<?> doCommit(boolean coordinated);
+ void replaySuccessfulRequests(final AbstractProxyTransaction successor) {
+ this.successor = Preconditions.checkNotNull(successor);
+ }
+
+ /**
+ * Invoked from a retired connection for requests which have been in-flight and need to be re-adjusted
+ * and forwarded to the successor connection.
+ *
+ * @param request Request to be forwarded
+ * @param callback Original callback
+ * @throws RequestException when the request is unhandled by the successor
+ */
+ final void replayRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> callback)
+ throws RequestException {
+ Preconditions.checkState(successor != null, "%s does not have a successor set", this);
+
+ if (successor instanceof LocalProxyTransaction) {
+ forwardToLocal((LocalProxyTransaction)successor, request, callback);
+ } else if (successor instanceof RemoteProxyTransaction) {
+ forwardToRemote((RemoteProxyTransaction)successor, request, callback);
+ } else {
+ throw new IllegalStateException("Unhandled successor " + successor);
+ }
+ }
abstract void doDelete(final YangInstanceIdentifier path);
abstract void doSeal();
abstract void doAbort();
+
+ abstract TransactionRequest<?> commitRequest(boolean coordinated);
+
+ /**
+ * Invoked from {@link RemoteProxyTransaction} when it replays its successful requests to its successor. There is
+ * no equivalent of this call from {@link LocalProxyTransaction} because it does not send a request until all
+ * operations are packaged in the message.
+ *
+ * <p>
+ * Note: this method is invoked by the predecessor on the successor.
+ *
+ * @param request Request which needs to be forwarded
+ * @param callback Callback to be invoked once the request completes
+ */
+ abstract void handleForwardedRemoteRequest(TransactionRequest<?> request,
+ @Nullable Consumer<Response<?, ?>> callback);
+
+ /**
+ * Replay a request originating in this proxy to a successor remote proxy.
+ */
+ abstract void forwardToRemote(RemoteProxyTransaction successor, TransactionRequest<?> request,
+ Consumer<Response<?, ?>> callback) throws RequestException;
+
+ /**
+ * Replay a request originating in this proxy to a successor local proxy.
+ */
+ abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest<?> request,
+ Consumer<Response<?, ?>> callback) throws RequestException;
}