BUG-5280: add executionTimeNanos 86/48686/16
authorRobert Varga <rovarga@cisco.com>
Fri, 25 Nov 2016 00:06:20 +0000 (01:06 +0100)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 28 Nov 2016 15:45:47 +0000 (15:45 +0000)
In order to properly measure impact of requests on the backend
we nead some indication of the complexity involved in servicing
the request. It is not feasible to estimate this by analyzing
the request itself, hence we provide a way for the backend to
communicate how complex it found a request to be back to the
frontend.

Since this measure excludes actor inbox and transport latency,
the frontend can use this measure to weigh the relative complexity
compared to other requests in has sent.

Change-Id: Ia7f435ff8a7fd995a90261b128832c340026de6d
Signed-off-by: Robert Varga <rovarga@cisco.com>
16 files changed:
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractEnvelopeProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractResponseEnvelopeProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelope.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelopeProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelope.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/ResponseEnvelope.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelope.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelopeProxy.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java

index 326a514abc9400278d12a10be075cca10fd51769..11ec59578c882daa2f86c7361dad5a18628acb07 100644 (file)
@@ -20,7 +20,7 @@ abstract class AbstractEnvelopeProxy<T extends Message<?, ?>> implements Externa
     private long sessionId;
     private long txSequence;
 
     private long sessionId;
     private long txSequence;
 
-    protected AbstractEnvelopeProxy() {
+    AbstractEnvelopeProxy() {
         // for Externalizable
     }
 
         // for Externalizable
     }
 
@@ -31,21 +31,21 @@ abstract class AbstractEnvelopeProxy<T extends Message<?, ?>> implements Externa
     }
 
     @Override
     }
 
     @Override
-    public final void writeExternal(final ObjectOutput out) throws IOException {
+    public void writeExternal(final ObjectOutput out) throws IOException {
         WritableObjects.writeLongs(out, sessionId, txSequence);
         out.writeObject(message);
     }
 
     @SuppressWarnings("unchecked")
     @Override
         WritableObjects.writeLongs(out, sessionId, txSequence);
         out.writeObject(message);
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public final void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+    public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
         final byte header = WritableObjects.readLongHeader(in);
         sessionId = WritableObjects.readFirstLong(in, header);
         txSequence = WritableObjects.readSecondLong(in, header);
         message = (T) in.readObject();
     }
 
         final byte header = WritableObjects.readLongHeader(in);
         sessionId = WritableObjects.readFirstLong(in, header);
         txSequence = WritableObjects.readSecondLong(in, header);
         message = (T) in.readObject();
     }
 
-    abstract Envelope<T> createEnvelope(T wrappedNessage, long envSessionId, long envTxSequence);
+    abstract Envelope<T> createEnvelope(T wrappedNessage, long sessionId, long txSequence);
 
     final Object readResolve() {
         return createEnvelope(message, sessionId, txSequence);
 
     final Object readResolve() {
         return createEnvelope(message, sessionId, txSequence);
index 526c97ce32b3d5dd2eabcc0443fe9efdcc39866b..1dbb86bb2f073de2e0d17223870dbe57cfcdfa51 100644 (file)
@@ -7,17 +7,41 @@
  */
 package org.opendaylight.controller.cluster.access.concepts;
 
  */
 package org.opendaylight.controller.cluster.access.concepts;
 
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.yangtools.concepts.WritableObjects;
+
 abstract class AbstractResponseEnvelopeProxy<T extends Response<?, ?>> extends AbstractEnvelopeProxy<T> {
     private static final long serialVersionUID = 1L;
 
 abstract class AbstractResponseEnvelopeProxy<T extends Response<?, ?>> extends AbstractEnvelopeProxy<T> {
     private static final long serialVersionUID = 1L;
 
-    protected AbstractResponseEnvelopeProxy() {
+    private long executionTimeNanos;
+
+    AbstractResponseEnvelopeProxy() {
         // for Externalizable
     }
 
     AbstractResponseEnvelopeProxy(final ResponseEnvelope<T> envelope) {
         super(envelope);
         // for Externalizable
     }
 
     AbstractResponseEnvelopeProxy(final ResponseEnvelope<T> envelope) {
         super(envelope);
+        this.executionTimeNanos = envelope.getExecutionTimeNanos();
+    }
+
+    @Override
+    public final void writeExternal(final ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+        WritableObjects.writeLong(out, executionTimeNanos);
     }
 
     @Override
     }
 
     @Override
-    abstract ResponseEnvelope<T> createEnvelope(T message, long sequence, long retry);
+    public final void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+        executionTimeNanos = WritableObjects.readLong(in);
+    }
+
+    @Override
+    final ResponseEnvelope<T> createEnvelope(final T message, final long sessionId, final long txSequence) {
+        return createEnvelope(message, sessionId, txSequence, executionTimeNanos);
+    }
+
+    abstract ResponseEnvelope<T> createEnvelope(T message, long sessionId, long txSequence, long executionTimeNanos);
 }
 }
index 6c32ae25541de11bbb4416eb36bd7bc928c3d045..1f641eb1819f945da32ee4a9b76a403e5e7bea3d 100644 (file)
@@ -10,8 +10,9 @@ package org.opendaylight.controller.cluster.access.concepts;
 public final class FailureEnvelope extends ResponseEnvelope<RequestFailure<?, ?>> {
     private static final long serialVersionUID = 1L;
 
 public final class FailureEnvelope extends ResponseEnvelope<RequestFailure<?, ?>> {
     private static final long serialVersionUID = 1L;
 
-    public FailureEnvelope(final RequestFailure<?, ?> message, final long sessionId, final long txSequence) {
-        super(message, sessionId, txSequence);
+    public FailureEnvelope(final RequestFailure<?, ?> message, final long sessionId, final long txSequence,
+            final long executionTimeNanos) {
+        super(message, sessionId, txSequence, executionTimeNanos);
     }
 
     @Override
     }
 
     @Override
index b9ba98e1c55531a80dd6ed9b04232350d9b6f9b9..adc50e1eaed216662135aa255b3849907adafc97 100644 (file)
@@ -22,7 +22,8 @@ final class FailureEnvelopeProxy extends AbstractResponseEnvelopeProxy<RequestFa
     }
 
     @Override
     }
 
     @Override
-    FailureEnvelope createEnvelope(final RequestFailure<?, ?> message, final long sessionId, final long txSequence) {
-        return new FailureEnvelope(message, sessionId, txSequence);
+    ResponseEnvelope<RequestFailure<?, ?>> createEnvelope(final RequestFailure<?, ?> message, final long sessionId,
+            final long txSequence, final long executionTimeNanos) {
+        return new FailureEnvelope(message, sessionId, txSequence, executionTimeNanos);
     }
 }
     }
 }
