BUG-8371: Respond to CreateLocalHistoryRequest after replication 51/57751/4
authorRobert Varga <robert.varga@pantheon.tech>
Wed, 24 May 2017 09:36:18 +0000 (11:36 +0200)
committerRobert Varga <robert.varga@pantheon.tech>
Mon, 29 May 2017 08:57:28 +0000 (10:57 +0200)
CreateLocalHistoryRequest needs to be replicated to followers before
we respond to the frontend, as logically this request has to be
persisted before any subsequent transactions.

While the frontend could replay the request on reconnect, it would
also have to track the implied persistence (via child transactions),
which we do not want because it really is a backend detail and it
would lead to a lot of complexity in the frontend.

Change-Id: Icdfad59d3c2bab3d4125186c6a9b3c901d3934f6
Signed-off-by: Robert Varga <robert.varga@pantheon.tech>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java

index 8704f2ab0cd9d33e02ab6b53a65d1aa5733fc60b..7e5addaefd5b8f8180ad4f39300af7b2f9b1e549 100644 (file)
@@ -107,7 +107,7 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
 
         try {
             if (request instanceof CreateLocalHistoryRequest) {
-                return handleCreateHistory((CreateLocalHistoryRequest) request);
+                return handleCreateHistory((CreateLocalHistoryRequest) request, envelope, now);
             } else if (request instanceof DestroyLocalHistoryRequest) {
                 return handleDestroyHistory((DestroyLocalHistoryRequest) request, envelope, now);
             } else if (request instanceof PurgeLocalHistoryRequest) {
@@ -121,30 +121,37 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
         }
     }
 
-    private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request) throws RequestException {
-        final LocalHistoryIdentifier id = request.getTarget();
-        final AbstractFrontendHistory existing = localHistories.get(id);
+    private LocalHistorySuccess handleCreateHistory(final CreateLocalHistoryRequest request,
+            final RequestEnvelope envelope, final long now) throws RequestException {
+        final LocalHistoryIdentifier historyId = request.getTarget();
+        final AbstractFrontendHistory existing = localHistories.get(historyId);
         if (existing != null) {
             // History already exists: report success
-            LOG.debug("{}: history {} already exists", persistenceId, id);
-            return new LocalHistorySuccess(id, request.getSequence());
+            LOG.debug("{}: history {} already exists", persistenceId, historyId);
+            return new LocalHistorySuccess(historyId, request.getSequence());
         }
 
         // We have not found the history. Before we create it we need to check history ID sequencing so that we do not
         // end up resurrecting a purged history.
-        if (purgedHistories.contains(UnsignedLong.fromLongBits(id.getHistoryId()))) {
+        if (purgedHistories.contains(UnsignedLong.fromLongBits(historyId.getHistoryId()))) {
             LOG.debug("{}: rejecting purged request {}", persistenceId, request);
             throw new DeadHistoryException(purgedHistories);
         }
 
         // Update last history we have seen
-        if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, id.getHistoryId()) < 0) {
-            lastSeenHistory = id.getHistoryId();
+        if (lastSeenHistory == null || Long.compareUnsigned(lastSeenHistory, historyId.getHistoryId()) < 0) {
+            lastSeenHistory = historyId.getHistoryId();
         }
 
-        localHistories.put(id, LocalFrontendHistory.create(persistenceId, tree, id));
-        LOG.debug("{}: created history {}", persistenceId, id);
-        return new LocalHistorySuccess(id, request.getSequence());
+        // We have to send the response only after persistence has completed
+        final ShardDataTreeTransactionChain chain = tree.ensureTransactionChain(historyId, () -> {
+            LOG.debug("{}: persisted history {}", persistenceId, historyId);
+            envelope.sendSuccess(new LocalHistorySuccess(historyId, request.getSequence()), tree.ticker().read() - now);
+        });
+
+        localHistories.put(historyId, LocalFrontendHistory.create(persistenceId, tree, chain));
+        LOG.debug("{}: created history {}", persistenceId, historyId);
+        return null;
     }
 
     private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request,
index 20d18e156e142a80467a41fad459969ba6f83d79..d7c30121bf8a9323a362814ee7449870900acb84 100644 (file)
@@ -36,9 +36,8 @@ final class LocalFrontendHistory extends AbstractFrontendHistory {
     }
 
     static LocalFrontendHistory create(final String persistenceId, final ShardDataTree tree,
-            final LocalHistoryIdentifier historyId) {
-        return new LocalFrontendHistory(persistenceId, tree, tree.ensureTransactionChain(historyId), ImmutableMap.of(),
-            TreeRangeSet.create());
+            final ShardDataTreeTransactionChain chain) {
+        return new LocalFrontendHistory(persistenceId, tree, chain, ImmutableMap.of(), TreeRangeSet.create());
     }
 
     static LocalFrontendHistory recreate(final String persistenceId, final ShardDataTree tree,
index ae09f1da78394fafd2ae0820b14386519978546b..ed36813d49a06f94d11678d626183aeb04288a97 100644 (file)
@@ -523,12 +523,15 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
         return ret;
     }
 
-    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId) {
+    ShardDataTreeTransactionChain ensureTransactionChain(final LocalHistoryIdentifier historyId,
+            @Nullable final Runnable callback) {
         ShardDataTreeTransactionChain chain = transactionChains.get(historyId);
         if (chain == null) {
             chain = new ShardDataTreeTransactionChain(historyId, this);
             transactionChains.put(historyId, chain);
-            replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), null);
+            replicatePayload(historyId, CreateLocalHistoryPayload.create(historyId), callback);
+        } else if (callback != null) {
+            callback.run();
         }
 
         return chain;
@@ -539,7 +542,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return new ReadOnlyShardDataTreeTransaction(this, txId, dataTree.takeSnapshot());
         }
 
-        return ensureTransactionChain(txId.getHistoryId()).newReadOnlyTransaction(txId);
+        return ensureTransactionChain(txId.getHistoryId(), null).newReadOnlyTransaction(txId);
     }
 
     ReadWriteShardDataTreeTransaction newReadWriteTransaction(final TransactionIdentifier txId) {
@@ -548,7 +551,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                     .newModification());
         }
 
-        return ensureTransactionChain(txId.getHistoryId()).newReadWriteTransaction(txId);
+        return ensureTransactionChain(txId.getHistoryId(), null).newReadWriteTransaction(txId);
     }
 
     @VisibleForTesting
@@ -976,7 +979,7 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
             return createReadyCohort(txId, mod);
         }
 
-        return ensureTransactionChain(txId.getHistoryId()).createReadyCohort(txId, mod);
+        return ensureTransactionChain(txId.getHistoryId(), null).createReadyCohort(txId, mod);
     }
 
     @SuppressFBWarnings(value = "DB_DUPLICATE_SWITCH_CLAUSES", justification = "See inline comments below.")