import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
import org.opendaylight.controller.cluster.common.actor.CommonConfig;
import org.opendaylight.controller.cluster.common.actor.MessageTracker;
import org.opendaylight.controller.cluster.common.actor.MessageTracker.Error;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifier;
-import org.opendaylight.yangtools.util.StringIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
import org.opendaylight.yangtools.yang.data.api.schema.tree.ModificationType;
new Dispatchers(context().system().dispatchers()).getDispatcherPath(
Dispatchers.DispatcherType.Transaction), self(), getContext(), shardMBean);
- snapshotCohort = new ShardSnapshotCohort(transactionActorFactory, store, LOG, this.name);
+ snapshotCohort = new ShardSnapshotCohort(builder.getId().getMemberName(), transactionActorFactory, store,
+ LOG, this.name);
messageRetrySupport = new ShardTransactionMessageRetrySupport(this);
}
void continueCommit(final CohortEntry cohortEntry) {
final DataTreeCandidate candidate = cohortEntry.getCandidate();
+ final TransactionIdentifier transactionId = cohortEntry.getTransactionID();
// If we do not have any followers and we are not using persistence
// or if cohortEntry has no modifications
// we can apply modification to the state immediately
if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
- applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
- } else {
- persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
- DataTreeCandidatePayload.create(candidate));
+ applyModificationToState(cohortEntry.getReplySender(), transactionId, candidate);
+ return;
}
+
+ final Payload payload;
+ try {
+ payload = CommitTransactionPayload.create(transactionId, candidate);
+ } catch (IOException e) {
+ LOG.error("{}: failed to encode transaction {} candidate {}", persistenceId(), transactionId, candidate,
+ e);
+ // TODO: do we need to do something smarter here?
+ throw Throwables.propagate(e);
+ }
+
+ persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), payload);
}
private void handleCommitTransaction(final CommitTransaction commit) {
if (isLeader()) {
- if(!commitCoordinator.handleCommit(new StringIdentifier(commit.getTransactionID()), getSender(), this)) {
+ if(!commitCoordinator.handleCommit(commit.getTransactionID(), getSender(), this)) {
shardMBean.incrementFailedTransactionsCount();
}
} else {
LOG.debug("{}: Can committing transaction {}", persistenceId(), canCommit.getTransactionID());
if (isLeader()) {
- commitCoordinator.handleCanCommit(new StringIdentifier(canCommit.getTransactionID()), getSender(), this);
+ commitCoordinator.handleCanCommit(canCommit.getTransactionID(), getSender(), this);
} else {
ActorSelection leader = getLeader();
if (leader == null) {
doAbortTransaction(abort.getTransactionID(), getSender());
}
- void doAbortTransaction(final String transactionID, final ActorRef sender) {
- commitCoordinator.handleAbort(new StringIdentifier(transactionID), sender, this);
+ void doAbortTransaction(final TransactionIdentifier transactionID, final ActorRef sender) {
+ commitCoordinator.handleAbort(transactionID, sender, this);
}
private void handleCreateTransaction(final Object message) {
}
private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
- store.closeTransactionChain(closeTransactionChain.getTransactionChainId());
+ store.closeTransactionChain(closeTransactionChain.getIdentifier());
}
private void createTransaction(CreateTransaction createTransaction) {
}
ActorRef transactionActor = createTransaction(createTransaction.getTransactionType(),
- createTransaction.getTransactionId(), createTransaction.getTransactionChainId());
+ createTransaction.getTransactionId());
getSender().tell(new CreateTransactionReply(Serialization.serializedActorPath(transactionActor),
createTransaction.getTransactionId(), createTransaction.getVersion()).toSerializable(), getSelf());
}
}
- private ActorRef createTransaction(int transactionType, String transactionId, String transactionChainId) {
+ private ActorRef createTransaction(int transactionType, TransactionIdentifier transactionId) {
LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
- transactionId, transactionChainId);
+ transactionId);
}
private void commitWithNewTransaction(final BatchedModifications modification) {
- ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.getTransactionID(),
- modification.getTransactionChainID());
+ ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.getTransactionID());
modification.apply(tx.getSnapshot());
try {
snapshotCohort.syncCommitTransaction(tx);
@Override
protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
- if (data instanceof DataTreeCandidatePayload) {
+ if (data instanceof DataTreeCandidateSupplier) {
if (clientActor == null) {
// No clientActor indicates a replica coming from the leader
try {
- store.applyForeignCandidate(identifier, ((DataTreeCandidatePayload)data).getCandidate());
+ store.applyForeignCandidate(identifier, ((DataTreeCandidateSupplier)data).getCandidate().getValue());
} catch (DataValidationFailedException | IOException e) {
LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
}