index 1c6e72c59a6596ea498cad2dbc432fd63934101b..cb9034d32483f086a97fc4b1324dcb71856abc27 100644 (file)
@@ -25,10 +25,12 @@ public final class RequestEnvelope extends Envelope<Request<?, ?>> {
      * Respond to this envelope with a {@link RequestFailure} caused by specified {@link RequestException}.
      *
      * @param cause Cause of this {@link RequestFailure}
      * Respond to this envelope with a {@link RequestFailure} caused by specified {@link RequestException}.
      *
      * @param cause Cause of this {@link RequestFailure}
+     * @param executionTimeNanos Time to execute the request, in nanoseconds
      * @throws NullPointerException if cause is null
      */
      * @throws NullPointerException if cause is null
      */
-    public void sendFailure(final RequestException cause) {
-        sendResponse(new FailureEnvelope(getMessage().toRequestFailure(cause), getSessionId(), getTxSequence()));
+    public void sendFailure(final RequestException cause, final long executionTimeNanos) {
+        sendResponse(new FailureEnvelope(getMessage().toRequestFailure(cause), getSessionId(), getTxSequence(),
+            executionTimeNanos));
     }
 
     /**
     }
 
     /**
@@ -37,8 +39,8 @@ public final class RequestEnvelope extends Envelope<Request<?, ?>> {
      * @param success Successful response
      * @throws NullPointerException if success is null
      */
      * @param success Successful response
      * @throws NullPointerException if success is null
      */
-    public void sendSuccess(final RequestSuccess<?, ?> success) {
-        sendResponse(new SuccessEnvelope(success, getSessionId(), getTxSequence()));
+    public void sendSuccess(final RequestSuccess<?, ?> success, final long executionTimeNanos) {
+        sendResponse(new SuccessEnvelope(success, getSessionId(), getTxSequence(), executionTimeNanos));
     }
 
     private void sendResponse(final ResponseEnvelope<?> envelope) {
     }
 
     private void sendResponse(final ResponseEnvelope<?> envelope) {
index 9f998e7fac49b81d71e97487ed306c7205a0dda8..7936baa1696059b184b8ff55f548ac5fd6e9229b 100644 (file)
@@ -7,10 +7,29 @@
  */
 package org.opendaylight.controller.cluster.access.concepts;
 
  */
 package org.opendaylight.controller.cluster.access.concepts;
 
+import com.google.common.base.Preconditions;
+
 public abstract class ResponseEnvelope<T extends Response<?, ?>> extends Envelope<T> {
     private static final long serialVersionUID = 1L;
 
 public abstract class ResponseEnvelope<T extends Response<?, ?>> extends Envelope<T> {
     private static final long serialVersionUID = 1L;
 
-    ResponseEnvelope(final T message, final long sessionId, final long txSequence) {
+    private final long executionTimeNanos;
+
+    ResponseEnvelope(final T message, final long sessionId, final long txSequence, final long executionTimeNanos) {
         super(message, sessionId, txSequence);
         super(message, sessionId, txSequence);
+        Preconditions.checkArgument(executionTimeNanos >= 0);
+        this.executionTimeNanos = executionTimeNanos;
     }
     }
+
+    /**
+     * Return the time the request took to execute in nanoseconds. This may not reflect the actual CPU time, but rather
+     * a measure of the complexity involved in servicing the original request.
+     *
+     * @return Time the request took to execute in nanoseconds
+     */
+    public final long getExecutionTimeNanos() {
+        return executionTimeNanos;
+    }
+
+    @Override
+    abstract AbstractResponseEnvelopeProxy<T> createProxy();
 }
 }
index d98e257ce0ac57715120e6c8c87272b4b7e57cfb..3c23a23763cd071abb2e967f7c58b6e9cd633ca1 100644 (file)
@@ -10,8 +10,9 @@ package org.opendaylight.controller.cluster.access.concepts;
 public final class SuccessEnvelope extends ResponseEnvelope<RequestSuccess<?, ?>> {
     private static final long serialVersionUID = 1L;
 
 public final class SuccessEnvelope extends ResponseEnvelope<RequestSuccess<?, ?>> {
     private static final long serialVersionUID = 1L;
 
-    public SuccessEnvelope(final RequestSuccess<?, ?> message, final long sessionId, final long txSequence) {
-        super(message, sessionId, txSequence);
+    public SuccessEnvelope(final RequestSuccess<?, ?> message, final long sessionId, final long txSequence,
+            final long executionTimeNanos) {
+        super(message, sessionId, txSequence, executionTimeNanos);
     }
 
     @Override
     }
 
     @Override
index b9d6183f9e68cb11569ab603042fed5149e87194..3ac388b9dba2c5143558c7cde17657bb3d09f949 100644 (file)
@@ -22,7 +22,8 @@ final class SuccessEnvelopeProxy extends AbstractResponseEnvelopeProxy<RequestSu
     }
 
     @Override
     }
 
     @Override
-    SuccessEnvelope createEnvelope(final RequestSuccess<?, ?> message, final long sessionId, final long txSequence) {
-        return new SuccessEnvelope(message, sessionId, txSequence);
+    ResponseEnvelope<RequestSuccess<?, ?>> createEnvelope(final RequestSuccess<?, ?> message, final long sessionId,
+            final long txSequence, final long executionTimeNanos) {
+        return new SuccessEnvelope(message, sessionId, txSequence, executionTimeNanos);
     }
 }
     }
 }
index dec343fce00186cbb731d1616d1b158251be46b5..2d5b696271e13869262be29c60eb953c0ce55871 100644 (file)
@@ -147,7 +147,7 @@ public class ConnectingClientConnectionTest {
         mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
         mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo);
         mockResponse = mockRequest.toRequestFailure(mockCause);
         mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
         mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo);
         mockResponse = mockRequest.toRequestFailure(mockCause);
-        mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0);
+        mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0, 0);
         mockCookie = ThreadLocalRandom.current().nextLong();
 
         queue = new ConnectingClientConnection<>(mockContext, mockCookie);
         mockCookie = ThreadLocalRandom.current().nextLong();
 
         queue = new ConnectingClientConnection<>(mockContext, mockCookie);
