From: Robert Varga Date: Thu, 18 Aug 2016 14:05:32 +0000 (+0200) Subject: BUG-5280: Create AbstractProxyHistory class X-Git-Tag: release/carbon~505 X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?p=controller.git;a=commitdiff_plain;h=1d34f75864ac09d31ef0f7b4ef59f7434167ae15 BUG-5280: Create AbstractProxyHistory class Given the connection-oriented nature of SequencedQueue, we really need to properly encapsulate various aspects of the client, so we can perform proper state propagation, both during message transmission and on reconnection. This is a first step in that direction, which encapsulates client's sendRequest() and self() methods at proper levels. It furthermore makes state tracking in proxies consistent with state tracking in their aggregate counterparts, hence each ProxyTransaction is guaranteed to have an associated ProxyHistory. Change-Id: I8c15b234ec813ac427e63a6e077ae17cde443be3 Signed-off-by: Robert Varga --- diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java index fda9a16ab8..7608556e6e 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java @@ -34,7 +34,7 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia private static final AtomicReferenceFieldUpdater STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state"); - private final Map histories = new ConcurrentHashMap<>(); + private final Map histories = new ConcurrentHashMap<>(); private final DistributedDataStoreClientBehavior client; private final LocalHistoryIdentifier identifier; @@ -55,19 +55,6 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state); } - private LocalHistoryIdentifier getHistoryForCookie(final Long cookie) { - LocalHistoryIdentifier ret = histories.get(cookie); - if (ret == null) { - ret = new LocalHistoryIdentifier(identifier.getClientId(), identifier.getHistoryId(), cookie); - final LocalHistoryIdentifier existing = histories.putIfAbsent(cookie, ret); - if (existing != null) { - ret = existing; - } - } - - return ret; - } - @Override public final LocalHistoryIdentifier getIdentifier() { return identifier; @@ -83,9 +70,15 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia state = State.CLOSED; } + private AbstractProxyHistory createHistoryProxy(final Long shard) { + final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(identifier.getClientId(), + identifier.getHistoryId(), shard); + return AbstractProxyHistory.create(client, client.resolver().getFutureBackendInfo(shard), historyId); + } + final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) { - return AbstractProxyTransaction.create(client, getHistoryForCookie(shard), - transactionId.getTransactionId(), client.resolver().getFutureBackendInfo(shard)); + final AbstractProxyHistory history = histories.computeIfAbsent(shard, this::createHistoryProxy); + return history.createTransactionProxy(transactionId); } /** diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyHistory.java new file mode 100644 index 0000000000..9093c08097 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyHistory.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import akka.actor.ActorRef; +import com.google.common.base.Preconditions; +import java.util.Optional; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.yangtools.concepts.Identifiable; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; + +/** + * Per-connection representation of a local history. + * + * @author Robert Varga + */ +abstract class AbstractProxyHistory implements Identifiable { + // FIXME: this should really be ClientConnection + private final DistributedDataStoreClientBehavior client; + private final LocalHistoryIdentifier identifier; + + AbstractProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) { + this.client = Preconditions.checkNotNull(client); + this.identifier = Preconditions.checkNotNull(identifier); + } + + static AbstractProxyHistory create(final DistributedDataStoreClientBehavior client, + final Optional backendInfo, final LocalHistoryIdentifier identifier) { + final Optional dataTree = backendInfo.flatMap(ShardBackendInfo::getDataTree); + return dataTree.isPresent() ? new LocalProxyHistory(client, identifier, dataTree.get()) : new RemoteProxyHistory(client, identifier); + } + + @Override + public LocalHistoryIdentifier getIdentifier() { + return identifier; + } + + final ActorRef localActor() { + return client.self(); + } + + final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) { + return doCreateTransactionProxy(client, new TransactionIdentifier(identifier, txId.getTransactionId())); + } + + abstract AbstractProxyTransaction doCreateTransactionProxy(DistributedDataStoreClientBehavior client, + TransactionIdentifier txId); +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java index cd104b597b..8ff8b8eff9 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java @@ -7,12 +7,14 @@ */ package org.opendaylight.controller.cluster.databroker.actors.dds; +import akka.actor.ActorRef; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Verify; import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import java.util.function.Consumer; import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest; import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess; @@ -21,14 +23,13 @@ import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRe import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest; import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess; import org.opendaylight.controller.cluster.access.commands.TransactionRequest; -import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; import org.opendaylight.controller.cluster.access.concepts.RequestFailure; +import org.opendaylight.controller.cluster.access.concepts.Response; import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException; import org.opendaylight.yangtools.concepts.Identifiable; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; -import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; /** * Class translating transaction operations towards a particular backend shard. @@ -51,33 +52,8 @@ abstract class AbstractProxyTransaction implements Identifiable backend) { - - final java.util.Optional dataTree = backend.flatMap(ShardBackendInfo::getDataTree); - final TransactionIdentifier identifier = new TransactionIdentifier(historyId, transactionId); - if (dataTree.isPresent()) { - return new LocalProxyTransaction(client, identifier, dataTree.get().takeSnapshot()); - } else { - return new RemoteProxyTransaction(client, identifier); - } - } - - final DistributedDataStoreClientBehavior client() { - return client; + final ActorRef localActor() { + return client.self(); } final long nextSequence() { @@ -109,6 +85,10 @@ abstract class AbstractProxyTransaction implements Identifiable request, final Consumer> completer) { + client.sendRequest(request, completer); + } + /** * Seal this transaction before it is either */ @@ -141,7 +121,7 @@ abstract class AbstractProxyTransaction implements Identifiable ret = SettableFuture.create(); - client().sendRequest(Verify.verifyNotNull(doCommit(false)), t -> { + sendRequest(Verify.verifyNotNull(doCommit(false)), t -> { if (t instanceof TransactionCommitSuccess) { ret.set(Boolean.TRUE); } else if (t instanceof RequestFailure) { @@ -156,7 +136,7 @@ abstract class AbstractProxyTransaction implements Identifiable ret) { checkSealed(); - client.sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), client().self()), t -> { + sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), t -> { if (t instanceof TransactionAbortSuccess) { ret.voteYes(); } else if (t instanceof RequestFailure) { @@ -170,7 +150,7 @@ abstract class AbstractProxyTransaction implements Identifiable ret) { checkSealed(); - client.sendRequest(Verify.verifyNotNull(doCommit(true)), t -> { + sendRequest(Verify.verifyNotNull(doCommit(true)), t -> { if (t instanceof TransactionCanCommitSuccess) { ret.voteYes(); } else if (t instanceof RequestFailure) { @@ -184,7 +164,7 @@ abstract class AbstractProxyTransaction implements Identifiable ret) { checkSealed(); - client.sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), client().self()), t-> { + sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), t-> { if (t instanceof TransactionPreCommitSuccess) { ret.voteYes(); } else if (t instanceof RequestFailure) { @@ -198,7 +178,7 @@ abstract class AbstractProxyTransaction implements Identifiable ret) { checkSealed(); - client.sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), client().self()), t-> { + sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), t-> { if (t instanceof TransactionCommitSuccess) { ret.voteYes(); } else if (t instanceof RequestFailure) { diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java index 8f2ee88563..be94e3ee6a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java @@ -43,8 +43,8 @@ public final class ClientLocalHistory extends AbstractClientHistory implements A Preconditions.checkState(local == State.IDLE, "Local history %s state is %s", this, local); updateState(local, State.TX_OPEN); - return new ClientTransaction(getClient(), this, - new TransactionIdentifier(getIdentifier(), NEXT_TX_UPDATER.getAndIncrement(this))); + return new ClientTransaction(this, new TransactionIdentifier(getIdentifier(), + NEXT_TX_UPDATER.getAndIncrement(this))); } @Override diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java index 10d64ed9bb..e8e75e90e2 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java @@ -62,8 +62,7 @@ public final class ClientTransaction extends LocalAbortable implements Identifia private volatile int state = OPEN_STATE; - ClientTransaction(final DistributedDataStoreClientBehavior client, final AbstractClientHistory parent, - final TransactionIdentifier transactionId) { + ClientTransaction(final AbstractClientHistory parent, final TransactionIdentifier transactionId) { this.transactionId = Preconditions.checkNotNull(transactionId); this.parent = Preconditions.checkNotNull(parent); } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java index b84008ca39..dd4f1aadc6 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java @@ -154,7 +154,7 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple public ClientTransaction createTransaction() { final TransactionIdentifier txId = new TransactionIdentifier(singleHistory.getIdentifier(), nextTransactionId.getAndIncrement()); - final ClientTransaction tx = new ClientTransaction(this, singleHistory, txId); + final ClientTransaction tx = new ClientTransaction(singleHistory, txId); LOG.debug("{}: creating a new transaction {}", persistenceId(), tx); return returnIfOperational(transactions, txId, tx, aborted); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyHistory.java new file mode 100644 index 0000000000..8ccc4a6353 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyHistory.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree; + +final class LocalProxyHistory extends AbstractProxyHistory { + private final DataTree dataTree; + + LocalProxyHistory(DistributedDataStoreClientBehavior client, LocalHistoryIdentifier identifier, DataTree dataTree) { + super(client, identifier); + this.dataTree = Preconditions.checkNotNull(dataTree); + } + + @Override + AbstractProxyTransaction doCreateTransactionProxy(final DistributedDataStoreClientBehavior client, + final TransactionIdentifier txId) { + // FIXME: this violates history contract: we should use the last submitted transaction instead to ensure + // causality + return new LocalProxyTransaction(client, txId, dataTree.takeSnapshot()); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java index 9e787f12e1..b5eadb5abe 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java @@ -87,13 +87,13 @@ final class LocalProxyTransaction extends AbstractProxyTransaction { @Override void doAbort() { - client().sendRequest(new AbortLocalTransactionRequest(identifier, client().self()), ABORT_COMPLETER); + sendRequest(new AbortLocalTransactionRequest(identifier, localActor()), ABORT_COMPLETER); modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been aborted")); } @Override CommitLocalTransactionRequest doCommit(final boolean coordinated) { - final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, client().self(), + final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, localActor(), modification, coordinated); modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been submitted")); return ret; diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyHistory.java new file mode 100644 index 0000000000..c596d31f84 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyHistory.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2016 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.databroker.actors.dds; + +import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier; +import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier; + +final class RemoteProxyHistory extends AbstractProxyHistory { + RemoteProxyHistory(DistributedDataStoreClientBehavior client, LocalHistoryIdentifier identifier) { + super(client, identifier); + } + + @Override + AbstractProxyTransaction doCreateTransactionProxy(final DistributedDataStoreClientBehavior client, + final TransactionIdentifier txId) { + return new RemoteProxyTransaction(client, txId); + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java index 9fb1b89580..bb21223aab 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java @@ -62,7 +62,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { RemoteProxyTransaction(final DistributedDataStoreClientBehavior client, final TransactionIdentifier identifier) { super(client); - builder = new ModifyTransactionRequestBuilder(identifier, client.self()); + builder = new ModifyTransactionRequestBuilder(identifier, localActor()); } @Override @@ -95,21 +95,21 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { // Make sure we send any modifications before issuing a read ensureFlushedBuider(); - client().sendRequest(request, completer); + sendRequest(request, completer); return MappingCheckedFuture.create(future, ReadFailedException.MAPPER); } @Override CheckedFuture doExists(final YangInstanceIdentifier path) { final SettableFuture future = SettableFuture.create(); - return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), client().self(), path), + return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path), t -> completeExists(future, t), future); } @Override CheckedFuture>, ReadFailedException> doRead(final YangInstanceIdentifier path) { final SettableFuture>> future = SettableFuture.create(); - return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), client().self(), path), + return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path), t -> completeRead(future, t), future); } @@ -134,8 +134,10 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction { } private void flushBuilder() { - client().sendRequest(builder.build(), this::completeModify); + final ModifyTransactionRequest message = builder.build(); builderBusy = false; + + sendRequest(message, this::completeModify); } private void appendModification(final TransactionModification modification) {