BUG-5280: add executionTimeNanos 86/48686/16
authorRobert Varga <rovarga@cisco.com>
Fri, 25 Nov 2016 00:06:20 +0000 (01:06 +0100)
committerTom Pantelis <tpanteli@brocade.com>
Mon, 28 Nov 2016 15:45:47 +0000 (15:45 +0000)
In order to properly measure impact of requests on the backend
we nead some indication of the complexity involved in servicing
the request. It is not feasible to estimate this by analyzing
the request itself, hence we provide a way for the backend to
communicate how complex it found a request to be back to the
frontend.

Since this measure excludes actor inbox and transport latency,
the frontend can use this measure to weigh the relative complexity
compared to other requests in has sent.

Change-Id: Ia7f435ff8a7fd995a90261b128832c340026de6d
Signed-off-by: Robert Varga <rovarga@cisco.com>
16 files changed:
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractEnvelopeProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractResponseEnvelopeProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelope.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelopeProxy.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelope.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/ResponseEnvelope.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelope.java
opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelopeProxy.java
opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/FrontendTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java

index 326a514..11ec595 100644 (file)
@@ -20,7 +20,7 @@ abstract class AbstractEnvelopeProxy<T extends Message<?, ?>> implements Externa
     private long sessionId;
     private long txSequence;
 
-    protected AbstractEnvelopeProxy() {
+    AbstractEnvelopeProxy() {
         // for Externalizable
     }
 
@@ -31,21 +31,21 @@ abstract class AbstractEnvelopeProxy<T extends Message<?, ?>> implements Externa
     }
 
     @Override
