X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FProxyHistory.java;h=340cc5581fbbe5ac051b033d8df86677c5aaa3f0;hb=abaef4a5ae37f27542155457fe7306a4662b1eeb;hp=2a21b8e858c9548ccf5ef82206042b83f38cb4e5;hpb=cd801d3b254bf709903b1fd31379967ab8ac1f36;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index 2a21b8e858..340cc5581f 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -7,20 +7,25 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; + import akka.actor.ActorRef; -import com.google.common.base.Preconditions; import com.google.common.base.Verify; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Collection; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.BiConsumer; import java.util.function.Consumer; -import javax.annotation.concurrent.GuardedBy; +import org.checkerframework.checker.lock.qual.GuardedBy; +import org.checkerframework.checker.lock.qual.Holding; import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; +import org.opendaylight.controller.cluster.access.client.ClientActorContext; import org.opendaylight.controller.cluster.access.client.ConnectedClientConnection; import org.opendaylight.controller.cluster.access.client.ConnectionEntry; import org.opendaylight.controller.cluster.access.commands.CreateLocalHistoryRequest; @@ -34,8 +39,8 @@ import org.opendaylight.controller.cluster.access.concepts.RequestException; import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.yangtools.concepts.Identifiable; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeSnapshot; +import org.opendaylight.yangtools.yang.data.api.schema.tree.ReadOnlyDataTree; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,12 +51,12 @@ import org.slf4j.LoggerFactory; */ abstract class ProxyHistory implements Identifiable { private abstract static class AbstractLocal extends ProxyHistory { - private final DataTree dataTree; + private final ReadOnlyDataTree dataTree; AbstractLocal(final AbstractClientHistory parent, final AbstractClientConnection connection, - final LocalHistoryIdentifier identifier, final DataTree dataTree) { + final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) { super(parent, connection, identifier); - this.dataTree = Preconditions.checkNotNull(dataTree); + this.dataTree = requireNonNull(dataTree); } final DataTreeSnapshot takeSnapshot() { @@ -77,14 +82,20 @@ abstract class ProxyHistory implements Identifiable { private volatile LocalReadWriteProxyTransaction lastSealed; Local(final AbstractClientHistory parent, final AbstractClientConnection connection, - final LocalHistoryIdentifier identifier, final DataTree dataTree) { + final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) { super(parent, connection, identifier, dataTree); } @Override AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, - final TransactionIdentifier txId, final boolean snapshotOnly) { - Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen); + final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) { + checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen); + + if (isDone) { + // Done transactions do not register on our radar on should not have any state associated. + return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId) + : new LocalReadWriteProxyTransaction(this, txId); + } // onTransactionCompleted() runs concurrently final LocalReadWriteProxyTransaction localSealed = lastSealed; @@ -119,16 +130,15 @@ abstract class ProxyHistory implements Identifiable { @Override void onTransactionCompleted(final AbstractProxyTransaction tx) { Verify.verify(tx instanceof LocalProxyTransaction); - if (tx instanceof LocalReadWriteProxyTransaction) { - if (LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) { - LOG.debug("Completed last sealed transaction {}", tx); - } + if (tx instanceof LocalReadWriteProxyTransaction + && LAST_SEALED_UPDATER.compareAndSet(this, (LocalReadWriteProxyTransaction) tx, null)) { + LOG.debug("Completed last sealed transaction {}", tx); } } @Override void onTransactionSealed(final AbstractProxyTransaction tx) { - Preconditions.checkState(tx.equals(lastOpen)); + checkState(tx.equals(lastOpen)); lastSealed = lastOpen; lastOpen = null; } @@ -136,13 +146,13 @@ abstract class ProxyHistory implements Identifiable { private static final class LocalSingle extends AbstractLocal { LocalSingle(final AbstractClientHistory parent, final AbstractClientConnection connection, - final LocalHistoryIdentifier identifier, final DataTree dataTree) { + final LocalHistoryIdentifier identifier, final ReadOnlyDataTree dataTree) { super(parent, connection, identifier, dataTree); } @Override AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, - final TransactionIdentifier txId, final boolean snapshotOnly) { + final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) { final DataTreeSnapshot snapshot = takeSnapshot(); return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) : new LocalReadWriteProxyTransaction(this, txId, snapshot); @@ -162,8 +172,8 @@ abstract class ProxyHistory implements Identifiable { @Override AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, - final TransactionIdentifier txId, final boolean snapshotOnly) { - return new RemoteProxyTransaction(this, txId, snapshotOnly, true); + final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) { + return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone); } @Override @@ -180,8 +190,8 @@ abstract class ProxyHistory implements Identifiable { @Override AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, - final TransactionIdentifier txId, final boolean snapshotOnly) { - return new RemoteProxyTransaction(this, txId, snapshotOnly, false); + final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) { + return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone); } @Override @@ -209,35 +219,41 @@ abstract class ProxyHistory implements Identifiable { return identifier; } - @GuardedBy("lock") + @Holding("lock") @Override - void replayRequests(final Iterable previousEntries) { + void replayRequests(final Collection previousEntries) { // First look for our Create message - for (ConnectionEntry e : previousEntries) { + Iterator it = previousEntries.iterator(); + while (it.hasNext()) { + final ConnectionEntry e = it.next(); final Request req = e.getRequest(); if (identifier.equals(req.getTarget())) { Verify.verify(req instanceof LocalHistoryRequest); if (req instanceof CreateLocalHistoryRequest) { - successor.connection.sendRequest(req, e.getCallback()); + successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks()); + it.remove(); break; } } } for (AbstractProxyTransaction t : proxies.values()) { - LOG.debug("{} creating successor transaction proxy for {}", identifier, t); - final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(), - t.isSnapshotOnly()); - LOG.debug("{} created successor transaction proxy {}", identifier, newProxy); - t.replayMessages(newProxy, previousEntries); + LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor); + t.replayMessages(successor, previousEntries); } // Now look for any finalizing messages - for (ConnectionEntry e : previousEntries) { + it = previousEntries.iterator(); + while (it.hasNext()) { + final ConnectionEntry e = it.next(); final Request req = e.getRequest(); if (identifier.equals(req.getTarget())) { Verify.verify(req instanceof LocalHistoryRequest); - successor.connection.sendRequest(req, e.getCallback()); + if (req instanceof DestroyLocalHistoryRequest) { + successor.connection.enqueueRequest(req, e.getCallback(), e.getEnqueuedTicks()); + it.remove(); + break; + } } } } @@ -257,22 +273,34 @@ abstract class ProxyHistory implements Identifiable { } @Override - void forwardRequest(final Request request, final Consumer> callback, - final BiConsumer, Consumer>> forwardTo) throws RequestException { - // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the - // period required to get into the queue. + void replayEntry(final ConnectionEntry entry, final Consumer replayTo) + throws RequestException { + final Request request = entry.getRequest(); if (request instanceof TransactionRequest) { - forwardTransactionRequest((TransactionRequest) request, callback); + lookupProxy(request).replayRequest((TransactionRequest) request, entry.getCallback(), + entry.getEnqueuedTicks()); } else if (request instanceof LocalHistoryRequest) { - forwardTo.accept(request, callback); + replayTo.accept(entry); } else { throw new IllegalArgumentException("Unhandled request " + request); } } - private void forwardTransactionRequest(final TransactionRequest request, - final Consumer> callback) throws RequestException { + @Override + void forwardEntry(final ConnectionEntry entry, final Consumer forwardTo) + throws RequestException { + final Request request = entry.getRequest(); + if (request instanceof TransactionRequest) { + lookupProxy(request).forwardRequest((TransactionRequest) request, entry.getCallback()); + } else if (request instanceof LocalHistoryRequest) { + forwardTo.accept(entry); + } else { + throw new IllegalArgumentException("Unhandled request " + request); + } + } + private AbstractProxyTransaction lookupProxy(final Request request) + throws RequestReplayException { final AbstractProxyTransaction proxy; lock.lock(); try { @@ -280,11 +308,11 @@ abstract class ProxyHistory implements Identifiable { } finally { lock.unlock(); } - if (proxy == null) { - throw new RequestReplayException("Failed to find proxy for %s", request); + if (proxy != null) { + return proxy; } - proxy.forwardRequest(request, callback); + throw new RequestReplayException("Failed to find proxy for %s", request); } } @@ -302,14 +330,14 @@ abstract class ProxyHistory implements Identifiable { private ProxyHistory(final AbstractClientHistory parent, final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { - this.parent = Preconditions.checkNotNull(parent); - this.connection = Preconditions.checkNotNull(connection); - this.identifier = Preconditions.checkNotNull(identifier); + this.parent = requireNonNull(parent); + this.connection = requireNonNull(connection); + this.identifier = requireNonNull(identifier); } static ProxyHistory createClient(final AbstractClientHistory parent, final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { - final Optional dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree); + final Optional dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree); return dataTree.isPresent() ? new Local(parent, connection, identifier, dataTree.get()) : new Remote(parent, connection, identifier); } @@ -317,7 +345,7 @@ abstract class ProxyHistory implements Identifiable { static ProxyHistory createSingle(final AbstractClientHistory parent, final AbstractClientConnection connection, final LocalHistoryIdentifier identifier) { - final Optional dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree); + final Optional dataTree = connection.getBackendInfo().flatMap(ShardBackendInfo::getDataTree); return dataTree.isPresent() ? new LocalSingle(parent, connection, identifier, dataTree.get()) : new RemoteSingle(parent, connection, identifier); } @@ -327,6 +355,14 @@ abstract class ProxyHistory implements Identifiable { return identifier; } + final ClientActorContext context() { + return connection.context(); + } + + final long currentTime() { + return connection.currentTime(); + } + final ActorRef localActor() { return connection.localActor(); } @@ -337,14 +373,19 @@ abstract class ProxyHistory implements Identifiable { final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly) { + return createTransactionProxy(txId, snapshotOnly, false); + } + + AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly, + final boolean isDone) { lock.lock(); try { if (successor != null) { - return successor.createTransactionProxy(txId, snapshotOnly); + return successor.createTransactionProxy(txId, snapshotOnly, isDone); } final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId()); - final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly); + final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone); proxies.put(proxyId, ret); LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId); return ret; @@ -356,8 +397,8 @@ abstract class ProxyHistory implements Identifiable { final void abortTransaction(final AbstractProxyTransaction tx) { lock.lock(); try { - proxies.remove(tx.getIdentifier()); - LOG.debug("Proxy {} aborting transaction {}", this, tx); + // Removal will be completed once purge completes + LOG.debug("Proxy {} aborted transaction {}", this, tx); onTransactionAborted(tx); } finally { lock.unlock(); @@ -367,7 +408,7 @@ abstract class ProxyHistory implements Identifiable { final void completeTransaction(final AbstractProxyTransaction tx) { lock.lock(); try { - proxies.remove(tx.getIdentifier()); + // Removal will be completed once purge completes LOG.debug("Proxy {} completing transaction {}", this, tx); onTransactionCompleted(tx); } finally { @@ -375,6 +416,16 @@ abstract class ProxyHistory implements Identifiable { } } + void purgeTransaction(final AbstractProxyTransaction tx) { + lock.lock(); + try { + proxies.remove(tx.getIdentifier()); + LOG.debug("Proxy {} purged transaction {}", this, tx); + } finally { + lock.unlock(); + } + } + final void close() { lock.lock(); try { @@ -391,14 +442,21 @@ abstract class ProxyHistory implements Identifiable { } } + final void enqueueRequest(final TransactionRequest request, final Consumer> callback, + final long enqueuedTicks) { + connection.enqueueRequest(request, callback, enqueuedTicks); + } + final void sendRequest(final TransactionRequest request, final Consumer> callback) { connection.sendRequest(request, callback); } @GuardedBy("lock") + @SuppressWarnings("checkstyle:hiddenField") abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection connection, - TransactionIdentifier txId, boolean snapshotOnly); + TransactionIdentifier txId, boolean snapshotOnly, boolean isDone); + @SuppressWarnings("checkstyle:hiddenField") abstract ProxyHistory createSuccessor(AbstractClientConnection connection); @SuppressFBWarnings(value = "UL_UNRELEASED_LOCK", justification = "Lock is released asynchronously via the cohort") @@ -436,12 +494,12 @@ abstract class ProxyHistory implements Identifiable { LOG.debug("Proxy {} purge completed with {}", this, response); } - @GuardedBy("lock") + @Holding("lock") void onTransactionAborted(final AbstractProxyTransaction tx) { // No-op for most implementations } - @GuardedBy("lock") + @Holding("lock") void onTransactionCompleted(final AbstractProxyTransaction tx) { // No-op for most implementations }