import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
*/
public class Shard extends RaftActor {
- private static final Object COMMIT_TRANSACTION_REPLY = new CommitTransactionReply().toSerializable();
-
private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
- public static final String DEFAULT_NAME = "default";
+ @VisibleForTesting
+ static final String DEFAULT_NAME = "default";
// The state of this Shard
private final InMemoryDOMDataStore store;
applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
} else {
Shard.this.persistData(getSender(), transactionID,
- new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable()));
+ new ModificationPayload(cohortEntry.getModification()));
}
- } catch (InterruptedException | ExecutionException e) {
+ } catch (InterruptedException | ExecutionException | IOException e) {
LOG.error(e, "An exception occurred while preCommitting transaction {}",
cohortEntry.getTransactionID());
shardMBean.incrementFailedTransactionsCount();
cohortEntry = commitCoordinator.getAndRemoveCohortEntry(transactionID);
if(cohortEntry != null) {
commitWithNewTransaction(cohortEntry.getModification());
- sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+ sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
} else {
// This really shouldn't happen - it likely means that persistence or replication
// took so long to complete such that the cohort entry was expired from the cache.
// currently uses a same thread executor anyway.
cohortEntry.getCohort().commit().get();
- sender.tell(COMMIT_TRANSACTION_REPLY, getSelf());
+ sender.tell(CommitTransactionReply.INSTANCE.toSerializable(), getSelf());
shardMBean.incrementCommittedTransactionCount();
shardMBean.setLastCommittedTransactionTime(System.currentTimeMillis());
// transactionId so to maintain backwards compatibility, we create a separate cohort actor
// to provide the compatible behavior.
ActorRef replyActorPath = self();
- if(ready.getTxnClientVersion() < CreateTransaction.HELIUM_1_VERSION) {
+ if(ready.getTxnClientVersion() < DataStoreVersions.HELIUM_1_VERSION) {
LOG.debug("Creating BackwardsCompatibleThreePhaseCommitCohort");
replyActorPath = getContext().actorOf(BackwardsCompatibleThreePhaseCommitCohort.props(
ready.getTransactionID()));
shardMBean.incrementAbortTransactionsCount();
if(sender != null) {
- sender.tell(new AbortTransactionReply().toSerializable(), self);
+ sender.tell(AbortTransactionReply.INSTANCE.toSerializable(), self);
}
}
// This must be for install snapshot. Don't want to open this up and trigger
// deSerialization
- self().tell(new CaptureSnapshotReply(ReadDataReply.getNormalizedNodeByteString(message)),
+ self().tell(new CaptureSnapshotReply(ReadDataReply.fromSerializableAsByteString(message)),
self());
createSnapshotTransaction = null;
}
private ActorRef createTypedTransactionActor(int transactionType,
- ShardTransactionIdentifier transactionId, String transactionChainId, int clientVersion ) {
+ ShardTransactionIdentifier transactionId, String transactionChainId,
+ short clientVersion ) {
DOMStoreTransactionFactory factory = store;
}
private ActorRef createTransaction(int transactionType, String remoteTransactionId,
- String transactionChainId, int clientVersion) {
+ String transactionChainId, short clientVersion) {
ShardTransactionIdentifier transactionId =
ShardTransactionIdentifier.builder()
dataChangeListeners.add(dataChangeListenerPath);
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
- new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
+ new DataChangeListenerProxy(dataChangeListenerPath);
LOG.debug("Registering for path {}", registerChangeListener.getPath());
@Override
protected void appendRecoveredLogEntry(final Payload data) {
- if (data instanceof CompositeModificationPayload) {
+ if(data instanceof ModificationPayload) {
+ try {
+ currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
+ } catch (ClassNotFoundException | IOException e) {
+ LOG.error(e, "Error extracting ModificationPayload");
+ }
+ } else if (data instanceof CompositeModificationPayload) {
currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
} else if (data instanceof CompositeModificationByteStringPayload) {
currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
@Override
protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
- if (data instanceof CompositeModificationPayload) {
+ if(data instanceof ModificationPayload) {
+ try {
+ applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
+ } catch (ClassNotFoundException | IOException e) {
+ LOG.error(e, "Error extracting ModificationPayload");
+ }
+ }
+ else if (data instanceof CompositeModificationPayload) {
Object modification = ((CompositeModificationPayload) data).getModification();
applyModificationToState(clientActor, identifier, modification);
Object modification = ((CompositeModificationByteStringPayload) data).getModification();
applyModificationToState(clientActor, identifier, modification);
-
} else {
LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
data, data.getClass().getClassLoader(),
} else if(clientActor == null) {
// There's no clientActor to which to send a commit reply so we must be applying
// replicated state from the leader.
- commitWithNewTransaction(MutableCompositeModification.fromSerializable(
- modification, schemaContext));
+ commitWithNewTransaction(MutableCompositeModification.fromSerializable(modification));
} else {
// This must be the OK to commit after replication consensus.
finishCommit(clientActor, identifier);
createSnapshotTransaction = createTransaction(
TransactionProxy.TransactionType.READ_ONLY.ordinal(),
"createSnapshot" + ++createSnapshotTransactionCounter, "",
- CreateTransaction.CURRENT_VERSION);
+ DataStoreVersions.CURRENT_VERSION);
createSnapshotTransaction.tell(
new ReadData(YangInstanceIdentifier.builder().build()).toSerializable(), self());