BUG-5280: Create AbstractProxyHistory class 77/44277/4
authorRobert Varga <rovarga@cisco.com>
Thu, 18 Aug 2016 14:05:32 +0000 (16:05 +0200)
committerTom Pantelis <tpanteli@brocade.com>
Wed, 24 Aug 2016 15:43:29 +0000 (15:43 +0000)
Given the connection-oriented nature of SequencedQueue, we
really need to properly encapsulate various aspects of the client,
so we can perform proper state propagation, both during message
transmission and on reconnection.

This is a first step in that direction, which encapsulates client's
sendRequest() and self() methods at proper levels. It furthermore
makes state tracking in proxies consistent with state tracking in
their aggregate counterparts, hence each ProxyTransaction is guaranteed
to have an associated ProxyHistory.

Change-Id: I8c15b234ec813ac427e63a6e077ae17cde443be3
Signed-off-by: Robert Varga <rovarga@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractClientHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyHistory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientLocalHistory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/ClientTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/DistributedDataStoreClientBehavior.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyHistory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyHistory.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyTransaction.java

index fda9a16..7608556 100644 (file)
@@ -34,7 +34,7 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
     private static final AtomicReferenceFieldUpdater<AbstractClientHistory, State> STATE_UPDATER =
             AtomicReferenceFieldUpdater.newUpdater(AbstractClientHistory.class, State.class, "state");
 
-    private final Map<Long, LocalHistoryIdentifier> histories = new ConcurrentHashMap<>();
+    private final Map<Long, AbstractProxyHistory> histories = new ConcurrentHashMap<>();
     private final DistributedDataStoreClientBehavior client;
     private final LocalHistoryIdentifier identifier;
 
@@ -55,19 +55,6 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
         Preconditions.checkState(success, "Race condition detected, state changed from %s to %s", expected, state);
     }
 
-    private LocalHistoryIdentifier getHistoryForCookie(final Long cookie) {
-        LocalHistoryIdentifier ret = histories.get(cookie);
-        if (ret == null) {
-            ret = new LocalHistoryIdentifier(identifier.getClientId(), identifier.getHistoryId(), cookie);
-            final LocalHistoryIdentifier existing = histories.putIfAbsent(cookie, ret);
-            if (existing != null) {
-                ret = existing;
-            }
-        }
-
-        return ret;
-    }
-
     @Override
     public final LocalHistoryIdentifier getIdentifier() {
         return identifier;
@@ -83,9 +70,15 @@ abstract class AbstractClientHistory extends LocalAbortable implements Identifia
         state = State.CLOSED;
     }
 
+    private AbstractProxyHistory createHistoryProxy(final Long shard) {
+        final LocalHistoryIdentifier historyId = new LocalHistoryIdentifier(identifier.getClientId(),
+            identifier.getHistoryId(), shard);
+        return AbstractProxyHistory.create(client, client.resolver().getFutureBackendInfo(shard), historyId);
+    }
+
     final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier transactionId, final Long shard) {
-        return AbstractProxyTransaction.create(client, getHistoryForCookie(shard),
-            transactionId.getTransactionId(), client.resolver().getFutureBackendInfo(shard));
+        final AbstractProxyHistory history = histories.computeIfAbsent(shard, this::createHistoryProxy);
+        return history.createTransactionProxy(transactionId);
     }
 
     /**
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/AbstractProxyHistory.java
new file mode 100644 (file)
index 0000000..9093c08
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import java.util.Optional;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.concepts.Identifiable;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+
+/**
+ * Per-connection representation of a local history.
+ *
+ * @author Robert Varga
+ */
+abstract class AbstractProxyHistory implements Identifiable<LocalHistoryIdentifier> {
+    // FIXME: this should really be ClientConnection
+    private final DistributedDataStoreClientBehavior client;
+    private final LocalHistoryIdentifier identifier;
+
+    AbstractProxyHistory(final DistributedDataStoreClientBehavior client, final LocalHistoryIdentifier identifier) {
+        this.client = Preconditions.checkNotNull(client);
+        this.identifier = Preconditions.checkNotNull(identifier);
+    }
+
+    static AbstractProxyHistory create(final DistributedDataStoreClientBehavior client,
+            final Optional<ShardBackendInfo> backendInfo, final LocalHistoryIdentifier identifier) {
+        final Optional<DataTree> dataTree = backendInfo.flatMap(ShardBackendInfo::getDataTree);
+        return dataTree.isPresent() ? new LocalProxyHistory(client, identifier, dataTree.get()) : new RemoteProxyHistory(client, identifier);
+    }
+
+    @Override
+    public LocalHistoryIdentifier getIdentifier() {
+        return identifier;
+    }
+
+    final ActorRef localActor() {
+        return client.self();
+    }
+
+    final AbstractProxyTransaction createTransactionProxy(final TransactionIdentifier txId) {
+        return doCreateTransactionProxy(client, new TransactionIdentifier(identifier, txId.getTransactionId()));
+    }
+
+    abstract AbstractProxyTransaction doCreateTransactionProxy(DistributedDataStoreClientBehavior client,
+            TransactionIdentifier txId);
+}
index cd104b5..8ff8b8e 100644 (file)
@@ -7,12 +7,14 @@
  */
 package org.opendaylight.controller.cluster.databroker.actors.dds;
 
