Bug 1598: Cleanup stale ShardReadTransactions 75/10175/10
authortpantelis <tpanteli@brocade.com>
Fri, 22 Aug 2014 01:00:50 +0000 (21:00 -0400)
committertpantelis <tpanteli@brocade.com>
Mon, 25 Aug 2014 11:37:22 +0000 (07:37 -0400)
For read-only Tx, a FinalizablePhantomReference is created with the
TransacionProxy instance as the referent and is added to a static
FinalizableReferenceQueue. The FinalizablePhantomReference is subclassed
to hold the TransactionProxy's remoteTransactionPaths map. When the
TransactionProxy instance in GC'ed, the FinalizablePhantomReference is
notified and calls closeTransaction on each TransactionContext to clean it up.

Also to handle potentially stale Tx's due to disconnects, nodes prematurely
shutting down etc, I set set an idle time out (default 10 min) on ShardTransaction
actors via setReceiveTimeout. If the actor is idle with no messages
after the timeout, the actor self-destructs.

I made the idle timeout configurable via the config yang file. This
setting needs to be passed down from the module, thru the ShardManager
to the Shards to the ShardTransactions. I created a ShardCOntext class
to hold the idle timeout and other data that's passed down. This will
also make it easier in the future if additonal config data needs to be
passed down.

In the config yang file, I created a data-store-properties grouping to
avoid having to duplicate the config properties for the operational and
config data stores.

Many unit tests fail when running from Eclipse due to creating anonymous inner
Creator class instances in the static props methods. The akka code doesn't like this -
if the Creator instance class is enclosed in another class it expects
the class to be static. However this runs fine when running unit tests
from mvn and when running the production controller. I suspect the
difference is b/c the JDK compiler generates the anonymous class as
static since they're enclosed in static methods but the Eclipse compiler
doesn't. Anyway, to avoid this I refactored all the anonymous inner
Creator classes to private static classes. I think this is safer and
will avoid potential future issues with different JDK compilers or JDK upgrades.

Change-Id: Ie644612cb34e7219dc089b8add6d397a11bffdda
Signed-off-by: tpantelis <tpanteli@brocade.com>
34 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreProperties.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardContext.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java

index 1dab285679474378b47c1c0a1c488ebf89d66fea..b3283a18b1baaf4c3c7530570b8ae8a09512211f 100644 (file)
@@ -10,7 +10,9 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.Props;
 import akka.japi.Creator;
+
 import com.google.common.base.Preconditions;
+
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
@@ -18,20 +20,14 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 public class DataChangeListener extends AbstractUntypedActor {
     private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
-    private final SchemaContext schemaContext;
-    private final YangInstanceIdentifier pathId;
-    private boolean notificationsEnabled = false;
+    private volatile boolean notificationsEnabled = false;
 
-    public DataChangeListener(SchemaContext schemaContext,
-                              AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, YangInstanceIdentifier pathId) {
-
-        this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+    public DataChangeListener(AsyncDataChangeListener<YangInstanceIdentifier,
+                                                      NormalizedNode<?, ?>> listener) {
         this.listener = Preconditions.checkNotNull(listener, "listener should not be null");
-        this.pathId  = Preconditions.checkNotNull(pathId, "pathId should not be null");
     }
 
     @Override public void handleReceive(Object message) throws Exception {
@@ -63,14 +59,24 @@ public class DataChangeListener extends AbstractUntypedActor {
         }
     }
 
-    public static Props props(final SchemaContext schemaContext, final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, final YangInstanceIdentifier pathId) {
-        return Props.create(new Creator<DataChangeListener>() {
-            @Override
-            public DataChangeListener create() throws Exception {
-                return new DataChangeListener(schemaContext,listener,pathId );
-            }
+    public static Props props(final AsyncDataChangeListener<YangInstanceIdentifier,
+                                                            NormalizedNode<?, ?>> listener) {
+        return Props.create(new DataChangeListenerCreator(listener));
+    }
+
+    private static class DataChangeListenerCreator implements Creator<DataChangeListener> {
+        private static final long serialVersionUID = 1L;
+
+        final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
 
-        });
+        DataChangeListenerCreator(
+                AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener) {
+            this.listener = listener;
+        }
 
+        @Override
+        public DataChangeListener create() throws Exception {
+            return new DataChangeListener(listener);
+        }
     }
 }
index 6d835498afa2af24ec915e3b010e67710f685d51..a1b6b9252eb452082f101fe1bb748fb2d6b28536 100644 (file)
@@ -9,7 +9,9 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
+
 import com.google.common.base.Preconditions;
+
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@@ -24,13 +26,13 @@ public class DataChangeListenerProxy implements AsyncDataChangeListener<YangInst
     private final ActorSelection dataChangeListenerActor;
     private final SchemaContext schemaContext;
 
-    public DataChangeListenerProxy(SchemaContext schemaContext,ActorSelection dataChangeListenerActor) {
+    public DataChangeListenerProxy(SchemaContext schemaContext, ActorSelection dataChangeListenerActor) {
         this.dataChangeListenerActor = Preconditions.checkNotNull(dataChangeListenerActor, "dataChangeListenerActor should not be null");
         this.schemaContext = schemaContext;
     }
 
     @Override public void onDataChanged(
         AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
-        dataChangeListenerActor.tell(new DataChanged(schemaContext,change), null);
+        dataChangeListenerActor.tell(new DataChanged(schemaContext, change), null);
     }
 }
index 9e50b5b332babf4bcc033aeea3778fed850681d3..818f73392d1d880ad0dde5d84704c1e4c9d15c17 100644 (file)
@@ -11,19 +11,21 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.japi.Creator;
+
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 public class DataChangeListenerRegistration extends AbstractUntypedActor {
 
-    private final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+    private final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
         registration;
 
     public DataChangeListenerRegistration(
-        org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
+        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
         this.registration = registration;
     }
 
@@ -36,14 +38,8 @@ public class DataChangeListenerRegistration extends AbstractUntypedActor {
     }
 
     public static Props props(
-        final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
-        return Props.create(new Creator<DataChangeListenerRegistration>() {
-
-            @Override
-            public DataChangeListenerRegistration create() throws Exception {
-                return new DataChangeListenerRegistration(registration);
-            }
-        });
+        final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
+        return Props.create(new DataChangeListenerRegistrationCreator(registration));
     }
 
     private void closeListenerRegistration(
@@ -53,4 +49,21 @@ public class DataChangeListenerRegistration extends AbstractUntypedActor {
             .tell(new CloseDataChangeListenerRegistrationReply().toSerializable(), getSelf());
         getSelf().tell(PoisonPill.getInstance(), getSelf());
     }
+
+    private static class DataChangeListenerRegistrationCreator
+                                            implements Creator<DataChangeListenerRegistration> {
+        final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+                                                           NormalizedNode<?, ?>>> registration;
+
+        DataChangeListenerRegistrationCreator(
+                ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+                                                             NormalizedNode<?, ?>>> registration) {
+            this.registration = registration;
+        }
+
+        @Override
+        public DataChangeListenerRegistration create() throws Exception {
+            return new DataChangeListenerRegistration(registration);
+        }
+    }
 }
index 404a4e02033ea1c89f9fada4135cfe0e4b6a935e..51f3735f81ee6a9f4a5ad192cb6d1f1c27ebd031 100644 (file)
@@ -8,14 +8,16 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import java.util.concurrent.TimeUnit;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 
 import com.google.common.base.Preconditions;
+
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
-import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
@@ -34,6 +36,8 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.concurrent.duration.Duration;
+
 /**
  *
  */
@@ -42,11 +46,10 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
 
     private final ActorContext actorContext;
-
-    private SchemaContext schemaContext;
+    private final ShardContext shardContext;
 
     public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster,
-            Configuration configuration, InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
+            Configuration configuration, DistributedDataStoreProperties dataStoreProperties) {
         Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
         Preconditions.checkNotNull(type, "type should not be null");
         Preconditions.checkNotNull(cluster, "cluster should not be null");
@@ -57,13 +60,21 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
         LOG.info("Creating ShardManager : {}", shardManagerId);
 
-        this.actorContext = new ActorContext(actorSystem, actorSystem
-            .actorOf(ShardManager.props(type, cluster, configuration, dataStoreProperties),
+        shardContext = new ShardContext(InMemoryDOMDataStoreConfigProperties.create(
+                dataStoreProperties.getMaxShardDataChangeExecutorPoolSize(),
+                dataStoreProperties.getMaxShardDataChangeExecutorQueueSize(),
+                dataStoreProperties.getMaxShardDataChangeListenerQueueSize()),
+                Duration.create(dataStoreProperties.getShardTransactionIdleTimeoutInMinutes(),
+                        TimeUnit.MINUTES));
+
+        actorContext = new ActorContext(actorSystem, actorSystem
+            .actorOf(ShardManager.props(type, cluster, configuration, shardContext),
                 shardManagerId ), cluster, configuration);
     }
 
     public DistributedDataStore(ActorContext actorContext) {
         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
+        this.shardContext = new ShardContext();
     }
 
 
@@ -80,7 +91,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
         LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
 
         ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
-            DataChangeListener.props(schemaContext,listener,path ));
+            DataChangeListener.props(listener ));
 
         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
 
@@ -104,35 +115,31 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
-        return new TransactionChainProxy(actorContext, schemaContext);
+        return new TransactionChainProxy(actorContext);
     }
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
-        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY,
-            schemaContext);
+        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY,
-            schemaContext);
+        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE,
-            schemaContext);
+        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
     }
 
