BUG-5280: add READY protocol 50/49250/5
authorRobert Varga <rovarga@cisco.com>
Mon, 12 Dec 2016 11:43:31 +0000 (12:43 +0100)
committerRobert Varga <rovarga@cisco.com>
Tue, 13 Dec 2016 17:00:09 +0000 (18:00 +0100)
In order to make chained transactions work with remoting
we need a way for the frontend to propagate the ready state
to the backend. This patch adds a READY protocol, which acts
as a preparatory stage before the 'real' protocol kicks in.

With that the backend state is properly updated to reflect
state transitions on the frontend and we do not need to play
weird future-based delays in the frontend, which would be
exceedingly complex.

Change-Id: I51a0ca0c2b900e3c6522426e5897a4fca1b9da19
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestBuilder.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PersistenceProtocol.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java

index 4e2a8cec2a26ae1a1485016b698446397cb55142..9312e4c7ac69105690de54a8b471b452bf703133 100644 (file)
@@ -30,8 +30,10 @@ public final class ModifyTransactionRequestBuilder implements Builder<ModifyTran
     private final List<TransactionModification> modifications = new ArrayList<>(1);
     private final TransactionIdentifier identifier;
     private final ActorRef replyTo;
+
     private PersistenceProtocol protocol;
-    private Long sequence;
+    private boolean haveSequence;
+    private long sequence;
 
     public ModifyTransactionRequestBuilder(final TransactionIdentifier identifier, final ActorRef replyTo) {
         this.identifier = Preconditions.checkNotNull(identifier);
@@ -53,7 +55,9 @@ public final class ModifyTransactionRequestBuilder implements Builder<ModifyTran
     }
 
     public void setSequence(final long sequence) {
+        Preconditions.checkState(!haveSequence, "Sequence has already been set");
         this.sequence = sequence;
+        haveSequence = true;
     }
 
     public void setAbort() {
@@ -68,19 +72,24 @@ public final class ModifyTransactionRequestBuilder implements Builder<ModifyTran
         protocol = coordinated ? PersistenceProtocol.THREE_PHASE : PersistenceProtocol.SIMPLE;
     }
 
+    public void setReady() {
+        checkNotFinished();
+        protocol = PersistenceProtocol.READY;
+    }
+
     public int size() {
         return modifications.size();
     }
 
     @Override
     public ModifyTransactionRequest build() {
-        Preconditions.checkState(sequence != null, "Request sequence has not been set");
+        Preconditions.checkState(haveSequence, "Request sequence has not been set");
 
         final ModifyTransactionRequest ret = new ModifyTransactionRequest(identifier, sequence, replyTo, modifications,
             protocol);
         modifications.clear();
         protocol = null;
-        sequence = null;
+        haveSequence = false;
         return ret;
     }
 }
index c1377fc102cf58fdbce02ebf84241407a72c8c81..ac0329cd58aba6e30a9dd668b2db8cba2a4788df 100644 (file)
@@ -50,7 +50,16 @@ public enum PersistenceProtocol implements WritableObject {
         byte byteValue() {
             return 3;
         }
-
+    },
+    /**
+     * Transaction is ready. This is not a really a persistence protocol, but an indication that that frontend has
+     * completed modifications on the transaction and considers it ready, without deciding the actual commit protocol.
+     */
+    READY {
+        @Override
+        byte byteValue() {
+            return 4;
+        }
     };
 
     @Override
@@ -78,6 +87,8 @@ public enum PersistenceProtocol implements WritableObject {
                 return SIMPLE;
             case 3:
                 return THREE_PHASE;
+            case 4:
+                return READY;
             default:
                 throw new IllegalArgumentException("Unhandled byte value " + value);
         }
index 22be14036114b3e497b31570337b0cfd01b3b6b1..0ba660234a15cee42c84f1c00b61d2ad2ef244e6 100644 (file)
@@ -238,6 +238,16 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         // Transition user-visible state first
         final boolean success = SEALED_UPDATER.compareAndSet(this, 0, 1);
         Preconditions.checkState(success, "Proxy %s was already sealed", getIdentifier());
