import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
void continueCommit(final CohortEntry cohortEntry) {
final DataTreeCandidate candidate = cohortEntry.getCandidate();
+ final TransactionIdentifier transactionId = cohortEntry.getTransactionID();
// If we do not have any followers and we are not using persistence
// or if cohortEntry has no modifications
// we can apply modification to the state immediately
if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
- applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
- } else {
- persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
- DataTreeCandidatePayload.create(candidate));
+ applyModificationToState(cohortEntry.getReplySender(), transactionId, candidate);
+ return;
}
+
+ final Payload payload;
+ try {
+ payload = CommitTransactionPayload.create(transactionId, candidate);
+ } catch (IOException e) {
+ LOG.error("{}: failed to encode transaction {} candidate {}", persistenceId(), transactionId, candidate,
+ e);
+ // TODO: do we need to do something smarter here?
+ throw Throwables.propagate(e);
+ }
+
+ persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), payload);
}
private void handleCommitTransaction(final CommitTransaction commit) {
}
private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
- store.closeTransactionChain(closeTransactionChain.getTransactionChainId());
+ store.closeTransactionChain(closeTransactionChain.getIdentifier());
}
private void createTransaction(CreateTransaction createTransaction) {
@Override
protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
- if (data instanceof DataTreeCandidatePayload) {
+ if (data instanceof DataTreeCandidateSupplier) {
if (clientActor == null) {
// No clientActor indicates a replica coming from the leader
try {
- store.applyForeignCandidate(identifier, ((DataTreeCandidatePayload)data).getCandidate());
+ store.applyForeignCandidate(identifier, ((DataTreeCandidateSupplier)data).getCandidate().getValue());
} catch (DataValidationFailedException | IOException e) {
LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
}