-    @Override public void onGlobalContextUpdated(SchemaContext schemaContext) {
-        this.schemaContext = schemaContext;
-        actorContext.getShardManager().tell(
-            new UpdateSchemaContext(schemaContext), null);
+    @Override
+    public void onGlobalContextUpdated(SchemaContext schemaContext) {
+        actorContext.setSchemaContext(schemaContext);
     }
 
-    @Override public void close() throws Exception {
+    @Override
+    public void close() throws Exception {
         actorContext.shutdown();
-
     }
 }
index a1a3e87510e78d151b769fd579c2602503ec656c..65a39a60e6c0819fe7f14543c268997a53fddff0 100644 (file)
@@ -11,12 +11,11 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSystem;
 
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
 
 public class DistributedDataStoreFactory {
     public static DistributedDataStore createInstance(String name, SchemaService schemaService,
-            InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
+            DistributedDataStoreProperties dataStoreProperties) {
 
         ActorSystem actorSystem = ActorSystemFactory.getInstance();
         Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreProperties.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreProperties.java
new file mode 100644 (file)
index 0000000..eb6a536
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+/**
+ * Wrapper class for DistributedDataStore configuration properties.
+ *
+ * @author Thomas Pantelis
+ */
+public class DistributedDataStoreProperties {
+    private final int maxShardDataChangeListenerQueueSize;
+    private final int maxShardDataChangeExecutorQueueSize;
+    private final int maxShardDataChangeExecutorPoolSize;
+    private final int shardTransactionIdleTimeoutInMinutes;
+
+    public DistributedDataStoreProperties() {
+        maxShardDataChangeListenerQueueSize = 1000;
+        maxShardDataChangeExecutorQueueSize = 1000;
+        maxShardDataChangeExecutorPoolSize = 20;
+        shardTransactionIdleTimeoutInMinutes = 10;
+    }
+
+    public DistributedDataStoreProperties(int maxShardDataChangeListenerQueueSize,
+            int maxShardDataChangeExecutorQueueSize, int maxShardDataChangeExecutorPoolSize,
+            int shardTransactionIdleTimeoutInMinutes) {
+        this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
+        this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
+        this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
+        this.shardTransactionIdleTimeoutInMinutes = shardTransactionIdleTimeoutInMinutes;
+    }
+
+    public int getMaxShardDataChangeListenerQueueSize() {
+        return maxShardDataChangeListenerQueueSize;
+    }
+
+    public int getMaxShardDataChangeExecutorQueueSize() {
+        return maxShardDataChangeExecutorQueueSize;
+    }
+
+    public int getMaxShardDataChangeExecutorPoolSize() {
+        return maxShardDataChangeExecutorPoolSize;
+    }
+
+    public int getShardTransactionIdleTimeoutInMinutes() {
+        return shardTransactionIdleTimeoutInMinutes;
+    }
+}
index 75f540ade088e6bb45b10e9b92bc4af8789b0218..abcde747b93b132f8c492f25ac8af43f96c84a26 100644 (file)
@@ -15,11 +15,13 @@ import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Creator;
 import akka.serialization.Serialization;
+
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
@@ -43,14 +45,15 @@ import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
@@ -89,17 +92,20 @@ public class Shard extends RaftActor {
     /// The name of this shard
     private final ShardIdentifier name;
 
-    private volatile SchemaContext schemaContext;
-
     private final ShardStats shardMBean;
 
     private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
 
+    private final ShardContext shardContext;
+
+    private SchemaContext schemaContext;
+
     private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
-            InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
+            ShardContext shardContext) {
         super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
 
         this.name = name;
+        this.shardContext = shardContext;
 
         String setting = System.getProperty("shard.persistent");
 
@@ -107,7 +113,8 @@ public class Shard extends RaftActor {
 
         LOG.info("Shard created : {} persistent : {}", name, persistent);
 
-        store = InMemoryDOMDataStoreFactory.create(name.toString(), null, dataStoreProperties);
+        store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
+                shardContext.getDataStoreProperties());
 
         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString());
 
@@ -125,27 +132,16 @@ public class Shard extends RaftActor {
         return map;
     }
 
-
-
-
     public static Props props(final ShardIdentifier name,
         final Map<ShardIdentifier, String> peerAddresses,
-        final InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
+        ShardContext shardContext) {
         Preconditions.checkNotNull(name, "name should not be null");
-        Preconditions
-            .checkNotNull(peerAddresses, "peerAddresses should not be null");
+        Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
+        Preconditions.checkNotNull(shardContext, "shardContext should not be null");
 
-        return Props.create(new Creator<Shard>() {
-
-            @Override
-            public Shard create() throws Exception {
-                return new Shard(name, peerAddresses, dataStoreProperties);
-            }
-
-        });
+        return Props.create(new ShardCreator(name, peerAddresses, shardContext));
     }
 
-
     @Override public void onReceiveCommand(Object message) {
         LOG.debug("Received message {} from {}", message.getClass().toString(),
             getSender());
@@ -188,9 +184,8 @@ public class Shard extends RaftActor {
             shardMBean.incrementReadOnlyTransactionCount();
 
             return getContext().actorOf(
-                ShardTransaction
-                    .props(store.newReadOnlyTransaction(), getSelf(),
-                        schemaContext), transactionId.toString());
+                ShardTransaction.props(store.newReadOnlyTransaction(), getSelf(),
+                        schemaContext, shardContext), transactionId.toString());
 
         } else if (createTransaction.getTransactionType()
             == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
@@ -198,9 +193,8 @@ public class Shard extends RaftActor {
             shardMBean.incrementReadWriteTransactionCount();
 
             return getContext().actorOf(
-                ShardTransaction
-                    .props(store.newReadWriteTransaction(), getSelf(),
-                        schemaContext), transactionId.toString());
+                ShardTransaction.props(store.newReadWriteTransaction(), getSelf(),
+                        schemaContext, shardContext), transactionId.toString());
 
 
         } else if (createTransaction.getTransactionType()
@@ -209,9 +203,8 @@ public class Shard extends RaftActor {
             shardMBean.incrementWriteOnlyTransactionCount();
 
             return getContext().actorOf(
-                ShardTransaction
-                    .props(store.newWriteOnlyTransaction(), getSelf(),
-                        schemaContext), transactionId.toString());
+                ShardTransaction.props(store.newWriteOnlyTransaction(), getSelf(),
+                        schemaContext, shardContext), transactionId.toString());
         } else {
             throw new IllegalArgumentException(
                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
@@ -269,12 +262,14 @@ public class Shard extends RaftActor {
         final ActorRef self = getSelf();
 
         Futures.addCallback(future, new FutureCallback<Void>() {
+            @Override
             public void onSuccess(Void v) {
                sender.tell(new CommitTransactionReply().toSerializable(),self);
                shardMBean.incrementCommittedTransactionCount();
                shardMBean.setLastCommittedTransactionTime(new Date());
             }
 
+            @Override
             public void onFailure(Throwable t) {
                 LOG.error(t, "An exception happened during commit");
                 shardMBean.incrementFailedTransactionsCount();
@@ -327,12 +322,10 @@ public class Shard extends RaftActor {
         dataChangeListeners.add(dataChangeListenerPath);
 
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
-            listener =
-            new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
+            listener = new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
 
-        org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-            registration =
-            store.registerChangeListener(registerChangeListener.getPath(),
+        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+            registration = store.registerChangeListener(registerChangeListener.getPath(),
                 listener, registerChangeListener.getScope());
         ActorRef listenerRegistration =
             getContext().actorOf(
@@ -349,12 +342,9 @@ public class Shard extends RaftActor {
 
     private void createTransactionChain() {
         DOMStoreTransactionChain chain = store.createTransactionChain();
-        ActorRef transactionChain =
-            getContext().actorOf(
-                ShardTransactionChain.props(chain, schemaContext));
-        getSender()
-            .tell(new CreateTransactionChainReply(transactionChain.path())
-                    .toSerializable(),
+        ActorRef transactionChain = getContext().actorOf(
+                ShardTransactionChain.props(chain, schemaContext, shardContext));
+        getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
                 getSelf());
     }
 
@@ -426,4 +416,25 @@ public class Shard extends RaftActor {
             return HEART_BEAT_INTERVAL;
         }
     }
+
+    private static class ShardCreator implements Creator<Shard> {
+
+        private static final long serialVersionUID = 1L;
+
+        final ShardIdentifier name;
+        final Map<ShardIdentifier, String> peerAddresses;
+        final ShardContext shardContext;
+
+        ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
+                ShardContext shardContext) {
+            this.name = name;
+            this.peerAddresses = peerAddresses;
+            this.shardContext = shardContext;
+        }
+
+        @Override
+        public Shard create() throws Exception {
+            return new Shard(name, peerAddresses, shardContext);
+        }
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardContext.java
new file mode 100644 (file)
index 0000000..02bff77
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+import com.google.common.base.Preconditions;
+
+import scala.concurrent.duration.Duration;
+
+/**
+ * Contains contextual data for shards.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardContext {
+
+    private final InMemoryDOMDataStoreConfigProperties dataStoreProperties;
+    private final Duration shardTransactionIdleTimeout;
+
+    public ShardContext() {
+        this.dataStoreProperties = null;
+        this.shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
+    }
+
+    public ShardContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties,
+            Duration shardTransactionIdleTimeout) {
+        this.dataStoreProperties = Preconditions.checkNotNull(dataStoreProperties);
+        this.shardTransactionIdleTimeout = Preconditions.checkNotNull(shardTransactionIdleTimeout);
+    }
+
+    public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
+        return dataStoreProperties;
+    }
+
+    public Duration getShardTransactionIdleTimeout() {
+        return shardTransactionIdleTimeout;
+    }
+}
index 2972772a4840cf5117ef3b9c9ca2cb4947d2352c..c9b7c07e9a3a44d1b0c64425122e53a5b3199543 100644 (file)
@@ -17,7 +17,9 @@ import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
 import akka.japi.Creator;
 import akka.japi.Function;
+
 import com.google.common.base.Preconditions;
+
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
@@ -30,7 +32,6 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 
 import scala.concurrent.duration.Duration;
 
@@ -70,19 +71,19 @@ public class ShardManager extends AbstractUntypedActor {
 
     private ShardManagerInfoMBean mBean;
 
-    private final InMemoryDOMDataStoreConfigProperties dataStoreProperties;
+    private final ShardContext shardContext;
 
     /**
      * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
      *             configuration or operational
      */
     private ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
-            InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
+            ShardContext shardContext) {
 
         this.type = Preconditions.checkNotNull(type, "type should not be null");
         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
-        this.dataStoreProperties = dataStoreProperties;
+        this.shardContext = shardContext;
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
@@ -95,22 +96,15 @@ public class ShardManager extends AbstractUntypedActor {
     public static Props props(final String type,
         final ClusterWrapper cluster,
         final Configuration configuration,
-        final InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
+        final ShardContext shardContext) {
 
         Preconditions.checkNotNull(type, "type should not be null");
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
 
-        return Props.create(new Creator<ShardManager>() {
-
-            @Override
-            public ShardManager create() throws Exception {
-                return new ShardManager(type, cluster, configuration, dataStoreProperties);
-            }
-        });
+        return Props.create(new ShardManagerCreator(type, cluster, configuration, shardContext));
     }
 
-
     @Override
     public void handleReceive(Object message) throws Exception {
         if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
@@ -250,7 +244,7 @@ public class ShardManager extends AbstractUntypedActor {
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
             ActorRef actor = getContext()
-                .actorOf(Shard.props(shardId, peerAddresses, dataStoreProperties),
+                .actorOf(Shard.props(shardId, peerAddresses, shardContext),
                     shardId.toString());
             localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
@@ -351,6 +345,28 @@ public class ShardManager extends AbstractUntypedActor {
             }
         }
     }
+
+    private static class ShardManagerCreator implements Creator<ShardManager> {
+        private static final long serialVersionUID = 1L;
+
+        final String type;
+        final ClusterWrapper cluster;
+        final Configuration configuration;
+        final ShardContext shardContext;
+
+        ShardManagerCreator(String type, ClusterWrapper cluster,
+                Configuration configuration, ShardContext shardContext) {
+            this.type = type;
+            this.cluster = cluster;
+            this.configuration = configuration;
+            this.shardContext = shardContext;
+        }
+
+        @Override
+        public ShardManager create() throws Exception {
+            return new ShardManager(type, cluster, configuration, shardContext);
+        }
+    }
 }
 
 
index 1328d466f34b6fff82a91b98379cd815b488155c..c54d3739b22c3b42d2a93c368afd3eb947ce681b 100644 (file)
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
-import akka.actor.PoisonPill;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
+
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
@@ -27,40 +23,27 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  * Date: 8/6/14
  */
 public class ShardReadTransaction extends ShardTransaction {
-  private final DOMStoreReadTransaction transaction;
-  private final LoggingAdapter log =
-      Logging.getLogger(getContext().system(), this);
-
-  public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor, SchemaContext schemaContext) {
-    super(shardActor, schemaContext);
-    this.transaction = transaction;
-
-  }
+    private final DOMStoreReadTransaction transaction;
 
-  public ShardReadTransaction(DOMStoreTransactionChain transactionChain, DOMStoreReadTransaction transaction, ActorRef shardActor, SchemaContext schemaContext) {
-    super(transactionChain, shardActor, schemaContext);
-    this.transaction = transaction;
-  }
-
-  @Override
-  public void handleReceive(Object message) throws Exception {
-    if (ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-        readData(transaction, ReadData.fromSerializable(message));
-    } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
-        dataExists(transaction, DataExists.fromSerializable(message));
-    } else {
-      super.handleReceive(message);
+    public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
+            SchemaContext schemaContext) {
+        super(shardActor, schemaContext);
+        this.transaction = transaction;
     }
-  }
-  protected void closeTransaction(CloseTransaction message) {
-    transaction.close();
-    getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
-    getSelf().tell(PoisonPill.getInstance(), getSelf());
-  }
 
-  //default scope test method to check if we get correct exception
-  void forUnitTestOnlyExplicitTransactionClose(){
-      transaction.close();
-  }
+    @Override
+    public void handleReceive(Object message) throws Exception {
+        if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            readData(transaction, ReadData.fromSerializable(message));
+        } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            dataExists(transaction, DataExists.fromSerializable(message));
+        } else {
+            super.handleReceive(message);
+        }
+    }
 
+    @Override
+    protected DOMStoreTransaction getDOMStoreTransaction() {
+        return transaction;
+    }
 }
index 49c7b7e78f52295e9b3b1fc5b1ae2ea10926def2..1dc9ce0a3ecd99f552c13cf19ec9ff393ebdfd51 100644 (file)
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
-import akka.actor.PoisonPill;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
+
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
@@ -23,7 +19,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
@@ -31,50 +27,35 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  * Date: 8/6/14
  */
 public class ShardReadWriteTransaction extends ShardTransaction {
-  private final DOMStoreReadWriteTransaction transaction;
-  private final LoggingAdapter log =
-      Logging.getLogger(getContext().system(), this);
-  public ShardReadWriteTransaction(DOMStoreTransactionChain transactionChain, DOMStoreReadWriteTransaction transaction, ActorRef shardActor, SchemaContext schemaContext) {
-    super(transactionChain,  shardActor, schemaContext);
-    this.transaction = transaction;
-  }
-
-  public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor, SchemaContext schemaContext) {
-    super( shardActor, schemaContext);
-    this.transaction = transaction;
-  }
+    private final DOMStoreReadWriteTransaction transaction;
 
-  @Override
-  public void handleReceive(Object message) throws Exception {
-    if (ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-      readData(transaction,ReadData.fromSerializable(message));
-    }else if (WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-      writeData(transaction, WriteData.fromSerializable(message, schemaContext));
-    } else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-      mergeData(transaction, MergeData.fromSerializable(message, schemaContext));
-    } else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-      deleteData(transaction,DeleteData.fromSerializable(message));
-    } else if (ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
-      readyTransaction(transaction,new ReadyTransaction());
-    } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
-        dataExists(transaction, DataExists.fromSerializable(message));
-    }else {
-      super.handleReceive(message);
+    public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
+            SchemaContext schemaContext) {
+        super(shardActor, schemaContext);
+        this.transaction = transaction;
     }
-  }
 
-  protected void closeTransaction(CloseTransaction message) {
-    transaction.close();
-    getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
-    getSelf().tell(PoisonPill.getInstance(), getSelf());
-  }
+    @Override
+    public void handleReceive(Object message) throws Exception {
+        if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            readData(transaction, ReadData.fromSerializable(message));
+        } else if(WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            writeData(transaction, WriteData.fromSerializable(message, schemaContext));
+        } else if(MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            mergeData(transaction, MergeData.fromSerializable(message, schemaContext));
+        } else if(DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            deleteData(transaction, DeleteData.fromSerializable(message));
+        } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            readyTransaction(transaction, new ReadyTransaction());
+        } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            dataExists(transaction, DataExists.fromSerializable(message));
+        } else {
+            super.handleReceive(message);
+        }
+    }
 
-    /**
-     * The following method is used in unit testing only
-     * hence the default scope.
-     * This is done to test out failure cases.
-     */
-    public void forUnitTestOnlyExplicitTransactionClose() {
-        transaction.close();
+    @Override
+    protected DOMStoreTransaction getDOMStoreTransaction() {
+        return transaction;
     }
 }
index 360a10722c06df2c3e357631b43d89eac801a3d3..365960713dc84754258b364765d0c0f0b794d83b 100644 (file)
@@ -9,14 +9,17 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
+import akka.actor.ReceiveTimeout;
 import akka.japi.Creator;
+
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
+
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
@@ -39,7 +42,7 @@ import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -69,142 +72,74 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  */
 public abstract class ShardTransaction extends AbstractUntypedActor {
 
-  private final ActorRef shardActor;
-  protected final SchemaContext schemaContext;
-
-  // FIXME : see below
-  // If transactionChain is not null then this transaction is part of a
-  // transactionChain. Not really clear as to what that buys us
-  private final DOMStoreTransactionChain transactionChain;
-
-
-  private final MutableCompositeModification modification =
-      new MutableCompositeModification();
-
-  private final LoggingAdapter log =
-      Logging.getLogger(getContext().system(), this);
-
-  protected ShardTransaction(
-                          ActorRef shardActor, SchemaContext schemaContext) {
-    this(null,  shardActor, schemaContext);
-  }
-
-  protected ShardTransaction(DOMStoreTransactionChain transactionChain,
-                          ActorRef shardActor, SchemaContext schemaContext) {
-    this.transactionChain = transactionChain;
-    this.shardActor = shardActor;
-    this.schemaContext = schemaContext;
-  }
-
-
-
-  public static Props props(final DOMStoreReadTransaction transaction,
-                            final ActorRef shardActor, final SchemaContext schemaContext) {
-    return Props.create(new Creator<ShardTransaction>() {
-
-      @Override
-      public ShardTransaction create() throws Exception {
-        return new ShardReadTransaction(transaction, shardActor, schemaContext);
-      }
-    });
-  }
-
-  public static Props props(final DOMStoreTransactionChain transactionChain, final DOMStoreReadTransaction transaction,
-                            final ActorRef shardActor, final SchemaContext schemaContext) {
-    return Props.create(new Creator<ShardTransaction>() {
-
-      @Override
-      public ShardTransaction create() throws Exception {
-        return new ShardReadTransaction(transactionChain, transaction, shardActor, schemaContext);
-      }
-    });
-  }
-
-  public static Props props(final DOMStoreReadWriteTransaction transaction,
-                            final ActorRef shardActor, final SchemaContext schemaContext) {
-    return Props.create(new Creator<ShardTransaction>() {
-
-      @Override
-      public ShardTransaction create() throws Exception {
-        return new ShardReadWriteTransaction(transaction, shardActor, schemaContext);
-      }
-    });
-  }
-
-  public static Props props(final DOMStoreTransactionChain transactionChain, final DOMStoreReadWriteTransaction transaction,
-                            final ActorRef shardActor, final SchemaContext schemaContext) {
-    return Props.create(new Creator<ShardTransaction>() {
-
-      @Override
-      public ShardTransaction create() throws Exception {
-        return new ShardReadWriteTransaction(transactionChain, transaction, shardActor, schemaContext);
-      }
-    });
-  }
-
-
-  public static Props props(final DOMStoreWriteTransaction transaction,
-                            final ActorRef shardActor, final SchemaContext schemaContext) {
-    return Props.create(new Creator<ShardTransaction>() {
-
-      @Override
-      public ShardTransaction create() throws Exception {
-        return new ShardWriteTransaction(transaction, shardActor, schemaContext);
-      }
-    });
-  }
-
-  public static Props props(final DOMStoreTransactionChain transactionChain, final DOMStoreWriteTransaction transaction,
-                            final ActorRef shardActor, final SchemaContext schemaContext) {
-    return Props.create(new Creator<ShardTransaction>() {
-
-      @Override
-      public ShardTransaction create() throws Exception {
-        return new ShardWriteTransaction(transactionChain, transaction, shardActor, schemaContext);
-      }
-    });
-  }
-
-
-  @Override
-  public void handleReceive(Object message) throws Exception {
-     if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
-      closeTransaction(new CloseTransaction());
-    } else if (message instanceof GetCompositedModification) {
-      // This is here for testing only
-      getSender().tell(new GetCompositeModificationReply(
-          new ImmutableCompositeModification(modification)), getSelf());
-    }else{
-         throw new UnknownMessageException(message);
+    private final ActorRef shardActor;
+    protected final SchemaContext schemaContext;
+
+    private final MutableCompositeModification modification = new MutableCompositeModification();
+
+    protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext) {
+        this.shardActor = shardActor;
+        this.schemaContext = schemaContext;
     }
-  }
 
-  abstract protected  void closeTransaction(CloseTransaction message);
+    public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
+            SchemaContext schemaContext, ShardContext shardContext) {
+        return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
+                shardContext));
+    }
 
