From cd2a6fa0d8fa6281be28d3c7b9828ecf4e932811 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Fri, 25 Nov 2016 01:06:20 +0100 Subject: [PATCH] BUG-5280: add executionTimeNanos In order to properly measure impact of requests on the backend we nead some indication of the complexity involved in servicing the request. It is not feasible to estimate this by analyzing the request itself, hence we provide a way for the backend to communicate how complex it found a request to be back to the frontend. Since this measure excludes actor inbox and transport latency, the frontend can use this measure to weigh the relative complexity compared to other requests in has sent. Change-Id: Ia7f435ff8a7fd995a90261b128832c340026de6d Signed-off-by: Robert Varga --- .../concepts/AbstractEnvelopeProxy.java | 8 +- .../AbstractResponseEnvelopeProxy.java | 28 +++++- .../access/concepts/FailureEnvelope.java | 5 +- .../access/concepts/FailureEnvelopeProxy.java | 5 +- .../access/concepts/RequestEnvelope.java | 10 +- .../access/concepts/ResponseEnvelope.java | 21 ++++- .../access/concepts/SuccessEnvelope.java | 5 +- .../access/concepts/SuccessEnvelopeProxy.java | 5 +- .../ConnectingClientConnectionTest.java | 4 +- .../datastore/AbstractFrontendHistory.java | 13 ++- .../datastore/FrontendTransaction.java | 93 ++++++++++--------- .../datastore/LeaderFrontendState.java | 24 ++--- .../datastore/LocalFrontendHistory.java | 7 +- .../controller/cluster/datastore/Shard.java | 18 ++-- .../cluster/datastore/ShardDataTree.java | 7 +- .../datastore/StandaloneFrontendHistory.java | 6 +- 16 files changed, 168 insertions(+), 91 deletions(-) diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractEnvelopeProxy.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractEnvelopeProxy.java index 326a514abc..11ec59578c 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractEnvelopeProxy.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractEnvelopeProxy.java @@ -20,7 +20,7 @@ abstract class AbstractEnvelopeProxy> implements Externa private long sessionId; private long txSequence; - protected AbstractEnvelopeProxy() { + AbstractEnvelopeProxy() { // for Externalizable } @@ -31,21 +31,21 @@ abstract class AbstractEnvelopeProxy> implements Externa } @Override - public final void writeExternal(final ObjectOutput out) throws IOException { + public void writeExternal(final ObjectOutput out) throws IOException { WritableObjects.writeLongs(out, sessionId, txSequence); out.writeObject(message); } @SuppressWarnings("unchecked") @Override - public final void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { final byte header = WritableObjects.readLongHeader(in); sessionId = WritableObjects.readFirstLong(in, header); txSequence = WritableObjects.readSecondLong(in, header); message = (T) in.readObject(); } - abstract Envelope createEnvelope(T wrappedNessage, long envSessionId, long envTxSequence); + abstract Envelope createEnvelope(T wrappedNessage, long sessionId, long txSequence); final Object readResolve() { return createEnvelope(message, sessionId, txSequence); diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractResponseEnvelopeProxy.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractResponseEnvelopeProxy.java index 526c97ce32..1dbb86bb2f 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractResponseEnvelopeProxy.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/AbstractResponseEnvelopeProxy.java @@ -7,17 +7,41 @@ */ package org.opendaylight.controller.cluster.access.concepts; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import org.opendaylight.yangtools.concepts.WritableObjects; + abstract class AbstractResponseEnvelopeProxy> extends AbstractEnvelopeProxy { private static final long serialVersionUID = 1L; - protected AbstractResponseEnvelopeProxy() { + private long executionTimeNanos; + + AbstractResponseEnvelopeProxy() { // for Externalizable } AbstractResponseEnvelopeProxy(final ResponseEnvelope envelope) { super(envelope); + this.executionTimeNanos = envelope.getExecutionTimeNanos(); + } + + @Override + public final void writeExternal(final ObjectOutput out) throws IOException { + super.writeExternal(out); + WritableObjects.writeLong(out, executionTimeNanos); } @Override - abstract ResponseEnvelope createEnvelope(T message, long sequence, long retry); + public final void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { + super.readExternal(in); + executionTimeNanos = WritableObjects.readLong(in); + } + + @Override + final ResponseEnvelope createEnvelope(final T message, final long sessionId, final long txSequence) { + return createEnvelope(message, sessionId, txSequence, executionTimeNanos); + } + + abstract ResponseEnvelope createEnvelope(T message, long sessionId, long txSequence, long executionTimeNanos); } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelope.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelope.java index 6c32ae2554..1f641eb181 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelope.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelope.java @@ -10,8 +10,9 @@ package org.opendaylight.controller.cluster.access.concepts; public final class FailureEnvelope extends ResponseEnvelope> { private static final long serialVersionUID = 1L; - public FailureEnvelope(final RequestFailure message, final long sessionId, final long txSequence) { - super(message, sessionId, txSequence); + public FailureEnvelope(final RequestFailure message, final long sessionId, final long txSequence, + final long executionTimeNanos) { + super(message, sessionId, txSequence, executionTimeNanos); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelopeProxy.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelopeProxy.java index b9ba98e1c5..adc50e1eae 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelopeProxy.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/FailureEnvelopeProxy.java @@ -22,7 +22,8 @@ final class FailureEnvelopeProxy extends AbstractResponseEnvelopeProxy message, final long sessionId, final long txSequence) { - return new FailureEnvelope(message, sessionId, txSequence); + ResponseEnvelope> createEnvelope(final RequestFailure message, final long sessionId, + final long txSequence, final long executionTimeNanos) { + return new FailureEnvelope(message, sessionId, txSequence, executionTimeNanos); } } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelope.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelope.java index 1c6e72c59a..cb9034d324 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelope.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/RequestEnvelope.java @@ -25,10 +25,12 @@ public final class RequestEnvelope extends Envelope> { * Respond to this envelope with a {@link RequestFailure} caused by specified {@link RequestException}. * * @param cause Cause of this {@link RequestFailure} + * @param executionTimeNanos Time to execute the request, in nanoseconds * @throws NullPointerException if cause is null */ - public void sendFailure(final RequestException cause) { - sendResponse(new FailureEnvelope(getMessage().toRequestFailure(cause), getSessionId(), getTxSequence())); + public void sendFailure(final RequestException cause, final long executionTimeNanos) { + sendResponse(new FailureEnvelope(getMessage().toRequestFailure(cause), getSessionId(), getTxSequence(), + executionTimeNanos)); } /** @@ -37,8 +39,8 @@ public final class RequestEnvelope extends Envelope> { * @param success Successful response * @throws NullPointerException if success is null */ - public void sendSuccess(final RequestSuccess success) { - sendResponse(new SuccessEnvelope(success, getSessionId(), getTxSequence())); + public void sendSuccess(final RequestSuccess success, final long executionTimeNanos) { + sendResponse(new SuccessEnvelope(success, getSessionId(), getTxSequence(), executionTimeNanos)); } private void sendResponse(final ResponseEnvelope envelope) { diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/ResponseEnvelope.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/ResponseEnvelope.java index 9f998e7fac..7936baa169 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/ResponseEnvelope.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/ResponseEnvelope.java @@ -7,10 +7,29 @@ */ package org.opendaylight.controller.cluster.access.concepts; +import com.google.common.base.Preconditions; + public abstract class ResponseEnvelope> extends Envelope { private static final long serialVersionUID = 1L; - ResponseEnvelope(final T message, final long sessionId, final long txSequence) { + private final long executionTimeNanos; + + ResponseEnvelope(final T message, final long sessionId, final long txSequence, final long executionTimeNanos) { super(message, sessionId, txSequence); + Preconditions.checkArgument(executionTimeNanos >= 0); + this.executionTimeNanos = executionTimeNanos; } + + /** + * Return the time the request took to execute in nanoseconds. This may not reflect the actual CPU time, but rather + * a measure of the complexity involved in servicing the original request. + * + * @return Time the request took to execute in nanoseconds + */ + public final long getExecutionTimeNanos() { + return executionTimeNanos; + } + + @Override + abstract AbstractResponseEnvelopeProxy createProxy(); } diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelope.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelope.java index d98e257ce0..3c23a23763 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelope.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelope.java @@ -10,8 +10,9 @@ package org.opendaylight.controller.cluster.access.concepts; public final class SuccessEnvelope extends ResponseEnvelope> { private static final long serialVersionUID = 1L; - public SuccessEnvelope(final RequestSuccess message, final long sessionId, final long txSequence) { - super(message, sessionId, txSequence); + public SuccessEnvelope(final RequestSuccess message, final long sessionId, final long txSequence, + final long executionTimeNanos) { + super(message, sessionId, txSequence, executionTimeNanos); } @Override diff --git a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelopeProxy.java b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelopeProxy.java index b9d6183f9e..3ac388b9db 100644 --- a/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelopeProxy.java +++ b/opendaylight/md-sal/cds-access-api/src/main/java/org/opendaylight/controller/cluster/access/concepts/SuccessEnvelopeProxy.java @@ -22,7 +22,8 @@ final class SuccessEnvelopeProxy extends AbstractResponseEnvelopeProxy message, final long sessionId, final long txSequence) { - return new SuccessEnvelope(message, sessionId, txSequence); + ResponseEnvelope> createEnvelope(final RequestSuccess message, final long sessionId, + final long txSequence, final long executionTimeNanos) { + return new SuccessEnvelope(message, sessionId, txSequence, executionTimeNanos); } } diff --git a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java index dec343fce0..2d5b696271 100644 --- a/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java +++ b/opendaylight/md-sal/cds-access-client/src/test/java/org/opendaylight/controller/cluster/access/client/ConnectingClientConnectionTest.java @@ -147,7 +147,7 @@ public class ConnectingClientConnectionTest { mockRequest = new MockRequest(mockIdentifier, mockReplyTo); mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo); mockResponse = mockRequest.toRequestFailure(mockCause); - mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0); + mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0, 0); mockCookie = ThreadLocalRandom.current().nextLong(); queue = new ConnectingClientConnection<>(mockContext, mockCookie); @@ -160,7 +160,7 @@ public class ConnectingClientConnectionTest { @Test public void testCookie() { - assertSame(mockCookie, queue.cookie()); + assertEquals(mockCookie, queue.cookie()); } @Test diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java index 7a66eab9d0..7ddad749d2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractFrontendHistory.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.Preconditions; +import com.google.common.base.Ticker; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -37,17 +38,23 @@ abstract class AbstractFrontendHistory implements Identifiable transactions = new HashMap<>(); private final String persistenceId; + private final Ticker ticker; - AbstractFrontendHistory(final String persistenceId) { + AbstractFrontendHistory(final String persistenceId, final Ticker ticker) { this.persistenceId = Preconditions.checkNotNull(persistenceId); + this.ticker = Preconditions.checkNotNull(ticker); } final String persistenceId() { return persistenceId; } + final long readTime() { + return ticker.read(); + } + final @Nullable TransactionSuccess handleTransactionRequest(final TransactionRequest request, - final RequestEnvelope envelope) throws RequestException { + final RequestEnvelope envelope, final long now) throws RequestException { // FIXME: handle purging of transactions @@ -76,7 +83,7 @@ abstract class AbstractFrontendHistory implements Identifiable handleRequest(final TransactionRequest request, final RequestEnvelope envelope) - throws RequestException { + @Nullable TransactionSuccess handleRequest(final TransactionRequest request, final RequestEnvelope envelope, + final long now) throws RequestException { if (request instanceof ModifyTransactionRequest) { - return handleModifyTransaction((ModifyTransactionRequest) request, envelope); + return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now); } else if (request instanceof CommitLocalTransactionRequest) { - handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope); + handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope, now); return null; } else if (request instanceof ExistsTransactionRequest) { return handleExistsTransaction((ExistsTransactionRequest) request); } else if (request instanceof ReadTransactionRequest) { return handleReadTransaction((ReadTransactionRequest) request); } else if (request instanceof TransactionPreCommitRequest) { - handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope); + handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope, now); return null; } else if (request instanceof TransactionDoCommitRequest) { - handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope); + handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now); return null; } else if (request instanceof TransactionAbortRequest) { - handleTransactionAbort((TransactionAbortRequest) request, envelope); + handleTransactionAbort((TransactionAbortRequest) request, envelope, now); return null; } else { throw new UnsupportedRequestException(request); @@ -190,56 +190,62 @@ final class FrontendTransaction { return success; } - private void recordAndSendSuccess(final RequestEnvelope envelope, final TransactionSuccess success) { + private long executionTime(final long startTime) { + return history.readTime() - startTime; + } + + private void recordAndSendSuccess(final RequestEnvelope envelope, final long startTime, + final TransactionSuccess success) { recordResponse(success.getSequence(), success); - envelope.sendSuccess(success); + envelope.sendSuccess(success, executionTime(startTime)); } - private void recordAndSendFailure(final RequestEnvelope envelope, final RuntimeRequestException failure) { + private void recordAndSendFailure(final RequestEnvelope envelope, final long startTime, + final RuntimeRequestException failure) { recordResponse(envelope.getMessage().getSequence(), failure); - envelope.sendFailure(failure); + envelope.sendFailure(failure, executionTime(startTime)); } private void handleTransactionPreCommit(final TransactionPreCommitRequest request, - final RequestEnvelope envelope) throws RequestException { + final RequestEnvelope envelope, final long now) throws RequestException { readyCohort.preCommit(new FutureCallback() { @Override public void onSuccess(final DataTreeCandidate result) { - recordAndSendSuccess(envelope, new TransactionPreCommitSuccess(readyCohort.getIdentifier(), + recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(readyCohort.getIdentifier(), request.getSequence())); } @Override public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, new RuntimeRequestException("Precommit failed", failure)); + recordAndSendFailure(envelope, now, new RuntimeRequestException("Precommit failed", failure)); readyCohort = null; } }); } - private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope) - throws RequestException { + private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope, + final long now) throws RequestException { readyCohort.commit(new FutureCallback() { @Override public void onSuccess(final UnsignedLong result) { - successfulCommit(envelope); + successfulCommit(envelope, now); } @Override public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, new RuntimeRequestException("Commit failed", failure)); + recordAndSendFailure(envelope, now, new RuntimeRequestException("Commit failed", failure)); readyCohort = null; } }); } - private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope) - throws RequestException { + private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope, + final long now) throws RequestException { readyCohort.abort(new FutureCallback() { @Override public void onSuccess(final Void result) { readyCohort = null; - recordAndSendSuccess(envelope, new TransactionAbortSuccess(id, request.getSequence())); + recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(id, request.getSequence())); LOG.debug("Transaction {} aborted", id); } @@ -247,89 +253,89 @@ final class FrontendTransaction { public void onFailure(final Throwable failure) { readyCohort = null; LOG.warn("Transaction {} abort failed", id, failure); - recordAndSendFailure(envelope, new RuntimeRequestException("Abort failed", failure)); + recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure)); } }); } - private void coordinatedCommit(final RequestEnvelope envelope) { + private void coordinatedCommit(final RequestEnvelope envelope, final long now) { readyCohort.canCommit(new FutureCallback() { @Override public void onSuccess(final Void result) { - recordAndSendSuccess(envelope, new TransactionCanCommitSuccess(readyCohort.getIdentifier(), + recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(readyCohort.getIdentifier(), envelope.getMessage().getSequence())); } @Override public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, new RuntimeRequestException("CanCommit failed", failure)); + recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure)); readyCohort = null; } }); } - private void directCommit(final RequestEnvelope envelope) { + private void directCommit(final RequestEnvelope envelope, final long now) { readyCohort.canCommit(new FutureCallback() { @Override public void onSuccess(final Void result) { - successfulDirectCanCommit(envelope); + successfulDirectCanCommit(envelope, now); } @Override public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, new RuntimeRequestException("CanCommit failed", failure)); + recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure)); readyCohort = null; } }); } - private void successfulDirectCanCommit(final RequestEnvelope envelope) { + private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) { readyCohort.preCommit(new FutureCallback() { @Override public void onSuccess(final DataTreeCandidate result) { - successfulDirectPreCommit(envelope); + successfulDirectPreCommit(envelope, startTime); } @Override public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, new RuntimeRequestException("PreCommit failed", failure)); + recordAndSendFailure(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure)); readyCohort = null; } }); } - private void successfulDirectPreCommit(final RequestEnvelope envelope) { + private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) { readyCohort.commit(new FutureCallback() { @Override public void onSuccess(final UnsignedLong result) { - successfulCommit(envelope); + successfulCommit(envelope, startTime); } @Override public void onFailure(final Throwable failure) { - recordAndSendFailure(envelope, new RuntimeRequestException("DoCommit failed", failure)); + recordAndSendFailure(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure)); readyCohort = null; } }); } - private void successfulCommit(final RequestEnvelope envelope) { - recordAndSendSuccess(envelope, new TransactionCommitSuccess(readyCohort.getIdentifier(), + private void successfulCommit(final RequestEnvelope envelope, final long startTime) { + recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(), envelope.getMessage().getSequence())); readyCohort = null; } private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request, - final RequestEnvelope envelope) throws RequestException { + final RequestEnvelope envelope, final long now) throws RequestException { if (sealedModification.equals(request.getModification())) { readyCohort = history.createReadyCohort(id, sealedModification); if (request.isCoordinated()) { - coordinatedCommit(envelope); + coordinatedCommit(envelope, now); } else { - directCommit(envelope); + directCommit(envelope, now); } } else { throw new UnsupportedRequestException(request); @@ -343,7 +349,8 @@ final class FrontendTransaction { data.isPresent())); } - private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) throws RequestException { + private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) + throws RequestException { final Optional> data = openTransaction.getSnapshot().readNode(request.getPath()); return recordSuccess(request.getSequence(), new ReadTransactionSuccess(id, request.getSequence(), data)); } @@ -357,7 +364,7 @@ final class FrontendTransaction { } private @Nullable TransactionSuccess handleModifyTransaction(final ModifyTransactionRequest request, - final RequestEnvelope envelope) throws RequestException { + final RequestEnvelope envelope, final long now) throws RequestException { final DataTreeModification modification = openTransaction.getSnapshot(); for (TransactionModification m : request.getModifications()) { @@ -385,12 +392,12 @@ final class FrontendTransaction { case SIMPLE: readyCohort = openTransaction.ready(); openTransaction = null; - directCommit(envelope); + directCommit(envelope, now); return null; case THREE_PHASE: readyCohort = openTransaction.ready(); openTransaction = null; - coordinatedCommit(envelope); + coordinatedCommit(envelope, now); return null; default: throw new UnsupportedRequestException(request); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java index 3c65b799d1..297759b5c8 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LeaderFrontendState.java @@ -75,7 +75,7 @@ final class LeaderFrontendState implements Identifiable { this.persistenceId = Preconditions.checkNotNull(persistenceId); this.clientId = Preconditions.checkNotNull(clientId); this.tree = Preconditions.checkNotNull(tree); - standaloneHistory = new StandaloneFrontendHistory(persistenceId, clientId, tree); + standaloneHistory = new StandaloneFrontendHistory(persistenceId, tree.ticker(), clientId, tree); } @Override @@ -94,16 +94,16 @@ final class LeaderFrontendState implements Identifiable { } @Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest request, - final RequestEnvelope envelope) throws RequestException { + final RequestEnvelope envelope, final long now) throws RequestException { checkRequestSequence(envelope); try { if (request instanceof CreateLocalHistoryRequest) { return handleCreateHistory((CreateLocalHistoryRequest) request); } else if (request instanceof DestroyLocalHistoryRequest) { - return handleDestroyHistory((DestroyLocalHistoryRequest) request); + return handleDestroyHistory((DestroyLocalHistoryRequest) request, now); } else if (request instanceof PurgeLocalHistoryRequest) { - return handlePurgeHistory((PurgeLocalHistoryRequest)request); + return handlePurgeHistory((PurgeLocalHistoryRequest)request, now); } else { throw new UnsupportedRequestException(request); } @@ -133,12 +133,13 @@ final class LeaderFrontendState implements Identifiable { lastSeenHistory = id.getHistoryId(); } - localHistories.put(id, new LocalFrontendHistory(persistenceId, tree.ensureTransactionChain(id))); + localHistories.put(id, new LocalFrontendHistory(persistenceId, tree.ticker(), tree.ensureTransactionChain(id))); LOG.debug("{}: created history {}", persistenceId, id); return new LocalHistorySuccess(id, request.getSequence()); } - private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request) throws RequestException { + private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request, final long now) + throws RequestException { final LocalHistoryIdentifier id = request.getTarget(); final LocalFrontendHistory existing = localHistories.get(id); if (existing == null) { @@ -147,10 +148,11 @@ final class LeaderFrontendState implements Identifiable { return new LocalHistorySuccess(id, request.getSequence()); } - return existing.destroy(request.getSequence()); + return existing.destroy(request.getSequence(), now); } - private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request) throws RequestException { + private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request, final long now) + throws RequestException { final LocalHistoryIdentifier id = request.getTarget(); final LocalFrontendHistory existing = localHistories.remove(id); if (existing != null) { @@ -158,7 +160,7 @@ final class LeaderFrontendState implements Identifiable { if (!existing.isDestroyed()) { LOG.warn("{}: purging undestroyed history {}", persistenceId, id); - existing.destroy(request.getSequence()); + existing.destroy(request.getSequence(), now); } // FIXME: record a PURGE tombstone in the journal @@ -172,7 +174,7 @@ final class LeaderFrontendState implements Identifiable { } @Nullable TransactionSuccess handleTransactionRequest(final TransactionRequest request, - final RequestEnvelope envelope) throws RequestException { + final RequestEnvelope envelope, final long now) throws RequestException { checkRequestSequence(envelope); try { @@ -189,7 +191,7 @@ final class LeaderFrontendState implements Identifiable { history = standaloneHistory; } - return history.handleTransactionRequest(request, envelope); + return history.handleTransactionRequest(request, envelope, now); } finally { expectNextRequest(); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java index a03b54fbef..50d1dc5009 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/LocalFrontendHistory.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.Preconditions; +import com.google.common.base.Ticker; import org.opendaylight.controller.cluster.access.commands.DeadTransactionException; import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; @@ -36,8 +37,8 @@ final class LocalFrontendHistory extends AbstractFrontendHistory { private Long lastSeenTransaction; private State state = State.OPEN; - LocalFrontendHistory(final String persistenceId, final ShardDataTreeTransactionChain chain) { - super(persistenceId); + LocalFrontendHistory(final String persistenceId, final Ticker ticker, final ShardDataTreeTransactionChain chain) { + super(persistenceId, ticker); this.chain = Preconditions.checkNotNull(chain); } @@ -66,7 +67,7 @@ final class LocalFrontendHistory extends AbstractFrontendHistory { return chain.createReadyCohort(id, mod); } - LocalHistorySuccess destroy(final long sequence) throws RequestException { + LocalHistorySuccess destroy(final long sequence, final long now) throws RequestException { if (state != State.CLOSED) { LOG.debug("{}: closing history {}", persistenceId(), getIdentifier()); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 26f18bd932..05b30932d2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -264,18 +264,21 @@ public class Shard extends RaftActor { } if (message instanceof RequestEnvelope) { + final long now = ticker().read(); final RequestEnvelope envelope = (RequestEnvelope)message; + try { - final RequestSuccess success = handleRequest(envelope); + final RequestSuccess success = handleRequest(envelope, now); if (success != null) { - envelope.sendSuccess(success); + envelope.sendSuccess(success, ticker().read() - now); } } catch (RequestException e) { LOG.debug("{}: request {} failed", persistenceId(), envelope, e); - envelope.sendFailure(e); + envelope.sendFailure(e, ticker().read() - now); } catch (Exception e) { LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e); - envelope.sendFailure(new RuntimeRequestException("Request failed to process", e)); + envelope.sendFailure(new RuntimeRequestException("Request failed to process", e), + ticker().read() - now); } } else if (message instanceof ConnectClientRequest) { handleConnectClient((ConnectClientRequest)message); @@ -389,7 +392,8 @@ public class Shard extends RaftActor { } } - private @Nullable RequestSuccess handleRequest(final RequestEnvelope envelope) throws RequestException { + private @Nullable RequestSuccess handleRequest(final RequestEnvelope envelope, final long now) + throws RequestException { // We are not the leader, hence we want to fail-fast. if (!isLeader() || !isLeaderActive()) { LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope); @@ -400,11 +404,11 @@ public class Shard extends RaftActor { if (request instanceof TransactionRequest) { final TransactionRequest txReq = (TransactionRequest)request; final ClientIdentifier clientId = txReq.getTarget().getHistoryId().getClientId(); - return getFrontend(clientId).handleTransactionRequest(txReq, envelope); + return getFrontend(clientId).handleTransactionRequest(txReq, envelope, now); } else if (request instanceof LocalHistoryRequest) { final LocalHistoryRequest lhReq = (LocalHistoryRequest)request; final ClientIdentifier clientId = lhReq.getTarget().getClientId(); - return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope); + return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now); } else { LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request); throw new UnsupportedRequestException(request); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java index f18271ae36..5b015ead9a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardDataTree.java @@ -13,6 +13,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; +import com.google.common.base.Ticker; import com.google.common.base.Verify; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -136,10 +137,14 @@ public class ShardDataTree extends ShardDataTreeTransactionParent { new DefaultShardDataChangeListenerPublisher(), ""); } - String logContext() { + final String logContext() { return logContext; } + final Ticker ticker() { + return shard.ticker(); + } + public TipProducingDataTree getDataTree() { return dataTree; } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java index 1f2eb72560..14b0eecaa2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/StandaloneFrontendHistory.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore; import com.google.common.base.Preconditions; +import com.google.common.base.Ticker; import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.RequestException; @@ -24,8 +25,9 @@ final class StandaloneFrontendHistory extends AbstractFrontendHistory { private final LocalHistoryIdentifier identifier; private final ShardDataTree tree; - StandaloneFrontendHistory(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) { - super(persistenceId); + StandaloneFrontendHistory(final String persistenceId, final Ticker ticker, final ClientIdentifier clientId, + final ShardDataTree tree) { + super(persistenceId, ticker); this.identifier = new LocalHistoryIdentifier(clientId, 0); this.tree = Preconditions.checkNotNull(tree); } -- 2.36.6