X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FProxyHistory.java;h=b3b604b7f08ad7e42abb4a224626673b97b8b8e3;hp=07fcbebad01a263d38a0bfabe839e8b02832ffe2;hb=b4d95acff78952020e9fbde4372d13b461fd7469;hpb=61d4d322740f116d7d8ec91b8ba2e4eed409d7d7;ds=sidebyside diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index 07fcbebad0..b3b604b7f0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -22,6 +22,8 @@ import java.util.function.Consumer; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; +import org.opendaylight.controller.cluster.access.client.ConnectionEntry; +import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; @@ -195,12 +197,33 @@ abstract class ProxyHistory implements Identifiable { @GuardedBy("lock") @Override - void replaySuccessfulRequests() { + void replaySuccessfulRequests(final Iterable previousEntries) { + // First look for our Create message + for (ConnectionEntry e : previousEntries) { + final Request req = e.getRequest(); + if (identifier.equals(req.getTarget())) { + Verify.verify(req instanceof LocalHistoryRequest); + if (req instanceof CreateLocalHistoryRequest) { + successor.connection.sendRequest(req, e.getCallback()); + break; + } + } + } + for (AbstractProxyTransaction t : proxies.values()) { LOG.debug("{} creating successor transaction proxy for {}", identifier, t); final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier()); LOG.debug("{} created successor transaction proxy {}", identifier, newProxy); - t.startReconnect(newProxy); + t.replayMessages(newProxy, previousEntries); + } + + // Now look for any finalizing messages + for (ConnectionEntry e : previousEntries) { + final Request req = e.getRequest(); + if (identifier.equals(req.getTarget())) { + Verify.verify(req instanceof LocalHistoryRequest); + successor.connection.sendRequest(req, e.getCallback()); + } } } @@ -347,6 +370,11 @@ abstract class ProxyHistory implements Identifiable { successor = createSuccessor(newConnection); LOG.debug("History {} instantiated successor {}", this, successor); + + for (AbstractProxyTransaction t : proxies.values()) { + t.startReconnect(); + } + return new ReconnectCohort(); }