-    public final void writeExternal(final ObjectOutput out) throws IOException {
+    public void writeExternal(final ObjectOutput out) throws IOException {
         WritableObjects.writeLongs(out, sessionId, txSequence);
         out.writeObject(message);
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public final void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+    public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
         final byte header = WritableObjects.readLongHeader(in);
         sessionId = WritableObjects.readFirstLong(in, header);
         txSequence = WritableObjects.readSecondLong(in, header);
         message = (T) in.readObject();
     }
 
-    abstract Envelope<T> createEnvelope(T wrappedNessage, long envSessionId, long envTxSequence);
+    abstract Envelope<T> createEnvelope(T wrappedNessage, long sessionId, long txSequence);
 
     final Object readResolve() {
         return createEnvelope(message, sessionId, txSequence);
index 526c97c..1dbb86b 100644 (file)
@@ -7,17 +7,41 @@
  */
 package org.opendaylight.controller.cluster.access.concepts;
 
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.yangtools.concepts.WritableObjects;
+
 abstract class AbstractResponseEnvelopeProxy<T extends Response<?, ?>> extends AbstractEnvelopeProxy<T> {
     private static final long serialVersionUID = 1L;
 
-    protected AbstractResponseEnvelopeProxy() {
+    private long executionTimeNanos;
+
+    AbstractResponseEnvelopeProxy() {
         // for Externalizable
     }
 
     AbstractResponseEnvelopeProxy(final ResponseEnvelope<T> envelope) {
         super(envelope);
+        this.executionTimeNanos = envelope.getExecutionTimeNanos();
+    }
+
+    @Override
+    public final void writeExternal(final ObjectOutput out) throws IOException {
+        super.writeExternal(out);
+        WritableObjects.writeLong(out, executionTimeNanos);
     }
 
     @Override
-    abstract ResponseEnvelope<T> createEnvelope(T message, long sequence, long retry);
+    public final void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+        super.readExternal(in);
+        executionTimeNanos = WritableObjects.readLong(in);
+    }
+
+    @Override
+    final ResponseEnvelope<T> createEnvelope(final T message, final long sessionId, final long txSequence) {
+        return createEnvelope(message, sessionId, txSequence, executionTimeNanos);
+    }
+
+    abstract ResponseEnvelope<T> createEnvelope(T message, long sessionId, long txSequence, long executionTimeNanos);
 }
index 6c32ae2..1f641eb 100644 (file)
@@ -10,8 +10,9 @@ package org.opendaylight.controller.cluster.access.concepts;
 public final class FailureEnvelope extends ResponseEnvelope<RequestFailure<?, ?>> {
     private static final long serialVersionUID = 1L;
 
-    public FailureEnvelope(final RequestFailure<?, ?> message, final long sessionId, final long txSequence) {
-        super(message, sessionId, txSequence);
+    public FailureEnvelope(final RequestFailure<?, ?> message, final long sessionId, final long txSequence,
+            final long executionTimeNanos) {
+        super(message, sessionId, txSequence, executionTimeNanos);
     }
 
     @Override
index b9ba98e..adc50e1 100644 (file)
@@ -22,7 +22,8 @@ final class FailureEnvelopeProxy extends AbstractResponseEnvelopeProxy<RequestFa
     }
 
     @Override
-    FailureEnvelope createEnvelope(final RequestFailure<?, ?> message, final long sessionId, final long txSequence) {
-        return new FailureEnvelope(message, sessionId, txSequence);
+    ResponseEnvelope<RequestFailure<?, ?>> createEnvelope(final RequestFailure<?, ?> message, final long sessionId,
+            final long txSequence, final long executionTimeNanos) {
+        return new FailureEnvelope(message, sessionId, txSequence, executionTimeNanos);
     }
 }
index 1c6e72c..cb9034d 100644 (file)
@@ -25,10 +25,12 @@ public final class RequestEnvelope extends Envelope<Request<?, ?>> {
      * Respond to this envelope with a {@link RequestFailure} caused by specified {@link RequestException}.
      *
      * @param cause Cause of this {@link RequestFailure}
+     * @param executionTimeNanos Time to execute the request, in nanoseconds
      * @throws NullPointerException if cause is null
      */
-    public void sendFailure(final RequestException cause) {
-        sendResponse(new FailureEnvelope(getMessage().toRequestFailure(cause), getSessionId(), getTxSequence()));
+    public void sendFailure(final RequestException cause, final long executionTimeNanos) {
+        sendResponse(new FailureEnvelope(getMessage().toRequestFailure(cause), getSessionId(), getTxSequence(),
+            executionTimeNanos));
     }
 
     /**
@@ -37,8 +39,8 @@ public final class RequestEnvelope extends Envelope<Request<?, ?>> {
      * @param success Successful response
      * @throws NullPointerException if success is null
      */
-    public void sendSuccess(final RequestSuccess<?, ?> success) {
-        sendResponse(new SuccessEnvelope(success, getSessionId(), getTxSequence()));
+    public void sendSuccess(final RequestSuccess<?, ?> success, final long executionTimeNanos) {
+        sendResponse(new SuccessEnvelope(success, getSessionId(), getTxSequence(), executionTimeNanos));
     }
 
     private void sendResponse(final ResponseEnvelope<?> envelope) {
index 9f998e7..7936baa 100644 (file)
@@ -7,10 +7,29 @@
  */
 package org.opendaylight.controller.cluster.access.concepts;
 
+import com.google.common.base.Preconditions;
+
 public abstract class ResponseEnvelope<T extends Response<?, ?>> extends Envelope<T> {
     private static final long serialVersionUID = 1L;
 
-    ResponseEnvelope(final T message, final long sessionId, final long txSequence) {
+    private final long executionTimeNanos;
+
+    ResponseEnvelope(final T message, final long sessionId, final long txSequence, final long executionTimeNanos) {
         super(message, sessionId, txSequence);
+        Preconditions.checkArgument(executionTimeNanos >= 0);
+        this.executionTimeNanos = executionTimeNanos;
     }
+
+    /**
+     * Return the time the request took to execute in nanoseconds. This may not reflect the actual CPU time, but rather
+     * a measure of the complexity involved in servicing the original request.
+     *
+     * @return Time the request took to execute in nanoseconds
+     */
+    public final long getExecutionTimeNanos() {
+        return executionTimeNanos;
+    }
+
+    @Override
+    abstract AbstractResponseEnvelopeProxy<T> createProxy();
 }
index d98e257..3c23a23 100644 (file)
@@ -10,8 +10,9 @@ package org.opendaylight.controller.cluster.access.concepts;
 public final class SuccessEnvelope extends ResponseEnvelope<RequestSuccess<?, ?>> {
     private static final long serialVersionUID = 1L;
 
-    public SuccessEnvelope(final RequestSuccess<?, ?> message, final long sessionId, final long txSequence) {
-        super(message, sessionId, txSequence);
+    public SuccessEnvelope(final RequestSuccess<?, ?> message, final long sessionId, final long txSequence,
+            final long executionTimeNanos) {
+        super(message, sessionId, txSequence, executionTimeNanos);
     }
 
     @Override
index b9d6183..3ac388b 100644 (file)
@@ -22,7 +22,8 @@ final class SuccessEnvelopeProxy extends AbstractResponseEnvelopeProxy<RequestSu
     }
 
     @Override
-    SuccessEnvelope createEnvelope(final RequestSuccess<?, ?> message, final long sessionId, final long txSequence) {
-        return new SuccessEnvelope(message, sessionId, txSequence);
+    ResponseEnvelope<RequestSuccess<?, ?>> createEnvelope(final RequestSuccess<?, ?> message, final long sessionId,
+            final long txSequence, final long executionTimeNanos) {
+        return new SuccessEnvelope(message, sessionId, txSequence, executionTimeNanos);
     }
 }
index dec343f..2d5b696 100644 (file)
@@ -147,7 +147,7 @@ public class ConnectingClientConnectionTest {
         mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
         mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo);
         mockResponse = mockRequest.toRequestFailure(mockCause);
-        mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0);
+        mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0, 0);
         mockCookie = ThreadLocalRandom.current().nextLong();
 
         queue = new ConnectingClientConnection<>(mockContext, mockCookie);
@@ -160,7 +160,7 @@ public class ConnectingClientConnectionTest {
 
     @Test
     public void testCookie() {
-        assertSame(mockCookie, queue.cookie());
+        assertEquals(mockCookie, queue.cookie());
     }
 
     @Test
index 7a66eab..7ddad74 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -37,17 +38,23 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
 
     private final Map<TransactionIdentifier, FrontendTransaction> transactions = new HashMap<>();
     private final String persistenceId;
+    private final Ticker ticker;
 
-    AbstractFrontendHistory(final String persistenceId) {
+    AbstractFrontendHistory(final String persistenceId, final Ticker ticker) {
         this.persistenceId = Preconditions.checkNotNull(persistenceId);
+        this.ticker = Preconditions.checkNotNull(ticker);
     }
 
     final String persistenceId() {
         return persistenceId;
     }
 
+    final long readTime() {
+        return ticker.read();
+    }
+
     final @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
-            final RequestEnvelope envelope) throws RequestException {
+            final RequestEnvelope envelope, final long now) throws RequestException {
 
         // FIXME: handle purging of transactions
 
@@ -76,7 +83,7 @@ abstract class AbstractFrontendHistory implements Identifiable<LocalHistoryIdent
             }
         }
 
-        return tx.handleRequest(request, envelope);
+        return tx.handleRequest(request, envelope, now);
     }
 
     abstract FrontendTransaction createOpenTransaction(TransactionIdentifier id) throws RequestException;
index 7b542db..1b15c72 100644 (file)
@@ -152,25 +152,25 @@ final class FrontendTransaction {
     }
 
     // Sequence has already been checked
-    @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope)
-            throws RequestException {
+    @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
+            final long now) throws RequestException {
         if (request instanceof ModifyTransactionRequest) {
-            return handleModifyTransaction((ModifyTransactionRequest) request, envelope);
+            return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now);
         } else if (request instanceof CommitLocalTransactionRequest) {
-            handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope);
+            handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope, now);
             return null;
         } else if (request instanceof ExistsTransactionRequest) {
             return handleExistsTransaction((ExistsTransactionRequest) request);
         } else if (request instanceof ReadTransactionRequest) {
             return handleReadTransaction((ReadTransactionRequest) request);
         } else if (request instanceof TransactionPreCommitRequest) {
-            handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope);
+            handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope, now);
             return null;
         } else if (request instanceof TransactionDoCommitRequest) {
-            handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope);
+            handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
             return null;
         } else if (request instanceof TransactionAbortRequest) {
-            handleTransactionAbort((TransactionAbortRequest) request, envelope);
+            handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
             return null;
         } else {
             throw new UnsupportedRequestException(request);
@@ -190,56 +190,62 @@ final class FrontendTransaction {
         return success;
     }
 
-    private void recordAndSendSuccess(final RequestEnvelope envelope, final TransactionSuccess<?> success) {
+    private long executionTime(final long startTime) {
+        return history.readTime() - startTime;
+    }
+
+    private void recordAndSendSuccess(final RequestEnvelope envelope, final long startTime,
+            final TransactionSuccess<?> success) {
         recordResponse(success.getSequence(), success);
-        envelope.sendSuccess(success);
+        envelope.sendSuccess(success, executionTime(startTime));
     }
 
-    private void recordAndSendFailure(final RequestEnvelope envelope, final RuntimeRequestException failure) {
+    private void recordAndSendFailure(final RequestEnvelope envelope, final long startTime,
+            final RuntimeRequestException failure) {
         recordResponse(envelope.getMessage().getSequence(), failure);
-        envelope.sendFailure(failure);
+        envelope.sendFailure(failure, executionTime(startTime));
     }
 
     private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
-            final RequestEnvelope envelope) throws RequestException {
+            final RequestEnvelope envelope, final long now) throws RequestException {
         readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
             @Override
             public void onSuccess(final DataTreeCandidate result) {
-                recordAndSendSuccess(envelope, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
+                recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
                     request.getSequence()));
             }
 
             @Override
             public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, new RuntimeRequestException("Precommit failed", failure));
+                recordAndSendFailure(envelope, now, new RuntimeRequestException("Precommit failed", failure));
                 readyCohort = null;
             }
         });
     }
 
