treeChangeListenerPublisher, dataChangeListenerPublisher, name);
} else {
store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(),
- treeChangeListenerPublisher, dataChangeListenerPublisher, name);
+ builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher,
+ dataChangeListenerPublisher, name);
}
shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this);
}
if (message instanceof RequestEnvelope) {
+ final long now = ticker().read();
final RequestEnvelope envelope = (RequestEnvelope)message;
+
try {
- final RequestSuccess<?, ?> success = handleRequest(envelope);
+ final RequestSuccess<?, ?> success = handleRequest(envelope, now);
if (success != null) {
- envelope.sendSuccess(success);
+ envelope.sendSuccess(success, ticker().read() - now);
}
} catch (RequestException e) {
LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
- envelope.sendFailure(e);
+ envelope.sendFailure(e, ticker().read() - now);
} catch (Exception e) {
LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
- envelope.sendFailure(new RuntimeRequestException("Request failed to process", e));
+ envelope.sendFailure(new RuntimeRequestException("Request failed to process", e),
+ ticker().read() - now);
}
} else if (message instanceof ConnectClientRequest) {
handleConnectClient((ConnectClientRequest)message);
}
}
- private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope) throws RequestException {
+ private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
+ throws RequestException {
// We are not the leader, hence we want to fail-fast.
if (!isLeader() || !isLeaderActive()) {
LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope);
if (request instanceof TransactionRequest) {
final TransactionRequest<?> txReq = (TransactionRequest<?>)request;
final ClientIdentifier clientId = txReq.getTarget().getHistoryId().getClientId();
- return getFrontend(clientId).handleTransactionRequest(txReq, envelope);
+ return getFrontend(clientId).handleTransactionRequest(txReq, envelope, now);
} else if (request instanceof LocalHistoryRequest) {
final LocalHistoryRequest<?> lhReq = (LocalHistoryRequest<?>)request;
final ClientIdentifier clientId = lhReq.getTarget().getClientId();
- return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope);
+ return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now);
} else {
LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request);
throw new UnsupportedRequestException(request);