BUG-5280: add READY protocol 50/49250/5
authorRobert Varga <rovarga@cisco.com>
Mon, 12 Dec 2016 11:43:31 +0000 (12:43 +0100)
committerRobert Varga <rovarga@cisco.com>
Tue, 13 Dec 2016 17:00:09 +0000 (18:00 +0100)
In order to make chained transactions work with remoting
we need a way for the frontend to propagate the ready state
to the backend. This patch adds a READY protocol, which acts
as a preparatory stage before the 'real' protocol kicks in.

With that the backend state is properly updated to reflect
state transitions on the frontend and we do not need to play
weird future-based delays in the frontend, which would be
exceedingly complex.

Change-Id: I51a0ca0c2b900e3c6522426e5897a4fca1b9da19
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/ModifyTransactionRequestBuilder.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/commands/PersistenceProtocol.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadOnlyProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalReadWriteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendReadWriteTransaction.java

index 4e2a8ce..9312e4c 100644 (file)
@@ -30,8 +30,10 @@ public final class ModifyTransactionRequestBuilder implements Builder<ModifyTran
     private final List<TransactionModification> modifications = new ArrayList<>(1);
     private final TransactionIdentifier identifier;
     private final ActorRef replyTo;
+
     private PersistenceProtocol protocol;
-    private Long sequence;
+    private boolean haveSequence;
+    private long sequence;
 
     public ModifyTransactionRequestBuilder(final TransactionIdentifier identifier, final ActorRef replyTo) {
         this.identifier = Preconditions.checkNotNull(identifier);
@@ -53,7 +55,9 @@ public final class ModifyTransactionRequestBuilder implements Builder<ModifyTran
     }
 
     public void setSequence(final long sequence) {
+        Preconditions.checkState(!haveSequence, "Sequence has already been set");
         this.sequence = sequence;
+        haveSequence = true;
     }
 
     public void setAbort() {
@@ -68,19 +72,24 @@ public final class ModifyTransactionRequestBuilder implements Builder<ModifyTran
         protocol = coordinated ? PersistenceProtocol.THREE_PHASE : PersistenceProtocol.SIMPLE;
     }
 
+    public void setReady() {
+        checkNotFinished();
+        protocol = PersistenceProtocol.READY;
+    }
+
     public int size() {
         return modifications.size();
     }
 
     @Override
     public ModifyTransactionRequest build() {
-        Preconditions.checkState(sequence != null, "Request sequence has not been set");
+        Preconditions.checkState(haveSequence, "Request sequence has not been set");
 
         final ModifyTransactionRequest ret = new ModifyTransactionRequest(identifier, sequence, replyTo, modifications,
             protocol);
         modifications.clear();
         protocol = null;
-        sequence = null;
+        haveSequence = false;
         return ret;
     }
 }
index c1377fc..ac0329c 100644 (file)
@@ -50,7 +50,16 @@ public enum PersistenceProtocol implements WritableObject {
         byte byteValue() {
             return 3;
         }
-
+    },
+    /**
+     * Transaction is ready. This is not a really a persistence protocol, but an indication that that frontend has
+     * completed modifications on the transaction and considers it ready, without deciding the actual commit protocol.
+     */
+    READY {
+        @Override
+        byte byteValue() {
+            return 4;
+        }
     };
 
     @Override
@@ -78,6 +87,8 @@ public enum PersistenceProtocol implements WritableObject {
                 return SIMPLE;
             case 3:
                 return THREE_PHASE;
+            case 4:
+                return READY;
             default:
                 throw new IllegalArgumentException("Unhandled byte value " + value);
         }
index 22be140..0ba6602 100644 (file)
@@ -238,6 +238,16 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         // Transition user-visible state first
         final boolean success = SEALED_UPDATER.compareAndSet(this, 0, 1);
         Preconditions.checkState(success, "Proxy %s was already sealed", getIdentifier());
+        internalSeal();
+    }
+
+    final void ensureSealed() {
+        if (SEALED_UPDATER.compareAndSet(this, 0, 1)) {
+            internalSeal();
+        }
+    }
+
+    private void internalSeal() {
         doSeal();
         parent.onTransactionSealed(this);
 
@@ -250,7 +260,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
             // still stuck here. The successor has not seen final part of our state, nor the fact it is sealed.
             // Propagate state and seal the successor.
             flushState(successor);
-            successor.seal();
+            successor.ensureSealed();
         }
     }
 
