BUG-5280: fix problems identified by integration tests 06/48706/17
authorRobert Varga <rovarga@cisco.com>
Fri, 25 Nov 2016 15:16:41 +0000 (16:16 +0100)
committerRobert Varga <rovarga@cisco.com>
Wed, 30 Nov 2016 11:46:33 +0000 (12:46 +0100)
Switching the integration test suite has flushed out couple
of problems in the implementation, notably:

- wrong formatting placeholder
- unhandled requests during replay
- uninitialized path in AbstractReadTransactionRequestProxyV1
- missing sequence number bump in local commit case
- wrong writeObject() in ReadTransactionSuccessProxyV1
- IllegalStateException thrown instead of TransactionChainClosedException
- attempt to create history=0 on the backend
- mismatched sequences during preCommit message replay
- ConcurrentModificationException during localAbort()
- missing upcalls to LocalHistory concretizations when transactions abort
  and complete
- incorrect order on enqueue/send, leading to unpaired responses

Change-Id: I252a795dadb917452b9eb6d591a5c12ca5b69a45
Signed-off-by: Robert Varga <rovarga@cisco.com>
16 files changed:
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/AbstractReadTransactionRequestProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/CommitLocalTransactionRequest.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ReadTransactionSuccessProxyV1.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/TransactionIdentifier.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ClientActorBehavior.java
opendaylight/md-sal/cds-access-client/src/main/java/org/opendaylight/controller/cluster/access/client/ConnectedClientConnection.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTreeTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/test/resources/simplelogger.properties

index 4951110..48f5cc5 100644 (file)
@@ -33,9 +33,9 @@ public final class CommitLocalTransactionRequest
     private final DataTreeModification mod;
     private final boolean coordinated;
 
-    public CommitLocalTransactionRequest(@Nonnull final TransactionIdentifier identifier,
+    public CommitLocalTransactionRequest(@Nonnull final TransactionIdentifier identifier, final long sequence,
             @Nonnull final ActorRef replyTo, @Nonnull final DataTreeModification mod, final boolean coordinated) {
-        super(identifier, 0, replyTo);
+        super(identifier, sequence, replyTo);
         this.mod = Preconditions.checkNotNull(mod);
         this.coordinated = coordinated;
     }
index 405d1f9..5f96074 100644 (file)
@@ -112,7 +112,8 @@ public final class TransactionIdentifier implements WritableIdentifier {
             String histStr = historyId.getHistoryId() == 0 ? "" : "-chn-" + historyId.getHistoryId();
             shortString = historyId.getClientId().getFrontendId().getMemberName().getName() + "-"
                     + historyId.getClientId().getFrontendId().getClientType().getName() + "-fe-"
-                    + historyId.getClientId().getGeneration() + histStr + "-txn-" + transactionId;
+                    + historyId.getClientId().getGeneration() + histStr + "-txn-" + transactionId
+                    + "-" + historyId.getCookie();
         }
 
         return shortString;
index 97d312c..ddb7bcd 100644 (file)
@@ -202,6 +202,7 @@ public abstract class ClientActorBehavior<T extends BackendInfo> extends
             return;
         }
 
+        LOG.debug("{}: resolved shard {} to {}", persistenceId(), shard, backend);
         final long stamp = connectionsLock.writeLock();
         try {
             // Bring the connection up
index 6c1507c..eab1142 100644 (file)
@@ -25,23 +25,28 @@ public final class ConnectedClientConnection<T extends BackendInfo> extends Abst
         super(context, cookie, backend);
     }
 
-    private TransmittedConnectionEntry transmit(final ConnectionEntry entry) {
+    private void transmit(final ConnectionEntry entry) {
         final long txSequence = nextTxSequence++;
 
         final RequestEnvelope toSend = new RequestEnvelope(entry.getRequest().toVersion(remoteVersion()), sessionId(),
             txSequence);
 
+        // We need to enqueue the request before we send it to the actor, as we may be executing on a different thread
+        // than the client actor thread, in which case the round-trip could be made faster than we can enqueue --
+        // in which case the receive routine would not find the entry.
+        final TransmittedConnectionEntry txEntry = new TransmittedConnectionEntry(entry, sessionId(), txSequence,
+            readTime());
+        appendToInflight(txEntry);
+
         final ActorRef actor = remoteActor();
         LOG.trace("Transmitting request {} as {} to {}", entry.getRequest(), toSend, actor);
         actor.tell(toSend, ActorRef.noSender());
-
-        return new TransmittedConnectionEntry(entry, sessionId(), txSequence, readTime());
     }
 
     @Override
     void enqueueEntry(final ConnectionEntry entry) {
         if (inflightSize() < remoteMaxMessages()) {
-            appendToInflight(transmit(entry));
+            transmit(entry);
             LOG.debug("Enqueued request {} to queue {}", entry.getRequest(), this);
         } else {
             LOG.debug("Queue is at capacity, delayed sending of request {}", entry.getRequest());
@@ -60,7 +65,7 @@ public final class ConnectedClientConnection<T extends BackendInfo> extends Abst
             }
 
             LOG.debug("Transmitting entry {}", e);
-            appendToInflight(transmit(e));
+            transmit(e);
             toSend--;
         }
     }
