import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
private final MessageTracker appendEntriesReplyTracker;
+ private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply(
+ Serialization.serializedActorPath(getSelf()));
+
+
/**
* Coordinates persistence recovery on startup.
*/
shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
datastoreContext.getDataStoreMXBeanType());
shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
+ shardMBean.setShardActor(getSelf());
if (isMetricsCaptureEnabled()) {
getContext().become(new MeteringBehavior(this));
private Optional<ActorRef> createRoleChangeNotifier(String shardId) {
ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
- return Optional.<ActorRef>of(shardRoleChangeNotifier);
+ return Optional.of(shardRoleChangeNotifier);
}
@Override
}
try {
- if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+ if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleCreateTransaction(message);
} else if (message instanceof ForwardedReadyTransaction) {
handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
- } else if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+ } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
- } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+ } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleCommitTransaction(CommitTransaction.fromSerializable(message));
- } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+ } else if (AbortTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleAbortTransaction(AbortTransaction.fromSerializable(message));
- } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
+ } else if (CloseTransactionChain.SERIALIZABLE_CLASS.isInstance(message)) {
closeTransactionChain(CloseTransactionChain.fromSerializable(message));
} else if (message instanceof RegisterChangeListener) {
registerChangeListener((RegisterChangeListener) message);
handleTransactionCommitTimeoutCheck();
} else if(message instanceof DatastoreContext) {
onDatastoreContext((DatastoreContext)message);
+ } else if(message instanceof RegisterRoleChangeListener){
+ roleChangeNotifier.get().forward(message, context());
+ } else if (message instanceof FollowerInitialSyncUpStatus){
+ shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone());
+ context().parent().tell(message, self());
} else {
super.onReceiveCommand(message);
}
// currently uses a same thread executor anyway.
cohortEntry.getCohort().preCommit().get();
- // If we do not have any followers and we are not using persistence we can
- // apply modification to the state immediately
- if(!hasFollowers() && !persistence().isRecoveryApplicable()){
+ // If we do not have any followers and we are not using persistence
+ // or if cohortEntry has no modifications
+ // we can apply modification to the state immediately
+ if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){
applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
} else {
Shard.this.persistData(getSender(), transactionID,
// node. In that case, the subsequent 3-phase commit messages won't contain the
// transactionId so to maintain backwards compatibility, we create a separate cohort actor
// to provide the compatible behavior.
- ActorRef replyActorPath = self();
if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
- replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+ ActorRef replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
ready.getTransactionID()));
- }
- ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply(
- Serialization.serializedActorPath(replyActorPath));
- getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
- readyTransactionReply, getSelf());
+ ReadyTransactionReply readyTransactionReply =
+ new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath));
+ getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
+ readyTransactionReply, getSelf());
+
+ } else {
+
+ getSender().tell(ready.isReturnSerialized() ? READY_TRANSACTION_REPLY.toSerializable() :
+ READY_TRANSACTION_REPLY, getSelf());
+ }
}
private void handleAbortTransaction(final AbortTransaction abort) {
throw new IllegalStateException("SchemaContext is not set");
}
- if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
+ if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
- shardMBean.incrementReadOnlyTransactionCount();
+ shardMBean.incrementWriteOnlyTransactionCount();
- return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
+ return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
} else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion);
- } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
+ } else if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
- shardMBean.incrementWriteOnlyTransactionCount();
+ shardMBean.incrementReadOnlyTransactionCount();
+
+ return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
- return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
} else {
throw new IllegalArgumentException(
"Shard="+name + ":CreateTransaction message has unidentified transaction type="
private ActorRef createTransaction(int transactionType, String remoteTransactionId,
String transactionChainId, short clientVersion) {
- ShardTransactionIdentifier transactionId =
- ShardTransactionIdentifier.builder()
- .remoteTransactionId(remoteTransactionId)
- .build();
+
+ ShardTransactionIdentifier transactionId = new ShardTransactionIdentifier(remoteTransactionId);
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
recoveryCoordinator = null;
currentLogRecoveryBatch = null;
- updateJournalStats();
//notify shard manager
getContext().parent().tell(new ActorInitialized(), getSelf());
persistenceId(), data, data.getClass().getClassLoader(),
CompositeModificationPayload.class.getClassLoader());
}
-
- updateJournalStats();
-
}
private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
}
}
- private void updateJournalStats() {
- ReplicatedLogEntry lastLogEntry = getLastLogEntry();
-
- if (lastLogEntry != null) {
- shardMBean.setLastLogIndex(lastLogEntry.getIndex());
- shardMBean.setLastLogTerm(lastLogEntry.getTerm());
- }
-
- shardMBean.setCommitIndex(getCommitIndex());
- shardMBean.setLastApplied(getLastApplied());
- shardMBean.setInMemoryJournalDataSize(getRaftActorContext().getReplicatedLog().dataSize());
- }
-
@Override
protected void createSnapshot() {
// Create a transaction actor. We are really going to treat the transaction as a worker
delayedListenerRegistrations.clear();
}
- shardMBean.setRaftState(getRaftState().name());
- shardMBean.setCurrentTerm(getCurrentTerm());
-
// If this actor is no longer the leader close all the transaction chains
if(!isLeader){
for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
return dataPersistenceProvider;
}
- @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) {
- shardMBean.setLeader(newLeader);
- }
-
@Override public String persistenceId() {
return this.name.toString();
}
}
@VisibleForTesting
- InMemoryDOMDataStore getDataStore() {
+ public InMemoryDOMDataStore getDataStore() {
return store;
}