@@ -486,7 +496,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         if (SEALED.equals(prevState)) {
             LOG.debug("Proxy {} reconnected while being sealed, propagating state to successor {}", this, successor);
             flushState(successor);
-            successor.seal();
+            successor.ensureSealed();
         }
     }
 
index f75d443..cad6b5f 100644 (file)
@@ -130,7 +130,7 @@ abstract class LocalProxyTransaction extends AbstractProxyTransaction {
                 }
             });
 
-            successor.seal();
+            successor.ensureSealed();
 
             final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
             successor.sendRequest(successorReq, callback);
index eb2362d..c6e2f87 100644 (file)
@@ -119,7 +119,7 @@ final class LocalReadOnlyProxyTransaction extends LocalProxyTransaction {
                 }
             });
 
-            successor.seal();
+            successor.ensureSealed();
 
             final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
             successor.sendRequest(successorReq, callback);
index 8195f8d..db24e3c 100644 (file)
@@ -155,13 +155,16 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
 
         final java.util.Optional<PersistenceProtocol> maybeProtocol = request.getPersistenceProtocol();
         if (maybeProtocol.isPresent()) {
-            seal();
             Verify.verify(callback != null, "Request {} has null callback", request);
+            ensureSealed();
 
             switch (maybeProtocol.get()) {
                 case ABORT:
                     sendAbort(callback);
                     break;
+                case READY:
+                    // No-op, as we have already issued a seal()
+                    break;
                 case SIMPLE:
                     sendRequest(commitRequest(false), callback);
                     break;
@@ -215,7 +218,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
                 }
             });
 
-            successor.seal();
+            successor.ensureSealed();
 
             final ModifyTransactionRequest successorReq = successor.commitRequest(req.isCoordinated());
             successor.sendRequest(successorReq, callback);
@@ -251,7 +254,7 @@ final class LocalReadWriteProxyTransaction extends LocalProxyTransaction {
             request.getModification().applyToCursor(cursor);
         }
 
-        seal();
+        ensureSealed();
         sendRequest(commitRequest(request.isCoordinated()), callback);
     }
 }
index 846f5c3..d6aa3d3 100644 (file)
@@ -62,13 +62,6 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
             final LocalHistoryIdentifier identifier) {
             super(connection, identifier);
         }
-
-        @Override
-        final AbstractProxyTransaction doCreateTransactionProxy(
-                final AbstractClientConnection<ShardBackendInfo> connection, final TransactionIdentifier txId,
-                final boolean snapshotOnly) {
-            return new RemoteProxyTransaction(this, txId, snapshotOnly);
-        }
     }
 
     private static final class Local extends AbstractLocal {
@@ -164,6 +157,12 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
             super(connection, identifier);
         }
 
+        @Override
+        AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
+                final TransactionIdentifier txId, final boolean snapshotOnly) {
+            return new RemoteProxyTransaction(this, txId, snapshotOnly, true);
+        }
+
         @Override
         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
             return createClient(connection, getIdentifier());
@@ -176,6 +175,12 @@ abstract class ProxyHistory implements Identifiable<LocalHistoryIdentifier> {
             super(connection, identifier);
         }
 