index 951b540..8ab58e4 100644 (file)
@@ -22,6 +22,7 @@ import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryReq
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.mdsal.common.api.TransactionChainClosedException;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.slf4j.Logger;
@@ -113,12 +114,17 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
      */
     private ProxyHistory createHistoryProxy(final Long shard) {
         final AbstractClientConnection<ShardBackendInfo> connection = client.getConnection(shard);
-        final ProxyHistory ret = createHistoryProxy(new LocalHistoryIdentifier(identifier.getClientId(),
-            identifier.getHistoryId(), shard), connection);
+        final LocalHistoryIdentifier proxyId = new LocalHistoryIdentifier(identifier.getClientId(),
+            identifier.getHistoryId(), shard);
+        LOG.debug("Created proxyId {} for history {} shard {}", proxyId, identifier, shard);
 
-        // Request creation of the history.
-        connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()),
-            this::createHistoryCallback);
+        final ProxyHistory ret = createHistoryProxy(proxyId, connection);
+
+        // Request creation of the history, if it is not the single history
+        if (ret.getIdentifier().getHistoryId() != 0) {
+            connection.sendRequest(new CreateLocalHistoryRequest(ret.getIdentifier(), connection.localActor()),
+                this::createHistoryCallback);
+        }
         return ret;
     }
 
@@ -145,8 +151,16 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
         }
     }
 
