--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import com.google.common.annotations.Beta;
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * A transaction request to perform the final transaction transition, which is purging it from the protocol view,
+ * meaning the frontend has no further knowledge of the transaction. The backend is free to purge any state related
+ * to the transaction and responds with a {@link TransactionPurgeResponse}.
+ *
+ * @author Robert Varga
+ */
+@Beta
+public final class TransactionPurgeRequest extends TransactionRequest<TransactionPurgeRequest> {
+ private static final long serialVersionUID = 1L;
+
+ public TransactionPurgeRequest(final TransactionIdentifier target, final long sequence, final ActorRef replyTo) {
+ super(target, sequence, replyTo);
+ }
+
+ @Override
+ protected TransactionPurgeRequestProxyV1 externalizableProxy(final ABIVersion version) {
+ return new TransactionPurgeRequestProxyV1(this);
+ }
+
+ @Override
+ protected TransactionPurgeRequest cloneAsVersion(final ABIVersion version) {
+ return this;
+ }
+}
\ No newline at end of file
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import akka.actor.ActorRef;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link TransactionPurgeRequest}. It implements the initial (Boron)
+ * serialization format.
+ *
+ * @author Robert Varga
+ */
+final class TransactionPurgeRequestProxyV1 extends AbstractTransactionRequestProxy<TransactionPurgeRequest> {
+ private static final long serialVersionUID = 1L;
+
+ // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
+ // be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public TransactionPurgeRequestProxyV1() {
+ // For Externalizable
+ }
+
+ TransactionPurgeRequestProxyV1(final TransactionPurgeRequest request) {
+ super(request);
+ }
+
+ @Override
+ protected TransactionPurgeRequest createRequest(final TransactionIdentifier target, final long sequence,
+ final ActorRef replyTo) {
+ return new TransactionPurgeRequest(target, sequence, replyTo);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.ABIVersion;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Successful reply to a {@link TransactionPurgeRequest}.
+ *
+ * @author Robert Varga
+ */
+public final class TransactionPurgeResponse extends TransactionSuccess<TransactionPurgeResponse> {
+ private static final long serialVersionUID = 1L;
+
+ public TransactionPurgeResponse(final TransactionIdentifier identifier, final long sequence) {
+ super(identifier, sequence);
+ }
+
+ @Override
+ protected AbstractTransactionSuccessProxy<TransactionPurgeResponse> externalizableProxy(
+ final ABIVersion version) {
+ return new TransactionPurgeResponseProxyV1(this);
+ }
+
+ @Override
+ protected TransactionPurgeResponse cloneAsVersion(final ABIVersion version) {
+ return this;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.access.commands;
+
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * Externalizable proxy for use with {@link TransactionPurgeResponse}. It implements the initial (Boron)
+ * serialization format.
+ *
+ * @author Robert Varga
+ */
+final class TransactionPurgeResponseProxyV1 extends AbstractTransactionSuccessProxy<TransactionPurgeResponse> {
+ private static final long serialVersionUID = 1L;
+
+ // checkstyle flags the public modifier as redundant however it is explicitly needed for Java serialization to
+ // be able to create instances via reflection.
+ @SuppressWarnings("checkstyle:RedundantModifier")
+ public TransactionPurgeResponseProxyV1() {
+ // For Externalizable
+ }
+
+ TransactionPurgeResponseProxyV1(final TransactionPurgeResponse success) {
+ super(success);
+ }
+
+ @Override
+ protected TransactionPurgeResponse createSuccess(final TransactionIdentifier target, final long sequence) {
+ return new TransactionPurgeResponse(target, sequence);
+ }
+}
package org.opendaylight.controller.cluster.databroker.actors.dds;
import akka.actor.ActorRef;
+import com.google.common.base.MoreObjects;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
// This is a terminal request, hence we do not need to record it
LOG.debug("Transaction {} abort completed", this);
- parent.completeTransaction(this);
+ purge();
});
}
// This is a terminal request, hence we do not need to record it
LOG.debug("Transaction {} directCommit completed", this);
- parent.completeTransaction(this);
+ purge();
});
return ret;
ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
}
- recordSuccessfulRequest(req);
- LOG.debug("Transaction {} preCommit completed", this);
+ onPreCommitComplete(req);
});
}
+ private void onPreCommitComplete(final TransactionRequest<?> req) {
+ /*
+ * The backend has agreed that the transaction has entered PRE_COMMIT phase, meaning it will be committed
+ * to storage after the timeout completes.
+ *
+ * All state has been replicated to the backend, hence we do not need to keep it around. Retain only
+ * the precommit request, so we know which request to use for resync.
+ */
+ LOG.debug("Transaction {} preCommit completed, clearing successfulRequests", this);
+ successfulRequests.clear();
+
+ // TODO: this works, but can contain some useless state (like batched operations). Create an empty
+ // equivalent of this request and store that.
+ recordSuccessfulRequest(req);
+ }
+
final void doCommit(final VotingFuture<?> ret) {
checkReadWrite();
checkSealed();
}
LOG.debug("Transaction {} doCommit completed", this);
+ purge();
+ });
+ }
+
+ private void purge() {
+ successfulRequests.clear();
+
+ final TransactionRequest<?> req = new TransactionPurgeRequest(getIdentifier(), nextSequence(), localActor());
+ sendRequest(req, t -> {
+ LOG.debug("Transaction {} purge completed", this);
parent.completeTransaction(this);
});
}
*/
abstract void forwardToLocal(LocalProxyTransaction successor, TransactionRequest<?> request,
Consumer<Response<?, ?>> callback);
+
+ @Override
+ public final String toString() {
+ return MoreObjects.toStringHelper(this).add("identifier", getIdentifier()).add("state", state).toString();
+ }
}
*/
package org.opendaylight.controller.cluster.datastore;
+import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Ticker;
import java.util.HashMap;
import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
import org.opendaylight.controller.cluster.access.commands.OutOfOrderRequestException;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
final @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
final RequestEnvelope envelope, final long now) throws RequestException {
-
- // FIXME: handle purging of transactions
-
final TransactionIdentifier id = request.getTarget();
- FrontendTransaction tx = transactions.get(id);
- if (tx == null) {
- // The transaction does not exist and we are about to create it, check sequence number
- if (request.getSequence() != 0) {
- LOG.debug("{}: no transaction state present, unexpected request {}", persistenceId(), request);
- throw UNSEQUENCED_START;
- }
- tx = createTransaction(request, id);
- transactions.put(id, tx);
+ FrontendTransaction tx;
+ if (request instanceof TransactionPurgeRequest) {
+ tx = transactions.remove(id);
+ if (tx == null) {
+ // We have no record of the transaction, nothing to do
+ LOG.debug("{}: no state for transaction {}, purge is complete", persistenceId(), id);
+ return new TransactionPurgeResponse(id, request.getSequence());
+ }
} else {
- final Optional<TransactionSuccess<?>> maybeReplay = tx.replaySequence(request.getSequence());
- if (maybeReplay.isPresent()) {
- final TransactionSuccess<?> replay = maybeReplay.get();
- LOG.debug("{}: envelope {} replaying response {}", persistenceId(), envelope, replay);
- return replay;
+ tx = transactions.get(id);
+ if (tx == null) {
+ // The transaction does not exist and we are about to create it, check sequence number
+ if (request.getSequence() != 0) {
+ LOG.debug("{}: no transaction state present, unexpected request {}", persistenceId(), request);
+ throw UNSEQUENCED_START;
+ }
+
+ tx = createTransaction(request, id);
+ transactions.put(id, tx);
+ } else {
+ final Optional<TransactionSuccess<?>> maybeReplay = tx.replaySequence(request.getSequence());
+ if (maybeReplay.isPresent()) {
+ final TransactionSuccess<?> replay = maybeReplay.get();
+ LOG.debug("{}: envelope {} replaying response {}", persistenceId(), envelope, replay);
+ return replay;
+ }
}
}
throws RequestException;
abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod);
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).omitNullValues().add("identifier", getIdentifier())
+ .add("persistenceId", persistenceId).add("transactions", transactions).toString();
+ }
}
import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
return handleReadTransaction((ReadTransactionRequest) request);
} else if (request instanceof TransactionAbortRequest) {
return handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
+ } else if (request instanceof TransactionPurgeRequest) {
+ // No-op for now
+ return new TransactionPurgeResponse(request.getTarget(), request.getSequence());
} else {
throw new UnsupportedRequestException(request);
}
import org.opendaylight.controller.cluster.access.commands.TransactionModification;
import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeRequest;
+import org.opendaylight.controller.cluster.access.commands.TransactionPurgeResponse;
import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
return null;
} else if (request instanceof TransactionAbortRequest) {
return handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
+ } else if (request instanceof TransactionPurgeRequest) {
+ // No-op for now
+ return new TransactionPurgeResponse(request.getTarget(), request.getSequence());
} else {
throw new UnsupportedRequestException(request);
}
*/
package org.opendaylight.controller.cluster.datastore;
+import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import java.util.ArrayDeque;
recordResponse(envelope.getMessage().getSequence(), failure);
envelope.sendFailure(failure, executionTime(startTime));
}
+
+ @Override
+ public final String toString() {
+ return MoreObjects.toStringHelper(this).omitNullValues().add("identifier", getIdentifier())
+ .add("expectedSequence", expectedSequence).add("firstReplaySequence", firstReplaySequence)
+ .add("lastPurgedSequence", lastPurgedSequence)
+ .toString();
+ }
}