-    private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope)
-            throws RequestException {
+    private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope,
+            final long now) throws RequestException {
         readyCohort.commit(new FutureCallback<UnsignedLong>() {
             @Override
             public void onSuccess(final UnsignedLong result) {
-                successfulCommit(envelope);
+                successfulCommit(envelope, now);
             }
 
             @Override
             public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, new RuntimeRequestException("Commit failed", failure));
+                recordAndSendFailure(envelope, now, new RuntimeRequestException("Commit failed", failure));
                 readyCohort = null;
             }
         });
     }
 
-    private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope)
-            throws RequestException {
+    private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope,
+            final long now) throws RequestException {
         readyCohort.abort(new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
                 readyCohort = null;
-                recordAndSendSuccess(envelope, new TransactionAbortSuccess(id, request.getSequence()));
+                recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(id, request.getSequence()));
                 LOG.debug("Transaction {} aborted", id);
             }
 
@@ -247,89 +253,89 @@ final class FrontendTransaction {
             public void onFailure(final Throwable failure) {
                 readyCohort = null;
                 LOG.warn("Transaction {} abort failed", id, failure);
-                recordAndSendFailure(envelope, new RuntimeRequestException("Abort failed", failure));
+                recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
             }
         });
     }
 
-    private void coordinatedCommit(final RequestEnvelope envelope) {
+    private void coordinatedCommit(final RequestEnvelope envelope, final long now) {
         readyCohort.canCommit(new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
-                recordAndSendSuccess(envelope, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
+                recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
                     envelope.getMessage().getSequence()));
             }
 
             @Override
             public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, new RuntimeRequestException("CanCommit failed", failure));
