X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FProxyHistory.java;h=802d9ed0b33bc3a1fb5716a62d84bf03ffd266d3;hp=d6aa3d3f3fc079db3ac4ed12e46c40173f1c8dc4;hb=28551609a31799a43d3017ba0681e198f5136d70;hpb=d6ed0a044d591d65847714451d97d80345154089 diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index d6aa3d3f3f..802d9ed0b3 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -11,6 +11,7 @@ import akka.actor.ActorRef; import com.google.common.base.Preconditions; import com.google.common.base.Verify; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; @@ -24,7 +25,9 @@ import org.opendaylight.controller.cluster.access.client.AbstractClientConnectio import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; import org.opendaylight.controller.cluster.access.client.ConnectionEntry; import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest; +import org.opendaylight.controller.cluster.access.commands.DestroyLocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.LocalHistoryRequest; +import org.opendaylight.controller.cluster.access.commands.PurgeLocalHistoryRequest; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.Request; @@ -46,9 +49,9 @@ abstract class ProxyHistory implements Identifiable { private abstract static class AbstractLocal extends ProxyHistory { private final DataTree dataTree; - AbstractLocal(final AbstractClientConnection connection, + AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection connection, final LocalHistoryIdentifier identifier, final DataTree dataTree) { - super(connection, identifier); + super(parent, connection, identifier); this.dataTree = Preconditions.checkNotNull(dataTree); } @@ -58,9 +61,9 @@ abstract class ProxyHistory implements Identifiable { } private abstract static class AbstractRemote extends ProxyHistory { - AbstractRemote(final AbstractClientConnection connection, + AbstractRemote(final AbstractClientHistory parent, final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { - super(connection, identifier); + super(parent, connection, identifier); } } @@ -74,9 +77,9 @@ abstract class ProxyHistory implements Identifiable { private volatile LocalReadWriteProxyTransaction lastSealed; - Local(final AbstractClientConnection connection, final LocalHistoryIdentifier identifier, - final DataTree dataTree) { - super(connection, identifier, dataTree); + Local(final AbstractClientHistory parent, final AbstractClientConnection connection, + final LocalHistoryIdentifier identifier, final DataTree dataTree) { + super(parent, connection, identifier, dataTree); } @Override @@ -104,7 +107,7 @@ abstract class ProxyHistory implements Identifiable { @Override ProxyHistory createSuccessor(final AbstractClientConnection connection) { - return createClient(connection, getIdentifier()); + return createClient(parent(), connection, getIdentifier()); } @Override @@ -133,9 +136,9 @@ abstract class ProxyHistory implements Identifiable { } private static final class LocalSingle extends AbstractLocal { - LocalSingle(final AbstractClientConnection connection, + LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection connection, final LocalHistoryIdentifier identifier, final DataTree dataTree) { - super(connection, identifier, dataTree); + super(parent, connection, identifier, dataTree); } @Override @@ -148,13 +151,14 @@ abstract class ProxyHistory implements Identifiable { @Override ProxyHistory createSuccessor(final AbstractClientConnection connection) { - return createSingle(connection, getIdentifier()); + return createSingle(parent(), connection, getIdentifier()); } } private static final class Remote extends AbstractRemote { - Remote(final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { - super(connection, identifier); + Remote(final AbstractClientHistory parent, final AbstractClientConnection connection, + final LocalHistoryIdentifier identifier) { + super(parent, connection, identifier); } @Override @@ -165,14 +169,14 @@ abstract class ProxyHistory implements Identifiable { @Override ProxyHistory createSuccessor(final AbstractClientConnection connection) { - return createClient(connection, getIdentifier()); + return createClient(parent(), connection, getIdentifier()); } } private static final class RemoteSingle extends AbstractRemote { - RemoteSingle(final AbstractClientConnection connection, + RemoteSingle(final AbstractClientHistory parent, final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { - super(connection, identifier); + super(parent, connection, identifier); } @Override @@ -183,7 +187,7 @@ abstract class ProxyHistory implements Identifiable { @Override ProxyHistory createSuccessor(final AbstractClientConnection connection) { - return createSingle(connection, getIdentifier()); + return createSingle(parent(), connection, getIdentifier()); } } @@ -208,14 +212,17 @@ abstract class ProxyHistory implements Identifiable { @GuardedBy("lock") @Override - void replaySuccessfulRequests(final Iterable previousEntries) { + void replayRequests(final Iterable previousEntries) { // First look for our Create message - for (ConnectionEntry e : previousEntries) { + Iterator it = previousEntries.iterator(); + while (it.hasNext()) { + final ConnectionEntry e = it.next(); final Request req = e.getRequest(); if (identifier.equals(req.getTarget())) { Verify.verify(req instanceof LocalHistoryRequest); if (req instanceof CreateLocalHistoryRequest) { successor.connection.sendRequest(req, e.getCallback()); + it.remove(); break; } } @@ -230,11 +237,17 @@ abstract class ProxyHistory implements Identifiable { } // Now look for any finalizing messages - for (ConnectionEntry e : previousEntries) { + it = previousEntries.iterator(); + while (it.hasNext()) { + final ConnectionEntry e = it.next(); final Request req = e.getRequest(); if (identifier.equals(req.getTarget())) { Verify.verify(req instanceof LocalHistoryRequest); - successor.connection.sendRequest(req, e.getCallback()); + if (req instanceof DestroyLocalHistoryRequest) { + successor.connection.sendRequest(req, e.getCallback()); + it.remove(); + break; + } } } } @@ -254,18 +267,20 @@ abstract class ProxyHistory implements Identifiable { } @Override - void replayRequest(final Request request, final Consumer> callback, - final BiConsumer, Consumer>> replayTo) throws RequestException { + void forwardRequest(final Request request, final Consumer> callback, + final BiConsumer, Consumer>> forwardTo) throws RequestException { + // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the + // period required to get into the queue. if (request instanceof TransactionRequest) { - replayTransactionRequest((TransactionRequest) request, callback); + forwardTransactionRequest((TransactionRequest) request, callback); } else if (request instanceof LocalHistoryRequest) { - replayTo.accept(request, callback); + forwardTo.accept(request, callback); } else { throw new IllegalArgumentException("Unhandled request " + request); } } - private void replayTransactionRequest(final TransactionRequest request, + private void forwardTransactionRequest(final TransactionRequest request, final Consumer> callback) throws RequestException { final AbstractProxyTransaction proxy; @@ -279,7 +294,7 @@ abstract class ProxyHistory implements Identifiable { throw new RequestReplayException("Failed to find proxy for %s", request); } - proxy.replayRequest(request, callback); + proxy.forwardRequest(request, callback); } } @@ -288,30 +303,33 @@ abstract class ProxyHistory implements Identifiable { private final Lock lock = new ReentrantLock(); private final LocalHistoryIdentifier identifier; private final AbstractClientConnection connection; + private final AbstractClientHistory parent; @GuardedBy("lock") private final Map proxies = new LinkedHashMap<>(); @GuardedBy("lock") private ProxyHistory successor; - private ProxyHistory(final AbstractClientConnection connection, - final LocalHistoryIdentifier identifier) { + private ProxyHistory(final AbstractClientHistory parent, + final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { + this.parent = Preconditions.checkNotNull(parent); this.connection = Preconditions.checkNotNull(connection); this.identifier = Preconditions.checkNotNull(identifier); } - static ProxyHistory createClient(final AbstractClientConnection connection, - final LocalHistoryIdentifier identifier) { + static ProxyHistory createClient(final AbstractClientHistory parent, + final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { final Optional dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree); - return dataTree.isPresent() ? new Local(connection, identifier, dataTree.get()) - : new Remote(connection, identifier); + return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get()) + : new Remote(parent, connection, identifier); } - static ProxyHistory createSingle(final AbstractClientConnection connection, + static ProxyHistory createSingle(final AbstractClientHistory parent, + final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { final Optional dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree); - return dataTree.isPresent() ? new LocalSingle(connection, identifier, dataTree.get()) - : new RemoteSingle(connection, identifier); + return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get()) + : new RemoteSingle(parent, connection, identifier); } @Override @@ -319,10 +337,18 @@ abstract class ProxyHistory implements Identifiable { return identifier; } + final long currentTime() { + return connection.currentTime(); + } + final ActorRef localActor() { return connection.localActor(); } + final AbstractClientHistory parent() { + return parent; + } + final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly) { lock.lock(); @@ -363,6 +389,27 @@ abstract class ProxyHistory implements Identifiable { } } + final void close() { + lock.lock(); + try { + if (successor != null) { + successor.close(); + return; + } + + LOG.debug("Proxy {} invoking destroy", this); + connection.sendRequest(new DestroyLocalHistoryRequest(getIdentifier(), 1, localActor()), + this::onDestroyComplete); + } finally { + lock.unlock(); + } + } + + final void enqueueRequest(final TransactionRequest request, final Consumer> callback, + final long enqueuedTicks) { + connection.enqueueRequest(request, callback, enqueuedTicks); + } + final void sendRequest(final TransactionRequest request, final Consumer> callback) { connection.sendRequest(request, callback); } @@ -391,6 +438,23 @@ abstract class ProxyHistory implements Identifiable { return new ReconnectCohort(); } + private void onDestroyComplete(final Response response) { + LOG.debug("Proxy {} destroy completed with {}", this, response); + + lock.lock(); + try { + parent.onProxyDestroyed(this); + connection.sendRequest(new PurgeLocalHistoryRequest(getIdentifier(), 2, localActor()), + this::onPurgeComplete); + } finally { + lock.unlock(); + } + } + + private void onPurgeComplete(final Response response) { + LOG.debug("Proxy {} purge completed with {}", this, response); + } + @GuardedBy("lock") void onTransactionAborted(final AbstractProxyTransaction tx) { // No-op for most implementations