X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatastore%2FShard.java;h=fdf00c6fff9f20f8eb349ea108d3f97d1d642235;hb=15c366198fa48eefd94f4d1a72faa9833e988250;hp=57e85570a903fb7923dc80b1478b693071d8b848;hpb=a46305fbc6bb7ec6883c21298d356a5e4fbbb015;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 57e85570a9..fdf00c6fff 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -17,6 +17,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import java.io.IOException; +import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -195,18 +196,18 @@ public class Shard extends RaftActor { } @Override - public void onReceiveRecover(final Object message) throws Exception { + protected void handleRecover(final Object message) { LOG.debug("{}: onReceiveRecover: Received message {} from {}", persistenceId(), message.getClass(), getSender()); - super.onReceiveRecover(message); + super.handleRecover(message); if (LOG.isTraceEnabled()) { appendEntriesReplyTracker.begin(); } } @Override - public void onReceiveCommand(final Object message) throws Exception { + protected void handleCommand(final Object message) { MessageTracker.Context context = appendEntriesReplyTracker.received(message); @@ -260,7 +261,7 @@ public class Shard extends RaftActor { } else if(ShardTransactionMessageRetrySupport.TIMER_MESSAGE_CLASS.isInstance(message)) { messageRetrySupport.onTimerMessage(message); } else { - super.onReceiveCommand(message); + super.handleCommand(message); } } finally { context.done(); @@ -427,11 +428,19 @@ public class Shard extends RaftActor { messageRetrySupport.addMessageToRetry(batched, getSender(), "Could not commit transaction " + batched.getTransactionID()); } else { - // TODO: what if this is not the first batch and leadership changed in between batched messages? - // We could check if the commitCoordinator already has a cached entry and forward all the previous - // batched modifications. - LOG.debug("{}: Forwarding BatchedModifications to leader {}", persistenceId(), leader); - leader.forward(batched, getContext()); + // If this is not the first batch and leadership changed in between batched messages, + // we need to reconstruct previous BatchedModifications from the transaction + // DataTreeModification, honoring the max batched modification count, and forward all the + // previous BatchedModifications to the new leader. + Collection newModifications = commitCoordinator.createForwardedBatchedModifications( + batched, datastoreContext.getShardBatchedModificationCount()); + + LOG.debug("{}: Forwarding {} BatchedModifications to leader {}", persistenceId(), + newModifications.size(), leader); + + for(BatchedModifications bm: newModifications) { + leader.forward(bm, getContext()); + } } } }