+                recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
                 readyCohort = null;
             }
         });
     }
 
-    private void directCommit(final RequestEnvelope envelope) {
+    private void directCommit(final RequestEnvelope envelope, final long now) {
         readyCohort.canCommit(new FutureCallback<Void>() {
             @Override
             public void onSuccess(final Void result) {
-                successfulDirectCanCommit(envelope);
+                successfulDirectCanCommit(envelope, now);
             }
 
             @Override
             public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, new RuntimeRequestException("CanCommit failed", failure));
+                recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
                 readyCohort = null;
             }
         });
 
     }
 
-    private void successfulDirectCanCommit(final RequestEnvelope envelope) {
+    private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
         readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
             @Override
             public void onSuccess(final DataTreeCandidate result) {
-                successfulDirectPreCommit(envelope);
+                successfulDirectPreCommit(envelope, startTime);
             }
 
             @Override
             public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, new RuntimeRequestException("PreCommit failed", failure));
+                recordAndSendFailure(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
                 readyCohort = null;
             }
         });
     }
 
-    private void successfulDirectPreCommit(final RequestEnvelope envelope) {
+    private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
         readyCohort.commit(new FutureCallback<UnsignedLong>() {
 
             @Override
             public void onSuccess(final UnsignedLong result) {
-                successfulCommit(envelope);
+                successfulCommit(envelope, startTime);
             }
 
             @Override
             public void onFailure(final Throwable failure) {
-                recordAndSendFailure(envelope, new RuntimeRequestException("DoCommit failed", failure));
+                recordAndSendFailure(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
                 readyCohort = null;
             }
         });
     }
 
-    private void successfulCommit(final RequestEnvelope envelope) {
-        recordAndSendSuccess(envelope, new TransactionCommitSuccess(readyCohort.getIdentifier(),
+    private void successfulCommit(final RequestEnvelope envelope, final long startTime) {
+        recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(),
             envelope.getMessage().getSequence()));
         readyCohort = null;
     }
 
     private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
-            final RequestEnvelope envelope) throws RequestException {
+            final RequestEnvelope envelope, final long now) throws RequestException {
         if (sealedModification.equals(request.getModification())) {
             readyCohort = history.createReadyCohort(id, sealedModification);
 
             if (request.isCoordinated()) {
-                coordinatedCommit(envelope);
+                coordinatedCommit(envelope, now);
             } else {
-                directCommit(envelope);
+                directCommit(envelope, now);
             }
         } else {
             throw new UnsupportedRequestException(request);
@@ -343,7 +349,8 @@ final class FrontendTransaction {
             data.isPresent()));
     }
 
-    private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) throws RequestException {
+    private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request)
+            throws RequestException {
         final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
         return recordSuccess(request.getSequence(), new ReadTransactionSuccess(id, request.getSequence(), data));
     }
