import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
private final MessageTracker appendEntriesReplyTracker;
+ private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply(
+ Serialization.serializedActorPath(getSelf()));
+
+
/**
* Coordinates persistence recovery on startup.
*/
}
try {
- if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+ if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleCreateTransaction(message);
} else if (message instanceof ForwardedReadyTransaction) {
handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
- } else if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+ } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
- } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+ } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleCommitTransaction(CommitTransaction.fromSerializable(message));
- } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+ } else if (AbortTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleAbortTransaction(AbortTransaction.fromSerializable(message));
- } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
+ } else if (CloseTransactionChain.SERIALIZABLE_CLASS.isInstance(message)) {
closeTransactionChain(CloseTransactionChain.fromSerializable(message));
} else if (message instanceof RegisterChangeListener) {
registerChangeListener((RegisterChangeListener) message);
onDatastoreContext((DatastoreContext)message);
} else if(message instanceof RegisterRoleChangeListener){
roleChangeNotifier.get().forward(message, context());
+ } else if (message instanceof FollowerInitialSyncUpStatus){
+ shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone());
+ context().parent().tell(message, self());
} else {
super.onReceiveCommand(message);
}
// currently uses a same thread executor anyway.
cohortEntry.getCohort().preCommit().get();
- // If we do not have any followers and we are not using persistence we can
- // apply modification to the state immediately
- if(!hasFollowers() && !persistence().isRecoveryApplicable()){
+ // If we do not have any followers and we are not using persistence
+ // or if cohortEntry has no modifications
+ // we can apply modification to the state immediately
+ if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){
applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
} else {
Shard.this.persistData(getSender(), transactionID,
// node. In that case, the subsequent 3-phase commit messages won't contain the
// transactionId so to maintain backwards compatibility, we create a separate cohort actor
// to provide the compatible behavior.
- ActorRef replyActorPath = self();
if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
- replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+ ActorRef replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
ready.getTransactionID()));
- }
- ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply(
- Serialization.serializedActorPath(replyActorPath));
- getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
- readyTransactionReply, getSelf());
+ ReadyTransactionReply readyTransactionReply =
+ new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath));
+ getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
+ readyTransactionReply, getSelf());
+
+ } else {
+
+ getSender().tell(ready.isReturnSerialized() ? READY_TRANSACTION_REPLY.toSerializable() :
+ READY_TRANSACTION_REPLY, getSelf());
+ }
}
private void handleAbortTransaction(final AbortTransaction abort) {
throw new IllegalStateException("SchemaContext is not set");
}
- if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
+ if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
- shardMBean.incrementReadOnlyTransactionCount();
+ shardMBean.incrementWriteOnlyTransactionCount();
- return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
+ return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
} else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion);
- } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
+ } else if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
- shardMBean.incrementWriteOnlyTransactionCount();
+ shardMBean.incrementReadOnlyTransactionCount();
+
+ return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
- return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
} else {
throw new IllegalArgumentException(
"Shard="+name + ":CreateTransaction message has unidentified transaction type="
private ActorRef createTransaction(int transactionType, String remoteTransactionId,
String transactionChainId, short clientVersion) {
- ShardTransactionIdentifier transactionId =
- ShardTransactionIdentifier.builder()
- .remoteTransactionId(remoteTransactionId)
- .build();
+
+ ShardTransactionIdentifier transactionId = new ShardTransactionIdentifier(remoteTransactionId);
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
}
@VisibleForTesting
- InMemoryDOMDataStore getDataStore() {
+ public InMemoryDOMDataStore getDataStore() {
return store;
}