+    /**
+     * Allocate a {@link ClientTransaction}.
+     *
+     * @return A new {@link ClientTransaction}
+     * @throws TransactionChainClosedException if this history is closed
+     */
     public final ClientTransaction createTransaction() {
-        Preconditions.checkState(state != State.CLOSED);
+        if (state == State.CLOSED) {
+            throw new TransactionChainClosedException(String.format("Local history %s is closed", identifier));
+        }
 
         synchronized (this) {
             final ClientTransaction ret = doCreateTransaction();
index adfc0df..803908d 100644 (file)
@@ -14,7 +14,10 @@ import com.google.common.base.Verify;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import java.util.ArrayDeque;
+import java.util.Deque;
 import java.util.function.Consumer;
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
@@ -49,8 +52,21 @@ import org.slf4j.LoggerFactory;
  * @author Robert Varga
  */
 abstract class AbstractProxyTransaction implements Identifiable<TransactionIdentifier> {
+    private static final class IncrementSequence {
+        private long delta = 1;
+
+        long getDelta() {
+            return delta;
+        }
+
+        void incrementDelta() {
+            delta++;
+        }
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(AbstractProxyTransaction.class);
 
+    private final Deque<Object> successfulRequests = new ArrayDeque<>();
     private final ProxyHistory parent;
 
     private AbstractProxyTransaction successor;
@@ -65,8 +81,15 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         return parent.localActor();
     }
 
+    private void incrementSequence(final long delta) {
+        sequence += delta;
+        LOG.debug("Transaction {} incremented sequence to {}", this, sequence);
+    }
+
     final long nextSequence() {
-        return sequence++;
+        final long ret = sequence++;
+        LOG.debug("Transaction {} allocated sequence {}", this, ret);
+        return ret;
     }
 
     final void delete(final YangInstanceIdentifier path) {
@@ -117,6 +140,19 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         Preconditions.checkState(sealed, "Transaction %s has not been sealed yet", getIdentifier());
     }
 
+    final void recordSuccessfulRequest(final @Nonnull TransactionRequest<?> req) {
+        successfulRequests.add(Verify.verifyNotNull(req));
+    }
+
+    final void recordFinishedRequest() {
+        final Object last = successfulRequests.peekLast();
+        if (last instanceof IncrementSequence) {
+            ((IncrementSequence) last).incrementDelta();
+        } else {
+            successfulRequests.addLast(new IncrementSequence());
+        }
+    }
+
     /**
      * Abort this transaction. This is invoked only for read-only transactions and will result in an explicit message
      * being sent to the backend.
@@ -139,6 +175,8 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
             }
 
+            // This is a terminal request, hence we do not need to record it
+            LOG.debug("Transaction {} abort completed", this);
             parent.completeTransaction(this);
         });
     }
@@ -166,6 +204,8 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
                 ret.setException(new IllegalStateException("Unhandled response " + t.getClass()));
             }
 
+            // This is a terminal request, hence we do not need to record it
+            LOG.debug("Transaction {} directCommit completed", this);
             parent.completeTransaction(this);
         });
         return ret;
@@ -175,7 +215,8 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     void canCommit(final VotingFuture<?> ret) {
         checkSealed();
 
-        sendRequest(Verify.verifyNotNull(commitRequest(true)), t -> {
+        final TransactionRequest<?> req = Verify.verifyNotNull(commitRequest(true));
+        sendRequest(req, t -> {
             if (t instanceof TransactionCanCommitSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
@@ -183,13 +224,18 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             } else {
                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
             }
+
+            recordSuccessfulRequest(req);
+            LOG.debug("Transaction {} canCommit completed", this);
         });
     }
 
     void preCommit(final VotingFuture<?> ret) {
         checkSealed();
 
-        sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), t -> {
+        final TransactionRequest<?> req = new TransactionPreCommitRequest(getIdentifier(), nextSequence(),
+            localActor());
+        sendRequest(req, t -> {
             if (t instanceof TransactionPreCommitSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
@@ -197,6 +243,9 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             } else {
                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
             }
+
+            recordSuccessfulRequest(req);
+            LOG.debug("Transaction {} preCommit completed", this);
         });
     }
 
@@ -212,12 +261,25 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
                 ret.voteNo(new IllegalStateException("Unhandled response " + t.getClass()));
             }
 
+            LOG.debug("Transaction {} doCommit completed", this);
             parent.completeTransaction(this);
         });
     }
 
-    void replaySuccessfulRequests(final AbstractProxyTransaction successor) {
+    final void replaySuccessfulRequests(final AbstractProxyTransaction successor) {
         this.successor = Preconditions.checkNotNull(successor);
+
+        for (Object obj : successfulRequests) {
+            if (obj instanceof TransactionRequest) {
+                LOG.debug("Forwarding request {} to successor {}", obj, successor);
+                successor.handleForwardedRemoteRequest((TransactionRequest<?>) obj, null);
+            } else {
+                Verify.verify(obj instanceof IncrementSequence);
+                successor.incrementSequence(((IncrementSequence) obj).getDelta());
+            }
+        }
+        LOG.debug("{} replayed {} successful requests", getIdentifier(), successfulRequests.size());
+        successfulRequests.clear();
     }
 
     /**
index b6c2746..ac18728 100644 (file)
@@ -48,6 +48,16 @@ public final class ClientLocalHistory extends AbstractClientHistory implements A
         return new ClientTransaction(this, new TransactionIdentifier(getIdentifier(), nextTx()));
     }
 
+    @Override
+    void onTransactionAbort(final TransactionIdentifier txId) {
+        final State local = state();
+        if (local == State.TX_OPEN) {
+            updateState(local, State.IDLE);
+        }
+
+        super.onTransactionAbort(txId);
+    }
+
     @Override
     AbstractTransactionCommitCohort onTransactionReady(final TransactionIdentifier txId,
             final AbstractTransactionCommitCohort cohort) {
index 8450c67..abb1345 100644 (file)
@@ -152,20 +152,27 @@ public final class ClientTransaction extends LocalAbortable implements Identifia
      * Release all state associated with this transaction.
      */
     public void abort() {
-        if (ensureClosed()) {
-            for (AbstractProxyTransaction proxy : proxies.values()) {
-                proxy.abort();
-            }
-            proxies.clear();
-
+        if (commonAbort()) {
             parent.onTransactionAbort(transactionId);
         }
     }
 
+    private boolean commonAbort() {
+        if (!ensureClosed()) {
+            return false;
+        }
+
+        for (AbstractProxyTransaction proxy : proxies.values()) {
+            proxy.abort();
+        }
+        proxies.clear();
+        return true;
+    }
+
     @Override
     void localAbort(final Throwable cause) {
-        LOG.debug("Aborting transaction {}", getIdentifier(), cause);
-        abort();
+        LOG.debug("Local abort of transaction {}", getIdentifier(), cause);
+        commonAbort();
     }
 
     Map<Long, AbstractProxyTransaction> getProxies() {
index 3869e66..61f83c2 100644 (file)
@@ -17,11 +17,18 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.commands.AbortLocalTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ExistsTransactionSuccess;
 import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
+import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
@@ -118,8 +125,8 @@ final class LocalProxyTransaction extends AbstractProxyTransaction {
 
     @Override
     CommitLocalTransactionRequest commitRequest(final boolean coordinated) {
-        final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, localActor(),
-            modification, coordinated);
+        final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, nextSequence(),
+            localActor(), modification, coordinated);
         modification = new FailedDataTreeModification(this::submittedException);
         return ret;
     }
@@ -173,10 +180,24 @@ final class LocalProxyTransaction extends AbstractProxyTransaction {
     @Override
     void handleForwardedRemoteRequest(final TransactionRequest<?> request,
             final @Nullable Consumer<Response<?, ?>> callback) {
-        LOG.debug("Applying forwaded request {}", request);
+        LOG.debug("Applying forwarded request {}", request);
 
         if (request instanceof ModifyTransactionRequest) {
             applyModifyTransactionRequest((ModifyTransactionRequest) request, callback);
+        } else if (request instanceof ReadTransactionRequest) {
+            final YangInstanceIdentifier path = ((ReadTransactionRequest) request).getPath();
+            final Optional<NormalizedNode<?, ?>> result = modification.readNode(path);
+            callback.accept(new ReadTransactionSuccess(request.getTarget(), request.getSequence(), result));
+        } else if (request instanceof ExistsTransactionRequest) {
+            final YangInstanceIdentifier path = ((ExistsTransactionRequest) request).getPath();
+            final boolean result = modification.readNode(path).isPresent();
+            callback.accept(new ExistsTransactionSuccess(request.getTarget(), request.getSequence(), result));
+        } else if (request instanceof TransactionPreCommitRequest) {
+            sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+        } else if (request instanceof TransactionDoCommitRequest) {
+            sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+        } else if (request instanceof TransactionAbortRequest) {
+            sendAbort(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request " + request);
         }
index ae55379..5257c6c 100644 (file)
@@ -87,7 +87,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         @Override
         AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
                 final TransactionIdentifier txId) {
-            Preconditions.checkState(lastOpen == null, "Proxy {} is currently open", lastOpen);
+            Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen);
 
             // onTransactionCompleted() runs concurrently
             final LocalProxyTransaction localSealed = lastSealed;
@@ -100,6 +100,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
 
             lastOpen = new LocalProxyTransaction(this, txId,
                 (CursorAwareDataTreeModification) baseSnapshot.newModification());
+            LOG.debug("Proxy {} open transaction {}", this, lastOpen);
             return lastOpen;
         }
 
@@ -196,8 +197,9 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         @Override
         void replaySuccessfulRequests() {
             for (AbstractProxyTransaction t : proxies.values()) {
+                LOG.debug("{} creating successor transaction proxy for {}", identifier, t);
                 final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier());
-                LOG.debug("{} created successor transaction proxy {} for {}", identifier, newProxy, t);
+                LOG.debug("{} created successor transaction proxy {}", identifier, newProxy);
                 t.replaySuccessfulRequests(newProxy);
             }
         }
@@ -282,10 +284,13 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
     }
 
     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) {
-        final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
-
         lock.lock();
         try {
+            if (successor != null) {
+                return successor.createTransactionProxy(txId);
+            }
+
+            final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId());
             final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId);
             proxies.put(proxyId, ret);
             LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId);
@@ -299,6 +304,8 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         lock.lock();
         try {
             proxies.remove(tx.getIdentifier());
+            LOG.debug("Proxy {} aborting transaction {}", this, tx);
+            onTransactionAborted(tx);
         } finally {
             lock.unlock();
         }
@@ -308,6 +315,8 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         lock.lock();
         try {
             proxies.remove(tx.getIdentifier());
+            LOG.debug("Proxy {} completing transaction {}", this, tx);
+            onTransactionCompleted(tx);
         } finally {
             lock.unlock();
         }
@@ -332,6 +341,7 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
         }
 
         successor = createSuccessor(newConnection);
