X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=opendaylight%2Fmd-sal%2Fsal-distributed-datastore%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fcontroller%2Fcluster%2Fdatabroker%2Factors%2Fdds%2FProxyHistory.java;h=e26e00fa13a98c42d4236cce29050a55f1f020a9;hb=1e07329c0d800b8fea43ae0c4060aded5fd18739;hp=88a79775baa94730eba45a01ed8e229c0b701c69;hpb=20ece8c549211d1c453f1763132bb0a0ca7be0e0;p=controller.git diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java index 88a79775ba..e26e00fa13 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ProxyHistory.java @@ -19,7 +19,6 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.function.BiConsumer; import java.util.function.Consumer; import javax.annotation.concurrent.GuardedBy; import org.opendaylight.controller.cluster.access.client.AbstractClientConnection; @@ -86,9 +85,15 @@ abstract class ProxyHistory implements Identifiable { @Override AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, - final TransactionIdentifier txId, final boolean snapshotOnly) { + final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) { Preconditions.checkState(lastOpen == null, "Proxy %s has %s currently open", this, lastOpen); + if (isDone) { + // Done transactions do not register on our radar on should not have any state associated. + return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId) + : new LocalReadWriteProxyTransaction(this, txId); + } + // onTransactionCompleted() runs concurrently final LocalReadWriteProxyTransaction localSealed = lastSealed; final DataTreeSnapshot baseSnapshot; @@ -145,7 +150,7 @@ abstract class ProxyHistory implements Identifiable { @Override AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, - final TransactionIdentifier txId, final boolean snapshotOnly) { + final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) { final DataTreeSnapshot snapshot = takeSnapshot(); return snapshotOnly ? new LocalReadOnlyProxyTransaction(this, txId, snapshot) : new LocalReadWriteProxyTransaction(this, txId, snapshot); @@ -165,8 +170,8 @@ abstract class ProxyHistory implements Identifiable { @Override AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, - final TransactionIdentifier txId, final boolean snapshotOnly) { - return new RemoteProxyTransaction(this, txId, snapshotOnly, true); + final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) { + return new RemoteProxyTransaction(this, txId, snapshotOnly, true, isDone); } @Override @@ -183,8 +188,8 @@ abstract class ProxyHistory implements Identifiable { @Override AbstractProxyTransaction doCreateTransactionProxy(final AbstractClientConnection connection, - final TransactionIdentifier txId, final boolean snapshotOnly) { - return new RemoteProxyTransaction(this, txId, snapshotOnly, false); + final TransactionIdentifier txId, final boolean snapshotOnly, final boolean isDone) { + return new RemoteProxyTransaction(this, txId, snapshotOnly, false, isDone); } @Override @@ -231,11 +236,8 @@ abstract class ProxyHistory implements Identifiable { } for (AbstractProxyTransaction t : proxies.values()) { - LOG.debug("{} creating successor transaction proxy for {}", identifier, t); - final AbstractProxyTransaction newProxy = successor.createTransactionProxy(t.getIdentifier(), - t.isSnapshotOnly()); - LOG.debug("{} created successor transaction proxy {}", identifier, newProxy); - t.replayMessages(newProxy, previousEntries); + LOG.debug("{} replaying messages to old proxy {} towards successor {}", identifier, t, successor); + t.replayMessages(successor, previousEntries); } // Now look for any finalizing messages @@ -269,22 +271,34 @@ abstract class ProxyHistory implements Identifiable { } @Override - void forwardRequest(final Request request, final Consumer> callback, - final BiConsumer, Consumer>> forwardTo) throws RequestException { - // FIXME: do not use sendRequest() once we have throttling in place, as we have already waited the - // period required to get into the queue. + void replayEntry(final ConnectionEntry entry, final Consumer replayTo) + throws RequestException { + final Request request = entry.getRequest(); if (request instanceof TransactionRequest) { - forwardTransactionRequest((TransactionRequest) request, callback); + lookupProxy(request).replayRequest((TransactionRequest) request, entry.getCallback(), + entry.getEnqueuedTicks()); } else if (request instanceof LocalHistoryRequest) { - forwardTo.accept(request, callback); + replayTo.accept(entry); } else { throw new IllegalArgumentException("Unhandled request " + request); } } - private void forwardTransactionRequest(final TransactionRequest request, - final Consumer> callback) throws RequestException { + @Override + void forwardEntry(final ConnectionEntry entry, final Consumer forwardTo) + throws RequestException { + final Request request = entry.getRequest(); + if (request instanceof TransactionRequest) { + lookupProxy(request).forwardRequest((TransactionRequest) request, entry.getCallback()); + } else if (request instanceof LocalHistoryRequest) { + forwardTo.accept(entry); + } else { + throw new IllegalArgumentException("Unhandled request " + request); + } + } + private AbstractProxyTransaction lookupProxy(final Request request) + throws RequestReplayException { final AbstractProxyTransaction proxy; lock.lock(); try { @@ -292,11 +306,11 @@ abstract class ProxyHistory implements Identifiable { } finally { lock.unlock(); } - if (proxy == null) { - throw new RequestReplayException("Failed to find proxy for %s", request); + if (proxy != null) { + return proxy; } - proxy.forwardRequest(request, callback); + throw new RequestReplayException("Failed to find proxy for %s", request); } } @@ -357,14 +371,19 @@ abstract class ProxyHistory implements Identifiable { final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly) { + return createTransactionProxy(txId, snapshotOnly, false); + } + + AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId, final boolean snapshotOnly, + final boolean isDone) { lock.lock(); try { if (successor != null) { - return successor.createTransactionProxy(txId, snapshotOnly); + return successor.createTransactionProxy(txId, snapshotOnly, isDone); } final TransactionIdentifier proxyId = new TransactionIdentifier(identifier, txId.getTransactionId()); - final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly); + final AbstractProxyTransaction ret = doCreateTransactionProxy(connection, proxyId, snapshotOnly, isDone); proxies.put(proxyId, ret); LOG.debug("Allocated proxy {} for transaction {}", proxyId, txId); return ret; @@ -387,7 +406,7 @@ abstract class ProxyHistory implements Identifiable { final void completeTransaction(final AbstractProxyTransaction tx) { lock.lock(); try { - proxies.remove(tx.getIdentifier()); + // Removal will be completed once purge completes LOG.debug("Proxy {} completing transaction {}", this, tx); onTransactionCompleted(tx); } finally { @@ -395,6 +414,16 @@ abstract class ProxyHistory implements Identifiable { } } + void purgeTransaction(final AbstractProxyTransaction tx) { + lock.lock(); + try { + proxies.remove(tx.getIdentifier()); + LOG.debug("Proxy {} purged transaction {}", this, tx); + } finally { + lock.unlock(); + } + } + final void close() { lock.lock(); try { @@ -422,7 +451,7 @@ abstract class ProxyHistory implements Identifiable { @GuardedBy("lock") abstract AbstractProxyTransaction doCreateTransactionProxy(AbstractClientConnection connection, - TransactionIdentifier txId, boolean snapshotOnly); + TransactionIdentifier txId, boolean snapshotOnly, boolean isDone); abstract ProxyHistory createSuccessor(AbstractClientConnection connection);