@@ -160,7 +160,7 @@ public class ConnectingClientConnectionTest {
 
     @Test
     public void testCookie() {
 
     @Test
     public void testCookie() {
-        assertSame(mockCookie, queue.cookie());
+        assertEquals(mockCookie, queue.cookie());
     }
 
     @Test
     }
 
     @Test
index 7a66eab9d0830fe7e80d739e0434e21fa4b1a914..7ddad749d26552676a8057b40ca7526677bac130 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -37,17 +38,23 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
 
     private final Map<TransactionIdentifier, FrontendTransaction> transactions = new HashMap<>();
     private final String persistenceId;
 
     private final Map<TransactionIdentifier, FrontendTransaction> transactions = new HashMap<>();
     private final String persistenceId;
+    private final Ticker ticker;
 
 
-    AbstractFrontendHistory(final String persistenceId) {
+    AbstractFrontendHistory(final String persistenceId, final Ticker ticker) {
         this.persistenceId = Preconditions.checkNotNull(persistenceId);
         this.persistenceId = Preconditions.checkNotNull(persistenceId);
+        this.ticker = Preconditions.checkNotNull(ticker);
     }
 
     final String persistenceId() {
         return persistenceId;
     }
 
     }
 
     final String persistenceId() {
         return persistenceId;
     }
 
+    final long readTime() {
+        return ticker.read();
+    }
+
     final @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
     final @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
-            final RequestEnvelope envelope) throws RequestException {
+            final RequestEnvelope envelope, final long now) throws RequestException {
 
         // FIXME: handle purging of transactions
 
 
         // FIXME: handle purging of transactions
 
@@ -76,7 +83,7 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
             }
         }
 
             }
         }
 
