BUG-5280: split DistributedDataStore 07/48707/48
authorRobert Varga <rovarga@cisco.com>
Fri, 25 Nov 2016 12:55:13 +0000 (13:55 +0100)
committerRobert Varga <rovarga@cisco.com>
Mon, 19 Dec 2016 13:59:05 +0000 (14:59 +0100)
Split the DistributedDataStore into two components into
an abstract base class and concretization running with
TransactionProxies. Add another concretization, which uses
DataStoreClient to instantiate requests.

Change-Id: I454eec76d54c2fd4e4ea1e5cd16d12398eec81f0
Signed-off-by: Robert Varga <rovarga@cisco.com>
26 files changed:
opendaylight/md-sal/sal-cluster-admin-impl/src/test/java/org/opendaylight/controller/cluster/datastore/admin/ClusterAdminRpcServiceTest.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedDataStore.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedReadTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedReadWriteTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedTransactionChain.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedWriteTransaction.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMStoreThreePhaseCommitCohortAdaptor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadTransaction.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadWriteTransaction.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreTransactionChain.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreWriteTransaction.java [deleted file]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DatastoreContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataTreeCohortIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreRemotingIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/IntegrationTestKit.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/MemberNode.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/DistributedEntityOwnershipServiceTest.java

index 07acc18..113c9fa 100644 (file)
@@ -47,8 +47,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.MemberNode;
 import org.opendaylight.controller.cluster.datastore.MemberNode.RaftStateVerifier;
 import org.opendaylight.controller.cluster.datastore.Shard;
@@ -246,8 +246,8 @@ public class ClusterAdminRpcServiceTest {
         verifyFailedRpcResult(rpcResult);
     }
 