+import akka.actor.ActorRef;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Verify;
 import com.google.common.util.concurrent.CheckedFuture;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import java.util.function.Consumer;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionAbortSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionCanCommitSuccess;
@@ -21,14 +23,13 @@ import org.opendaylight.controller.cluster.access.commands.TransactionDoCommitRe
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitRequest;
 import org.opendaylight.controller.cluster.access.commands.TransactionPreCommitSuccess;
 import org.opendaylight.controller.cluster.access.commands.TransactionRequest;
-import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
 import org.opendaylight.controller.cluster.access.concepts.RequestFailure;
+import org.opendaylight.controller.cluster.access.concepts.Response;
 import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
 import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.yangtools.concepts.Identifiable;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
 
 /**
  * Class translating transaction operations towards a particular backend shard.
@@ -51,33 +52,8 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         this.client = Preconditions.checkNotNull(client);
     }
 
-    /**
-     * Instantiate a new tracker for a transaction. This method bases its decision on which implementation to use
-     * based on provided {@link ShardBackendInfo}. If no information is present, it will choose the remote
-     * implementation, which is fine, as the queueing logic in ClientActorBehavior will hold on to the requests until
-     * the backend is located.
-     *
-     * @param client Client behavior
-     * @param historyId Local history identifier
-     * @param transactionId Transaction identifier
-     * @param backend Optional backend identifier
-     * @return A new state tracker
-     */
-    static AbstractProxyTransaction create(final DistributedDataStoreClientBehavior client,
-            final LocalHistoryIdentifier historyId, final long transactionId,
-            final java.util.Optional<ShardBackendInfo> backend) {
-
-        final java.util.Optional<DataTree> dataTree = backend.flatMap(ShardBackendInfo::getDataTree);
-        final TransactionIdentifier identifier = new TransactionIdentifier(historyId, transactionId);
-        if (dataTree.isPresent()) {
-            return new LocalProxyTransaction(client, identifier, dataTree.get().takeSnapshot());
-        } else {
-            return new RemoteProxyTransaction(client, identifier);
-        }
-    }
-
-    final DistributedDataStoreClientBehavior client() {
-        return client;
+    final ActorRef localActor() {
+        return client.self();
     }
 
     final long nextSequence() {
@@ -109,6 +85,10 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         return doRead(path);
     }
 
+    final void sendRequest(final TransactionRequest<?> request, final Consumer<Response<?, ?>> completer) {
+        client.sendRequest(request, completer);
+    }
+
     /**
      * Seal this transaction before it is either
      */
@@ -141,7 +121,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
         checkSealed();
 
         final SettableFuture<Boolean> ret = SettableFuture.create();
-        client().sendRequest(Verify.verifyNotNull(doCommit(false)), t -> {
+        sendRequest(Verify.verifyNotNull(doCommit(false)), t -> {
             if (t instanceof TransactionCommitSuccess) {
                 ret.set(Boolean.TRUE);
             } else if (t instanceof RequestFailure) {
@@ -156,7 +136,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     void abort(final VotingFuture<Void> ret) {
         checkSealed();
 
-        client.sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), client().self()), t -> {
+        sendRequest(new TransactionAbortRequest(getIdentifier(), nextSequence(), localActor()), t -> {
             if (t instanceof TransactionAbortSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
@@ -170,7 +150,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     void canCommit(final VotingFuture<?> ret) {
         checkSealed();
 
-        client.sendRequest(Verify.verifyNotNull(doCommit(true)), t -> {
+        sendRequest(Verify.verifyNotNull(doCommit(true)), t -> {
             if (t instanceof TransactionCanCommitSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
@@ -184,7 +164,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     void preCommit(final VotingFuture<?> ret) {
         checkSealed();
 
-        client.sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), client().self()), t-> {
+        sendRequest(new TransactionPreCommitRequest(getIdentifier(), nextSequence(), localActor()), t-> {
             if (t instanceof TransactionPreCommitSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
@@ -198,7 +178,7 @@ abstract class AbstractProxyTransaction implements Identifiable<TransactionIdent
     void doCommit(final VotingFuture<?> ret) {
         checkSealed();
 
-        client.sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), client().self()), t-> {
+        sendRequest(new TransactionDoCommitRequest(getIdentifier(), nextSequence(), localActor()), t-> {
             if (t instanceof TransactionCommitSuccess) {
                 ret.voteYes();
             } else if (t instanceof RequestFailure) {
index 8f2ee88..be94e3e 100644 (file)
@@ -43,8 +43,8 @@ public final class ClientLocalHistory extends AbstractClientHistory implements A
         Preconditions.checkState(local == State.IDLE, "Local history %s state is %s", this, local);
         updateState(local, State.TX_OPEN);
 
-        return new ClientTransaction(getClient(), this,
-            new TransactionIdentifier(getIdentifier(), NEXT_TX_UPDATER.getAndIncrement(this)));
+        return new ClientTransaction(this, new TransactionIdentifier(getIdentifier(),
+            NEXT_TX_UPDATER.getAndIncrement(this)));
     }
 
     @Override
index 10d64ed..e8e75e9 100644 (file)
@@ -62,8 +62,7 @@ public final class ClientTransaction extends LocalAbortable implements Identifia
 
     private volatile int state = OPEN_STATE;
 
-    ClientTransaction(final DistributedDataStoreClientBehavior client, final AbstractClientHistory parent,
-        final TransactionIdentifier transactionId) {
+    ClientTransaction(final AbstractClientHistory parent, final TransactionIdentifier transactionId) {
         this.transactionId = Preconditions.checkNotNull(transactionId);
         this.parent = Preconditions.checkNotNull(parent);
     }
index b84008c..dd4f1aa 100644 (file)
@@ -154,7 +154,7 @@ final class DistributedDataStoreClientBehavior extends ClientActorBehavior imple
     public ClientTransaction createTransaction() {
         final TransactionIdentifier txId = new TransactionIdentifier(singleHistory.getIdentifier(),
             nextTransactionId.getAndIncrement());
-        final ClientTransaction tx = new ClientTransaction(this, singleHistory, txId);
+        final ClientTransaction tx = new ClientTransaction(singleHistory, txId);
         LOG.debug("{}: creating a new transaction {}", persistenceId(), tx);
 
         return returnIfOperational(transactions, txId, tx, aborted);
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/LocalProxyHistory.java
new file mode 100644 (file)
index 0000000..8ccc4a6
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTree;
+
+final class LocalProxyHistory extends AbstractProxyHistory {
+    private final DataTree dataTree;
+
+    LocalProxyHistory(DistributedDataStoreClientBehavior client, LocalHistoryIdentifier identifier, DataTree dataTree) {
+        super(client, identifier);
+        this.dataTree = Preconditions.checkNotNull(dataTree);
+    }
+
+    @Override
+    AbstractProxyTransaction doCreateTransactionProxy(final DistributedDataStoreClientBehavior client,
+            final TransactionIdentifier txId) {
+        // FIXME: this violates history contract: we should use the last submitted transaction instead to ensure
+        //        causality
+        return new LocalProxyTransaction(client, txId, dataTree.takeSnapshot());
+    }
+}
\ No newline at end of file
index 9e787f1..b5eadb5 100644 (file)
@@ -87,13 +87,13 @@ final class LocalProxyTransaction extends AbstractProxyTransaction {
 
     @Override
     void doAbort() {
-        client().sendRequest(new AbortLocalTransactionRequest(identifier, client().self()), ABORT_COMPLETER);
+        sendRequest(new AbortLocalTransactionRequest(identifier, localActor()), ABORT_COMPLETER);
         modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been aborted"));
     }
 
     @Override
     CommitLocalTransactionRequest doCommit(final boolean coordinated) {
-        final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, client().self(),
+        final CommitLocalTransactionRequest ret = new CommitLocalTransactionRequest(identifier, localActor(),
             modification, coordinated);
         modification = new FailedDataTreeModification(() -> new IllegalStateException("Tracker has been submitted"));
         return ret;
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyHistory.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/actors/dds/RemoteProxyHistory.java
new file mode 100644 (file)
index 0000000..c596d31
--- /dev/null
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker.actors.dds;
+
+import org.opendaylight.controller.cluster.access.concepts.LocalHistoryIdentifier;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+
+final class RemoteProxyHistory extends AbstractProxyHistory {
+    RemoteProxyHistory(DistributedDataStoreClientBehavior client, LocalHistoryIdentifier identifier) {
+        super(client, identifier);
+    }
+
+    @Override
+    AbstractProxyTransaction doCreateTransactionProxy(final DistributedDataStoreClientBehavior client,
+            final TransactionIdentifier txId) {
+        return new RemoteProxyTransaction(client, txId);
+    }
+}
\ No newline at end of file
index 9fb1b89..bb21223 100644 (file)
@@ -62,7 +62,7 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     RemoteProxyTransaction(final DistributedDataStoreClientBehavior client,
         final TransactionIdentifier identifier) {
         super(client);
-        builder = new ModifyTransactionRequestBuilder(identifier, client.self());
+        builder = new ModifyTransactionRequestBuilder(identifier, localActor());
     }
 
     @Override
@@ -95,21 +95,21 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
 
         // Make sure we send any modifications before issuing a read
         ensureFlushedBuider();
-        client().sendRequest(request, completer);
+        sendRequest(request, completer);
         return MappingCheckedFuture.create(future, ReadFailedException.MAPPER);
     }
 
     @Override
     CheckedFuture<Boolean, ReadFailedException> doExists(final YangInstanceIdentifier path) {
         final SettableFuture<Boolean> future = SettableFuture.create();
-        return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), client().self(), path),
+        return sendReadRequest(new ExistsTransactionRequest(getIdentifier(), nextSequence(), localActor(), path),
             t -> completeExists(future, t), future);
     }
 
     @Override
     CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> doRead(final YangInstanceIdentifier path) {
         final SettableFuture<Optional<NormalizedNode<?, ?>>> future = SettableFuture.create();
-        return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), client().self(), path),
+        return sendReadRequest(new ReadTransactionRequest(getIdentifier(), nextSequence(), localActor(), path),
             t -> completeRead(future, t), future);
     }
 
@@ -134,8 +134,10 @@ final class RemoteProxyTransaction extends AbstractProxyTransaction {
     }
 
     private void flushBuilder() {
-        client().sendRequest(builder.build(), this::completeModify);
+        final ModifyTransactionRequest message = builder.build();
         builderBusy = false;
+
+        sendRequest(message, this::completeModify);
     }
 
     private void appendModification(final TransactionModification modification) {