-        return tx.handleRequest(request, envelope);
+        return tx.handleRequest(request, envelope, now);
     }
 
     abstract FrontendTransaction createOpenTransaction(TransactionIdentifier id) throws RequestException;
     }
 
     abstract FrontendTransaction createOpenTransaction(TransactionIdentifier id) throws RequestException;
index 7b542db038a82a7c59c38414485c7fff19a79360..1b15c728a8e14106f182b1933f628d2657e201e5 100644 (file)
@@ -152,25 +152,25 @@ final class FrontendTransaction {
     }
 
     // Sequence has already been checked
     }
 
     // Sequence has already been checked
-    @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope)
-            throws RequestException {
+    @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
+            final long now) throws RequestException {
         if (request instanceof ModifyTransactionRequest) {
         if (request instanceof ModifyTransactionRequest) {
-            return handleModifyTransaction((ModifyTransactionRequest) request, envelope);
+            return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now);
         } else if (request instanceof CommitLocalTransactionRequest) {
         } else if (request instanceof CommitLocalTransactionRequest) {
-            handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope);
+            handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope, now);
             return null;
         } else if (request instanceof ExistsTransactionRequest) {
             return handleExistsTransaction((ExistsTransactionRequest) request);
         } else if (request instanceof ReadTransactionRequest) {
             return handleReadTransaction((ReadTransactionRequest) request);
         } else if (request instanceof TransactionPreCommitRequest) {
             return null;
         } else if (request instanceof ExistsTransactionRequest) {
             return handleExistsTransaction((ExistsTransactionRequest) request);
         } else if (request instanceof ReadTransactionRequest) {
             return handleReadTransaction((ReadTransactionRequest) request);
         } else if (request instanceof TransactionPreCommitRequest) {
-            handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope);
+            handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope, now);
             return null;
         } else if (request instanceof TransactionDoCommitRequest) {
             return null;
         } else if (request instanceof TransactionDoCommitRequest) {
-            handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope);
+            handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
             return null;
         } else if (request instanceof TransactionAbortRequest) {
             return null;
         } else if (request instanceof TransactionAbortRequest) {
-            handleTransactionAbort((TransactionAbortRequest) request, envelope);
+            handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
             return null;
         } else {
             throw new UnsupportedRequestException(request);
             return null;
         } else {
             throw new UnsupportedRequestException(request);
@@ -190,56 +190,62 @@ final class FrontendTransaction {
         return success;
     }
 
         return success;
     }
 
-    private void recordAndSendSuccess(final RequestEnvelope envelope, final TransactionSuccess<?> success) {
+    private long executionTime(final long startTime) {
+        return history.readTime() - startTime;
+    }
+
+    private void recordAndSendSuccess(final RequestEnvelope envelope, final long startTime,
+            final TransactionSuccess<?> success) {
         recordResponse(success.getSequence(), success);
         recordResponse(success.getSequence(), success);
-        envelope.sendSuccess(success);
+        envelope.sendSuccess(success, executionTime(startTime));
     }
 
     }
 
-    private void recordAndSendFailure(final RequestEnvelope envelope, final RuntimeRequestException failure) {
+    private void recordAndSendFailure(final RequestEnvelope envelope, final long startTime,
+            final RuntimeRequestException failure) {
         recordResponse(envelope.getMessage().getSequence(), failure);
         recordResponse(envelope.getMessage().getSequence(), failure);
-        envelope.sendFailure(failure);
+        envelope.sendFailure(failure, executionTime(startTime));
     }
 
     private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
     }
 
     private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
-            final RequestEnvelope envelope) throws RequestException {
+            final RequestEnvelope envelope, final long now) throws RequestException {
         readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
             @Override
             public void onSuccess(final DataTreeCandidate result) {
         readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
             @Override
             public void onSuccess(final DataTreeCandidate result) {
-                recordAndSendSuccess(envelope, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
+                recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
                     request.getSequence()));
             }
 
             @Override
             public void onFailure(final Throwable failure) {
                     request.getSequence()));
             }
 
             @Override
             public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, new RuntimeRequestException("Precommit failed", failure));
+                recordAndSendFailure(envelope, now, new RuntimeRequestException("Precommit failed", failure));
                 readyCohort = null;
             }
         });
     }
 
                 readyCohort = null;
             }
         });
     }
 
