From: Robert Varga Date: Fri, 25 Nov 2016 15:16:41 +0000 (+0100) Subject: BUG-5280: fix problems identified by integration tests X-Git-Tag: release/carbon~377 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=db9a673c114febc785fbd324947ac2c3e3095d06 BUG-5280: fix problems identified by integration tests Switching the integration test suite has flushed out couple of problems in the implementation, notably: - wrong formatting placeholder - unhandled requests during replay - uninitialized path in AbstractReadTransactionRequestProxyV1 - missing sequence number bump in local commit case - wrong writeObject() in ReadTransactionSuccessProxyV1 - IllegalStateException thrown instead of TransactionChainClosedException - attempt to create history=0 on the backend - mismatched sequences during preCommit message replay - ConcurrentModificationException during localAbort() - missing upcalls to LocalHistory concretizations when transactions abort and complete - incorrect order on enqueue/send, leading to unpaired responses Change-Id: I252a795dadb917452b9eb6d591a5c12ca5b69a45 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequestProxyV1.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequestProxyV1.java index b73928574f..d3a60af635 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequestProxyV1.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequestProxyV1.java @@ -35,6 +35,7 @@ abstract class AbstractReadTransactionRequestProxyV1 extends return; } + LOG.debug("{}: resolved shard {} to {}", persistenceId(), shard, backend); final long stamp = connectionsLock.writeLock(); try { // Bring the connection up diff --git a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java index 6c1507c50d..eab11429b8 100644 --- a/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java +++ b/opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java @@ -25,23 +25,28 @@ public final class ConnectedClientConnection extends Abst super(context, cookie, backend); } - private TransmittedConnectionEntry transmit(final ConnectionEntry entry) { + private void transmit(final ConnectionEntry entry) { final long txSequence = nextTxSequence++; final RequestEnvelope toSend = new RequestEnvelope(entry.getRequest().toVersion(remoteVersion()), sessionId(), txSequence); + // We need to enqueue the request before we send it to the actor, as we may be executing on a different thread + // than the client actor thread, in which case the round-trip could be made faster than we can enqueue -- + // in which case the receive routine would not find the entry. + final TransmittedConnectionEntry txEntry = new TransmittedConnectionEntry(entry, sessionId(), txSequence, + readTime()); + appendToInflight(txEntry); + final ActorRef actor = remoteActor(); LOG.trace("Transmitting request {} as {} to {}", entry.getRequest(), toSend, actor); actor.tell(toSend, ActorRef.noSender()); - - return new TransmittedConnectionEntry(entry, sessionId(), txSequence, readTime()); } @Override void enqueueEntry(final ConnectionEntry entry) { if (inflightSize() < remoteMaxMessages()) { - appendToInflight(transmit(entry)); + transmit(entry); LOG.debug("Enqueued request {} to queue {}", entry.getRequest(), this); } else { LOG.debug("Queue is at capacity, delayed sending of request {}", entry.getRequest()); @@ -60,7 +65,7 @@ public final class ConnectedClientConnection extends Abst } LOG.debug("Transmitting entry {}", e); - appendToInflight(transmit(e)); + transmit(e); toSend--; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java index 951b540f1d..8ab58e410a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java @@ -22,6 +22,7 @@ import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryReq import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.mdsal.common.api.TransactionChainClosedException; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.slf4j.Logger; @@ -113,12 +114,17 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia */ private ProxyHistory createHistoryProxy(final Long shard) { final AbstractClientConnection connection = client.getConnection(shard); - final ProxyHistory ret = createHistoryProxy(new LocalHistoryIdentifier(identifier.getClientId(), - identifier.getHistoryId(), shard), connection); + final LocalHistoryIdentifier proxyId = new LocalHistoryIdentifier(identifier.getClientId(), + identifier.getHistoryId(), shard); + LOG.debug("Created proxyId {} for history {} shard {}", proxyId, identifier, shard); - // Request creation of the history. - connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()), - this::createHistoryCallback); + final ProxyHistory ret = createHistoryProxy(proxyId, connection); + + // Request creation of the history, if it is not the single history + if (ret.getIdentifier().getHistoryId() != 0) { + connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()), + this::createHistoryCallback); + } return ret; } @@ -145,8 +151,16 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia } } + /** + * Allocate a {@link ClientTransaction}. + * + * @return A new {@link ClientTransaction} + * @throws TransactionChainClosedException if this history is closed + */ public final ClientTransaction createTransaction() { - Preconditions.checkState(state != State.CLOSED); + if (state == State.CLOSED) { + throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier)); + } synchronized (this) { final ClientTransaction ret = doCreateTransaction(); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index adfc0df876..803908d8c6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -14,7 +14,10 @@ import com.google.common.base.Verify; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import java.util.ArrayDeque; +import java.util.Deque; import java.util.function.Consumer; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; @@ -49,8 +52,21 @@ import org.slf4j.LoggerFactory; * @author Robert Varga */ abstract class AbstractProxyTransaction implements Identifiable { + private static final class IncrementSequence { + private long delta = 1; + + long getDelta() { + return delta; + } + + void incrementDelta() { + delta++; + } + } + private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class); + private final Deque successfulRequests = new ArrayDeque<>(); private final ProxyHistory parent; private AbstractProxyTransaction successor; @@ -65,8 +81,15 @@ abstract class AbstractProxyTransaction implements Identifiable req) { + successfulRequests.add(Verify.verifyNotNull(req)); + } + + final void recordFinishedRequest() { + final Object last = successfulRequests.peekLast(); + if (last instanceof IncrementSequence) { + ((IncrementSequence) last).incrementDelta(); + } else { + successfulRequests.addLast(new IncrementSequence()); + } + } + /** * Abort this transaction. This is invoked only for read-only transactions and will result in an explicit message * being sent to the backend. @@ -139,6 +175,8 @@ abstract class AbstractProxyTransaction implements Identifiable ret) { checkSealed(); - sendRequest(Verify.verifyNotNull(commitRequest(true)), t -> { + final TransactionRequest req = Verify.verifyNotNull(commitRequest(true)); + sendRequest(req, t -> { if (t instanceof TransactionCanCommitSuccess) { ret.voteYes(); } else if (t instanceof RequestFailure) { @@ -183,13 +224,18 @@ abstract class AbstractProxyTransaction implements Identifiable ret) { checkSealed(); - sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), t -> { + final TransactionRequest req = new TransactionPreCommitRequest(getIdentifier(), nextSequence(), + localActor()); + sendRequest(req, t -> { if (t instanceof TransactionPreCommitSuccess) { ret.voteYes(); } else if (t instanceof RequestFailure) { @@ -197,6 +243,9 @@ abstract class AbstractProxyTransaction implements Identifiable) obj, null); + } else { + Verify.verify(obj instanceof IncrementSequence); + successor.incrementSequence(((IncrementSequence) obj).getDelta()); + } + } + LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size()); + successfulRequests.clear(); } /** diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java index b6c274628c..ac1872835a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java @@ -48,6 +48,16 @@ public final class ClientLocalHistory extends AbstractClientHistory implements A return new ClientTransaction(this, new TransactionIdentifier(getIdentifier(), nextTx())); } + @Override + void onTransactionAbort(final TransactionIdentifier txId) { + final State local = state(); + if (local == State.TX_OPEN) { + updateState(local, State.IDLE); + } + + super.onTransactionAbort(txId); + } + @Override AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId, final AbstractTransactionCommitCohort cohort) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java index 8450c67224..abb1345269 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java @@ -152,20 +152,27 @@ public final class ClientTransaction extends LocalAbortable implements Identifia * Release all state associated with this transaction. */ public void abort() { - if (ensureClosed()) { - for (AbstractProxyTransaction proxy : proxies.values()) { - proxy.abort(); - } - proxies.clear(); - + if (commonAbort()) { parent.onTransactionAbort(transactionId); } } + private boolean commonAbort() { + if (!ensureClosed()) { + return false; + } + + for (AbstractProxyTransaction proxy : proxies.values()) { + proxy.abort(); + } + proxies.clear(); + return true; + } + @Override void localAbort(final Throwable cause) { - LOG.debug("Aborting transaction {}", getIdentifier(), cause); - abort(); + LOG.debug("Local abort of transaction {}", getIdentifier(), cause); + commonAbort(); } Map getProxies() { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index 3869e66d3e..61f83c2db1 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -17,11 +17,18 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest; import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess; import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest; import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; +import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; +import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionDelete; +import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest; import org.opendaylight.controller.cluster.access.commands.TransactionMerge; import org.opendaylight.controller.cluster.access.commands.TransactionModification; +import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionWrite; import org.opendaylight.controller.cluster.access.concepts.RequestException; @@ -118,8 +125,8 @@ final class LocalProxyTransaction extends AbstractProxyTransaction { @Override CommitLocalTransactionRequest commitRequest(final boolean coordinated) { - final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, localActor(), - modification, coordinated); + final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, nextSequence(), + localActor(), modification, coordinated); modification = new FailedDataTreeModification(this::submittedException); return ret; } @@ -173,10 +180,24 @@ final class LocalProxyTransaction extends AbstractProxyTransaction { @Override void handleForwardedRemoteRequest(final TransactionRequest request, final @Nullable Consumer> callback) { - LOG.debug("Applying forwaded request {}", request); + LOG.debug("Applying forwarded request {}", request); if (request instanceof ModifyTransactionRequest) { applyModifyTransactionRequest((ModifyTransactionRequest) request, callback); + } else if (request instanceof ReadTransactionRequest) { + final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath(); + final Optional> result = modification.readNode(path); + callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result)); + } else if (request instanceof ExistsTransactionRequest) { + final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath(); + final boolean result = modification.readNode(path).isPresent(); + callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result)); + } else if (request instanceof TransactionPreCommitRequest) { + sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); + } else if (request instanceof TransactionDoCommitRequest) { + sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); + } else if (request instanceof TransactionAbortRequest) { + sendAbort(callback); } else { throw new IllegalArgumentException("Unhandled request " + request); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index ae55379155..5257c6c7f6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -87,7 +87,7 @@ abstract class ProxyHistory implements Identifiable { @Override AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, final TransactionIdentifier txId) { - Preconditions.checkState(lastOpen == null, "Proxy {} is currently open", lastOpen); + Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen); // onTransactionCompleted() runs concurrently final LocalProxyTransaction localSealed = lastSealed; @@ -100,6 +100,7 @@ abstract class ProxyHistory implements Identifiable { lastOpen = new LocalProxyTransaction(this, txId, (CursorAwareDataTreeModification) baseSnapshot.newModification()); + LOG.debug("Proxy {} open transaction {}", this, lastOpen); return lastOpen; } @@ -196,8 +197,9 @@ abstract class ProxyHistory implements Identifiable { @Override void replaySuccessfulRequests() { for (AbstractProxyTransaction t : proxies.values()) { + LOG.debug("{} creating successor transaction proxy for {}", identifier, t); final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier()); - LOG.debug("{} created successor transaction proxy {} for {}", identifier, newProxy, t); + LOG.debug("{} created successor transaction proxy {}", identifier, newProxy); t.replaySuccessfulRequests(newProxy); } } @@ -282,10 +284,13 @@ abstract class ProxyHistory implements Identifiable { } final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) { - final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId()); - lock.lock(); try { + if (successor != null) { + return successor.createTransactionProxy(txId); + } + + final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId()); final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId); proxies.put(proxyId, ret); LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId); @@ -299,6 +304,8 @@ abstract class ProxyHistory implements Identifiable { lock.lock(); try { proxies.remove(tx.getIdentifier()); + LOG.debug("Proxy {} aborting transaction {}", this, tx); + onTransactionAborted(tx); } finally { lock.unlock(); } @@ -308,6 +315,8 @@ abstract class ProxyHistory implements Identifiable { lock.lock(); try { proxies.remove(tx.getIdentifier()); + LOG.debug("Proxy {} completing transaction {}", this, tx); + onTransactionCompleted(tx); } finally { lock.unlock(); } @@ -332,6 +341,7 @@ abstract class ProxyHistory implements Identifiable { } successor = createSuccessor(newConnection); + LOG.debug("History {} instantiated successor {}", this, successor); return new ReconnectCohort(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 347c7eac03..175483855f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -13,8 +13,6 @@ import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import java.util.ArrayList; -import java.util.Collection; import java.util.function.Consumer; import javax.annotation.Nullable; import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest; @@ -25,9 +23,12 @@ import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequ import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol; import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest; import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess; +import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionDelete; +import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest; import org.opendaylight.controller.cluster.access.commands.TransactionMerge; import org.opendaylight.controller.cluster.access.commands.TransactionModification; +import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.commands.TransactionSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionWrite; @@ -62,7 +63,6 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { // FIXME: make this tuneable private static final int REQUEST_MAX_MODIFICATIONS = 1000; - private final Collection> successfulRequests = new ArrayList<>(); private final ModifyTransactionRequestBuilder builder; private boolean builderBusy; @@ -195,7 +195,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { if (response instanceof TransactionSuccess) { // Happy path - successfulRequests.add(request); + recordSuccessfulRequest(request); } else { recordFailedResponse(response); } @@ -229,6 +229,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } else { failFuture(future, response); } + + recordFinishedRequest(); } private void completeRead(final SettableFuture>> future, @@ -240,6 +242,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } else { failFuture(future, response); } + + recordFinishedRequest(); } @Override @@ -257,17 +261,6 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { // No-op } - @Override - void replaySuccessfulRequests(final AbstractProxyTransaction successor) { - super.replaySuccessfulRequests(successor); - - for (TransactionRequest req : successfulRequests) { - LOG.debug("Forwarding request {} to successor {}", req, successor); - successor.handleForwardedRemoteRequest(req, null); - } - successfulRequests.clear(); - } - @Override void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest request, final Consumer> callback) throws RequestException { @@ -299,6 +292,23 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get()); } } + } else if (request instanceof ReadTransactionRequest) { + ensureFlushedBuider(); + sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), + ((ReadTransactionRequest) request).getPath()), callback); + } else if (request instanceof ExistsTransactionRequest) { + ensureFlushedBuider(); + sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), + ((ExistsTransactionRequest) request).getPath()), callback); + } else if (request instanceof TransactionPreCommitRequest) { + ensureFlushedBuider(); + sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); + } else if (request instanceof TransactionDoCommitRequest) { + ensureFlushedBuider(); + sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback); + } else if (request instanceof TransactionAbortRequest) { + ensureFlushedBuider(); + sendAbort(callback); } else { throw new IllegalArgumentException("Unhandled request {}" + request); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java index 7ddad749d2..1adca56af2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java @@ -77,9 +77,11 @@ abstract class AbstractFrontendHistory implements Identifiable> replay = tx.replaySequence(request.getSequence()); - if (replay.isPresent()) { - return replay.get(); + final Optional> maybeReplay = tx.replaySequence(request.getSequence()); + if (maybeReplay.isPresent()) { + final TransactionSuccess replay = maybeReplay.get(); + LOG.debug("{}: envelope {} replaying response {}", persistenceId(), envelope, replay); + return replay; } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java index 9a8e89eb42..312e11290c 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java @@ -42,10 +42,12 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent Preconditions.checkState(openTransaction == null, "Transaction %s is open", openTransaction); if (previousTx == null) { + LOG.debug("Opening an unchained snapshot in {}", chainId); return dataTree.takeSnapshot(); - } else { - return previousTx.getSnapshot(); } + + LOG.debug("Reusing a chained snapshot in {}", chainId); + return previousTx.getSnapshot(); } ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties index db271ba737..4cd5b22007 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties @@ -3,5 +3,8 @@ org.slf4j.simpleLogger.dateTimeFormat=hh:mm:ss,S a org.slf4j.simpleLogger.logFile=System.out org.slf4j.simpleLogger.showShortLogName=true org.slf4j.simpleLogger.levelInBrackets=true +org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.access=debug +org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.databroker=debug org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=debug -org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.utils.stream=off \ No newline at end of file +org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.databroker.actors.dds=debug +org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.utils.stream=off