+        internalSeal();
+    }
+
+    final void ensureSealed() {
+        if (SEALED_UPDATER.compareAndSet(this, 0, 1)) {
+            internalSeal();
+        }
+    }
+
+    private void internalSeal() {
         doSeal();
         parent.onTransactionSealed(this);
 
@@ -250,7 +260,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             // still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
             // Propagate state and seal the successor.
             flushState(successor);
-            successor.seal();
+            successor.ensureSealed();
         }
     }
 
@@ -486,7 +496,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         if (SEALED.equals(prevState)) {
             LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
             flushState(successor);
-            successor.seal();
+            successor.ensureSealed();
         }
     }
 
index f75d443fc79f4acc9d2b1007ae941f6c1482b494..cad6b5f16d53bc46dd886fbc791e86a600c72e60 100644 (file)
@@ -130,7 +130,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
                 }
             });
 
-            successor.seal();
+            successor.ensureSealed();
 
             final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
             successor.sendRequest(successorReq, callback);
index eb2362d88df5026916b3b6770cca0dba59131696..c6e2f8763513d63bfb98bb07b9b25ee5d5c5bc98 100644 (file)
@@ -119,7 +119,7 @@ final class LocalReadOnlyProxyTransaction extends LocalProxyTransaction {
                 }
             });
 
-            successor.seal();
+            successor.ensureSealed();
 
             final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
             successor.sendRequest(successorReq, callback);
index 8195f8dcca26650ddc13d301419cfd0d4d18625b..db24e3c73c867020ab46d210ce52d1b633cf2715 100644 (file)
@@ -155,13 +155,16 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
 
         final java.util.Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
         if (maybeProtocol.isPresent()) {
-            seal();
             Verify.verify(callback != null, "Request {} has null callback", request);
+            ensureSealed();
 
             switch (maybeProtocol.get()) {
                 case ABORT:
                     sendAbort(callback);
                     break;
+                case READY:
+                    // No-op, as we have already issued a seal()
+                    break;
                 case SIMPLE:
                     sendRequest(commitRequest(false), callback);
                     break;
@@ -215,7 +218,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
                 }
             });
 
-            successor.seal();
+            successor.ensureSealed();
 
             final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
             successor.sendRequest(successorReq, callback);
@@ -251,7 +254,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
             request.getModification().applyToCursor(cursor);
         }
 
-        seal();
+        ensureSealed();
         sendRequest(commitRequest(request.isCoordinated()), callback);
     }
 }
index 846f5c37cfd506eb28f9e01139b0333851492281..d6aa3d3f3fc079db3ac4ed12e46c40173f1c8dc4 100644 (file)
@@ -62,13 +62,6 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
             final LocalHistoryIdentifier identifier) {
             super(connection, identifier);
         }
-
-        @Override
-        final AbstractProxyTransaction doCreateTransactionProxy(
-                final AbstractClientConnection<ShardBackendInfo> connection, final TransactionIdentifier txId,
-                final boolean snapshotOnly) {
-            return new RemoteProxyTransaction(this, txId, snapshotOnly);
-        }
     }
 
     private static final class Local extends AbstractLocal {
@@ -164,6 +157,12 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
             super(connection, identifier);
         }
 
+        @Override
+        AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
+                final TransactionIdentifier txId, final boolean snapshotOnly) {
+            return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
+        }
+
         @Override
         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
             return createClient(connection, getIdentifier());
@@ -176,6 +175,12 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
             super(connection, identifier);
         }
 
+        @Override
+        AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
+                final TransactionIdentifier txId, final boolean snapshotOnly) {
+            return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
+        }
+
         @Override
         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
             return createSingle(connection, getIdentifier());
index 1429ec5a7896734e6cf624c6663b1748e5209b78..e777156b3ed53ae6d56f4c4f14972e59aa67cee4 100644 (file)
@@ -63,16 +63,19 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     private static final int REQUEST_MAX_MODIFICATIONS = 1000;
 
     private final ModifyTransactionRequestBuilder builder;
+    private final boolean sendReadyOnSeal;
     private final boolean snapshotOnly;
 
     private boolean builderBusy;
 
     private volatile Exception operationFailure;
 
+
     RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
