import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
-import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
+import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
treeChangeListenerPublisher, dataChangeListenerPublisher, name);
} else {
store = new ShardDataTree(this, builder.getSchemaContext(), builder.getTreeType(),
- treeChangeListenerPublisher, dataChangeListenerPublisher, name);
+ builder.getDatastoreContext().getStoreRoot(), treeChangeListenerPublisher,
+ dataChangeListenerPublisher, name);
}
shardMBean = ShardMBeanFactory.getShardStatsMBean(name, datastoreContext.getDataStoreMXBeanType(), this);
@SuppressWarnings("checkstyle:IllegalCatch")
@Override
protected void handleNonRaftCommand(final Object message) {
- try (final MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
+ try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
final Optional<Error> maybeError = context.error();
if (maybeError.isPresent()) {
LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
}
if (message instanceof RequestEnvelope) {
+ final long now = ticker().read();
final RequestEnvelope envelope = (RequestEnvelope)message;
+
try {
- final RequestSuccess<?, ?> success = handleRequest(envelope);
+ final RequestSuccess<?, ?> success = handleRequest(envelope, now);
if (success != null) {
- envelope.sendSuccess(success);
+ envelope.sendSuccess(success, ticker().read() - now);
}
} catch (RequestException e) {
LOG.debug("{}: request {} failed", persistenceId(), envelope, e);
- envelope.sendFailure(e);
+ envelope.sendFailure(e, ticker().read() - now);
} catch (Exception e) {
LOG.debug("{}: request {} caused failure", persistenceId(), envelope, e);
- envelope.sendFailure(new RuntimeRequestException("Request failed to process", e));
+ envelope.sendFailure(new RuntimeRequestException("Request failed to process", e),
+ ticker().read() - now);
}
} else if (message instanceof ConnectClientRequest) {
handleConnectClient((ConnectClientRequest)message);
}
}
- private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope) throws RequestException {
+ private @Nullable RequestSuccess<?, ?> handleRequest(final RequestEnvelope envelope, final long now)
+ throws RequestException {
// We are not the leader, hence we want to fail-fast.
if (!isLeader() || !isLeaderActive()) {
LOG.debug("{}: not currently leader, rejecting request {}", persistenceId(), envelope);
if (request instanceof TransactionRequest) {
final TransactionRequest<?> txReq = (TransactionRequest<?>)request;
final ClientIdentifier clientId = txReq.getTarget().getHistoryId().getClientId();
- return getFrontend(clientId).handleTransactionRequest(txReq, envelope);
+ return getFrontend(clientId).handleTransactionRequest(txReq, envelope, now);
} else if (request instanceof LocalHistoryRequest) {
final LocalHistoryRequest<?> lhReq = (LocalHistoryRequest<?>)request;
final ClientIdentifier clientId = lhReq.getTarget().getClientId();
- return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope);
+ return getFrontend(clientId).handleLocalHistoryRequest(lhReq, envelope, now);
} else {
LOG.debug("{}: rejecting unsupported request {}", persistenceId(), request);
throw new UnsupportedRequestException(request);
updateConfigParams(datastoreContext.getShardRaftConfig());
}
- boolean canSkipPayload() {
- // If we do not have any followers and we are not using persistence we can apply modification to the state
- // immediately
- return !hasFollowers() && !persistence().isRecoveryApplicable();
- }
-
// applyState() will be invoked once consensus is reached on the payload
- void persistPayload(final TransactionIdentifier transactionId, final Payload payload) {
- // We are faking the sender
- persistData(self(), transactionId, payload);
+ void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) {
+ boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable();
+ if (canSkipPayload) {
+ applyState(self(), id, payload);
+ } else {
+ // We are faking the sender
+ persistData(self(), id, payload, batchHint);
+ }
}
private void handleCommitTransaction(final CommitTransaction commit) {
doAbortTransaction(abort.getTransactionId(), getSender());
}
- void doAbortTransaction(final TransactionIdentifier transactionID, final ActorRef sender) {
+ void doAbortTransaction(final Identifier transactionID, final ActorRef sender) {
commitCoordinator.handleAbort(transactionID, sender, this);
}
}
private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
- store.closeTransactionChain(closeTransactionChain.getIdentifier());
+ final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier();
+ store.closeTransactionChain(id, () -> store.purgeTransactionChain(id, null));
}
@SuppressWarnings("checkstyle:IllegalCatch")
store.setRunOnPendingTransactionsComplete(operation);
}
+ @Override
+ protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+ return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
+ .dataChangeListenerActors(changeSupport.getListenerActors())
+ .commitCohortActors(store.getCohortActors());
+ }
+
@Override
public String persistenceId() {
return this.name;