-    private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope)
-            throws RequestException {
+    private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope,
+            final long now) throws RequestException {
         readyCohort.commit(new FutureCallback<UnsignedLong>() {
             @Override
             public void onSuccess(final UnsignedLong result) {
         readyCohort.commit(new FutureCallback<UnsignedLong>() {
             @Override
             public void onSuccess(final UnsignedLong result) {
-                successfulCommit(envelope);
+                successfulCommit(envelope, now);
             }
 
             @Override
             public void onFailure(final Throwable failure) {
             }
 
             @Override
             public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, new RuntimeRequestException("Commit failed", failure));
+                recordAndSendFailure(envelope, now, new RuntimeRequestException("Commit failed", failure));
                 readyCohort = null;
             }
         });
     }
 
                 readyCohort = null;
             }
         });
     }
 
-    private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope)
-            throws RequestException {
+    private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope,
+            final long now) throws RequestException {
         readyCohort.abort(new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
                 readyCohort = null;
         readyCohort.abort(new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
                 readyCohort = null;
-                recordAndSendSuccess(envelope, new TransactionAbortSuccess(id, request.getSequence()));
+                recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(id, request.getSequence()));
                 LOG.debug("Transaction {} aborted", id);
             }
 
                 LOG.debug("Transaction {} aborted", id);
             }
 
@@ -247,89 +253,89 @@ final class FrontendTransaction {
             public void onFailure(final Throwable failure) {
                 readyCohort = null;
                 LOG.warn("Transaction {} abort failed", id, failure);
             public void onFailure(final Throwable failure) {
                 readyCohort = null;
                 LOG.warn("Transaction {} abort failed", id, failure);
-                recordAndSendFailure(envelope, new RuntimeRequestException("Abort failed", failure));
+                recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
             }
         });
     }
 
             }
         });
     }
 
-    private void coordinatedCommit(final RequestEnvelope envelope) {
+    private void coordinatedCommit(final RequestEnvelope envelope, final long now) {
         readyCohort.canCommit(new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
         readyCohort.canCommit(new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
-                recordAndSendSuccess(envelope, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
+                recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
                     envelope.getMessage().getSequence()));
             }
 
             @Override
             public void onFailure(final Throwable failure) {
                     envelope.getMessage().getSequence()));
             }
 
             @Override
             public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, new RuntimeRequestException("CanCommit failed", failure));
+                recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
                 readyCohort = null;
             }
         });
     }
 
                 readyCohort = null;
             }
         });
     }
 
-    private void directCommit(final RequestEnvelope envelope) {
+    private void directCommit(final RequestEnvelope envelope, final long now) {
         readyCohort.canCommit(new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
         readyCohort.canCommit(new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
-                successfulDirectCanCommit(envelope);
+                successfulDirectCanCommit(envelope, now);
             }
 
             @Override
             public void onFailure(final Throwable failure) {
             }
 
             @Override
             public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, new RuntimeRequestException("CanCommit failed", failure));
+                recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
                 readyCohort = null;
             }
         });
 
     }
 
                 readyCohort = null;
             }
         });
 
     }
 
-    private void successfulDirectCanCommit(final RequestEnvelope envelope) {
+    private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
         readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
             @Override
             public void onSuccess(final DataTreeCandidate result) {
         readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
             @Override
             public void onSuccess(final DataTreeCandidate result) {
-                successfulDirectPreCommit(envelope);
+                successfulDirectPreCommit(envelope, startTime);
             }
 
             @Override
             public void onFailure(final Throwable failure) {
             }
 
             @Override
             public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, new RuntimeRequestException("PreCommit failed", failure));
+                recordAndSendFailure(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
                 readyCohort = null;
             }
         });
     }
 
                 readyCohort = null;
             }
         });
     }
 
-    private void successfulDirectPreCommit(final RequestEnvelope envelope) {
+    private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
         readyCohort.commit(new FutureCallback<UnsignedLong>() {
 
             @Override
             public void onSuccess(final UnsignedLong result) {
         readyCohort.commit(new FutureCallback<UnsignedLong>() {
 
             @Override
             public void onSuccess(final UnsignedLong result) {
-                successfulCommit(envelope);
+                successfulCommit(envelope, startTime);
             }
 
             @Override
             public void onFailure(final Throwable failure) {
             }
 
             @Override
             public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, new RuntimeRequestException("DoCommit failed", failure));
+                recordAndSendFailure(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
                 readyCohort = null;
             }
         });
     }
 
                 readyCohort = null;
             }
         });
     }
 
-    private void successfulCommit(final RequestEnvelope envelope) {
-        recordAndSendSuccess(envelope, new TransactionCommitSuccess(readyCohort.getIdentifier(),
+    private void successfulCommit(final RequestEnvelope envelope, final long startTime) {
+        recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(),
             envelope.getMessage().getSequence()));
         readyCohort = null;
     }
 
     private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
             envelope.getMessage().getSequence()));
         readyCohort = null;
     }
 
     private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
