BUG-7033: Remove payload replication short-circuits
[controller.git] / opendaylight / md-sal / sal-distributed-datastore / src / main / java / org / opendaylight / controller / cluster / datastore / Shard.java
index 26f18bd932ec27ef81de5d08428d70b03ff9ef60..84c399deb26be96fe969f76bf66692f7ee4b8438 100644 (file)
@@ -185,7 +185,8 @@ public class Shard extends RaftActor {
                     treeChangeListenerPublisher, dataChangeListenerPublisher, name);
         } else {
             store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(),
-                    treeChangeListenerPublisher, dataChangeListenerPublisher, name);
+                    builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher,
+                    dataChangeListenerPublisher, name);
         }
 
         shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this);
@@ -264,18 +265,21 @@ public class Shard extends RaftActor {
             }
 
             if (message instanceof RequestEnvelope) {
+                final long now = ticker().read();
                 final RequestEnvelope envelope = (RequestEnvelope)message;
+
                 try {
-                    final RequestSuccess<?, ?> success = handleRequest(envelope);
+                    final RequestSuccess<?, ?> success = handleRequest(envelope, now);
                     if (success != null) {
-                        envelope.sendSuccess(success);
+                        envelope.sendSuccess(success, ticker().read() - now);
                     }
                 } catch (RequestException e) {
                     LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
-                    envelope.sendFailure(e);
+                    envelope.sendFailure(e, ticker().read() - now);
                 } catch (Exception e) {
                     LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
-                    envelope.sendFailure(new RuntimeRequestException("Request failed to process", e));
+                    envelope.sendFailure(new RuntimeRequestException("Request failed to process", e),
+                        ticker().read() - now);
                 }
             } else if (message instanceof ConnectClientRequest) {
                 handleConnectClient((ConnectClientRequest)message);
@@ -389,7 +393,8 @@ public class Shard extends RaftActor {
         }
     }
 
-    private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope) throws RequestException {
+    private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
+            throws RequestException {
         // We are not the leader, hence we want to fail-fast.
         if (!isLeader() || !isLeaderActive()) {
             LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope);
@@ -400,11 +405,11 @@ public class Shard extends RaftActor {
         if (request instanceof TransactionRequest) {
             final TransactionRequest<?> txReq = (TransactionRequest<?>)request;
             final ClientIdentifier clientId = txReq.getTarget().getHistoryId().getClientId();
-            return getFrontend(clientId).handleTransactionRequest(txReq, envelope);
+            return getFrontend(clientId).handleTransactionRequest(txReq, envelope, now);
         } else if (request instanceof LocalHistoryRequest) {
             final LocalHistoryRequest<?> lhReq = (LocalHistoryRequest<?>)request;
             final ClientIdentifier clientId = lhReq.getTarget().getClientId();
-            return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope);
+            return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now);
         } else {
             LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request);
             throw new UnsupportedRequestException(request);
@@ -445,16 +450,15 @@ public class Shard extends RaftActor {
         updateConfigParams(datastoreContext.getShardRaftConfig());
     }
 
-    boolean canSkipPayload() {
-        // If we do not have any followers and we are not using persistence we can apply modification to the state
-        // immediately
-        return !hasFollowers() && !persistence().isRecoveryApplicable();
-    }
-
     // applyState() will be invoked once consensus is reached on the payload
-    void persistPayload(final TransactionIdentifier transactionId, final Payload payload) {
-        // We are faking the sender
-        persistData(self(), transactionId, payload);
+    void persistPayload(final TransactionIdentifier transactionId, final Payload payload, boolean batchHint) {
+        boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable();
+        if (canSkipPayload) {
+            applyState(self(), transactionId, payload);
+        } else {
+            // We are faking the sender
+            persistData(self(), transactionId, payload, batchHint);
+        }
     }
 
     private void handleCommitTransaction(final CommitTransaction commit) {