@@ -357,7 +364,7 @@ final class FrontendTransaction {
     }
 
     private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
-            final RequestEnvelope envelope) throws RequestException {
+            final RequestEnvelope envelope, final long now) throws RequestException {
 
         final DataTreeModification modification = openTransaction.getSnapshot();
         for (TransactionModification m : request.getModifications()) {
@@ -385,12 +392,12 @@ final class FrontendTransaction {
             case SIMPLE:
                 readyCohort = openTransaction.ready();
                 openTransaction = null;
-                directCommit(envelope);
+                directCommit(envelope, now);
                 return null;
             case THREE_PHASE:
                 readyCohort = openTransaction.ready();
                 openTransaction = null;
-                coordinatedCommit(envelope);
+                coordinatedCommit(envelope, now);
                 return null;
             default:
                 throw new UnsupportedRequestException(request);
index 3c65b79..297759b 100644 (file)
@@ -75,7 +75,7 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
         this.persistenceId = Preconditions.checkNotNull(persistenceId);
         this.clientId = Preconditions.checkNotNull(clientId);
         this.tree = Preconditions.checkNotNull(tree);
-        standaloneHistory = new StandaloneFrontendHistory(persistenceId, clientId, tree);
+        standaloneHistory = new StandaloneFrontendHistory(persistenceId, tree.ticker(), clientId, tree);
     }
 
     @Override
@@ -94,16 +94,16 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     }
 
     @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
-            final RequestEnvelope envelope) throws RequestException {
+            final RequestEnvelope envelope, final long now) throws RequestException {
         checkRequestSequence(envelope);
 
         try {
             if (request instanceof CreateLocalHistoryRequest) {
                 return handleCreateHistory((CreateLocalHistoryRequest) request);
             } else if (request instanceof DestroyLocalHistoryRequest) {
-                return handleDestroyHistory((DestroyLocalHistoryRequest) request);
+                return handleDestroyHistory((DestroyLocalHistoryRequest) request, now);
             } else if (request instanceof PurgeLocalHistoryRequest) {
-                return handlePurgeHistory((PurgeLocalHistoryRequest)request);
+                return handlePurgeHistory((PurgeLocalHistoryRequest)request, now);
             } else {
                 throw new UnsupportedRequestException(request);
             }
@@ -133,12 +133,13 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
             lastSeenHistory = id.getHistoryId();
         }
 
-        localHistories.put(id, new LocalFrontendHistory(persistenceId, tree.ensureTransactionChain(id)));
+        localHistories.put(id, new LocalFrontendHistory(persistenceId, tree.ticker(), tree.ensureTransactionChain(id)));
         LOG.debug("{}: created history {}", persistenceId, id);
         return new LocalHistorySuccess(id, request.getSequence());
     }
 
