BUG-8371: Respond to CreateLocalHistoryRequest after replication 50/57950/2
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 24 May 2017 09:36:18 +0000 (11:36 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Sun, 4 Jun 2017 07:45:59 +0000 (09:45 +0200)
CreateLocalHistoryRequest needs to be replicated to followers before
we respond to the frontend, as logically this request has to be
persisted before any subsequent transactions.

While the frontend could replay the request on reconnect, it would
also have to track the implied persistence (via child transactions),
which we do not want because it really is a backend detail and it
would lead to a lot of complexity in the frontend.

Change-Id: Icdfad59d3c2bab3d4125186c6a9b3c901d3934f6
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
(cherry picked from commit 8f18717f60e58eebf726fe0611859311fa83df44)

opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java

index 8704f2ab0cd9d33e02ab6b53a65d1aa5733fc60b..7e5addaefd5b8f8180ad4f39300af7b2f9b1e549 100644 (file)
@@ -107,7 +107,7 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
 
         try {
             if (request instanceof CreateLocalHistoryRequest) {
 
         try {
             if (request instanceof CreateLocalHistoryRequest) {
-                return handleCreateHistory((CreateLocalHistoryRequest) request);
+                return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now);
             } else if (request instanceof DestroyLocalHistoryRequest) {
                 return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now);
             } else if (request instanceof PurgeLocalHistoryRequest) {
             } else if (request instanceof DestroyLocalHistoryRequest) {
                 return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now);
             } else if (request instanceof PurgeLocalHistoryRequest) {
@@ -121,30 +121,37 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
         }
     }
 
         }
     }
 
-    private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request) throws RequestException {
-        final LocalHistoryIdentifier id = request.getTarget();
-        final AbstractFrontendHistory existing = localHistories.get(id);
+    private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request,
+            final RequestEnvelope envelope, final long now) throws RequestException {
+        final LocalHistoryIdentifier historyId = request.getTarget();
+        final AbstractFrontendHistory existing = localHistories.get(historyId);
         if (existing != null) {
             // History already exists: report success
         if (existing != null) {
             // History already exists: report success
-            LOG.debug("{}: history {} already exists", persistenceId, id);
-            return new LocalHistorySuccess(id, request.getSequence());
+            LOG.debug("{}: history {} already exists", persistenceId, historyId);
+            return new LocalHistorySuccess(historyId, request.getSequence());
         }
 
         // We have not found the history. Before we create it we need to check history ID sequencing so that we do not
         // end up resurrecting a purged history.
         }
 
         // We have not found the history. Before we create it we need to check history ID sequencing so that we do not
         // end up resurrecting a purged history.
-        if (purgedHistories.contains(UnsignedLong.fromLongBits(id.getHistoryId()))) {
+        if (purgedHistories.contains(UnsignedLong.fromLongBits(historyId.getHistoryId()))) {
             LOG.debug("{}: rejecting purged request {}", persistenceId, request);
             throw new DeadHistoryException(purgedHistories);
         }
 
         // Update last history we have seen
             LOG.debug("{}: rejecting purged request {}", persistenceId, request);
             throw new DeadHistoryException(purgedHistories);
         }
 
         // Update last history we have seen
-        if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, id.getHistoryId()) < 0) {
-            lastSeenHistory = id.getHistoryId();
+        if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) {
+            lastSeenHistory = historyId.getHistoryId();
         }
 
         }
 
-        localHistories.put(id, LocalFrontendHistory.create(persistenceId, tree, id));
-        LOG.debug("{}: created history {}", persistenceId, id);
-        return new LocalHistorySuccess(id, request.getSequence());
+        // We have to send the response only after persistence has completed
+        final ShardDataTreeTransactionChain chain = tree.ensureTransactionChain(historyId, () -> {
+            LOG.debug("{}: persisted history {}", persistenceId, historyId);
+            envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.ticker().read() - now);
+        });
+
+        localHistories.put(historyId, LocalFrontendHistory.create(persistenceId, tree, chain));
+        LOG.debug("{}: created history {}", persistenceId, historyId);
+        return null;
     }
 
     private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
     }
 
     private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
index 20d18e156e142a80467a41fad459969ba6f83d79..d7c30121bf8a9323a362814ee7449870900acb84 100644 (file)
@@ -36,9 +36,8 @@ final class LocalFrontendHistory extends AbstractFrontendHistory {
     }
 
     static LocalFrontendHistory create(final String persistenceId, final ShardDataTree tree,
     }
 
     static LocalFrontendHistory create(final String persistenceId, final ShardDataTree tree,
-            final LocalHistoryIdentifier historyId) {
-        return new LocalFrontendHistory(persistenceId, tree, tree.ensureTransactionChain(historyId), ImmutableMap.of(),
-            TreeRangeSet.create());
+            final ShardDataTreeTransactionChain chain) {
+        return new LocalFrontendHistory(persistenceId, tree, chain, ImmutableMap.of(), TreeRangeSet.create());
     }
 
     static LocalFrontendHistory recreate(final String persistenceId, final ShardDataTree tree,
     }
 
     static LocalFrontendHistory recreate(final String persistenceId, final ShardDataTree tree,
index 8f3a467d55de9cb739b6d24a7e5db6cfa34910cb..1aa180c51d62a1edaccbb928c3fe2ca70bc3a8e3 100644 (file)
@@ -521,12 +521,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return ret;
     }
 
         return ret;
     }
 
-    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) {
+    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId,
+            @Nullable final Runnable callback) {
         ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
         if (chain == null) {
             chain = new ShardDataTreeTransactionChain(historyId, this);
             transactionChains.put(historyId, chain);
         ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
         if (chain == null) {
             chain = new ShardDataTreeTransactionChain(historyId, this);
             transactionChains.put(historyId, chain);
-            replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), null);
+            replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback);
+        } else if (callback != null) {
+            callback.run();
         }
 
         return chain;
         }
 
         return chain;
@@ -537,7 +540,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot());
         }
 
             return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot());
         }
 
-        return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId);
+        return ensureTransactionChain(txId.getHistoryId(), null).newReadOnlyTransaction(txId);
     }
 
     ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
     }
 
     ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
@@ -546,7 +549,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                     .newModification());
         }
 
                     .newModification());
         }
 
-        return ensureTransactionChain(txId.getHistoryId()).newReadWriteTransaction(txId);
+        return ensureTransactionChain(txId.getHistoryId(), null).newReadWriteTransaction(txId);
     }
 
     @VisibleForTesting
     }
 
     @VisibleForTesting
@@ -975,7 +978,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return createReadyCohort(txId, mod);
         }
 
             return createReadyCohort(txId, mod);
         }
 
-        return ensureTransactionChain(txId.getHistoryId()).createReadyCohort(txId, mod);
+        return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod);
     }
 
     @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")
     }
 
     @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")