X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FProxyHistory.java;h=b3b604b7f08ad7e42abb4a224626673b97b8b8e3;hp=5257c6c7f6299bcfd85e08f67711256dbe906c63;hb=b4d95acff78952020e9fbde4372d13b461fd7469;hpb=db9a673c114febc785fbd324947ac2c3e3095d06 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index 5257c6c7f6..b3b604b7f0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -22,6 +22,8 @@ import java.util.function.Consumer; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; +import org.opendaylight.controller.cluster.access.client.ConnectionEntry; +import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; @@ -195,12 +197,33 @@ abstract class ProxyHistory implements Identifiable { @GuardedBy("lock") @Override - void replaySuccessfulRequests() { + void replaySuccessfulRequests(final Iterable previousEntries) { + // First look for our Create message + for (ConnectionEntry e : previousEntries) { + final Request req = e.getRequest(); + if (identifier.equals(req.getTarget())) { + Verify.verify(req instanceof LocalHistoryRequest); + if (req instanceof CreateLocalHistoryRequest) { + successor.connection.sendRequest(req, e.getCallback()); + break; + } + } + } + for (AbstractProxyTransaction t : proxies.values()) { LOG.debug("{} creating successor transaction proxy for {}", identifier, t); final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier()); LOG.debug("{} created successor transaction proxy {}", identifier, newProxy); - t.replaySuccessfulRequests(newProxy); + t.replayMessages(newProxy, previousEntries); + } + + // Now look for any finalizing messages + for (ConnectionEntry e : previousEntries) { + final Request req = e.getRequest(); + if (identifier.equals(req.getTarget())) { + Verify.verify(req instanceof LocalHistoryRequest); + successor.connection.sendRequest(req, e.getCallback()); + } } } @@ -208,6 +231,11 @@ abstract class ProxyHistory implements Identifiable { @Override ProxyHistory finishReconnect() { final ProxyHistory ret = Verify.verifyNotNull(successor); + + for (AbstractProxyTransaction t : proxies.values()) { + t.finishReconnect(); + } + LOG.debug("Finished reconnecting proxy history {}", this); lock.unlock(); return ret; @@ -342,6 +370,11 @@ abstract class ProxyHistory implements Identifiable { successor = createSuccessor(newConnection); LOG.debug("History {} instantiated successor {}", this, successor); + + for (AbstractProxyTransaction t : proxies.values()) { + t.startReconnect(); + } + return new ReconnectCohort(); }