Merge "Bug 1598: Cleanup stale ShardReadTransactions"
authorMoiz Raja <moraja@cisco.com>
Fri, 29 Aug 2014 05:04:40 +0000 (05:04 +0000)
committerGerrit Code Review <gerrit@opendaylight.org>
Fri, 29 Aug 2014 05:04:40 +0000 (05:04 +0000)
34 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreProperties.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardContext.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardReadWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardWriteTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohort.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/utils/ActorContext.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedConfigDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/config/yang/config/distributed_datastore_provider/DistributedOperationalDataStoreProviderModule.java
opendaylight/md-sal/sal-distributed-datastore/src/main/yang/distributed-datastore-provider.yang
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardManagerTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortFailureTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java

index 1dab285679474378b47c1c0a1c488ebf89d66fea..b3283a18b1baaf4c3c7530570b8ae8a09512211f 100644 (file)
@@ -10,7 +10,9 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.Props;
 import akka.japi.Creator;
+
 import com.google.common.base.Preconditions;
+
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.cluster.datastore.messages.DataChangedReply;
 import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
@@ -18,20 +20,14 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 public class DataChangeListener extends AbstractUntypedActor {
     private final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
-    private final SchemaContext schemaContext;
-    private final YangInstanceIdentifier pathId;
-    private boolean notificationsEnabled = false;
+    private volatile boolean notificationsEnabled = false;
 
-    public DataChangeListener(SchemaContext schemaContext,
-                              AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, YangInstanceIdentifier pathId) {
-
-        this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+    public DataChangeListener(AsyncDataChangeListener<YangInstanceIdentifier,
+                                                      NormalizedNode<?, ?>> listener) {
         this.listener = Preconditions.checkNotNull(listener, "listener should not be null");
-        this.pathId  = Preconditions.checkNotNull(pathId, "pathId should not be null");
     }
 
     @Override public void handleReceive(Object message) throws Exception {
@@ -63,14 +59,24 @@ public class DataChangeListener extends AbstractUntypedActor {
         }
     }
 
-    public static Props props(final SchemaContext schemaContext, final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener, final YangInstanceIdentifier pathId) {
-        return Props.create(new Creator<DataChangeListener>() {
-            @Override
-            public DataChangeListener create() throws Exception {
-                return new DataChangeListener(schemaContext,listener,pathId );
-            }
+    public static Props props(final AsyncDataChangeListener<YangInstanceIdentifier,
+                                                            NormalizedNode<?, ?>> listener) {
+        return Props.create(new DataChangeListenerCreator(listener));
+    }
+
+    private static class DataChangeListenerCreator implements Creator<DataChangeListener> {
+        private static final long serialVersionUID = 1L;
+
+        final AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener;
 
-        });
+        DataChangeListenerCreator(
+                AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener) {
+            this.listener = listener;
+        }
 
+        @Override
+        public DataChangeListener create() throws Exception {
+            return new DataChangeListener(listener);
+        }
     }
 }
index 6d835498afa2af24ec915e3b010e67710f685d51..a1b6b9252eb452082f101fe1bb748fb2d6b28536 100644 (file)
@@ -9,7 +9,9 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSelection;
+
 import com.google.common.base.Preconditions;
+
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@@ -24,13 +26,13 @@ public class DataChangeListenerProxy implements AsyncDataChangeListener<YangInst
     private final ActorSelection dataChangeListenerActor;
     private final SchemaContext schemaContext;
 
-    public DataChangeListenerProxy(SchemaContext schemaContext,ActorSelection dataChangeListenerActor) {
+    public DataChangeListenerProxy(SchemaContext schemaContext, ActorSelection dataChangeListenerActor) {
         this.dataChangeListenerActor = Preconditions.checkNotNull(dataChangeListenerActor, "dataChangeListenerActor should not be null");
         this.schemaContext = schemaContext;
     }
 
     @Override public void onDataChanged(
         AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> change) {
-        dataChangeListenerActor.tell(new DataChanged(schemaContext,change), null);
+        dataChangeListenerActor.tell(new DataChanged(schemaContext, change), null);
     }
 }
index 9e50b5b332babf4bcc033aeea3778fed850681d3..818f73392d1d880ad0dde5d84704c1e4c9d15c17 100644 (file)
@@ -11,19 +11,21 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.japi.Creator;
+
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistrationReply;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 public class DataChangeListenerRegistration extends AbstractUntypedActor {
 
-    private final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+    private final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
         registration;
 
     public DataChangeListenerRegistration(
-        org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
+        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
         this.registration = registration;
     }
 
@@ -36,14 +38,8 @@ public class DataChangeListenerRegistration extends AbstractUntypedActor {
     }
 
     public static Props props(
-        final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
-        return Props.create(new Creator<DataChangeListenerRegistration>() {
-
-            @Override
-            public DataChangeListenerRegistration create() throws Exception {
-                return new DataChangeListenerRegistration(registration);
-            }
-        });
+        final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
+        return Props.create(new DataChangeListenerRegistrationCreator(registration));
     }
 
     private void closeListenerRegistration(
@@ -53,4 +49,21 @@ public class DataChangeListenerRegistration extends AbstractUntypedActor {
             .tell(new CloseDataChangeListenerRegistrationReply().toSerializable(), getSelf());
         getSelf().tell(PoisonPill.getInstance(), getSelf());
     }
+
+    private static class DataChangeListenerRegistrationCreator
+                                            implements Creator<DataChangeListenerRegistration> {
+        final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+                                                           NormalizedNode<?, ?>>> registration;
+
+        DataChangeListenerRegistrationCreator(
+                ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
+                                                             NormalizedNode<?, ?>>> registration) {
+            this.registration = registration;
+        }
+
+        @Override
+        public DataChangeListenerRegistration create() throws Exception {
+            return new DataChangeListenerRegistration(registration);
+        }
+    }
 }
index 404a4e02033ea1c89f9fada4135cfe0e4b6a935e..51f3735f81ee6a9f4a5ad192cb6d1f1c27ebd031 100644 (file)
@@ -8,14 +8,16 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import java.util.concurrent.TimeUnit;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 
 import com.google.common.base.Preconditions;
+
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
 import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
-import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
@@ -34,6 +36,8 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import scala.concurrent.duration.Duration;
+
 /**
  *
  */
@@ -42,11 +46,10 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
     private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
 
     private final ActorContext actorContext;
-
-    private SchemaContext schemaContext;
+    private final ShardContext shardContext;
 
     public DistributedDataStore(ActorSystem actorSystem, String type, ClusterWrapper cluster,
-            Configuration configuration, InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
+            Configuration configuration, DistributedDataStoreProperties dataStoreProperties) {
         Preconditions.checkNotNull(actorSystem, "actorSystem should not be null");
         Preconditions.checkNotNull(type, "type should not be null");
         Preconditions.checkNotNull(cluster, "cluster should not be null");
@@ -57,13 +60,21 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
         LOG.info("Creating ShardManager : {}", shardManagerId);
 
-        this.actorContext = new ActorContext(actorSystem, actorSystem
-            .actorOf(ShardManager.props(type, cluster, configuration, dataStoreProperties),
+        shardContext = new ShardContext(InMemoryDOMDataStoreConfigProperties.create(
+                dataStoreProperties.getMaxShardDataChangeExecutorPoolSize(),
+                dataStoreProperties.getMaxShardDataChangeExecutorQueueSize(),
+                dataStoreProperties.getMaxShardDataChangeListenerQueueSize()),
+                Duration.create(dataStoreProperties.getShardTransactionIdleTimeoutInMinutes(),
+                        TimeUnit.MINUTES));
+
+        actorContext = new ActorContext(actorSystem, actorSystem
+            .actorOf(ShardManager.props(type, cluster, configuration, shardContext),
                 shardManagerId ), cluster, configuration);
     }
 
     public DistributedDataStore(ActorContext actorContext) {
         this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
+        this.shardContext = new ShardContext();
     }
 
 
@@ -80,7 +91,7 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
         LOG.debug("Registering listener: {} for path: {} scope: {}", listener, path, scope);
 
         ActorRef dataChangeListenerActor = actorContext.getActorSystem().actorOf(
-            DataChangeListener.props(schemaContext,listener,path ));
+            DataChangeListener.props(listener ));
 
         String shardName = ShardStrategyFactory.getStrategy(path).findShard(path);
 
@@ -104,35 +115,31 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
 
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
-        return new TransactionChainProxy(actorContext, schemaContext);
+        return new TransactionChainProxy(actorContext);
     }
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
-        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY,
-            schemaContext);
+        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY,
-            schemaContext);
+        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE,
-            schemaContext);
+        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
     }
 
