import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
private final List<DelayedListenerRegistration> delayedListenerRegistrations =
Lists.newArrayList();
- private final DatastoreContext datastoreContext;
+ private DatastoreContext datastoreContext;
- private final DataPersistenceProvider dataPersistenceProvider;
+ private DataPersistenceProvider dataPersistenceProvider;
private SchemaContext schemaContext;
private final ShardCommitCoordinator commitCoordinator;
- private final long transactionCommitTimeout;
+ private long transactionCommitTimeout;
private Cancellable txCommitTimeoutCheckSchedule;
private final MessageTracker appendEntriesReplyTracker;
+ private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply(
+ Serialization.serializedActorPath(getSelf()));
+
+
/**
* Coordinates persistence recovery on startup.
*/
commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString());
- transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
- datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+ setTransactionCommitTimeout();
// create a notifier actor for each cluster member
roleChangeNotifier = createRoleChangeNotifier(name.toString());
getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
}
+ private void setTransactionCommitTimeout() {
+ transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
+ datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+ }
+
private static Map<String, String> mapPeerAddresses(
final Map<ShardIdentifier, String> peerAddresses) {
Map<String, String> map = new HashMap<>();
private Optional<ActorRef> createRoleChangeNotifier(String shardId) {
ActorRef shardRoleChangeNotifier = this.getContext().actorOf(
RoleChangeNotifier.getProps(shardId), shardId + "-notifier");
- return Optional.<ActorRef>of(shardRoleChangeNotifier);
+ return Optional.of(shardRoleChangeNotifier);
}
@Override
public void postStop() {
+ LOG.info("Stopping Shard {}", persistenceId());
+
super.postStop();
if(txCommitTimeoutCheckSchedule != null) {
txCommitTimeoutCheckSchedule.cancel();
}
+
+ shardMBean.unregisterMBean();
}
@Override
}
try {
- if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+ if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleCreateTransaction(message);
} else if (message instanceof ForwardedReadyTransaction) {
handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
- } else if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+ } else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
- } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+ } else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleCommitTransaction(CommitTransaction.fromSerializable(message));
- } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+ } else if (AbortTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleAbortTransaction(AbortTransaction.fromSerializable(message));
- } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
+ } else if (CloseTransactionChain.SERIALIZABLE_CLASS.isInstance(message)) {
closeTransactionChain(CloseTransactionChain.fromSerializable(message));
} else if (message instanceof RegisterChangeListener) {
registerChangeListener((RegisterChangeListener) message);
resolved.getPeerAddress());
} else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
handleTransactionCommitTimeoutCheck();
+ } else if(message instanceof DatastoreContext) {
+ onDatastoreContext((DatastoreContext)message);
+ } else if(message instanceof RegisterRoleChangeListener){
+ roleChangeNotifier.get().forward(message, context());
+ } else if (message instanceof FollowerInitialSyncUpStatus){
+ shardMBean.setFollowerInitialSyncStatus(((FollowerInitialSyncUpStatus) message).isInitialSyncDone());
+ context().parent().tell(message, self());
} else {
super.onReceiveCommand(message);
}
return roleChangeNotifier;
}
+ private void onDatastoreContext(DatastoreContext context) {
+ datastoreContext = context;
+
+ commitCoordinator.setQueueCapacity(datastoreContext.getShardTransactionCommitQueueCapacity());
+
+ setTransactionCommitTimeout();
+
+ if(datastoreContext.isPersistent() &&
+ dataPersistenceProvider instanceof NonPersistentRaftDataProvider) {
+ dataPersistenceProvider = new PersistentDataProvider();
+ } else if(!datastoreContext.isPersistent() &&
+ dataPersistenceProvider instanceof PersistentDataProvider) {
+ dataPersistenceProvider = new NonPersistentRaftDataProvider();
+ }
+
+ updateConfigParams(datastoreContext.getShardRaftConfig());
+ }
+
private void handleTransactionCommitTimeoutCheck() {
CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
if(cohortEntry != null) {
// currently uses a same thread executor anyway.
cohortEntry.getCohort().preCommit().get();
- // If we do not have any followers and we are not using persistence we can
- // apply modification to the state immediately
- if(!hasFollowers() && !persistence().isRecoveryApplicable()){
+ // If we do not have any followers and we are not using persistence
+ // or if cohortEntry has no modifications
+ // we can apply modification to the state immediately
+ if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){
applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
} else {
Shard.this.persistData(getSender(), transactionID,
// node. In that case, the subsequent 3-phase commit messages won't contain the
// transactionId so to maintain backwards compatibility, we create a separate cohort actor
// to provide the compatible behavior.
- ActorRef replyActorPath = self();
if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
- replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+ ActorRef replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
ready.getTransactionID()));
- }
- ReadyTransactionReply readyTransactionReply = new ReadyTransactionReply(
- Serialization.serializedActorPath(replyActorPath));
- getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
- readyTransactionReply, getSelf());
+ ReadyTransactionReply readyTransactionReply =
+ new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath));
+ getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
+ readyTransactionReply, getSelf());
+
+ } else {
+
+ getSender().tell(ready.isReturnSerialized() ? READY_TRANSACTION_REPLY.toSerializable() :
+ READY_TRANSACTION_REPLY, getSelf());
+ }
}
private void handleAbortTransaction(final AbortTransaction abort) {
throw new IllegalStateException("SchemaContext is not set");
}
- if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
+ if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
- shardMBean.incrementReadOnlyTransactionCount();
+ shardMBean.incrementWriteOnlyTransactionCount();
- return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
+ return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
} else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion);
- } else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
+ } else if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
- shardMBean.incrementWriteOnlyTransactionCount();
+ shardMBean.incrementReadOnlyTransactionCount();
+
+ return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
- return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
} else {
throw new IllegalArgumentException(
"Shard="+name + ":CreateTransaction message has unidentified transaction type="
private ActorRef createTransaction(int transactionType, String remoteTransactionId,
String transactionChainId, short clientVersion) {
- ShardTransactionIdentifier transactionId =
- ShardTransactionIdentifier.builder()
- .remoteTransactionId(remoteTransactionId)
- .build();
+
+ ShardTransactionIdentifier transactionId = new ShardTransactionIdentifier(remoteTransactionId);
if(LOG.isDebugEnabled()) {
LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
}
@VisibleForTesting
- InMemoryDOMDataStore getDataStore() {
+ public InMemoryDOMDataStore getDataStore() {
return store;
}