import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
+import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
-import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionFactory;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
private final InMemoryDOMDataStore store;
/// The name of this shard
- private final ShardIdentifier name;
+ private final String name;
private final ShardStats shardMBean;
private ShardRecoveryCoordinator recoveryCoordinator;
private List<Object> currentLogRecoveryBatch;
- private final Map<String, DOMStoreTransactionChain> transactionChains = new HashMap<>();
+ private final DOMTransactionFactory transactionFactory;
private final String txnDispatcherPath;
- protected Shard(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
+ protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
- super(name.toString(), mapPeerAddresses(peerAddresses),
- Optional.of(datastoreContext.getShardRaftConfig()));
+ super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()));
- this.name = name;
+ this.name = name.toString();
this.datastoreContext = datastoreContext;
this.schemaContext = schemaContext;
this.dataPersistenceProvider = (datastoreContext.isPersistent())
shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
datastoreContext.getDataStoreMXBeanType());
shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
+ shardMBean.setShardActor(getSelf());
if (isMetricsCaptureEnabled()) {
getContext().become(new MeteringBehavior(this));
}
- commitCoordinator = new ShardCommitCoordinator(TimeUnit.SECONDS.convert(1, TimeUnit.MINUTES),
- datastoreContext.getShardTransactionCommitQueueCapacity(), LOG, name.toString());
+ transactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
+
+ commitCoordinator = new ShardCommitCoordinator(transactionFactory,
+ TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
+ datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
setTransactionCommitTimeout();
datastoreContext.getShardTransactionCommitTimeoutInSeconds(), TimeUnit.SECONDS);
}
- private static Map<String, String> mapPeerAddresses(
- final Map<ShardIdentifier, String> peerAddresses) {
- Map<String, String> map = new HashMap<>();
-
- for (Map.Entry<ShardIdentifier, String> entry : peerAddresses
- .entrySet()) {
- map.put(entry.getKey().toString(), entry.getValue());
- }
-
- return map;
- }
-
public static Props props(final ShardIdentifier name,
- final Map<ShardIdentifier, String> peerAddresses,
+ final Map<String, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
Preconditions.checkNotNull(name, "name should not be null");
Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
try {
if (CreateTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleCreateTransaction(message);
+ } else if (BatchedModifications.class.isInstance(message)) {
+ handleBatchedModifications((BatchedModifications)message);
} else if (message instanceof ForwardedReadyTransaction) {
handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
} else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
// currently uses a same thread executor anyway.
cohortEntry.getCohort().preCommit().get();
- // If we do not have any followers and we are not using persistence we can
- // apply modification to the state immediately
- if(!hasFollowers() && !persistence().isRecoveryApplicable()){
+ // If we do not have any followers and we are not using persistence
+ // or if cohortEntry has no modifications
+ // we can apply modification to the state immediately
+ if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){
applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
} else {
Shard.this.persistData(getSender(), transactionID,
commitCoordinator.handleCanCommit(canCommit, getSender(), self());
}
+ private void handleBatchedModifications(BatchedModifications batched) {
+ // This message is sent to prepare the modificationsa transaction directly on the Shard as an
+ // optimization to avoid the extra overhead of a separate ShardTransaction actor. On the last
+ // BatchedModifications message, the caller sets the ready flag in the message indicating
+ // modifications are complete. The reply contains the cohort actor path (this actor) for the caller
+ // to initiate the 3-phase commit. This also avoids the overhead of sending an additional
+ // ReadyTransaction message.
+
+ // If we're not the leader then forward to the leader. This is a safety measure - we shouldn't
+ // normally get here if we're not the leader as the front-end (TransactionProxy) should determine
+ // the primary/leader shard. However with timing and caching on the front-end, there's a small
+ // window where it could have a stale leader during leadership transitions.
+ //
+ if(isLeader()) {
+ try {
+ BatchedModificationsReply reply = commitCoordinator.handleTransactionModifications(batched);
+ sender().tell(reply, self());
+ } catch (Exception e) {
+ LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
+ batched.getTransactionID(), e);
+ getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ }
+ } else {
+ ActorSelection leader = getLeader();
+ if(leader != null) {
+ // TODO: what if this is not the first batch and leadership changed in between batched messages?
+ // We could check if the commitCoordinator already has a cached entry and forward all the previous
+ // batched modifications.
+ LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader);
+ leader.forward(batched, getContext());
+ } else {
+ // TODO: rather than throwing an immediate exception, we could schedule a timer to try again to make
+ // it more resilient in case we're in the process of electing a new leader.
+ getSender().tell(new akka.actor.Status.Failure(new NoShardLeaderException(String.format(
+ "Could not find the leader for shard %s. This typically happens" +
+ " when the system is coming up or recovering and a leader is being elected. Try again" +
+ " later.", persistenceId()))), getSelf());
+ }
+ }
+ }
+
private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(),
ready.getTransactionID(), ready.getTxnClientVersion());
// commitCoordinator in preparation for the subsequent three phase commit initiated by
// the front-end.
commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
- ready.getModification());
+ (MutableCompositeModification) ready.getModification());
// Return our actor path as we'll handle the three phase commit, except if the Tx client
// version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
}
private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
- DOMStoreTransactionChain chain =
- transactionChains.remove(closeTransactionChain.getTransactionChainId());
-
- if(chain != null) {
- chain.close();
- }
+ transactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
}
private ActorRef createTypedTransactionActor(int transactionType,
ShardTransactionIdentifier transactionId, String transactionChainId,
short clientVersion ) {
- DOMStoreTransactionFactory factory = store;
-
- if(!transactionChainId.isEmpty()) {
- factory = transactionChains.get(transactionChainId);
- if(factory == null){
- DOMStoreTransactionChain transactionChain = store.createTransactionChain();
- transactionChains.put(transactionChainId, transactionChain);
- factory = transactionChain;
- }
- }
-
- if(this.schemaContext == null) {
- throw new IllegalStateException("SchemaContext is not set");
- }
-
- if (transactionType == TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
-
- shardMBean.incrementWriteOnlyTransactionCount();
+ DOMStoreTransaction transaction = transactionFactory.newTransaction(
+ TransactionProxy.TransactionType.fromInt(transactionType), transactionId.toString(),
+ transactionChainId);
- return createShardTransaction(factory.newWriteOnlyTransaction(), transactionId, clientVersion);
-
- } else if (transactionType == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
-
- shardMBean.incrementReadWriteTransactionCount();
-
- return createShardTransaction(factory.newReadWriteTransaction(), transactionId, clientVersion);
-
- } else if (transactionType == TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
-
- shardMBean.incrementReadOnlyTransactionCount();
-
- return createShardTransaction(factory.newReadOnlyTransaction(), transactionId, clientVersion);
-
- } else {
- throw new IllegalArgumentException(
- "Shard="+name + ":CreateTransaction message has unidentified transaction type="
- + transactionType);
- }
+ return createShardTransaction(transaction, transactionId, clientVersion);
}
private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId,
LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
persistenceId(), listenerRegistration.path());
- getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
+ getSender().tell(new RegisterChangeListenerReply(listenerRegistration), getSelf());
}
private ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
recoveryCoordinator = null;
currentLogRecoveryBatch = null;
- updateJournalStats();
//notify shard manager
getContext().parent().tell(new ActorInitialized(), getSelf());
persistenceId(), data, data.getClass().getClassLoader(),
CompositeModificationPayload.class.getClassLoader());
}
-
- updateJournalStats();
-
}
private void applyModificationToState(ActorRef clientActor, String identifier, Object modification) {
}
}
- private void updateJournalStats() {
- ReplicatedLogEntry lastLogEntry = getLastLogEntry();
-
- if (lastLogEntry != null) {
- shardMBean.setLastLogIndex(lastLogEntry.getIndex());
- shardMBean.setLastLogTerm(lastLogEntry.getTerm());
- }
-
- shardMBean.setCommitIndex(getCommitIndex());
- shardMBean.setLastApplied(getLastApplied());
- shardMBean.setInMemoryJournalDataSize(getRaftActorContext().getReplicatedLog().dataSize());
- }
-
@Override
protected void createSnapshot() {
// Create a transaction actor. We are really going to treat the transaction as a worker
delayedListenerRegistrations.clear();
}
- shardMBean.setRaftState(getRaftState().name());
- shardMBean.setCurrentTerm(getCurrentTerm());
-
// If this actor is no longer the leader close all the transaction chains
- if(!isLeader){
- for(Map.Entry<String, DOMStoreTransactionChain> entry : transactionChains.entrySet()){
- if(LOG.isDebugEnabled()) {
- LOG.debug(
- "{}: onStateChanged: Closing transaction chain {} because shard {} is no longer the leader",
- persistenceId(), entry.getKey(), getId());
- }
- entry.getValue().close();
+ if(!isLeader) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(
+ "{}: onStateChanged: Closing all transaction chains because shard {} is no longer the leader",
+ persistenceId(), getId());
}
- transactionChains.clear();
+ transactionFactory.closeAllTransactionChains();
}
}
return dataPersistenceProvider;
}
- @Override protected void onLeaderChanged(final String oldLeader, final String newLeader) {
- shardMBean.setLeader(newLeader);
- }
-
@Override public String persistenceId() {
- return this.name.toString();
+ return this.name;
}
@VisibleForTesting
return dataPersistenceProvider;
}
+ @VisibleForTesting
+ ShardCommitCoordinator getCommitCoordinator() {
+ return commitCoordinator;
+ }
+
+
private static class ShardCreator implements Creator<Shard> {
private static final long serialVersionUID = 1L;
final ShardIdentifier name;
- final Map<ShardIdentifier, String> peerAddresses;
+ final Map<String, String> peerAddresses;
final DatastoreContext datastoreContext;
final SchemaContext schemaContext;
- ShardCreator(final ShardIdentifier name, final Map<ShardIdentifier, String> peerAddresses,
+ ShardCreator(final ShardIdentifier name, final Map<String, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
this.name = name;
this.peerAddresses = peerAddresses;