import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionReply;
-import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.MessageTracker;
-import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
import org.opendaylight.controller.cluster.raft.RaftActor;
+import org.opendaylight.controller.cluster.raft.RaftActorRecoveryCohort;
+import org.opendaylight.controller.cluster.raft.RaftActorSnapshotCohort;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.model.api.SchemaContext;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
*/
public class Shard extends RaftActor {
- private static final YangInstanceIdentifier DATASTORE_ROOT = YangInstanceIdentifier.builder().build();
-
private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
@VisibleForTesting
private DatastoreContext datastoreContext;
- private SchemaContext schemaContext;
-
- private int createSnapshotTransactionCounter;
-
private final ShardCommitCoordinator commitCoordinator;
private long transactionCommitTimeout;
private final ReadyTransactionReply READY_TRANSACTION_REPLY = new ReadyTransactionReply(
Serialization.serializedActorPath(getSelf()));
+ private final DOMTransactionFactory domTransactionFactory;
- /**
- * Coordinates persistence recovery on startup.
- */
- private ShardRecoveryCoordinator recoveryCoordinator;
+ private final ShardTransactionActorFactory transactionActorFactory;
- private final DOMTransactionFactory transactionFactory;
-
- private final String txnDispatcherPath;
+ private final ShardSnapshotCohort snapshotCohort;
private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this);
this.name = name.toString();
this.datastoreContext = datastoreContext;
- this.schemaContext = schemaContext;
- this.txnDispatcherPath = new Dispatchers(context().system().dispatchers())
- .getDispatcherPath(Dispatchers.DispatcherType.Transaction);
setPersistence(datastoreContext.isPersistent());
getContext().become(new MeteringBehavior(this));
}
- transactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
+ domTransactionFactory = new DOMTransactionFactory(store, shardMBean, LOG, this.name);
- commitCoordinator = new ShardCommitCoordinator(transactionFactory,
+ commitCoordinator = new ShardCommitCoordinator(domTransactionFactory,
TimeUnit.SECONDS.convert(5, TimeUnit.MINUTES),
datastoreContext.getShardTransactionCommitQueueCapacity(), self(), LOG, this.name);
appendEntriesReplyTracker = new MessageTracker(AppendEntriesReply.class,
getRaftActorContext().getConfigParams().getIsolatedCheckIntervalInMillis());
- recoveryCoordinator = new ShardRecoveryCoordinator(store, persistenceId(), LOG);
+ transactionActorFactory = new ShardTransactionActorFactory(domTransactionFactory, datastoreContext,
+ new Dispatchers(context().system().dispatchers()).getDispatcherPath(
+ Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
+
+ snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
}
private void setTransactionCommitTimeout() {
//
if(isLeader()) {
try {
- BatchedModificationsReply reply = commitCoordinator.handleTransactionModifications(batched);
- sender().tell(reply, self());
+ boolean ready = commitCoordinator.handleTransactionModifications(batched);
+ if(ready) {
+ sender().tell(READY_TRANSACTION_REPLY, self());
+ } else {
+ sender().tell(new BatchedModificationsReply(batched.getModifications().size()), self());
+ }
} catch (Exception e) {
LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
batched.getTransactionID(), e);
// node. In that case, the subsequent 3-phase commit messages won't contain the
// transactionId so to maintain backwards compatibility, we create a separate cohort actor
// to provide the compatible behavior.
- if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
- LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
- ActorRef replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
- ready.getTransactionID()));
+ if(ready.getTxnClientVersion() < DataStoreVersions.LITHIUM_VERSION) {
+ ActorRef replyActorPath = getSelf();
+ if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
+ LOG.debug("{}: Creating BackwardsCompatibleThreePhaseCommitCohort", persistenceId());
+ replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
+ ready.getTransactionID()));
+ }
ReadyTransactionReply readyTransactionReply =
- new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath));
+ new ReadyTransactionReply(Serialization.serializedActorPath(replyActorPath),
+ ready.getTxnClientVersion());
getSender().tell(ready.isReturnSerialized() ? readyTransactionReply.toSerializable() :
- readyTransactionReply, getSelf());
-
+ readyTransactionReply, getSelf());
} else {
-
- getSender().tell(ready.isReturnSerialized() ? READY_TRANSACTION_REPLY.toSerializable() :
- READY_TRANSACTION_REPLY, getSelf());
+ getSender().tell(READY_TRANSACTION_REPLY, getSelf());
}
}
}
private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
- transactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
+ domTransactionFactory.closeTransactionChain(closeTransactionChain.getTransactionChainId());
}
private ActorRef createTypedTransactionActor(int transactionType,
ShardTransactionIdentifier transactionId, String transactionChainId,
short clientVersion ) {
- DOMStoreTransaction transaction = transactionFactory.newTransaction(
- TransactionProxy.TransactionType.fromInt(transactionType), transactionId.toString(),
- transactionChainId);
-
- return createShardTransaction(transaction, transactionId, clientVersion);
- }
-
- private ActorRef createShardTransaction(DOMStoreTransaction transaction, ShardTransactionIdentifier transactionId,
- short clientVersion){
- return getContext().actorOf(
- ShardTransaction.props(transaction, getSelf(),
- schemaContext, datastoreContext, shardMBean,
- transactionId.getRemoteTransactionId(), clientVersion)
- .withDispatcher(txnDispatcherPath),
- transactionId.toString());
-
+ return transactionActorFactory.newShardTransaction(TransactionProxy.TransactionType.fromInt(transactionType),
+ transactionId, transactionChainId, clientVersion);
}
private void createTransaction(CreateTransaction createTransaction) {
return transactionActor;
}
- private void syncCommitTransaction(final DOMStoreWriteTransaction transaction)
- throws ExecutionException, InterruptedException {
- DOMStoreThreePhaseCommitCohort commitCohort = transaction.ready();
- commitCohort.preCommit().get();
- commitCohort.commit().get();
- }
-
private void commitWithNewTransaction(final Modification modification) {
DOMStoreWriteTransaction tx = store.newWriteOnlyTransaction();
modification.apply(tx);
try {
- syncCommitTransaction(tx);
+ snapshotCohort.syncCommitTransaction(tx);
shardMBean.incrementCommittedTransactionCount();
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
} catch (InterruptedException | ExecutionException e) {
}
private void updateSchemaContext(final UpdateSchemaContext message) {
- this.schemaContext = message.getSchemaContext();
updateSchemaContext(message.getSchemaContext());
- store.onGlobalContextUpdated(message.getSchemaContext());
}
@VisibleForTesting
}
@Override
- protected
- void startLogRecoveryBatch(final int maxBatchSize) {
- recoveryCoordinator.startLogRecoveryBatch(maxBatchSize);
+ protected RaftActorSnapshotCohort getRaftActorSnapshotCohort() {
+ return snapshotCohort;
}
@Override
- protected void appendRecoveredLogEntry(final Payload data) {
- recoveryCoordinator.appendRecoveredLogPayload(data);
- }
-
- @Override
- protected void applyRecoverySnapshot(final byte[] snapshotBytes) {
- recoveryCoordinator.applyRecoveredSnapshot(snapshotBytes);
- }
-
- @Override
- protected void applyCurrentLogRecoveryBatch() {
- recoveryCoordinator.applyCurrentLogRecoveryBatch();
+ @Nonnull
+ protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
+ return new ShardRecoveryCoordinator(store, persistenceId(), LOG);
}
@Override
protected void onRecoveryComplete() {
- recoveryCoordinator = null;
-
//notify shard manager
getContext().parent().tell(new ActorInitialized(), getSelf());
}
}
- @Override
- protected void createSnapshot() {
- // Create a transaction actor. We are really going to treat the transaction as a worker
- // so that this actor does not get block building the snapshot. THe transaction actor will
- // after processing the CreateSnapshot message.
-
- ActorRef createSnapshotTransaction = createTransaction(
- TransactionProxy.TransactionType.READ_ONLY.ordinal(),
- "createSnapshot" + ++createSnapshotTransactionCounter, "",
- DataStoreVersions.CURRENT_VERSION);
-
- createSnapshotTransaction.tell(CreateSnapshot.INSTANCE, self());
- }
-
- @VisibleForTesting
- @Override
- protected void applySnapshot(final byte[] snapshotBytes) {
- // Since this will be done only on Recovery or when this actor is a Follower
- // we can safely commit everything in here. We not need to worry about event notifications
- // as they would have already been disabled on the follower
-
- LOG.info("{}: Applying snapshot", persistenceId());
- try {
- DOMStoreWriteTransaction transaction = store.newWriteOnlyTransaction();
-
- NormalizedNode<?, ?> node = SerializationUtils.deserializeNormalizedNode(snapshotBytes);
-
- // delete everything first
- transaction.delete(DATASTORE_ROOT);
-
- // Add everything from the remote node back
- transaction.write(DATASTORE_ROOT, node);
- syncCommitTransaction(transaction);
- } catch (InterruptedException | ExecutionException e) {
- LOG.error("{}: An exception occurred when applying snapshot", persistenceId(), e);
- } finally {
- LOG.info("{}: Done applying snapshot", persistenceId());
- }
- }
-
@Override
protected void onStateChanged() {
boolean isLeader = isLeader();
persistenceId(), getId());
}
- transactionFactory.closeAllTransactionChains();
+ domTransactionFactory.closeAllTransactionChains();
}
}