-            final RequestEnvelope envelope) throws RequestException {
+            final RequestEnvelope envelope, final long now) throws RequestException {
         if (sealedModification.equals(request.getModification())) {
             readyCohort = history.createReadyCohort(id, sealedModification);
 
             if (request.isCoordinated()) {
         if (sealedModification.equals(request.getModification())) {
             readyCohort = history.createReadyCohort(id, sealedModification);
 
             if (request.isCoordinated()) {
-                coordinatedCommit(envelope);
+                coordinatedCommit(envelope, now);
             } else {
             } else {
-                directCommit(envelope);
+                directCommit(envelope, now);
             }
         } else {
             throw new UnsupportedRequestException(request);
             }
         } else {
             throw new UnsupportedRequestException(request);
@@ -343,7 +349,8 @@ final class FrontendTransaction {
             data.isPresent()));
     }
 
             data.isPresent()));
     }
 
-    private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) throws RequestException {
+    private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request)
+            throws RequestException {
         final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
         return recordSuccess(request.getSequence(), new ReadTransactionSuccess(id, request.getSequence(), data));
     }
         final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
         return recordSuccess(request.getSequence(), new ReadTransactionSuccess(id, request.getSequence(), data));
     }
@@ -357,7 +364,7 @@ final class FrontendTransaction {
     }
 
     private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
     }
 
     private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
-            final RequestEnvelope envelope) throws RequestException {
+            final RequestEnvelope envelope, final long now) throws RequestException {
 
         final DataTreeModification modification = openTransaction.getSnapshot();
         for (TransactionModification m : request.getModifications()) {
 
         final DataTreeModification modification = openTransaction.getSnapshot();
         for (TransactionModification m : request.getModifications()) {
@@ -385,12 +392,12 @@ final class FrontendTransaction {
             case SIMPLE:
                 readyCohort = openTransaction.ready();
                 openTransaction = null;
             case SIMPLE:
                 readyCohort = openTransaction.ready();
                 openTransaction = null;
-                directCommit(envelope);
+                directCommit(envelope, now);
                 return null;
             case THREE_PHASE:
                 readyCohort = openTransaction.ready();
                 openTransaction = null;
                 return null;
             case THREE_PHASE:
                 readyCohort = openTransaction.ready();
                 openTransaction = null;
-                coordinatedCommit(envelope);
+                coordinatedCommit(envelope, now);
                 return null;
             default:
                 throw new UnsupportedRequestException(request);
                 return null;
             default:
                 throw new UnsupportedRequestException(request);
index 3c65b799d1831c498c6321df878305fb2c56a737..297759b5c86ffd798b0612ce32e96fe6f7e6af25 100644 (file)
@@ -75,7 +75,7 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
         this.persistenceId = Preconditions.checkNotNull(persistenceId);
         this.clientId = Preconditions.checkNotNull(clientId);
         this.tree = Preconditions.checkNotNull(tree);
         this.persistenceId = Preconditions.checkNotNull(persistenceId);
         this.clientId = Preconditions.checkNotNull(clientId);
         this.tree = Preconditions.checkNotNull(tree);
-        standaloneHistory = new StandaloneFrontendHistory(persistenceId, clientId, tree);
+        standaloneHistory = new StandaloneFrontendHistory(persistenceId, tree.ticker(), clientId, tree);
     }
 
     @Override
     }
 
     @Override
@@ -94,16 +94,16 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     }
 
     @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
     }
 
     @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
-            final RequestEnvelope envelope) throws RequestException {
+            final RequestEnvelope envelope, final long now) throws RequestException {
         checkRequestSequence(envelope);
 
         try {
             if (request instanceof CreateLocalHistoryRequest) {
                 return handleCreateHistory((CreateLocalHistoryRequest) request);
             } else if (request instanceof DestroyLocalHistoryRequest) {
         checkRequestSequence(envelope);
 
         try {
             if (request instanceof CreateLocalHistoryRequest) {
                 return handleCreateHistory((CreateLocalHistoryRequest) request);
             } else if (request instanceof DestroyLocalHistoryRequest) {
-                return handleDestroyHistory((DestroyLocalHistoryRequest) request);
+                return handleDestroyHistory((DestroyLocalHistoryRequest) request, now);
             } else if (request instanceof PurgeLocalHistoryRequest) {
             } else if (request instanceof PurgeLocalHistoryRequest) {
-                return handlePurgeHistory((PurgeLocalHistoryRequest)request);
+                return handlePurgeHistory((PurgeLocalHistoryRequest)request, now);
             } else {
                 throw new UnsupportedRequestException(request);
             }
             } else {
                 throw new UnsupportedRequestException(request);
             }
@@ -133,12 +133,13 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
             lastSeenHistory = id.getHistoryId();
         }
 
             lastSeenHistory = id.getHistoryId();
         }
 
-        localHistories.put(id, new LocalFrontendHistory(persistenceId, tree.ensureTransactionChain(id)));
+        localHistories.put(id, new LocalFrontendHistory(persistenceId, tree.ticker(), tree.ensureTransactionChain(id)));
         LOG.debug("{}: created history {}", persistenceId, id);
         return new LocalHistorySuccess(id, request.getSequence());
     }
 
         LOG.debug("{}: created history {}", persistenceId, id);
         return new LocalHistorySuccess(id, request.getSequence());
     }
 
