}
};
+ static final Object RESUME_NEXT_PENDING_TRANSACTION = new Object() {
+ @Override
+ public String toString() {
+ return "resumeNextPendingTransaction";
+ }
+ };
+
// FIXME: shard names should be encapsulated in their own class and this should be exposed as a constant.
public static final String DEFAULT_NAME = "default";
LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
ShardDataTreeChangeListenerPublisherActorProxy treeChangeListenerPublisher =
- new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher");
+ new ShardDataTreeChangeListenerPublisherActorProxy(getContext(), name + "-DTCL-publisher", name);
ShardDataChangeListenerPublisherActorProxy dataChangeListenerPublisher =
- new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher");
+ new ShardDataChangeListenerPublisherActorProxy(getContext(), name + "-DCL-publisher", name);
if (builder.getDataTree() != null) {
store = new ShardDataTree(this, builder.getSchemaContext(), builder.getDataTree(),
treeChangeListenerPublisher, dataChangeListenerPublisher, name, frontendMetadata);
maybeError.get());
}
+ store.resetTransactionBatch();
+
if (message instanceof RequestEnvelope) {
final long now = ticker().read();
final RequestEnvelope envelope = (RequestEnvelope)message;
persistPayload(txId, AbortTransactionPayload.create(txId), true);
} else if (message instanceof MakeLeaderLocal) {
onMakeLeaderLocal();
+ } else if (RESUME_NEXT_PENDING_TRANSACTION.equals(message)) {
+ store.resumeNextPendingTransaction();
} else {
super.handleNonRaftCommand(message);
}
final ClientIdentifier clientId = lhReq.getTarget().getClientId();
return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now);
} else {
- LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request);
+ LOG.warn("{}: rejecting unsupported request {}", persistenceId(), request);
throw new UnsupportedRequestException(request);
}
}
Ticker ticker() {
return Ticker.systemTicker();
}
+
+ void scheduleNextPendingTransaction() {
+ self().tell(RESUME_NEXT_PENDING_TRANSACTION, ActorRef.noSender());
+ }
}