import com.google.common.base.Preconditions;
import javax.annotation.concurrent.NotThreadSafe;
import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
import org.opendaylight.yangtools.concepts.Identifiable;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot;
import org.slf4j.Logger;
parent.abortTransaction(this, callback);
}
+ /**
+ * This method is exposed for sake of {@link ShardTransaction}, which is an actor. We need to ensure that
+ * the parent is updated to reflect the transaction has been closed, but no journal actions may be invoked.
+ *
+ * <p>
+ * ShardTransaction is responsible for additionally sending a request to persist an {@link AbortTransactionPayload}
+ * via a message to the Shard actor.
+ */
+ final void abortFromTransactionActor() {
+ if (close()) {
+ parent.abortFromTransactionActor(this);
+ }
+ }
+
@Override
public final String toString() {
return MoreObjects.toStringHelper(this).add("id", id).add("closed", closed).add("snapshot", snapshot)
import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
+import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
} else if (message instanceof DataTreeCohortActorRegistry.CohortRegistryCommand) {
store.processCohortRegistryCommand(getSender(),
(DataTreeCohortActorRegistry.CohortRegistryCommand) message);
+ } else if (message instanceof PersistAbortTransactionPayload) {
+ final TransactionIdentifier txId = ((PersistAbortTransactionPayload) message).getTransactionId();
+ persistPayload(txId, AbortTransactionPayload.create(txId), true);
} else {
super.handleNonRaftCommand(message);
}
replicatePayload(id, AbortTransactionPayload.create(id), callback);
}
+ @Override
+ void abortFromTransactionActor(final AbstractShardDataTreeTransaction<?> transaction) {
+ // No-op for free-standing transactions
+
+ }
+
@Override
ShardDataTreeCohort finishTransaction(final ReadWriteShardDataTreeTransaction transaction) {
final DataTreeModification snapshot = transaction.getSnapshot();
}
@Override
- void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
+ void abortFromTransactionActor(final AbstractShardDataTreeTransaction<?> transaction) {
if (transaction instanceof ReadWriteShardDataTreeTransaction) {
Preconditions.checkState(openTransaction != null,
"Attempted to abort transaction %s while none is outstanding", transaction);
LOG.debug("Aborted open transaction {}", transaction);
openTransaction = null;
}
+ }
+ @Override
+ void abortTransaction(final AbstractShardDataTreeTransaction<?> transaction, final Runnable callback) {
+ abortFromTransactionActor(transaction);
dataTree.abortTransaction(transaction, callback);
}
abstract class ShardDataTreeTransactionParent {
+ abstract void abortFromTransactionActor(AbstractShardDataTreeTransaction<?> transaction);
+
abstract void abortTransaction(AbstractShardDataTreeTransaction<?> transaction, Runnable callback);
abstract ShardDataTreeCohort finishTransaction(ReadWriteShardDataTreeTransaction transaction);
abstract ShardDataTreeCohort createReadyCohort(TransactionIdentifier id, DataTreeModification mod);
-
}
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.DataExists;
import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
+import org.opendaylight.controller.cluster.datastore.messages.PersistAbortTransactionPayload;
import org.opendaylight.controller.cluster.datastore.messages.ReadData;
import org.opendaylight.controller.cluster.datastore.messages.ReadDataReply;
import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
}
private void closeTransaction(final boolean sendReply) {
- getDOMStoreTransaction().abort(null);
+ getDOMStoreTransaction().abortFromTransactionActor();
+ shardActor.tell(new PersistAbortTransactionPayload(transactionId), ActorRef.noSender());
if (sendReply && returnCloseTransactionReply()) {
getSender().tell(new CloseTransactionReply(), getSelf());
--- /dev/null
+/*
+ * Copyright (c) 2017 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+/**
+ * A request sent from {@link org.opendaylight.controller.cluster.datastore.ShardTransaction} to
+ * {@link org.opendaylight.controller.cluster.datastore.Shard} to persist an
+ * {@link org.opendaylight.controller.cluster.datastore.persisted.AbortTransactionPayload} after the transaction has
+ * been closed by the frontend and internal backend state has been updated.
+ *
+ * <p>
+ * Since the two are actors, we cannot perform a direct upcall, as that breaks actor containment and wreaks havoc into
+ * Akka itself. This class does not need to be serializable, as both actors are guaranteed to be co-located.
+ *
+ * @author Robert Varga
+ */
+public final class PersistAbortTransactionPayload {
+ private final TransactionIdentifier txId;
+
+ public PersistAbortTransactionPayload(final TransactionIdentifier txId) {
+ this.txId = Preconditions.checkNotNull(txId);
+ }
+
+ public TransactionIdentifier getTransactionId() {
+ return txId;
+ }
+}
new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
Await.result(future, Duration.create(3, TimeUnit.SECONDS));
- subject.underlyingActor().getDOMStoreTransaction().abort(null);
+ subject.underlyingActor().getDOMStoreTransaction().abortFromTransactionActor();
future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY,
DataStoreVersions.CURRENT_VERSION), 3000);
new ReadData(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
Await.result(future, Duration.create(3, TimeUnit.SECONDS));
- subject.underlyingActor().getDOMStoreTransaction().abort(null);
+ subject.underlyingActor().getDOMStoreTransaction().abortFromTransactionActor();
future = akka.pattern.Patterns.ask(subject, new ReadData(YangInstanceIdentifier.EMPTY,
DataStoreVersions.CURRENT_VERSION), 3000);
new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);
Await.result(future, Duration.create(3, TimeUnit.SECONDS));
- subject.underlyingActor().getDOMStoreTransaction().abort(null);
+ subject.underlyingActor().getDOMStoreTransaction().abortFromTransactionActor();
future = akka.pattern.Patterns.ask(subject,
new DataExists(YangInstanceIdentifier.EMPTY, DataStoreVersions.CURRENT_VERSION), 3000);