-    private static NormalizedNode<?, ?> writeCarsNodeAndVerify(DistributedDataStore writeToStore,
-            DistributedDataStore readFromStore) throws Exception {
+    private static NormalizedNode<?, ?> writeCarsNodeAndVerify(AbstractDataStore writeToStore,
+            AbstractDataStore readFromStore) throws Exception {
         DOMStoreWriteTransaction writeTx = writeToStore.newWriteOnlyTransaction();
         NormalizedNode<?, ?> carsNode = CarsModel.create();
         writeTx.write(CarsModel.BASE_PATH, carsNode);
@@ -262,7 +262,7 @@ public class ClusterAdminRpcServiceTest {
         return carsNode;
     }
 
-    private static void readCarsNodeAndVerify(DistributedDataStore readFromStore,
+    private static void readCarsNodeAndVerify(AbstractDataStore readFromStore,
             NormalizedNode<?, ?> expCarsNode) throws Exception {
         Optional<NormalizedNode<?, ?>> optional = readFromStore.newReadOnlyTransaction()
                 .read(CarsModel.BASE_PATH).get(15, TimeUnit.SECONDS);
@@ -636,7 +636,7 @@ public class ClusterAdminRpcServiceTest {
                 successShardResult("cars", DataStoreType.Operational),
                 successShardResult("people", DataStoreType.Operational));
 
-        verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+        verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
                 new String[]{"cars", "people"}, new SimpleEntry<>("member-1", true),
@@ -685,7 +685,7 @@ public class ClusterAdminRpcServiceTest {
                 successShardResult("cars", DataStoreType.Operational),
                 successShardResult("people", DataStoreType.Operational));
 
-        verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+        verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
                 new String[]{"cars", "people"},
@@ -714,7 +714,7 @@ public class ClusterAdminRpcServiceTest {
                 successShardResult("cars", DataStoreType.Operational),
                 successShardResult("people", DataStoreType.Operational));
 
-        verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+        verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
                 new String[]{"cars", "people"},
@@ -779,7 +779,7 @@ public class ClusterAdminRpcServiceTest {
                 successShardResult("cars", DataStoreType.Operational),
                 successShardResult("people", DataStoreType.Operational));
 
-        verifyVotingStates(new DistributedDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
+        verifyVotingStates(new AbstractDataStore[]{replicaNode1.configDataStore(), replicaNode1.operDataStore(),
                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
                 new String[]{"cars", "people"},
@@ -847,7 +847,7 @@ public class ClusterAdminRpcServiceTest {
                 successShardResult("people", DataStoreType.Operational));
 
         // Members 2 and 3 are now non-voting but should get replicated with the new new server config.
-        verifyVotingStates(new DistributedDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
+        verifyVotingStates(new AbstractDataStore[]{leaderNode1.configDataStore(), leaderNode1.operDataStore(),
                 replicaNode2.configDataStore(), replicaNode2.operDataStore(),
                 replicaNode3.configDataStore(), replicaNode3.operDataStore()},
                 new String[]{"cars", "people"},
@@ -863,7 +863,7 @@ public class ClusterAdminRpcServiceTest {
         });
     }
 
-    private void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig,
+    private static void setupPersistedServerConfigPayload(ServerConfigurationPayload serverConfig,
             String member, String datastoreTypeSuffix, String... shards) {
         String[] datastoreTypes = {"config_", "oper_"};
         for (String type : datastoreTypes) {
@@ -884,9 +884,9 @@ public class ClusterAdminRpcServiceTest {
     }
 
     @SafeVarargs
-    private static void verifyVotingStates(DistributedDataStore[] datastores, String[] shards,
+    private static void verifyVotingStates(AbstractDataStore[] datastores, String[] shards,
             SimpleEntry<String, Boolean>... expStates) throws Exception {
-        for (DistributedDataStore datastore: datastores) {
+        for (AbstractDataStore datastore: datastores) {
             for (String shard: shards) {
                 verifyVotingStates(datastore, shard, expStates);
             }
@@ -894,7 +894,7 @@ public class ClusterAdminRpcServiceTest {
     }
 
     @SafeVarargs
-    private static void verifyVotingStates(DistributedDataStore datastore, String shardName,
+    private static void verifyVotingStates(AbstractDataStore datastore, String shardName,
             SimpleEntry<String, Boolean>... expStates) throws Exception {
         String localMemberName = datastore.getActorContext().getCurrentMemberName().getName();
         Map<String, Boolean> expStateMap = new HashMap<>();
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedDataStore.java
new file mode 100644 (file)
index 0000000..7d16966
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import akka.actor.ActorSystem;
+import com.google.common.annotations.VisibleForTesting;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
+import org.opendaylight.controller.cluster.datastore.ClusterWrapper;
+import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+
+/**
+ * Implements a distributed DOMStore using ClientActor.
+ */
+public class ClientBackedDataStore extends AbstractDataStore {
+
+    public ClientBackedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
+            final Configuration configuration, final DatastoreContextFactory datastoreContextFactory,
+            final DatastoreSnapshot restoreFromSnapshot) {
+        super(actorSystem, cluster, configuration, datastoreContextFactory, restoreFromSnapshot);
+    }
+
+    @VisibleForTesting
+    ClientBackedDataStore(final ActorContext actorContext, final ClientIdentifier identifier) {
+        super(actorContext, identifier);
+    }
+
+    @Override
+    public DOMStoreTransactionChain createTransactionChain() {
+        return new ClientBackedTransactionChain(getClient().createLocalHistory());
+    }
+
+    @Override
+    public DOMStoreReadTransaction newReadOnlyTransaction() {
+        return new ClientBackedReadTransaction(getClient().createSnapshot(), null);
+    }
+
+    @Override
+    public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+        return new ClientBackedWriteTransaction(getClient().createTransaction());
+    }
+
+    @Override
+    public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+        return new ClientBackedReadWriteTransaction(getClient().createTransaction());
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedReadTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedReadTransaction.java
new file mode 100644 (file)
index 0000000..3d10c39
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import javax.annotation.Nullable;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientSnapshot;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * An implementation of {@link DOMStoreReadTransaction} backed by a {@link ClientSnapshot}. Used for standalone
+ * transactions.
+ *
+ * @author Robert Varga
+ */
+final class ClientBackedReadTransaction extends ClientBackedTransaction<ClientSnapshot>
+        implements DOMStoreReadTransaction {
+    private static final AtomicReferenceFieldUpdater<ClientBackedReadTransaction, ClientBackedTransactionChain>
+        PARENT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ClientBackedReadTransaction.class,
+            ClientBackedTransactionChain.class, "parent");
+
+    @SuppressWarnings("unused")
+    private volatile ClientBackedTransactionChain parent;
+
+    ClientBackedReadTransaction(final ClientSnapshot delegate, @Nullable final ClientBackedTransactionChain parent) {
+        super(delegate);
+        this.parent = parent;
+    }
+
+    @Override
+    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
+        return Futures.makeChecked(delegate().read(path), ReadFailedException.MAPPER);
+    }
+
+    @Override
+    public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
+        return Futures.makeChecked(delegate().exists(path), ReadFailedException.MAPPER);
+    }
+
+    @Override
+    public void close() {
+        super.close();
+
+        final ClientBackedTransactionChain local = PARENT_UPDATER.getAndSet(this, null);
+        if (local != null) {
+            local.snapshotClosed(delegate());
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedReadWriteTransaction.java
new file mode 100644 (file)
index 0000000..a1bbe61
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.Futures;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * An implementation of {@link DOMStoreReadWriteTransaction} backed by a {@link ClientTransaction}.
+ *
+ * @author Robert Varga
+ */
+final class ClientBackedReadWriteTransaction extends ClientBackedWriteTransaction
+        implements DOMStoreReadWriteTransaction {
+
+    ClientBackedReadWriteTransaction(final ClientTransaction delegate) {
+        super(delegate);
+    }
+
+    @Override
+    public CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(final YangInstanceIdentifier path) {
+        return Futures.makeChecked(delegate().read(path), ReadFailedException.MAPPER);
+    }
+
+    @Override
+    public CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
+        return Futures.makeChecked(delegate().exists(path), ReadFailedException.MAPPER);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedTransaction.java
new file mode 100644 (file)
index 0000000..6c14297
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.FinalizablePhantomReference;
+import com.google.common.base.FinalizableReferenceQueue;
+import com.google.common.base.Preconditions;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nonnull;
+import org.opendaylight.controller.cluster.access.concepts.TransactionIdentifier;
+import org.opendaylight.controller.cluster.databroker.actors.dds.AbstractClientHandle;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.sal.core.spi.data.AbstractDOMStoreTransaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of {@link DOMStoreTransaction} backed by a {@link ClientTransaction}. It guards against user-level
+ * leaks by maintaining a phantom reference on the backing transaction, which will ensure that the transaction will
+ * be aborted, if it is not otherwise closed, just before this object is garbage-collected.
+ *
+ * @author Robert Varga
+ */
+abstract class ClientBackedTransaction<T extends AbstractClientHandle<?>> extends
+        AbstractDOMStoreTransaction<TransactionIdentifier> {
+    private static final class Finalizer extends FinalizablePhantomReference<ClientBackedTransaction<?>> {
+        private static final FinalizableReferenceQueue QUEUE = new FinalizableReferenceQueue();
+        private static final Set<Finalizer> FINALIZERS = ConcurrentHashMap.newKeySet();
+        private static final Logger LOG = LoggerFactory.getLogger(Finalizer.class);
+
+        private final AbstractClientHandle<?> transaction;
+
+        private Finalizer(final ClientBackedTransaction<?> referent, final AbstractClientHandle<?> transaction) {
+            super(referent, QUEUE);
+            this.transaction = Preconditions.checkNotNull(transaction);
+        }
+
+        static @Nonnull <T extends AbstractClientHandle<?>> T recordTransaction(
+                @Nonnull final ClientBackedTransaction<T> referent, @Nonnull final T transaction) {
+            FINALIZERS.add(new Finalizer(referent, transaction));
+            return transaction;
+        }
+
+        @Override
+        public void finalizeReferent() {
+            FINALIZERS.remove(this);
+            if (transaction.abort()) {
+                LOG.warn("Aborted orphan transaction {}", transaction.getIdentifier());
+            }
+        }
+    }
+
+    private final T delegate;
+
+    ClientBackedTransaction(final T delegate) {
+        super(delegate.getIdentifier());
+        this.delegate = Finalizer.recordTransaction(this, delegate);
+    }
+
+    final T delegate() {
+        return delegate;
+    }
+
+    @Override
+    public void close() {
+        delegate.abort();
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedTransactionChain.java
new file mode 100644 (file)
index 0000000..db30372
--- /dev/null
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import java.util.WeakHashMap;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.databroker.actors.dds.AbstractClientHandle;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientSnapshot;
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.md.sal.common.api.data.TransactionChainClosedException;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of {@link DOMStoreTransactionChain} backed by a {@link ClientLocalHistory}.
+ *
+ * @author Robert Varga
+ */
+final class ClientBackedTransactionChain implements DOMStoreTransactionChain {
+    private static final Logger LOG = LoggerFactory.getLogger(ClientBackedTransactionChain.class);
+
+    @GuardedBy("this")
+    private final Map<AbstractClientHandle<?>, Boolean> openSnapshots = new WeakHashMap<>();
+
+    private final ClientLocalHistory history;
+
+    ClientBackedTransactionChain(final ClientLocalHistory history) {
+        this.history = Preconditions.checkNotNull(history);
+    }
+
+    @Override
+    public DOMStoreReadTransaction newReadOnlyTransaction() {
+        return new ClientBackedReadTransaction(createSnapshot(), this);
+    }
+
+    @Override
+    public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+        return new ClientBackedReadWriteTransaction(createTransaction());
+    }
+
+    @Override
+    public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+        return new ClientBackedWriteTransaction(createTransaction());
+    }
+
+    @Override
+    public synchronized void close() {
+        for (AbstractClientHandle<?> snap : openSnapshots.keySet()) {
+            LOG.warn("Aborting unclosed transaction {}", snap.getIdentifier());
+            snap.abort();
+        }
+        openSnapshots.clear();
+
+        history.close();
+    }
+
+    synchronized void snapshotClosed(final ClientSnapshot clientTransaction) {
+        openSnapshots.remove(clientTransaction);
+    }
+
+    private ClientSnapshot createSnapshot() {
+        try {
+            return recordSnapshot(history.takeSnapshot());
+        } catch (org.opendaylight.mdsal.common.api.TransactionChainClosedException e) {
+            throw new TransactionChainClosedException("Transaction chain has been closed", e);
+        }
+    }
+
+    private ClientTransaction createTransaction() {
+        try {
+            return recordSnapshot(history.createTransaction());
+        } catch (org.opendaylight.mdsal.common.api.TransactionChainClosedException e) {
+            throw new TransactionChainClosedException("Transaction chain has been closed", e);
+        }
+    }
+
+    private synchronized <T extends AbstractClientHandle<?>> T recordSnapshot(final T snapshot) {
+        openSnapshots.put(snapshot, Boolean.TRUE);
+        return snapshot;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ClientBackedWriteTransaction.java
new file mode 100644 (file)
index 0000000..27a671d
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+/**
+ * An implementation of {@link DOMStoreWriteTransaction} backed by a {@link ClientTransaction}.
+ *
+ * @author Robert Varga
+ */
+class ClientBackedWriteTransaction extends ClientBackedTransaction<ClientTransaction>
+        implements DOMStoreWriteTransaction {
+    ClientBackedWriteTransaction(final ClientTransaction delegate) {
+        super(delegate);
+    }
+
+    @Override
+    public final void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        delegate().write(path, data);
+    }
+
+    @Override
+    public final void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
+        delegate().merge(path, data);
+    }
+
+    @Override
+    public final void delete(final YangInstanceIdentifier path) {
+        delegate().delete(path);
+    }
+
+    @Override
+    public final DOMStoreThreePhaseCommitCohort ready() {
+        return new DOMStoreThreePhaseCommitCohortAdaptor(delegate().ready());
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMStoreThreePhaseCommitCohortAdaptor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/DOMStoreThreePhaseCommitCohortAdaptor.java
new file mode 100644 (file)
index 0000000..e9a5a7d
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.databroker;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ForwardingObject;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
+
+/**
+ * Utility class from bridging {@link DOMStoreThreePhaseCommitCohort} and
+ * {@link org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort}.
+ *
+ * @author Robert Varga
+ */
+final class DOMStoreThreePhaseCommitCohortAdaptor extends ForwardingObject implements DOMStoreThreePhaseCommitCohort {
+    private final org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort delegate;
+
+    DOMStoreThreePhaseCommitCohortAdaptor(
+        final org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort delegate) {
+        this.delegate = Preconditions.checkNotNull(delegate);
+    }
+
+    @Override
+    public ListenableFuture<Boolean> canCommit() {
+        return delegate.canCommit();
+    }
+
+    @Override
+    public ListenableFuture<Void> preCommit() {
+        return delegate.preCommit();
+    }
+
+    @Override
+    public ListenableFuture<Void> abort() {
+        return delegate.abort();
+    }
+
+    @Override
+    public ListenableFuture<Void> commit() {
+        return delegate.commit();
+    }
+
+    @Override
+    protected Object delegate() {
+        return delegate;
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadTransaction.java
deleted file mode 100644 (file)
index a52254e..0000000
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.databroker;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.CheckedFuture;
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
-import org.opendaylight.mdsal.common.api.ReadFailedException;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-/**
- * Proxy implementation of {@link DOMStoreReadTransaction}. It routes all requests to the backing
- * {@link ClientTransaction}. This class is not final to allow further subclassing by
- * {@link ShardedDOMStoreReadWriteTransaction}.
- *
- * @author Robert Varga
- */
-class ShardedDOMStoreReadTransaction extends AbstractShardedTransaction implements DOMStoreReadTransaction {
-    ShardedDOMStoreReadTransaction(final ClientTransaction tx) {
-        super(tx);
-    }
-
-    @Override
-    public final void close() {
-        transaction().abort();
-    }
-
-    @Override
-    public final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> read(
-            final YangInstanceIdentifier path) {
-        return transaction().read(path);
-    }
-
-    @Override
-    public final CheckedFuture<Boolean, ReadFailedException> exists(final YangInstanceIdentifier path) {
-        return transaction().exists(path);
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreReadWriteTransaction.java
deleted file mode 100644 (file)
index 9144f9b..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.databroker;
-
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-/**
- * Proxy implementation of {@link DOMStoreReadWriteTransaction}. It routes all requests to the backing
- * {@link ClientTransaction}.
- *
- * @author Robert Varga
- */
-final class ShardedDOMStoreReadWriteTransaction extends ShardedDOMStoreReadTransaction
-       implements DOMStoreReadWriteTransaction {
-    ShardedDOMStoreReadWriteTransaction(final ClientTransaction tx) {
-        super(tx);
-    }
-
-    @Override
-    public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
-        transaction().write(path, data);
-    }
-
-    @Override
-    public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
-        transaction().merge(path, data);
-    }
-
-    @Override
-    public void delete(final YangInstanceIdentifier path) {
-        transaction().delete(path);
-    }
-
-    @Override
-    public DOMStoreThreePhaseCommitCohort ready() {
-        return transaction().ready();
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreTransactionChain.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreTransactionChain.java
deleted file mode 100644 (file)
index 6b7237e..0000000
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.databroker;
-
-import com.google.common.base.Preconditions;
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientLocalHistory;
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreReadWriteTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreTransactionChain;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-
-/**
- * Implementation of {@link DOMStoreTransactionChain} backed by a {@link ClientLocalHistory}. It wraps
- * {@link ClientTransaction} into proxies like {@link ShardedDOMStoreReadTransaction} to provide isolation.
- *
- * @author Robert Varga
- */
-final class ShardedDOMStoreTransactionChain implements DOMStoreTransactionChain {
-    private final ClientLocalHistory history;
-
-    ShardedDOMStoreTransactionChain(final ClientLocalHistory history) {
-        this.history = Preconditions.checkNotNull(history);
-    }
-
-    @Override
-    public DOMStoreReadTransaction newReadOnlyTransaction() {
-        return new ShardedDOMStoreReadTransaction(history.createTransaction());
-    }
-
-    @Override
-    public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        return new ShardedDOMStoreReadWriteTransaction(history.createTransaction());
-    }
-
-    @Override
-    public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        return new ShardedDOMStoreWriteTransaction(history.createTransaction());
-    }
-
-    @Override
-    public void close() {
-        history.close();
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreWriteTransaction.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/databroker/ShardedDOMStoreWriteTransaction.java
deleted file mode 100644 (file)
index 0e13ac4..0000000
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License v1.0 which accompanies this distribution,
- * and is available at http://www.eclipse.org/legal/epl-v10.html
- */
-package org.opendaylight.controller.cluster.databroker;
-
-import org.opendaylight.controller.cluster.databroker.actors.dds.ClientTransaction;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.mdsal.dom.spi.store.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-
-/**
- * Proxy implementation of {@link DOMStoreWriteTransaction}. It routes all requests to the backing
- * {@link ClientTransaction}.
- *
- * @author Robert Varga
- */
-final class ShardedDOMStoreWriteTransaction extends AbstractShardedTransaction implements DOMStoreWriteTransaction {
-    ShardedDOMStoreWriteTransaction(final ClientTransaction tx) {
-        super(tx);
-    }
-
-    @Override
-    public void write(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
-        transaction().write(path, data);
-    }
-
-    @Override
-    public void merge(final YangInstanceIdentifier path, final NormalizedNode<?, ?> data) {
-        transaction().merge(path, data);
-    }
-
-    @Override
-    public void delete(final YangInstanceIdentifier path) {
-        transaction().delete(path);
-    }
-
-    @Override
-    public DOMStoreThreePhaseCommitCohort ready() {
-        return transaction().ready();
-    }
-
-    @Override
-    public void close() {
-        transaction().abort();
-    }
-}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractDataStore.java
new file mode 100644 (file)
index 0000000..36c822c
--- /dev/null
@@ -0,0 +1,290 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
+import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClientActor;
+import org.opendaylight.controller.cluster.datastore.config.Configuration;
+import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
+import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXBeanImpl;
+import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
+import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerCreator;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
+import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry;
+import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base implementation of a distributed DOMStore.
+ */
+public abstract class AbstractDataStore implements DistributedDataStoreInterface, SchemaContextListener,
+        DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher,
+        DOMDataTreeCommitCohortRegistry, AutoCloseable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractDataStore.class);
+
+    private static final long READY_WAIT_FACTOR = 3;
+
+    private final ActorContext actorContext;
+    private final long waitTillReadyTimeInMillis;
+
+    private AutoCloseable closeable;
+
+    private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean;
+
+    private DatastoreInfoMXBeanImpl datastoreInfoMXBean;
+
+    private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
+
+    private final ClientIdentifier identifier;
+    private final DataStoreClient client;
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    protected AbstractDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
+            final Configuration configuration, final DatastoreContextFactory datastoreContextFactory,
+            final DatastoreSnapshot restoreFromSnapshot) {
+        Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
+        Preconditions.checkNotNull(cluster, "cluster should not be null");
+        Preconditions.checkNotNull(configuration, "configuration should not be null");
+        Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
+
+        String shardManagerId = ShardManagerIdentifier.builder()
+                .type(datastoreContextFactory.getBaseDatastoreContext().getDataStoreName()).build().toString();
+
+        LOG.info("Creating ShardManager : {}", shardManagerId);
+
+        String shardDispatcher =
+                new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
+
+        PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
+
+        ShardManagerCreator creator = new ShardManagerCreator().cluster(cluster).configuration(configuration)
+                .datastoreContextFactory(datastoreContextFactory)
+                .waitTillReadyCountDownLatch(waitTillReadyCountDownLatch)
+                .primaryShardInfoCache(primaryShardInfoCache)
+                .restoreFromSnapshot(restoreFromSnapshot);
+
+        actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher,
+                shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(),
+                primaryShardInfoCache);
+
+        final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(),
+            datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(), actorContext);
+        final ActorRef clientActor = actorSystem.actorOf(clientProps);
+        try {
+            client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            LOG.error("Failed to get actor for {}", clientProps, e);
+            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+            throw Throwables.propagate(e);
+        }
+
+        identifier = client.getIdentifier();
+        LOG.debug("Distributed data store client {} started", identifier);
+
+        this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout()
+                .duration().toMillis() * READY_WAIT_FACTOR;
+
+        datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(
+                datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
+        datastoreConfigMXBean.setContext(datastoreContextFactory.getBaseDatastoreContext());
+        datastoreConfigMXBean.registerMBean();
+
+        datastoreInfoMXBean = new DatastoreInfoMXBeanImpl(datastoreContextFactory.getBaseDatastoreContext()
+                .getDataStoreMXBeanType(), actorContext);
+        datastoreInfoMXBean.registerMBean();
+    }
+
+    @VisibleForTesting
+    protected AbstractDataStore(final ActorContext actorContext, final ClientIdentifier identifier) {
+        this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
+        this.client = null;
+        this.identifier = Preconditions.checkNotNull(identifier);
+        this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout()
+                .duration().toMillis() * READY_WAIT_FACTOR;
+    }
+
+    protected final DataStoreClient getClient() {
+        return client;
+    }
+
+    final ClientIdentifier getIdentifier() {
+        return identifier;
+    }
+
+    public void setCloseable(final AutoCloseable closeable) {
+        this.closeable = closeable;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+                                              ListenerRegistration<L> registerChangeListener(
+        final YangInstanceIdentifier path, final L listener,
+        final AsyncDataBroker.DataChangeScope scope) {
+
+        Preconditions.checkNotNull(path, "path should not be null");
+        Preconditions.checkNotNull(listener, "listener should not be null");
+
+        LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
+
+        String shardName = actorContext.getShardStrategyFactory().getStrategy(path).findShard(path);
+
+        final DataChangeListenerRegistrationProxy listenerRegistrationProxy =
+                new DataChangeListenerRegistrationProxy(shardName, actorContext, listener);
+        listenerRegistrationProxy.init(path, scope);
+
+        return listenerRegistrationProxy;
+    }
+
+    @Override
+    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
+            final YangInstanceIdentifier treeId, final L listener) {
+        Preconditions.checkNotNull(treeId, "treeId should not be null");
+        Preconditions.checkNotNull(listener, "listener should not be null");
+
+        final String shardName = actorContext.getShardStrategyFactory().getStrategy(treeId).findShard(treeId);
+        LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName);
+
+        final DataTreeChangeListenerProxy<L> listenerRegistrationProxy =
+                new DataTreeChangeListenerProxy<>(actorContext, listener);
+        listenerRegistrationProxy.init(shardName, treeId);
+
+        return listenerRegistrationProxy;
+    }
+
+
+    @Override
+    public <C extends DOMDataTreeCommitCohort> DOMDataTreeCommitCohortRegistration<C> registerCommitCohort(
+            final DOMDataTreeIdentifier subtree, final C cohort) {
+        YangInstanceIdentifier treeId =
+                Preconditions.checkNotNull(subtree, "subtree should not be null").getRootIdentifier();
+        Preconditions.checkNotNull(cohort, "listener should not be null");
+
+
+        final String shardName = actorContext.getShardStrategyFactory().getStrategy(treeId).findShard(treeId);
+        LOG.debug("Registering cohort: {} for tree: {} shard: {}", cohort, treeId, shardName);
+
+        DataTreeCohortRegistrationProxy<C> cohortProxy =
+                new DataTreeCohortRegistrationProxy<>(actorContext, subtree, cohort);
+        cohortProxy.init(shardName);
+        return cohortProxy;
+    }
+
+    @Override
+    public void onGlobalContextUpdated(final SchemaContext schemaContext) {
+        actorContext.setSchemaContext(schemaContext);
+    }
+
+    @Override
+    public void onDatastoreContextUpdated(final DatastoreContextFactory contextFactory) {
+        LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreName());
+
+        actorContext.setDatastoreContext(contextFactory);
+        datastoreConfigMXBean.setContext(contextFactory.getBaseDatastoreContext());
+    }
+
+    @Override
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    public void close() {
+        LOG.info("Closing data store {}", identifier);
+
+        if (datastoreConfigMXBean != null) {
+            datastoreConfigMXBean.unregisterMBean();
+        }
+        if (datastoreInfoMXBean != null) {
+            datastoreInfoMXBean.unregisterMBean();
+        }
+
+        if (closeable != null) {
+            try {
+                closeable.close();
+            } catch (Exception e) {
+                LOG.debug("Error closing instance", e);
+            }
+        }
+
+        actorContext.shutdown();
+
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    @Override
+    public ActorContext getActorContext() {
+        return actorContext;
+    }
+
+    public void waitTillReady() {
+        LOG.info("Beginning to wait for data store to become ready : {}", identifier);
+
+        try {
+            if (waitTillReadyCountDownLatch.await(waitTillReadyTimeInMillis, TimeUnit.MILLISECONDS)) {
+                LOG.debug("Data store {} is now ready", identifier);
+            } else {
+                LOG.error("Shard leaders failed to settle in {} seconds, giving up",
+                        TimeUnit.MILLISECONDS.toSeconds(waitTillReadyTimeInMillis));
+            }
+        } catch (InterruptedException e) {
+            LOG.error("Interrupted while waiting for shards to settle", e);
+        }
+    }
+
+    @SuppressWarnings("checkstyle:IllegalCatch")
+    private static ActorRef createShardManager(final ActorSystem actorSystem, final ShardManagerCreator creator,
+            final String shardDispatcher, final String shardManagerId) {
+        Exception lastException = null;
+
+        for (int i = 0; i < 100; i++) {
+            try {
+                return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher).withMailbox(
+                        ActorContext.BOUNDED_MAILBOX), shardManagerId);
+            } catch (Exception e) {
+                lastException = e;
+                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+                LOG.debug("Could not create actor {} because of {} - waiting for sometime before retrying "
+                        + "(retry count = {})", shardManagerId, e.getMessage(), i);
+            }
+        }
+
+        throw new IllegalStateException("Failed to create Shard Manager", lastException);
+    }
+
+    @VisibleForTesting
+    public CountDownLatch getWaitTillReadyCountDownLatch() {
+        return waitTillReadyCountDownLatch;
+    }
+}
index a81806b..e889f0a 100644 (file)
@@ -76,6 +76,7 @@ public class DatastoreContext {
     private int shardBatchedModificationCount = DEFAULT_SHARD_BATCHED_MODIFICATION_COUNT;
     private boolean writeOnlyTransactionOptimizationsEnabled = true;
     private long shardCommitQueueExpiryTimeoutInMillis = DEFAULT_SHARD_COMMIT_QUEUE_EXPIRY_TIMEOUT_IN_MS;
+    private boolean useTellBasedProtocol = false;
     private boolean transactionDebugContextEnabled = false;
     private String shardManagerPersistenceId;
 
@@ -113,6 +114,7 @@ public class DatastoreContext {
         this.shardCommitQueueExpiryTimeoutInMillis = other.shardCommitQueueExpiryTimeoutInMillis;
         this.transactionDebugContextEnabled = other.transactionDebugContextEnabled;
         this.shardManagerPersistenceId = other.shardManagerPersistenceId;
+        this.useTellBasedProtocol = other.useTellBasedProtocol;
 
         setShardJournalRecoveryLogBatchSize(other.raftConfig.getJournalRecoveryLogBatchSize());
         setSnapshotBatchCount(other.raftConfig.getSnapshotBatchCount());
@@ -258,6 +260,10 @@ public class DatastoreContext {
         return transactionDebugContextEnabled;
     }
 
+    public boolean isUseTellBasedProtocol() {
+        return useTellBasedProtocol;
+    }
+
     public int getShardSnapshotChunkSize() {
         return raftConfig.getSnapshotChunkSize();
     }
@@ -471,6 +477,11 @@ public class DatastoreContext {
             return this;
         }
 
+        public Builder useTellBasedProtocol(boolean value) {
+            datastoreContext.useTellBasedProtocol = value;
+            return this;
+        }
+
         /**
          * For unit tests only.
          */
index 4ae64ac..47e5509 100644 (file)
 
 package org.opendaylight.controller.cluster.datastore;
 
-import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import org.opendaylight.controller.cluster.access.concepts.ClientIdentifier;
-import org.opendaylight.controller.cluster.databroker.actors.dds.DataStoreClient;
-import org.opendaylight.controller.cluster.databroker.actors.dds.DistributedDataStoreClientActor;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
-import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreConfigurationMXBeanImpl;
-import org.opendaylight.controller.cluster.datastore.jmx.mbeans.DatastoreInfoMXBeanImpl;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
-import org.opendaylight.controller.cluster.datastore.shardmanager.ShardManagerCreator;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
-import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
-import org.opendaylight.controller.cluster.datastore.utils.PrimaryShardInfoFutureCache;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
-import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohort;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistration;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeCommitCohortRegistry;
-import org.opendaylight.mdsal.dom.api.DOMDataTreeIdentifier;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
-import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
- * Implements a distributed DOMStore.
+ * Implements a distributed DOMStore using Akka Patterns.ask().
  */
-public class DistributedDataStore implements DistributedDataStoreInterface, SchemaContextListener,
-        DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher,
-        DOMDataTreeCommitCohortRegistry, AutoCloseable {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
-
-    private static final long READY_WAIT_FACTOR = 3;
-
-    private final ActorContext actorContext;
-    private final long waitTillReadyTimeInMillis;
-
-    private AutoCloseable closeable;
-
-    private DatastoreConfigurationMXBeanImpl datastoreConfigMXBean;
-
-    private DatastoreInfoMXBeanImpl datastoreInfoMXBean;
-
-    private final CountDownLatch waitTillReadyCountDownLatch = new CountDownLatch(1);
-
-    private final ClientIdentifier identifier;
-    private final DataStoreClient client;
+public class DistributedDataStore extends AbstractDataStore {
 
     private final TransactionContextFactory txContextFactory;
 
-    @SuppressWarnings("checkstyle:IllegalCatch")
     public DistributedDataStore(final ActorSystem actorSystem, final ClusterWrapper cluster,
             final Configuration configuration, final DatastoreContextFactory datastoreContextFactory,
             final DatastoreSnapshot restoreFromSnapshot) {
-        Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
-        Preconditions.checkNotNull(cluster, "cluster should not be null");
-        Preconditions.checkNotNull(configuration, "configuration should not be null");
-        Preconditions.checkNotNull(datastoreContextFactory, "datastoreContextFactory should not be null");
-
-        String shardManagerId = ShardManagerIdentifier.builder()
-                .type(datastoreContextFactory.getBaseDatastoreContext().getDataStoreName()).build().toString();
-
-        LOG.info("Creating ShardManager : {}", shardManagerId);
-
-        String shardDispatcher =
-                new Dispatchers(actorSystem.dispatchers()).getDispatcherPath(Dispatchers.DispatcherType.Shard);
-
-        PrimaryShardInfoFutureCache primaryShardInfoCache = new PrimaryShardInfoFutureCache();
-
-        ShardManagerCreator creator = new ShardManagerCreator().cluster(cluster).configuration(configuration)
-                .datastoreContextFactory(datastoreContextFactory)
-                .waitTillReadyCountDownLatch(waitTillReadyCountDownLatch)
-                .primaryShardInfoCache(primaryShardInfoCache)
-                .restoreFromSnapshot(restoreFromSnapshot);
-
-        actorContext = new ActorContext(actorSystem, createShardManager(actorSystem, creator, shardDispatcher,
-                shardManagerId), cluster, configuration, datastoreContextFactory.getBaseDatastoreContext(),
-                primaryShardInfoCache);
-
-        final Props clientProps = DistributedDataStoreClientActor.props(cluster.getCurrentMemberName(),
-            datastoreContextFactory.getBaseDatastoreContext().getDataStoreName(), actorContext);
-        final ActorRef clientActor = actorSystem.actorOf(clientProps);
-        try {
-            client = DistributedDataStoreClientActor.getDistributedDataStoreClient(clientActor, 30, TimeUnit.SECONDS);
-        } catch (Exception e) {
-            LOG.error("Failed to get actor for {}", clientProps, e);
-            clientActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
-            throw Throwables.propagate(e);
-        }
-
-        identifier = client.getIdentifier();
-        LOG.debug("Distributed data store client {} started", identifier);
-
-        this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout()
-                .duration().toMillis() * READY_WAIT_FACTOR;
-
-        this.txContextFactory = new TransactionContextFactory(actorContext, identifier);
-
-        datastoreConfigMXBean = new DatastoreConfigurationMXBeanImpl(
-                datastoreContextFactory.getBaseDatastoreContext().getDataStoreMXBeanType());
-        datastoreConfigMXBean.setContext(datastoreContextFactory.getBaseDatastoreContext());
-        datastoreConfigMXBean.registerMBean();
-
-        datastoreInfoMXBean = new DatastoreInfoMXBeanImpl(datastoreContextFactory.getBaseDatastoreContext()
-                .getDataStoreMXBeanType(), actorContext);
-        datastoreInfoMXBean.registerMBean();
+        super(actorSystem, cluster, configuration, datastoreContextFactory, restoreFromSnapshot);
+        this.txContextFactory = new TransactionContextFactory(getActorContext(), getIdentifier());
     }
 
     @VisibleForTesting
     DistributedDataStore(final ActorContext actorContext, final ClientIdentifier identifier) {
-        this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
-        this.client = null;
-        this.identifier = Preconditions.checkNotNull(identifier);
-        this.txContextFactory = new TransactionContextFactory(actorContext, identifier);
-        this.waitTillReadyTimeInMillis = actorContext.getDatastoreContext().getShardLeaderElectionTimeout()
-                .duration().toMillis() * READY_WAIT_FACTOR;
-    }
-
-    public void setCloseable(final AutoCloseable closeable) {
-        this.closeable = closeable;
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public <L extends AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-                                              ListenerRegistration<L> registerChangeListener(
-        final YangInstanceIdentifier path, final L listener,
-        final AsyncDataBroker.DataChangeScope scope) {
-
-        Preconditions.checkNotNull(path, "path should not be null");
-        Preconditions.checkNotNull(listener, "listener should not be null");
-
-        LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
-
-        String shardName = actorContext.getShardStrategyFactory().getStrategy(path).findShard(path);
-
-        final DataChangeListenerRegistrationProxy listenerRegistrationProxy =
-                new DataChangeListenerRegistrationProxy(shardName, actorContext, listener);
-        listenerRegistrationProxy.init(path, scope);
-
-        return listenerRegistrationProxy;
-    }
-
-    @Override
-    public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(
-            final YangInstanceIdentifier treeId, final L listener) {
-        Preconditions.checkNotNull(treeId, "treeId should not be null");
-        Preconditions.checkNotNull(listener, "listener should not be null");
-
-        final String shardName = actorContext.getShardStrategyFactory().getStrategy(treeId).findShard(treeId);
-        LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName);
-
-        final DataTreeChangeListenerProxy<L> listenerRegistrationProxy =
-                new DataTreeChangeListenerProxy<>(actorContext, listener);
-        listenerRegistrationProxy.init(shardName, treeId);
-
-        return listenerRegistrationProxy;
+        super(actorContext, identifier);
+        this.txContextFactory = new TransactionContextFactory(getActorContext(), getIdentifier());
     }
 
 
-    @Override
-    public <C extends DOMDataTreeCommitCohort> DOMDataTreeCommitCohortRegistration<C> registerCommitCohort(
-            final DOMDataTreeIdentifier subtree, final C cohort) {
-        YangInstanceIdentifier treeId =
-                Preconditions.checkNotNull(subtree, "subtree should not be null").getRootIdentifier();
-        Preconditions.checkNotNull(cohort, "listener should not be null");
-
-
-        final String shardName = actorContext.getShardStrategyFactory().getStrategy(treeId).findShard(treeId);
-        LOG.debug("Registering cohort: {} for tree: {} shard: {}", cohort, treeId, shardName);
-
-        DataTreeCohortRegistrationProxy<C> cohortProxy =
-                new DataTreeCohortRegistrationProxy<>(actorContext, subtree, cohort);
-        cohortProxy.init(shardName);
-        return cohortProxy;
-    }
-
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
         return txContextFactory.createTransactionChain();
@@ -216,99 +52,19 @@ public class DistributedDataStore implements DistributedDataStoreInterface, Sche
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        actorContext.acquireTxCreationPermit();
+        getActorContext().acquireTxCreationPermit();
         return new TransactionProxy(txContextFactory, TransactionType.WRITE_ONLY);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        actorContext.acquireTxCreationPermit();
+        getActorContext().acquireTxCreationPermit();
         return new TransactionProxy(txContextFactory, TransactionType.READ_WRITE);
     }
 
     @Override
-    public void onGlobalContextUpdated(final SchemaContext schemaContext) {
-        actorContext.setSchemaContext(schemaContext);
-    }
-
-    @Override
-    public void onDatastoreContextUpdated(final DatastoreContextFactory contextFactory) {
-        LOG.info("DatastoreContext updated for data store {}", actorContext.getDataStoreName());
-
-        actorContext.setDatastoreContext(contextFactory);
-        datastoreConfigMXBean.setContext(contextFactory.getBaseDatastoreContext());
-    }
-
-    @Override
-    @SuppressWarnings("checkstyle:IllegalCatch")
     public void close() {
-        LOG.info("Closing data store {}", identifier);
-
-        if (datastoreConfigMXBean != null) {
-            datastoreConfigMXBean.unregisterMBean();
-        }
-        if (datastoreInfoMXBean != null) {
-            datastoreInfoMXBean.unregisterMBean();
-        }
-
-        if (closeable != null) {
-            try {
-                closeable.close();
-            } catch (Exception e) {
-                LOG.debug("Error closing instance", e);
-            }
-        }
-
         txContextFactory.close();
-        actorContext.shutdown();
-
-        if (client != null) {
-            client.close();
-        }
-    }
-
-    @Override
-    public ActorContext getActorContext() {
-        return actorContext;
-    }
-
-    public void waitTillReady() {
-        LOG.info("Beginning to wait for data store to become ready : {}", identifier);
-
-        try {
-            if (waitTillReadyCountDownLatch.await(waitTillReadyTimeInMillis, TimeUnit.MILLISECONDS)) {
-                LOG.debug("Data store {} is now ready", identifier);
-            } else {
-                LOG.error("Shard leaders failed to settle in {} seconds, giving up",
-                        TimeUnit.MILLISECONDS.toSeconds(waitTillReadyTimeInMillis));
-            }
-        } catch (InterruptedException e) {
-            LOG.error("Interrupted while waiting for shards to settle", e);
-        }
-    }
-
-    @SuppressWarnings("checkstyle:IllegalCatch")
-    private static ActorRef createShardManager(final ActorSystem actorSystem, final ShardManagerCreator creator,
-            final String shardDispatcher, final String shardManagerId) {
-        Exception lastException = null;
-
-        for (int i = 0; i < 100; i++) {
-            try {
-                return actorSystem.actorOf(creator.props().withDispatcher(shardDispatcher).withMailbox(
-                        ActorContext.BOUNDED_MAILBOX), shardManagerId);
-            } catch (Exception e) {
-                lastException = e;
-                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
-                LOG.debug("Could not create actor {} because of {} - waiting for sometime before retrying "
-                        + "(retry count = {})", shardManagerId, e.getMessage(), i);
-            }
-        }
-
-        throw new IllegalStateException("Failed to create Shard Manager", lastException);
-    }
-
-    @VisibleForTesting
-    public CountDownLatch getWaitTillReadyCountDownLatch() {
-        return waitTillReadyCountDownLatch;
+        super.close();
     }
 }
index 484a400..be6d2d5 100644 (file)
@@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSystem;
 import org.opendaylight.controller.cluster.ActorSystemProvider;
+import org.opendaylight.controller.cluster.databroker.ClientBackedDataStore;
 import org.opendaylight.controller.cluster.datastore.config.Configuration;
 import org.opendaylight.controller.cluster.datastore.config.ConfigurationImpl;
 import org.opendaylight.controller.cluster.datastore.messages.DatastoreSnapshot;
@@ -20,9 +21,9 @@ import org.slf4j.LoggerFactory;
 public class DistributedDataStoreFactory {
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStoreFactory.class);
 
-    public static DistributedDataStore createInstance(SchemaService schemaService,
-            DatastoreContext datastoreContext, DatastoreSnapshotRestore datastoreSnapshotRestore,
-            ActorSystemProvider actorSystemProvider, BundleContext bundleContext) {
+    public static AbstractDataStore createInstance(final SchemaService schemaService,
+            final DatastoreContext datastoreContext, final DatastoreSnapshotRestore datastoreSnapshotRestore,
+            final ActorSystemProvider actorSystemProvider, final BundleContext bundleContext) {
 
         LOG.info("Create data store instance of type : {}", datastoreContext.getDataStoreName());
 
@@ -34,8 +35,12 @@ public class DistributedDataStoreFactory {
                 introspector, bundleContext);
 
         Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
-        final DistributedDataStore dataStore = new DistributedDataStore(actorSystem,
-                new ClusterWrapperImpl(actorSystem), config, introspector.newContextFactory(), restoreFromSnapshot);
+        ClusterWrapper clusterWrapper = new ClusterWrapperImpl(actorSystem);
+        DatastoreContextFactory contextFactory = introspector.newContextFactory();
+
+        final AbstractDataStore dataStore = datastoreContext.isUseTellBasedProtocol()
+                ? new ClientBackedDataStore(actorSystem, clusterWrapper, config, contextFactory, restoreFromSnapshot) :
+                    new DistributedDataStore(actorSystem, clusterWrapper, config, contextFactory, restoreFromSnapshot);
 
         overlay.setListener(dataStore);
 
index e30b324..18f5681 100644 (file)
@@ -93,6 +93,7 @@ public class DistributedConfigDataStoreProviderModule extends AbstractDistribute
                 .transactionDebugContextEnabled(props.getTransactionDebugContextEnabled())
                 .customRaftPolicyImplementation(props.getCustomRaftPolicyImplementation())
                 .shardSnapshotChunkSize(props.getShardSnapshotChunkSize().getValue().intValue())
+                .useTellBasedProtocol(props.getUseTellBasedProtocol())
                 .build();
     }
 
index 0597aba..ce4e543 100644 (file)
@@ -94,6 +94,7 @@ public class DistributedOperationalDataStoreProviderModule
                 .transactionDebugContextEnabled(props.getTransactionDebugContextEnabled())
                 .customRaftPolicyImplementation(props.getCustomRaftPolicyImplementation())
                 .shardSnapshotChunkSize(props.getShardSnapshotChunkSize().getValue().intValue())
+                .useTellBasedProtocol(props.getUseTellBasedProtocol())
                 .build();
     }
 
index 285de0e..2ae5f18 100644 (file)
@@ -66,157 +66,157 @@ module distributed-datastore-provider {
             default 1000;
             type non-zero-uint32-type;
             description "The maximum queue size for each shard's data store data change notification executor.";
-         }
+        }
 
-         leaf max-shard-data-change-executor-pool-size {
+        leaf max-shard-data-change-executor-pool-size {
             default 20;
             type non-zero-uint32-type;
             description "The maximum thread pool size for each shard's data store data change notification executor.";
-         }
+        }
 
-         leaf max-shard-data-change-listener-queue-size {
+        leaf max-shard-data-change-listener-queue-size {
             default 1000;
             type non-zero-uint32-type;
             description "The maximum queue size for each shard's data store data change listener.";
-         }
+        }
 
-         leaf max-shard-data-store-executor-queue-size {
+        leaf max-shard-data-store-executor-queue-size {
             default 5000;
             type non-zero-uint32-type;
             description "The maximum queue size for each shard's data store executor.";
-         }
+        }
 
-         leaf shard-transaction-idle-timeout-in-minutes {
+        leaf shard-transaction-idle-timeout-in-minutes {
             default 10;
             type non-zero-uint32-type;
             description "The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.";
-         }
+        }
 
-         leaf shard-snapshot-batch-count {
+        leaf shard-snapshot-batch-count {
             default 20000;
             type non-zero-uint32-type;
             description "The minimum number of entries to be present in the in-memory journal log before a snapshot is to be taken.";
-         }
+        }
 
-         leaf shard-snapshot-data-threshold-percentage {
+        leaf shard-snapshot-data-threshold-percentage {
             default 12;
             type percentage;
             description "The percentage of Runtime.maxMemory() used by the in-memory journal log before a snapshot is to be taken";
-         }
+        }
 
 
-         leaf shard-heartbeat-interval-in-millis {
+        leaf shard-heartbeat-interval-in-millis {
             default 500;
             type heartbeat-interval-type;
             description "The interval at which a shard will send a heart beat message to its remote shard.";
-         }
+        }
 
-         leaf shard-election-timeout-factor {
+        leaf shard-election-timeout-factor {
             default 20;
             type non-zero-uint32-type;
             description "The multiplication factor to be used to determine shard election timeout. The shard election timeout
                          is determined by multiplying shard-heartbeat-interval-in-millis with the shard-election-timeout-factor";
-         }
+        }
 
-         leaf operation-timeout-in-seconds {
+        leaf operation-timeout-in-seconds {
             default 5;
             type operation-timeout-type;
             description "The maximum amount of time for akka operations (remote or local) to complete before failing.";
-         }
+        }
 
-         leaf shard-journal-recovery-log-batch-size {
+        leaf shard-journal-recovery-log-batch-size {
             default 1;
             type non-zero-uint32-type;
             description "The maximum number of journal log entries to batch on recovery for a shard before committing to the data store.";
-         }
+        }
 
-         leaf shard-transaction-commit-timeout-in-seconds {
+        leaf shard-transaction-commit-timeout-in-seconds {
             default 30;
             type non-zero-uint32-type;
             description "The maximum amount of time a shard transaction three-phase commit can be idle without receiving the next messages before it aborts the transaction";
-         }
+        }
 
-         leaf shard-transaction-commit-queue-capacity {
+        leaf shard-transaction-commit-queue-capacity {
             default 50000;
             type non-zero-uint32-type;
             description "The maximum allowed capacity for each shard's transaction commit queue.";
-         }
-
-         leaf shard-commit-queue-expiry-timeout-in-seconds {
-             default 120; // 2 minutes
-             type non-zero-uint32-type;
-             description "The maximum amount of time a transaction can remain in a shard's commit queue waiting 
-                 to begin the CanCommit phase as coordinated by the broker front-end. Normally this should be
-                 quick but latencies can occur in between transaction ready and CanCommit or a remote broker
-                 could lose connection and CanCommit might never occur. Expiring transactions from the queue
-                 allows subsequent pending transaction to be processed.";
-         }
-         
-         leaf shard-initialization-timeout-in-seconds {
+        }
+
+        leaf shard-commit-queue-expiry-timeout-in-seconds {
+            default 120; // 2 minutes
+            type non-zero-uint32-type;
+            description "The maximum amount of time a transaction can remain in a shard's commit queue waiting
+                to begin the CanCommit phase as coordinated by the broker front-end. Normally this should be
+                quick but latencies can occur in between transaction ready and CanCommit or a remote broker
+                could lose connection and CanCommit might never occur. Expiring transactions from the queue
+                allows subsequent pending transaction to be processed.";
+        }
+
+        leaf shard-initialization-timeout-in-seconds {
             default 300; // 5 minutes
             type non-zero-uint32-type;
             description "The maximum amount of time to wait for a shard to initialize from persistence
                          on startup before failing an operation (eg transaction create and change
                          listener registration).";
-         }
+        }
 
-         leaf shard-leader-election-timeout-in-seconds {
+        leaf shard-leader-election-timeout-in-seconds {
             default 30;
             type non-zero-uint32-type;
             description "The maximum amount of time to wait for a shard to elect a leader before failing
                           an operation (eg transaction create).";
-         }
+        }
 
-         leaf shard-batched-modification-count {
+        leaf shard-batched-modification-count {
             default 1000;
             type non-zero-uint32-type;
             description "The number of transaction modification operations (put, merge, delete) to
                         batch before sending to the shard transaction actor. Batching improves
                         performance as less modifications messages are sent to the actor and thus
                         lessens the chance that the transaction actor's mailbox queue could get full.";
-         }
+        }
 
-         leaf enable-metric-capture {
+        leaf enable-metric-capture {
             default false;
             type boolean;
             description "Enable or disable metric capture.";
-         }
+        }
 
-         leaf bounded-mailbox-capacity {
-             default 1000;
-             type non-zero-uint32-type;
-             description "Max queue size that an actor's mailbox can reach";
-         }
+        leaf bounded-mailbox-capacity {
+            default 1000;
+            type non-zero-uint32-type;
+            description "Max queue size that an actor's mailbox can reach";
+        }
 
-         leaf persistent {
+        leaf persistent {
             default true;
             type boolean;
             description "Enable or disable data persistence";
-         }
+        }
 
-         leaf shard-isolated-leader-check-interval-in-millis {
+        leaf shard-isolated-leader-check-interval-in-millis {
             default 5000;
             type heartbeat-interval-type;
             description "The interval at which the leader of the shard will check if its majority
                         followers are active and term itself as isolated";
-         }
+        }
 
-         leaf transaction-creation-initial-rate-limit {
+        leaf transaction-creation-initial-rate-limit {
             default 100;
             type non-zero-uint32-type;
             description "The initial number of transactions per second that are allowed before the data store
                          should begin applying back pressure. This number is only used as an initial guidance,
                          subsequently the datastore measures the latency for a commit and auto-adjusts the rate limit";
-         }
-         
-         leaf transaction-debug-context-enabled {
-             default false;
-             type boolean;
-             description "Enable or disable transaction context debug. This will log the call site trace for
-                          transactions that fail";
-         }
-
-         leaf custom-raft-policy-implementation {
+        }
+
+        leaf transaction-debug-context-enabled {
+            default false;
+            type boolean;
+            description "Enable or disable transaction context debug. This will log the call site trace for
+                         transactions that fail";
+        }
+
+        leaf custom-raft-policy-implementation {
             default "";
             type string;
             description "A fully qualified java class name. The class should implement
@@ -225,14 +225,21 @@ module distributed-datastore-provider {
                          reflection. For now let's assume that these classes to customize raft behaviors should be
                          present in the distributed data store module itself. If this property is set to a class which
                          cannot be found then the default raft behavior will be applied";
-         }
+        }
 
-         leaf shard-snapshot-chunk-size {
+        leaf shard-snapshot-chunk-size {
             default 2048000;
             type non-zero-uint32-type;
-            description "When sending a snapshot to a follower, this is the maximum size in bytes for 
+            description "When sending a snapshot to a follower, this is the maximum size in bytes for
                          a chunk of data.";
-         }
+        }
+
+        leaf use-tell-based-protocol {
+            default false;
+            type boolean;
+            description "Use a newer protocol between the frontend and backend. This feature is considered
+                         exprerimental at this point.";
+        }
     }
 
     // Augments the 'configuration' choice node under modules/module.
index 21c8497..301f65f 100644 (file)
@@ -86,7 +86,7 @@ public class DataTreeCohortIntegrationTest {
         ArgumentCaptor<DOMDataTreeCandidate> candidateCapt = ArgumentCaptor.forClass(DOMDataTreeCandidate.class);
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
-                try (final DistributedDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest",
+                try (final AbstractDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest",
                         "test-1")) {
                     final ObjectRegistration<DOMDataTreeCommitCohort> cohortReg =
                             dataStore.registerCommitCohort(TEST_ID, cohort);
@@ -122,7 +122,7 @@ public class DataTreeCohortIntegrationTest {
 
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
-                try (final DistributedDataStore dataStore =
+                try (final AbstractDataStore dataStore =
                         setupDistributedDataStore("transactionIntegrationTest", "test-1")) {
                     dataStore.registerCommitCohort(TEST_ID, failedCohort);
                     Thread.sleep(1000); // Registration is asynchronous
@@ -157,7 +157,7 @@ public class DataTreeCohortIntegrationTest {
         Mockito.doReturn(ThreePhaseCommitStep.NOOP_ABORT_FUTURE).when(stepToAbort).abort();
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
-                try (final DistributedDataStore dataStore =
+                try (final AbstractDataStore dataStore =
                         setupDistributedDataStore("transactionIntegrationTest", "test-1")) {
                     dataStore.registerCommitCohort(TEST_ID, cohortToAbort);
                     Thread.sleep(1000); // Registration is asynchronous
index 4d81442..9b97104 100644 (file)
@@ -115,7 +115,7 @@ public class DistributedDataStoreIntegrationTest {
     public void testWriteTransactionWithSingleShard() throws Exception {
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
-                try (DistributedDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest",
+                try (AbstractDataStore dataStore = setupDistributedDataStore("transactionIntegrationTest",
                         "test-1")) {
 
                     testWriteTransaction(dataStore, TestModel.TEST_PATH,
@@ -132,7 +132,7 @@ public class DistributedDataStoreIntegrationTest {
     public void testWriteTransactionWithMultipleShards() throws Exception {
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
-                try (DistributedDataStore dataStore = setupDistributedDataStore(
+                try (AbstractDataStore dataStore = setupDistributedDataStore(
                         "testWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
 
                     DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
@@ -183,7 +183,7 @@ public class DistributedDataStoreIntegrationTest {
         System.setProperty("shard.persistent", "true");
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
-                try (DistributedDataStore dataStore = setupDistributedDataStore(
+                try (AbstractDataStore dataStore = setupDistributedDataStore(
                         "testReadWriteTransactionWithSingleShard", "test-1")) {
 
                     // 1. Create a read-write Tx
@@ -230,7 +230,7 @@ public class DistributedDataStoreIntegrationTest {
     public void testReadWriteTransactionWithMultipleShards() throws Exception {
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
-                try (DistributedDataStore dataStore = setupDistributedDataStore(
+                try (AbstractDataStore dataStore = setupDistributedDataStore(
                         "testReadWriteTransactionWithMultipleShards", "cars-1", "people-1")) {
 
                     DOMStoreReadWriteTransaction readWriteTx = dataStore.newReadWriteTransaction();
@@ -288,7 +288,7 @@ public class DistributedDataStoreIntegrationTest {
     public void testSingleTransactionsWritesInQuickSuccession() throws Exception {
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
-                try (DistributedDataStore dataStore = setupDistributedDataStore(
+                try (AbstractDataStore dataStore = setupDistributedDataStore(
                         "testSingleTransactionsWritesInQuickSuccession", "cars-1")) {
 
                     DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
@@ -331,7 +331,7 @@ public class DistributedDataStoreIntegrationTest {
                 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
 
-                try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
+                try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
 
                     // Create the write Tx
 
@@ -437,7 +437,7 @@ public class DistributedDataStoreIntegrationTest {
                 CountDownLatch blockRecoveryLatch = new CountDownLatch(1);
                 InMemoryJournal.addBlockReadMessagesLatch(persistentID, blockRecoveryLatch);
 
-                try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
+                try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
 
                     // Create the read-write Tx
 
@@ -520,7 +520,7 @@ public class DistributedDataStoreIntegrationTest {
 
                 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
 
-                try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
+                try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
 
                     // Create the write Tx
 
@@ -598,7 +598,7 @@ public class DistributedDataStoreIntegrationTest {
 
                 InMemoryJournal.addEntry(persistentID, 1, "Dummy data so akka will read from persistence");
 
-                try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
+                try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
 
                     // Create the read-write Tx
 
@@ -675,7 +675,7 @@ public class DistributedDataStoreIntegrationTest {
                 datastoreContextBuilder.shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(1)
                         .shardInitializationTimeout(200, TimeUnit.MILLISECONDS);
 
-                try (DistributedDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
+                try (AbstractDataStore dataStore = setupDistributedDataStore(testName, false, shardName)) {
 
                     Object result = dataStore.getActorContext().executeOperation(
                             dataStore.getActorContext().getShardManager(), new FindLocalShard(shardName, true));
@@ -753,7 +753,7 @@ public class DistributedDataStoreIntegrationTest {
     public void testTransactionAbort() throws Exception {
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
-                try (DistributedDataStore dataStore = setupDistributedDataStore("transactionAbortIntegrationTest",
+                try (AbstractDataStore dataStore = setupDistributedDataStore("transactionAbortIntegrationTest",
                         "test-1")) {
 
                     DOMStoreWriteTransaction writeTx = dataStore.newWriteOnlyTransaction();
@@ -779,7 +779,7 @@ public class DistributedDataStoreIntegrationTest {
     public void testTransactionChainWithSingleShard() throws Exception {
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
-                try (DistributedDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard",
+                try (AbstractDataStore dataStore = setupDistributedDataStore("testTransactionChainWithSingleShard",
                         "test-1")) {
 
                     // 1. Create a Tx chain and write-only Tx
@@ -875,7 +875,7 @@ public class DistributedDataStoreIntegrationTest {
     public void testTransactionChainWithMultipleShards() throws Exception {
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
-                try (DistributedDataStore dataStore = setupDistributedDataStore(
+                try (AbstractDataStore dataStore = setupDistributedDataStore(
                         "testTransactionChainWithMultipleShards", "cars-1", "people-1")) {
 
                     DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
@@ -943,7 +943,7 @@ public class DistributedDataStoreIntegrationTest {
     public void testCreateChainedTransactionsInQuickSuccession() throws Exception {
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
-                try (DistributedDataStore dataStore = setupDistributedDataStore(
+                try (AbstractDataStore dataStore = setupDistributedDataStore(
                         "testCreateChainedTransactionsInQuickSuccession", "cars-1")) {
 
                     ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
@@ -992,7 +992,7 @@ public class DistributedDataStoreIntegrationTest {
     public void testCreateChainedTransactionAfterEmptyTxReadied() throws Exception {
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
-                try (DistributedDataStore dataStore = setupDistributedDataStore(
+                try (AbstractDataStore dataStore = setupDistributedDataStore(
                         "testCreateChainedTransactionAfterEmptyTxReadied", "test-1")) {
 
                     DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
@@ -1016,7 +1016,7 @@ public class DistributedDataStoreIntegrationTest {
     public void testCreateChainedTransactionWhenPreviousNotReady() throws Exception {
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
-                try (DistributedDataStore dataStore = setupDistributedDataStore(
+                try (AbstractDataStore dataStore = setupDistributedDataStore(
                         "testCreateChainedTransactionWhenPreviousNotReady", "test-1")) {
 
                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
@@ -1040,7 +1040,7 @@ public class DistributedDataStoreIntegrationTest {
     public void testCreateChainedTransactionAfterClose() throws Exception {
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
-                try (DistributedDataStore dataStore = setupDistributedDataStore(
+                try (AbstractDataStore dataStore = setupDistributedDataStore(
                         "testCreateChainedTransactionAfterClose", "test-1")) {
 
                     DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
@@ -1060,7 +1060,7 @@ public class DistributedDataStoreIntegrationTest {
     public void testChainWithReadOnlyTxAfterPreviousReady() throws Exception {
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
-                try (DistributedDataStore dataStore = setupDistributedDataStore(
+                try (AbstractDataStore dataStore = setupDistributedDataStore(
                         "testChainWithReadOnlyTxAfterPreviousReady", "test-1")) {
 
                     final DOMStoreTransactionChain txChain = dataStore.createTransactionChain();
@@ -1108,7 +1108,7 @@ public class DistributedDataStoreIntegrationTest {
     public void testChainedTransactionFailureWithSingleShard() throws Exception {
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
-                try (DistributedDataStore dataStore = setupDistributedDataStore(
+                try (AbstractDataStore dataStore = setupDistributedDataStore(
                         "testChainedTransactionFailureWithSingleShard", "cars-1")) {
 
                     ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
@@ -1148,7 +1148,7 @@ public class DistributedDataStoreIntegrationTest {
     public void testChainedTransactionFailureWithMultipleShards() throws Exception {
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
-                try (DistributedDataStore dataStore = setupDistributedDataStore(
+                try (AbstractDataStore dataStore = setupDistributedDataStore(
                         "testChainedTransactionFailureWithMultipleShards", "cars-1", "people-1")) {
 
                     ConcurrentDOMDataBroker broker = new ConcurrentDOMDataBroker(
@@ -1194,7 +1194,7 @@ public class DistributedDataStoreIntegrationTest {
     public void testChangeListenerRegistration() throws Exception {
         new IntegrationTestKit(getSystem(), datastoreContextBuilder) {
             {
-                try (DistributedDataStore dataStore = setupDistributedDataStore("testChangeListenerRegistration",
+                try (AbstractDataStore dataStore = setupDistributedDataStore("testChangeListenerRegistration",
                         "test-1")) {
 
                     testWriteTransaction(dataStore, TestModel.TEST_PATH,
@@ -1274,7 +1274,7 @@ public class DistributedDataStoreIntegrationTest {
                                 new DatastoreSnapshot.ShardSnapshot("people",
                                         org.apache.commons.lang3.SerializationUtils.serialize(peopleSnapshot))));
 
-                try (DistributedDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf",
+                try (AbstractDataStore dataStore = setupDistributedDataStore(name, "module-shards-member1.conf",
                         true, "cars", "people")) {
 
                     DOMStoreReadTransaction readTx = dataStore.newReadOnlyTransaction();
index e157e42..7474690 100644 (file)
@@ -124,8 +124,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
     private final TransactionIdentifier tx1 = nextTransactionId();
     private final TransactionIdentifier tx2 = nextTransactionId();
 
-    private DistributedDataStore followerDistributedDataStore;
-    private DistributedDataStore leaderDistributedDataStore;
+    private AbstractDataStore followerDistributedDataStore;
+    private AbstractDataStore leaderDistributedDataStore;
     private IntegrationTestKit followerTestKit;
     private IntegrationTestKit leaderTestKit;
 
@@ -155,15 +155,15 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         JavaTestKit.shutdownActorSystem(follower2System);
     }
 
-    private void initDatastoresWithCars(String type) {
+    private void initDatastoresWithCars(final String type) {
         initDatastores(type, MODULE_SHARDS_CARS_ONLY_1_2, CARS);
     }
 
-    private void initDatastoresWithCarsAndPeople(String type) {
+    private void initDatastoresWithCarsAndPeople(final String type) {
         initDatastores(type, MODULE_SHARDS_CARS_PEOPLE_1_2, CARS_AND_PEOPLE);
     }
 
-    private void initDatastores(String type, String moduleShardsConfig, String[] shards) {
+    private void initDatastores(final String type, final String moduleShardsConfig, final String[] shards) {
         leaderTestKit = new IntegrationTestKit(leaderSystem, leaderDatastoreContextBuilder);
 
         leaderDistributedDataStore = leaderTestKit.setupDistributedDataStore(type, moduleShardsConfig, false, shards);
@@ -175,7 +175,8 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         leaderTestKit.waitUntilLeader(leaderDistributedDataStore.getActorContext(), shards);
     }
 
-    private static void verifyCars(DOMStoreReadTransaction readTx, MapEntryNode... entries) throws Exception {
+    private static void verifyCars(final DOMStoreReadTransaction readTx, final MapEntryNode... entries)
+            throws Exception {
         Optional<NormalizedNode<?, ?>> optional = readTx.read(CarsModel.CAR_LIST_PATH).get(5, TimeUnit.SECONDS);
         assertEquals("isPresent", true, optional.isPresent());
 
@@ -187,14 +188,15 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         assertEquals("Car list node", listBuilder.build(), optional.get());
     }
 
-    private static void verifyNode(DOMStoreReadTransaction readTx, YangInstanceIdentifier path,
-            NormalizedNode<?, ?> expNode) throws Exception {
+    private static void verifyNode(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path,
+            final NormalizedNode<?, ?> expNode) throws Exception {
         Optional<NormalizedNode<?, ?>> optional = readTx.read(path).get(5, TimeUnit.SECONDS);
         assertEquals("isPresent", true, optional.isPresent());
         assertEquals("Data node", expNode, optional.get());
     }
 
-    private static void verifyExists(DOMStoreReadTransaction readTx, YangInstanceIdentifier path) throws Exception {
+    private static void verifyExists(final DOMStoreReadTransaction readTx, final YangInstanceIdentifier path)
+            throws Exception {
         Boolean exists = readTx.exists(path).get(5, TimeUnit.SECONDS);
         assertEquals("exists", true, exists);
     }
@@ -250,7 +252,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
 
         ActorSystem newSystem = ActorSystem.create("reinstated-member2", ConfigFactory.load().getConfig("Member2"));
 
-        try (DistributedDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder)
+        try (AbstractDataStore member2Datastore = new IntegrationTestKit(newSystem, leaderDatastoreContextBuilder)
                 .setupDistributedDataStore(testName, "module-shards-member2", true, CARS_AND_PEOPLE)) {
             verifyCars(member2Datastore.newReadOnlyTransaction(), car2);
         }
@@ -538,7 +540,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
         IntegrationTestKit newMember1TestKit = new IntegrationTestKit(leaderSystem, newMember1Builder);
 
-        try (DistributedDataStore ds =
+        try (AbstractDataStore ds =
                 newMember1TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_ONLY_1_2, false, CARS)) {
 
             followerTestKit.waitUntilLeader(followerDistributedDataStore.getActorContext(), CARS);
@@ -805,7 +807,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         initDatastores(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, CARS_AND_PEOPLE);
 
         IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, followerDatastoreContextBuilder);
-        try (DistributedDataStore follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(testName,
+        try (AbstractDataStore follower2DistributedDataStore = follower2TestKit.setupDistributedDataStore(testName,
                 MODULE_SHARDS_CARS_PEOPLE_1_2_3, false)) {
 
             // Create and submit a couple tx's so they're pending.
@@ -980,7 +982,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
                 .shardHeartbeatIntervalInMillis(100).shardElectionTimeoutFactor(5);
         IntegrationTestKit follower2TestKit = new IntegrationTestKit(follower2System, follower2DatastoreContextBuilder);
 
-        try (DistributedDataStore ds =
+        try (AbstractDataStore ds =
                 follower2TestKit.setupDistributedDataStore(testName, MODULE_SHARDS_CARS_PEOPLE_1_2_3, false, CARS)) {
 
             followerTestKit.waitForMembersUp("member-1", "member-3");
@@ -1008,7 +1010,7 @@ public class DistributedDataStoreRemotingIntegrationTest extends AbstractTest {
         }
     }
 
-    private static void sendDatastoreContextUpdate(DistributedDataStore dataStore, final Builder builder) {
+    private static void sendDatastoreContextUpdate(final AbstractDataStore dataStore, final Builder builder) {
         final Builder newBuilder = DatastoreContext.newBuilderFrom(builder.build());
         DatastoreContextFactory mockContextFactory = Mockito.mock(DatastoreContextFactory.class);
         Answer<DatastoreContext> answer = invocation -> newBuilder.build();
index f09441e..1d501e1 100644 (file)
@@ -49,7 +49,7 @@ public class IntegrationTestKit extends ShardTestKit {
     protected DatastoreContext.Builder datastoreContextBuilder;
     protected DatastoreSnapshot restoreFromSnapshot;
 
-    public IntegrationTestKit(ActorSystem actorSystem, Builder datastoreContextBuilder) {
+    public IntegrationTestKit(final ActorSystem actorSystem, final Builder datastoreContextBuilder) {
         super(actorSystem);
         this.datastoreContextBuilder = datastoreContextBuilder;
     }
@@ -58,23 +58,23 @@ public class IntegrationTestKit extends ShardTestKit {
         return datastoreContextBuilder;
     }
 
-    public DistributedDataStore setupDistributedDataStore(String typeName, String... shardNames) {
+    public AbstractDataStore setupDistributedDataStore(final String typeName, final String... shardNames) {
         return setupDistributedDataStore(typeName, "module-shards.conf", true, SchemaContextHelper.full(), shardNames);
     }
 
-    public DistributedDataStore setupDistributedDataStore(String typeName, boolean waitUntilLeader,
-            String... shardNames) {
+    public AbstractDataStore setupDistributedDataStore(final String typeName, final boolean waitUntilLeader,
+            final String... shardNames) {
         return setupDistributedDataStore(typeName, "module-shards.conf", waitUntilLeader,
                 SchemaContextHelper.full(), shardNames);
     }
 
-    public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
+    public AbstractDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
             final boolean waitUntilLeader, final String... shardNames) {
         return setupDistributedDataStore(typeName, moduleShardsConfig, waitUntilLeader,
                 SchemaContextHelper.full(), shardNames);
     }
 
-    public DistributedDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
+    public AbstractDataStore setupDistributedDataStore(final String typeName, final String moduleShardsConfig,
             final boolean waitUntilLeader, final SchemaContext schemaContext, final String... shardNames) {
         final ClusterWrapper cluster = new ClusterWrapperImpl(getSystem());
         final Configuration config = new ConfigurationImpl(moduleShardsConfig, "modules.conf");
@@ -86,7 +86,7 @@ public class IntegrationTestKit extends ShardTestKit {
         Mockito.doReturn(datastoreContext).when(mockContextFactory).getBaseDatastoreContext();
         Mockito.doReturn(datastoreContext).when(mockContextFactory).getShardDatastoreContext(Mockito.anyString());
 
-        DistributedDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, mockContextFactory,
+        AbstractDataStore dataStore = new DistributedDataStore(getSystem(), cluster, config, mockContextFactory,
                 restoreFromSnapshot);
 
         dataStore.onGlobalContextUpdated(schemaContext);
@@ -99,7 +99,7 @@ public class IntegrationTestKit extends ShardTestKit {
         return dataStore;
     }
 
-    public void waitUntilLeader(ActorContext actorContext, String... shardNames) {
+    public void waitUntilLeader(final ActorContext actorContext, final String... shardNames) {
         for (String shardName: shardNames) {
             ActorRef shard = findLocalShard(actorContext, shardName);
 
@@ -109,7 +109,7 @@ public class IntegrationTestKit extends ShardTestKit {
         }
     }
 
-    public void waitUntilNoLeader(ActorContext actorContext, String... shardNames) {
+    public void waitUntilNoLeader(final ActorContext actorContext, final String... shardNames) {
         for (String shardName: shardNames) {
             ActorRef shard = findLocalShard(actorContext, shardName);
             assertNotNull("No local shard found for " + shardName, shard);
@@ -118,7 +118,7 @@ public class IntegrationTestKit extends ShardTestKit {
         }
     }
 
-    public void waitForMembersUp(String... otherMembers) {
+    public void waitForMembersUp(final String... otherMembers) {
         Set<String> otherMembersSet = Sets.newHashSet(otherMembers);
         Stopwatch sw = Stopwatch.createStarted();
         while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
@@ -136,7 +136,7 @@ public class IntegrationTestKit extends ShardTestKit {
         fail("Member(s) " + otherMembersSet + " are not Up");
     }
 
-    public static ActorRef findLocalShard(ActorContext actorContext, String shardName) {
+    public static ActorRef findLocalShard(final ActorContext actorContext, final String shardName) {
         ActorRef shard = null;
         for (int i = 0; i < 20 * 5 && shard == null; i++) {
             Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
@@ -148,8 +148,8 @@ public class IntegrationTestKit extends ShardTestKit {
         return shard;
     }
 
-    public static void verifyShardStats(DistributedDataStore datastore, String shardName, ShardStatsVerifier verifier)
-            throws Exception {
+    public static void verifyShardStats(final AbstractDataStore datastore, final String shardName,
+            final ShardStatsVerifier verifier) throws Exception {
         ActorContext actorContext = datastore.getActorContext();
 
         Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
@@ -173,8 +173,8 @@ public class IntegrationTestKit extends ShardTestKit {
         throw lastError;
     }
 
-    void testWriteTransaction(DistributedDataStore dataStore, YangInstanceIdentifier nodePath,
-            NormalizedNode<?, ?> nodeToWrite) throws Exception {
+    void testWriteTransaction(final AbstractDataStore dataStore, final YangInstanceIdentifier nodePath,
+            final NormalizedNode<?, ?> nodeToWrite) throws Exception {
 
         // 1. Create a write-only Tx
 
@@ -218,7 +218,7 @@ public class IntegrationTestKit extends ShardTestKit {
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    void assertExceptionOnCall(Callable<Void> callable, Class<? extends Exception> expType)
+    void assertExceptionOnCall(final Callable<Void> callable, final Class<? extends Exception> expType)
             throws Exception {
         try {
             callable.call();
@@ -229,13 +229,10 @@ public class IntegrationTestKit extends ShardTestKit {
     }
 
     void assertExceptionOnTxChainCreates(final DOMStoreTransactionChain txChain,
-            Class<? extends Exception> expType) throws Exception {
-        assertExceptionOnCall(new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                txChain.newWriteOnlyTransaction();
-                return null;
-            }
+            final Class<? extends Exception> expType) throws Exception {
+        assertExceptionOnCall(() -> {
+            txChain.newWriteOnlyTransaction();
+            return null;
         }, expType);
 
         assertExceptionOnCall(() -> {
index db6c5e6..bb2ce19 100644 (file)
@@ -50,8 +50,8 @@ public class MemberNode {
     static final Address MEMBER_1_ADDRESS = AddressFromURIString.parse("akka.tcp://cluster-test@127.0.0.1:2558");
 
     private IntegrationTestKit kit;
-    private DistributedDataStore configDataStore;
-    private DistributedDataStore operDataStore;
+    private AbstractDataStore configDataStore;
+    private AbstractDataStore operDataStore;
     private DatastoreContext.Builder datastoreContextBuilder;
     private boolean cleanedUp;
 
@@ -62,7 +62,7 @@ public class MemberNode {
      *                callers to cleanup instances on test completion.
      * @return a Builder instance
      */
-    public static Builder builder(List<MemberNode> members) {
+    public static Builder builder(final List<MemberNode> members) {
         return new Builder(members);
     }
 
@@ -71,12 +71,12 @@ public class MemberNode {
     }
 
 
-    public DistributedDataStore configDataStore() {
+    public AbstractDataStore configDataStore() {
         return configDataStore;
     }
 
 
-    public DistributedDataStore operDataStore() {
+    public AbstractDataStore operDataStore() {
         return operDataStore;
     }
 
@@ -84,11 +84,11 @@ public class MemberNode {
         return datastoreContextBuilder;
     }
 
-    public void waitForMembersUp(String... otherMembers) {
+    public void waitForMembersUp(final String... otherMembers) {
         kit.waitForMembersUp(otherMembers);
     }
 
-    public void waitForMemberDown(String member) {
+    public void waitForMemberDown(final String member) {
         Stopwatch sw = Stopwatch.createStarted();
         while (sw.elapsed(TimeUnit.SECONDS) <= 10) {
             CurrentClusterState state = Cluster.get(kit.getSystem()).state();
@@ -124,8 +124,8 @@ public class MemberNode {
         }
     }
 
-    public static void verifyRaftState(DistributedDataStore datastore, String shardName, RaftStateVerifier verifier)
-            throws Exception {
+    public static void verifyRaftState(final AbstractDataStore datastore, final String shardName,
+            final RaftStateVerifier verifier) throws Exception {
         ActorContext actorContext = datastore.getActorContext();
 
         Future<ActorRef> future = actorContext.findLocalShardAsync(shardName);
@@ -149,8 +149,8 @@ public class MemberNode {
         throw lastError;
     }
 
-    public static void verifyRaftPeersPresent(DistributedDataStore datastore, final String shardName,
-            String... peerMemberNames) throws Exception {
+    public static void verifyRaftPeersPresent(final AbstractDataStore datastore, final String shardName,
+            final String... peerMemberNames) throws Exception {
         final Set<String> peerIds = Sets.newHashSet();
         for (String p: peerMemberNames) {
             peerIds.add(ShardIdentifier.create(shardName, MemberName.forName(p),
@@ -161,7 +161,7 @@ public class MemberNode {
             raftState.getPeerAddresses().keySet()));
     }
 
-    public static void verifyNoShardPresent(DistributedDataStore datastore, String shardName) {
+    public static void verifyNoShardPresent(final AbstractDataStore datastore, final String shardName) {
         Stopwatch sw = Stopwatch.createStarted();
         while (sw.elapsed(TimeUnit.SECONDS) <= 5) {
             Optional<ActorRef> shardReply = datastore.getActorContext().findLocalShard(shardName);
@@ -186,7 +186,7 @@ public class MemberNode {
         private DatastoreContext.Builder datastoreContextBuilder = DatastoreContext.newBuilder()
                 .shardHeartbeatIntervalInMillis(300).shardElectionTimeoutFactor(30);
 
-        Builder(List<MemberNode> members) {
+        Builder(final List<MemberNode> members) {
             this.members = members;
         }
 
@@ -195,7 +195,7 @@ public class MemberNode {
          *
          * @return this Builder
          */
-        public Builder moduleShardsConfig(String newModuleShardsConfig) {
+        public Builder moduleShardsConfig(final String newModuleShardsConfig) {
             this.moduleShardsConfig = newModuleShardsConfig;
             return this;
         }
@@ -205,7 +205,7 @@ public class MemberNode {
          *
          * @return this Builder
          */
-        public Builder akkaConfig(String newAkkaConfig) {
+        public Builder akkaConfig(final String newAkkaConfig) {
             this.akkaConfig = newAkkaConfig;
             return this;
         }
@@ -215,7 +215,7 @@ public class MemberNode {
          *
          * @return this Builder
          */
-        public Builder testName(String newTestName) {
+        public Builder testName(final String newTestName) {
             this.testName = newTestName;
             return this;
         }
@@ -225,7 +225,7 @@ public class MemberNode {
          *
          * @return this Builder
          */
-        public Builder waitForShardLeader(String... shardNames) {
+        public Builder waitForShardLeader(final String... shardNames) {
             this.waitForshardLeader = shardNames;
             return this;
         }
@@ -235,7 +235,7 @@ public class MemberNode {
          *
          * @return this Builder
          */
-        public Builder createOperDatastore(boolean value) {
+        public Builder createOperDatastore(final boolean value) {
             this.createOperDatastore = value;
             return this;
         }
@@ -245,7 +245,7 @@ public class MemberNode {
          *
          * @return this Builder
          */
-        public Builder schemaContext(SchemaContext newSchemaContext) {
+        public Builder schemaContext(final SchemaContext newSchemaContext) {
             this.schemaContext = newSchemaContext;
             return this;
         }
@@ -255,7 +255,7 @@ public class MemberNode {
          *
          * @return this Builder
          */
-        public Builder datastoreContextBuilder(DatastoreContext.Builder builder) {
+        public Builder datastoreContextBuilder(final DatastoreContext.Builder builder) {
             datastoreContextBuilder = builder;
             return this;
         }
index 0bc811d..6ff4ad2 100644 (file)
@@ -44,8 +44,8 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
-import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
 import org.opendaylight.controller.cluster.datastore.IntegrationTestKit;
 import org.opendaylight.controller.cluster.datastore.MemberNode;
 import org.opendaylight.controller.cluster.datastore.entityownership.selectionstrategy.EntityOwnerSelectionStrategyConfig;
@@ -122,7 +122,7 @@ public class DistributedEntityOwnershipIntegrationTest {
         memberNodes.clear();
     }
 
-    private static DistributedEntityOwnershipService newOwnershipService(final DistributedDataStore datastore) {
+    private static DistributedEntityOwnershipService newOwnershipService(final AbstractDataStore datastore) {
         return DistributedEntityOwnershipService.start(datastore.getActorContext(),
                 EntityOwnerSelectionStrategyConfig.newBuilder().build());
     }
@@ -142,7 +142,7 @@ public class DistributedEntityOwnershipIntegrationTest {
                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
                 .datastoreContextBuilder(followerDatastoreContextBuilder).build();
 
-        DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+        AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
 
         leaderDistributedDataStore.waitTillReady();
         follower1Node.configDataStore().waitTillReady();
@@ -283,7 +283,7 @@ public class DistributedEntityOwnershipIntegrationTest {
                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
                 .datastoreContextBuilder(followerDatastoreContextBuilder).build();
 
-        DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+        AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
 
         leaderDistributedDataStore.waitTillReady();
         follower1Node.configDataStore().waitTillReady();
@@ -368,7 +368,7 @@ public class DistributedEntityOwnershipIntegrationTest {
                 .moduleShardsConfig(MODULE_SHARDS_5_NODE_CONFIG).schemaContext(SCHEMA_CONTEXT)
                 .createOperDatastore(false).datastoreContextBuilder(followerDatastoreContextBuilder).build();
 
-        DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+        AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
 
         leaderDistributedDataStore.waitTillReady();
         follower1Node.configDataStore().waitTillReady();
@@ -457,7 +457,7 @@ public class DistributedEntityOwnershipIntegrationTest {
                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
                 .datastoreContextBuilder(followerDatastoreContextBuilder).build();
 
-        DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+        AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
 
         leaderDistributedDataStore.waitTillReady();
         follower1Node.configDataStore().waitTillReady();
@@ -538,7 +538,7 @@ public class DistributedEntityOwnershipIntegrationTest {
                 .moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
                 .datastoreContextBuilder(leaderDatastoreContextBuilder).build();
 
-        DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+        AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
         final DOMEntityOwnershipService leaderEntityOwnershipService = newOwnershipService(leaderDistributedDataStore);
 
         leaderNode.kit().waitUntilLeader(leaderNode.configDataStore().getActorContext(), ENTITY_OWNERSHIP_SHARD_NAME);
@@ -547,7 +547,7 @@ public class DistributedEntityOwnershipIntegrationTest {
                 .moduleShardsConfig(moduleShardsConfig).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
                 .datastoreContextBuilder(followerDatastoreContextBuilder).build();
 
-        DistributedDataStore follower1DistributedDataStore = follower1Node.configDataStore();
+        AbstractDataStore follower1DistributedDataStore = follower1Node.configDataStore();
         follower1DistributedDataStore.waitTillReady();
 
         leaderNode.waitForMembersUp("member-2");
@@ -616,7 +616,7 @@ public class DistributedEntityOwnershipIntegrationTest {
                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
                 .datastoreContextBuilder(followerDatastoreContextBuilder).build();
 
-        DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+        AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
 
         leaderDistributedDataStore.waitTillReady();
         follower1Node.configDataStore().waitTillReady();
@@ -659,7 +659,7 @@ public class DistributedEntityOwnershipIntegrationTest {
                 .moduleShardsConfig(MODULE_SHARDS_CONFIG).schemaContext(SCHEMA_CONTEXT).createOperDatastore(false)
                 .datastoreContextBuilder(followerDatastoreContextBuilder).build();
 
-        DistributedDataStore leaderDistributedDataStore = leaderNode.configDataStore();
+        AbstractDataStore leaderDistributedDataStore = leaderNode.configDataStore();
 
         leaderDistributedDataStore.waitTillReady();
         follower1Node.configDataStore().waitTillReady();
@@ -694,7 +694,7 @@ public class DistributedEntityOwnershipIntegrationTest {
         assertEquals("EntityOwnershipState", expState, state.get());
     }
 
-    private static void verifyCandidates(final DistributedDataStore dataStore, final DOMEntity entity,
+    private static void verifyCandidates(final AbstractDataStore dataStore, final DOMEntity entity,
             final String... expCandidates) throws Exception {
         AssertionError lastError = null;
         Stopwatch sw = Stopwatch.createStarted();
@@ -721,7 +721,7 @@ public class DistributedEntityOwnershipIntegrationTest {
     }
 
     @SuppressWarnings("checkstyle:IllegalCatch")
-    private static void verifyOwner(final DistributedDataStore dataStore, final DOMEntity entity,
+    private static void verifyOwner(final AbstractDataStore dataStore, final DOMEntity entity,
             final String expOwner) {
         AbstractEntityOwnershipTest.verifyOwner(expOwner, entity.getType(), entity.getIdentifier(), path -> {
             try {
index 711706a..aac4b23 100644 (file)
@@ -39,6 +39,7 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.access.concepts.MemberName;
+import org.opendaylight.controller.cluster.datastore.AbstractDataStore;
 import org.opendaylight.controller.cluster.datastore.DatastoreContext;
 import org.opendaylight.controller.cluster.datastore.DatastoreContextFactory;
 import org.opendaylight.controller.cluster.datastore.DistributedDataStore;
@@ -80,7 +81,7 @@ public class DistributedEntityOwnershipServiceTest extends AbstractEntityOwnersh
     static int ID_COUNTER = 1;
 
     private final String dataStoreName = "config" + ID_COUNTER++;
-    private DistributedDataStore dataStore;
+    private AbstractDataStore dataStore;
 
     @Before
     public void setUp() {

©2013 OpenDaylight, A Linux Foundation Collaborative Project. All Rights Reserved.
OpenDaylight is a registered trademark of The OpenDaylight Project, Inc.
Linux Foundation and OpenDaylight are registered trademarks of the Linux Foundation.
Linux is a registered trademark of Linus Torvalds.