-    private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request) throws RequestException {
+    private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request, final long now)
+            throws RequestException {
         final LocalHistoryIdentifier id = request.getTarget();
         final LocalFrontendHistory existing = localHistories.get(id);
         if (existing == null) {
@@ -147,10 +148,11 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
             return new LocalHistorySuccess(id, request.getSequence());
         }
 
-        return existing.destroy(request.getSequence());
+        return existing.destroy(request.getSequence(), now);
     }
 
-    private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request) throws RequestException {
+    private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request, final long now)
+            throws RequestException {
         final LocalHistoryIdentifier id = request.getTarget();
         final LocalFrontendHistory existing = localHistories.remove(id);
         if (existing != null) {
@@ -158,7 +160,7 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
 
             if (!existing.isDestroyed()) {
                 LOG.warn("{}: purging undestroyed history {}", persistenceId, id);
-                existing.destroy(request.getSequence());
+                existing.destroy(request.getSequence(), now);
             }
 
             // FIXME: record a PURGE tombstone in the journal
@@ -172,7 +174,7 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
     }
 
     @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
-            final RequestEnvelope envelope) throws RequestException {
+            final RequestEnvelope envelope, final long now) throws RequestException {
         checkRequestSequence(envelope);
 
         try {
@@ -189,7 +191,7 @@ final class LeaderFrontendState implements Identifiable<ClientIdentifier> {
                 history = standaloneHistory;
             }
 
-            return history.handleTransactionRequest(request, envelope);
+            return history.handleTransactionRequest(request, envelope, now);
         } finally {
             expectNextRequest();
         }
index a03b54f..50d1dc5 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
 import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
 import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
@@ -36,8 +37,8 @@ final class LocalFrontendHistory extends AbstractFrontendHistory {
     private Long lastSeenTransaction;
     private State state = State.OPEN;
 
-    LocalFrontendHistory(final String persistenceId, final ShardDataTreeTransactionChain chain) {
-        super(persistenceId);
+    LocalFrontendHistory(final String persistenceId, final Ticker ticker, final ShardDataTreeTransactionChain chain) {
+        super(persistenceId, ticker);
         this.chain = Preconditions.checkNotNull(chain);
     }
 
@@ -66,7 +67,7 @@ final class LocalFrontendHistory extends AbstractFrontendHistory {
         return chain.createReadyCohort(id, mod);
     }
 
-    LocalHistorySuccess destroy(final long sequence) throws RequestException {
+    LocalHistorySuccess destroy(final long sequence, final long now) throws RequestException {
         if (state != State.CLOSED) {
             LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
 
index 26f18bd..05b3093 100644 (file)
@@ -264,18 +264,21 @@ public class Shard extends RaftActor {
             }
 
             if (message instanceof RequestEnvelope) {
+                final long now = ticker().read();
                 final RequestEnvelope envelope = (RequestEnvelope)message;
+
                 try {
-                    final RequestSuccess<?, ?> success = handleRequest(envelope);
+                    final RequestSuccess<?, ?> success = handleRequest(envelope, now);
                     if (success != null) {
-                        envelope.sendSuccess(success);
+                        envelope.sendSuccess(success, ticker().read() - now);
                     }
                 } catch (RequestException e) {
                     LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
-                    envelope.sendFailure(e);
+                    envelope.sendFailure(e, ticker().read() - now);
                 } catch (Exception e) {
                     LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
-                    envelope.sendFailure(new RuntimeRequestException("Request failed to process", e));
+                    envelope.sendFailure(new RuntimeRequestException("Request failed to process", e),
+                        ticker().read() - now);
                 }
             } else if (message instanceof ConnectClientRequest) {
                 handleConnectClient((ConnectClientRequest)message);
@@ -389,7 +392,8 @@ public class Shard extends RaftActor {
         }
     }
 
-    private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope) throws RequestException {
+    private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
+            throws RequestException {
         // We are not the leader, hence we want to fail-fast.
         if (!isLeader() || !isLeaderActive()) {
             LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope);
@@ -400,11 +404,11 @@ public class Shard extends RaftActor {
         if (request instanceof TransactionRequest) {
             final TransactionRequest<?> txReq = (TransactionRequest<?>)request;
             final ClientIdentifier clientId = txReq.getTarget().getHistoryId().getClientId();
-            return getFrontend(clientId).handleTransactionRequest(txReq, envelope);
+            return getFrontend(clientId).handleTransactionRequest(txReq, envelope, now);
         } else if (request instanceof LocalHistoryRequest) {
             final LocalHistoryRequest<?> lhReq = (LocalHistoryRequest<?>)request;
             final ClientIdentifier clientId = lhReq.getTarget().getClientId();
-            return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope);
+            return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now);
         } else {
             LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request);
             throw new UnsupportedRequestException(request);
index f18271a..5b015ea 100644 (file)
@@ -13,6 +13,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
 import com.google.common.base.Verify;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -136,10 +137,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent {
                 new DefaultShardDataChangeListenerPublisher(), "");
     }
 
-    String logContext() {
+    final String logContext() {
         return logContext;
     }
 
+    final Ticker ticker() {
+        return shard.ticker();
+    }
+
     public TipProducingDataTree getDataTree() {
         return dataTree;
     }
index 1f2eb72..14b0eec 100644 (file)
@@ -8,6 +8,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.RequestException;
@@ -24,8 +25,9 @@ final class StandaloneFrontendHistory extends AbstractFrontendHistory {
     private final LocalHistoryIdentifier identifier;
     private final ShardDataTree tree;
 
-    StandaloneFrontendHistory(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
-        super(persistenceId);
+    StandaloneFrontendHistory(final String persistenceId, final Ticker ticker, final ClientIdentifier clientId,
+            final ShardDataTree tree) {
+        super(persistenceId, ticker);
         this.identifier = new LocalHistoryIdentifier(clientId, 0);
         this.tree = Preconditions.checkNotNull(tree);
     }

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.