-  protected void readData(DOMStoreReadTransaction transaction,ReadData message) {
-    final ActorRef sender = getSender();
-    final ActorRef self = getSelf();
-    final YangInstanceIdentifier path = message.getPath();
-    final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
-          transaction.read(path);
+    protected abstract DOMStoreTransaction getDOMStoreTransaction();
+
+    @Override
+    public void handleReceive(Object message) throws Exception {
+        if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
+            closeTransaction(true);
+        } else if (message instanceof GetCompositedModification) {
+            // This is here for testing only
+            getSender().tell(new GetCompositeModificationReply(
+                    new ImmutableCompositeModification(modification)), getSelf());
+        } else if (message instanceof ReceiveTimeout) {
+            LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
+            closeTransaction(false);
+        } else {
+            throw new UnknownMessageException(message);
+        }
+    }
 
-      future.addListener(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
-          if (optional.isPresent()) {
-            sender.tell(new ReadDataReply(schemaContext,optional.get()).toSerializable(), self);
-          } else {
-            sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self);
-          }
-        } catch (Exception e) {
-            sender.tell(new akka.actor.Status.Failure(e),self);
+    private void closeTransaction(boolean sendReply) {
+        getDOMStoreTransaction().close();
+
+        if(sendReply) {
+            getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
         }
 
-      }
-    }, getContext().dispatcher());
-  }
+        getSelf().tell(PoisonPill.getInstance(), getSelf());
+    }
+
+    protected void readData(DOMStoreReadTransaction transaction,ReadData message) {
+        final ActorRef sender = getSender();
+        final ActorRef self = getSelf();
+        final YangInstanceIdentifier path = message.getPath();
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
+                transaction.read(path);
+
+        future.addListener(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
+                    if (optional.isPresent()) {
+                        sender.tell(new ReadDataReply(schemaContext,optional.get()).toSerializable(), self);
+                    } else {
+                        sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self);
+                    }
+                } catch (Exception e) {
+                    sender.tell(new akka.actor.Status.Failure(e),self);
+                }
+
+            }
+        }, getContext().dispatcher());
+    }
 
     protected void dataExists(DOMStoreReadTransaction transaction, DataExists message) {
         final YangInstanceIdentifier path = message.getPath();
@@ -218,71 +153,104 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
 
     }
 
-  protected void writeData(DOMStoreWriteTransaction transaction, WriteData message) {
-    modification.addModification(
-        new WriteModification(message.getPath(), message.getData(),schemaContext));
-    LOG.debug("writeData at path : " + message.getPath().toString());
+    protected void writeData(DOMStoreWriteTransaction transaction, WriteData message) {
+        modification.addModification(
+                new WriteModification(message.getPath(), message.getData(),schemaContext));
+        LOG.debug("writeData at path : " + message.getPath().toString());
 
-    try {
-        transaction.write(message.getPath(), message.getData());
-        getSender().tell(new WriteDataReply().toSerializable(), getSelf());
-    }catch(Exception e){
-        getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+        try {
+            transaction.write(message.getPath(), message.getData());
+            getSender().tell(new WriteDataReply().toSerializable(), getSelf());
+        }catch(Exception e){
+            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+        }
     }
-  }
-
-  protected void mergeData(DOMStoreWriteTransaction transaction, MergeData message) {
-    modification.addModification(
-        new MergeModification(message.getPath(), message.getData(), schemaContext));
-    LOG.debug("mergeData at path : " + message.getPath().toString());
-    try {
-        transaction.merge(message.getPath(), message.getData());
-        getSender().tell(new MergeDataReply().toSerializable(), getSelf());
-    }catch(Exception e){
-        getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+
+    protected void mergeData(DOMStoreWriteTransaction transaction, MergeData message) {
+        modification.addModification(
+                new MergeModification(message.getPath(), message.getData(), schemaContext));
+        LOG.debug("mergeData at path : " + message.getPath().toString());
+        try {
+            transaction.merge(message.getPath(), message.getData());
+            getSender().tell(new MergeDataReply().toSerializable(), getSelf());
+        }catch(Exception e){
+            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+        }
     }
-  }
-
-  protected void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) {
-    LOG.debug("deleteData at path : " + message.getPath().toString());
-    modification.addModification(new DeleteModification(message.getPath()));
-    try {
-        transaction.delete(message.getPath());
-        getSender().tell(new DeleteDataReply().toSerializable(), getSelf());
-    }catch(Exception e){
-        getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+
+    protected void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) {
+        LOG.debug("deleteData at path : " + message.getPath().toString());
+        modification.addModification(new DeleteModification(message.getPath()));
+        try {
+            transaction.delete(message.getPath());
+            getSender().tell(new DeleteDataReply().toSerializable(), getSelf());
+        }catch(Exception e){
+            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+        }
     }
-  }
 
-  protected void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message) {
-    DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
-    ActorRef cohortActor = getContext().actorOf(
-        ThreePhaseCommitCohort.props(cohort, shardActor, modification), "cohort");
-    getSender()
+    protected void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message) {
+        DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
+        ActorRef cohortActor = getContext().actorOf(
+                ThreePhaseCommitCohort.props(cohort, shardActor, modification), "cohort");
+        getSender()
         .tell(new ReadyTransactionReply(cohortActor.path()).toSerializable(), getSelf());
 
-  }
+    }
 
+    private static class ShardTransactionCreator implements Creator<ShardTransaction> {
 
-  // These classes are in here for test purposes only
+        private static final long serialVersionUID = 1L;
 
+        final DOMStoreTransaction transaction;
+        final ActorRef shardActor;
+        final SchemaContext schemaContext;
+        final ShardContext shardContext;
 
-  static class GetCompositedModification {
+        ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
+                SchemaContext schemaContext, ShardContext actorContext) {
+            this.transaction = transaction;
+            this.shardActor = shardActor;
+            this.shardContext = actorContext;
+            this.schemaContext = schemaContext;
+        }
 
-  }
+        @Override
+        public ShardTransaction create() throws Exception {
+            ShardTransaction tx;
+            if(transaction instanceof DOMStoreReadWriteTransaction) {
+                tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
+                        shardActor, schemaContext);
+            } else if(transaction instanceof DOMStoreReadTransaction) {
+                tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
+                        schemaContext);
+            } else {
+                tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
+                        shardActor, schemaContext);
+            }
+
+            tx.getContext().setReceiveTimeout(shardContext.getShardTransactionIdleTimeout());
+            return tx;
+        }
+    }
 
+    // These classes are in here for test purposes only
 
-  static class GetCompositeModificationReply {
-    private final CompositeModification modification;
+    static class GetCompositedModification {
+    }
 
 
-    GetCompositeModificationReply(CompositeModification modification) {
-      this.modification = modification;
-    }
+    static class GetCompositeModificationReply {
+        private final CompositeModification modification;
 
 
-    public CompositeModification getModification() {
-      return modification;
+        GetCompositeModificationReply(CompositeModification modification) {
+            this.modification = modification;
+        }
+
+
+        public CompositeModification getModification() {
+            return modification;
+        }
     }
-  }
 }
index c508255ea490ee09b370dae8a49320495166c820..42bd257ad1cdb97b3d28ab2588bbe6161894b8e9 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
+
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
@@ -24,10 +25,13 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 public class ShardTransactionChain extends AbstractUntypedActor {
 
     private final DOMStoreTransactionChain chain;
+    private final ShardContext shardContext;
     private final SchemaContext schemaContext;
 
-    public ShardTransactionChain(DOMStoreTransactionChain chain, SchemaContext schemaContext) {
+    public ShardTransactionChain(DOMStoreTransactionChain chain, SchemaContext schemaContext,
+            ShardContext shardContext) {
         this.chain = chain;
+        this.shardContext = shardContext;
         this.schemaContext = schemaContext;
     }
 
@@ -48,23 +52,29 @@ public class ShardTransactionChain extends AbstractUntypedActor {
         return getContext().parent();
     }
 
-  private ActorRef createTypedTransactionActor(CreateTransaction createTransaction,String transactionId){
-    if(createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_ONLY.ordinal()){
-      return getContext().actorOf(
-          ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(), schemaContext), transactionId);
-
-    }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_WRITE.ordinal()){
-      return getContext().actorOf(
-          ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(), schemaContext), transactionId);
-
-
-    }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.WRITE_ONLY.ordinal()){
-      return getContext().actorOf(
-          ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(), schemaContext), transactionId);
-    }else{
-      throw new IllegalArgumentException ("CreateTransaction message has unidentified transaction type="+createTransaction.getTransactionType()) ;
+    private ActorRef createTypedTransactionActor(CreateTransaction createTransaction,
+            String transactionId) {
+        if(createTransaction.getTransactionType() ==
+                TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
+            return getContext().actorOf(
+                    ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
+                            schemaContext, shardContext), transactionId);
+        } else if (createTransaction.getTransactionType() ==
+                TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
+            return getContext().actorOf(
+                    ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
+                            schemaContext, shardContext), transactionId);
+        } else if (createTransaction.getTransactionType() ==
+                TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
+            return getContext().actorOf(
+                    ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
+                            schemaContext, shardContext), transactionId);
+        } else {
+            throw new IllegalArgumentException (
+                    "CreateTransaction message has unidentified transaction type=" +
+                             createTransaction.getTransactionType());
+        }
     }
