import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.controller.cluster.datastore.persisted.CommitTransactionPayload;
+import org.opendaylight.controller.cluster.datastore.persisted.DataTreeCandidateSupplier;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
+import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataValidationFailedException;
void continueCommit(final CohortEntry cohortEntry) {
final DataTreeCandidate candidate = cohortEntry.getCandidate();
+ final TransactionIdentifier transactionId = cohortEntry.getTransactionID();
// If we do not have any followers and we are not using persistence
// or if cohortEntry has no modifications
// we can apply modification to the state immediately
if ((!hasFollowers() && !persistence().isRecoveryApplicable()) || isEmptyCommit(candidate)) {
- applyModificationToState(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), candidate);
- } else {
- persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(),
- DataTreeCandidatePayload.create(candidate));
+ applyModificationToState(cohortEntry.getReplySender(), transactionId, candidate);
+ return;
}
+
+ final Payload payload;
+ try {
+ payload = CommitTransactionPayload.create(transactionId, candidate);
+ } catch (IOException e) {
+ LOG.error("{}: failed to encode transaction {} candidate {}", persistenceId(), transactionId, candidate,
+ e);
+ // TODO: do we need to do something smarter here?
+ throw Throwables.propagate(e);
+ }
+
+ persistData(cohortEntry.getReplySender(), cohortEntry.getTransactionID(), payload);
}
private void handleCommitTransaction(final CommitTransaction commit) {
protected void handleBatchedModificationsLocal(BatchedModifications batched, ActorRef sender) {
try {
- commitCoordinator.handleBatchedModifications(batched, sender, this, store.getSchemaContext());
+ commitCoordinator.handleBatchedModifications(batched, sender, this);
} catch (Exception e) {
LOG.error("{}: Error handling BatchedModifications for Tx {}", persistenceId(),
batched.getTransactionID(), e);
boolean isLeaderActive = isLeaderActive();
if (isLeader() && isLeaderActive) {
try {
- commitCoordinator.handleReadyLocalTransaction(message, getSender(), this, store.getSchemaContext());
+ commitCoordinator.handleReadyLocalTransaction(message, getSender(), this);
} catch (Exception e) {
LOG.error("{}: Error handling ReadyLocalTransaction for Tx {}", persistenceId(),
message.getTransactionID(), e);
}
private void closeTransactionChain(final CloseTransactionChain closeTransactionChain) {
- store.closeTransactionChain(closeTransactionChain.getTransactionChainId());
+ store.closeTransactionChain(closeTransactionChain.getIdentifier());
}
private void createTransaction(CreateTransaction createTransaction) {
@Override
@Nonnull
protected RaftActorRecoveryCohort getRaftActorRecoveryCohort() {
- return new ShardRecoveryCoordinator(store, store.getSchemaContext(),
- restoreFromSnapshot != null ? restoreFromSnapshot.getSnapshot() : null, persistenceId(), LOG);
+ return new ShardRecoveryCoordinator(store,
+ restoreFromSnapshot != null ? restoreFromSnapshot.getSnapshot() : null, persistenceId(), LOG);
}
@Override
@Override
protected void applyState(final ActorRef clientActor, final Identifier identifier, final Object data) {
- if (data instanceof DataTreeCandidatePayload) {
+ if (data instanceof DataTreeCandidateSupplier) {
if (clientActor == null) {
// No clientActor indicates a replica coming from the leader
try {
- store.applyForeignCandidate(identifier, ((DataTreeCandidatePayload)data).getCandidate());
+ store.applyForeignCandidate(identifier, ((DataTreeCandidateSupplier)data).getCandidate().getValue());
} catch (DataValidationFailedException | IOException e) {
LOG.error("{}: Error applying replica {}", persistenceId(), identifier, e);
}