-    @Override public void onGlobalContextUpdated(SchemaContext schemaContext) {
-        this.schemaContext = schemaContext;
-        actorContext.getShardManager().tell(
-            new UpdateSchemaContext(schemaContext), null);
+    @Override
+    public void onGlobalContextUpdated(SchemaContext schemaContext) {
+        actorContext.setSchemaContext(schemaContext);
     }
 
-    @Override public void close() throws Exception {
+    @Override
+    public void close() throws Exception {
         actorContext.shutdown();
-
     }
 }
index a1a3e87510e78d151b769fd579c2602503ec656c..65a39a60e6c0819fe7f14543c268997a53fddff0 100644 (file)
@@ -11,12 +11,11 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSystem;
 
 import org.opendaylight.controller.cluster.datastore.shardstrategy.ShardStrategyFactory;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import org.opendaylight.controller.sal.core.api.model.SchemaService;
 
 public class DistributedDataStoreFactory {
     public static DistributedDataStore createInstance(String name, SchemaService schemaService,
-            InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
+            DistributedDataStoreProperties dataStoreProperties) {
 
         ActorSystem actorSystem = ActorSystemFactory.getInstance();
         Configuration config = new ConfigurationImpl("module-shards.conf", "modules.conf");
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreProperties.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreProperties.java
new file mode 100644 (file)
index 0000000..eb6a536
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+/**
+ * Wrapper class for DistributedDataStore configuration properties.
+ *
+ * @author Thomas Pantelis
+ */
+public class DistributedDataStoreProperties {
+    private final int maxShardDataChangeListenerQueueSize;
+    private final int maxShardDataChangeExecutorQueueSize;
+    private final int maxShardDataChangeExecutorPoolSize;
+    private final int shardTransactionIdleTimeoutInMinutes;
+
+    public DistributedDataStoreProperties() {
+        maxShardDataChangeListenerQueueSize = 1000;
+        maxShardDataChangeExecutorQueueSize = 1000;
+        maxShardDataChangeExecutorPoolSize = 20;
+        shardTransactionIdleTimeoutInMinutes = 10;
+    }
+
+    public DistributedDataStoreProperties(int maxShardDataChangeListenerQueueSize,
+            int maxShardDataChangeExecutorQueueSize, int maxShardDataChangeExecutorPoolSize,
+            int shardTransactionIdleTimeoutInMinutes) {
+        this.maxShardDataChangeListenerQueueSize = maxShardDataChangeListenerQueueSize;
+        this.maxShardDataChangeExecutorQueueSize = maxShardDataChangeExecutorQueueSize;
+        this.maxShardDataChangeExecutorPoolSize = maxShardDataChangeExecutorPoolSize;
+        this.shardTransactionIdleTimeoutInMinutes = shardTransactionIdleTimeoutInMinutes;
+    }
+
+    public int getMaxShardDataChangeListenerQueueSize() {
+        return maxShardDataChangeListenerQueueSize;
+    }
+
+    public int getMaxShardDataChangeExecutorQueueSize() {
+        return maxShardDataChangeExecutorQueueSize;
+    }
+
+    public int getMaxShardDataChangeExecutorPoolSize() {
+        return maxShardDataChangeExecutorPoolSize;
+    }
+
+    public int getShardTransactionIdleTimeoutInMinutes() {
+        return shardTransactionIdleTimeoutInMinutes;
+    }
+}
index 75f540ade088e6bb45b10e9b92bc4af8789b0218..abcde747b93b132f8c492f25ac8af43f96c84a26 100644 (file)
@@ -15,11 +15,13 @@ import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Creator;
 import akka.serialization.Serialization;
+
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardTransactionIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shard.ShardMBeanFactory;
@@ -43,14 +45,15 @@ import org.opendaylight.controller.cluster.raft.RaftActor;
 import org.opendaylight.controller.cluster.raft.ReplicatedLogEntry;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
@@ -89,17 +92,20 @@ public class Shard extends RaftActor {
     /// The name of this shard
     private final ShardIdentifier name;
 
-    private volatile SchemaContext schemaContext;
-
     private final ShardStats shardMBean;
 
     private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
 
+    private final ShardContext shardContext;
+
+    private SchemaContext schemaContext;
+
     private Shard(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
-            InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
+            ShardContext shardContext) {
         super(name.toString(), mapPeerAddresses(peerAddresses), Optional.of(configParams));
 
         this.name = name;
+        this.shardContext = shardContext;
 
         String setting = System.getProperty("shard.persistent");
 
@@ -107,7 +113,8 @@ public class Shard extends RaftActor {
 
         LOG.info("Shard created : {} persistent : {}", name, persistent);
 
-        store = InMemoryDOMDataStoreFactory.create(name.toString(), null, dataStoreProperties);
+        store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
+                shardContext.getDataStoreProperties());
 
         shardMBean = ShardMBeanFactory.getShardStatsMBean(name.toString());
 
@@ -125,27 +132,16 @@ public class Shard extends RaftActor {
         return map;
     }
 
-
-
-
     public static Props props(final ShardIdentifier name,
         final Map<ShardIdentifier, String> peerAddresses,
-        final InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
+        ShardContext shardContext) {
         Preconditions.checkNotNull(name, "name should not be null");
-        Preconditions
-            .checkNotNull(peerAddresses, "peerAddresses should not be null");
+        Preconditions.checkNotNull(peerAddresses, "peerAddresses should not be null");
+        Preconditions.checkNotNull(shardContext, "shardContext should not be null");
 
-        return Props.create(new Creator<Shard>() {
-
-            @Override
-            public Shard create() throws Exception {
-                return new Shard(name, peerAddresses, dataStoreProperties);
-            }
-
-        });
+        return Props.create(new ShardCreator(name, peerAddresses, shardContext));
     }
 
-
     @Override public void onReceiveCommand(Object message) {
         LOG.debug("Received message {} from {}", message.getClass().toString(),
             getSender());
@@ -188,9 +184,8 @@ public class Shard extends RaftActor {
             shardMBean.incrementReadOnlyTransactionCount();
 
             return getContext().actorOf(
-                ShardTransaction
-                    .props(store.newReadOnlyTransaction(), getSelf(),
-                        schemaContext), transactionId.toString());
+                ShardTransaction.props(store.newReadOnlyTransaction(), getSelf(),
+                        schemaContext, shardContext), transactionId.toString());
 
         } else if (createTransaction.getTransactionType()
             == TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
@@ -198,9 +193,8 @@ public class Shard extends RaftActor {
             shardMBean.incrementReadWriteTransactionCount();
 
             return getContext().actorOf(
-                ShardTransaction
-                    .props(store.newReadWriteTransaction(), getSelf(),
-                        schemaContext), transactionId.toString());
+                ShardTransaction.props(store.newReadWriteTransaction(), getSelf(),
+                        schemaContext, shardContext), transactionId.toString());
 
 
         } else if (createTransaction.getTransactionType()
@@ -209,9 +203,8 @@ public class Shard extends RaftActor {
             shardMBean.incrementWriteOnlyTransactionCount();
 
             return getContext().actorOf(
-                ShardTransaction
-                    .props(store.newWriteOnlyTransaction(), getSelf(),
-                        schemaContext), transactionId.toString());
+                ShardTransaction.props(store.newWriteOnlyTransaction(), getSelf(),
+                        schemaContext, shardContext), transactionId.toString());
         } else {
             throw new IllegalArgumentException(
                 "Shard="+name + ":CreateTransaction message has unidentified transaction type="
@@ -269,12 +262,14 @@ public class Shard extends RaftActor {
         final ActorRef self = getSelf();
 
         Futures.addCallback(future, new FutureCallback<Void>() {
+            @Override
             public void onSuccess(Void v) {
                sender.tell(new CommitTransactionReply().toSerializable(),self);
                shardMBean.incrementCommittedTransactionCount();
                shardMBean.setLastCommittedTransactionTime(new Date());
             }
 
+            @Override
             public void onFailure(Throwable t) {
                 LOG.error(t, "An exception happened during commit");
                 shardMBean.incrementFailedTransactionsCount();
@@ -327,12 +322,10 @@ public class Shard extends RaftActor {
         dataChangeListeners.add(dataChangeListenerPath);
 
         AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>
-            listener =
-            new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
+            listener = new DataChangeListenerProxy(schemaContext, dataChangeListenerPath);
 
-        org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
-            registration =
-            store.registerChangeListener(registerChangeListener.getPath(),
+        ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>
+            registration = store.registerChangeListener(registerChangeListener.getPath(),
                 listener, registerChangeListener.getScope());
         ActorRef listenerRegistration =
             getContext().actorOf(
@@ -349,12 +342,9 @@ public class Shard extends RaftActor {
 
     private void createTransactionChain() {
         DOMStoreTransactionChain chain = store.createTransactionChain();
-        ActorRef transactionChain =
-            getContext().actorOf(
-                ShardTransactionChain.props(chain, schemaContext));
-        getSender()
-            .tell(new CreateTransactionChainReply(transactionChain.path())
-                    .toSerializable(),
+        ActorRef transactionChain = getContext().actorOf(
+                ShardTransactionChain.props(chain, schemaContext, shardContext));
+        getSender().tell(new CreateTransactionChainReply(transactionChain.path()).toSerializable(),
                 getSelf());
     }
 
@@ -426,4 +416,25 @@ public class Shard extends RaftActor {
             return HEART_BEAT_INTERVAL;
         }
     }
+
+    private static class ShardCreator implements Creator<Shard> {
+
+        private static final long serialVersionUID = 1L;
+
+        final ShardIdentifier name;
+        final Map<ShardIdentifier, String> peerAddresses;
+        final ShardContext shardContext;
+
+        ShardCreator(ShardIdentifier name, Map<ShardIdentifier, String> peerAddresses,
+                ShardContext shardContext) {
+            this.name = name;
+            this.peerAddresses = peerAddresses;
+            this.shardContext = shardContext;
+        }
+
+        @Override
+        public Shard create() throws Exception {
+            return new Shard(name, peerAddresses, shardContext);
+        }
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardContext.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardContext.java
new file mode 100644 (file)
index 0000000..02bff77
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2014 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import java.util.concurrent.TimeUnit;
+
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+import com.google.common.base.Preconditions;
+
+import scala.concurrent.duration.Duration;
+
+/**
+ * Contains contextual data for shards.
+ *
+ * @author Thomas Pantelis
+ */
+public class ShardContext {
+
+    private final InMemoryDOMDataStoreConfigProperties dataStoreProperties;
+    private final Duration shardTransactionIdleTimeout;
+
+    public ShardContext() {
+        this.dataStoreProperties = null;
+        this.shardTransactionIdleTimeout = Duration.create(10, TimeUnit.MINUTES);
+    }
+
+    public ShardContext(InMemoryDOMDataStoreConfigProperties dataStoreProperties,
+            Duration shardTransactionIdleTimeout) {
+        this.dataStoreProperties = Preconditions.checkNotNull(dataStoreProperties);
+        this.shardTransactionIdleTimeout = Preconditions.checkNotNull(shardTransactionIdleTimeout);
+    }
+
+    public InMemoryDOMDataStoreConfigProperties getDataStoreProperties() {
+        return dataStoreProperties;
+    }
+
+    public Duration getShardTransactionIdleTimeout() {
+        return shardTransactionIdleTimeout;
+    }
+}
index 2972772a4840cf5117ef3b9c9ca2cb4947d2352c..c9b7c07e9a3a44d1b0c64425122e53a5b3199543 100644 (file)
@@ -17,7 +17,9 @@ import akka.actor.SupervisorStrategy;
 import akka.cluster.ClusterEvent;
 import akka.japi.Creator;
 import akka.japi.Function;
+
 import com.google.common.base.Preconditions;
+
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardManagerIdentifier;
 import org.opendaylight.controller.cluster.datastore.jmx.mbeans.shardmanager.ShardManagerInfo;
@@ -30,7 +32,6 @@ import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolve
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
 import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 
 import scala.concurrent.duration.Duration;
 
@@ -70,19 +71,19 @@ public class ShardManager extends AbstractUntypedActor {
 
     private ShardManagerInfoMBean mBean;
 
-    private final InMemoryDOMDataStoreConfigProperties dataStoreProperties;
+    private final ShardContext shardContext;
 
     /**
      * @param type defines the kind of data that goes into shards created by this shard manager. Examples of type would be
      *             configuration or operational
      */
     private ShardManager(String type, ClusterWrapper cluster, Configuration configuration,
-            InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
+            ShardContext shardContext) {
 
         this.type = Preconditions.checkNotNull(type, "type should not be null");
         this.cluster = Preconditions.checkNotNull(cluster, "cluster should not be null");
         this.configuration = Preconditions.checkNotNull(configuration, "configuration should not be null");
-        this.dataStoreProperties = dataStoreProperties;
+        this.shardContext = shardContext;
 
         // Subscribe this actor to cluster member events
         cluster.subscribeToMemberEvents(getSelf());
@@ -95,22 +96,15 @@ public class ShardManager extends AbstractUntypedActor {
     public static Props props(final String type,
         final ClusterWrapper cluster,
         final Configuration configuration,
-        final InMemoryDOMDataStoreConfigProperties dataStoreProperties) {
+        final ShardContext shardContext) {
 
         Preconditions.checkNotNull(type, "type should not be null");
         Preconditions.checkNotNull(cluster, "cluster should not be null");
         Preconditions.checkNotNull(configuration, "configuration should not be null");
 
-        return Props.create(new Creator<ShardManager>() {
-
-            @Override
-            public ShardManager create() throws Exception {
-                return new ShardManager(type, cluster, configuration, dataStoreProperties);
-            }
-        });
+        return Props.create(new ShardManagerCreator(type, cluster, configuration, shardContext));
     }
 
-
     @Override
     public void handleReceive(Object message) throws Exception {
         if (message.getClass().equals(FindPrimary.SERIALIZABLE_CLASS)) {
@@ -250,7 +244,7 @@ public class ShardManager extends AbstractUntypedActor {
             ShardIdentifier shardId = getShardIdentifier(memberName, shardName);
             Map<ShardIdentifier, String> peerAddresses = getPeerAddresses(shardName);
             ActorRef actor = getContext()
-                .actorOf(Shard.props(shardId, peerAddresses, dataStoreProperties),
+                .actorOf(Shard.props(shardId, peerAddresses, shardContext),
                     shardId.toString());
             localShardActorNames.add(shardId.toString());
             localShards.put(shardName, new ShardInformation(shardName, actor, peerAddresses));
@@ -351,6 +345,28 @@ public class ShardManager extends AbstractUntypedActor {
             }
         }
     }
+
+    private static class ShardManagerCreator implements Creator<ShardManager> {
+        private static final long serialVersionUID = 1L;
+
+        final String type;
+        final ClusterWrapper cluster;
+        final Configuration configuration;
+        final ShardContext shardContext;
+
+        ShardManagerCreator(String type, ClusterWrapper cluster,
+                Configuration configuration, ShardContext shardContext) {
+            this.type = type;
+            this.cluster = cluster;
+            this.configuration = configuration;
+            this.shardContext = shardContext;
+        }
+
+        @Override
+        public ShardManager create() throws Exception {
+            return new ShardManager(type, cluster, configuration, shardContext);
+        }
+    }
 }
 
 
index 1328d466f34b6fff82a91b98379cd815b488155c..c54d3739b22c3b42d2a93c368afd3eb947ce681b 100644 (file)
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
-import akka.actor.PoisonPill;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
+
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
@@ -27,40 +23,27 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  * Date: 8/6/14
  */
 public class ShardReadTransaction extends ShardTransaction {
-  private final DOMStoreReadTransaction transaction;
-  private final LoggingAdapter log =
-      Logging.getLogger(getContext().system(), this);
-
-  public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor, SchemaContext schemaContext) {
-    super(shardActor, schemaContext);
-    this.transaction = transaction;
-
-  }
+    private final DOMStoreReadTransaction transaction;
 
-  public ShardReadTransaction(DOMStoreTransactionChain transactionChain, DOMStoreReadTransaction transaction, ActorRef shardActor, SchemaContext schemaContext) {
-    super(transactionChain, shardActor, schemaContext);
-    this.transaction = transaction;
-  }
-
-  @Override
-  public void handleReceive(Object message) throws Exception {
-    if (ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-        readData(transaction, ReadData.fromSerializable(message));
-    } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
-        dataExists(transaction, DataExists.fromSerializable(message));
-    } else {
-      super.handleReceive(message);
+    public ShardReadTransaction(DOMStoreReadTransaction transaction, ActorRef shardActor,
+            SchemaContext schemaContext) {
+        super(shardActor, schemaContext);
+        this.transaction = transaction;
     }
-  }
-  protected void closeTransaction(CloseTransaction message) {
-    transaction.close();
-    getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
-    getSelf().tell(PoisonPill.getInstance(), getSelf());
-  }
 
-  //default scope test method to check if we get correct exception
-  void forUnitTestOnlyExplicitTransactionClose(){
-      transaction.close();
-  }
+    @Override
+    public void handleReceive(Object message) throws Exception {
+        if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            readData(transaction, ReadData.fromSerializable(message));
+        } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            dataExists(transaction, DataExists.fromSerializable(message));
+        } else {
+            super.handleReceive(message);
+        }
+    }
 
+    @Override
+    protected DOMStoreTransaction getDOMStoreTransaction() {
+        return transaction;
+    }
 }
index 49c7b7e78f52295e9b3b1fc5b1ae2ea10926def2..1dc9ce0a3ecd99f552c13cf19ec9ff393ebdfd51 100644 (file)
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
-import akka.actor.PoisonPill;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
+
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
@@ -23,7 +19,7 @@ import org.opendaylight.controller.cluster.datastore.messages.ReadData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
@@ -31,50 +27,35 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  * Date: 8/6/14
  */
 public class ShardReadWriteTransaction extends ShardTransaction {
-  private final DOMStoreReadWriteTransaction transaction;
-  private final LoggingAdapter log =
-      Logging.getLogger(getContext().system(), this);
-  public ShardReadWriteTransaction(DOMStoreTransactionChain transactionChain, DOMStoreReadWriteTransaction transaction, ActorRef shardActor, SchemaContext schemaContext) {
-    super(transactionChain,  shardActor, schemaContext);
-    this.transaction = transaction;
-  }
-
-  public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor, SchemaContext schemaContext) {
-    super( shardActor, schemaContext);
-    this.transaction = transaction;
-  }
+    private final DOMStoreReadWriteTransaction transaction;
 
-  @Override
-  public void handleReceive(Object message) throws Exception {
-    if (ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-      readData(transaction,ReadData.fromSerializable(message));
-    }else if (WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-      writeData(transaction, WriteData.fromSerializable(message, schemaContext));
-    } else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-      mergeData(transaction, MergeData.fromSerializable(message, schemaContext));
-    } else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-      deleteData(transaction,DeleteData.fromSerializable(message));
-    } else if (ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
-      readyTransaction(transaction,new ReadyTransaction());
-    } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
-        dataExists(transaction, DataExists.fromSerializable(message));
-    }else {
-      super.handleReceive(message);
+    public ShardReadWriteTransaction(DOMStoreReadWriteTransaction transaction, ActorRef shardActor,
+            SchemaContext schemaContext) {
+        super(shardActor, schemaContext);
+        this.transaction = transaction;
     }
-  }
 
-  protected void closeTransaction(CloseTransaction message) {
-    transaction.close();
-    getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
-    getSelf().tell(PoisonPill.getInstance(), getSelf());
-  }
+    @Override
+    public void handleReceive(Object message) throws Exception {
+        if(ReadData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            readData(transaction, ReadData.fromSerializable(message));
+        } else if(WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            writeData(transaction, WriteData.fromSerializable(message, schemaContext));
+        } else if(MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            mergeData(transaction, MergeData.fromSerializable(message, schemaContext));
+        } else if(DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            deleteData(transaction, DeleteData.fromSerializable(message));
+        } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            readyTransaction(transaction, new ReadyTransaction());
+        } else if(DataExists.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            dataExists(transaction, DataExists.fromSerializable(message));
+        } else {
+            super.handleReceive(message);
+        }
+    }
 
-    /**
-     * The following method is used in unit testing only
-     * hence the default scope.
-     * This is done to test out failure cases.
-     */
-    public void forUnitTestOnlyExplicitTransactionClose() {
-        transaction.close();
+    @Override
+    protected DOMStoreTransaction getDOMStoreTransaction() {
+        return transaction;
     }
 }
index 360a10722c06df2c3e357631b43d89eac801a3d3..365960713dc84754258b364765d0c0f0b794d83b 100644 (file)
@@ -9,14 +9,17 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
 import akka.actor.Props;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
+import akka.actor.ReceiveTimeout;
 import akka.japi.Creator;
+
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.CheckedFuture;
+
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
+import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.DataExists;
 import org.opendaylight.controller.cluster.datastore.messages.DataExistsReply;
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
@@ -39,7 +42,7 @@ import org.opendaylight.controller.md.sal.common.api.data.ReadFailedException;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
@@ -69,142 +72,74 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  */
 public abstract class ShardTransaction extends AbstractUntypedActor {
 
-  private final ActorRef shardActor;
-  protected final SchemaContext schemaContext;
-
-  // FIXME : see below
-  // If transactionChain is not null then this transaction is part of a
-  // transactionChain. Not really clear as to what that buys us
-  private final DOMStoreTransactionChain transactionChain;
-
-
-  private final MutableCompositeModification modification =
-      new MutableCompositeModification();
-
-  private final LoggingAdapter log =
-      Logging.getLogger(getContext().system(), this);
-
-  protected ShardTransaction(
-                          ActorRef shardActor, SchemaContext schemaContext) {
-    this(null,  shardActor, schemaContext);
-  }
-
-  protected ShardTransaction(DOMStoreTransactionChain transactionChain,
-                          ActorRef shardActor, SchemaContext schemaContext) {
-    this.transactionChain = transactionChain;
-    this.shardActor = shardActor;
-    this.schemaContext = schemaContext;
-  }
-
-
-
-  public static Props props(final DOMStoreReadTransaction transaction,
-                            final ActorRef shardActor, final SchemaContext schemaContext) {
-    return Props.create(new Creator<ShardTransaction>() {
-
-      @Override
-      public ShardTransaction create() throws Exception {
-        return new ShardReadTransaction(transaction, shardActor, schemaContext);
-      }
-    });
-  }
-
-  public static Props props(final DOMStoreTransactionChain transactionChain, final DOMStoreReadTransaction transaction,
-                            final ActorRef shardActor, final SchemaContext schemaContext) {
-    return Props.create(new Creator<ShardTransaction>() {
-
-      @Override
-      public ShardTransaction create() throws Exception {
-        return new ShardReadTransaction(transactionChain, transaction, shardActor, schemaContext);
-      }
-    });
-  }
-
-  public static Props props(final DOMStoreReadWriteTransaction transaction,
-                            final ActorRef shardActor, final SchemaContext schemaContext) {
-    return Props.create(new Creator<ShardTransaction>() {
-
-      @Override
-      public ShardTransaction create() throws Exception {
-        return new ShardReadWriteTransaction(transaction, shardActor, schemaContext);
-      }
-    });
-  }
-
-  public static Props props(final DOMStoreTransactionChain transactionChain, final DOMStoreReadWriteTransaction transaction,
-                            final ActorRef shardActor, final SchemaContext schemaContext) {
-    return Props.create(new Creator<ShardTransaction>() {
-
-      @Override
-      public ShardTransaction create() throws Exception {
-        return new ShardReadWriteTransaction(transactionChain, transaction, shardActor, schemaContext);
-      }
-    });
-  }
-
-
-  public static Props props(final DOMStoreWriteTransaction transaction,
-                            final ActorRef shardActor, final SchemaContext schemaContext) {
-    return Props.create(new Creator<ShardTransaction>() {
-
-      @Override
-      public ShardTransaction create() throws Exception {
-        return new ShardWriteTransaction(transaction, shardActor, schemaContext);
-      }
-    });
-  }
-
-  public static Props props(final DOMStoreTransactionChain transactionChain, final DOMStoreWriteTransaction transaction,
-                            final ActorRef shardActor, final SchemaContext schemaContext) {
-    return Props.create(new Creator<ShardTransaction>() {
-
-      @Override
-      public ShardTransaction create() throws Exception {
-        return new ShardWriteTransaction(transactionChain, transaction, shardActor, schemaContext);
-      }
-    });
-  }
-
-
-  @Override
-  public void handleReceive(Object message) throws Exception {
-     if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
-      closeTransaction(new CloseTransaction());
-    } else if (message instanceof GetCompositedModification) {
-      // This is here for testing only
-      getSender().tell(new GetCompositeModificationReply(
-          new ImmutableCompositeModification(modification)), getSelf());
-    }else{
-         throw new UnknownMessageException(message);
+    private final ActorRef shardActor;
+    protected final SchemaContext schemaContext;
+
+    private final MutableCompositeModification modification = new MutableCompositeModification();
+
+    protected ShardTransaction(ActorRef shardActor, SchemaContext schemaContext) {
+        this.shardActor = shardActor;
+        this.schemaContext = schemaContext;
     }
-  }
 
-  abstract protected  void closeTransaction(CloseTransaction message);
+    public static Props props(DOMStoreTransaction transaction, ActorRef shardActor,
+            SchemaContext schemaContext, ShardContext shardContext) {
+        return Props.create(new ShardTransactionCreator(transaction, shardActor, schemaContext,
+                shardContext));
+    }
 
-  protected void readData(DOMStoreReadTransaction transaction,ReadData message) {
-    final ActorRef sender = getSender();
-    final ActorRef self = getSelf();
-    final YangInstanceIdentifier path = message.getPath();
-    final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
-          transaction.read(path);
+    protected abstract DOMStoreTransaction getDOMStoreTransaction();
+
+    @Override
+    public void handleReceive(Object message) throws Exception {
+        if (message.getClass().equals(CloseTransaction.SERIALIZABLE_CLASS)) {
+            closeTransaction(true);
+        } else if (message instanceof GetCompositedModification) {
+            // This is here for testing only
+            getSender().tell(new GetCompositeModificationReply(
+                    new ImmutableCompositeModification(modification)), getSelf());
+        } else if (message instanceof ReceiveTimeout) {
+            LOG.debug("Got ReceiveTimeout for inactivity - closing Tx");
+            closeTransaction(false);
+        } else {
+            throw new UnknownMessageException(message);
+        }
+    }
 
-      future.addListener(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
-          if (optional.isPresent()) {
-            sender.tell(new ReadDataReply(schemaContext,optional.get()).toSerializable(), self);
-          } else {
-            sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self);
-          }
-        } catch (Exception e) {
-            sender.tell(new akka.actor.Status.Failure(e),self);
+    private void closeTransaction(boolean sendReply) {
+        getDOMStoreTransaction().close();
+
+        if(sendReply) {
+            getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
         }
 
-      }
-    }, getContext().dispatcher());
-  }
+        getSelf().tell(PoisonPill.getInstance(), getSelf());
+    }
+
+    protected void readData(DOMStoreReadTransaction transaction,ReadData message) {
+        final ActorRef sender = getSender();
+        final ActorRef self = getSelf();
+        final YangInstanceIdentifier path = message.getPath();
+        final CheckedFuture<Optional<NormalizedNode<?, ?>>, ReadFailedException> future =
+                transaction.read(path);
+
+        future.addListener(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    Optional<NormalizedNode<?, ?>> optional = future.checkedGet();
+                    if (optional.isPresent()) {
+                        sender.tell(new ReadDataReply(schemaContext,optional.get()).toSerializable(), self);
+                    } else {
+                        sender.tell(new ReadDataReply(schemaContext,null).toSerializable(), self);
+                    }
+                } catch (Exception e) {
+                    sender.tell(new akka.actor.Status.Failure(e),self);
+                }
+
+            }
+        }, getContext().dispatcher());
+    }
 
     protected void dataExists(DOMStoreReadTransaction transaction, DataExists message) {
         final YangInstanceIdentifier path = message.getPath();
@@ -218,71 +153,104 @@ public abstract class ShardTransaction extends AbstractUntypedActor {
 
     }
 
-  protected void writeData(DOMStoreWriteTransaction transaction, WriteData message) {
-    modification.addModification(
-        new WriteModification(message.getPath(), message.getData(),schemaContext));
-    LOG.debug("writeData at path : " + message.getPath().toString());
+    protected void writeData(DOMStoreWriteTransaction transaction, WriteData message) {
+        modification.addModification(
+                new WriteModification(message.getPath(), message.getData(),schemaContext));
+        LOG.debug("writeData at path : " + message.getPath().toString());
 
-    try {
-        transaction.write(message.getPath(), message.getData());
-        getSender().tell(new WriteDataReply().toSerializable(), getSelf());
-    }catch(Exception e){
-        getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+        try {
+            transaction.write(message.getPath(), message.getData());
+            getSender().tell(new WriteDataReply().toSerializable(), getSelf());
+        }catch(Exception e){
+            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+        }
     }
-  }
-
-  protected void mergeData(DOMStoreWriteTransaction transaction, MergeData message) {
-    modification.addModification(
-        new MergeModification(message.getPath(), message.getData(), schemaContext));
-    LOG.debug("mergeData at path : " + message.getPath().toString());
-    try {
-        transaction.merge(message.getPath(), message.getData());
-        getSender().tell(new MergeDataReply().toSerializable(), getSelf());
-    }catch(Exception e){
-        getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+
+    protected void mergeData(DOMStoreWriteTransaction transaction, MergeData message) {
+        modification.addModification(
+                new MergeModification(message.getPath(), message.getData(), schemaContext));
+        LOG.debug("mergeData at path : " + message.getPath().toString());
+        try {
+            transaction.merge(message.getPath(), message.getData());
+            getSender().tell(new MergeDataReply().toSerializable(), getSelf());
+        }catch(Exception e){
+            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+        }
     }
-  }
-
-  protected void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) {
-    LOG.debug("deleteData at path : " + message.getPath().toString());
-    modification.addModification(new DeleteModification(message.getPath()));
-    try {
-        transaction.delete(message.getPath());
-        getSender().tell(new DeleteDataReply().toSerializable(), getSelf());
-    }catch(Exception e){
-        getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+
+    protected void deleteData(DOMStoreWriteTransaction transaction, DeleteData message) {
+        LOG.debug("deleteData at path : " + message.getPath().toString());
+        modification.addModification(new DeleteModification(message.getPath()));
+        try {
+            transaction.delete(message.getPath());
+            getSender().tell(new DeleteDataReply().toSerializable(), getSelf());
+        }catch(Exception e){
+            getSender().tell(new akka.actor.Status.Failure(e), getSelf());
+        }
     }
-  }
 
-  protected void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message) {
-    DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
-    ActorRef cohortActor = getContext().actorOf(
-        ThreePhaseCommitCohort.props(cohort, shardActor, modification), "cohort");
-    getSender()
+    protected void readyTransaction(DOMStoreWriteTransaction transaction, ReadyTransaction message) {
+        DOMStoreThreePhaseCommitCohort cohort =  transaction.ready();
+        ActorRef cohortActor = getContext().actorOf(
+                ThreePhaseCommitCohort.props(cohort, shardActor, modification), "cohort");
+        getSender()
         .tell(new ReadyTransactionReply(cohortActor.path()).toSerializable(), getSelf());
 
-  }
+    }
 
+    private static class ShardTransactionCreator implements Creator<ShardTransaction> {
 
-  // These classes are in here for test purposes only
+        private static final long serialVersionUID = 1L;
 
+        final DOMStoreTransaction transaction;
+        final ActorRef shardActor;
+        final SchemaContext schemaContext;
+        final ShardContext shardContext;
 
-  static class GetCompositedModification {
+        ShardTransactionCreator(DOMStoreTransaction transaction, ActorRef shardActor,
+                SchemaContext schemaContext, ShardContext actorContext) {
+            this.transaction = transaction;
+            this.shardActor = shardActor;
+            this.shardContext = actorContext;
+            this.schemaContext = schemaContext;
+        }
 
-  }
+        @Override
+        public ShardTransaction create() throws Exception {
+            ShardTransaction tx;
+            if(transaction instanceof DOMStoreReadWriteTransaction) {
+                tx = new ShardReadWriteTransaction((DOMStoreReadWriteTransaction)transaction,
+                        shardActor, schemaContext);
+            } else if(transaction instanceof DOMStoreReadTransaction) {
+                tx = new ShardReadTransaction((DOMStoreReadTransaction)transaction, shardActor,
+                        schemaContext);
+            } else {
+                tx = new ShardWriteTransaction((DOMStoreWriteTransaction)transaction,
+                        shardActor, schemaContext);
+            }
+
+            tx.getContext().setReceiveTimeout(shardContext.getShardTransactionIdleTimeout());
+            return tx;
+        }
+    }
 
+    // These classes are in here for test purposes only
 
-  static class GetCompositeModificationReply {
-    private final CompositeModification modification;
+    static class GetCompositedModification {
+    }
 
 
-    GetCompositeModificationReply(CompositeModification modification) {
-      this.modification = modification;
-    }
+    static class GetCompositeModificationReply {
+        private final CompositeModification modification;
 
 
-    public CompositeModification getModification() {
-      return modification;
+        GetCompositeModificationReply(CompositeModification modification) {
+            this.modification = modification;
+        }
+
+
+        public CompositeModification getModification() {
+            return modification;
+        }
     }
-  }
 }
index c508255ea490ee09b370dae8a49320495166c820..42bd257ad1cdb97b3d28ab2588bbe6161894b8e9 100644 (file)
@@ -11,6 +11,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.japi.Creator;
+
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
@@ -24,10 +25,13 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 public class ShardTransactionChain extends AbstractUntypedActor {
 
     private final DOMStoreTransactionChain chain;
+    private final ShardContext shardContext;
     private final SchemaContext schemaContext;
 
-    public ShardTransactionChain(DOMStoreTransactionChain chain, SchemaContext schemaContext) {
+    public ShardTransactionChain(DOMStoreTransactionChain chain, SchemaContext schemaContext,
+            ShardContext shardContext) {
         this.chain = chain;
+        this.shardContext = shardContext;
         this.schemaContext = schemaContext;
     }
 
@@ -48,23 +52,29 @@ public class ShardTransactionChain extends AbstractUntypedActor {
         return getContext().parent();
     }
 
-  private ActorRef createTypedTransactionActor(CreateTransaction createTransaction,String transactionId){
-    if(createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_ONLY.ordinal()){
-      return getContext().actorOf(
-          ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(), schemaContext), transactionId);
-
-    }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.READ_WRITE.ordinal()){
-      return getContext().actorOf(
-          ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(), schemaContext), transactionId);
-
-
-    }else if (createTransaction.getTransactionType()== TransactionProxy.TransactionType.WRITE_ONLY.ordinal()){
-      return getContext().actorOf(
-          ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(), schemaContext), transactionId);
-    }else{
-      throw new IllegalArgumentException ("CreateTransaction message has unidentified transaction type="+createTransaction.getTransactionType()) ;
+    private ActorRef createTypedTransactionActor(CreateTransaction createTransaction,
+            String transactionId) {
+        if(createTransaction.getTransactionType() ==
+                TransactionProxy.TransactionType.READ_ONLY.ordinal()) {
+            return getContext().actorOf(
+                    ShardTransaction.props( chain.newReadOnlyTransaction(), getShardActor(),
+                            schemaContext, shardContext), transactionId);
+        } else if (createTransaction.getTransactionType() ==
+                TransactionProxy.TransactionType.READ_WRITE.ordinal()) {
+            return getContext().actorOf(
+                    ShardTransaction.props( chain.newReadWriteTransaction(), getShardActor(),
+                            schemaContext, shardContext), transactionId);
+        } else if (createTransaction.getTransactionType() ==
+                TransactionProxy.TransactionType.WRITE_ONLY.ordinal()) {
+            return getContext().actorOf(
+                    ShardTransaction.props( chain.newWriteOnlyTransaction(), getShardActor(),
+                            schemaContext, shardContext), transactionId);
+        } else {
+            throw new IllegalArgumentException (
+                    "CreateTransaction message has unidentified transaction type=" +
+                             createTransaction.getTransactionType());
+        }
     }
-  }
 
     private void createTransaction(CreateTransaction createTransaction) {
 
@@ -74,13 +84,28 @@ public class ShardTransactionChain extends AbstractUntypedActor {
                 getSelf());
     }
 
-    public static Props props(final DOMStoreTransactionChain chain, final SchemaContext schemaContext) {
-        return Props.create(new Creator<ShardTransactionChain>() {
+    public static Props props(DOMStoreTransactionChain chain, SchemaContext schemaContext,
+            ShardContext shardContext) {
+        return Props.create(new ShardTransactionChainCreator(chain, schemaContext, shardContext));
+    }
 
-            @Override
-            public ShardTransactionChain create() throws Exception {
-                return new ShardTransactionChain(chain, schemaContext);
-            }
-        });
+    private static class ShardTransactionChainCreator implements Creator<ShardTransactionChain> {
+        private static final long serialVersionUID = 1L;
+
+        final DOMStoreTransactionChain chain;
+        final ShardContext shardContext;
+        final SchemaContext schemaContext;
+
+        ShardTransactionChainCreator(DOMStoreTransactionChain chain, SchemaContext schemaContext,
+                ShardContext shardContext) {
+            this.chain = chain;
+            this.shardContext = shardContext;
+            this.schemaContext = schemaContext;
+        }
+
+        @Override
+        public ShardTransactionChain create() throws Exception {
+            return new ShardTransactionChain(chain, schemaContext, shardContext);
+        }
     }
 }
index b01fe7d4ac11a4a0eea7cfa28b16f5bea8e7aec5..8b4d576163c94b873a3fd09ed99bd580392813bc 100644 (file)
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
-import akka.actor.PoisonPill;
-import akka.event.Logging;
-import akka.event.LoggingAdapter;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransaction;
-import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionReply;
+
 import org.opendaylight.controller.cluster.datastore.messages.DeleteData;
 import org.opendaylight.controller.cluster.datastore.messages.MergeData;
 import org.opendaylight.controller.cluster.datastore.messages.ReadyTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.WriteData;
-import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
@@ -29,47 +25,31 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
  * Date: 8/6/14
  */
 public class ShardWriteTransaction extends ShardTransaction {
-  private final DOMStoreWriteTransaction transaction;
-  private final LoggingAdapter log =
-      Logging.getLogger(getContext().system(), this);
-  public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor, SchemaContext schemaContext) {
-    super( shardActor, schemaContext);
-    this.transaction = transaction;
-
-  }
+    private final DOMStoreWriteTransaction transaction;
 
-  public ShardWriteTransaction(DOMStoreTransactionChain transactionChain, DOMStoreWriteTransaction transaction, ActorRef shardActor, SchemaContext schemaContext) {
-    super(transactionChain, shardActor, schemaContext);
-    this.transaction = transaction;
-  }
-
-  @Override
-  public void handleReceive(Object message) throws Exception {
-    if (WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-      writeData(transaction, WriteData.fromSerializable(message, schemaContext));
-    } else if (MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-      mergeData(transaction, MergeData.fromSerializable(message, schemaContext));
-    } else if (DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
-      deleteData(transaction,DeleteData.fromSerializable(message));
-    } else if (ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
-      readyTransaction(transaction,new ReadyTransaction());
-    }else {
-      super.handleReceive(message);
+    public ShardWriteTransaction(DOMStoreWriteTransaction transaction, ActorRef shardActor,
+            SchemaContext schemaContext) {
+        super(shardActor, schemaContext);
+        this.transaction = transaction;
     }
-  }
 
-  protected void closeTransaction(CloseTransaction message) {
-    transaction.close();
-    getSender().tell(new CloseTransactionReply().toSerializable(), getSelf());
-    getSelf().tell(PoisonPill.getInstance(), getSelf());
-  }
+    @Override
+    public void handleReceive(Object message) throws Exception {
+        if(WriteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            writeData(transaction, WriteData.fromSerializable(message, schemaContext));
+        } else if(MergeData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            mergeData(transaction, MergeData.fromSerializable(message, schemaContext));
+        } else if(DeleteData.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            deleteData(transaction, DeleteData.fromSerializable(message));
+        } else if(ReadyTransaction.SERIALIZABLE_CLASS.equals(message.getClass())) {
+            readyTransaction(transaction, new ReadyTransaction());
+        } else {
+            super.handleReceive(message);
+        }
+    }
 
-    /**
-     * The following method is used in unit testing only
-     * hence the default scope.
-     * This is done to test out failure cases.
-     */
-    public void forUnitTestOnlyExplicitTransactionClose() {
-        transaction.close();
+    @Override
+    protected DOMStoreTransaction getDOMStoreTransaction() {
+        return transaction;
     }
 }
index 34d35312838fd4346e7446161cd4f1f1c8f9cc74..d0c29294cbea9b992644817436ad21fb5f81d46b 100644 (file)
@@ -14,9 +14,11 @@ import akka.actor.Props;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
 import akka.japi.Creator;
+
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.AbortTransactionReply;
 import org.opendaylight.controller.cluster.datastore.messages.CanCommitTransaction;
@@ -46,16 +48,9 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
 
     public static Props props(final DOMStoreThreePhaseCommitCohort cohort,
         final ActorRef shardActor, final CompositeModification modification) {
-        return Props.create(new Creator<ThreePhaseCommitCohort>() {
-            @Override
-            public ThreePhaseCommitCohort create() throws Exception {
-                return new ThreePhaseCommitCohort(cohort, shardActor,
-                    modification);
-            }
-        });
+        return Props.create(new ThreePhaseCommitCohortCreator(cohort, shardActor, modification));
     }
 
-
     @Override
     public void handleReceive(Object message) throws Exception {
         if (message.getClass()
@@ -81,12 +76,14 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
         final ActorRef self = getSelf();
 
         Futures.addCallback(future, new FutureCallback<Void>() {
+            @Override
             public void onSuccess(Void v) {
                 sender
                     .tell(new AbortTransactionReply().toSerializable(),
                         self);
             }
 
+            @Override
             public void onFailure(Throwable t) {
                 LOG.error(t, "An exception happened during abort");
                 sender
@@ -110,12 +107,14 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
         final ActorRef sender = getSender();
         final ActorRef self = getSelf();
         Futures.addCallback(future, new FutureCallback<Void>() {
+            @Override
             public void onSuccess(Void v) {
                 sender
                     .tell(new PreCommitTransactionReply().toSerializable(),
                         self);
             }
 
+            @Override
             public void onFailure(Throwable t) {
                 LOG.error(t, "An exception happened during pre-commit");
                 sender
@@ -130,18 +129,36 @@ public class ThreePhaseCommitCohort extends AbstractUntypedActor {
         final ActorRef sender = getSender();
         final ActorRef self = getSelf();
         Futures.addCallback(future, new FutureCallback<Boolean>() {
+            @Override
             public void onSuccess(Boolean canCommit) {
                 sender.tell(new CanCommitTransactionReply(canCommit)
                     .toSerializable(), self);
             }
 
+            @Override
             public void onFailure(Throwable t) {
                 LOG.error(t, "An exception happened during canCommit");
                 sender
                     .tell(new akka.actor.Status.Failure(t), self);
             }
         });
+    }
+
+    private static class ThreePhaseCommitCohortCreator implements Creator<ThreePhaseCommitCohort> {
+        final DOMStoreThreePhaseCommitCohort cohort;
+        final ActorRef shardActor;
+        final CompositeModification modification;
 
+        ThreePhaseCommitCohortCreator(DOMStoreThreePhaseCommitCohort cohort,
+                ActorRef shardActor, CompositeModification modification) {
+            this.cohort = cohort;
+            this.shardActor = shardActor;
+            this.modification = modification;
+        }
 
+        @Override
+        public ThreePhaseCommitCohort create() throws Exception {
+            return new ThreePhaseCommitCohort(cohort, shardActor, modification);
+        }
     }
 }
index 76bbef713c350ad975c9dbb256bdef83bd1e4915..9b4610a99c4a6e5977115f560d12b551131e9be5 100644 (file)
@@ -13,36 +13,33 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
-import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 /**
  * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
  */
 public class TransactionChainProxy implements DOMStoreTransactionChain{
     private final ActorContext actorContext;
-    private final SchemaContext schemaContext;
 
-    public TransactionChainProxy(ActorContext actorContext, SchemaContext schemaContext) {
+    public TransactionChainProxy(ActorContext actorContext) {
         this.actorContext = actorContext;
-        this.schemaContext = schemaContext;
     }
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
         return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.READ_ONLY, schemaContext);
+            TransactionProxy.TransactionType.READ_ONLY);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
         return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.READ_WRITE, schemaContext);
+            TransactionProxy.TransactionType.READ_WRITE);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
         return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.WRITE_ONLY, schemaContext);
+            TransactionProxy.TransactionType.WRITE_ONLY);
     }
 
     @Override
index 2e7b2feb85bd97c7b80d09b8156f4b44c6fb343e..f447f3c718980285d32e79c5a9b2579e26309971 100644 (file)
@@ -13,6 +13,8 @@ import akka.actor.ActorSelection;
 import akka.dispatch.OnComplete;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.FinalizablePhantomReference;
+import com.google.common.base.FinalizableReferenceQueue;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -52,6 +54,8 @@ import scala.runtime.AbstractFunction1;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -87,18 +91,98 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         LOG = LoggerFactory.getLogger(TransactionProxy.class);
 
 
+    /**
+     * Used to enqueue the PhantomReferences for read-only TransactionProxy instances. The
+     * FinalizableReferenceQueue is safe to use statically in an OSGi environment as it uses some
+     * trickery to clean up its internal thread when the bundle is unloaded.
+     */
+    private static final FinalizableReferenceQueue phantomReferenceQueue =
+                                                                  new FinalizableReferenceQueue();
+
+    /**
+     * This stores the TransactionProxyCleanupPhantomReference instances statically, This is
+     * necessary because PhantomReferences need a hard reference so they're not garbage collected.
+     * Once finalized, the TransactionProxyCleanupPhantomReference removes itself from this map
+     * and thus becomes eligible for garbage collection.
+     */
+    private static final Map<TransactionProxyCleanupPhantomReference,
+                             TransactionProxyCleanupPhantomReference> phantomReferenceCache =
+                                                                        new ConcurrentHashMap<>();
+
+    /**
+     * A PhantomReference that closes remote transactions for a TransactionProxy when it's
+     * garbage collected. This is used for read-only transactions as they're not explicitly closed
+     * by clients. So the only way to detect that a transaction is no longer in use and it's safe
+     * to clean up is when it's garbage collected. It's inexact as to when an instance will be GC'ed
+     * but TransactionProxy instances should generally be short-lived enough to avoid being moved
+     * to the old generation space and thus should be cleaned up in a timely manner as the GC
+     * runs on the young generation (eden, swap1...) space much more frequently.
+     */
+    private static class TransactionProxyCleanupPhantomReference
+                                           extends FinalizablePhantomReference<TransactionProxy> {
+
+        private final List<ActorSelection> remoteTransactionActors;
+        private final AtomicBoolean remoteTransactionActorsMB;
+        private final ActorContext actorContext;
+        private final TransactionIdentifier identifier;
+
+        protected TransactionProxyCleanupPhantomReference(TransactionProxy referent) {
+            super(referent, phantomReferenceQueue);
+
+            // Note we need to cache the relevant fields from the TransactionProxy as we can't
+            // have a hard reference to the TransactionProxy instance itself.
+
+            remoteTransactionActors = referent.remoteTransactionActors;
+            remoteTransactionActorsMB = referent.remoteTransactionActorsMB;
+            actorContext = referent.actorContext;
+            identifier = referent.identifier;
+        }
+
+        @Override
+        public void finalizeReferent() {
+            LOG.trace("Cleaning up {} Tx actors for TransactionProxy {}",
+                    remoteTransactionActors.size(), identifier);
+
+            phantomReferenceCache.remove(this);
+
+            // Access the memory barrier volatile to ensure all previous updates to the
+            // remoteTransactionActors list are visible to this thread.
+
+            if(remoteTransactionActorsMB.get()) {
+                for(ActorSelection actor : remoteTransactionActors) {
+                    LOG.trace("Sending CloseTransaction to {}", actor);
+                    actorContext.sendRemoteOperationAsync(actor,
+                            new CloseTransaction().toSerializable());
+                }
+            }
+        }
+    }
+
+    /**
+     * Stores the remote Tx actors for each requested data store path to be used by the
+     * PhantomReference to close the remote Tx's. This is only used for read-only Tx's. The
+     * remoteTransactionActorsMB volatile serves as a memory barrier to publish updates to the
+     * remoteTransactionActors list so they will be visible to the thread accessing the
+     * PhantomReference.
+     */
+    private List<ActorSelection> remoteTransactionActors;
+    private AtomicBoolean remoteTransactionActorsMB;
+
+    private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
+
     private final TransactionType transactionType;
     private final ActorContext actorContext;
-    private final Map<String, TransactionContext> remoteTransactionPaths = new HashMap<>();
     private final TransactionIdentifier identifier;
     private final SchemaContext schemaContext;
     private boolean inReadyState;
 
-    public TransactionProxy(ActorContext actorContext, TransactionType transactionType,
-            SchemaContext schemaContext) {
-        this.actorContext = Preconditions.checkNotNull(actorContext, "actorContext should not be null");
-        this.transactionType = Preconditions.checkNotNull(transactionType, "transactionType should not be null");
-        this.schemaContext = Preconditions.checkNotNull(schemaContext, "schemaContext should not be null");
+    public TransactionProxy(ActorContext actorContext, TransactionType transactionType) {
+        this.actorContext = Preconditions.checkNotNull(actorContext,
+                "actorContext should not be null");
+        this.transactionType = Preconditions.checkNotNull(transactionType,
+                "transactionType should not be null");
+        this.schemaContext = Preconditions.checkNotNull(actorContext.getSchemaContext(),
+                "schemaContext should not be null");
 
         String memberName = actorContext.getCurrentMemberName();
         if(memberName == null){
@@ -108,8 +192,20 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         this.identifier = TransactionIdentifier.builder().memberName(memberName).counter(
                 counter.getAndIncrement()).build();
 
-        LOG.debug("Created txn {} of type {}", identifier, transactionType);
+        if(transactionType == TransactionType.READ_ONLY) {
+            // Read-only Tx's aren't explicitly closed by the client so we create a PhantomReference
+            // to close the remote Tx's when this instance is no longer in use and is garbage
+            // collected.
 
+            remoteTransactionActors = Lists.newArrayList();
+            remoteTransactionActorsMB = new AtomicBoolean();
+
+            TransactionProxyCleanupPhantomReference cleanup =
+                                              new TransactionProxyCleanupPhantomReference(this);
+            phantomReferenceCache.put(cleanup, cleanup);
+        }
+
+        LOG.debug("Created txn {} of type {}", identifier, transactionType);
     }
 
     @VisibleForTesting
@@ -226,6 +322,13 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         for(TransactionContext transactionContext : remoteTransactionPaths.values()) {
             transactionContext.closeTransaction();
         }
+
+        remoteTransactionPaths.clear();
+
+        if(transactionType == TransactionType.READ_ONLY) {
+            remoteTransactionActors.clear();
+            remoteTransactionActorsMB.set(true);
+        }
     }
 
     private TransactionContext transactionContext(YangInstanceIdentifier path){
@@ -260,11 +363,20 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
 
                 LOG.debug("Tx {} Received transaction path = {}", identifier, transactionPath);
 
-                ActorSelection transactionActor =
-                    actorContext.actorSelection(transactionPath);
-                transactionContext =
-                    new TransactionContextImpl(shardName, transactionPath,
-                        transactionActor);
+                ActorSelection transactionActor = actorContext.actorSelection(transactionPath);
+
+                if(transactionType == TransactionType.READ_ONLY) {
+                    // Add the actor to the remoteTransactionActors list for access by the
+                    // cleanup PhantonReference.
+                    remoteTransactionActors.add(transactionActor);
+
+                    // Write to the memory barrier volatile to publish the above update to the
+                    // remoteTransactionActors list for thread visibility.
+                    remoteTransactionActorsMB.set(true);
+                }
+
+                transactionContext = new TransactionContextImpl(shardName, transactionPath,
+                        transactionActor, identifier, actorContext, schemaContext);
 
                 remoteTransactionPaths.put(shardName, transactionContext);
             } else {
@@ -273,7 +385,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
         } catch(Exception e){
             LOG.debug("Tx {} Creating NoOpTransaction because of : {}", identifier, e.getMessage());
-            remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e));
+            remoteTransactionPaths.put(shardName, new NoOpTransactionContext(shardName, e, identifier));
         }
     }
 
@@ -298,13 +410,15 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         List<Future<Object>> getRecordedOperationFutures();
     }
 
-    private abstract class AbstractTransactionContext implements TransactionContext {
+    private static abstract class AbstractTransactionContext implements TransactionContext {
 
+        protected final TransactionIdentifier identifier;
         protected final String shardName;
         protected final List<Future<Object>> recordedOperationFutures = Lists.newArrayList();
 
-        AbstractTransactionContext(String shardName) {
+        AbstractTransactionContext(String shardName, TransactionIdentifier identifier) {
             this.shardName = shardName;
+            this.identifier = identifier;
         }
 
         @Override
@@ -318,17 +432,22 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         }
     }
 
-    private class TransactionContextImpl extends AbstractTransactionContext {
+    private static class TransactionContextImpl extends AbstractTransactionContext {
         private final Logger LOG = LoggerFactory.getLogger(TransactionContextImpl.class);
 
+        private final ActorContext actorContext;
+        private final SchemaContext schemaContext;
         private final String actorPath;
         private final ActorSelection actor;
 
         private TransactionContextImpl(String shardName, String actorPath,
-            ActorSelection actor) {
-            super(shardName);
+                ActorSelection actor, TransactionIdentifier identifier, ActorContext actorContext,
+                SchemaContext schemaContext) {
+            super(shardName, identifier);
             this.actorPath = actorPath;
             this.actor = actor;
+            this.actorContext = actorContext;
+            this.schemaContext = schemaContext;
         }
 
         private ActorSelection getActor() {
@@ -600,14 +719,15 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         }
     }
 
-    private class NoOpTransactionContext extends AbstractTransactionContext {
+    private static class NoOpTransactionContext extends AbstractTransactionContext {
 
         private final Logger LOG = LoggerFactory.getLogger(NoOpTransactionContext.class);
 
         private final Exception failure;
 
-        public NoOpTransactionContext(String shardName, Exception failure){
-            super(shardName);
+        public NoOpTransactionContext(String shardName, Exception failure,
+                TransactionIdentifier identifier){
+            super(shardName, identifier);
             this.failure = failure;
         }
 
index e12a9663d1fca8a969c25166f38986ea19eab51a..f76430f5a1cb1cdb9e84f3b92f85c923a7111967 100644 (file)
@@ -23,6 +23,8 @@ import org.opendaylight.controller.cluster.datastore.messages.FindLocalShard;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.LocalShardFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +56,7 @@ public class ActorContext {
     private final ActorRef shardManager;
     private final ClusterWrapper clusterWrapper;
     private final Configuration configuration;
+    private volatile SchemaContext schemaContext;
 
     public ActorContext(ActorSystem actorSystem, ActorRef shardManager,
         ClusterWrapper clusterWrapper,
@@ -80,6 +83,17 @@ public class ActorContext {
         return actorSystem.actorSelection(actorPath);
     }
 
+    public void setSchemaContext(SchemaContext schemaContext) {
+        this.schemaContext = schemaContext;
+
+        if(shardManager != null) {
+            shardManager.tell(new UpdateSchemaContext(schemaContext), null);
+        }
+    }
+
+    public SchemaContext getSchemaContext() {
+        return schemaContext;
+    }
 
     /**
      * Finds the primary for a given shard
index ce31c3ad5738edc81db36f8666c67dc309cbdfb7..f5a0d3783ab011728ab0688db60b404a9c53719e 100644 (file)
@@ -1,7 +1,7 @@
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
 
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreProperties;
 
 public class DistributedConfigDataStoreProviderModule extends
     org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedConfigDataStoreProviderModule {
@@ -26,10 +26,16 @@ public class DistributedConfigDataStoreProviderModule extends
 
     @Override
     public java.lang.AutoCloseable createInstance() {
+
+        ConfigProperties props = getConfigProperties();
+        if(props == null) {
+            props = new ConfigProperties();
+        }
+
         return DistributedDataStoreFactory.createInstance("config", getConfigSchemaServiceDependency(),
-                InMemoryDOMDataStoreConfigProperties.create(getConfigMaxShardDataChangeExecutorPoolSize(),
-                        getConfigMaxShardDataChangeExecutorQueueSize(),
-                        getConfigMaxShardDataChangeListenerQueueSize()));
+                new DistributedDataStoreProperties(props.getMaxShardDataChangeExecutorPoolSize(),
+                        props.getMaxShardDataChangeExecutorQueueSize(),
+                        props.getMaxShardDataChangeListenerQueueSize(),
+                        props.getShardTransactionIdleTimeoutInMinutes()));
     }
-
 }
index 4d5b07420f7cda5d0a0c1d74740026cde5e74eab..443334d11f65378871da7a6f2ae97097d4c2eb87 100644 (file)
@@ -1,7 +1,7 @@
 package org.opendaylight.controller.config.yang.config.distributed_datastore_provider;
 
 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreFactory;
-import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
+import org.opendaylight.controller.cluster.datastore.DistributedDataStoreProperties;
 
 public class DistributedOperationalDataStoreProviderModule extends
     org.opendaylight.controller.config.yang.config.distributed_datastore_provider.AbstractDistributedOperationalDataStoreProviderModule {
@@ -26,11 +26,18 @@ public class DistributedOperationalDataStoreProviderModule extends
 
     @Override
     public java.lang.AutoCloseable createInstance() {
+
+        OperationalProperties props = getOperationalProperties();
+        if(props == null) {
+            props = new OperationalProperties();
+        }
+
         return DistributedDataStoreFactory.createInstance("operational",
                 getOperationalSchemaServiceDependency(),
-                InMemoryDOMDataStoreConfigProperties.create(getOperationalMaxShardDataChangeExecutorPoolSize(),
-                        getOperationalMaxShardDataChangeExecutorQueueSize(),
-                        getOperationalMaxShardDataChangeListenerQueueSize()));
+                new DistributedDataStoreProperties(props.getMaxShardDataChangeExecutorPoolSize(),
+                        props.getMaxShardDataChangeExecutorQueueSize(),
+                        props.getMaxShardDataChangeListenerQueueSize(),
+                        props.getShardTransactionIdleTimeoutInMinutes()));
     }
 
 }
index 6bca5ce25cefe5dbcea73279fa5395f359bb8916..a9a8a1ad986c65835018f80773707561473934b8 100644 (file)
@@ -36,36 +36,48 @@ module distributed-datastore-provider {
                 config:java-name-prefix DistributedOperationalDataStoreProvider;
      }
 
+    grouping data-store-properties {
+        leaf max-shard-data-change-executor-queue-size {
+            default 1000;
+            type uint16;
+            description "The maximum queue size for each shard's data store data change notification executor.";
+         }
+
+         leaf max-shard-data-change-executor-pool-size {
+            default 20;
+            type uint16;
+            description "The maximum thread pool size for each shard's data store data change notification executor.";
+         }
+
+         leaf max-shard-data-change-listener-queue-size {
+            default 1000;
+            type uint16;
+            description "The maximum queue size for each shard's data store data change listeners.";
+         }
+         
+         leaf shard-transaction-idle-timeout-in-minutes {
+            default 10;
+            type uint16;
+            description "The maximum amount of time a shard transaction can be idle without receiving any messages before it self-destructs.";
+         }
+    }
+    
     // Augments the 'configuration' choice node under modules/module.
     augment "/config:modules/config:module/config:configuration" {
         case distributed-config-datastore-provider {
             when "/config:modules/config:module/config:type = 'distributed-config-datastore-provider'";
-            container config-schema-service {
-                uses config:service-ref {
-                    refine type {
-                        mandatory false;
-                        config:required-identity sal:schema-service;
+                container config-schema-service {
+                    uses config:service-ref {
+                        refine type {
+                            mandatory false;
+                            config:required-identity sal:schema-service;
+                        }
                     }
                 }
-            }
 
-            leaf config-max-shard-data-change-executor-queue-size {
-                default 1000;
-                type uint16;
-                description "The maximum queue size for each shard's data store data change notification executor.";
-            }
-
-            leaf config-max-shard-data-change-executor-pool-size {
-                default 20;
-                type uint16;
-                description "The maximum thread pool size for each shard's data store data change notification executor.";
-            }
-
-            leaf config-max-shard-data-change-listener-queue-size {
-                default 1000;
-                type uint16;
-                description "The maximum queue size for each shard's data store data change listeners.";
-            }
+                container config-properties {
+                    uses data-store-properties;
+                }
         }
     }
 
@@ -82,23 +94,9 @@ module distributed-datastore-provider {
                     }
                 }
 
-            leaf operational-max-shard-data-change-executor-queue-size {
-                default 1000;
-                type uint16;
-                description "The maximum queue size for each shard's data store data change notification executor.";
-            }
-
-            leaf operational-max-shard-data-change-executor-pool-size {
-                default 20;
-                type uint16;
-                description "The maximum thread pool size for each shard's data store data change notification executor.";
-            }
-
-            leaf operational-max-shard-data-change-listener-queue-size {
-                default 1000;
-                type uint16;
-                description "The maximum queue size for each shard's data store data change listeners.";
-            }
-            }
+                container operational-properties {
+                    uses data-store-properties;
+                }
         }
+    }
 }
index 036b00a4c94bd46e77c480c32a16b7ad0b71b3f6..e70453f2d629857dd92ef24b919395af91c9b742 100644 (file)
@@ -14,6 +14,7 @@ import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.testkit.JavaTestKit;
+
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CommitTransaction;
@@ -30,6 +31,8 @@ import org.opendaylight.controller.cluster.datastore.messages.WriteData;
 import org.opendaylight.controller.cluster.datastore.messages.WriteDataReply;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -58,22 +61,23 @@ public class BasicIntegrationTest extends AbstractActorTest {
                 ShardIdentifier.builder().memberName("member-1")
                     .shardName("inventory").type("config").build();
 
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null);
+            final SchemaContext schemaContext = TestModel.createTestContext();
+            ShardContext shardContext = new ShardContext();
+
+            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, shardContext);
             final ActorRef shard = getSystem().actorOf(props);
 
-            new Within(duration("5 seconds")) {
+            new Within(duration("10 seconds")) {
+                @Override
                 protected void run() {
-
-
-                    shard.tell(
-                        new UpdateSchemaContext(TestModel.createTestContext()),
-                        getRef());
+                    shard.tell(new UpdateSchemaContext(schemaContext), getRef());
 
 
                     // Wait for a specific log message to show up
                     final boolean result =
                         new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
                         ) {
+                            @Override
                             protected Boolean run() {
                                 return true;
                             }
@@ -87,7 +91,8 @@ public class BasicIntegrationTest extends AbstractActorTest {
                     shard.tell(new CreateTransactionChain().toSerializable(), getRef());
 
                     final ActorSelection transactionChain =
-                        new ExpectMsg<ActorSelection>(duration("1 seconds"), "CreateTransactionChainReply") {
+                        new ExpectMsg<ActorSelection>(duration("3 seconds"), "CreateTransactionChainReply") {
+                            @Override
                             protected ActorSelection match(Object in) {
                                 if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)) {
                                     ActorPath transactionChainPath =
@@ -109,7 +114,8 @@ public class BasicIntegrationTest extends AbstractActorTest {
                     transactionChain.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.WRITE_ONLY.ordinal() ).toSerializable(), getRef());
 
                     final ActorSelection transaction =
-                        new ExpectMsg<ActorSelection>(duration("1 seconds"), "CreateTransactionReply") {
+                        new ExpectMsg<ActorSelection>(duration("3 seconds"), "CreateTransactionReply") {
+                            @Override
                             protected ActorSelection match(Object in) {
                                 if (CreateTransactionReply.SERIALIZABLE_CLASS.equals(in.getClass())) {
                                     CreateTransactionReply reply = CreateTransactionReply.fromSerializable(in);
@@ -128,10 +134,11 @@ public class BasicIntegrationTest extends AbstractActorTest {
 
                     // 3. Write some data
                     transaction.tell(new WriteData(TestModel.TEST_PATH,
-                        ImmutableNodes.containerNode(TestModel.TEST_QNAME), TestModel.createTestContext()).toSerializable(),
+                        ImmutableNodes.containerNode(TestModel.TEST_QNAME), schemaContext).toSerializable(),
                         getRef());
 
-                    Boolean writeDone = new ExpectMsg<Boolean>(duration("1 seconds"), "WriteDataReply") {
+                    Boolean writeDone = new ExpectMsg<Boolean>(duration("3 seconds"), "WriteDataReply") {
+                        @Override
                         protected Boolean match(Object in) {
                             if (in.getClass().equals(WriteDataReply.SERIALIZABLE_CLASS)) {
                                 return true;
@@ -150,7 +157,8 @@ public class BasicIntegrationTest extends AbstractActorTest {
                     transaction.tell(new ReadyTransaction().toSerializable(), getRef());
 
                     final ActorSelection cohort =
-                        new ExpectMsg<ActorSelection>(duration("1 seconds"), "ReadyTransactionReply") {
+                        new ExpectMsg<ActorSelection>(duration("3 seconds"), "ReadyTransactionReply") {
+                            @Override
                             protected ActorSelection match(Object in) {
                                 if (in.getClass().equals(ReadyTransactionReply.SERIALIZABLE_CLASS)) {
                                     ActorPath cohortPath =
@@ -173,7 +181,8 @@ public class BasicIntegrationTest extends AbstractActorTest {
                     cohort.tell(new PreCommitTransaction().toSerializable(), getRef());
 
                     Boolean preCommitDone =
-                        new ExpectMsg<Boolean>(duration("1 seconds"), "PreCommitTransactionReply") {
+                        new ExpectMsg<Boolean>(duration("3 seconds"), "PreCommitTransactionReply") {
+                            @Override
                             protected Boolean match(Object in) {
                                 if (in.getClass().equals(PreCommitTransactionReply.SERIALIZABLE_CLASS)) {
                                     return true;
index b2ee4a49fee1b40ee86730da0044f5f987b090a9..e653c3d3717351182a6a57cf570c4a6bf6449500 100644 (file)
@@ -3,7 +3,7 @@ package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
 import akka.actor.Props;
-import junit.framework.Assert;
+import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.DataChanged;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -47,7 +47,7 @@ public class DataChangeListenerProxyTest extends AbstractActorTest {
 
     @Override
     public Set<YangInstanceIdentifier> getRemovedPaths() {
-      Set<YangInstanceIdentifier>ids = new HashSet();
+      Set<YangInstanceIdentifier>ids = new HashSet<>();
       ids.add( CompositeModel.TEST_PATH);
       return ids;
     }
@@ -73,9 +73,8 @@ public class DataChangeListenerProxyTest extends AbstractActorTest {
         final Props props = Props.create(MessageCollectorActor.class);
         final ActorRef actorRef = getSystem().actorOf(props);
 
-        DataChangeListenerProxy dataChangeListenerProxy =
-            new DataChangeListenerProxy(TestModel.createTestContext(),
-                getSystem().actorSelection(actorRef.path()));
+        DataChangeListenerProxy dataChangeListenerProxy = new DataChangeListenerProxy(
+                TestModel.createTestContext(), getSystem().actorSelection(actorRef.path()));
 
         dataChangeListenerProxy.onDataChanged(new MockDataChangedEvent());
 
index 26ec583b3e058108f9cc18517f5f6909847f5cd6..b39cc84ceff6943d045c7a609afcbb996e212b31 100644 (file)
@@ -24,9 +24,9 @@ import static org.junit.Assert.assertTrue;
 public class DataChangeListenerTest extends AbstractActorTest {
 
     private static class MockDataChangedEvent implements AsyncDataChangeEvent<YangInstanceIdentifier, NormalizedNode<?, ?>> {
-       Map<YangInstanceIdentifier,NormalizedNode<?,?>> createdData = new HashMap();
-       Map<YangInstanceIdentifier,NormalizedNode<?,?>> updatedData = new HashMap();
-       Map<YangInstanceIdentifier,NormalizedNode<?,?>> originalData = new HashMap();
+       Map<YangInstanceIdentifier,NormalizedNode<?,?>> createdData = new HashMap<>();
+       Map<YangInstanceIdentifier,NormalizedNode<?,?>> updatedData = new HashMap<>();
+       Map<YangInstanceIdentifier,NormalizedNode<?,?>> originalData = new HashMap<>();
 
 
 
@@ -90,11 +90,12 @@ public class DataChangeListenerTest extends AbstractActorTest {
     public void testDataChangedWhenNotificationsAreEnabled(){
         new JavaTestKit(getSystem()) {{
             final MockDataChangeListener listener = new MockDataChangeListener();
-            final Props props = DataChangeListener.props(CompositeModel.createTestContext(),listener,CompositeModel.FAMILY_PATH );
+            final Props props = DataChangeListener.props(listener);
             final ActorRef subject =
                 getSystem().actorOf(props, "testDataChangedNotificationsEnabled");
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     // Let the DataChangeListener know that notifications should
@@ -107,6 +108,7 @@ public class DataChangeListenerTest extends AbstractActorTest {
 
                     final Boolean out = new ExpectMsg<Boolean>(duration("800 millis"), "dataChanged") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected Boolean match(Object in) {
                             if (in != null && in.getClass().equals(DataChangedReply.class)) {
 
@@ -123,8 +125,6 @@ public class DataChangeListenerTest extends AbstractActorTest {
 
                     expectNoMsg();
                 }
-
-
             };
         }};
     }
@@ -133,11 +133,12 @@ public class DataChangeListenerTest extends AbstractActorTest {
     public void testDataChangedWhenNotificationsAreDisabled(){
         new JavaTestKit(getSystem()) {{
             final MockDataChangeListener listener = new MockDataChangeListener();
-            final Props props = DataChangeListener.props(CompositeModel.createTestContext(),listener,CompositeModel.FAMILY_PATH );
+            final Props props = DataChangeListener.props(listener);
             final ActorRef subject =
                 getSystem().actorOf(props, "testDataChangedNotificationsDisabled");
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(
@@ -146,8 +147,6 @@ public class DataChangeListenerTest extends AbstractActorTest {
 
                     expectNoMsg();
                 }
-
-
             };
         }};
     }
index 49408b741019c98debbe70b37857d871d0f80b8b..21aa00e9e0b98c60b2d117fa67f36864e0c7700a 100644 (file)
@@ -3,9 +3,12 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorSystem;
 import akka.event.Logging;
 import akka.testkit.JavaTestKit;
+
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
+
 import junit.framework.Assert;
+
 import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -69,10 +72,13 @@ public class DistributedDataStoreIntegrationTest {
             {
 
                 new Within(duration("10 seconds")) {
+                    @Override
                     protected void run() {
                         try {
                             final DistributedDataStore distributedDataStore =
-                                new DistributedDataStore(getSystem(), "config", new MockClusterWrapper(), configuration, null);
+                                new DistributedDataStore(getSystem(), "config",
+                                        new MockClusterWrapper(), configuration,
+                                        new DistributedDataStoreProperties());
 
                             distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
 
@@ -80,6 +86,7 @@ public class DistributedDataStoreIntegrationTest {
                             final boolean result =
                                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
                                     ) {
+                                    @Override
                                     protected Boolean run() {
                                         return true;
                                     }
@@ -150,6 +157,7 @@ public class DistributedDataStoreIntegrationTest {
             {
 
                 new Within(duration("10 seconds")) {
+                    @Override
                     protected void run() {
                         try {
                             final DistributedDataStore distributedDataStore =
@@ -164,6 +172,7 @@ public class DistributedDataStoreIntegrationTest {
                                 new JavaTestKit.EventFilter<Boolean>(
                                     Logging.Info.class
                                 ) {
+                                    @Override
                                     protected Boolean run() {
                                         return true;
                                     }
index 69590e62fb1b5886fc0b5fb1223706d02c1706cd..cb473cb9360e03656ea83d0212ef18b70237a92e 100644 (file)
@@ -3,6 +3,7 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -63,12 +64,14 @@ public class DistributedDataStoreTest extends AbstractActorTest{
 
     }
 
+    @SuppressWarnings("resource")
     @Test
     public void testConstructor(){
         ActorSystem actorSystem = mock(ActorSystem.class);
 
         new DistributedDataStore(actorSystem, "config",
-            mock(ClusterWrapper.class), mock(Configuration.class), null);
+            mock(ClusterWrapper.class), mock(Configuration.class),
+            new DistributedDataStoreProperties());
 
         verify(actorSystem).actorOf(any(Props.class), eq("shardmanager-config"));
     }
index 499b4e1f3111d097cc0696aa7cd23367fc4c591d..e6f68c032a325f59bea291e2f17e129fd47da766 100644 (file)
@@ -42,11 +42,12 @@ public class ShardManagerTest {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                 .props("config", new MockClusterWrapper(),
-                    new MockConfiguration(), null);
+                    new MockConfiguration(), new ShardContext());
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(new FindPrimary("inventory").toSerializable(), getRef());
@@ -66,11 +67,12 @@ public class ShardManagerTest {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                 .props("config", new MockClusterWrapper(),
-                    new MockConfiguration(), null);
+                    new MockConfiguration(), new ShardContext());
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(new FindPrimary(Shard.DEFAULT_NAME).toSerializable(), getRef());
@@ -89,16 +91,18 @@ public class ShardManagerTest {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                 .props("config", new MockClusterWrapper(),
-                    new MockConfiguration(), null);
+                    new MockConfiguration(), new ShardContext());
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(new FindLocalShard("inventory"), getRef());
 
                     final String out = new ExpectMsg<String>(duration("1 seconds"), "find local") {
+                        @Override
                         protected String match(Object in) {
                             if (in instanceof LocalShardNotFound) {
                                 return ((LocalShardNotFound) in).getShardName();
@@ -124,16 +128,18 @@ public class ShardManagerTest {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                 .props("config", mockClusterWrapper,
-                    new MockConfiguration(), null);
+                    new MockConfiguration(), new ShardContext());
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(new FindLocalShard(Shard.DEFAULT_NAME), getRef());
 
                     final ActorRef out = new ExpectMsg<ActorRef>(duration("1 seconds"), "find local") {
+                        @Override
                         protected ActorRef match(Object in) {
                             if (in instanceof LocalShardFound) {
                                 return ((LocalShardFound) in).getPath();
@@ -158,12 +164,13 @@ public class ShardManagerTest {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                 .props("config", new MockClusterWrapper(),
-                    new MockConfiguration(), null);
+                    new MockConfiguration(), new ShardContext());
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
             // the run() method needs to finish within 3 seconds
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
@@ -172,6 +179,7 @@ public class ShardManagerTest {
 
                     final String out = new ExpectMsg<String>(duration("1 seconds"), "primary found") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected String match(Object in) {
                             if (in.getClass().equals(PrimaryFound.SERIALIZABLE_CLASS)) {
                                 PrimaryFound f = PrimaryFound.fromSerializable(in);
@@ -196,12 +204,13 @@ public class ShardManagerTest {
         new JavaTestKit(system) {{
             final Props props = ShardManager
                 .props("config", new MockClusterWrapper(),
-                    new MockConfiguration(), null);
+                    new MockConfiguration(), new ShardContext());
             final TestActorRef<ShardManager> subject =
                 TestActorRef.create(system, props);
 
             // the run() method needs to finish within 3 seconds
             new Within(duration("1 seconds")) {
+                @Override
                 protected void run() {
 
                     MockClusterWrapper.sendMemberUp(subject, "member-2", getRef().path().toString());
index 7740b8e667a974c10b53d0e436edb41455e40cf5..fc45efcdea854bea791c44b12a4437c2c74bc2f1 100644 (file)
@@ -4,7 +4,8 @@ import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.testkit.JavaTestKit;
-import junit.framework.Assert;
+
+import org.junit.Assert;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
@@ -28,11 +29,14 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import static junit.framework.Assert.assertFalse;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class ShardTest extends AbstractActorTest {
+
+    private static final ShardContext shardContext = new ShardContext();
+
     @Test
     public void testOnReceiveCreateTransactionChain() throws Exception {
         new JavaTestKit(getSystem()) {{
@@ -40,7 +44,7 @@ public class ShardTest extends AbstractActorTest {
                 ShardIdentifier.builder().memberName("member-1")
                     .shardName("inventory").type("config").build();
 
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null);
+            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, shardContext);
             final ActorRef subject =
                 getSystem().actorOf(props, "testCreateTransactionChain");
 
@@ -49,6 +53,7 @@ public class ShardTest extends AbstractActorTest {
             final boolean result =
                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
                 ) {
+                    @Override
                     protected Boolean run() {
                         return true;
                     }
@@ -58,13 +63,15 @@ public class ShardTest extends AbstractActorTest {
 
             Assert.assertEquals(true, result);
 
-            new Within(duration("1 seconds")) {
+            new Within(duration("3 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(new CreateTransactionChain().toSerializable(), getRef());
 
-                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+                    final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected String match(Object in) {
                             if (in.getClass().equals(CreateTransactionChainReply.SERIALIZABLE_CLASS)){
                                 CreateTransactionChainReply reply =
@@ -96,11 +103,12 @@ public class ShardTest extends AbstractActorTest {
                 ShardIdentifier.builder().memberName("member-1")
                     .shardName("inventory").type("config").build();
 
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null);
+            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, shardContext);
             final ActorRef subject =
                 getSystem().actorOf(props, "testRegisterChangeListener");
 
-            new Within(duration("1 seconds")) {
+            new Within(duration("3 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(
@@ -111,8 +119,10 @@ public class ShardTest extends AbstractActorTest {
                         getRef().path(), AsyncDataBroker.DataChangeScope.BASE),
                         getRef());
 
-                    final Boolean notificationEnabled = new ExpectMsg<Boolean>("enable notification") {
+                    final Boolean notificationEnabled = new ExpectMsg<Boolean>(
+                                                   duration("3 seconds"), "enable notification") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected Boolean match(Object in) {
                             if(in instanceof EnableNotification){
                                 return ((EnableNotification) in).isEnabled();
@@ -124,8 +134,9 @@ public class ShardTest extends AbstractActorTest {
 
                     assertFalse(notificationEnabled);
 
-                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+                    final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected String match(Object in) {
                             if (in.getClass().equals(RegisterChangeListenerReply.class)) {
                                 RegisterChangeListenerReply reply =
@@ -154,15 +165,15 @@ public class ShardTest extends AbstractActorTest {
                 ShardIdentifier.builder().memberName("member-1")
                     .shardName("inventory").type("config").build();
 
-            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, null);
+            final Props props = Shard.props(identifier, Collections.EMPTY_MAP, shardContext);
             final ActorRef subject =
                 getSystem().actorOf(props, "testCreateTransaction");
 
-
             // Wait for a specific log message to show up
             final boolean result =
                 new JavaTestKit.EventFilter<Boolean>(Logging.Info.class
                 ) {
+                    @Override
                     protected Boolean run() {
                         return true;
                     }
@@ -172,7 +183,8 @@ public class ShardTest extends AbstractActorTest {
 
             Assert.assertEquals(true, result);
 
-            new Within(duration("1 seconds")) {
+            new Within(duration("3 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(
@@ -182,8 +194,9 @@ public class ShardTest extends AbstractActorTest {
                     subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(),
                         getRef());
 
-                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+                    final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
+                        @Override
                         protected String match(Object in) {
                             if (in instanceof CreateTransactionReply) {
                                 CreateTransactionReply reply =
@@ -200,8 +213,6 @@ public class ShardTest extends AbstractActorTest {
                         out.contains("akka://test/user/testCreateTransaction/shard-txn-1"));
                     expectNoMsg();
                 }
-
-
             };
         }};
     }
@@ -216,11 +227,12 @@ public class ShardTest extends AbstractActorTest {
                     .shardName("inventory").type("config").build();
 
             peerAddresses.put(identifier, null);
-            final Props props = Shard.props(identifier, peerAddresses, null);
+            final Props props = Shard.props(identifier, peerAddresses, shardContext);
             final ActorRef subject =
                 getSystem().actorOf(props, "testPeerAddressResolved");
 
-            new Within(duration("1 seconds")) {
+            new Within(duration("3 seconds")) {
+                @Override
                 protected void run() {
 
                     subject.tell(
@@ -229,8 +241,6 @@ public class ShardTest extends AbstractActorTest {
 
                     expectNoMsg();
                 }
-
-
             };
         }};
     }
index d468af6664981d08ad603b1a841fefbdaccc8d47..0262b8c1457a171c262f5da83dc755d9bcbb174d 100644 (file)
@@ -3,8 +3,11 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
+
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChain;
 import org.opendaylight.controller.cluster.datastore.messages.CloseTransactionChainReply;
@@ -12,87 +15,96 @@ import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction;
 import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
 import static org.junit.Assert.assertEquals;
 
 public class ShardTransactionChainTest extends AbstractActorTest {
 
-  private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
-
-  private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor,
-          MoreExecutors.sameThreadExecutor());
-
-  static {
-    store.onGlobalContextUpdated(TestModel.createTestContext());
-  }
-  @Test
-  public void testOnReceiveCreateTransaction() throws Exception {
-    new JavaTestKit(getSystem()) {{
-      final Props props = ShardTransactionChain.props(store.createTransactionChain(), TestModel.createTestContext());
-      final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction");
-
-     new Within(duration("1 seconds")) {
-        @Override
-        protected void run() {
-
-          subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
-
-          final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
-            // do not put code outside this method, will run afterwards
-            @Override
-            protected String match(Object in) {
-              if (in.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
-                return CreateTransactionReply.fromSerializable(in).getTransactionPath();
-              }else{
-                throw noMatch();
-              }
-            }
-          }.get(); // this extracts the received message
-
-          assertEquals("Unexpected transaction path " + out,
-              "akka://test/user/testCreateTransaction/shard-txn-1",
-              out);
-
-          // Will wait for the rest of the 3 seconds
-          expectNoMsg();
-        }
-
-
-      };
-    }};
-  }
-
-  @Test
-  public void testOnReceiveCloseTransactionChain() throws Exception {
-    new JavaTestKit(getSystem()) {{
-      final Props props = ShardTransactionChain.props(store.createTransactionChain(), TestModel.createTestContext());
-      final ActorRef subject = getSystem().actorOf(props, "testCloseTransactionChain");
-
-      new Within(duration("1 seconds")) {
-        @Override
-        protected void run() {
-
-          subject.tell(new CloseTransactionChain().toSerializable(), getRef());
-
-          final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
-            // do not put code outside this method, will run afterwards
-            @Override
-            protected String match(Object in) {
-              if (in.getClass().equals(CloseTransactionChainReply.SERIALIZABLE_CLASS)) {
-                return "match";
-              } else {
-                throw noMatch();
-              }
-            }
-          }.get(); // this extracts the received message
-
-          assertEquals("match", out);
-          // Will wait for the rest of the 3 seconds
-          expectNoMsg();
-        }
-
-
-      };
-    }};
-  }
+    private static ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(MoreExecutors.sameThreadExecutor());
+
+    private static final InMemoryDOMDataStore store = new InMemoryDOMDataStore("OPER", storeExecutor,
+            MoreExecutors.sameThreadExecutor());
+
+    private static final SchemaContext testSchemaContext = TestModel.createTestContext();
+
+    private static final ShardContext shardContext = new ShardContext();
+
+    @BeforeClass
+    public static void staticSetup() {
+        store.onGlobalContextUpdated(testSchemaContext);
+    }
+
+    @Test
+    public void testOnReceiveCreateTransaction() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final Props props = ShardTransactionChain.props(store.createTransactionChain(),
+                    testSchemaContext, shardContext);
+            final ActorRef subject = getSystem().actorOf(props, "testCreateTransaction");
+
+            new Within(duration("1 seconds")) {
+                @Override
+                protected void run() {
+
+                    subject.tell(new CreateTransaction("txn-1", TransactionProxy.TransactionType.READ_ONLY.ordinal() ).toSerializable(), getRef());
+
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+                        // do not put code outside this method, will run afterwards
+                        @Override
+                        protected String match(Object in) {
+                            if (in.getClass().equals(CreateTransactionReply.SERIALIZABLE_CLASS)) {
+                                return CreateTransactionReply.fromSerializable(in).getTransactionPath();
+                            }else{
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
+
+                    assertEquals("Unexpected transaction path " + out,
+                            "akka://test/user/testCreateTransaction/shard-txn-1",
+                            out);
+
+                    // Will wait for the rest of the 3 seconds
+                    expectNoMsg();
+                }
+
+
+            };
+        }};
+    }
+
+    @Test
+    public void testOnReceiveCloseTransactionChain() throws Exception {
+        new JavaTestKit(getSystem()) {{
+            final Props props = ShardTransactionChain.props(store.createTransactionChain(),
+                    testSchemaContext, shardContext);
+            final ActorRef subject = getSystem().actorOf(props, "testCloseTransactionChain");
+
+            new Within(duration("1 seconds")) {
+                @Override
+                protected void run() {
+
+                    subject.tell(new CloseTransactionChain().toSerializable(), getRef());
+
+                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+                        // do not put code outside this method, will run afterwards
+                        @Override
+                        protected String match(Object in) {
+                            if (in.getClass().equals(CloseTransactionChainReply.SERIALIZABLE_CLASS)) {
+                                return "match";
+                            } else {
+                                throw noMatch();
+                            }
+                        }
+                    }.get(); // this extracts the received message
+
+                    assertEquals("match", out);
+                    // Will wait for the rest of the 3 seconds
+                    expectNoMsg();
+                }
+
+
+            };
+        }};
+    }
 }
index 16b73040a5b6e5375a24570fb5b1617240eadb04..127f9f55f89f0bedf78e2f748cd116de3819066a 100644 (file)
@@ -13,8 +13,11 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.TestActorRef;
+
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
@@ -23,13 +26,13 @@ import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 import org.opendaylight.controller.protobuff.messages.common.NormalizedNodeMessages;
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
 
 import java.util.Collections;
-
-import static org.junit.Assert.assertTrue;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Covers negative test cases
@@ -51,20 +54,21 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
         ShardIdentifier.builder().memberName("member-1")
             .shardName("inventory").type("operational").build();
 
-    static {
+    private final ShardContext shardContext = new ShardContext();
+
+    @BeforeClass
+    public static void staticSetup() {
         store.onGlobalContextUpdated(testSchemaContext);
     }
 
-
     @Test(expected = ReadFailedException.class)
     public void testNegativeReadWithReadOnlyTransactionClosed()
         throws Throwable {
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
-        final Props props =
-            ShardTransaction.props(store.newReadOnlyTransaction(), shard,
-                TestModel.createTestContext());
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new ShardContext()));
+        final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+                testSchemaContext, shardContext);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -78,32 +82,27 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                 ).build();
         Future<Object> future =
             akka.pattern.Patterns.ask(subject, readData, 3000);
-        assertTrue(future.isCompleted());
-        Await.result(future, Duration.Zero());
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        ((ShardReadTransaction) subject.underlyingActor())
-            .forUnitTestOnlyExplicitTransactionClose();
+        subject.underlyingActor().getDOMStoreTransaction().close();
 
         future = akka.pattern.Patterns.ask(subject, readData, 3000);
-        Await.result(future, Duration.Zero());
-
-
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 
 
     @Test(expected = ReadFailedException.class)
-    public void testNegativeReadWithReadWriteOnlyTransactionClosed()
+    public void testNegativeReadWithReadWriteTransactionClosed()
         throws Throwable {
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
-        final Props props =
-            ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                TestModel.createTestContext());
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new ShardContext()));
+        final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+                testSchemaContext, shardContext);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
-                "testNegativeReadWithReadWriteOnlyTransactionClosed");
+                "testNegativeReadWithReadWriteTransactionClosed");
 
         ShardTransactionMessages.ReadData readData =
             ShardTransactionMessages.ReadData.newBuilder()
@@ -111,33 +110,29 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                     NormalizedNodeMessages.InstanceIdentifier.newBuilder()
                         .build()
                 ).build();
+
         Future<Object> future =
             akka.pattern.Patterns.ask(subject, readData, 3000);
-        assertTrue(future.isCompleted());
-        Await.result(future, Duration.Zero());
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        ((ShardReadWriteTransaction) subject.underlyingActor())
-            .forUnitTestOnlyExplicitTransactionClose();
+        subject.underlyingActor().getDOMStoreTransaction().close();
 
         future = akka.pattern.Patterns.ask(subject, readData, 3000);
-        Await.result(future, Duration.Zero());
-
-
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 
     @Test(expected = ReadFailedException.class)
-    public void testNegativeExistsWithReadWriteOnlyTransactionClosed()
+    public void testNegativeExistsWithReadWriteTransactionClosed()
         throws Throwable {
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
-        final Props props =
-            ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                TestModel.createTestContext());
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new ShardContext()));
+        final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+                testSchemaContext, shardContext);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
-                "testNegativeExistsWithReadWriteOnlyTransactionClosed");
+                "testNegativeExistsWithReadWriteTransactionClosed");
 
         ShardTransactionMessages.DataExists dataExists =
             ShardTransactionMessages.DataExists.newBuilder()
@@ -148,16 +143,12 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         Future<Object> future =
             akka.pattern.Patterns.ask(subject, dataExists, 3000);
-        assertTrue(future.isCompleted());
-        Await.result(future, Duration.Zero());
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
-        ((ShardReadWriteTransaction) subject.underlyingActor())
-            .forUnitTestOnlyExplicitTransactionClose();
+        subject.underlyingActor().getDOMStoreTransaction().close();
 
         future = akka.pattern.Patterns.ask(subject, dataExists, 3000);
-        Await.result(future, Duration.Zero());
-
-
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 
     @Test(expected = IllegalStateException.class)
@@ -165,10 +156,9 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
-        final Props props =
-            ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
-                TestModel.createTestContext());
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new ShardContext()));
+        final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
+                testSchemaContext, shardContext);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -179,8 +169,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         Future<Object> future =
             akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
-        assertTrue(future.isCompleted());
-        Await.result(future, Duration.Zero());
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
         ShardTransactionMessages.WriteData writeData =
             ShardTransactionMessages.WriteData.newBuilder()
@@ -192,22 +181,17 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
             ).build();
 
         future = akka.pattern.Patterns.ask(subject, writeData, 3000);
-        assertTrue(future.isCompleted());
-        Await.result(future, Duration.Zero());
-
-
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 
-
     @Test(expected = IllegalStateException.class)
     public void testNegativeReadWriteWithTransactionReady() throws Exception {
 
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
-        final Props props =
-            ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                TestModel.createTestContext());
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new ShardContext()));
+        final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+                testSchemaContext, shardContext);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -218,8 +202,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         Future<Object> future =
             akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
-        assertTrue(future.isCompleted());
-        Await.result(future, Duration.Zero());
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
         ShardTransactionMessages.WriteData writeData =
             ShardTransactionMessages.WriteData.newBuilder()
@@ -231,10 +214,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
             ).build();
 
         future = akka.pattern.Patterns.ask(subject, writeData, 3000);
-        assertTrue(future.isCompleted());
-        Await.result(future, Duration.Zero());
-
-
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 
     @Test(expected = IllegalStateException.class)
@@ -242,10 +222,9 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
-        final Props props =
-            ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                TestModel.createTestContext());
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new ShardContext()));
+        final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+                testSchemaContext, shardContext);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props, "testNegativeMergeTransactionReady");
@@ -255,8 +234,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         Future<Object> future =
             akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
-        assertTrue(future.isCompleted());
-        Await.result(future, Duration.Zero());
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
         ShardTransactionMessages.MergeData mergeData =
             ShardTransactionMessages.MergeData.newBuilder()
@@ -268,10 +246,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
             ).build();
 
         future = akka.pattern.Patterns.ask(subject, mergeData, 3000);
-        assertTrue(future.isCompleted());
-        Await.result(future, Duration.Zero());
-
-
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 
 
@@ -280,10 +255,9 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
 
         final ActorRef shard =
-            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
-        final Props props =
-            ShardTransaction.props(store.newReadWriteTransaction(), shard,
-                TestModel.createTestContext());
+            getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, new ShardContext()));
+        final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+                testSchemaContext, shardContext);
 
         final TestActorRef<ShardTransaction> subject = TestActorRef
             .create(getSystem(), props,
@@ -294,8 +268,7 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
 
         Future<Object> future =
             akka.pattern.Patterns.ask(subject, readyTransaction, 3000);
-        assertTrue(future.isCompleted());
-        Await.result(future, Duration.Zero());
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
 
         ShardTransactionMessages.DeleteData deleteData =
             ShardTransactionMessages.DeleteData.newBuilder()
@@ -304,9 +277,6 @@ public class ShardTransactionFailureTest extends AbstractActorTest {
                         .build()).build();
 
         future = akka.pattern.Patterns.ask(subject, deleteData, 3000);
-        assertTrue(future.isCompleted());
-        Await.result(future, Duration.Zero());
-
-
+        Await.result(future, Duration.create(3, TimeUnit.SECONDS));
     }
 }
index 8f5d0c28d603c39a008f9638ece377aacdf6f1f8..1dd824568a81820d21f50fd820d47eeef68c73e9 100644 (file)
@@ -5,9 +5,11 @@ import akka.actor.Props;
 import akka.actor.Terminated;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
+
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import org.junit.Assert;
+
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.opendaylight.controller.cluster.datastore.exceptions.UnknownMessageException;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
@@ -32,11 +34,15 @@ import org.opendaylight.controller.cluster.datastore.modification.Modification;
 import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
 import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
+import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreConfigProperties;
 import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 
+import scala.concurrent.duration.Duration;
+
 import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -54,17 +60,20 @@ public class ShardTransactionTest extends AbstractActorTest {
         ShardIdentifier.builder().memberName("member-1")
             .shardName("inventory").type("config").build();
 
+    private ShardContext shardContext = new ShardContext();
 
-    static {
+    @BeforeClass
+    public static void staticSetup() {
         store.onGlobalContextUpdated(testSchemaContext);
     }
 
     @Test
     public void testOnReceiveReadData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
-            final Props props =
-                ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext);
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                    Collections.EMPTY_MAP, new ShardContext()));
+            final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+                    testSchemaContext, shardContext);
             final ActorRef subject = getSystem().actorOf(props, "testReadData");
 
             new Within(duration("1 seconds")) {
@@ -104,9 +113,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveReadDataWhenDataNotFound() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
-            final Props props =
-                ShardTransaction.props( store.newReadOnlyTransaction(), shard, testSchemaContext);
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                    Collections.EMPTY_MAP, new ShardContext()));
+            final Props props = ShardTransaction.props( store.newReadOnlyTransaction(), shard,
+                    testSchemaContext, shardContext);
             final ActorRef subject = getSystem().actorOf(props, "testReadDataWhenDataNotFound");
 
             new Within(duration("1 seconds")) {
@@ -147,9 +157,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveDataExistsPositive() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
-            final Props props =
-                ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext);
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                    Collections.EMPTY_MAP, new ShardContext()));
+            final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+                    testSchemaContext, shardContext);
             final ActorRef subject = getSystem().actorOf(props, "testDataExistsPositive");
 
             new Within(duration("1 seconds")) {
@@ -190,9 +201,9 @@ public class ShardTransactionTest extends AbstractActorTest {
     public void testOnReceiveDataExistsNegative() throws Exception {
         new JavaTestKit(getSystem()) {{
             final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
-                    Collections.EMPTY_MAP, null));
-            final Props props =
-                ShardTransaction.props(store.newReadOnlyTransaction(), shard, testSchemaContext);
+                    Collections.EMPTY_MAP, new ShardContext()));
+            final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+                    testSchemaContext, shardContext);
             final ActorRef subject = getSystem().actorOf(props, "testDataExistsNegative");
 
             new Within(duration("1 seconds")) {
@@ -267,9 +278,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveWriteData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
-            final Props props =
-                ShardTransaction.props(store.newWriteOnlyTransaction(), shard, TestModel.createTestContext());
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                    Collections.EMPTY_MAP, new ShardContext()));
+            final Props props = ShardTransaction.props(store.newWriteOnlyTransaction(), shard,
+                    testSchemaContext, shardContext);
             final ActorRef subject =
                 getSystem().actorOf(props, "testWriteData");
 
@@ -307,9 +319,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveMergeData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
-            final Props props =
-                ShardTransaction.props(store.newReadWriteTransaction(), shard, testSchemaContext);
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                    Collections.EMPTY_MAP, new ShardContext()));
+            final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+                    testSchemaContext, shardContext);
             final ActorRef subject =
                 getSystem().actorOf(props, "testMergeData");
 
@@ -348,9 +361,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveDeleteData() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
-            final Props props =
-                ShardTransaction.props( store.newWriteOnlyTransaction(), shard, TestModel.createTestContext());
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                    Collections.EMPTY_MAP, new ShardContext()));
+            final Props props = ShardTransaction.props( store.newWriteOnlyTransaction(), shard,
+                    testSchemaContext, shardContext);
             final ActorRef subject =
                 getSystem().actorOf(props, "testDeleteData");
 
@@ -387,9 +401,10 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveReadyTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
-            final Props props =
-                ShardTransaction.props( store.newReadWriteTransaction(), shard, TestModel.createTestContext());
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                    Collections.EMPTY_MAP, new ShardContext()));
+            final Props props = ShardTransaction.props( store.newReadWriteTransaction(), shard,
+                    testSchemaContext, shardContext);
             final ActorRef subject =
                 getSystem().actorOf(props, "testReadyTransaction");
 
@@ -425,24 +440,26 @@ public class ShardTransactionTest extends AbstractActorTest {
     @Test
     public void testOnReceiveCloseTransaction() throws Exception {
         new JavaTestKit(getSystem()) {{
-            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
-            final Props props =
-                ShardTransaction.props(store.newReadWriteTransaction(), shard, TestModel.createTestContext());
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                    Collections.EMPTY_MAP, new ShardContext()));
+            final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+                    testSchemaContext, shardContext);
             final ActorRef subject =
                 getSystem().actorOf(props, "testCloseTransaction");
 
             watch(subject);
 
-            new Within(duration("2 seconds")) {
+            new Within(duration("6 seconds")) {
                 @Override
                 protected void run() {
 
                     subject.tell(new CloseTransaction().toSerializable(), getRef());
 
-                    final String out = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+                    final String out = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         @Override
                         protected String match(Object in) {
+                            System.out.println("!!!IN match 1: "+(in!=null?in.getClass():"NULL"));
                             if (in.getClass().equals(CloseTransactionReply.SERIALIZABLE_CLASS)) {
                                 return "match";
                             } else {
@@ -453,10 +470,11 @@ public class ShardTransactionTest extends AbstractActorTest {
 
                     assertEquals("match", out);
 
-                    final String termination = new ExpectMsg<String>(duration("1 seconds"), "match hint") {
+                    final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
                         // do not put code outside this method, will run afterwards
                         @Override
                         protected String match(Object in) {
+                            System.out.println("!!!IN match 2: "+(in!=null?in.getClass():"NULL"));
                             if (in instanceof Terminated) {
                                 return "match";
                             } else {
@@ -465,33 +483,54 @@ public class ShardTransactionTest extends AbstractActorTest {
                         }
                     }.get(); // this extracts the received message
 
-
-                    expectNoMsg();
+                    assertEquals("match", termination);
                 }
-
-
             };
         }};
+    }
 
+    @Test(expected=UnknownMessageException.class)
+    public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
+        final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                Collections.EMPTY_MAP, new ShardContext()));
+        final Props props = ShardTransaction.props(store.newReadOnlyTransaction(), shard,
+                testSchemaContext, shardContext);
+        final TestActorRef subject = TestActorRef.apply(props,getSystem());
+
+        subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
     }
 
+    @Test
+    public void testShardTransactionInactivity() {
+
+        shardContext = new ShardContext(InMemoryDOMDataStoreConfigProperties.getDefault(),
+                Duration.create(500, TimeUnit.MILLISECONDS));
 
-  @Test
-  public void testNegativePerformingWriteOperationOnReadTransaction() throws Exception {
-    try {
+        new JavaTestKit(getSystem()) {{
+            final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                    Collections.EMPTY_MAP, new ShardContext()));
+            final Props props = ShardTransaction.props(store.newReadWriteTransaction(), shard,
+                    testSchemaContext, shardContext);
+            final ActorRef subject =
+                getSystem().actorOf(props, "testShardTransactionInactivity");
 
-        final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, null));
-        final Props props =
-            ShardTransaction.props(store.newReadOnlyTransaction(), shard, TestModel.createTestContext());
-         final TestActorRef subject = TestActorRef.apply(props,getSystem());
+            watch(subject);
 
-        subject.receive(new DeleteData(TestModel.TEST_PATH).toSerializable(), ActorRef.noSender());
-        Assert.assertFalse(true);
+            // The shard Tx actor should receive a ReceiveTimeout message and self-destruct.
 
+            final String termination = new ExpectMsg<String>(duration("3 seconds"), "match hint") {
+                // do not put code outside this method, will run afterwards
+                @Override
+                protected String match(Object in) {
+                    if (in instanceof Terminated) {
+                        return "match";
+                    } else {
+                        throw noMatch();
+                    }
+                }
+            }.get(); // this extracts the received message
 
-    } catch (Exception cs) {
-      assertEquals(UnknownMessageException.class.getSimpleName(), cs.getClass().getSimpleName());
-      assertTrue(cs.getMessage(), cs.getMessage().startsWith("Unknown message received "));
+            assertEquals("match", termination);
+        }};
     }
-  }
 }
index 34697977a5a4639e4b507dde478302834eae5ad5..672166c4424c7ea0a8fd5c7ddd2e07b057e572b2 100644 (file)
@@ -13,9 +13,12 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.ActorRef;
 import akka.actor.Props;
 import akka.testkit.TestActorRef;
+
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
@@ -31,6 +34,7 @@ import org.opendaylight.controller.protobuff.messages.persistent.PersistentMessa
 import org.opendaylight.controller.protobuff.messages.transaction.ShardTransactionMessages;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort;
 import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Duration;
@@ -59,19 +63,21 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
         ShardIdentifier.builder().memberName("member-1")
             .shardName("inventory").type("config").build();
 
-    static {
+    private final ShardContext shardContext = new ShardContext();
+
+    @BeforeClass
+    public static void staticSetup() {
         store.onGlobalContextUpdated(testSchemaContext);
     }
 
-    private FiniteDuration ASK_RESULT_DURATION = Duration.create(5000, TimeUnit.MILLISECONDS);
+    private final FiniteDuration ASK_RESULT_DURATION = Duration.create(5000, TimeUnit.MILLISECONDS);
 
 
     @Test(expected = TestException.class)
     public void testNegativeAbortResultsInException() throws Exception {
 
-        final ActorRef shard =
-            getSystem()
-                .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
+        final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                Collections.EMPTY_MAP, shardContext));
         final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
             .mock(DOMStoreThreePhaseCommitCohort.class);
         final CompositeModification mockComposite =
@@ -93,18 +99,14 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
         assertTrue(future.isCompleted());
 
         Await.result(future, ASK_RESULT_DURATION);
-
-
-
     }
 
 
     @Test(expected = OptimisticLockFailedException.class)
     public void testNegativeCanCommitResultsInException() throws Exception {
 
-        final ActorRef shard =
-            getSystem()
-                .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
+        final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                Collections.EMPTY_MAP, shardContext));
         final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
             .mock(DOMStoreThreePhaseCommitCohort.class);
         final CompositeModification mockComposite =
@@ -135,9 +137,8 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
     @Test(expected = TestException.class)
     public void testNegativePreCommitResultsInException() throws Exception {
 
-        final ActorRef shard =
-            getSystem()
-                .actorOf(Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null));
+        final ActorRef shard = getSystem().actorOf(Shard.props(SHARD_IDENTIFIER,
+                Collections.EMPTY_MAP, shardContext));
         final DOMStoreThreePhaseCommitCohort mockCohort = Mockito
             .mock(DOMStoreThreePhaseCommitCohort.class);
         final CompositeModification mockComposite =
@@ -166,15 +167,13 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
     @Test(expected = TestException.class)
     public void testNegativeCommitResultsInException() throws Exception {
 
-        final TestActorRef<Shard> subject = TestActorRef
-            .create(getSystem(),
-                Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP,null),
+        final TestActorRef<Shard> subject = TestActorRef.create(getSystem(),
+                Shard.props(SHARD_IDENTIFIER, Collections.EMPTY_MAP, shardContext),
                 "testNegativeCommitResultsInException");
 
         final ActorRef shardTransaction =
-            getSystem().actorOf(
-                ShardTransaction.props(store.newReadWriteTransaction(), subject,
-                    TestModel.createTestContext()));
+            getSystem().actorOf(ShardTransaction.props(store.newReadWriteTransaction(), subject,
+                    testSchemaContext, shardContext));
 
         ShardTransactionMessages.WriteData writeData =
             ShardTransactionMessages.WriteData.newBuilder()
@@ -221,12 +220,8 @@ public class ThreePhaseCommitCohortFailureTest extends AbstractActorTest {
                 mockForwardCommitTransaction
                 , 3000);
         Await.result(future, ASK_RESULT_DURATION);
-
-
     }
 
     private class TestException extends Exception {
     }
-
-
 }
index 9b7039764f57ba56eb9469354446cad35a6b7537..93145bdd6d86360070dce5102dbd968c6ebc0629 100644 (file)
 
 package org.opendaylight.controller.cluster.datastore;
 
+import static org.mockito.Mockito.doReturn;
+
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
@@ -23,30 +26,39 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContext;
 public class TransactionChainProxyTest {
     ActorContext actorContext = Mockito.mock(ActorContext.class);
     SchemaContext schemaContext = Mockito.mock(SchemaContext.class);
+
+    @Before
+    public void setUp() {
+        doReturn(schemaContext).when(actorContext).getSchemaContext();
+    }
+
+    @SuppressWarnings("resource")
     @Test
     public void testNewReadOnlyTransaction() throws Exception {
 
-     DOMStoreTransaction dst = new TransactionChainProxy(actorContext, schemaContext).newReadOnlyTransaction();
+     DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newReadOnlyTransaction();
          Assert.assertTrue(dst instanceof DOMStoreReadTransaction);
 
     }
 
+    @SuppressWarnings("resource")
     @Test
     public void testNewReadWriteTransaction() throws Exception {
-        DOMStoreTransaction dst = new TransactionChainProxy(actorContext, schemaContext).newReadWriteTransaction();
+        DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newReadWriteTransaction();
         Assert.assertTrue(dst instanceof DOMStoreReadWriteTransaction);
 
     }
 
+    @SuppressWarnings("resource")
     @Test
     public void testNewWriteOnlyTransaction() throws Exception {
-        DOMStoreTransaction dst = new TransactionChainProxy(actorContext, schemaContext).newWriteOnlyTransaction();
+        DOMStoreTransaction dst = new TransactionChainProxy(actorContext).newWriteOnlyTransaction();
         Assert.assertTrue(dst instanceof DOMStoreWriteTransaction);
 
     }
 
     @Test(expected=UnsupportedOperationException.class)
     public void testClose() throws Exception {
-        new TransactionChainProxy(actorContext, schemaContext).close();
+        new TransactionChainProxy(actorContext).close();
     }
 }
index 6b11a24e9cedbb3a8234fb6e337f7750a413721f..f69ae88ec873ed044ab40c36a63e6f0b68b9db67 100644 (file)
@@ -99,6 +99,7 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         doReturn(getSystem()).when(mockActorContext).getActorSystem();
         doReturn(memberName).when(mockActorContext).getCurrentMemberName();
+        doReturn(schemaContext).when(mockActorContext).getSchemaContext();
 
         ShardStrategyFactory.setConfiguration(configuration);
     }
@@ -255,7 +256,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY, schemaContext);
+                READ_ONLY);
 
         doReturn(readDataReply(null)).when(mockActorContext).executeRemoteOperationAsync(
                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
@@ -285,7 +286,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY, schemaContext);
+                READ_ONLY);
 
         transactionProxy.read(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
     }
@@ -298,7 +299,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY, schemaContext);
+                READ_ONLY);
 
         propagateReadFailedExceptionCause(transactionProxy.read(TestModel.TEST_PATH));
     }
@@ -310,7 +311,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 anyString(), any(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY, schemaContext);
+                READ_ONLY);
 
         propagateReadFailedExceptionCause(invoker.invoke(transactionProxy));
     }
@@ -357,7 +358,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE, schemaContext);
+                READ_WRITE);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -384,7 +385,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE, schemaContext);
+                READ_WRITE);
 
         transactionProxy.write(TestModel.TEST_PATH, expectedNode);
 
@@ -400,7 +401,7 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testReadPreConditionCheck() {
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY, schemaContext);
+                WRITE_ONLY);
 
         transactionProxy.read(TestModel.TEST_PATH);
     }
@@ -410,7 +411,7 @@ public class TransactionProxyTest extends AbstractActorTest {
         ActorRef actorRef = setupActorContextWithInitialCreateTransaction(READ_ONLY);
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY, schemaContext);
+                READ_ONLY);
 
         doReturn(dataExistsReply(false)).when(mockActorContext).executeRemoteOperationAsync(
                 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
@@ -445,7 +446,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY, schemaContext);
+                READ_ONLY);
 
         transactionProxy.exists(TestModel.TEST_PATH).checkedGet(5, TimeUnit.SECONDS);
     }
@@ -458,7 +459,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 executeRemoteOperationAsync(any(ActorSelection.class), any(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY, schemaContext);
+                READ_ONLY);
 
         propagateReadFailedExceptionCause(transactionProxy.exists(TestModel.TEST_PATH));
     }
@@ -480,7 +481,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE, schemaContext);
+                READ_WRITE);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -507,7 +508,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 eq(actorSelection(actorRef)), eqDataExists(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE, schemaContext);
+                READ_WRITE);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -520,7 +521,7 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testxistsPreConditionCheck() {
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY, schemaContext);
+                WRITE_ONLY);
 
         transactionProxy.exists(TestModel.TEST_PATH);
     }
@@ -558,7 +559,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 eq(actorSelection(actorRef)), eqWriteData(nodeToWrite), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY, schemaContext);
+                WRITE_ONLY);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -573,7 +574,7 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testWritePreConditionCheck() {
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_ONLY, schemaContext);
+                READ_ONLY);
 
         transactionProxy.write(TestModel.TEST_PATH,
                 ImmutableNodes.containerNode(TestModel.TEST_QNAME));
@@ -583,7 +584,7 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testWriteAfterReadyPreConditionCheck() {
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY, schemaContext);
+                WRITE_ONLY);
 
         transactionProxy.ready();
 
@@ -601,7 +602,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 eq(actorSelection(actorRef)), eqMergeData(nodeToWrite), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY, schemaContext);
+                WRITE_ONLY);
 
         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
 
@@ -620,7 +621,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 eq(actorSelection(actorRef)), eqDeleteData(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY, schemaContext);
+                WRITE_ONLY);
 
         transactionProxy.delete(TestModel.TEST_PATH);
 
@@ -673,7 +674,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE, schemaContext);
+                READ_WRITE);
 
         transactionProxy.read(TestModel.TEST_PATH);
 
@@ -709,7 +710,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 eq(actorSelection(actorRef)), isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY, schemaContext);
+                WRITE_ONLY);
 
         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
 
@@ -742,7 +743,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                         isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY, schemaContext);
+                WRITE_ONLY);
 
         transactionProxy.merge(TestModel.TEST_PATH, nodeToWrite);
 
@@ -765,7 +766,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 anyString(), any(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY, schemaContext);
+                WRITE_ONLY);
 
         NormalizedNode<?, ?> nodeToWrite = ImmutableNodes.containerNode(TestModel.TEST_QNAME);
 
@@ -799,7 +800,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                         isA(ReadyTransaction.SERIALIZABLE_CLASS), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                WRITE_ONLY, schemaContext);
+                WRITE_ONLY);
 
         transactionProxy.write(TestModel.TEST_PATH, nodeToWrite);
 
@@ -816,7 +817,7 @@ public class TransactionProxyTest extends AbstractActorTest {
     public void testGetIdentifier() {
         setupActorContextWithInitialCreateTransaction(READ_ONLY);
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                TransactionProxy.TransactionType.READ_ONLY, schemaContext);
+                TransactionProxy.TransactionType.READ_ONLY);
 
         Object id = transactionProxy.getIdentifier();
         assertNotNull("getIdentifier returned null", id);
@@ -832,7 +833,7 @@ public class TransactionProxyTest extends AbstractActorTest {
                 eq(actorSelection(actorRef)), eqReadData(), anyDuration());
 
         TransactionProxy transactionProxy = new TransactionProxy(mockActorContext,
-                READ_WRITE, schemaContext);
+                READ_WRITE);
 
         transactionProxy.read(TestModel.TEST_PATH);