import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.ShardCommitCoordinator.CohortEntry;
-import org.opendaylight.controller.cluster.datastore.compat.BackwardsCompatibleThreePhaseCommitCohort;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ActorInitialized;
import org.opendaylight.controller.cluster.datastore.messages.BatchedModifications;
-import org.opendaylight.controller.cluster.datastore.messages.BatchedModificationsReply;
import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved;
-import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
+import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
/**
* A Shard represents a portion of the logical data tree <br/>
* <p>
- * Our Shard uses InMemoryDataStore as it's internal representation and delegates all requests it
+ * Our Shard uses InMemoryDataTree as it's internal representation and delegates all requests it
* </p>
*/
public class Shard extends RaftActor {
static final String DEFAULT_NAME = "default";
// The state of this Shard
- private final InMemoryDOMDataStore store;
+ private final ShardDataTree store;
/// The name of this shard
private final String name;
private final MessageTracker appendEntriesReplyTracker;
- private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply(
- Serialization.serializedActorPath(getSelf()));
-
- private final DOMTransactionFactory domTransactionFactory;
-
private final ShardTransactionActorFactory transactionActorFactory;
private final ShardSnapshotCohort snapshotCohort;
LOG.info("Shard created : {}, persistent : {}", name, datastoreContext.isPersistent());
- store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
- datastoreContext.getDataStoreProperties());
-
- if (schemaContext != null) {
- store.onGlobalContextUpdated(schemaContext);
- }
+ store = new ShardDataTree(schemaContext);
shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString(),
datastoreContext.getDataStoreMXBeanType());
- shardMBean.setNotificationManager(store.getDataChangeListenerNotificationManager());
shardMBean.setShardActor(getSelf());
if (isMetricsCaptureEnabled()) {
getContext().become(new MeteringBehavior(this));
}
- domTransactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
-
- commitCoordinator = new ShardCommitCoordinator(domTransactionFactory,
+ commitCoordinator = new ShardCommitCoordinator(store,
TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
- transactionActorFactory = new ShardTransactionActorFactory(domTransactionFactory, datastoreContext,
+ transactionActorFactory = new ShardTransactionActorFactory(store, datastoreContext,
new Dispatchers(context().system().dispatchers()).getDispatcherPath(
Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
} else if (BatchedModifications.class.isInstance(message)) {
handleBatchedModifications((BatchedModifications)message);
} else if (message instanceof ForwardedReadyTransaction) {
- handleForwardedReadyTransaction((ForwardedReadyTransaction) message);
+ commitCoordinator.handleForwardedReadyTransaction((ForwardedReadyTransaction) message,
+ getSender(), this);
} else if (CanCommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
handleCanCommitTransaction(CanCommitTransaction.fromSerializable(message));
} else if (CommitTransaction.SERIALIZABLE_CLASS.isInstance(message)) {
return roleChangeNotifier;
}
+ @Override
+ protected LeaderStateChanged newLeaderStateChanged(String memberId, String leaderId) {
+ return new ShardLeaderStateChanged(memberId, leaderId,
+ isLeader() ? Optional.<DataTree>of(store.getDataTree()) : Optional.<DataTree>absent());
+ }
+
private void onDatastoreContext(DatastoreContext context) {
datastoreContext = context;
}
}
- private void handleCommitTransaction(final CommitTransaction commit) {
- final String transactionID = commit.getTransactionID();
+ private static boolean isEmptyCommit(final DataTreeCandidate candidate) {
+ return ModificationType.UNMODIFIED.equals(candidate.getRootNode().getModificationType());
+ }
- LOG.debug("{}: Committing transaction {}", persistenceId(), transactionID);
+ void continueCommit(final CohortEntry cohortEntry) throws Exception {
+ final DataTreeCandidate candidate = cohortEntry.getCohort().getCandidate();
- // Get the current in-progress cohort entry in the commitCoordinator if it corresponds to
- // this transaction.
- final CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
- if(cohortEntry == null) {
- // We're not the current Tx - the Tx was likely expired b/c it took too long in
- // between the canCommit and commit messages.
- IllegalStateException ex = new IllegalStateException(
- String.format("%s: Cannot commit transaction %s - it is not the current transaction",
- persistenceId(), transactionID));
- LOG.error(ex.getMessage());
+ // If we do not have any followers and we are not using persistence
+ // or if cohortEntry has no modifications
+ // we can apply modification to the state immediately
+ if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
+ applyModificationToState(getSender(), cohortEntry.getTransactionID(), candidate);
+ } else {
+ Shard.this.persistData(getSender(), cohortEntry.getTransactionID(),
+ DataTreeCandidatePayload.create(candidate));
+ }
+ }
+
+ private void handleCommitTransaction(final CommitTransaction commit) {
+ if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
shardMBean.incrementFailedTransactionsCount();
- getSender().tell(new akka.actor.Status.Failure(ex), getSelf());
- return;
}
+ }
- // We perform the preCommit phase here atomically with the commit phase. This is an
- // optimization to eliminate the overhead of an extra preCommit message. We lose front-end
- // coordination of preCommit across shards in case of failure but preCommit should not
- // normally fail since we ensure only one concurrent 3-phase commit.
+ private void finishCommit(@Nonnull final ActorRef sender, @Nonnull final String transactionID, @Nonnull final CohortEntry cohortEntry) {
+ LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
try {
// We block on the future here so we don't have to worry about possibly accessing our
// state on a different thread outside of our dispatcher. Also, the data store
// currently uses a same thread executor anyway.
- cohortEntry.getCohort().preCommit().get();
+ cohortEntry.getCohort().commit().get();
+
+ sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
+
+ shardMBean.incrementCommittedTransactionCount();
+ shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
- // If we do not have any followers and we are not using persistence
- // or if cohortEntry has no modifications
- // we can apply modification to the state immediately
- if((!hasFollowers() && !persistence().isRecoveryApplicable()) || (!cohortEntry.hasModifications())){
- applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
- } else {
- Shard.this.persistData(getSender(), transactionID,
- new ModificationPayload(cohortEntry.getModification()));
- }
} catch (Exception e) {
- LOG.error("{} An exception occurred while preCommitting transaction {}",
- persistenceId(), cohortEntry.getTransactionID(), e);
+ sender.tell(new akka.actor.Status.Failure(e), getSelf());
+
+ LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(),
+ transactionID, e);
shardMBean.incrementFailedTransactionsCount();
- getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+ } finally {
+ commitCoordinator.currentTransactionComplete(transactionID, true);
}
-
- cohortEntry.updateLastAccessTime();
}
private void finishCommit(@Nonnull final ActorRef sender, final @Nonnull String transactionID) {
// after the commit has been replicated to a majority of the followers.
CohortEntry cohortEntry = commitCoordinator.getCohortEntryIfCurrent(transactionID);
- if(cohortEntry == null) {
+ if (cohortEntry == null) {
// The transaction is no longer the current commit. This can happen if the transaction
// was aborted prior, most likely due to timeout in the front-end. We need to finish
// committing the transaction though since it was successfully persisted and replicated
// transaction.
cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
if(cohortEntry != null) {
- commitWithNewTransaction(cohortEntry.getModification());
+ try {
+ store.applyForeignCandidate(transactionID, cohortEntry.getCohort().getCandidate());
+ } catch (DataValidationFailedException e) {
+ shardMBean.incrementFailedTransactionsCount();
+ LOG.error("{}: Failed to re-apply transaction {}", persistenceId(), transactionID, e);
+ }
+
sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
} else {
// This really shouldn't happen - it likely means that persistence or replication
LOG.error(ex.getMessage());
sender.tell(new akka.actor.Status.Failure(ex), getSelf());
}
-
- return;
- }
-
- LOG.debug("{}: Finishing commit for transaction {}", persistenceId(), cohortEntry.getTransactionID());
-
- try {
- // We block on the future here so we don't have to worry about possibly accessing our
- // state on a different thread outside of our dispatcher. Also, the data store
- // currently uses a same thread executor anyway.
- cohortEntry.getCohort().commit().get();
-
- sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
-
- shardMBean.incrementCommittedTransactionCount();
- shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
-
- } catch (Exception e) {
- sender.tell(new akka.actor.Status.Failure(e), getSelf());
-
- LOG.error("{}, An exception occurred while committing transaction {}", persistenceId(),
- transactionID, e);
- shardMBean.incrementFailedTransactionsCount();
- } finally {
- commitCoordinator.currentTransactionComplete(transactionID, true);
+ } else {
+ finishCommit(sender, transactionID, cohortEntry);
}
}
private void handleCanCommitTransaction(final CanCommitTransaction canCommit) {
LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
- commitCoordinator.handleCanCommit(canCommit, getSender(), self());
+ commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
}
private void handleBatchedModifications(BatchedModifications batched) {
//
if(isLeader()) {
try {
- boolean ready = commitCoordinator.handleTransactionModifications(batched);
- if(ready) {
- sender().tell(READY_TRANSACTION_REPLY, self());
- } else {
- sender().tell(new BatchedModificationsReply(batched.getModifications().size()), self());
- }
+ commitCoordinator.handleBatchedModifications(batched, getSender(), this);
} catch (Exception e) {
LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
batched.getTransactionID(), e);
}
}
- private void handleForwardedReadyTransaction(ForwardedReadyTransaction ready) {
- LOG.debug("{}: Readying transaction {}, client version {}", persistenceId(),
- ready.getTransactionID(), ready.getTxnClientVersion());
-
- // This message is forwarded by the ShardTransaction on ready. We cache the cohort in the
- // commitCoordinator in preparation for the subsequent three phase commit initiated by
- // the front-end.
- commitCoordinator.transactionReady(ready.getTransactionID(), ready.getCohort(),
- (MutableCompositeModification) ready.getModification());
-
- // Return our actor path as we'll handle the three phase commit, except if the Tx client
- // version < 1 (Helium-1 version). This means the Tx was initiated by a base Helium version
- // node. In that case, the subsequent 3-phase commit messages won't contain the
- // transactionId so to maintain backwards compatibility, we create a separate cohort actor
- // to provide the compatible behavior.
- if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
- ActorRef replyActorPath = getSelf();
- if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
- LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
- replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
- ready.getTransactionID()));
- }
-
- ReadyTransactionReply readyTransactionReply =
- new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
- ready.getTxnClientVersion());
- getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
- readyTransactionReply, getSelf());
- } else {
- getSender().tell(READY_TRANSACTION_REPLY, getSelf());
- }
- }
-
private void handleAbortTransaction(final AbortTransaction abort) {
doAbortTransaction(abort.getTransactionID(), getSender());
}
}
private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
- domTransactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
+ store.closeTransactionChain(closeTransactionChain.getTransactionChainId());
}
private ActorRef createTypedTransactionActor(int transactionType,
}
private void commitWithNewTransaction(final Modification modification) {
- DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
- modification.apply(tx);
+ ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.toString(), null);
+ modification.apply(tx.getSnapshot());
try {
snapshotCohort.syncCommitTransaction(tx);
shardMBean.incrementCommittedTransactionCount();
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
- } catch (InterruptedException | ExecutionException e) {
+ } catch (Exception e) {
shardMBean.incrementFailedTransactionsCount();
LOG.error("{}: Failed to commit", persistenceId(), e);
}
@VisibleForTesting
void updateSchemaContext(final SchemaContext schemaContext) {
- store.onGlobalContextUpdated(schemaContext);
+ store.updateSchemaContext(schemaContext);
}
private boolean isMetricsCaptureEnabled() {
@Override
protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
-
- if(data instanceof ModificationPayload) {
+ if (data instanceof DataTreeCandidatePayload) {
+ if (clientActor == null) {
+ // No clientActor indicates a replica coming from the leader
+ try {
+ store.applyForeignCandidate(identifier, ((DataTreeCandidatePayload)data).getCandidate());
+ } catch (DataValidationFailedException | IOException e) {
+ LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
+ }
+ } else {
+ // Replication consensus reached, proceed to commit
+ finishCommit(clientActor, identifier);
+ }
+ } else if (data instanceof ModificationPayload) {
try {
applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
} catch (ClassNotFoundException | IOException e) {
LOG.error("{}: Error extracting ModificationPayload", persistenceId(), e);
}
- }
- else if (data instanceof CompositeModificationPayload) {
+ } else if (data instanceof CompositeModificationPayload) {
Object modification = ((CompositeModificationPayload) data).getModification();
applyModificationToState(clientActor, identifier, modification);
persistenceId(), getId());
}
- domTransactionFactory.closeAllTransactionChains();
+ store.closeAllTransactionChains();
}
}
}
@VisibleForTesting
- public InMemoryDOMDataStore getDataStore() {
+ public ShardDataTree getDataStore() {
return store;
}