import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
+import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec;
import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier;
private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck";
- public static final String DEFAULT_NAME = "default";
+ @VisibleForTesting
+ static final String DEFAULT_NAME = "default";
// The state of this Shard
private final InMemoryDOMDataStore store;
applyModificationToState(getSender(), transactionID, cohortEntry.getModification());
} else {
Shard.this.persistData(getSender(), transactionID,
- new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable()));
+ new ModificationPayload(cohortEntry.getModification()));
}
- } catch (InterruptedException | ExecutionException e) {
+ } catch (InterruptedException | ExecutionException | IOException e) {
LOG.error(e, "An exception occurred while preCommitting transaction {}",
cohortEntry.getTransactionID());
shardMBean.incrementFailedTransactionsCount();
@Override
protected void appendRecoveredLogEntry(final Payload data) {
- if (data instanceof CompositeModificationPayload) {
+ if(data instanceof ModificationPayload) {
+ try {
+ currentLogRecoveryBatch.add(((ModificationPayload) data).getModification());
+ } catch (ClassNotFoundException | IOException e) {
+ LOG.error(e, "Error extracting ModificationPayload");
+ }
+ } else if (data instanceof CompositeModificationPayload) {
currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification());
} else if (data instanceof CompositeModificationByteStringPayload) {
currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification());
@Override
protected void applyState(final ActorRef clientActor, final String identifier, final Object data) {
- if (data instanceof CompositeModificationPayload) {
+ if(data instanceof ModificationPayload) {
+ try {
+ applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification());
+ } catch (ClassNotFoundException | IOException e) {
+ LOG.error(e, "Error extracting ModificationPayload");
+ }
+ }
+ else if (data instanceof CompositeModificationPayload) {
Object modification = ((CompositeModificationPayload) data).getModification();
applyModificationToState(clientActor, identifier, modification);
Object modification = ((CompositeModificationByteStringPayload) data).getModification();
applyModificationToState(clientActor, identifier, modification);
-
} else {
LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}",
data, data.getClass().getClassLoader(),
} else if(clientActor == null) {
// There's no clientActor to which to send a commit reply so we must be applying
// replicated state from the leader.
- commitWithNewTransaction(MutableCompositeModification.fromSerializable(
- modification, schemaContext));
+ commitWithNewTransaction(MutableCompositeModification.fromSerializable(modification));
} else {
// This must be the OK to commit after replication consensus.
finishCommit(clientActor, identifier);