+        @Override
+        AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection<ShardBackendInfo> connection,
+                final TransactionIdentifier txId, final boolean snapshotOnly) {
+            return new RemoteProxyTransaction(this, txId, snapshotOnly, false);
+        }
+
         @Override
         ProxyHistory createSuccessor(final AbstractClientConnection<ShardBackendInfo> connection) {
             return createSingle(connection, getIdentifier());
index 1429ec5..e777156 100644 (file)
@@ -63,16 +63,19 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     private static final int REQUEST_MAX_MODIFICATIONS = 1000;
 
     private final ModifyTransactionRequestBuilder builder;
+    private final boolean sendReadyOnSeal;
     private final boolean snapshotOnly;
 
     private boolean builderBusy;
 
     private volatile Exception operationFailure;
 
+
     RemoteProxyTransaction(final ProxyHistory parent, final TransactionIdentifier identifier,
-            final boolean snapshotOnly) {
+            final boolean snapshotOnly, final boolean sendReadyOnSeal) {
         super(parent);
         this.snapshotOnly = snapshotOnly;
+        this.sendReadyOnSeal = sendReadyOnSeal;
         builder = new ModifyTransactionRequestBuilder(identifier, localActor());
     }
 
@@ -131,12 +134,12 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
     @Override
     void doAbort() {
-        ensureInitializedBuider();
+        ensureInitializedBuilder();
         builder.setAbort();
         flushBuilder();
     }
 
-    private void ensureInitializedBuider() {
+    private void ensureInitializedBuilder() {
         if (!builderBusy) {
             builder.setSequence(nextSequence());
             builderBusy = true;
@@ -186,7 +189,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
     private void appendModification(final TransactionModification modification) {
         if (operationFailure == null) {
-            ensureInitializedBuider();
+            ensureInitializedBuilder();
 
             builder.addModification(modification);
             if (builder.size() >= REQUEST_MAX_MODIFICATIONS) {
@@ -255,7 +258,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
     @Override
     ModifyTransactionRequest commitRequest(final boolean coordinated) {
-        ensureInitializedBuider();
+        ensureInitializedBuilder();
         builder.setCommit(coordinated);
 
         final ModifyTransactionRequest ret = builder.build();
@@ -265,7 +268,11 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
     @Override
     void doSeal() {
-        // No-op
+        if (sendReadyOnSeal) {
+            ensureInitializedBuilder();
+            builder.setReady();
+            flushBuilder();
+        }
     }
 
     @Override
@@ -291,7 +298,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
             final java.util.Optional<PersistenceProtocol> maybeProto = req.getPersistenceProtocol();
             if (maybeProto.isPresent()) {
-                seal();
+                ensureSealed();
 
                 switch (maybeProto.get()) {
                     case ABORT:
index 79d555c..0dcaac2 100644 (file)
@@ -11,6 +11,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.primitives.UnsignedLong;
 import com.google.common.util.concurrent.FutureCallback;
+import java.util.Collection;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 import org.opendaylight.controller.cluster.access.commands.CommitLocalTransactionRequest;
@@ -194,7 +195,6 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
                 readyCohort = null;
             }
         });
-
     }
 
     private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
@@ -270,16 +270,19 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
     private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
             final RequestEnvelope envelope, final long now) throws RequestException {
 
-        final DataTreeModification modification = openTransaction.getSnapshot();
-        for (TransactionModification m : request.getModifications()) {
-            if (m instanceof TransactionDelete) {
-                modification.delete(m.getPath());
-            } else if (m instanceof TransactionWrite) {
-                modification.write(m.getPath(), ((TransactionWrite) m).getData());
-            } else if (m instanceof TransactionMerge) {
-                modification.merge(m.getPath(), ((TransactionMerge) m).getData());
-            } else {
-                LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m);
+        final Collection<TransactionModification> mods = request.getModifications();
+        if (!mods.isEmpty()) {
+            final DataTreeModification modification = openTransaction.getSnapshot();
+            for (TransactionModification m : mods) {
+                if (m instanceof TransactionDelete) {
+                    modification.delete(m.getPath());
+                } else if (m instanceof TransactionWrite) {
+                    modification.write(m.getPath(), ((TransactionWrite) m).getData());
+                } else if (m instanceof TransactionMerge) {
+                    modification.merge(m.getPath(), ((TransactionMerge) m).getData());
+                } else {
+                    LOG.warn("{}: ignoring unhandled modification {}", history().persistenceId(), m);
+                }
             }
         }
 
@@ -293,18 +296,29 @@ final class FrontendReadWriteTransaction extends FrontendTransaction {
                 openTransaction.abort();
                 openTransaction = null;
                 return replyModifySuccess(request.getSequence());
+            case READY:
+                ensureReady();
+                return replyModifySuccess(request.getSequence());
             case SIMPLE:
-                readyCohort = openTransaction.ready();
-                openTransaction = null;
+                ensureReady();
                 directCommit(envelope, now);
                 return null;
             case THREE_PHASE:
-                readyCohort = openTransaction.ready();
-                openTransaction = null;
+                ensureReady();
                 coordinatedCommit(envelope, now);
                 return null;
             default:
                 throw new UnsupportedRequestException(request);
         }
     }
+
+    private void ensureReady() {
+        // We may have a combination of READY + SIMPLE/THREE_PHASE , in which case we want to ready the transaction
+        // only once.
+        if (readyCohort == null) {
+            readyCohort = openTransaction.ready();
+            LOG.debug("{}: transitioned {} to ready", history().persistenceId(), openTransaction.getIdentifier());
+            openTransaction = null;
+        }
+    }
 }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.