import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply(
Serialization.serializedActorPath(getSelf()));
-
- /**
- * Coordinates persistence recovery on startup.
- */
- private ShardRecoveryCoordinator recoveryCoordinator;
-
private final DOMTransactionFactory transactionFactory;
private final String txnDispatcherPath;
appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
-
- recoveryCoordinator = new ShardRecoveryCoordinator(store, persistenceId(), LOG);
}
private void setTransactionCommitTimeout() {
//
if(isLeader()) {
try {
- BatchedModificationsReply reply = commitCoordinator.handleTransactionModifications(batched);
- sender().tell(reply, self());
+ boolean ready = commitCoordinator.handleTransactionModifications(batched);
+ if(ready) {
+ sender().tell(READY_TRANSACTION_REPLY, self());
+ } else {
+ sender().tell(new BatchedModificationsReply(batched.getModifications().size()), self());
+ }
} catch (Exception e) {
LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
batched.getTransactionID(), e);
// node. In that case, the subsequent 3-phase commit messages won't contain the
// transactionId so to maintain backwards compatibility, we create a separate cohort actor
// to provide the compatible behavior.
- if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
- LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
- ActorRef replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
- ready.getTransactionID()));
+ if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
+ ActorRef replyActorPath = getSelf();
+ if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
+ LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
+ replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+ ready.getTransactionID()));
+ }
ReadyTransactionReply readyTransactionReply =
- new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath));
+ new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
+ ready.getTxnClientVersion());
getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
- readyTransactionReply, getSelf());
-
+ readyTransactionReply, getSelf());
} else {
-
- getSender().tell(ready.isReturnSerialized() ? READY_TRANSACTION_REPLY.toSerializable() :
- READY_TRANSACTION_REPLY, getSelf());
+ getSender().tell(READY_TRANSACTION_REPLY, getSelf());
}
}
}
@Override
- protected
- void startLogRecoveryBatch(final int maxBatchSize) {
- recoveryCoordinator.startLogRecoveryBatch(maxBatchSize);
- }
-
- @Override
- protected void appendRecoveredLogEntry(final Payload data) {
- recoveryCoordinator.appendRecoveredLogPayload(data);
- }
-
- @Override
- protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
- recoveryCoordinator.applyRecoveredSnapshot(snapshotBytes);
- }
-
- @Override
- protected void applyCurrentLogRecoveryBatch() {
- recoveryCoordinator.applyCurrentLogRecoveryBatch();
+ @Nonnull
+ protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+ return new ShardRecoveryCoordinator(store, persistenceId(), LOG);
}
@Override
protected void onRecoveryComplete() {
- recoveryCoordinator = null;
-
//notify shard manager
getContext().parent().tell(new ActorInitialized(), getSelf());