private long sessionId;
private long txSequence;
- protected AbstractEnvelopeProxy() {
+ AbstractEnvelopeProxy() {
// for Externalizable
}
}
@Override
- public final void writeExternal(final ObjectOutput out) throws IOException {
+ public void writeExternal(final ObjectOutput out) throws IOException {
WritableObjects.writeLongs(out, sessionId, txSequence);
out.writeObject(message);
}
@SuppressWarnings("unchecked")
@Override
- public final void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+ public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
final byte header = WritableObjects.readLongHeader(in);
sessionId = WritableObjects.readFirstLong(in, header);
txSequence = WritableObjects.readSecondLong(in, header);
message = (T) in.readObject();
}
- abstract Envelope<T> createEnvelope(T wrappedNessage, long envSessionId, long envTxSequence);
+ abstract Envelope<T> createEnvelope(T wrappedNessage, long sessionId, long txSequence);
final Object readResolve() {
return createEnvelope(message, sessionId, txSequence);
*/
package org.opendaylight.controller.cluster.access.concepts;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.yangtools.concepts.WritableObjects;
+
abstract class AbstractResponseEnvelopeProxy<T extends Response<?, ?>> extends AbstractEnvelopeProxy<T> {
private static final long serialVersionUID = 1L;
- protected AbstractResponseEnvelopeProxy() {
+ private long executionTimeNanos;
+
+ AbstractResponseEnvelopeProxy() {
// for Externalizable
}
AbstractResponseEnvelopeProxy(final ResponseEnvelope<T> envelope) {
super(envelope);
+ this.executionTimeNanos = envelope.getExecutionTimeNanos();
+ }
+
+ @Override
+ public final void writeExternal(final ObjectOutput out) throws IOException {
+ super.writeExternal(out);
+ WritableObjects.writeLong(out, executionTimeNanos);
}
@Override
- abstract ResponseEnvelope<T> createEnvelope(T message, long sequence, long retry);
+ public final void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+ super.readExternal(in);
+ executionTimeNanos = WritableObjects.readLong(in);
+ }
+
+ @Override
+ final ResponseEnvelope<T> createEnvelope(final T message, final long sessionId, final long txSequence) {
+ return createEnvelope(message, sessionId, txSequence, executionTimeNanos);
+ }
+
+ abstract ResponseEnvelope<T> createEnvelope(T message, long sessionId, long txSequence, long executionTimeNanos);
}
public final class FailureEnvelope extends ResponseEnvelope<RequestFailure<?, ?>> {
private static final long serialVersionUID = 1L;
- public FailureEnvelope(final RequestFailure<?, ?> message, final long sessionId, final long txSequence) {
- super(message, sessionId, txSequence);
+ public FailureEnvelope(final RequestFailure<?, ?> message, final long sessionId, final long txSequence,
+ final long executionTimeNanos) {
+ super(message, sessionId, txSequence, executionTimeNanos);
}
@Override
}
@Override
- FailureEnvelope createEnvelope(final RequestFailure<?, ?> message, final long sessionId, final long txSequence) {
- return new FailureEnvelope(message, sessionId, txSequence);
+ ResponseEnvelope<RequestFailure<?, ?>> createEnvelope(final RequestFailure<?, ?> message, final long sessionId,
+ final long txSequence, final long executionTimeNanos) {
+ return new FailureEnvelope(message, sessionId, txSequence, executionTimeNanos);
}
}
* Respond to this envelope with a {@link RequestFailure} caused by specified {@link RequestException}.
*
* @param cause Cause of this {@link RequestFailure}
+ * @param executionTimeNanos Time to execute the request, in nanoseconds
* @throws NullPointerException if cause is null
*/
- public void sendFailure(final RequestException cause) {
- sendResponse(new FailureEnvelope(getMessage().toRequestFailure(cause), getSessionId(), getTxSequence()));
+ public void sendFailure(final RequestException cause, final long executionTimeNanos) {
+ sendResponse(new FailureEnvelope(getMessage().toRequestFailure(cause), getSessionId(), getTxSequence(),
+ executionTimeNanos));
}
/**
* @param success Successful response
* @throws NullPointerException if success is null
*/
- public void sendSuccess(final RequestSuccess<?, ?> success) {
- sendResponse(new SuccessEnvelope(success, getSessionId(), getTxSequence()));
+ public void sendSuccess(final RequestSuccess<?, ?> success, final long executionTimeNanos) {
+ sendResponse(new SuccessEnvelope(success, getSessionId(), getTxSequence(), executionTimeNanos));
}
private void sendResponse(final ResponseEnvelope<?> envelope) {
*/
package org.opendaylight.controller.cluster.access.concepts;
+import com.google.common.base.Preconditions;
+
public abstract class ResponseEnvelope<T extends Response<?, ?>> extends Envelope<T> {
private static final long serialVersionUID = 1L;
- ResponseEnvelope(final T message, final long sessionId, final long txSequence) {
+ private final long executionTimeNanos;
+
+ ResponseEnvelope(final T message, final long sessionId, final long txSequence, final long executionTimeNanos) {
super(message, sessionId, txSequence);
+ Preconditions.checkArgument(executionTimeNanos >= 0);
+ this.executionTimeNanos = executionTimeNanos;
}
+
+ /**
+ * Return the time the request took to execute in nanoseconds. This may not reflect the actual CPU time, but rather
+ * a measure of the complexity involved in servicing the original request.
+ *
+ * @return Time the request took to execute in nanoseconds
+ */
+ public final long getExecutionTimeNanos() {
+ return executionTimeNanos;
+ }
+
+ @Override
+ abstract AbstractResponseEnvelopeProxy<T> createProxy();
}
public final class SuccessEnvelope extends ResponseEnvelope<RequestSuccess<?, ?>> {
private static final long serialVersionUID = 1L;
- public SuccessEnvelope(final RequestSuccess<?, ?> message, final long sessionId, final long txSequence) {
- super(message, sessionId, txSequence);
+ public SuccessEnvelope(final RequestSuccess<?, ?> message, final long sessionId, final long txSequence,
+ final long executionTimeNanos) {
+ super(message, sessionId, txSequence, executionTimeNanos);
}
@Override
}
@Override
- SuccessEnvelope createEnvelope(final RequestSuccess<?, ?> message, final long sessionId, final long txSequence) {
- return new SuccessEnvelope(message, sessionId, txSequence);
+ ResponseEnvelope<RequestSuccess<?, ?>> createEnvelope(final RequestSuccess<?, ?> message, final long sessionId,
+ final long txSequence, final long executionTimeNanos) {
+ return new SuccessEnvelope(message, sessionId, txSequence, executionTimeNanos);
}
}
mockRequest = new MockRequest(mockIdentifier, mockReplyTo);
mockRequest2 = new MockRequest(mockIdentifier, mockReplyTo);
mockResponse = mockRequest.toRequestFailure(mockCause);
- mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0);
+ mockResponseEnvelope = new FailureEnvelope(mockResponse, 0, 0, 0);
mockCookie = ThreadLocalRandom.current().nextLong();
queue = new ConnectingClientConnection<>(mockContext, mockCookie);
@Test
public void testCookie() {
- assertSame(mockCookie, queue.cookie());
+ assertEquals(mockCookie, queue.cookie());
}
@Test
package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
private final Map<TransactionIdentifier, FrontendTransaction> transactions = new HashMap<>();
private final String persistenceId;
+ private final Ticker ticker;
- AbstractFrontendHistory(final String persistenceId) {
+ AbstractFrontendHistory(final String persistenceId, final Ticker ticker) {
this.persistenceId = Preconditions.checkNotNull(persistenceId);
+ this.ticker = Preconditions.checkNotNull(ticker);
}
final String persistenceId() {
return persistenceId;
}
+ final long readTime() {
+ return ticker.read();
+ }
+
final @Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
- final RequestEnvelope envelope) throws RequestException {
+ final RequestEnvelope envelope, final long now) throws RequestException {
// FIXME: handle purging of transactions
}
}
- return tx.handleRequest(request, envelope);
+ return tx.handleRequest(request, envelope, now);
}
abstract FrontendTransaction createOpenTransaction(TransactionIdentifier id) throws RequestException;
}
// Sequence has already been checked
- @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope)
- throws RequestException {
+ @Nullable TransactionSuccess<?> handleRequest(final TransactionRequest<?> request, final RequestEnvelope envelope,
+ final long now) throws RequestException {
if (request instanceof ModifyTransactionRequest) {
- return handleModifyTransaction((ModifyTransactionRequest) request, envelope);
+ return handleModifyTransaction((ModifyTransactionRequest) request, envelope, now);
} else if (request instanceof CommitLocalTransactionRequest) {
- handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope);
+ handleCommitLocalTransaction((CommitLocalTransactionRequest) request, envelope, now);
return null;
} else if (request instanceof ExistsTransactionRequest) {
return handleExistsTransaction((ExistsTransactionRequest) request);
} else if (request instanceof ReadTransactionRequest) {
return handleReadTransaction((ReadTransactionRequest) request);
} else if (request instanceof TransactionPreCommitRequest) {
- handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope);
+ handleTransactionPreCommit((TransactionPreCommitRequest) request, envelope, now);
return null;
} else if (request instanceof TransactionDoCommitRequest) {
- handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope);
+ handleTransactionDoCommit((TransactionDoCommitRequest) request, envelope, now);
return null;
} else if (request instanceof TransactionAbortRequest) {
- handleTransactionAbort((TransactionAbortRequest) request, envelope);
+ handleTransactionAbort((TransactionAbortRequest) request, envelope, now);
return null;
} else {
throw new UnsupportedRequestException(request);
return success;
}
- private void recordAndSendSuccess(final RequestEnvelope envelope, final TransactionSuccess<?> success) {
+ private long executionTime(final long startTime) {
+ return history.readTime() - startTime;
+ }
+
+ private void recordAndSendSuccess(final RequestEnvelope envelope, final long startTime,
+ final TransactionSuccess<?> success) {
recordResponse(success.getSequence(), success);
- envelope.sendSuccess(success);
+ envelope.sendSuccess(success, executionTime(startTime));
}
- private void recordAndSendFailure(final RequestEnvelope envelope, final RuntimeRequestException failure) {
+ private void recordAndSendFailure(final RequestEnvelope envelope, final long startTime,
+ final RuntimeRequestException failure) {
recordResponse(envelope.getMessage().getSequence(), failure);
- envelope.sendFailure(failure);
+ envelope.sendFailure(failure, executionTime(startTime));
}
private void handleTransactionPreCommit(final TransactionPreCommitRequest request,
- final RequestEnvelope envelope) throws RequestException {
+ final RequestEnvelope envelope, final long now) throws RequestException {
readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
@Override
public void onSuccess(final DataTreeCandidate result) {
- recordAndSendSuccess(envelope, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
+ recordAndSendSuccess(envelope, now, new TransactionPreCommitSuccess(readyCohort.getIdentifier(),
request.getSequence()));
}
@Override
public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("Precommit failed", failure));
+ recordAndSendFailure(envelope, now, new RuntimeRequestException("Precommit failed", failure));
readyCohort = null;
}
});
}
- private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope)
- throws RequestException {
+ private void handleTransactionDoCommit(final TransactionDoCommitRequest request, final RequestEnvelope envelope,
+ final long now) throws RequestException {
readyCohort.commit(new FutureCallback<UnsignedLong>() {
@Override
public void onSuccess(final UnsignedLong result) {
- successfulCommit(envelope);
+ successfulCommit(envelope, now);
}
@Override
public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("Commit failed", failure));
+ recordAndSendFailure(envelope, now, new RuntimeRequestException("Commit failed", failure));
readyCohort = null;
}
});
}
- private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope)
- throws RequestException {
+ private void handleTransactionAbort(final TransactionAbortRequest request, final RequestEnvelope envelope,
+ final long now) throws RequestException {
readyCohort.abort(new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
readyCohort = null;
- recordAndSendSuccess(envelope, new TransactionAbortSuccess(id, request.getSequence()));
+ recordAndSendSuccess(envelope, now, new TransactionAbortSuccess(id, request.getSequence()));
LOG.debug("Transaction {} aborted", id);
}
public void onFailure(final Throwable failure) {
readyCohort = null;
LOG.warn("Transaction {} abort failed", id, failure);
- recordAndSendFailure(envelope, new RuntimeRequestException("Abort failed", failure));
+ recordAndSendFailure(envelope, now, new RuntimeRequestException("Abort failed", failure));
}
});
}
- private void coordinatedCommit(final RequestEnvelope envelope) {
+ private void coordinatedCommit(final RequestEnvelope envelope, final long now) {
readyCohort.canCommit(new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
- recordAndSendSuccess(envelope, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
+ recordAndSendSuccess(envelope, now, new TransactionCanCommitSuccess(readyCohort.getIdentifier(),
envelope.getMessage().getSequence()));
}
@Override
public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("CanCommit failed", failure));
+ recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
readyCohort = null;
}
});
}
- private void directCommit(final RequestEnvelope envelope) {
+ private void directCommit(final RequestEnvelope envelope, final long now) {
readyCohort.canCommit(new FutureCallback<Void>() {
@Override
public void onSuccess(final Void result) {
- successfulDirectCanCommit(envelope);
+ successfulDirectCanCommit(envelope, now);
}
@Override
public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("CanCommit failed", failure));
+ recordAndSendFailure(envelope, now, new RuntimeRequestException("CanCommit failed", failure));
readyCohort = null;
}
});
}
- private void successfulDirectCanCommit(final RequestEnvelope envelope) {
+ private void successfulDirectCanCommit(final RequestEnvelope envelope, final long startTime) {
readyCohort.preCommit(new FutureCallback<DataTreeCandidate>() {
@Override
public void onSuccess(final DataTreeCandidate result) {
- successfulDirectPreCommit(envelope);
+ successfulDirectPreCommit(envelope, startTime);
}
@Override
public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("PreCommit failed", failure));
+ recordAndSendFailure(envelope, startTime, new RuntimeRequestException("PreCommit failed", failure));
readyCohort = null;
}
});
}
- private void successfulDirectPreCommit(final RequestEnvelope envelope) {
+ private void successfulDirectPreCommit(final RequestEnvelope envelope, final long startTime) {
readyCohort.commit(new FutureCallback<UnsignedLong>() {
@Override
public void onSuccess(final UnsignedLong result) {
- successfulCommit(envelope);
+ successfulCommit(envelope, startTime);
}
@Override
public void onFailure(final Throwable failure) {
- recordAndSendFailure(envelope, new RuntimeRequestException("DoCommit failed", failure));
+ recordAndSendFailure(envelope, startTime, new RuntimeRequestException("DoCommit failed", failure));
readyCohort = null;
}
});
}
- private void successfulCommit(final RequestEnvelope envelope) {
- recordAndSendSuccess(envelope, new TransactionCommitSuccess(readyCohort.getIdentifier(),
+ private void successfulCommit(final RequestEnvelope envelope, final long startTime) {
+ recordAndSendSuccess(envelope, startTime, new TransactionCommitSuccess(readyCohort.getIdentifier(),
envelope.getMessage().getSequence()));
readyCohort = null;
}
private void handleCommitLocalTransaction(final CommitLocalTransactionRequest request,
- final RequestEnvelope envelope) throws RequestException {
+ final RequestEnvelope envelope, final long now) throws RequestException {
if (sealedModification.equals(request.getModification())) {
readyCohort = history.createReadyCohort(id, sealedModification);
if (request.isCoordinated()) {
- coordinatedCommit(envelope);
+ coordinatedCommit(envelope, now);
} else {
- directCommit(envelope);
+ directCommit(envelope, now);
}
} else {
throw new UnsupportedRequestException(request);
data.isPresent()));
}
- private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request) throws RequestException {
+ private ReadTransactionSuccess handleReadTransaction(final ReadTransactionRequest request)
+ throws RequestException {
final Optional<NormalizedNode<?, ?>> data = openTransaction.getSnapshot().readNode(request.getPath());
return recordSuccess(request.getSequence(), new ReadTransactionSuccess(id, request.getSequence(), data));
}
}
private @Nullable TransactionSuccess<?> handleModifyTransaction(final ModifyTransactionRequest request,
- final RequestEnvelope envelope) throws RequestException {
+ final RequestEnvelope envelope, final long now) throws RequestException {
final DataTreeModification modification = openTransaction.getSnapshot();
for (TransactionModification m : request.getModifications()) {
case SIMPLE:
readyCohort = openTransaction.ready();
openTransaction = null;
- directCommit(envelope);
+ directCommit(envelope, now);
return null;
case THREE_PHASE:
readyCohort = openTransaction.ready();
openTransaction = null;
- coordinatedCommit(envelope);
+ coordinatedCommit(envelope, now);
return null;
default:
throw new UnsupportedRequestException(request);
this.persistenceId = Preconditions.checkNotNull(persistenceId);
this.clientId = Preconditions.checkNotNull(clientId);
this.tree = Preconditions.checkNotNull(tree);
- standaloneHistory = new StandaloneFrontendHistory(persistenceId, clientId, tree);
+ standaloneHistory = new StandaloneFrontendHistory(persistenceId, tree.ticker(), clientId, tree);
}
@Override
}
@Nullable LocalHistorySuccess handleLocalHistoryRequest(final LocalHistoryRequest<?> request,
- final RequestEnvelope envelope) throws RequestException {
+ final RequestEnvelope envelope, final long now) throws RequestException {
checkRequestSequence(envelope);
try {
if (request instanceof CreateLocalHistoryRequest) {
return handleCreateHistory((CreateLocalHistoryRequest) request);
} else if (request instanceof DestroyLocalHistoryRequest) {
- return handleDestroyHistory((DestroyLocalHistoryRequest) request);
+ return handleDestroyHistory((DestroyLocalHistoryRequest) request, now);
} else if (request instanceof PurgeLocalHistoryRequest) {
- return handlePurgeHistory((PurgeLocalHistoryRequest)request);
+ return handlePurgeHistory((PurgeLocalHistoryRequest)request, now);
} else {
throw new UnsupportedRequestException(request);
}
lastSeenHistory = id.getHistoryId();
}
- localHistories.put(id, new LocalFrontendHistory(persistenceId, tree.ensureTransactionChain(id)));
+ localHistories.put(id, new LocalFrontendHistory(persistenceId, tree.ticker(), tree.ensureTransactionChain(id)));
LOG.debug("{}: created history {}", persistenceId, id);
return new LocalHistorySuccess(id, request.getSequence());
}
- private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request) throws RequestException {
+ private LocalHistorySuccess handleDestroyHistory(final DestroyLocalHistoryRequest request, final long now)
+ throws RequestException {
final LocalHistoryIdentifier id = request.getTarget();
final LocalFrontendHistory existing = localHistories.get(id);
if (existing == null) {
return new LocalHistorySuccess(id, request.getSequence());
}
- return existing.destroy(request.getSequence());
+ return existing.destroy(request.getSequence(), now);
}
- private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request) throws RequestException {
+ private LocalHistorySuccess handlePurgeHistory(final PurgeLocalHistoryRequest request, final long now)
+ throws RequestException {
final LocalHistoryIdentifier id = request.getTarget();
final LocalFrontendHistory existing = localHistories.remove(id);
if (existing != null) {
if (!existing.isDestroyed()) {
LOG.warn("{}: purging undestroyed history {}", persistenceId, id);
- existing.destroy(request.getSequence());
+ existing.destroy(request.getSequence(), now);
}
// FIXME: record a PURGE tombstone in the journal
}
@Nullable TransactionSuccess<?> handleTransactionRequest(final TransactionRequest<?> request,
- final RequestEnvelope envelope) throws RequestException {
+ final RequestEnvelope envelope, final long now) throws RequestException {
checkRequestSequence(envelope);
try {
history = standaloneHistory;
}
- return history.handleTransactionRequest(request, envelope);
+ return history.handleTransactionRequest(request, envelope, now);
} finally {
expectNextRequest();
}
package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
import org.opendaylight.controller.cluster.access.commands.DeadTransactionException;
import org.opendaylight.controller.cluster.access.commands.LocalHistorySuccess;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
private Long lastSeenTransaction;
private State state = State.OPEN;
- LocalFrontendHistory(final String persistenceId, final ShardDataTreeTransactionChain chain) {
- super(persistenceId);
+ LocalFrontendHistory(final String persistenceId, final Ticker ticker, final ShardDataTreeTransactionChain chain) {
+ super(persistenceId, ticker);
this.chain = Preconditions.checkNotNull(chain);
}
return chain.createReadyCohort(id, mod);
}
- LocalHistorySuccess destroy(final long sequence) throws RequestException {
+ LocalHistorySuccess destroy(final long sequence, final long now) throws RequestException {
if (state != State.CLOSED) {
LOG.debug("{}: closing history {}", persistenceId(), getIdentifier());
}
if (message instanceof RequestEnvelope) {
+ final long now = ticker().read();
final RequestEnvelope envelope = (RequestEnvelope)message;
+
try {
- final RequestSuccess<?, ?> success = handleRequest(envelope);
+ final RequestSuccess<?, ?> success = handleRequest(envelope, now);
if (success != null) {
- envelope.sendSuccess(success);
+ envelope.sendSuccess(success, ticker().read() - now);
}
} catch (RequestException e) {
LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
- envelope.sendFailure(e);
+ envelope.sendFailure(e, ticker().read() - now);
} catch (Exception e) {
LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
- envelope.sendFailure(new RuntimeRequestException("Request failed to process", e));
+ envelope.sendFailure(new RuntimeRequestException("Request failed to process", e),
+ ticker().read() - now);
}
} else if (message instanceof ConnectClientRequest) {
handleConnectClient((ConnectClientRequest)message);
}
}
- private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope) throws RequestException {
+ private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
+ throws RequestException {
// We are not the leader, hence we want to fail-fast.
if (!isLeader() || !isLeaderActive()) {
LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope);
if (request instanceof TransactionRequest) {
final TransactionRequest<?> txReq = (TransactionRequest<?>)request;
final ClientIdentifier clientId = txReq.getTarget().getHistoryId().getClientId();
- return getFrontend(clientId).handleTransactionRequest(txReq, envelope);
+ return getFrontend(clientId).handleTransactionRequest(txReq, envelope, now);
} else if (request instanceof LocalHistoryRequest) {
final LocalHistoryRequest<?> lhReq = (LocalHistoryRequest<?>)request;
final ClientIdentifier clientId = lhReq.getTarget().getClientId();
- return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope);
+ return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now);
} else {
LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request);
throw new UnsupportedRequestException(request);
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
+import com.google.common.base.Ticker;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
new DefaultShardDataChangeListenerPublisher(), "");
}
- String logContext() {
+ final String logContext() {
return logContext;
}
+ final Ticker ticker() {
+ return shard.ticker();
+ }
+
public TipProducingDataTree getDataTree() {
return dataTree;
}
package org.opendaylight.controller.cluster.datastore;
import com.google.common.base.Preconditions;
+import com.google.common.base.Ticker;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
private final LocalHistoryIdentifier identifier;
private final ShardDataTree tree;
- StandaloneFrontendHistory(final String persistenceId, final ClientIdentifier clientId, final ShardDataTree tree) {
- super(persistenceId);
+ StandaloneFrontendHistory(final String persistenceId, final Ticker ticker, final ClientIdentifier clientId,
+ final ShardDataTree tree) {
+ super(persistenceId, ticker);
this.identifier = new LocalHistoryIdentifier(clientId, 0);
this.tree = Preconditions.checkNotNull(tree);
}