import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
import org.opendaylight.controller.cluster.access.concepts.FrontendIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
import org.opendaylight.controller.cluster.access.concepts.Request;
import org.opendaylight.controller.cluster.access.concepts.RequestEnvelope;
import org.opendaylight.controller.cluster.access.concepts.RequestException;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
-import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.GetShardDataTree;
+import org.opendaylight.controller.cluster.datastore.messages.OnDemandShardState;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
import org.opendaylight.controller.cluster.datastore.messages.ReadyLocalTransaction;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.persisted.DatastoreSnapshot.ShardSnapshot;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
import org.opendaylight.controller.cluster.raft.RaftState;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
+import org.opendaylight.controller.cluster.raft.client.messages.OnDemandRaftState;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
@SuppressWarnings("checkstyle:IllegalCatch")
@Override
protected void handleNonRaftCommand(final Object message) {
- try (final MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
+ try (MessageTracker.Context context = appendEntriesReplyTracker.received(message)) {
final Optional<Error> maybeError = context.error();
if (maybeError.isPresent()) {
LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
updateConfigParams(datastoreContext.getShardRaftConfig());
}
- boolean canSkipPayload() {
- // If we do not have any followers and we are not using persistence we can apply modification to the state
- // immediately
- return !hasFollowers() && !persistence().isRecoveryApplicable();
- }
-
// applyState() will be invoked once consensus is reached on the payload
- void persistPayload(final TransactionIdentifier transactionId, final Payload payload) {
- // We are faking the sender
- persistData(self(), transactionId, payload);
+ void persistPayload(final Identifier id, final Payload payload, final boolean batchHint) {
+ boolean canSkipPayload = !hasFollowers() && !persistence().isRecoveryApplicable();
+ if (canSkipPayload) {
+ applyState(self(), id, payload);
+ } else {
+ // We are faking the sender
+ persistData(self(), id, payload, batchHint);
+ }
}
private void handleCommitTransaction(final CommitTransaction commit) {
doAbortTransaction(abort.getTransactionId(), getSender());
}
- void doAbortTransaction(final TransactionIdentifier transactionID, final ActorRef sender) {
+ void doAbortTransaction(final Identifier transactionID, final ActorRef sender) {
commitCoordinator.handleAbort(transactionID, sender, this);
}
}
private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
- store.closeTransactionChain(closeTransactionChain.getIdentifier());
+ final LocalHistoryIdentifier id = closeTransactionChain.getIdentifier();
+ store.closeTransactionChain(id, () -> store.purgeTransactionChain(id, null));
}
@SuppressWarnings("checkstyle:IllegalCatch")
store.setRunOnPendingTransactionsComplete(operation);
}
+ @Override
+ protected OnDemandRaftState.AbstractBuilder<?> newOnDemandRaftStateBuilder() {
+ return OnDemandShardState.newBuilder().treeChangeListenerActors(treeChangeSupport.getListenerActors())
+ .dataChangeListenerActors(changeSupport.getListenerActors())
+ .commitCohortActors(store.getCohortActors());
+ }
+
@Override
public String persistenceId() {
return this.name;