X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=dea377a810cb73be01cf4c17dab93c4c2b476c1b;hb=3f2221486de63178fbfbb43508ce9466c0b23b73;hp=1661bb4b5dc95477f9e10c413171bc8d5ba03fb8;hpb=cad857b425b1a0072681066b2ba37b0b0dc8c111;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 1661bb4b5d..dea377a810 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -27,6 +27,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -63,6 +64,7 @@ import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeList import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.modification.Modification; +import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload; import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification; import org.opendaylight.controller.cluster.datastore.node.NormalizedNodeToNodeCodec; import org.opendaylight.controller.cluster.notifications.RoleChangeNotifier; @@ -97,7 +99,8 @@ public class Shard extends RaftActor { private static final Object TX_COMMIT_TIMEOUT_CHECK_MESSAGE = "txCommitTimeoutCheck"; - public static final String DEFAULT_NAME = "default"; + @VisibleForTesting + static final String DEFAULT_NAME = "default"; // The state of this Shard private final InMemoryDOMDataStore store; @@ -325,9 +328,9 @@ public class Shard extends RaftActor { applyModificationToState(getSender(), transactionID, cohortEntry.getModification()); } else { Shard.this.persistData(getSender(), transactionID, - new CompositeModificationByteStringPayload(cohortEntry.getModification().toSerializable())); + new ModificationPayload(cohortEntry.getModification())); } - } catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException | ExecutionException | IOException e) { LOG.error(e, "An exception occurred while preCommitting transaction {}", cohortEntry.getTransactionID()); shardMBean.incrementFailedTransactionsCount(); @@ -683,7 +686,13 @@ public class Shard extends RaftActor { @Override protected void appendRecoveredLogEntry(final Payload data) { - if (data instanceof CompositeModificationPayload) { + if(data instanceof ModificationPayload) { + try { + currentLogRecoveryBatch.add(((ModificationPayload) data).getModification()); + } catch (ClassNotFoundException | IOException e) { + LOG.error(e, "Error extracting ModificationPayload"); + } + } else if (data instanceof CompositeModificationPayload) { currentLogRecoveryBatch.add(((CompositeModificationPayload) data).getModification()); } else if (data instanceof CompositeModificationByteStringPayload) { currentLogRecoveryBatch.add(((CompositeModificationByteStringPayload) data).getModification()); @@ -760,7 +769,14 @@ public class Shard extends RaftActor { @Override protected void applyState(final ActorRef clientActor, final String identifier, final Object data) { - if (data instanceof CompositeModificationPayload) { + if(data instanceof ModificationPayload) { + try { + applyModificationToState(clientActor, identifier, ((ModificationPayload) data).getModification()); + } catch (ClassNotFoundException | IOException e) { + LOG.error(e, "Error extracting ModificationPayload"); + } + } + else if (data instanceof CompositeModificationPayload) { Object modification = ((CompositeModificationPayload) data).getModification(); applyModificationToState(clientActor, identifier, modification); @@ -768,7 +784,6 @@ public class Shard extends RaftActor { Object modification = ((CompositeModificationByteStringPayload) data).getModification(); applyModificationToState(clientActor, identifier, modification); - } else { LOG.error("Unknown state received {} Class loader = {} CompositeNodeMod.ClassLoader = {}", data, data.getClass().getClassLoader(), @@ -787,8 +802,7 @@ public class Shard extends RaftActor { } else if(clientActor == null) { // There's no clientActor to which to send a commit reply so we must be applying // replicated state from the leader. - commitWithNewTransaction(MutableCompositeModification.fromSerializable( - modification, schemaContext)); + commitWithNewTransaction(MutableCompositeModification.fromSerializable(modification)); } else { // This must be the OK to commit after replication consensus. finishCommit(clientActor, identifier);