-  }
 
     private void createTransaction(CreateTransaction createTransaction) {
 
@@ -74,13 +84,28 @@ public class ShardTransactionChain extends AbstractUntypedActor {
                 getSelf());
     }
 
-    public static Props props(final DOMStoreTransactionChain chain, final SchemaContext schemaContext) {
-        return Props.create(new Creator<ShardTransactionChain>() {
+    public static Props props(DOMStoreTransactionChain chain, SchemaContext schemaContext,
+            ShardContext shardContext) {
+        return Props.create(new ShardTransactionChainCreator(chain, schemaContext, shardContext));
+    }
 
-            @Override
-            public ShardTransactionChain create() throws Exception {
-                return new ShardTransactionChain(chain, schemaContext);
-            }
-        });
+    private static class ShardTransactionChainCreator implements Creator<ShardTransactionChain> {
+        private static final long serialVersionUID = 1L;
+
+        final DOMStoreTransactionChain chain;
+        final ShardContext shardContext;
+        final SchemaContext schemaContext;
+
+        ShardTransactionChainCreator(DOMStoreTransactionChain chain, SchemaContext schemaContext,
+                ShardContext shardContext) {
+            this.chain = chain;
+            this.shardContext = shardContext;
+            this.schemaContext = schemaContext;
+        }
+
+        @Override
+        public ShardTransactionChain create() throws Exception {
+            return new ShardTransactionChain(chain, schemaContext, shardContext);
+        }
     }
 }
index b01fe7d4ac11a4a0eea7cfa28b16f5bea8e7aec5..8b4d576163c94b873a3fd09ed99bd580392813bc 100644 (file)
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
-import akka.actor.PoisonPill;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
+
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
@@ -29,47 +25,31 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  * Date: 8/6/14
  */
 public class ShardWriteTransaction extends ShardTransaction {
-  private final DOMStoreWriteTransaction transaction;
-  private final LoggingAdapter log =
-      Logging.getLogger(getContext().system(), this);
-  public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor, SchemaContext schemaContext) {
-    super( shardActor, schemaContext);
-    this.transaction = transaction;
-
-  }
+    private final DOMStoreWriteTransaction transaction;
 
-  public ShardWriteTransaction(DOMStoreTransactionChain transactionChain, DOMStoreWriteTransaction transaction, ActorRef shardActor, SchemaContext schemaContext) {
-    super(transactionChain, shardActor, schemaContext);
-    this.transaction = transaction;
-  }
-
-  @Override
-  public void handleReceive(Object message) throws Exception {
-    if (WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-      writeData(transaction, WriteData.fromSerializable(message, schemaContext));
-    } else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-      mergeData(transaction, MergeData.fromSerializable(message, schemaContext));
-    } else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-      deleteData(transaction,DeleteData.fromSerializable(message));
-    } else if (ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
-      readyTransaction(transaction,new ReadyTransaction());
-    }else {
-      super.handleReceive(message);
+    public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
+            SchemaContext schemaContext) {
+        super(shardActor, schemaContext);
+        this.transaction = transaction;
     }
-  }
 
-  protected void closeTransaction(CloseTransaction message) {
-    transaction.close();
-    getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
-    getSelf().tell(PoisonPill.getInstance(), getSelf());
-  }
+    @Override
+    public void handleReceive(Object message) throws Exception {
+        if(WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            writeData(transaction, WriteData.fromSerializable(message, schemaContext));
+        } else if(MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            mergeData(transaction, MergeData.fromSerializable(message, schemaContext));
+        } else if(DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            deleteData(transaction, DeleteData.fromSerializable(message));
+        } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            readyTransaction(transaction, new ReadyTransaction());
+        } else {
+            super.handleReceive(message);
+        }
+    }
 
-    /**
-     * The following method is used in unit testing only
-     * hence the default scope.
-     * This is done to test out failure cases.
-     */
-    public void forUnitTestOnlyExplicitTransactionClose() {
-        transaction.close();
+    @Override
+    protected DOMStoreTransaction getDOMStoreTransaction() {
+        return transaction;
     }
 }
index 34d35312838fd4346e7446161cd4f1f1c8f9cc74..d0c29294cbea9b992644817436ad21fb5f81d46b 100644 (file)
@@ -14,9 +14,11 @@ import akka.actor.Props;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Creator;
+
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
@@ -46,16 +48,9 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
 
     public static Props props(final DOMStoreThreePhaseCommitCohort cohort,
         final ActorRef shardActor, final CompositeModification modification) {
-        return Props.create(new Creator<ThreePhaseCommitCohort>() {
-            @Override
-            public ThreePhaseCommitCohort create() throws Exception {
-                return new ThreePhaseCommitCohort(cohort, shardActor,
-                    modification);
-            }
-        });
+        return Props.create(new ThreePhaseCommitCohortCreator(cohort, shardActor, modification));
     }
 
-
     @Override
     public void handleReceive(Object message) throws Exception {
         if (message.getClass()
@@ -81,12 +76,14 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
         final ActorRef self = getSelf();
 
         Futures.addCallback(future, new FutureCallback<Void>() {
+            @Override
             public void onSuccess(Void v) {
                 sender
                     .tell(new AbortTransactionReply().toSerializable(),
                         self);
             }
 
+            @Override
             public void onFailure(Throwable t) {
                 LOG.error(t, "An exception happened during abort");
                 sender
@@ -110,12 +107,14 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
         final ActorRef sender = getSender();
         final ActorRef self = getSelf();
         Futures.addCallback(future, new FutureCallback<Void>() {
+            @Override
             public void onSuccess(Void v) {
                 sender
                     .tell(new PreCommitTransactionReply().toSerializable(),
                         self);
             }
 
+            @Override
             public void onFailure(Throwable t) {
                 LOG.error(t, "An exception happened during pre-commit");
                 sender
@@ -130,18 +129,36 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
         final ActorRef sender = getSender();
         final ActorRef self = getSelf();
         Futures.addCallback(future, new FutureCallback<Boolean>() {
+            @Override
             public void onSuccess(Boolean canCommit) {
                 sender.tell(new CanCommitTransactionReply(canCommit)
                     .toSerializable(), self);
             }
 
+            @Override
             public void onFailure(Throwable t) {
                 LOG.error(t, "An exception happened during canCommit");
                 sender
                     .tell(new akka.actor.Status.Failure(t), self);
             }
         });
+    }
+
+    private static class ThreePhaseCommitCohortCreator implements Creator<ThreePhaseCommitCohort> {
+        final DOMStoreThreePhaseCommitCohort cohort;
+        final ActorRef shardActor;
+        final CompositeModification modification;
 
+        ThreePhaseCommitCohortCreator(DOMStoreThreePhaseCommitCohort cohort,
+                ActorRef shardActor, CompositeModification modification) {
+            this.cohort = cohort;
+            this.shardActor = shardActor;
+            this.modification = modification;
+        }
 
+        @Override
+        public ThreePhaseCommitCohort create() throws Exception {
+            return new ThreePhaseCommitCohort(cohort, shardActor, modification);
+        }
     }
 }
index 76bbef713c350ad975c9dbb256bdef83bd1e4915..9b4610a99c4a6e5977115f560d12b551131e9be5 100644 (file)
@@ -13,36 +13,33 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
  */
 public class TransactionChainProxy implements DOMStoreTransactionChain{
     private final ActorContext actorContext;
-    private final SchemaContext schemaContext;
 
-    public TransactionChainProxy(ActorContext actorContext, SchemaContext schemaContext) {
+    public TransactionChainProxy(ActorContext actorContext) {
         this.actorContext = actorContext;
-        this.schemaContext = schemaContext;
     }
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
         return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.READ_ONLY, schemaContext);
+            TransactionProxy.TransactionType.READ_ONLY);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
         return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.READ_WRITE, schemaContext);
+            TransactionProxy.TransactionType.READ_WRITE);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
         return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.WRITE_ONLY, schemaContext);
+            TransactionProxy.TransactionType.WRITE_ONLY);
     }
 
     @Override
index 2e7b2feb85bd97c7b80d09b8156f4b44c6fb343e..f447f3c718980285d32e79c5a9b2579e26309971 100644 (file)
@@ -13,6 +13,8 @@ import akka.actor.ActorSelection;
 import akka.dispatch.OnComplete;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.FinalizablePhantomReference;
+import com.google.common.base.FinalizableReferenceQueue;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -52,6 +54,8 @@ import scala.runtime.AbstractFunction1;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -87,18 +91,98 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         LOG = LoggerFactory.getLogger(TransactionProxy.class);
 
 
+    /**
+     * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
+     * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
+     * trickery to clean up its internal thread when the bundle is unloaded.
+     */
+    private static final FinalizableReferenceQueue phantomReferenceQueue =
+                                                                  new FinalizableReferenceQueue();
+
+    /**
+     * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
+     * necessary because PhantomReferences need a hard reference so they're not garbage collected.
+     * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
+     * and thus becomes eligible for garbage collection.
+     */
+    private static final Map<TransactionProxyCleanupPhantomReference,
+                             TransactionProxyCleanupPhantomReference> phantomReferenceCache =
+                                                                        new ConcurrentHashMap<>();
+
+    /**
+     * A PhantomReference that closes remote transactions for a TransactionProxy when it's
+     * garbage collected. This is used for read-only transactions as they're not explicitly closed
+     * by clients. So the only way to detect that a transaction is no longer in use and it's safe
+     * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
+     * but TransactionProxy instances should generally be short-lived enough to avoid being moved
+     * to the old generation space and thus should be cleaned up in a timely manner as the GC
+     * runs on the young generation (eden, swap1...) space much more frequently.
+     */
+    private static class TransactionProxyCleanupPhantomReference
+                                           extends FinalizablePhantomReference<TransactionProxy> {
+
+        private final List<ActorSelection> remoteTransactionActors;
+        private final AtomicBoolean remoteTransactionActorsMB;
+        private final ActorContext actorContext;
+        private final TransactionIdentifier identifier;
+
+        protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
+            super(referent, phantomReferenceQueue);
+
+            // Note we need to cache the relevant fields from the TransactionProxy as we can't
+            // have a hard reference to the TransactionProxy instance itself.
+
+            remoteTransactionActors = referent.remoteTransactionActors;
+            remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
+            actorContext = referent.actorContext;
+            identifier = referent.identifier;
+        }
+
+        @Override
+        public void finalizeReferent() {
+            LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
+                    remoteTransactionActors.size(), identifier);
+
+            phantomReferenceCache.remove(this);
+
+            // Access the memory barrier volatile to ensure all previous updates to the
+            // remoteTransactionActors list are visible to this thread.
+
+            if(remoteTransactionActorsMB.get()) {
+                for(ActorSelection actor : remoteTransactionActors) {
+                    LOG.trace("Sending CloseTransaction to {}", actor);
+                    actorContext.sendRemoteOperationAsync(actor,
+                            new CloseTransaction().toSerializable());
+                }
+            }
+        }
+    }
+
+    /**
+     * Stores the remote Tx actors for each requested data store path to be used by the
+     * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
+     * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
+     * remoteTransactionActors list so they will be visible to the thread accessing the
+     * PhantomReference.
+     */
+    private List<ActorSelection> remoteTransactionActors;
+    private AtomicBoolean remoteTransactionActorsMB;
+
+    private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
+
     private final TransactionType transactionType;
     private final ActorContext actorContext;
-    private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
     private final TransactionIdentifier identifier;
     private final SchemaContext schemaContext;
     private boolean inReadyState;
 
-    public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
-            SchemaContext schemaContext) {
-        this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
-        this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null");
-        this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+    public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
+        this.actorContext = Preconditions.checkNotNull(actorContext,
+                "actorContext should not be null");
+        this.transactionType = Preconditions.checkNotNull(transactionType,
+                "transactionType should not be null");
+        this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
+                "schemaContext should not be null");
 
         String memberName = actorContext.getCurrentMemberName();
         if(memberName == null){
@@ -108,8 +192,20 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
                 counter.getAndIncrement()).build();
 
-        LOG.debug("Created txn {} of type {}", identifier, transactionType);
+        if(transactionType == TransactionType.READ_ONLY) {
+            // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
+            // to close the remote Tx's when this instance is no longer in use and is garbage
+            // collected.
 
+            remoteTransactionActors = Lists.newArrayList();
+            remoteTransactionActorsMB = new AtomicBoolean();
+
+            TransactionProxyCleanupPhantomReference cleanup =
+                                              new TransactionProxyCleanupPhantomReference(this);
+            phantomReferenceCache.put(cleanup, cleanup);
+        }
+
+        LOG.debug("Created txn {} of type {}", identifier, transactionType);
     }
 
     @VisibleForTesting
@@ -226,6 +322,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
             transactionContext.closeTransaction();
         }
