import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
private final List<DelayedListenerRegistration> delayedListenerRegistrations =
Lists.newArrayList();
- private final DatastoreContext datastoreContext;
+ private DatastoreContext datastoreContext;
- private final DataPersistenceProvider dataPersistenceProvider;
+ private DataPersistenceProvider dataPersistenceProvider;
private SchemaContext schemaContext;
private final ShardCommitCoordinator commitCoordinator;
- private final long transactionCommitTimeout;
+ private long transactionCommitTimeout;
private Cancellable txCommitTimeoutCheckSchedule;
private final Optional<ActorRef> roleChangeNotifier;
+ private final MessageTracker appendEntriesReplyTracker;
+
/**
* Coordinates persistence recovery on startup.
*/
private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
+ private final String txnDispatcherPath;
+
protected Shard(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
super(name.toString(), mapPeerAddresses(peerAddresses),
this.name = name;
this.datastoreContext = datastoreContext;
this.schemaContext = schemaContext;
- this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
+ this.dataPersistenceProvider = (datastoreContext.isPersistent())
+ ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
+ this.txnDispatcherPath = new Dispatchers(context().system().dispatchers())
+ .getDispatcherPath(Dispatchers.DispatcherType.Transaction);
+
LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString());
- transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
- datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+ setTransactionCommitTimeout();
// create a notifier actor for each cluster member
roleChangeNotifier = createRoleChangeNotifier(name.toString());
+
+ appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
+ getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
+ }
+
+ private void setTransactionCommitTimeout() {
+ transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
+ datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
}
private static Map<String, String> mapPeerAddresses(
@Override
public void postStop() {
+ LOG.info("Stopping Shard {}", persistenceId());
+
super.postStop();
if(txCommitTimeoutCheckSchedule != null) {
txCommitTimeoutCheckSchedule.cancel();
}
+
+ shardMBean.unregisterMBean();
}
@Override
onRecoveryComplete();
} else {
super.onReceiveRecover(message);
+ if(LOG.isTraceEnabled()) {
+ appendEntriesReplyTracker.begin();
+ }
}
}
@Override
public void onReceiveCommand(final Object message) throws Exception {
- if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
- handleCreateTransaction(message);
- } else if(message instanceof ForwardedReadyTransaction) {
- handleForwardedReadyTransaction((ForwardedReadyTransaction)message);
- } else if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
- handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
- } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
- handleCommitTransaction(CommitTransaction.fromSerializable(message));
- } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
- handleAbortTransaction(AbortTransaction.fromSerializable(message));
- } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)){
- closeTransactionChain(CloseTransactionChain.fromSerializable(message));
- } else if (message instanceof RegisterChangeListener) {
- registerChangeListener((RegisterChangeListener) message);
- } else if (message instanceof UpdateSchemaContext) {
- updateSchemaContext((UpdateSchemaContext) message);
- } else if (message instanceof PeerAddressResolved) {
- PeerAddressResolved resolved = (PeerAddressResolved) message;
- setPeerAddress(resolved.getPeerId().toString(),
- resolved.getPeerAddress());
- } else if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
- handleTransactionCommitTimeoutCheck();
- } else {
- super.onReceiveCommand(message);
+
+ MessageTracker.Context context = appendEntriesReplyTracker.received(message);
+
+ if(context.error().isPresent()){
+ LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
+ context.error());
+ }
+
+ try {
+ if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+ handleCreateTransaction(message);
+ } else if (message instanceof ForwardedReadyTransaction) {
+ handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
+ } else if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+ handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
+ } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+ handleCommitTransaction(CommitTransaction.fromSerializable(message));
+ } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+ handleAbortTransaction(AbortTransaction.fromSerializable(message));
+ } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
+ closeTransactionChain(CloseTransactionChain.fromSerializable(message));
+ } else if (message instanceof RegisterChangeListener) {
+ registerChangeListener((RegisterChangeListener) message);
+ } else if (message instanceof UpdateSchemaContext) {
+ updateSchemaContext((UpdateSchemaContext) message);
+ } else if (message instanceof PeerAddressResolved) {
+ PeerAddressResolved resolved = (PeerAddressResolved) message;
+ setPeerAddress(resolved.getPeerId().toString(),
+ resolved.getPeerAddress());
+ } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+ handleTransactionCommitTimeoutCheck();
+ } else if(message instanceof DatastoreContext) {
+ onDatastoreContext((DatastoreContext)message);
+ } else {
+ super.onReceiveCommand(message);
+ }
+ } finally {
+ context.done();
}
}
return roleChangeNotifier;
}
+ private void onDatastoreContext(DatastoreContext context) {
+ datastoreContext = context;
+
+ commitCoordinator.setQueueCapacity(datastoreContext.getShardTransactionCommitQueueCapacity());
+
+ setTransactionCommitTimeout();
+
+ if(datastoreContext.isPersistent() &&
+ dataPersistenceProvider instanceof NonPersistentRaftDataProvider) {
+ dataPersistenceProvider = new PersistentDataProvider();
+ } else if(!datastoreContext.isPersistent() &&
+ dataPersistenceProvider instanceof PersistentDataProvider) {
+ dataPersistenceProvider = new NonPersistentRaftDataProvider();
+ }
+
+ updateConfigParams(datastoreContext.getShardRaftConfig());
+ }
+
private void handleTransactionCommitTimeoutCheck() {
CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
if(cohortEntry != null) {
shardMBean.incrementReadOnlyTransactionCount();
- return getContext().actorOf(
- ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
- schemaContext,datastoreContext, shardMBean,
- transactionId.getRemoteTransactionId(), clientVersion),
- transactionId.toString());
+ return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
} else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
shardMBean.incrementReadWriteTransactionCount();
- return getContext().actorOf(
- ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(),
- schemaContext, datastoreContext, shardMBean,
- transactionId.getRemoteTransactionId(), clientVersion),
- transactionId.toString());
-
+ return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion);
} else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
shardMBean.incrementWriteOnlyTransactionCount();
- return getContext().actorOf(
- ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(),
- schemaContext, datastoreContext, shardMBean,
- transactionId.getRemoteTransactionId(), clientVersion),
- transactionId.toString());
+ return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
} else {
throw new IllegalArgumentException(
"Shard="+name + ":CreateTransaction message has unidentified transaction type="
}
}
+ private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId,
+ short clientVersion){
+ return getContext().actorOf(
+ ShardTransaction.props(transaction, getSelf(),
+ schemaContext, datastoreContext, shardMBean,
+ transactionId.getRemoteTransactionId(), clientVersion)
+ .withDispatcher(txnDispatcherPath),
+ transactionId.toString());
+
+ }
+
private void createTransaction(CreateTransaction createTransaction) {
try {
ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),