-            final boolean snapshotOnly) {
+            final boolean snapshotOnly, final boolean sendReadyOnSeal) {
         super(parent);
         this.snapshotOnly = snapshotOnly;
+        this.sendReadyOnSeal = sendReadyOnSeal;
         builder = new ModifyTransactionRequestBuilder(identifier, localActor());
     }
 
@@ -131,12 +134,12 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
     @Override
     void doAbort() {
-        ensureInitializedBuider();
+        ensureInitializedBuilder();
         builder.setAbort();
         flushBuilder();
     }
 
-    private void ensureInitializedBuider() {
+    private void ensureInitializedBuilder() {
         if (!builderBusy) {
             builder.setSequence(nextSequence());
             builderBusy = true;
@@ -186,7 +189,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
     private void appendModification(final TransactionModification modification) {
         if (operationFailure == null) {
-            ensureInitializedBuider();
+            ensureInitializedBuilder();
 
             builder.addModification(modification);
             if (builder.size() >= REQUEST_MAX_MODIFICATIONS) {
@@ -255,7 +258,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
     @Override
     ModifyTransactionRequest commitRequest(final boolean coordinated) {
-        ensureInitializedBuider();
+        ensureInitializedBuilder();
         builder.setCommit(coordinated);
 
         final ModifyTransactionRequest ret = builder.build();
@@ -265,7 +268,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
     @Override
     void doSeal() {
-        // No-op
+        if (sendReadyOnSeal) {
+            ensureInitializedBuilder();
+            builder.setReady();
+            flushBuilder();
+        }
     }
 
     @Override
@@ -291,7 +298,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
             final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
             if (maybeProto.isPresent()) {
-                seal();
+                ensureSealed();
 
                 switch (maybeProto.get()) {
                     case ABORT:
index 79d555c85695d7b437bf4831036ca6a8b9070f9b..0dcaac233ab6c4ee6d6566b1ecaa74594d17e0d5 100644 (file)
@@ -11,6 +11,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
+import java.util.Collection;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
@@ -194,7 +195,6 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
                 readyCohort = null;
             }
         });
-
     }
 
     private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
@@ -270,16 +270,19 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
     private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
 
-        final DataTreeModification modification = openTransaction.getSnapshot();
-        for (TransactionModification m : request.getModifications()) {
-            if (m instanceof TransactionDelete) {
-                modification.delete(m.getPath());
-            } else if (m instanceof TransactionWrite) {
-                modification.write(m.getPath(), ((TransactionWrite) m).getData());
-            } else if (m instanceof TransactionMerge) {
-                modification.merge(m.getPath(), ((TransactionMerge) m).getData());
-            } else {
-                LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m);
+        final Collection<TransactionModification> mods = request.getModifications();
+        if (!mods.isEmpty()) {
+            final DataTreeModification modification = openTransaction.getSnapshot();
+            for (TransactionModification m : mods) {
+                if (m instanceof TransactionDelete) {
+                    modification.delete(m.getPath());
+                } else if (m instanceof TransactionWrite) {
+                    modification.write(m.getPath(), ((TransactionWrite) m).getData());
+                } else if (m instanceof TransactionMerge) {
+                    modification.merge(m.getPath(), ((TransactionMerge) m).getData());
+                } else {
+                    LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m);
+                }
             }
         }
 
@@ -293,18 +296,29 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
                 openTransaction.abort();
                 openTransaction = null;
                 return replyModifySuccess(request.getSequence());
+            case READY:
+                ensureReady();
+                return replyModifySuccess(request.getSequence());
             case SIMPLE:
-                readyCohort = openTransaction.ready();
-                openTransaction = null;
+                ensureReady();
                 directCommit(envelope, now);
                 return null;
             case THREE_PHASE:
-                readyCohort = openTransaction.ready();
-                openTransaction = null;
+                ensureReady();
                 coordinatedCommit(envelope, now);
                 return null;
             default:
                 throw new UnsupportedRequestException(request);
         }
     }
+
+    private void ensureReady() {
+        // We may have a combination of READY + SIMPLE/THREE_PHASE , in which case we want to ready the transaction
+        // only once.
+        if (readyCohort == null) {
+            readyCohort = openTransaction.ready();
+            LOG.debug("{}: transitioned {} to ready", history().persistenceId(), openTransaction.getIdentifier());
+            openTransaction = null;
+        }
+    }
 }