+
+        remoteTransactionPaths.clear();
+
+        if(transactionType == TransactionType.READ_ONLY) {
+            remoteTransactionActors.clear();
+            remoteTransactionActorsMB.set(true);
+        }
     }
 
     private TransactionContext transactionContext(YangInstanceIdentifier path){
@@ -260,11 +363,20 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
                 LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
 
-                ActorSelection transactionActor =
-                    actorContext.actorSelection(transactionPath);
-                transactionContext =
-                    new TransactionContextImpl(shardName, transactionPath,
-                        transactionActor);
+                ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
+
+                if(transactionType == TransactionType.READ_ONLY) {
+                    // Add the actor to the remoteTransactionActors list for access by the
+                    // cleanup PhantonReference.
+                    remoteTransactionActors.add(transactionActor);
+
+                    // Write to the memory barrier volatile to publish the above update to the
+                    // remoteTransactionActors list for thread visibility.
+                    remoteTransactionActorsMB.set(true);
+                }
+
+                transactionContext = new TransactionContextImpl(shardName, transactionPath,
+                        transactionActor, identifier, actorContext, schemaContext);
 
                 remoteTransactionPaths.put(shardName, transactionContext);
             } else {
@@ -273,7 +385,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
         } catch(Exception e){
             LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
-            remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e));
+            remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e, identifier));
         }
     }
 
@@ -298,13 +410,15 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         List<Future<Object>> getRecordedOperationFutures();
     }
 
