import org.opendaylight.controller.cluster.common.actor.MeteringBehavior;
import org.opendaylight.controller.cluster.datastore.exceptions.NoShardLeaderException;
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardStats;
import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.ShardLeaderStateChanged;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.cluster.datastore.modification.Modification;
-import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.notifications.LeaderStateChanged;
import org.opendaylight.controller.cluster.notifications.RegisterRoleChangeListener;
import org.opendaylight.controller.cluster.raft.base.messages.FollowerInitialSyncUpStatus;
import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply;
import org.opendaylight.controller.cluster.raft.messages.ServerRemoved;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload;
-import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload;
import org.opendaylight.yangtools.concepts.Identifier;
import org.opendaylight.yangtools.util.StringIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
}
}
- private ActorRef createTransaction(int transactionType, String remoteTransactionId, String transactionChainId) {
- ShardTransactionIdentifier transactionId = new ShardTransactionIdentifier(remoteTransactionId);
+ private ActorRef createTransaction(int transactionType, String transactionId, String transactionChainId) {
LOG.debug("{}: Creating transaction : {} ", persistenceId(), transactionId);
return transactionActorFactory.newShardTransaction(TransactionType.fromInt(transactionType),
transactionId, transactionChainId);
}
- private void commitWithNewTransaction(final Modification modification) {
- ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.toString(), null);
+ private void commitWithNewTransaction(final BatchedModifications modification) {
+ ReadWriteShardDataTreeTransaction tx = store.newReadWriteTransaction(modification.getTransactionID(),
+ modification.getTransactionChainID());
modification.apply(tx.getSnapshot());
try {
snapshotCohort.syncCommitTransaction(tx);
// Replication consensus reached, proceed to commit
finishCommit(clientActor, identifier);
}
- } else if (data instanceof CompositeModificationPayload) {
- Object modification = ((CompositeModificationPayload) data).getModification();
-
- applyModificationToState(clientActor, identifier, modification);
- } else if(data instanceof CompositeModificationByteStringPayload ){
- Object modification = ((CompositeModificationByteStringPayload) data).getModification();
-
- applyModificationToState(clientActor, identifier, modification);
} else {
- LOG.error("{}: Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
- persistenceId(), data, data.getClass().getClassLoader(),
- CompositeModificationPayload.class.getClassLoader());
+ LOG.error("{}: Unknown state received {} ClassLoader {}", persistenceId(), data,
+ data.getClass().getClassLoader());
}
}
} else if(clientActor == null) {
// There's no clientActor to which to send a commit reply so we must be applying
// replicated state from the leader.
- commitWithNewTransaction(MutableCompositeModification.fromSerializable(modification));
+
+ // The only implementation we know of is BatchedModifications, which also carries a transaction
+ // identifier -- which we really need that.
+ Preconditions.checkArgument(modification instanceof BatchedModifications);
+ commitWithNewTransaction((BatchedModifications)modification);
} else {
// This must be the OK to commit after replication consensus.
finishCommit(clientActor, identifier);