X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FProxyHistory.java;h=4764f24d495991f0082526fef2b569a62b828835;hp=802d9ed0b33bc3a1fb5716a62d84bf03ffd266d3;hb=cc5009b8f3ea91f64ee48cda815c6a5e73a8a1af;hpb=585e116247f9b616579ffad1785a972621d928e7 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index 802d9ed0b3..4764f24d49 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -11,6 +11,7 @@ import akka.actor.ActorRef; import com.google.common.base.Preconditions; import com.google.common.base.Verify; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; @@ -22,6 +23,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; +import org.opendaylight.controller.cluster.access.client.ClientActorContext; import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; import org.opendaylight.controller.cluster.access.client.ConnectionEntry; import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest; @@ -212,7 +214,7 @@ abstract class ProxyHistory implements Identifiable { @GuardedBy("lock") @Override - void replayRequests(final Iterable previousEntries) { + void replayRequests(final Collection previousEntries) { // First look for our Create message Iterator it = previousEntries.iterator(); while (it.hasNext()) { @@ -221,7 +223,7 @@ abstract class ProxyHistory implements Identifiable { if (identifier.equals(req.getTarget())) { Verify.verify(req instanceof LocalHistoryRequest); if (req instanceof CreateLocalHistoryRequest) { - successor.connection.sendRequest(req, e.getCallback()); + successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks()); it.remove(); break; } @@ -229,11 +231,8 @@ abstract class ProxyHistory implements Identifiable { } for (AbstractProxyTransaction t : proxies.values()) { - LOG.debug("{} creating successor transaction proxy for {}", identifier, t); - final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(), - t.isSnapshotOnly()); - LOG.debug("{} created successor transaction proxy {}", identifier, newProxy); - t.replayMessages(newProxy, previousEntries); + LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor); + t.replayMessages(successor, previousEntries); } // Now look for any finalizing messages @@ -244,7 +243,7 @@ abstract class ProxyHistory implements Identifiable { if (identifier.equals(req.getTarget())) { Verify.verify(req instanceof LocalHistoryRequest); if (req instanceof DestroyLocalHistoryRequest) { - successor.connection.sendRequest(req, e.getCallback()); + successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks()); it.remove(); break; } @@ -337,6 +336,10 @@ abstract class ProxyHistory implements Identifiable { return identifier; } + final ClientActorContext context() { + return connection.context(); + } + final long currentTime() { return connection.currentTime(); } @@ -349,7 +352,7 @@ abstract class ProxyHistory implements Identifiable { return parent; } - final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, + AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly) { lock.lock(); try { @@ -370,8 +373,8 @@ abstract class ProxyHistory implements Identifiable { final void abortTransaction(final AbstractProxyTransaction tx) { lock.lock(); try { - proxies.remove(tx.getIdentifier()); - LOG.debug("Proxy {} aborting transaction {}", this, tx); + // Removal will be completed once purge completes + LOG.debug("Proxy {} aborted transaction {}", this, tx); onTransactionAborted(tx); } finally { lock.unlock();