import akka.actor.ActorSelection;
import akka.actor.Cancellable;
import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
import akka.japi.Creator;
import akka.persistence.RecoveryFailure;
import akka.serialization.Serialization;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
+import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
+import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
// The state of this Shard
private final InMemoryDOMDataStore store;
- private final LoggingAdapter LOG = Logging.getLogger(getContext().system(), this);
-
/// The name of this shard
private final ShardIdentifier name;
private final List<DelayedListenerRegistration> delayedListenerRegistrations =
Lists.newArrayList();
- private final DatastoreContext datastoreContext;
+ private DatastoreContext datastoreContext;
- private final DataPersistenceProvider dataPersistenceProvider;
+ private DataPersistenceProvider dataPersistenceProvider;
private SchemaContext schemaContext;
private final ShardCommitCoordinator commitCoordinator;
- private final long transactionCommitTimeout;
+ private long transactionCommitTimeout;
private Cancellable txCommitTimeoutCheckSchedule;
private final Optional<ActorRef> roleChangeNotifier;
+ private final MessageTracker appendEntriesReplyTracker;
+
/**
* Coordinates persistence recovery on startup.
*/
private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
+ private final String txnDispatcherPath;
+
protected Shard(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
super(name.toString(), mapPeerAddresses(peerAddresses),
this.name = name;
this.datastoreContext = datastoreContext;
this.schemaContext = schemaContext;
- this.dataPersistenceProvider = (datastoreContext.isPersistent()) ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
+ this.dataPersistenceProvider = (datastoreContext.isPersistent())
+ ? new PersistentDataProvider() : new NonPersistentRaftDataProvider();
+ this.txnDispatcherPath = new Dispatchers(context().system().dispatchers())
+ .getDispatcherPath(Dispatchers.DispatcherType.Transaction);
+
LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString());
- transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
- datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
+ setTransactionCommitTimeout();
// create a notifier actor for each cluster member
roleChangeNotifier = createRoleChangeNotifier(name.toString());
+
+ appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
+ getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
+ }
+
+ private void setTransactionCommitTimeout() {
+ transactionCommitTimeout = TimeUnit.MILLISECONDS.convert(
+ datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
}
private static Map<String, String> mapPeerAddresses(
@Override
public void postStop() {
+ LOG.info("Stopping Shard {}", persistenceId());
+
super.postStop();
if(txCommitTimeoutCheckSchedule != null) {
txCommitTimeoutCheckSchedule.cancel();
}
+
+ shardMBean.unregisterMBean();
}
@Override
}
if (message instanceof RecoveryFailure){
- LOG.error(((RecoveryFailure) message).cause(), "{}: Recovery failed because of this cause",
- persistenceId());
+ LOG.error("{}: Recovery failed because of this cause",
+ persistenceId(), ((RecoveryFailure) message).cause());
// Even though recovery failed, we still need to finish our recovery, eg send the
// ActorInitialized message and start the txCommitTimeoutCheckSchedule.
onRecoveryComplete();
} else {
super.onReceiveRecover(message);
+ if(LOG.isTraceEnabled()) {
+ appendEntriesReplyTracker.begin();
+ }
}
}
@Override
public void onReceiveCommand(final Object message) throws Exception {
- if(LOG.isDebugEnabled()) {
- LOG.debug("{}: onReceiveCommand: Received message {} from {}", persistenceId(), message, getSender());
- }
-
- if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
- handleCreateTransaction(message);
- } else if(message instanceof ForwardedReadyTransaction) {
- handleForwardedReadyTransaction((ForwardedReadyTransaction)message);
- } else if(message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
- handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
- } else if(message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
- handleCommitTransaction(CommitTransaction.fromSerializable(message));
- } else if(message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
- handleAbortTransaction(AbortTransaction.fromSerializable(message));
- } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)){
- closeTransactionChain(CloseTransactionChain.fromSerializable(message));
- } else if (message instanceof RegisterChangeListener) {
- registerChangeListener((RegisterChangeListener) message);
- } else if (message instanceof UpdateSchemaContext) {
- updateSchemaContext((UpdateSchemaContext) message);
- } else if (message instanceof PeerAddressResolved) {
- PeerAddressResolved resolved = (PeerAddressResolved) message;
- setPeerAddress(resolved.getPeerId().toString(),
- resolved.getPeerAddress());
- } else if(message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
- handleTransactionCommitTimeoutCheck();
- } else {
- super.onReceiveCommand(message);
+
+ MessageTracker.Context context = appendEntriesReplyTracker.received(message);
+
+ if(context.error().isPresent()){
+ LOG.trace("{} : AppendEntriesReply failed to arrive at the expected interval {}", persistenceId(),
+ context.error());
+ }
+
+ try {
+ if (message.getClass().equals(CreateTransaction.SERIALIZABLE_CLASS)) {
+ handleCreateTransaction(message);
+ } else if (message instanceof ForwardedReadyTransaction) {
+ handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
+ } else if (message.getClass().equals(CanCommitTransaction.SERIALIZABLE_CLASS)) {
+ handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
+ } else if (message.getClass().equals(CommitTransaction.SERIALIZABLE_CLASS)) {
+ handleCommitTransaction(CommitTransaction.fromSerializable(message));
+ } else if (message.getClass().equals(AbortTransaction.SERIALIZABLE_CLASS)) {
+ handleAbortTransaction(AbortTransaction.fromSerializable(message));
+ } else if (message.getClass().equals(CloseTransactionChain.SERIALIZABLE_CLASS)) {
+ closeTransactionChain(CloseTransactionChain.fromSerializable(message));
+ } else if (message instanceof RegisterChangeListener) {
+ registerChangeListener((RegisterChangeListener) message);
+ } else if (message instanceof UpdateSchemaContext) {
+ updateSchemaContext((UpdateSchemaContext) message);
+ } else if (message instanceof PeerAddressResolved) {
+ PeerAddressResolved resolved = (PeerAddressResolved) message;
+ setPeerAddress(resolved.getPeerId().toString(),
+ resolved.getPeerAddress());
+ } else if (message.equals(TX_COMMIT_TIMEOUT_CHECK_MESSAGE)) {
+ handleTransactionCommitTimeoutCheck();
+ } else if(message instanceof DatastoreContext) {
+ onDatastoreContext((DatastoreContext)message);
+ } else {
+ super.onReceiveCommand(message);
+ }
+ } finally {
+ context.done();
}
}
return roleChangeNotifier;
}
+ private void onDatastoreContext(DatastoreContext context) {
+ datastoreContext = context;
+
+ commitCoordinator.setQueueCapacity(datastoreContext.getShardTransactionCommitQueueCapacity());
+
+ setTransactionCommitTimeout();
+
+ if(datastoreContext.isPersistent() &&
+ dataPersistenceProvider instanceof NonPersistentRaftDataProvider) {
+ dataPersistenceProvider = new PersistentDataProvider();
+ } else if(!datastoreContext.isPersistent() &&
+ dataPersistenceProvider instanceof PersistentDataProvider) {
+ dataPersistenceProvider = new NonPersistentRaftDataProvider();
+ }
+
+ updateConfigParams(datastoreContext.getShardRaftConfig());
+ }
+
private void handleTransactionCommitTimeoutCheck() {
CohortEntry cohortEntry = commitCoordinator.getCurrentCohortEntry();
if(cohortEntry != null) {
long elapsed = System.currentTimeMillis() - cohortEntry.getLastAccessTime();
if(elapsed > transactionCommitTimeout) {
- LOG.warning("{}: Current transaction {} has timed out after {} ms - aborting",
+ LOG.warn("{}: Current transaction {} has timed out after {} ms - aborting",
persistenceId(), cohortEntry.getTransactionID(), transactionCommitTimeout);
doAbortTransaction(cohortEntry.getTransactionID(), null);
new ModificationPayload(cohortEntry.getModification()));
}
} catch (Exception e) {
- LOG.error(e, "{} An exception occurred while preCommitting transaction {}",
- persistenceId(), cohortEntry.getTransactionID());
+ LOG.error("{} An exception occurred while preCommitting transaction {}",
+ persistenceId(), cohortEntry.getTransactionID(), e);
shardMBean.incrementFailedTransactionsCount();
getSender().tell(new akka.actor.Status.Failure(e), getSelf());
}
} catch (Exception e) {
sender.tell(new akka.actor.Status.Failure(e), getSelf());
- LOG.error(e, "{}, An exception occurred while committing transaction {}", persistenceId(), transactionID);
+ LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(),
+ transactionID, e);
shardMBean.incrementFailedTransactionsCount();
} finally {
commitCoordinator.currentTransactionComplete(transactionID, true);
@Override
public void onFailure(final Throwable t) {
- LOG.error(t, "{}: An exception happened during abort", persistenceId());
+ LOG.error("{}: An exception happened during abort", persistenceId(), t);
if(sender != null) {
sender.tell(new akka.actor.Status.Failure(t), self);
shardMBean.incrementReadOnlyTransactionCount();
- return getContext().actorOf(
- ShardTransaction.props(factory.newReadOnlyTransaction(), getSelf(),
- schemaContext,datastoreContext, shardMBean,
- transactionId.getRemoteTransactionId(), clientVersion),
- transactionId.toString());
+ return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
} else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
shardMBean.incrementReadWriteTransactionCount();
- return getContext().actorOf(
- ShardTransaction.props(factory.newReadWriteTransaction(), getSelf(),
- schemaContext, datastoreContext, shardMBean,
- transactionId.getRemoteTransactionId(), clientVersion),
- transactionId.toString());
-
+ return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion);
} else if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
shardMBean.incrementWriteOnlyTransactionCount();
- return getContext().actorOf(
- ShardTransaction.props(factory.newWriteOnlyTransaction(), getSelf(),
- schemaContext, datastoreContext, shardMBean,
- transactionId.getRemoteTransactionId(), clientVersion),
- transactionId.toString());
+ return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
} else {
throw new IllegalArgumentException(
"Shard="+name + ":CreateTransaction message has unidentified transaction type="
}
}
+ private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId,
+ short clientVersion){
+ return getContext().actorOf(
+ ShardTransaction.props(transaction, getSelf(),
+ schemaContext, datastoreContext, shardMBean,
+ transactionId.getRemoteTransactionId(), clientVersion)
+ .withDispatcher(txnDispatcherPath),
+ transactionId.toString());
+
+ }
+
private void createTransaction(CreateTransaction createTransaction) {
try {
ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
} catch (InterruptedException | ExecutionException e) {
shardMBean.incrementFailedTransactionsCount();
- LOG.error(e, "{}: Failed to commit", persistenceId());
+ LOG.error("{}: Failed to commit", persistenceId(), e);
}
}
try {
currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
} catch (ClassNotFoundException | IOException e) {
- LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId());
+ LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
}
} else if (data instanceof CompositeModificationPayload) {
currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
shardMBean.incrementCommittedTransactionCount();
} catch (InterruptedException | ExecutionException e) {
shardMBean.incrementFailedTransactionsCount();
- LOG.error(e, "{}: Failed to commit", persistenceId());
+ LOG.error("{}: Failed to commit", persistenceId(), e);
}
}
}
try {
applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
} catch (ClassNotFoundException | IOException e) {
- LOG.error(e, "{}: Error extracting ModificationPayload", persistenceId());
+ LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
}
}
else if (data instanceof CompositeModificationPayload) {
transaction.write(DATASTORE_ROOT, node);
syncCommitTransaction(transaction);
} catch (InterruptedException | ExecutionException e) {
- LOG.error(e, "{}: An exception occurred when applying snapshot", persistenceId());
+ LOG.error("{}: An exception occurred when applying snapshot", persistenceId(), e);
} finally {
LOG.info("{}: Done applying snapshot", persistenceId());
}