+        LOG.debug("History {} instantiated successor {}", this, successor);
         return new ReconnectCohort();
     }
 
index 347c7ea..1754838 100644 (file)
@@ -13,8 +13,6 @@ import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.function.Consumer;
 import javax.annotation.Nullable;
 import org.opendaylight.controller.cluster.access.commands.AbstractReadTransactionRequest;
@@ -25,9 +23,12 @@ import org.opendaylight.controller.cluster.access.commands.ModifyTransactionRequ
 import org.opendaylight.controller.cluster.access.commands.PersistenceProtocol;
 import org.opendaylight.controller.cluster.access.commands.ReadTransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.ReadTransactionSuccess;
+import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionDelete;
+import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionMerge;
 import org.opendaylight.controller.cluster.access.commands.TransactionModification;
+import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionWrite;
@@ -62,7 +63,6 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     // FIXME: make this tuneable
     private static final int REQUEST_MAX_MODIFICATIONS = 1000;
 
-    private final Collection<TransactionRequest<?>> successfulRequests = new ArrayList<>();
     private final ModifyTransactionRequestBuilder builder;
 
     private boolean builderBusy;
@@ -195,7 +195,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
         if (response instanceof TransactionSuccess) {
             // Happy path
-            successfulRequests.add(request);
+            recordSuccessfulRequest(request);
         } else {
             recordFailedResponse(response);
         }