-    private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request) throws RequestException {
+    private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request, final long now)
+            throws RequestException {
         final LocalHistoryIdentifier id = request.getTarget();
         final LocalFrontendHistory existing = localHistories.get(id);
         if (existing == null) {
         final LocalHistoryIdentifier id = request.getTarget();
         final LocalFrontendHistory existing = localHistories.get(id);
         if (existing == null) {
@@ -147,10 +148,11 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
             return new LocalHistorySuccess(id, request.getSequence());
         }
 
             return new LocalHistorySuccess(id, request.getSequence());
         }
 
-        return existing.destroy(request.getSequence());
+        return existing.destroy(request.getSequence(), now);
     }
 
     }
 
-    private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request) throws RequestException {
+    private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request, final long now)
+            throws RequestException {
         final LocalHistoryIdentifier id = request.getTarget();
         final LocalFrontendHistory existing = localHistories.remove(id);
         if (existing != null) {
         final LocalHistoryIdentifier id = request.getTarget();
         final LocalFrontendHistory existing = localHistories.remove(id);
         if (existing != null) {
@@ -158,7 +160,7 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
 
             if (!existing.isDestroyed()) {
                 LOG.warn("{}: purging undestroyed history {}", persistenceId, id);
 
             if (!existing.isDestroyed()) {
                 LOG.warn("{}: purging undestroyed history {}", persistenceId, id);
-                existing.destroy(request.getSequence());
+                existing.destroy(request.getSequence(), now);
             }
 
             // FIXME: record a PURGE tombstone in the journal
             }
 
             // FIXME: record a PURGE tombstone in the journal
@@ -172,7 +174,7 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     }
 
     @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
     }
 
     @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
-            final RequestEnvelope envelope) throws RequestException {
+            final RequestEnvelope envelope, final long now) throws RequestException {
         checkRequestSequence(envelope);
 
         try {
         checkRequestSequence(envelope);
 
         try {
@@ -189,7 +191,7 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
                 history = standaloneHistory;
             }
 
                 history = standaloneHistory;
             }
 
-            return history.handleTransactionRequest(request, envelope);
+            return history.handleTransactionRequest(request, envelope, now);
         } finally {
             expectNextRequest();
         }
         } finally {
             expectNextRequest();
         }
index a03b54fbef584bbb715edea0e68de5bfa25dc7b4..50d1dc5009db491e8b531598d504766c0e562aa0 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
 import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
 import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
 import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
@@ -36,8 +37,8 @@ final class LocalFrontendHistory extends AbstractFrontendHistory {
     private Long lastSeenTransaction;
     private State state = State.OPEN;
 
     private Long lastSeenTransaction;
     private State state = State.OPEN;
 
-    LocalFrontendHistory(final String persistenceId, final ShardDataTreeTransactionChain chain) {
-        super(persistenceId);
+    LocalFrontendHistory(final String persistenceId, final Ticker ticker, final ShardDataTreeTransactionChain chain) {
+        super(persistenceId, ticker);
         this.chain = Preconditions.checkNotNull(chain);
     }
 
         this.chain = Preconditions.checkNotNull(chain);
     }
 
@@ -66,7 +67,7 @@ final class LocalFrontendHistory extends AbstractFrontendHistory {
         return chain.createReadyCohort(id, mod);
     }
 
         return chain.createReadyCohort(id, mod);
     }
 
