In order to properly measure impact of requests on the backend
we nead some indication of the complexity involved in servicing
the request. It is not feasible to estimate this by analyzing
the request itself, hence we provide a way for the backend to
communicate how complex it found a request to be back to the
frontend.
Since this measure excludes actor inbox and transport latency,
the frontend can use this measure to weigh the relative complexity
compared to other requests in has sent.
Change-Id: Ia7f435ff8a7fd995a90261b128832c340026de6d
Signed-off-by: Robert Varga <rovarga@cisco.com>
16 files changed:
private long sessionId;
private long txSequence;
private long sessionId;
private long txSequence;
- protected AbstractEnvelopeProxy() {
+ AbstractEnvelopeProxy() {
- public final void writeExternal(final ObjectOutput out) throws IOException {
+ public void writeExternal(final ObjectOutput out) throws IOException {
WritableObjects.writeLongs(out, sessionId, txSequence);
out.writeObject(message);
}
@SuppressWarnings("unchecked")
@Override
WritableObjects.writeLongs(out, sessionId, txSequence);
out.writeObject(message);
}
@SuppressWarnings("unchecked")
@Override
- public final void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+ public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
final byte header = WritableObjects.readLongHeader(in);
sessionId = WritableObjects.readFirstLong(in, header);
txSequence = WritableObjects.readSecondLong(in, header);
message = (T) in.readObject();
}
final byte header = WritableObjects.readLongHeader(in);
sessionId = WritableObjects.readFirstLong(in, header);
txSequence = WritableObjects.readSecondLong(in, header);
message = (T) in.readObject();
}
- abstract Envelope<T> createEnvelope(T wrappedNessage, long envSessionId, long envTxSequence);
+ abstract Envelope<T> createEnvelope(T wrappedNessage, long sessionId, long txSequence);
final Object readResolve() {
return createEnvelope(message, sessionId, txSequence);
final Object readResolve() {
return createEnvelope(message, sessionId, txSequence);
*/
package org.opendaylight.controller.cluster.access.concepts;
*/
package org.opendaylight.controller.cluster.access.concepts;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.yangtools.concepts.WritableObjects;
+
abstract class AbstractResponseEnvelopeProxy<T extends Response<?, ?>> extends AbstractEnvelopeProxy<T> {
private static final long serialVersionUID = 1L;
abstract class AbstractResponseEnvelopeProxy<T extends Response<?, ?>> extends AbstractEnvelopeProxy<T> {
private static final long serialVersionUID = 1L;
- protected AbstractResponseEnvelopeProxy() {
+ private long executionTimeNanos;
+
+ AbstractResponseEnvelopeProxy() {
// for Externalizable
}
AbstractResponseEnvelopeProxy(final ResponseEnvelope<T> envelope) {
super(envelope);
// for Externalizable
}
AbstractResponseEnvelopeProxy(final ResponseEnvelope<T> envelope) {
super(envelope);
+ this.executionTimeNanos = envelope.getExecutionTimeNanos();
+ }
+
+ @Override
+ public final void writeExternal(final ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+ WritableObjects.writeLong(out, executionTimeNanos);
- abstract ResponseEnvelope<T> createEnvelope(T message, long sequence, long retry);
+ public final void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+ executionTimeNanos = WritableObjects.readLong(in);
+ }
+
+ @Override
+ final ResponseEnvelope<T> createEnvelope(final T message, final long sessionId, final long txSequence) {
+ return createEnvelope(message, sessionId, txSequence, executionTimeNanos);
+ }
+
+ abstract ResponseEnvelope<T> createEnvelope(T message, long sessionId, long txSequence, long executionTimeNanos);
public final class FailureEnvelope extends ResponseEnvelope<RequestFailure<?, ?>> {
private static final long serialVersionUID = 1L;
public final class FailureEnvelope extends ResponseEnvelope<RequestFailure<?, ?>> {
private static final long serialVersionUID = 1L;
- public FailureEnvelope(final RequestFailure<?, ?> message, final long sessionId, final long txSequence) {
- super(message, sessionId, txSequence);
+ public FailureEnvelope(final RequestFailure<?, ?> message, final long sessionId, final long txSequence,
+ final long executionTimeNanos) {
+ super(message, sessionId, txSequence, executionTimeNanos);
- FailureEnvelope createEnvelope(final RequestFailure<?, ?> message, final long sessionId, final long txSequence) {
- return new FailureEnvelope(message, sessionId, txSequence);
+ ResponseEnvelope<RequestFailure<?, ?>> createEnvelope(final RequestFailure<?, ?> message, final long sessionId,
+ final long txSequence, final long executionTimeNanos) {
+ return new FailureEnvelope(message, sessionId, txSequence, executionTimeNanos);
* Respond to this envelope with a {@link RequestFailure} caused by specified {@link RequestException}.
*
* @param cause Cause of this {@link RequestFailure}
* Respond to this envelope with a {@link RequestFailure} caused by specified {@link RequestException}.
*
* @param cause Cause of this {@link RequestFailure}
+ * @param executionTimeNanos Time to execute the request, in nanoseconds
* @throws NullPointerException if cause is null
*/
* @throws NullPointerException if cause is null
*/
- public void sendFailure(final RequestException cause) {
- sendResponse(new FailureEnvelope(getMessage().toRequestFailure(cause), getSessionId(), getTxSequence()));
+ public void sendFailure(final RequestException cause, final long executionTimeNanos) {
+ sendResponse(new FailureEnvelope(getMessage().toRequestFailure(cause), getSessionId(), getTxSequence(),
+ executionTimeNanos));
* @param success Successful response
* @throws NullPointerException if success is null
*/
* @param success Successful response
* @throws NullPointerException if success is null
*/
- public void sendSuccess(final RequestSuccess<?, ?> success) {
- sendResponse(new SuccessEnvelope(success, getSessionId(), getTxSequence()));
+ public void sendSuccess(final RequestSuccess<?, ?> success, final long executionTimeNanos) {
+ sendResponse(new SuccessEnvelope(success, getSessionId(), getTxSequence(), executionTimeNanos));
}
private void sendResponse(final ResponseEnvelope<?> envelope) {
}
private void sendResponse(final ResponseEnvelope<?> envelope) {
*/
package org.opendaylight.controller.cluster.access.concepts;
*/
package org.opendaylight.controller.cluster.access.concepts;
+import com.google.common.base.Preconditions;
+
public abstract class ResponseEnvelope<T extends Response<?, ?>> extends Envelope<T> {
private static final long serialVersionUID = 1L;
public abstract class ResponseEnvelope<T extends Response<?, ?>> extends Envelope<T> {
private static final long serialVersionUID = 1L;
- ResponseEnvelope(final T message, final long sessionId, final long txSequence) {
+ private final long executionTimeNanos;
+
+ ResponseEnvelope(final T message, final long sessionId, final long txSequence, final long executionTimeNanos) {
super(message, sessionId, txSequence);
super(message, sessionId, txSequence);
+ Preconditions.checkArgument(executionTimeNanos >= 0);
+ this.executionTimeNanos = executionTimeNanos;
+
+ /**
+ * Return the time the request took to execute in nanoseconds. This may not reflect the actual CPU time, but rather
+ * a measure of the complexity involved in servicing the original request.
+ *
+ * @return Time the request took to execute in nanoseconds
+ */
+ public final long getExecutionTimeNanos() {
+ return executionTimeNanos;
+ }
+
+ @Override
+ abstract AbstractResponseEnvelopeProxy<T> createProxy();
public final class SuccessEnvelope extends ResponseEnvelope<RequestSuccess<?, ?>> {
private static final long serialVersionUID = 1L;
public final class SuccessEnvelope extends ResponseEnvelope<RequestSuccess<?, ?>> {
private static final long serialVersionUID = 1L;
- public SuccessEnvelope(final RequestSuccess<?, ?> message, final long sessionId, final long txSequence) {
- super(message, sessionId, txSequence);
+ public SuccessEnvelope(final RequestSuccess<?, ?> message, final long sessionId, final long txSequence,
+ final long executionTimeNanos) {
+ super(message, sessionId, txSequence, executionTimeNanos);
- SuccessEnvelope createEnvelope(final RequestSuccess<?, ?> message, final long sessionId, final long txSequence) {
- return new SuccessEnvelope(message, sessionId, txSequence);
+ ResponseEnvelope<RequestSuccess<?, ?>> createEnvelope(final RequestSuccess<?, ?> message, final long sessionId,
+ final long txSequence, final long executionTimeNanos) {
+ return new SuccessEnvelope(message, sessionId, txSequence, executionTimeNanos);
mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo);
mockResponse = mockRequest.toRequestFailure(mockCause);
mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo);
mockResponse = mockRequest.toRequestFailure(mockCause);
- mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0);
+ mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0, 0);
mockCookie = ThreadLocalRandom.current().nextLong();
queue = new ConnectingClientConnection<>(mockContext, mockCookie);
mockCookie = ThreadLocalRandom.current().nextLong();
queue = new ConnectingClientConnection<>(mockContext, mockCookie);
@Test
public void testCookie() {
@Test
public void testCookie() {
- assertSame(mockCookie, queue.cookie());
+ assertEquals(mockCookie, queue.cookie());
package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
private final Map<TransactionIdentifier, FrontendTransaction> transactions = new HashMap<>();
private final String persistenceId;
private final Map<TransactionIdentifier, FrontendTransaction> transactions = new HashMap<>();
private final String persistenceId;
+ private final Ticker ticker;
- AbstractFrontendHistory(final String persistenceId) {
+ AbstractFrontendHistory(final String persistenceId, final Ticker ticker) {
this.persistenceId = Preconditions.checkNotNull(persistenceId);
this.persistenceId = Preconditions.checkNotNull(persistenceId);
+ this.ticker = Preconditions.checkNotNull(ticker);
}
final String persistenceId() {
return persistenceId;
}
}
final String persistenceId() {
return persistenceId;
}
+ final long readTime() {
+ return ticker.read();
+ }
+
final @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
final @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
- final RequestEnvelope envelope) throws RequestException {
+ final RequestEnvelope envelope, final long now) throws RequestException {
// FIXME: handle purging of transactions
// FIXME: handle purging of transactions
- return tx.handleRequest(request, envelope);
+ return tx.handleRequest(request, envelope, now);
}
abstract FrontendTransaction createOpenTransaction(TransactionIdentifier id) throws RequestException;
}
abstract FrontendTransaction createOpenTransaction(TransactionIdentifier id) throws RequestException;
}
// Sequence has already been checked
}
// Sequence has already been checked
- @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope)
- throws RequestException {
+ @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
+ final long now) throws RequestException {
if (request instanceof ModifyTransactionRequest) {
if (request instanceof ModifyTransactionRequest) {
- return handleModifyTransaction((ModifyTransactionRequest) request, envelope);
+ return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now);
} else if (request instanceof CommitLocalTransactionRequest) {
} else if (request instanceof CommitLocalTransactionRequest) {
- handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope);
+ handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope, now);
return null;
} else if (request instanceof ExistsTransactionRequest) {
return handleExistsTransaction((ExistsTransactionRequest) request);
} else if (request instanceof ReadTransactionRequest) {
return handleReadTransaction((ReadTransactionRequest) request);
} else if (request instanceof TransactionPreCommitRequest) {
return null;
} else if (request instanceof ExistsTransactionRequest) {
return handleExistsTransaction((ExistsTransactionRequest) request);
} else if (request instanceof ReadTransactionRequest) {
return handleReadTransaction((ReadTransactionRequest) request);
} else if (request instanceof TransactionPreCommitRequest) {
- handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope);
+ handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope, now);
return null;
} else if (request instanceof TransactionDoCommitRequest) {
return null;
} else if (request instanceof TransactionDoCommitRequest) {
- handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope);
+ handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
return null;
} else if (request instanceof TransactionAbortRequest) {
return null;
} else if (request instanceof TransactionAbortRequest) {
- handleTransactionAbort((TransactionAbortRequest) request, envelope);
+ handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
return null;
} else {
throw new UnsupportedRequestException(request);
return null;
} else {
throw new UnsupportedRequestException(request);
- private void recordAndSendSuccess(final RequestEnvelope envelope, final TransactionSuccess<?> success) {
+ private long executionTime(final long startTime) {
+ return history.readTime() - startTime;
+ }
+
+ private void recordAndSendSuccess(final RequestEnvelope envelope, final long startTime,
+ final TransactionSuccess<?> success) {
recordResponse(success.getSequence(), success);
recordResponse(success.getSequence(), success);
- envelope.sendSuccess(success);
+ envelope.sendSuccess(success, executionTime(startTime));
- private void recordAndSendFailure(final RequestEnvelope envelope, final RuntimeRequestException failure) {
+ private void recordAndSendFailure(final RequestEnvelope envelope, final long startTime,
+ final RuntimeRequestException failure) {
recordResponse(envelope.getMessage().getSequence(), failure);
recordResponse(envelope.getMessage().getSequence(), failure);
- envelope.sendFailure(failure);
+ envelope.sendFailure(failure, executionTime(startTime));
}
private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
}
private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
- final RequestEnvelope envelope) throws RequestException {
+ final RequestEnvelope envelope, final long now) throws RequestException {
readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
@Override
public void onSuccess(final DataTreeCandidate result) {
readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
@Override
public void onSuccess(final DataTreeCandidate result) {
- recordAndSendSuccess(envelope, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
+ recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
request.getSequence()));
}
@Override
public void onFailure(final Throwable failure) {
request.getSequence()));
}
@Override
public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("Precommit failed", failure));
+ recordAndSendFailure(envelope, now, new RuntimeRequestException("Precommit failed", failure));
readyCohort = null;
}
});
}
readyCohort = null;
}
});
}
- private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope)
- throws RequestException {
+ private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope,
+ final long now) throws RequestException {
readyCohort.commit(new FutureCallback<UnsignedLong>() {
@Override
public void onSuccess(final UnsignedLong result) {
readyCohort.commit(new FutureCallback<UnsignedLong>() {
@Override
public void onSuccess(final UnsignedLong result) {
- successfulCommit(envelope);
+ successfulCommit(envelope, now);
}
@Override
public void onFailure(final Throwable failure) {
}
@Override
public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("Commit failed", failure));
+ recordAndSendFailure(envelope, now, new RuntimeRequestException("Commit failed", failure));
readyCohort = null;
}
});
}
readyCohort = null;
}
});
}
- private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope)
- throws RequestException {
+ private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope,
+ final long now) throws RequestException {
readyCohort.abort(new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
readyCohort = null;
readyCohort.abort(new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
readyCohort = null;
- recordAndSendSuccess(envelope, new TransactionAbortSuccess(id, request.getSequence()));
+ recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(id, request.getSequence()));
LOG.debug("Transaction {} aborted", id);
}
LOG.debug("Transaction {} aborted", id);
}
public void onFailure(final Throwable failure) {
readyCohort = null;
LOG.warn("Transaction {} abort failed", id, failure);
public void onFailure(final Throwable failure) {
readyCohort = null;
LOG.warn("Transaction {} abort failed", id, failure);
- recordAndSendFailure(envelope, new RuntimeRequestException("Abort failed", failure));
+ recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
- private void coordinatedCommit(final RequestEnvelope envelope) {
+ private void coordinatedCommit(final RequestEnvelope envelope, final long now) {
readyCohort.canCommit(new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
readyCohort.canCommit(new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
- recordAndSendSuccess(envelope, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
+ recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
envelope.getMessage().getSequence()));
}
@Override
public void onFailure(final Throwable failure) {
envelope.getMessage().getSequence()));
}
@Override
public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("CanCommit failed", failure));
+ recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
readyCohort = null;
}
});
}
readyCohort = null;
}
});
}
- private void directCommit(final RequestEnvelope envelope) {
+ private void directCommit(final RequestEnvelope envelope, final long now) {
readyCohort.canCommit(new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
readyCohort.canCommit(new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
- successfulDirectCanCommit(envelope);
+ successfulDirectCanCommit(envelope, now);
}
@Override
public void onFailure(final Throwable failure) {
}
@Override
public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("CanCommit failed", failure));
+ recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
readyCohort = null;
}
});
}
readyCohort = null;
}
});
}
- private void successfulDirectCanCommit(final RequestEnvelope envelope) {
+ private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
@Override
public void onSuccess(final DataTreeCandidate result) {
readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
@Override
public void onSuccess(final DataTreeCandidate result) {
- successfulDirectPreCommit(envelope);
+ successfulDirectPreCommit(envelope, startTime);
}
@Override
public void onFailure(final Throwable failure) {
}
@Override
public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("PreCommit failed", failure));
+ recordAndSendFailure(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
readyCohort = null;
}
});
}
readyCohort = null;
}
});
}
- private void successfulDirectPreCommit(final RequestEnvelope envelope) {
+ private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
readyCohort.commit(new FutureCallback<UnsignedLong>() {
@Override
public void onSuccess(final UnsignedLong result) {
readyCohort.commit(new FutureCallback<UnsignedLong>() {
@Override
public void onSuccess(final UnsignedLong result) {
- successfulCommit(envelope);
+ successfulCommit(envelope, startTime);
}
@Override
public void onFailure(final Throwable failure) {
}
@Override
public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("DoCommit failed", failure));
+ recordAndSendFailure(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
readyCohort = null;
}
});
}
readyCohort = null;
}
});
}
- private void successfulCommit(final RequestEnvelope envelope) {
- recordAndSendSuccess(envelope, new TransactionCommitSuccess(readyCohort.getIdentifier(),
+ private void successfulCommit(final RequestEnvelope envelope, final long startTime) {
+ recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(),
envelope.getMessage().getSequence()));
readyCohort = null;
}
private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
envelope.getMessage().getSequence()));
readyCohort = null;
}
private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
- final RequestEnvelope envelope) throws RequestException {
+ final RequestEnvelope envelope, final long now) throws RequestException {
if (sealedModification.equals(request.getModification())) {
readyCohort = history.createReadyCohort(id, sealedModification);
if (request.isCoordinated()) {
if (sealedModification.equals(request.getModification())) {
readyCohort = history.createReadyCohort(id, sealedModification);
if (request.isCoordinated()) {
- coordinatedCommit(envelope);
+ coordinatedCommit(envelope, now);
- directCommit(envelope);
+ directCommit(envelope, now);
}
} else {
throw new UnsupportedRequestException(request);
}
} else {
throw new UnsupportedRequestException(request);
- private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) throws RequestException {
+ private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request)
+ throws RequestException {
final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
return recordSuccess(request.getSequence(), new ReadTransactionSuccess(id, request.getSequence(), data));
}
final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
return recordSuccess(request.getSequence(), new ReadTransactionSuccess(id, request.getSequence(), data));
}
}
private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
}
private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
- final RequestEnvelope envelope) throws RequestException {
+ final RequestEnvelope envelope, final long now) throws RequestException {
final DataTreeModification modification = openTransaction.getSnapshot();
for (TransactionModification m : request.getModifications()) {
final DataTreeModification modification = openTransaction.getSnapshot();
for (TransactionModification m : request.getModifications()) {
case SIMPLE:
readyCohort = openTransaction.ready();
openTransaction = null;
case SIMPLE:
readyCohort = openTransaction.ready();
openTransaction = null;
- directCommit(envelope);
+ directCommit(envelope, now);
return null;
case THREE_PHASE:
readyCohort = openTransaction.ready();
openTransaction = null;
return null;
case THREE_PHASE:
readyCohort = openTransaction.ready();
openTransaction = null;
- coordinatedCommit(envelope);
+ coordinatedCommit(envelope, now);
return null;
default:
throw new UnsupportedRequestException(request);
return null;
default:
throw new UnsupportedRequestException(request);
this.persistenceId = Preconditions.checkNotNull(persistenceId);
this.clientId = Preconditions.checkNotNull(clientId);
this.tree = Preconditions.checkNotNull(tree);
this.persistenceId = Preconditions.checkNotNull(persistenceId);
this.clientId = Preconditions.checkNotNull(clientId);
this.tree = Preconditions.checkNotNull(tree);
- standaloneHistory = new StandaloneFrontendHistory(persistenceId, clientId, tree);
+ standaloneHistory = new StandaloneFrontendHistory(persistenceId, tree.ticker(), clientId, tree);
}
@Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
}
@Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
- final RequestEnvelope envelope) throws RequestException {
+ final RequestEnvelope envelope, final long now) throws RequestException {
checkRequestSequence(envelope);
try {
if (request instanceof CreateLocalHistoryRequest) {
return handleCreateHistory((CreateLocalHistoryRequest) request);
} else if (request instanceof DestroyLocalHistoryRequest) {
checkRequestSequence(envelope);
try {
if (request instanceof CreateLocalHistoryRequest) {
return handleCreateHistory((CreateLocalHistoryRequest) request);
} else if (request instanceof DestroyLocalHistoryRequest) {
- return handleDestroyHistory((DestroyLocalHistoryRequest) request);
+ return handleDestroyHistory((DestroyLocalHistoryRequest) request, now);
} else if (request instanceof PurgeLocalHistoryRequest) {
} else if (request instanceof PurgeLocalHistoryRequest) {
- return handlePurgeHistory((PurgeLocalHistoryRequest)request);
+ return handlePurgeHistory((PurgeLocalHistoryRequest)request, now);
} else {
throw new UnsupportedRequestException(request);
}
} else {
throw new UnsupportedRequestException(request);
}
lastSeenHistory = id.getHistoryId();
}
lastSeenHistory = id.getHistoryId();
}
- localHistories.put(id, new LocalFrontendHistory(persistenceId, tree.ensureTransactionChain(id)));
+ localHistories.put(id, new LocalFrontendHistory(persistenceId, tree.ticker(), tree.ensureTransactionChain(id)));
LOG.debug("{}: created history {}", persistenceId, id);
return new LocalHistorySuccess(id, request.getSequence());
}
LOG.debug("{}: created history {}", persistenceId, id);
return new LocalHistorySuccess(id, request.getSequence());
}
- private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request) throws RequestException {
+ private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request, final long now)
+ throws RequestException {
final LocalHistoryIdentifier id = request.getTarget();
final LocalFrontendHistory existing = localHistories.get(id);
if (existing == null) {
final LocalHistoryIdentifier id = request.getTarget();
final LocalFrontendHistory existing = localHistories.get(id);
if (existing == null) {
return new LocalHistorySuccess(id, request.getSequence());
}
return new LocalHistorySuccess(id, request.getSequence());
}
- return existing.destroy(request.getSequence());
+ return existing.destroy(request.getSequence(), now);
- private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request) throws RequestException {
+ private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request, final long now)
+ throws RequestException {
final LocalHistoryIdentifier id = request.getTarget();
final LocalFrontendHistory existing = localHistories.remove(id);
if (existing != null) {
final LocalHistoryIdentifier id = request.getTarget();
final LocalFrontendHistory existing = localHistories.remove(id);
if (existing != null) {
if (!existing.isDestroyed()) {
LOG.warn("{}: purging undestroyed history {}", persistenceId, id);
if (!existing.isDestroyed()) {
LOG.warn("{}: purging undestroyed history {}", persistenceId, id);
- existing.destroy(request.getSequence());
+ existing.destroy(request.getSequence(), now);
}
// FIXME: record a PURGE tombstone in the journal
}
// FIXME: record a PURGE tombstone in the journal
}
@Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
}
@Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
- final RequestEnvelope envelope) throws RequestException {
+ final RequestEnvelope envelope, final long now) throws RequestException {
checkRequestSequence(envelope);
try {
checkRequestSequence(envelope);
try {
history = standaloneHistory;
}
history = standaloneHistory;
}
- return history.handleTransactionRequest(request, envelope);
+ return history.handleTransactionRequest(request, envelope, now);
} finally {
expectNextRequest();
}
} finally {
expectNextRequest();
}
package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
private Long lastSeenTransaction;
private State state = State.OPEN;
private Long lastSeenTransaction;
private State state = State.OPEN;
- LocalFrontendHistory(final String persistenceId, final ShardDataTreeTransactionChain chain) {
- super(persistenceId);
+ LocalFrontendHistory(final String persistenceId, final Ticker ticker, final ShardDataTreeTransactionChain chain) {
+ super(persistenceId, ticker);
this.chain = Preconditions.checkNotNull(chain);
}
this.chain = Preconditions.checkNotNull(chain);
}
return chain.createReadyCohort(id, mod);
}
return chain.createReadyCohort(id, mod);
}
- LocalHistorySuccess destroy(final long sequence) throws RequestException {
+ LocalHistorySuccess destroy(final long sequence, final long now) throws RequestException {
if (state != State.CLOSED) {
LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
if (state != State.CLOSED) {
LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
}
if (message instanceof RequestEnvelope) {
}
if (message instanceof RequestEnvelope) {
+ final long now = ticker().read();
final RequestEnvelope envelope = (RequestEnvelope)message;
final RequestEnvelope envelope = (RequestEnvelope)message;
- final RequestSuccess<?, ?> success = handleRequest(envelope);
+ final RequestSuccess<?, ?> success = handleRequest(envelope, now);
- envelope.sendSuccess(success);
+ envelope.sendSuccess(success, ticker().read() - now);
}
} catch (RequestException e) {
LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
}
} catch (RequestException e) {
LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
- envelope.sendFailure(e);
+ envelope.sendFailure(e, ticker().read() - now);
} catch (Exception e) {
LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
} catch (Exception e) {
LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
- envelope.sendFailure(new RuntimeRequestException("Request failed to process", e));
+ envelope.sendFailure(new RuntimeRequestException("Request failed to process", e),
+ ticker().read() - now);
}
} else if (message instanceof ConnectClientRequest) {
handleConnectClient((ConnectClientRequest)message);
}
} else if (message instanceof ConnectClientRequest) {
handleConnectClient((ConnectClientRequest)message);
- private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope) throws RequestException {
+ private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
+ throws RequestException {
// We are not the leader, hence we want to fail-fast.
if (!isLeader() || !isLeaderActive()) {
LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope);
// We are not the leader, hence we want to fail-fast.
if (!isLeader() || !isLeaderActive()) {
LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope);
if (request instanceof TransactionRequest) {
final TransactionRequest<?> txReq = (TransactionRequest<?>)request;
final ClientIdentifier clientId = txReq.getTarget().getHistoryId().getClientId();
if (request instanceof TransactionRequest) {
final TransactionRequest<?> txReq = (TransactionRequest<?>)request;
final ClientIdentifier clientId = txReq.getTarget().getHistoryId().getClientId();
- return getFrontend(clientId).handleTransactionRequest(txReq, envelope);
+ return getFrontend(clientId).handleTransactionRequest(txReq, envelope, now);
} else if (request instanceof LocalHistoryRequest) {
final LocalHistoryRequest<?> lhReq = (LocalHistoryRequest<?>)request;
final ClientIdentifier clientId = lhReq.getTarget().getClientId();
} else if (request instanceof LocalHistoryRequest) {
final LocalHistoryRequest<?> lhReq = (LocalHistoryRequest<?>)request;
final ClientIdentifier clientId = lhReq.getTarget().getClientId();
- return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope);
+ return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now);
} else {
LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request);
throw new UnsupportedRequestException(request);
} else {
LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request);
throw new UnsupportedRequestException(request);
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
new DefaultShardDataChangeListenerPublisher(), "");
}
new DefaultShardDataChangeListenerPublisher(), "");
}
+ final String logContext() {
+ final Ticker ticker() {
+ return shard.ticker();
+ }
+
public TipProducingDataTree getDataTree() {
return dataTree;
}
public TipProducingDataTree getDataTree() {
return dataTree;
}
package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
private final LocalHistoryIdentifier identifier;
private final ShardDataTree tree;
private final LocalHistoryIdentifier identifier;
private final ShardDataTree tree;
- StandaloneFrontendHistory(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
- super(persistenceId);
+ StandaloneFrontendHistory(final String persistenceId, final Ticker ticker, final ClientIdentifier clientId,
+ final ShardDataTree tree) {
+ super(persistenceId, ticker);
this.identifier = new LocalHistoryIdentifier(clientId, 0);
this.tree = Preconditions.checkNotNull(tree);
}
this.identifier = new LocalHistoryIdentifier(clientId, 0);
this.tree = Preconditions.checkNotNull(tree);
}