@@ -229,6 +229,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
         } else {
             failFuture(future, response);
         }
+
+        recordFinishedRequest();
     }
 
     private void completeRead(final SettableFuture<Optional<NormalizedNode<?, ?>>> future,
@@ -240,6 +242,8 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
         } else {
             failFuture(future, response);
         }
+
+        recordFinishedRequest();
     }
 
     @Override
@@ -257,17 +261,6 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
         // No-op
     }
 
-    @Override
-    void replaySuccessfulRequests(final AbstractProxyTransaction successor) {
-        super.replaySuccessfulRequests(successor);
-
-        for (TransactionRequest<?> req : successfulRequests) {
-            LOG.debug("Forwarding request {} to successor {}", req, successor);
-            successor.handleForwardedRemoteRequest(req, null);
-        }
-        successfulRequests.clear();
-    }
-
     @Override
     void forwardToRemote(final RemoteProxyTransaction successor, final TransactionRequest<?> request,
             final Consumer<Response<?, ?>> callback) throws RequestException {
@@ -299,6 +292,23 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
                         throw new IllegalArgumentException("Unhandled protocol " + maybeProto.get());
                 }
             }
+        } else if (request instanceof ReadTransactionRequest) {
+            ensureFlushedBuider();
+            sendRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(),
+                ((ReadTransactionRequest) request).getPath()), callback);
+        } else if (request instanceof ExistsTransactionRequest) {
+            ensureFlushedBuider();
+            sendRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(),
+                ((ExistsTransactionRequest) request).getPath()), callback);
+        } else if (request instanceof TransactionPreCommitRequest) {
+            ensureFlushedBuider();
+            sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+        } else if (request instanceof TransactionDoCommitRequest) {
+            ensureFlushedBuider();
+            sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), callback);
+        } else if (request instanceof TransactionAbortRequest) {
+            ensureFlushedBuider();
+            sendAbort(callback);
         } else {
             throw new IllegalArgumentException("Unhandled request {}" + request);
         }
index 7ddad74..1adca56 100644 (file)
@@ -77,9 +77,11 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
 
             transactions.put(id, tx);
         } else {
-            final Optional<TransactionSuccess<?>> replay = tx.replaySequence(request.getSequence());
-            if (replay.isPresent()) {
-                return replay.get();
+            final Optional<TransactionSuccess<?>> maybeReplay = tx.replaySequence(request.getSequence());
+            if (maybeReplay.isPresent()) {
+                final TransactionSuccess<?> replay = maybeReplay.get();
+                LOG.debug("{}: envelope {} replaying response {}", persistenceId(), envelope, replay);
+                return replay;
             }
         }
 
index 9a8e89e..312e112 100644 (file)
@@ -42,10 +42,12 @@ final class ShardDataTreeTransactionChain extends ShardDataTreeTransactionParent
         Preconditions.checkState(openTransaction == null, "Transaction %s is open", openTransaction);
 
         if (previousTx == null) {
+            LOG.debug("Opening an unchained snapshot in {}", chainId);
             return dataTree.takeSnapshot();
-        } else {
-            return previousTx.getSnapshot();
         }
+
+        LOG.debug("Reusing a chained snapshot in {}", chainId);
+        return previousTx.getSnapshot();
     }
 
     ReadOnlyShardDataTreeTransaction newReadOnlyTransaction(final TransactionIdentifier txId) {
index db271ba..4cd5b22 100644 (file)
@@ -3,5 +3,8 @@ org.slf4j.simpleLogger.dateTimeFormat=hh:mm:ss,S a
 org.slf4j.simpleLogger.logFile=System.out
 org.slf4j.simpleLogger.showShortLogName=true
 org.slf4j.simpleLogger.levelInBrackets=true
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.access=debug
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.databroker=debug
 org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore=debug
-org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.utils.stream=off
\ No newline at end of file
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.databroker.actors.dds=debug
+org.slf4j.simpleLogger.log.org.opendaylight.controller.cluster.datastore.node.utils.stream=off

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.