-    private abstract class AbstractTransactionContext implements TransactionContext {
+    private static abstract class AbstractTransactionContext implements TransactionContext {
 
+        protected final TransactionIdentifier identifier;
         protected final String shardName;
         protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
 
-        AbstractTransactionContext(String shardName) {
+        AbstractTransactionContext(String shardName, TransactionIdentifier identifier) {
             this.shardName = shardName;
+            this.identifier = identifier;
         }
 
         @Override
@@ -318,17 +432,22 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         }
     }
 
-    private class TransactionContextImpl extends AbstractTransactionContext {
+    private static class TransactionContextImpl extends AbstractTransactionContext {
         private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
 
+        private final ActorContext actorContext;
+        private final SchemaContext schemaContext;
         private final String actorPath;
         private final ActorSelection actor;
 
         private TransactionContextImpl(String shardName, String actorPath,
-            ActorSelection actor) {
-            super(shardName);
+                ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext,
+                SchemaContext schemaContext) {
+            super(shardName, identifier);
             this.actorPath = actorPath;
             this.actor = actor;
+            this.actorContext = actorContext;
+            this.schemaContext = schemaContext;
         }
 
         private ActorSelection getActor() {
@@ -600,14 +719,15 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         }
     }
 
-    private class NoOpTransactionContext extends AbstractTransactionContext {
+    private static class NoOpTransactionContext extends AbstractTransactionContext {
 
         private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
 
         private final Exception failure;
 
-        public NoOpTransactionContext(String shardName, Exception failure){
-            super(shardName);
+        public NoOpTransactionContext(String shardName, Exception failure,
+                TransactionIdentifier identifier){
+            super(shardName, identifier);
             this.failure = failure;
         }
 
index e12a9663d1fca8a969c25166f38986ea19eab51a..f76430f5a1cb1cdb9e84f3b92f85c923a7111967 100644 (file)
@@ -23,6 +23,8 @@ import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +56,7 @@ public class ActorContext {
     private final ActorRef shardManager;
     private final ClusterWrapper clusterWrapper;
     private final Configuration configuration;
+    private volatile SchemaContext schemaContext;
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
         ClusterWrapper clusterWrapper,
@@ -80,6 +83,17 @@ public class ActorContext {
         return actorSystem.actorSelection(actorPath);
     }
 
+    public void setSchemaContext(SchemaContext schemaContext) {
+        this.schemaContext = schemaContext;
+
+        if(shardManager != null) {
+            shardManager.tell(new UpdateSchemaContext(schemaContext), null);
+        }
+    }
+
+    public SchemaContext getSchemaContext() {
+        return schemaContext;
+    }
 
     /**
      * Finds the primary for a given shard
index ce31c3ad5738edc81db36f8666c67dc309cbdfb7..f5a0d3783ab011728ab0688db60b404a9c53719e 100644 (file)
@@ -1,7 +1,7 @@
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
 
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreProperties;
 
 public class DistributedConfigDataStoreProviderModule extends
     org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedConfigDataStoreProviderModule {
@@ -26,10 +26,16 @@ public class DistributedConfigDataStoreProviderModule extends
 
     @Override
     public java.lang.AutoCloseable createInstance() {
+
+        ConfigProperties props = getConfigProperties();
+        if(props == null) {
+            props = new ConfigProperties();
+        }
+
         return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
-                InMemoryDOMDataStoreConfigProperties.create(getConfigMaxShardDataChangeExecutorPoolSize(),
-                        getConfigMaxShardDataChangeExecutorQueueSize(),
-                        getConfigMaxShardDataChangeListenerQueueSize()));
+                new DistributedDataStoreProperties(props.getMaxShardDataChangeExecutorPoolSize(),
+                        props.getMaxShardDataChangeExecutorQueueSize(),
+                        props.getMaxShardDataChangeListenerQueueSize(),
+                        props.getShardTransactionIdleTimeoutInMinutes()));
     }
-
 }
index 4d5b07420f7cda5d0a0c1d74740026cde5e74eab..443334d11f65378871da7a6f2ae97097d4c2eb87 100644 (file)
@@ -1,7 +1,7 @@
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
 
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreProperties;
 
 public class DistributedOperationalDataStoreProviderModule extends
     org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedOperationalDataStoreProviderModule {
@@ -26,11 +26,18 @@ public class DistributedOperationalDataStoreProviderModule extends
 
     @Override
     public java.lang.AutoCloseable createInstance() {
+
+        OperationalProperties props = getOperationalProperties();
+        if(props == null) {
+            props = new OperationalProperties();
+        }
+
         return DistributedDataStoreFactory.createInstance("operational",
                 getOperationalSchemaServiceDependency(),
-                InMemoryDOMDataStoreConfigProperties.create(getOperationalMaxShardDataChangeExecutorPoolSize(),
-                        getOperationalMaxShardDataChangeExecutorQueueSize(),
-                        getOperationalMaxShardDataChangeListenerQueueSize()));
+                new DistributedDataStoreProperties(props.getMaxShardDataChangeExecutorPoolSize(),
+                        props.getMaxShardDataChangeExecutorQueueSize(),
+                        props.getMaxShardDataChangeListenerQueueSize(),
+                        props.getShardTransactionIdleTimeoutInMinutes()));
     }
 
 }
index 6bca5ce25cefe5dbcea73279fa5395f359bb8916..a9a8a1ad986c65835018f80773707561473934b8 100644 (file)
@@ -36,36 +36,48 @@ module distributed-datastore-provider {
                 config:java-name-prefix DistributedOperationalDataStoreProvider;
      }
 
+    grouping data-store-properties {
+        leaf max-shard-data-change-executor-queue-size {
+            default 1000;
+            type uint16;
+            description "The maximum queue size for each shard's data store data change notification executor.";
+         }
+
+         leaf max-shard-data-change-executor-pool-size {
+            default 20;
+            type uint16;
+            description "The maximum thread pool size for each shard's data store data change notification executor.";
+         }
+
+         leaf max-shard-data-change-listener-queue-size {
+            default 1000;
+            type uint16;
+            description "The maximum queue size for each shard's data store data change listeners.";
+         }
+         
+         leaf shard-transaction-idle-timeout-in-minutes {
+            default 10;
+            type uint16;
+            description "The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.";
+         }
+    }
+    
     // Augments the 'configuration' choice node under modules/module.
     augment "/config:modules/config:module/config:configuration" {
         case distributed-config-datastore-provider {
             when "/config:modules/config:module/config:type = 'distributed-config-datastore-provider'";
-            container config-schema-service {
-                uses config:service-ref {
-                    refine type {
-                        mandatory false;
-                        config:required-identity sal:schema-service;
+                container config-schema-service {
+                    uses config:service-ref {
+                        refine type {
+                            mandatory false;
+                            config:required-identity sal:schema-service;
+                        }
                     }
                 }
-            }
 
-            leaf config-max-shard-data-change-executor-queue-size {
-                default 1000;
-                type uint16;
-                description "The maximum queue size for each shard's data store data change notification executor.";
-            }
-
-            leaf config-max-shard-data-change-executor-pool-size {
-                default 20;
-                type uint16;
-                description "The maximum thread pool size for each shard's data store data change notification executor.";
-            }
-
-            leaf config-max-shard-data-change-listener-queue-size {
-                default 1000;
-                type uint16;
-                description "The maximum queue size for each shard's data store data change listeners.";
-            }
+                container config-properties {
+                    uses data-store-properties;
+                }
         }
     }
 
@@ -82,23 +94,9 @@ module distributed-datastore-provider {
                     }
                 }
 
-            leaf operational-max-shard-data-change-executor-queue-size {
-                default 1000;
-                type uint16;
-                description "The maximum queue size for each shard's data store data change notification executor.";
-            }
-
-            leaf operational-max-shard-data-change-executor-pool-size {
-                default 20;
-                type uint16;
-                description "The maximum thread pool size for each shard's data store data change notification executor.";
-            }
-
-            leaf operational-max-shard-data-change-listener-queue-size {
-                default 1000;
-                type uint16;
-                description "The maximum queue size for each shard's data store data change listeners.";
-            }
-            }
+                container operational-properties {
+                    uses data-store-properties;
+                }
         }
+    }
 }
index 036b00a4c94bd46e77c480c32a16b7ad0b71b3f6..e70453f2d629857dd92ef24b919395af91c9b742 100644 (file)
@@ -14,6 +14,7 @@ import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.testkit.JavaTestKit;
+
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
@@ -30,6 +31,8 @@ import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -58,22 +61,23 @@ public class BasicIntegrationTest extends AbstractActorTest {
                 ShardIdentifier.builder().memberName("member-1")
                     .shardName("inventory").type("config").build();
 
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null);
+            final SchemaContext schemaContext = TestModel.createTestContext();
+            ShardContext shardContext = new ShardContext();
+
+            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, shardContext);
             final ActorRef shard = getSystem().actorOf(props);
 
-            new Within(duration("5 seconds")) {
+            new Within(duration("10 seconds")) {
+                @Override
                 protected void run() {
-
-
-                    shard.tell(
-                        new UpdateSchemaContext(TestModel.createTestContext()),
-                        getRef());
+                    shard.tell(new UpdateSchemaContext(schemaContext), getRef());
 
 
                     // Wait for a specific log message to show up
                     final boolean result =
                         new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
                         ) {
+                            @Override
                             protected Boolean run() {
                                 return true;
                             }
@@ -87,7 +91,8 @@ public class BasicIntegrationTest extends AbstractActorTest {
                     shard.tell(new CreateTransactionChain().toSerializable(), getRef());
 
                     final ActorSelection transactionChain =
-                        new ExpectMsg<ActorSelection>(duration("1 seconds"), "CreateTransactionChainReply") {
+                        new ExpectMsg<ActorSelection>(duration("3 seconds"), "CreateTransactionChainReply") {
+                            @Override
                             protected ActorSelection match(Object in) {
                                 if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)) {
                                     ActorPath transactionChainPath =
@@ -109,7 +114,8 @@ public class BasicIntegrationTest extends AbstractActorTest {
                     transactionChain.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.WRITE_ONLY.ordinal() ).toSerializable(), getRef());
 
                     final ActorSelection transaction =
-                        new ExpectMsg<ActorSelection>(duration("1 seconds"), "CreateTransactionReply") {
+                        new ExpectMsg<ActorSelection>(duration("3 seconds"), "CreateTransactionReply") {
+                            @Override
                             protected ActorSelection match(Object in) {
                                 if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(in.getClass())) {
                                     CreateTransactionReply reply = CreateTransactionReply.fromSerializable(in);
@@ -128,10 +134,11 @@ public class BasicIntegrationTest extends AbstractActorTest {
 
                     // 3. Write some data
                     transaction.tell(new WriteData(TestModel.TEST_PATH,
-                        ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
+                        ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext).toSerializable(),
                         getRef());
 
-                    Boolean writeDone = new ExpectMsg<Boolean>(duration("1 seconds"), "WriteDataReply") {
+                    Boolean writeDone = new ExpectMsg<Boolean>(duration("3 seconds"), "WriteDataReply") {
+                        @Override
                         protected Boolean match(Object in) {
                             if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) {
                                 return true;
@@ -150,7 +157,8 @@ public class BasicIntegrationTest extends AbstractActorTest {
                     transaction.tell(new ReadyTransaction().toSerializable(), getRef());
 
                     final ActorSelection cohort =
-                        new ExpectMsg<ActorSelection>(duration("1 seconds"), "ReadyTransactionReply") {
+                        new ExpectMsg<ActorSelection>(duration("3 seconds"), "ReadyTransactionReply") {
+                            @Override
                             protected ActorSelection match(Object in) {
                                 if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
                                     ActorPath cohortPath =
@@ -173,7 +181,8 @@ public class BasicIntegrationTest extends AbstractActorTest {
                     cohort.tell(new PreCommitTransaction().toSerializable(), getRef());
 
                     Boolean preCommitDone =
-                        new ExpectMsg<Boolean>(duration("1 seconds"), "PreCommitTransactionReply") {
+                        new ExpectMsg<Boolean>(duration("3 seconds"), "PreCommitTransactionReply") {
+                            @Override
                             protected Boolean match(Object in) {
                                 if (in.getClass().equals(PreCommitTransactionReply.SERIALIZABLE_CLASS)) {
                                     return true;
index b2ee4a49fee1b40ee86730da0044f5f987b090a9..e653c3d3717351182a6a57cf570c4a6bf6449500 100644 (file)
@@ -3,7 +3,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
-import junit.framework.Assert;
+import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -47,7 +47,7 @@ public class DataChangeListenerProxyTest extends AbstractActorTest {
 
     @Override
     public Set<YangInstanceIdentifier> getRemovedPaths() {
-      Set<YangInstanceIdentifier>ids = new HashSet();
+      Set<YangInstanceIdentifier>ids = new HashSet<>();
       ids.add( CompositeModel.TEST_PATH);
       return ids;
     }
@@ -73,9 +73,8 @@ public class DataChangeListenerProxyTest extends AbstractActorTest {
         final Props props = Props.create(MessageCollectorActor.class);
         final ActorRef actorRef = getSystem().actorOf(props);
 
-        DataChangeListenerProxy dataChangeListenerProxy =
-            new DataChangeListenerProxy(TestModel.createTestContext(),
-                getSystem().actorSelection(actorRef.path()));
+        DataChangeListenerProxy dataChangeListenerProxy = new DataChangeListenerProxy(
+                TestModel.createTestContext(), getSystem().actorSelection(actorRef.path()));
 
         dataChangeListenerProxy.onDataChanged(new MockDataChangedEvent());
 
index 26ec583b3e058108f9cc18517f5f6909847f5cd6..b39cc84ceff6943d045c7a609afcbb996e212b31 100644 (file)
@@ -24,9 +24,9 @@ import static org.junit.Assert.assertTrue;
 public class DataChangeListenerTest extends AbstractActorTest {
 
     private static class MockDataChangedEvent implements AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> {
-       Map<YangInstanceIdentifier,NormalizedNode<?,?>> createdData = new HashMap();
-       Map<YangInstanceIdentifier,NormalizedNode<?,?>> updatedData = new HashMap();
-       Map<YangInstanceIdentifier,NormalizedNode<?,?>> originalData = new HashMap();
+       Map<YangInstanceIdentifier,NormalizedNode<?,?>> createdData = new HashMap<>();
+       Map<YangInstanceIdentifier,NormalizedNode<?,?>> updatedData = new HashMap<>();
+       Map<YangInstanceIdentifier,NormalizedNode<?,?>> originalData = new HashMap<>();
 
 
 
@@ -90,11 +90,12 @@ public class DataChangeListenerTest extends AbstractActorTest {
     public void testDataChangedWhenNotificationsAreEnabled(){
         new JavaTestKit(getSystem()) {{
             final MockDataChangeListener listener = new MockDataChangeListener();
-            final Props props = DataChangeListener.props(CompositeModel.createTestContext(),listener,CompositeModel.FAMILY_PATH );
+            final Props props = DataChangeListener.props(listener);
             final ActorRef subject =
                 getSystem().actorOf(props, "testDataChangedNotificationsEnabled");
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     // Let the DataChangeListener know that notifications should
@@ -107,6 +108,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
 
                     final Boolean out = new ExpectMsg<Boolean>(duration("800 millis"), "dataChanged") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected Boolean match(Object in) {
                             if (in != null && in.getClass().equals(DataChangedReply.class)) {
 
@@ -123,8 +125,6 @@ public class DataChangeListenerTest extends AbstractActorTest {
 
                     expectNoMsg();
                 }
-
-
             };
         }};
     }
@@ -133,11 +133,12 @@ public class DataChangeListenerTest extends AbstractActorTest {
     public void testDataChangedWhenNotificationsAreDisabled(){
         new JavaTestKit(getSystem()) {{
             final MockDataChangeListener listener = new MockDataChangeListener();
-            final Props props = DataChangeListener.props(CompositeModel.createTestContext(),listener,CompositeModel.FAMILY_PATH );
+            final Props props = DataChangeListener.props(listener);
             final ActorRef subject =
                 getSystem().actorOf(props, "testDataChangedNotificationsDisabled");
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(
@@ -146,8 +147,6 @@ public class DataChangeListenerTest extends AbstractActorTest {
 
                     expectNoMsg();
                 }
-
-
             };
         }};
     }
index 49408b741019c98debbe70b37857d871d0f80b8b..21aa00e9e0b98c60b2d117fa67f36864e0c7700a 100644 (file)
@@ -3,9 +3,12 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSystem;
 import akka.event.Logging;
 import akka.testkit.JavaTestKit;
+
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import junit.framework.Assert;
+
 import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -69,10 +72,13 @@ public class DistributedDataStoreIntegrationTest {
             {
 
                 new Within(duration("10 seconds")) {
+                    @Override
                     protected void run() {
                         try {
                             final DistributedDataStore distributedDataStore =
-                                new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration, null);
+                                new DistributedDataStore(getSystem(), "config",
+                                        new MockClusterWrapper(), configuration,
+                                        new DistributedDataStoreProperties());
 
                             distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
 
@@ -80,6 +86,7 @@ public class DistributedDataStoreIntegrationTest {
                             final boolean result =
                                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
                                     ) {
+                                    @Override
                                     protected Boolean run() {
                                         return true;
                                     }
@@ -150,6 +157,7 @@ public class DistributedDataStoreIntegrationTest {
             {
 
                 new Within(duration("10 seconds")) {
+                    @Override
                     protected void run() {
                         try {
                             final DistributedDataStore distributedDataStore =
@@ -164,6 +172,7 @@ public class DistributedDataStoreIntegrationTest {
                                 new JavaTestKit.EventFilter<Boolean>(
                                     Logging.Info.class
                                 ) {
+                                    @Override
                                     protected Boolean run() {
                                         return true;
                                     }
index 69590e62fb1b5886fc0b5fb1223706d02c1706cd..cb473cb9360e03656ea83d0212ef18b70237a92e 100644 (file)
@@ -3,6 +3,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -63,12 +64,14 @@ public class DistributedDataStoreTest extends AbstractActorTest{
 
     }
 
+    @SuppressWarnings("resource")
     @Test
     public void testConstructor(){
         ActorSystem actorSystem = mock(ActorSystem.class);
 
         new DistributedDataStore(actorSystem, "config",
-            mock(ClusterWrapper.class), mock(Configuration.class), null);
+            mock(ClusterWrapper.class), mock(Configuration.class),
+            new DistributedDataStoreProperties());
 
         verify(actorSystem).actorOf(any(Props.class), eq("shardmanager-config"));
     }
index 499b4e1f3111d097cc0696aa7cd23367fc4c591d..e6f68c032a325f59bea291e2f17e129fd47da766 100644 (file)
@@ -42,11 +42,12 @@ public class ShardManagerTest {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                 .props("config", new MockClusterWrapper(),
-                    new MockConfiguration(), null);
+                    new MockConfiguration(), new ShardContext());
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(new FindPrimary("inventory").toSerializable(), getRef());
@@ -66,11 +67,12 @@ public class ShardManagerTest {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                 .props("config", new MockClusterWrapper(),
-                    new MockConfiguration(), null);
+                    new MockConfiguration(), new ShardContext());
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
@@ -89,16 +91,18 @@ public class ShardManagerTest {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                 .props("config", new MockClusterWrapper(),
-                    new MockConfiguration(), null);
+                    new MockConfiguration(), new ShardContext());
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(new FindLocalShard("inventory"), getRef());
 
                     final String out = new ExpectMsg<String>(duration("1 seconds"), "find local") {
+                        @Override
                         protected String match(Object in) {
                             if (in instanceof LocalShardNotFound) {
                                 return ((LocalShardNotFound) in).getShardName();
@@ -124,16 +128,18 @@ public class ShardManagerTest {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                 .props("config", mockClusterWrapper,
-                    new MockConfiguration(), null);
+                    new MockConfiguration(), new ShardContext());
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
 
                     final ActorRef out = new ExpectMsg<ActorRef>(duration("1 seconds"), "find local") {
+                        @Override
                         protected ActorRef match(Object in) {
                             if (in instanceof LocalShardFound) {
                                 return ((LocalShardFound) in).getPath();
@@ -158,12 +164,13 @@ public class ShardManagerTest {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                 .props("config", new MockClusterWrapper(),
-                    new MockConfiguration(), null);
+                    new MockConfiguration(), new ShardContext());
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
             // the run() method needs to finish within 3 seconds
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
@@ -172,6 +179,7 @@ public class ShardManagerTest {
 
                     final String out = new ExpectMsg<String>(duration("1 seconds"), "primary found") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected String match(Object in) {
                             if (in.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
                                 PrimaryFound f = PrimaryFound.fromSerializable(in);
@@ -196,12 +204,13 @@ public class ShardManagerTest {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                 .props("config", new MockClusterWrapper(),
-                    new MockConfiguration(), null);
+                    new MockConfiguration(), new ShardContext());
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
             // the run() method needs to finish within 3 seconds
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
index 7740b8e667a974c10b53d0e436edb41455e40cf5..fc45efcdea854bea791c44b12a4437c2c74bc2f1 100644 (file)
@@ -4,7 +4,8 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.testkit.JavaTestKit;
-import junit.framework.Assert;
+
+import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
@@ -28,11 +29,14 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import static junit.framework.Assert.assertFalse;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class ShardTest extends AbstractActorTest {
+
+    private static final ShardContext shardContext = new ShardContext();
+
     @Test
     public void testOnReceiveCreateTransactionChain() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -40,7 +44,7 @@ public class ShardTest extends AbstractActorTest {
                 ShardIdentifier.builder().memberName("member-1")
                     .shardName("inventory").type("config").build();
 
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null);
+            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, shardContext);
             final ActorRef subject =
                 getSystem().actorOf(props, "testCreateTransactionChain");
 
@@ -49,6 +53,7 @@ public class ShardTest extends AbstractActorTest {
             final boolean result =
                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
                 ) {
+                    @Override
                     protected Boolean run() {
                         return true;
                     }
@@ -58,13 +63,15 @@ public class ShardTest extends AbstractActorTest {
 
             Assert.assertEquals(true, result);
 
-            new Within(duration("1 seconds")) {
+            new Within(duration("3 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(new CreateTransactionChain().toSerializable(), getRef());
 
-                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+                    final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected String match(Object in) {
                             if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)){
                                 CreateTransactionChainReply reply =
@@ -96,11 +103,12 @@ public class ShardTest extends AbstractActorTest {
                 ShardIdentifier.builder().memberName("member-1")
                     .shardName("inventory").type("config").build();
 
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null);
+            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, shardContext);
             final ActorRef subject =
                 getSystem().actorOf(props, "testRegisterChangeListener");
 
-            new Within(duration("1 seconds")) {
+            new Within(duration("3 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(
@@ -111,8 +119,10 @@ public class ShardTest extends AbstractActorTest {
                         getRef().path(), AsyncDataBroker.DataChangeScope.BASE),
                         getRef());
 
-                    final Boolean notificationEnabled = new ExpectMsg<Boolean>("enable notification") {
+                    final Boolean notificationEnabled = new ExpectMsg<Boolean>(
+                                                   duration("3 seconds"), "enable notification") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected Boolean match(Object in) {
                             if(in instanceof EnableNotification){
                                 return ((EnableNotification) in).isEnabled();
@@ -124,8 +134,9 @@ public class ShardTest extends AbstractActorTest {
 
                     assertFalse(notificationEnabled);
 
-                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+                    final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected String match(Object in) {
                             if (in.getClass().equals(RegisterChangeListenerReply.class)) {
                                 RegisterChangeListenerReply reply =
@@ -154,15 +165,15 @@ public class ShardTest extends AbstractActorTest {
                 ShardIdentifier.builder().memberName("member-1")
                     .shardName("inventory").type("config").build();
 
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null);
+            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, shardContext);
             final ActorRef subject =
                 getSystem().actorOf(props, "testCreateTransaction");
 
-
             // Wait for a specific log message to show up
             final boolean result =
                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
                 ) {
+                    @Override
                     protected Boolean run() {
                         return true;
                     }
@@ -172,7 +183,8 @@ public class ShardTest extends AbstractActorTest {
 
             Assert.assertEquals(true, result);
 
-            new Within(duration("1 seconds")) {
+            new Within(duration("3 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(
@@ -182,8 +194,9 @@ public class ShardTest extends AbstractActorTest {
                     subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(),
                         getRef());
 
-                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+                    final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected String match(Object in) {
                             if (in instanceof CreateTransactionReply) {
                                 CreateTransactionReply reply =
@@ -200,8 +213,6 @@ public class ShardTest extends AbstractActorTest {
                         out.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
                     expectNoMsg();
                 }
-
-
             };
         }};
     }
@@ -216,11 +227,12 @@ public class ShardTest extends AbstractActorTest {
                     .shardName("inventory").type("config").build();
 
             peerAddresses.put(identifier, null);
-            final Props props = Shard.props(identifier, peerAddresses, null);
+            final Props props = Shard.props(identifier, peerAddresses, shardContext);
             final ActorRef subject =
                 getSystem().actorOf(props, "testPeerAddressResolved");
 
-            new Within(duration("1 seconds")) {
+            new Within(duration("3 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(
@@ -229,8 +241,6 @@ public class ShardTest extends AbstractActorTest {
 
                     expectNoMsg();
                 }
-
-
             };
         }};
     }
index d468af6664981d08ad603b1a841fefbdaccc8d47..0262b8c1457a171c262f5da83dc755d9bcbb174d 100644 (file)
@@ -3,8 +3,11 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
+
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
@@ -12,87 +15,96 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 import static org.junit.Assert.assertEquals;
 
 public class ShardTransactionChainTest extends AbstractActorTest {
 
-  private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
-
-  private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor,
-          MoreExecutors.sameThreadExecutor());
-
-  static {
-    store.onGlobalContextUpdated(TestModel.createTestContext());
-  }
-  @Test
-  public void testOnReceiveCreateTransaction() throws Exception {
-    new JavaTestKit(getSystem()) {{
-      final Props props = ShardTransactionChain.props(store.createTransactionChain(), TestModel.createTestContext());
-      final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction");
-
-     new Within(duration("1 seconds")) {
-        @Override
-        protected void run() {
-
-          subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
-
-          final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
-            // do not put code outside this method, will run afterwards
-            @Override
-            protected String match(Object in) {
-              if (in.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
-                return CreateTransactionReply.fromSerializable(in).getTransactionPath();
-              }else{
-                throw noMatch();
-              }
-            }
-          }.get(); // this extracts the received message
-
-          assertEquals("Unexpected transaction path " + out,
-              "akka://test/user/testCreateTransaction/shard-txn-1",
-              out);
-
-          // Will wait for the rest of the 3 seconds
-          expectNoMsg();
-        }
-
-
-      };
-    }};
-  }
-
-  @Test
-  public void testOnReceiveCloseTransactionChain() throws Exception {
-    new JavaTestKit(getSystem()) {{
-      final Props props = ShardTransactionChain.props(store.createTransactionChain(), TestModel.createTestContext());
-      final ActorRef subject = getSystem().actorOf(props, "testCloseTransactionChain");
-
-      new Within(duration("1 seconds")) {
-        @Override
-        protected void run() {
-
-          subject.tell(new CloseTransactionChain().toSerializable(), getRef());
-
-          final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
-            // do not put code outside this method, will run afterwards
-            @Override
-            protected String match(Object in) {
-              if (in.getClass().equals(CloseTransactionChainReply.SERIALIZABLE_CLASS)) {
-                return "match";
-              } else {
-                throw noMatch();
-              }
-            }
-          }.get(); // this extracts the received message
-
-          assertEquals("match", out);
-          // Will wait for the rest of the 3 seconds
-          expectNoMsg();
-        }
-
-
-      };
-    }};
-  }
+    private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
+
+    private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor,
+            MoreExecutors.sameThreadExecutor());
+
+    private static final SchemaContext testSchemaContext = TestModel.createTestContext();
+
+    private static final ShardContext shardContext = new ShardContext();
+
+    @BeforeClass
+    public static void staticSetup() {
+        store.onGlobalContextUpdated(testSchemaContext);
+    }
+
+    @Test
+    public void testOnReceiveCreateTransaction() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final Props props = ShardTransactionChain.props(store.createTransactionChain(),
+                    testSchemaContext, shardContext);
+            final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction");
+
+            new Within(duration("1 seconds")) {
+                @Override
+                protected void run() {
+
+                    subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
+
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+                        // do not put code outside this method, will run afterwards
+                        @Override
+                        protected String match(Object in) {
+                            if (in.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
+                                return CreateTransactionReply.fromSerializable(in).getTransactionPath();
+                            }else{
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
+
+                    assertEquals("Unexpected transaction path " + out,
+                            "akka://test/user/testCreateTransaction/shard-txn-1",
+                            out);
+
+                    // Will wait for the rest of the 3 seconds
+                    expectNoMsg();
+                }
+
+
+            };
+        }};
+    }
+
+    @Test
+    public void testOnReceiveCloseTransactionChain() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final Props props = ShardTransactionChain.props(store.createTransactionChain(),
+                    testSchemaContext, shardContext);
+            final ActorRef subject = getSystem().actorOf(props, "testCloseTransactionChain");
+
+            new Within(duration("1 seconds")) {
+                @Override
+                protected void run() {
+
+                    subject.tell(new CloseTransactionChain().toSerializable(), getRef());
+
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+                        // do not put code outside this method, will run afterwards
+                        @Override
+                        protected String match(Object in) {
+                            if (in.getClass().equals(CloseTransactionChainReply.SERIALIZABLE_CLASS)) {
+                                return "match";
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
+
+                    assertEquals("match", out);
+                    // Will wait for the rest of the 3 seconds
+                    expectNoMsg();
+                }
+
+
+            };
+        }};
+    }
 }
index 16b73040a5b6e5375a24570fb5b1617240eadb04..127f9f55f89f0bedf78e2f748cd116de3819066a 100644 (file)
@@ -13,8 +13,11 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.TestActorRef;
+
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
@@ -23,13 +26,13 @@ import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 
 import java.util.Collections;
-
-import static org.junit.Assert.assertTrue;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Covers negative test cases
@@ -51,20 +54,21 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         ShardIdentifier.builder().memberName("member-1")
             .shardName("inventory").type("operational").build();
 
-    static {
+    private final ShardContext shardContext = new ShardContext();
+
+    @BeforeClass
+    public static void staticSetup() {
         store.onGlobalContextUpdated(testSchemaContext);
     }
 
-
     @Test(expected = ReadFailedException.class)
     public void testNegativeReadWithReadOnlyTransactionClosed()
         throws Throwable {
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
-        final Props props =
-            ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                TestModel.createTestContext());
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new ShardContext()));
+        final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+                testSchemaContext, shardContext);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -78,32 +82,27 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                 ).build();
         Future<Object> future =
             akka.pattern.Patterns.ask(subject, readData, 3000);
-        assertTrue(future.isCompleted());
-        Await.result(future, Duration.Zero());
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        ((ShardReadTransaction) subject.underlyingActor())
-            .forUnitTestOnlyExplicitTransactionClose();
+        subject.underlyingActor().getDOMStoreTransaction().close();
 
         future = akka.pattern.Patterns.ask(subject, readData, 3000);
-        Await.result(future, Duration.Zero());
-
-
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 
 
     @Test(expected = ReadFailedException.class)
-    public void testNegativeReadWithReadWriteOnlyTransactionClosed()
+    public void testNegativeReadWithReadWriteTransactionClosed()
         throws Throwable {
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
-        final Props props =
-            ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                TestModel.createTestContext());
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new ShardContext()));
+        final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+                testSchemaContext, shardContext);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
-                "testNegativeReadWithReadWriteOnlyTransactionClosed");
+                "testNegativeReadWithReadWriteTransactionClosed");
 
         ShardTransactionMessages.ReadData readData =
             ShardTransactionMessages.ReadData.newBuilder()
@@ -111,33 +110,29 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
                         .build()
                 ).build();
+
         Future<Object> future =
             akka.pattern.Patterns.ask(subject, readData, 3000);
-        assertTrue(future.isCompleted());
-        Await.result(future, Duration.Zero());
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        ((ShardReadWriteTransaction) subject.underlyingActor())
-            .forUnitTestOnlyExplicitTransactionClose();
+        subject.underlyingActor().getDOMStoreTransaction().close();
 
         future = akka.pattern.Patterns.ask(subject, readData, 3000);
-        Await.result(future, Duration.Zero());
-
-
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 
     @Test(expected = ReadFailedException.class)
-    public void testNegativeExistsWithReadWriteOnlyTransactionClosed()
+    public void testNegativeExistsWithReadWriteTransactionClosed()
         throws Throwable {
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
-        final Props props =
-            ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                TestModel.createTestContext());
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new ShardContext()));
+        final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+                testSchemaContext, shardContext);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
-                "testNegativeExistsWithReadWriteOnlyTransactionClosed");
+                "testNegativeExistsWithReadWriteTransactionClosed");
 
         ShardTransactionMessages.DataExists dataExists =
             ShardTransactionMessages.DataExists.newBuilder()
@@ -148,16 +143,12 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         Future<Object> future =
             akka.pattern.Patterns.ask(subject, dataExists, 3000);
-        assertTrue(future.isCompleted());
-        Await.result(future, Duration.Zero());
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        ((ShardReadWriteTransaction) subject.underlyingActor())
-            .forUnitTestOnlyExplicitTransactionClose();
+        subject.underlyingActor().getDOMStoreTransaction().close();
 
         future = akka.pattern.Patterns.ask(subject, dataExists, 3000);
-        Await.result(future, Duration.Zero());
-
-
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 
     @Test(expected = IllegalStateException.class)
@@ -165,10 +156,9 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
-        final Props props =
-            ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                TestModel.createTestContext());
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new ShardContext()));
+        final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
+                testSchemaContext, shardContext);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -179,8 +169,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         Future<Object> future =
             akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
-        assertTrue(future.isCompleted());
-        Await.result(future, Duration.Zero());
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
         ShardTransactionMessages.WriteData writeData =
             ShardTransactionMessages.WriteData.newBuilder()
@@ -192,22 +181,17 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
             ).build();
 
         future = akka.pattern.Patterns.ask(subject, writeData, 3000);
-        assertTrue(future.isCompleted());
-        Await.result(future, Duration.Zero());
-
-
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 
-
     @Test(expected = IllegalStateException.class)
     public void testNegativeReadWriteWithTransactionReady() throws Exception {
 
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
-        final Props props =
-            ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                TestModel.createTestContext());
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new ShardContext()));
+        final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+                testSchemaContext, shardContext);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -218,8 +202,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         Future<Object> future =
             akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
-        assertTrue(future.isCompleted());
-        Await.result(future, Duration.Zero());
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
         ShardTransactionMessages.WriteData writeData =
             ShardTransactionMessages.WriteData.newBuilder()
@@ -231,10 +214,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
             ).build();
 
         future = akka.pattern.Patterns.ask(subject, writeData, 3000);
-        assertTrue(future.isCompleted());
-        Await.result(future, Duration.Zero());
-
-
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 
     @Test(expected = IllegalStateException.class)
@@ -242,10 +222,9 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
-        final Props props =
-            ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                TestModel.createTestContext());
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new ShardContext()));
+        final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+                testSchemaContext, shardContext);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props, "testNegativeMergeTransactionReady");
@@ -255,8 +234,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         Future<Object> future =
             akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
-        assertTrue(future.isCompleted());
-        Await.result(future, Duration.Zero());
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
         ShardTransactionMessages.MergeData mergeData =
             ShardTransactionMessages.MergeData.newBuilder()
@@ -268,10 +246,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
             ).build();
 
         future = akka.pattern.Patterns.ask(subject, mergeData, 3000);
-        assertTrue(future.isCompleted());
-        Await.result(future, Duration.Zero());
-
-
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 
 
@@ -280,10 +255,9 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
-        final Props props =
-            ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                TestModel.createTestContext());
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new ShardContext()));
+        final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+                testSchemaContext, shardContext);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -294,8 +268,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         Future<Object> future =
             akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
-        assertTrue(future.isCompleted());
-        Await.result(future, Duration.Zero());
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
         ShardTransactionMessages.DeleteData deleteData =
             ShardTransactionMessages.DeleteData.newBuilder()
@@ -304,9 +277,6 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                         .build()).build();
 
         future = akka.pattern.Patterns.ask(subject, deleteData, 3000);
-        assertTrue(future.isCompleted());
-        Await.result(future, Duration.Zero());
-
-
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 }
index 8f5d0c28d603c39a008f9638ece377aacdf6f1f8..1dd824568a81820d21f50fd820d47eeef68c73e9 100644 (file)
@@ -5,9 +5,11 @@ import akka.actor.Props;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import org.junit.Assert;
+
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
@@ -32,11 +34,15 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
+import scala.concurrent.duration.Duration;
+
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -54,17 +60,20 @@ public class ShardTransactionTest extends AbstractActorTest {
         ShardIdentifier.builder().memberName("member-1")
             .shardName("inventory").type("config").build();
 
+    private ShardContext shardContext = new ShardContext();
 
-    static {
+    @BeforeClass
+    public static void staticSetup() {
         store.onGlobalContextUpdated(testSchemaContext);
     }
 
     @Test
     public void testOnReceiveReadData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
-            final Props props =
-                ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext);
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                    Collections.EMPTY_MAP, new ShardContext()));
+            final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+                    testSchemaContext, shardContext);
             final ActorRef subject = getSystem().actorOf(props, "testReadData");
 
             new Within(duration("1 seconds")) {
@@ -104,9 +113,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
-            final Props props =
-                ShardTransaction.props( store.newReadOnlyTransaction(), shard, testSchemaContext);
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                    Collections.EMPTY_MAP, new ShardContext()));
+            final Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
+                    testSchemaContext, shardContext);
             final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
 
             new Within(duration("1 seconds")) {
@@ -147,9 +157,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveDataExistsPositive() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
-            final Props props =
-                ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext);
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                    Collections.EMPTY_MAP, new ShardContext()));
+            final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+                    testSchemaContext, shardContext);
             final ActorRef subject = getSystem().actorOf(props, "testDataExistsPositive");
 
             new Within(duration("1 seconds")) {
@@ -190,9 +201,9 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testOnReceiveDataExistsNegative() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
-                    Collections.EMPTY_MAP, null));
-            final Props props =
-                ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext);
+                    Collections.EMPTY_MAP, new ShardContext()));
+            final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+                    testSchemaContext, shardContext);
             final ActorRef subject = getSystem().actorOf(props, "testDataExistsNegative");
 
             new Within(duration("1 seconds")) {
@@ -267,9 +278,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveWriteData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
-            final Props props =
-                ShardTransaction.props(store.newWriteOnlyTransaction(), shard, TestModel.createTestContext());
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                    Collections.EMPTY_MAP, new ShardContext()));
+            final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
+                    testSchemaContext, shardContext);
             final ActorRef subject =
                 getSystem().actorOf(props, "testWriteData");
 
@@ -307,9 +319,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveMergeData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
-            final Props props =
-                ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext);
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                    Collections.EMPTY_MAP, new ShardContext()));
+            final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+                    testSchemaContext, shardContext);
             final ActorRef subject =
                 getSystem().actorOf(props, "testMergeData");
 
@@ -348,9 +361,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveDeleteData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
-            final Props props =
-                ShardTransaction.props( store.newWriteOnlyTransaction(), shard, TestModel.createTestContext());
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                    Collections.EMPTY_MAP, new ShardContext()));
+            final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
+                    testSchemaContext, shardContext);
             final ActorRef subject =
                 getSystem().actorOf(props, "testDeleteData");
 
@@ -387,9 +401,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveReadyTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
-            final Props props =
-                ShardTransaction.props( store.newReadWriteTransaction(), shard, TestModel.createTestContext());
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                    Collections.EMPTY_MAP, new ShardContext()));
+            final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
+                    testSchemaContext, shardContext);
             final ActorRef subject =
                 getSystem().actorOf(props, "testReadyTransaction");
 
@@ -425,24 +440,26 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveCloseTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
-            final Props props =
-                ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext());
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                    Collections.EMPTY_MAP, new ShardContext()));
+            final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+                    testSchemaContext, shardContext);
             final ActorRef subject =
                 getSystem().actorOf(props, "testCloseTransaction");
 
             watch(subject);
 
-            new Within(duration("2 seconds")) {
+            new Within(duration("6 seconds")) {
                 @Override
                 protected void run() {
 
                     subject.tell(new CloseTransaction().toSerializable(), getRef());
 
-                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+                    final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         @Override
                         protected String match(Object in) {
+                            System.out.println("!!!IN match 1: "+(in!=null?in.getClass():"NULL"));
                             if (in.getClass().equals(CloseTransactionReply.SERIALIZABLE_CLASS)) {
                                 return "match";
                             } else {
@@ -453,10 +470,11 @@ public class ShardTransactionTest extends AbstractActorTest {
 
                     assertEquals("match", out);
 
-                    final String termination = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+                    final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         @Override
                         protected String match(Object in) {
+                            System.out.println("!!!IN match 2: "+(in!=null?in.getClass():"NULL"));
                             if (in instanceof Terminated) {
                                 return "match";
                             } else {
@@ -465,33 +483,54 @@ public class ShardTransactionTest extends AbstractActorTest {
                         }
                     }.get(); // this extracts the received message
 
-
-                    expectNoMsg();
+                    assertEquals("match", termination);
                 }
-
-
             };
         }};
+    }
 
+    @Test(expected=UnknownMessageException.class)
+    public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
+        final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                Collections.EMPTY_MAP, new ShardContext()));
+        final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+                testSchemaContext, shardContext);
+        final TestActorRef subject = TestActorRef.apply(props,getSystem());
+
+        subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
     }
 
+    @Test
+    public void testShardTransactionInactivity() {
+
+        shardContext = new ShardContext(InMemoryDOMDataStoreConfigProperties.getDefault(),
+                Duration.create(500, TimeUnit.MILLISECONDS));
 
-  @Test
-  public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
-    try {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                    Collections.EMPTY_MAP, new ShardContext()));
+            final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+                    testSchemaContext, shardContext);
+            final ActorRef subject =
+                getSystem().actorOf(props, "testShardTransactionInactivity");
 
-        final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
-        final Props props =
-            ShardTransaction.props(store.newReadOnlyTransaction(), shard, TestModel.createTestContext());
-         final TestActorRef subject = TestActorRef.apply(props,getSystem());
+            watch(subject);
 
-        subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
-        Assert.assertFalse(true);
+            // The shard Tx actor should receive a ReceiveTimeout message and self-destruct.
 
+            final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
+                // do not put code outside this method, will run afterwards
+                @Override
+                protected String match(Object in) {
+                    if (in instanceof Terminated) {
+                        return "match";
+                    } else {
+                        throw noMatch();
+                    }
+                }
+            }.get(); // this extracts the received message
 
-    } catch (Exception cs) {
-      assertEquals(UnknownMessageException.class.getSimpleName(), cs.getClass().getSimpleName());
-      assertTrue(cs.getMessage(), cs.getMessage().startsWith("Unknown message received "));
+            assertEquals("match", termination);
+        }};
     }
-  }
 }
index 34697977a5a4639e4b507dde478302834eae5ad5..672166c4424c7ea0a8fd5c7ddd2e07b057e572b2 100644 (file)
@@ -13,9 +13,12 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.TestActorRef;
+
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
@@ -31,6 +34,7 @@ import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessa
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
@@ -59,19 +63,21 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
         ShardIdentifier.builder().memberName("member-1")
             .shardName("inventory").type("config").build();
 
-    static {
+    private final ShardContext shardContext = new ShardContext();
+
+    @BeforeClass
+    public static void staticSetup() {
         store.onGlobalContextUpdated(testSchemaContext);
     }
 
-    private FiniteDuration ASK_RESULT_DURATION = Duration.create(5000, TimeUnit.MILLISECONDS);
+    private final FiniteDuration ASK_RESULT_DURATION = Duration.create(5000, TimeUnit.MILLISECONDS);
 
 
     @Test(expected = TestException.class)
     public void testNegativeAbortResultsInException() throws Exception {
 
-        final ActorRef shard =
-            getSystem()
-                .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
+        final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                Collections.EMPTY_MAP, shardContext));
         final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
             .mock(DOMStoreThreePhaseCommitCohort.class);
         final CompositeModification mockComposite =
@@ -93,18 +99,14 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
         assertTrue(future.isCompleted());
 
         Await.result(future, ASK_RESULT_DURATION);
-
-
-
     }
 
 
     @Test(expected = OptimisticLockFailedException.class)
     public void testNegativeCanCommitResultsInException() throws Exception {
 
-        final ActorRef shard =
-            getSystem()
-                .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
+        final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                Collections.EMPTY_MAP, shardContext));
         final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
             .mock(DOMStoreThreePhaseCommitCohort.class);
         final CompositeModification mockComposite =
@@ -135,9 +137,8 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
     @Test(expected = TestException.class)
     public void testNegativePreCommitResultsInException() throws Exception {
 
-        final ActorRef shard =
-            getSystem()
-                .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
+        final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                Collections.EMPTY_MAP, shardContext));
         final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
             .mock(DOMStoreThreePhaseCommitCohort.class);
         final CompositeModification mockComposite =
@@ -166,15 +167,13 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
     @Test(expected = TestException.class)
     public void testNegativeCommitResultsInException() throws Exception {
 
-        final TestActorRef<Shard> subject = TestActorRef
-            .create(getSystem(),
-                Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null),
+        final TestActorRef<Shard> subject = TestActorRef.create(getSystem(),
+                Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, shardContext),
                 "testNegativeCommitResultsInException");
 
         final ActorRef shardTransaction =
-            getSystem().actorOf(
-                ShardTransaction.props(store.newReadWriteTransaction(), subject,
-                    TestModel.createTestContext()));
+            getSystem().actorOf(ShardTransaction.props(store.newReadWriteTransaction(), subject,
+                    testSchemaContext, shardContext));
 
         ShardTransactionMessages.WriteData writeData =
             ShardTransactionMessages.WriteData.newBuilder()
@@ -221,12 +220,8 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
                 mockForwardCommitTransaction
                 , 3000);
         Await.result(future, ASK_RESULT_DURATION);
-
-
     }
 
     private class TestException extends Exception {
     }
-
-
 }
index 9b7039764f57ba56eb9469354446cad35a6b7537..93145bdd6d86360070dce5102dbd968c6ebc0629 100644 (file)
 
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.mockito.Mockito.doReturn;
+
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -23,30 +26,39 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 public class TransactionChainProxyTest {
     ActorContext actorContext = Mockito.mock(ActorContext.class);
     SchemaContext schemaContext = Mockito.mock(SchemaContext.class);
+
+    @Before
+    public void setUp() {
+        doReturn(schemaContext).when(actorContext).getSchemaContext();
+    }
+
+    @SuppressWarnings("resource")
     @Test
     public void testNewReadOnlyTransaction() throws Exception {
 
-     DOMStoreTransaction dst = new TransactionChainProxy(actorContext, schemaContext).newReadOnlyTransaction();
+     DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newReadOnlyTransaction();
          Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
 
     }
 
+    @SuppressWarnings("resource")
     @Test
     public void testNewReadWriteTransaction() throws Exception {
-        DOMStoreTransaction dst = new TransactionChainProxy(actorContext, schemaContext).newReadWriteTransaction();
+        DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newReadWriteTransaction();
         Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
 
     }
 
+    @SuppressWarnings("resource")
     @Test
     public void testNewWriteOnlyTransaction() throws Exception {
-        DOMStoreTransaction dst = new TransactionChainProxy(actorContext, schemaContext).newWriteOnlyTransaction();
+        DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newWriteOnlyTransaction();
         Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
 
     }
 
     @Test(expected=UnsupportedOperationException.class)
     public void testClose() throws Exception {
-        new TransactionChainProxy(actorContext, schemaContext).close();
+        new TransactionChainProxy(actorContext).close();
     }
 }
index 6b11a24e9cedbb3a8234fb6e337f7750a413721f..f69ae88ec873ed044ab40c36a63e6f0b68b9db67 100644 (file)
@@ -99,6 +99,7 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         doReturn(getSystem()).when(mockActorContext).getActorSystem();
         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
+        doReturn(schemaContext).when(mockActorContext).getSchemaContext();
 
         ShardStrategyFactory.setConfiguration(configuration);
     }
@@ -255,7 +256,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY, schemaContext);
+                READ_ONLY);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
@@ -285,7 +286,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY, schemaContext);
+                READ_ONLY);
 
         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
     }
@@ -298,7 +299,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY, schemaContext);
+                READ_ONLY);
 
         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
     }
@@ -310,7 +311,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 anyString(), any(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY, schemaContext);
+                READ_ONLY);
 
         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
     }
@@ -357,7 +358,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE, schemaContext);
+                READ_WRITE);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -384,7 +385,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE, schemaContext);
+                READ_WRITE);
 
         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
 
@@ -400,7 +401,7 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testReadPreConditionCheck() {
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY, schemaContext);
+                WRITE_ONLY);
 
         transactionProxy.read(TestModel.TEST_PATH);
     }
@@ -410,7 +411,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY, schemaContext);
+                READ_ONLY);
 
         doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
                 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
@@ -445,7 +446,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY, schemaContext);
+                READ_ONLY);
 
         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
     }
@@ -458,7 +459,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY, schemaContext);
+                READ_ONLY);
 
         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
     }
@@ -480,7 +481,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE, schemaContext);
+                READ_WRITE);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -507,7 +508,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE, schemaContext);
+                READ_WRITE);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -520,7 +521,7 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testxistsPreConditionCheck() {
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY, schemaContext);
+                WRITE_ONLY);
 
         transactionProxy.exists(TestModel.TEST_PATH);
     }
@@ -558,7 +559,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY, schemaContext);
+                WRITE_ONLY);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -573,7 +574,7 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testWritePreConditionCheck() {
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY, schemaContext);
+                READ_ONLY);
 
         transactionProxy.write(TestModel.TEST_PATH,
                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
@@ -583,7 +584,7 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testWriteAfterReadyPreConditionCheck() {
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY, schemaContext);
+                WRITE_ONLY);
 
         transactionProxy.ready();
 
@@ -601,7 +602,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY, schemaContext);
+                WRITE_ONLY);
 
         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
 
@@ -620,7 +621,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY, schemaContext);
+                WRITE_ONLY);
 
         transactionProxy.delete(TestModel.TEST_PATH);
 
@@ -673,7 +674,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE, schemaContext);
+                READ_WRITE);
 
         transactionProxy.read(TestModel.TEST_PATH);
 
@@ -709,7 +710,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY, schemaContext);
+                WRITE_ONLY);
 
         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
 
@@ -742,7 +743,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                         isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY, schemaContext);
+                WRITE_ONLY);
 
         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
 
@@ -765,7 +766,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 anyString(), any(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY, schemaContext);
+                WRITE_ONLY);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
@@ -799,7 +800,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                         isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY, schemaContext);
+                WRITE_ONLY);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -816,7 +817,7 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testGetIdentifier() {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                TransactionProxy.TransactionType.READ_ONLY, schemaContext);
+                TransactionProxy.TransactionType.READ_ONLY);
 
         Object id = transactionProxy.getIdentifier();
         assertNotNull("getIdentifier returned null", id);
@@ -832,7 +833,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE, schemaContext);
+                READ_WRITE);
 
         transactionProxy.read(TestModel.TEST_PATH);