-    LocalHistorySuccess destroy(final long sequence) throws RequestException {
+    LocalHistorySuccess destroy(final long sequence, final long now) throws RequestException {
         if (state != State.CLOSED) {
             LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
 
         if (state != State.CLOSED) {
             LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
 
index 26f18bd932ec27ef81de5d08428d70b03ff9ef60..05b30932d225bdad02252d1f8be9e3895df7ef94 100644 (file)
@@ -264,18 +264,21 @@ public class Shard extends RaftActor {
             }
 
             if (message instanceof RequestEnvelope) {
             }
 
             if (message instanceof RequestEnvelope) {
+                final long now = ticker().read();
                 final RequestEnvelope envelope = (RequestEnvelope)message;
                 final RequestEnvelope envelope = (RequestEnvelope)message;
+
                 try {
                 try {
-                    final RequestSuccess<?, ?> success = handleRequest(envelope);
+                    final RequestSuccess<?, ?> success = handleRequest(envelope, now);
                     if (success != null) {
                     if (success != null) {
-                        envelope.sendSuccess(success);
+                        envelope.sendSuccess(success, ticker().read() - now);
                     }
                 } catch (RequestException e) {
                     LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
                     }
                 } catch (RequestException e) {
                     LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
-                    envelope.sendFailure(e);
+                    envelope.sendFailure(e, ticker().read() - now);
                 } catch (Exception e) {
                     LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
                 } catch (Exception e) {
                     LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
-                    envelope.sendFailure(new RuntimeRequestException("Request failed to process", e));
+                    envelope.sendFailure(new RuntimeRequestException("Request failed to process", e),
+                        ticker().read() - now);
                 }
             } else if (message instanceof ConnectClientRequest) {
                 handleConnectClient((ConnectClientRequest)message);
                 }
             } else if (message instanceof ConnectClientRequest) {
                 handleConnectClient((ConnectClientRequest)message);
@@ -389,7 +392,8 @@ public class Shard extends RaftActor {
         }
     }
 
         }
     }
 
-    private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope) throws RequestException {
+    private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
+            throws RequestException {
         // We are not the leader, hence we want to fail-fast.
         if (!isLeader() || !isLeaderActive()) {
             LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope);
         // We are not the leader, hence we want to fail-fast.
         if (!isLeader() || !isLeaderActive()) {
             LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope);
@@ -400,11 +404,11 @@ public class Shard extends RaftActor {
         if (request instanceof TransactionRequest) {
             final TransactionRequest<?> txReq = (TransactionRequest<?>)request;
             final ClientIdentifier clientId = txReq.getTarget().getHistoryId().getClientId();
         if (request instanceof TransactionRequest) {
             final TransactionRequest<?> txReq = (TransactionRequest<?>)request;
             final ClientIdentifier clientId = txReq.getTarget().getHistoryId().getClientId();
-            return getFrontend(clientId).handleTransactionRequest(txReq, envelope);
+            return getFrontend(clientId).handleTransactionRequest(txReq, envelope, now);
         } else if (request instanceof LocalHistoryRequest) {
             final LocalHistoryRequest<?> lhReq = (LocalHistoryRequest<?>)request;
             final ClientIdentifier clientId = lhReq.getTarget().getClientId();
         } else if (request instanceof LocalHistoryRequest) {
             final LocalHistoryRequest<?> lhReq = (LocalHistoryRequest<?>)request;
             final ClientIdentifier clientId = lhReq.getTarget().getClientId();
-            return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope);
+            return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now);
         } else {
             LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request);
             throw new UnsupportedRequestException(request);
         } else {
             LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request);
             throw new UnsupportedRequestException(request);
index f18271ae36671c342d7b5172a9e40c6b6c662d2b..5b015ead9acee3034600a86c6bad4a93ea4c62b5 100644 (file)
@@ -13,6 +13,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
 import com.google.common.base.Verify;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.base.Verify;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -136,10 +137,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 new DefaultShardDataChangeListenerPublisher(), "");
     }
 
                 new DefaultShardDataChangeListenerPublisher(), "");
     }
 
-    String logContext() {
+    final String logContext() {
         return logContext;
     }
 
         return logContext;
     }
 
+    final Ticker ticker() {
+        return shard.ticker();
+    }
+
     public TipProducingDataTree getDataTree() {
         return dataTree;
     }
     public TipProducingDataTree getDataTree() {
         return dataTree;
     }
index 1f2eb72560aef600a2deb9520600f8f845600fd3..14b0eecaa2edbd3b2535573e2f6d06ce46f87b44 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
@@ -24,8 +25,9 @@ final class StandaloneFrontendHistory extends AbstractFrontendHistory {
     private final LocalHistoryIdentifier identifier;
     private final ShardDataTree tree;
 
     private final LocalHistoryIdentifier identifier;
     private final ShardDataTree tree;
 
-    StandaloneFrontendHistory(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
-        super(persistenceId);
+    StandaloneFrontendHistory(final String persistenceId, final Ticker ticker, final ClientIdentifier clientId,
+            final ShardDataTree tree) {
+        super(persistenceId, ticker);
         this.identifier = new LocalHistoryIdentifier(clientId, 0);
         this.tree = Preconditions.checkNotNull(tree);
     }
         this.identifier = new LocalHistoryIdentifier(clientId, 0);
         this.tree = Preconditions.checkNotNull(tree);
     }