Enhancements to actor naming, logging and monitoring 26/8426/5
authorMoiz Raja <moraja@cisco.com>
Fri, 27 Jun 2014 20:45:05 +0000 (13:45 -0700)
committerEd Warnicke <eaw@cisco.com>
Fri, 4 Jul 2014 20:26:54 +0000 (20:26 +0000)
- Actor names have now been changed to be more meaningful. This will be helpful when trying to follow the logging.
- Added logging for when the actor is created and when it is terminated

Change-Id: I825270779ce19c319807c5a3c56d4885f8cc0996
Signed-off-by: Moiz Raja <moraja@cisco.com>
24 files changed:
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/AbstractUntypedActor.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ActorSystemFactory.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistration.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChain.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionChainProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TransactionProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransaction.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/CreateTransactionReply.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/Monitor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/BasicIntegrationTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerRegistrationProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionChainTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTransactionTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ThreePhaseCommitCohortProxyTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/TransactionProxyTest.java

index aae468fbc60da9cf9149e2f3d39dfd780017be52..0f10258e9ef77c9d2a1347f5118e64fd508517c2 100644 (file)
@@ -11,11 +11,21 @@ package org.opendaylight.controller.cluster.datastore;
 import akka.actor.UntypedActor;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
+import org.opendaylight.controller.cluster.datastore.messages.Monitor;
 
 public abstract class AbstractUntypedActor extends UntypedActor {
     protected final LoggingAdapter LOG =
         Logging.getLogger(getContext().system(), this);
 
+
+    public AbstractUntypedActor(){
+        LOG.debug("Actor created {}", getSelf());
+        getContext().
+            system().
+            actorSelection("user/termination-monitor").
+            tell(new Monitor(getSelf()), getSelf());
+    }
+
     @Override public void onReceive(Object message) throws Exception {
         LOG.debug("Received message {}", message);
         handleReceive(message);
index c562e6f50f173f27b943ab358c7828c039eecaf1..baf04fe43b771fddc1c009d11aed056dea6bbe24 100644 (file)
@@ -9,12 +9,23 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorSystem;
+import akka.actor.Props;
+import com.google.common.base.Function;
 import com.typesafe.config.ConfigFactory;
 
+import javax.annotation.Nullable;
+
 public class ActorSystemFactory {
-    private static final ActorSystem actorSystem =
-        ActorSystem.create("opendaylight-cluster", ConfigFactory
-            .load().getConfig("ODLCluster"));
+    private static final ActorSystem actorSystem = (new Function<Void, ActorSystem>(){
+
+        @Nullable @Override public ActorSystem apply(@Nullable Void aVoid) {
+                ActorSystem system =
+                    ActorSystem.create("opendaylight-cluster", ConfigFactory
+                        .load().getConfig("ODLCluster"));
+                system.actorOf(Props.create(TerminationMonitor.class), "termination-monitor");
+                return system;
+        }
+    }).apply(null);
 
     public static final ActorSystem getInstance(){
         return actorSystem;
index c2eab0df440689ef22fb6743a03449c640a8c605..dca97354876812601d69c6bc9764779608710d09 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.PoisonPill;
 import akka.actor.Props;
 import akka.japi.Creator;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
@@ -16,34 +17,40 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
-public class DataChangeListenerRegistration extends AbstractUntypedActor{
+public class DataChangeListenerRegistration extends AbstractUntypedActor {
 
-  private final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration;
+    private final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>>
+        registration;
 
-  public DataChangeListenerRegistration(
-      org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration) {
-    this.registration = registration;
-  }
+    public DataChangeListenerRegistration(
+        org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration) {
+        this.registration = registration;
+    }
+
+    @Override
+    public void handleReceive(Object message) throws Exception {
+        if (message instanceof CloseDataChangeListenerRegistration) {
+            closeListenerRegistration(
+                (CloseDataChangeListenerRegistration) message);
+        }
+    }
+
+    public static Props props(
+        final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration) {
+        return Props.create(new Creator<DataChangeListenerRegistration>() {
+
+            @Override
+            public DataChangeListenerRegistration create() throws Exception {
+                return new DataChangeListenerRegistration(registration);
+            }
+        });
+    }
 
-  @Override
-  public void handleReceive(Object message) throws Exception {
-    if(message instanceof CloseDataChangeListenerRegistration){
-      closeListenerRegistration((CloseDataChangeListenerRegistration) message);
+    private void closeListenerRegistration(
+        CloseDataChangeListenerRegistration message) {
+        registration.close();
+        getSender()
+            .tell(new CloseDataChangeListenerRegistrationReply(), getSelf());
+        getSelf().tell(PoisonPill.getInstance(), getSelf());
     }
-  }
-
-  public static Props props(final org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration){
-    return Props.create(new Creator<DataChangeListenerRegistration>(){
-
-      @Override
-      public DataChangeListenerRegistration create() throws Exception {
-        return new DataChangeListenerRegistration(registration);
-      }
-    });
-  }
-
-  private void closeListenerRegistration(CloseDataChangeListenerRegistration message){
-    registration.close();
-    getSender().tell(new CloseDataChangeListenerRegistrationReply(), getSelf());
-  }
 }
index 89cc9695251d18b2efc747b0de2b2882431a8de5..83737cfac5b4133d051839f2a1b4b29245f80f80 100644 (file)
@@ -8,7 +8,9 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
 import org.opendaylight.controller.cluster.datastore.messages.CloseDataChangeListenerRegistration;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
@@ -25,13 +27,15 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 public class DataChangeListenerRegistrationProxy implements ListenerRegistration {
     private final ActorSelection listenerRegistrationActor;
     private final AsyncDataChangeListener listener;
+    private final ActorRef dataChangeListenerActor;
 
     public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>>
     DataChangeListenerRegistrationProxy(
         ActorSelection listenerRegistrationActor,
-        L listener) {
+        L listener, ActorRef dataChangeListenerActor) {
         this.listenerRegistrationActor = listenerRegistrationActor;
         this.listener = listener;
+        this.dataChangeListenerActor = dataChangeListenerActor;
     }
 
     @Override
@@ -42,5 +46,6 @@ public class DataChangeListenerRegistrationProxy implements ListenerRegistration
     @Override
     public void close() {
         listenerRegistrationActor.tell(new CloseDataChangeListenerRegistration(), null);
+        dataChangeListenerActor.tell(PoisonPill.getInstance(), null);
     }
 }
index 58b22a9970cb452dd51f44e84b8b189f3f97e341..4401104a85971c77c1b9a9333c727348f5398656 100644 (file)
@@ -29,6 +29,9 @@ import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 /**
  *
  */
@@ -41,8 +44,20 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
     private final String type;
     private final ActorContext actorContext;
 
+
+    /**
+     * Executor used to run FutureTask's
+     *
+     * This is typically used when we need to make a request to an actor and
+     * wait for it's response and the consumer needs to be provided a Future.
+     *
+     * FIXME : Make the thread pool configurable
+     */
+    private final ExecutorService executor =
+        Executors.newFixedThreadPool(10);
+
     public DistributedDataStore(ActorSystem actorSystem, String type) {
-        this(new ActorContext(actorSystem, actorSystem.actorOf(ShardManager.props(type))), type);
+        this(new ActorContext(actorSystem, actorSystem.actorOf(ShardManager.props(type), "shardmanager-" + type)), type);
     }
 
     public DistributedDataStore(ActorContext actorContext, String type) {
@@ -66,29 +81,32 @@ public class DistributedDataStore implements DOMStore, SchemaContextListener, Au
         );
 
         RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
-        return new DataChangeListenerRegistrationProxy(actorContext.actorSelection(reply.getListenerRegistrationPath()), listener);
+        return new DataChangeListenerRegistrationProxy(actorContext.actorSelection(reply.getListenerRegistrationPath()), listener, dataChangeListenerActor);
     }
 
 
 
     @Override
     public DOMStoreTransactionChain createTransactionChain() {
-        return new TransactionChainProxy(actorContext);
+        return new TransactionChainProxy(actorContext, executor);
     }
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
-        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY);
+        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_ONLY,
+            executor);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY);
+        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.WRITE_ONLY,
+            executor);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE);
+        return new TransactionProxy(actorContext, TransactionProxy.TransactionType.READ_WRITE,
+            executor);
     }
 
     @Override public void onGlobalContextUpdated(SchemaContext schemaContext) {
index 221e874fd507b6ce1e52016ae09f33d9a0c5d514..3425608d235ddce1555dac19f6bc3e985b65f7f9 100644 (file)
@@ -105,19 +105,19 @@ public class Shard extends UntypedProcessor {
         } else if (message instanceof Persistent) {
             commit((Modification) ((Persistent) message).payload());
         } else if (message instanceof CreateTransaction) {
-            createTransaction();
+            createTransaction((CreateTransaction) message);
         } else if(message instanceof NonPersistent){
             commit((Modification) ((NonPersistent) message).payload());
         }
     }
 
-    private void createTransaction() {
+    private void createTransaction(CreateTransaction createTransaction) {
         DOMStoreReadWriteTransaction transaction =
             store.newReadWriteTransaction();
         ActorRef transactionActor = getContext().actorOf(
-            ShardTransaction.props(transaction, getSelf()));
+            ShardTransaction.props(transaction, getSelf()), "shard-" + createTransaction.getTransactionId());
         getSender()
-            .tell(new CreateTransactionReply(transactionActor.path()),
+            .tell(new CreateTransactionReply(transactionActor.path(), createTransaction.getTransactionId()),
                 getSelf());
     }
 
@@ -139,6 +139,7 @@ public class Shard extends UntypedProcessor {
                     future.get();
                     sender.tell(new CommitTransactionReply(), self);
                 } catch (InterruptedException | ExecutionException e) {
+                    // FIXME : Handle this properly
                     log.error(e, "An exception happened when committing");
                 }
             }
@@ -146,7 +147,6 @@ public class Shard extends UntypedProcessor {
     }
 
     private void handleForwardedCommit(ForwardedCommitTransaction message) {
-        log.info("received forwarded transaction");
         modificationToCohort
             .put(message.getModification(), message.getCohort());
         if(persistent) {
index 79e90c3fc9f36436f12975b38fd2d89a356cf2d6..250ef49e6f0b73ca33ad0a41eb36c661b9560038 100644 (file)
@@ -68,7 +68,7 @@ public class ShardManager extends AbstractUntypedActor {
    *             configuration or operational
    */
   private ShardManager(String type){
-    ActorRef actor = getContext().actorOf(Shard.props(Shard.DEFAULT_NAME + "-" + type));
+    ActorRef actor = getContext().actorOf(Shard.props("shard-" + Shard.DEFAULT_NAME + "-" + type), "shard-" + Shard.DEFAULT_NAME + "-" + type);
     defaultShardPath = actor.path();
   }
 
index ff02bfbcce520be7a791a555cba318520d90e978..e3d1e2d9d42e4968f4b81ca0ea33d40c2b7b07f7 100644 (file)
@@ -190,7 +190,7 @@ public class ShardTransaction extends AbstractUntypedActor {
     private void readyTransaction(ReadyTransaction message) {
         DOMStoreThreePhaseCommitCohort cohort = transaction.ready();
         ActorRef cohortActor = getContext().actorOf(
-            ThreePhaseCommitCohort.props(cohort, shardActor, modification));
+            ThreePhaseCommitCohort.props(cohort, shardActor, modification), "cohort");
         getSender()
             .tell(new ReadyTransactionReply(cohortActor.path()), getSelf());
 
index 57c935b0ad8d045d3325fc28d412503d744b4a65..1092e9a793d82443b2bda82eeb81fbebb7360675 100644 (file)
@@ -21,33 +21,42 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 /**
  * The ShardTransactionChain Actor represents a remote TransactionChain
  */
-public class ShardTransactionChain extends AbstractUntypedActor{
-
-  private final DOMStoreTransactionChain chain;
-
-  public ShardTransactionChain(DOMStoreTransactionChain chain) {
-    this.chain = chain;
-  }
-
-  @Override
-  public void handleReceive(Object message) throws Exception {
-    if(message instanceof CreateTransaction){
-      DOMStoreReadWriteTransaction transaction = chain.newReadWriteTransaction();
-      ActorRef transactionActor = getContext().actorOf(ShardTransaction.props(chain, transaction, getContext().parent()));
-      getSender().tell(new CreateTransactionReply(transactionActor.path()), getSelf());
-    } else if (message instanceof CloseTransactionChain){
-      chain.close();
-      getSender().tell(new CloseTransactionChainReply(), getSelf());
+public class ShardTransactionChain extends AbstractUntypedActor {
+
+    private final DOMStoreTransactionChain chain;
+
+    public ShardTransactionChain(DOMStoreTransactionChain chain) {
+        this.chain = chain;
     }
-  }
 
-  public static Props props(final DOMStoreTransactionChain chain){
-    return Props.create(new Creator<ShardTransactionChain>(){
+    @Override
+    public void handleReceive(Object message) throws Exception {
+        if (message instanceof CreateTransaction) {
+            CreateTransaction createTransaction = (CreateTransaction) message;
+            createTransaction(createTransaction);
+        } else if (message instanceof CloseTransactionChain) {
+            chain.close();
+            getSender().tell(new CloseTransactionChainReply(), getSelf());
+        }
+    }
 
-      @Override
-      public ShardTransactionChain create() throws Exception {
-        return new ShardTransactionChain(chain);
-      }
-    });
-  }
+    private void createTransaction(CreateTransaction createTransaction) {
+        DOMStoreReadWriteTransaction transaction =
+            chain.newReadWriteTransaction();
+        ActorRef transactionActor = getContext().actorOf(ShardTransaction
+            .props(chain, transaction, getContext().parent()), "shard-" + createTransaction.getTransactionId());
+        getSender()
+            .tell(new CreateTransactionReply(transactionActor.path(), createTransaction.getTransactionId()),
+                getSelf());
+    }
+
+    public static Props props(final DOMStoreTransactionChain chain) {
+        return Props.create(new Creator<ShardTransactionChain>() {
+
+            @Override
+            public ShardTransactionChain create() throws Exception {
+                return new ShardTransactionChain(chain);
+            }
+        });
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/TerminationMonitor.java
new file mode 100644 (file)
index 0000000..e6ac7f8
--- /dev/null
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Terminated;
+import akka.actor.UntypedActor;
+import akka.event.Logging;
+import akka.event.LoggingAdapter;
+import org.opendaylight.controller.cluster.datastore.messages.Monitor;
+
+public class TerminationMonitor extends UntypedActor{
+    protected final LoggingAdapter LOG =
+        Logging.getLogger(getContext().system(), this);
+
+    public TerminationMonitor(){
+        LOG.info("Created TerminationMonitor");
+    }
+
+    @Override public void onReceive(Object message) throws Exception {
+        if(message instanceof Terminated){
+            Terminated terminated = (Terminated) message;
+            LOG.debug("Actor terminated : {}", terminated.actor());
+        } else if(message instanceof Monitor){
+            Monitor monitor = (Monitor) message;
+            getContext().watch(monitor.getActorRef());
+        }
+    }
+}
index d12dc2b55a17f0ebaf1a3999778b109efbd60a0f..279ecba40977e1293a9fbad090cd1a0dfddff2a4 100644 (file)
@@ -30,7 +30,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 /**
  * ThreePhaseCommitCohortProxy represents a set of remote cohort proxies
@@ -43,13 +42,19 @@ public class ThreePhaseCommitCohortProxy implements
 
     private final ActorContext actorContext;
     private final List<ActorPath> cohortPaths;
-    //FIXME : Use a thread pool here
-    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+    private final ExecutorService executor;
+    private final String transactionId;
 
 
-    public ThreePhaseCommitCohortProxy(ActorContext actorContext, List<ActorPath> cohortPaths) {
+    public ThreePhaseCommitCohortProxy(ActorContext actorContext,
+        List<ActorPath> cohortPaths,
+        String transactionId,
+        ExecutorService executor) {
+
         this.actorContext = actorContext;
         this.cohortPaths = cohortPaths;
+        this.transactionId = transactionId;
+        this.executor = executor;
     }
 
     @Override public ListenableFuture<Boolean> canCommit() {
@@ -86,7 +91,7 @@ public class ThreePhaseCommitCohortProxy implements
         ListenableFutureTask<Boolean>
             future = ListenableFutureTask.create(call);
 
-        executorService.submit(future);
+        executor.submit(future);
 
         return future;
     }
@@ -136,7 +141,7 @@ public class ThreePhaseCommitCohortProxy implements
         ListenableFutureTask<Void>
             future = ListenableFutureTask.create(call);
 
-        executorService.submit(future);
+        executor.submit(future);
 
         return future;
 
index 91e903f9e8993bfb68b83e0d28514124b163b258..71b61ffaa0bbfd81121b87ee616e3cce2e71ac07 100644 (file)
@@ -14,32 +14,36 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransactio
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
 import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 
+import java.util.concurrent.ExecutorService;
+
 /**
  * TransactionChainProxy acts as a proxy for a DOMStoreTransactionChain created on a remote shard
  */
 public class TransactionChainProxy implements DOMStoreTransactionChain{
     private final ActorContext actorContext;
+    private final ExecutorService transactionExecutor;
 
-    public TransactionChainProxy(ActorContext actorContext) {
+    public TransactionChainProxy(ActorContext actorContext, ExecutorService transactionExecutor) {
         this.actorContext = actorContext;
+        this.transactionExecutor = transactionExecutor;
     }
 
     @Override
     public DOMStoreReadTransaction newReadOnlyTransaction() {
         return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.READ_ONLY);
+            TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
     }
 
     @Override
     public DOMStoreReadWriteTransaction newReadWriteTransaction() {
         return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.WRITE_ONLY);
+            TransactionProxy.TransactionType.WRITE_ONLY, transactionExecutor);
     }
 
     @Override
     public DOMStoreWriteTransaction newWriteOnlyTransaction() {
         return new TransactionProxy(actorContext,
-            TransactionProxy.TransactionType.READ_WRITE);
+            TransactionProxy.TransactionType.READ_WRITE, transactionExecutor);
     }
 
     @Override
index 00196ebd078e37f9778f94af9e8ab3a47dd9bb53..74245c42592ca3d6743d0aef3b48bb2e6ace2b45 100644 (file)
@@ -34,7 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -50,7 +50,6 @@ import java.util.concurrent.atomic.AtomicLong;
  * </p>
  */
 public class TransactionProxy implements DOMStoreReadWriteTransaction {
-
     public enum TransactionType {
         READ_ONLY,
         WRITE_ONLY,
@@ -63,16 +62,20 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
     private final ActorContext actorContext;
     private final Map<String, ActorSelection> remoteTransactionPaths = new HashMap<>();
     private final String identifier;
+    private final ExecutorService executor;
 
     public TransactionProxy(
         ActorContext actorContext,
-        TransactionType transactionType) {
+        TransactionType transactionType,
+        ExecutorService executor
+        ) {
 
-        this.identifier = "transaction-" + counter.getAndIncrement();
+        this.identifier = "txn-" + counter.getAndIncrement();
         this.transactionType = transactionType;
         this.actorContext = actorContext;
+        this.executor = executor;
 
-        Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(), ActorContext.ASK_DURATION);
+        Object response = actorContext.executeShardOperation(Shard.DEFAULT_NAME, new CreateTransaction(identifier), ActorContext.ASK_DURATION);
         if(response instanceof CreateTransactionReply){
             CreateTransactionReply reply = (CreateTransactionReply) response;
             remoteTransactionPaths.put(Shard.DEFAULT_NAME, actorContext.actorSelection(reply.getTransactionPath()));
@@ -105,8 +108,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
         ListenableFutureTask<Optional<NormalizedNode<?, ?>>>
             future = ListenableFutureTask.create(call);
 
-        //FIXME : Use a thread pool here
-        Executors.newSingleThreadExecutor().submit(future);
+        executor.submit(future);
 
         return future;
     }
@@ -145,7 +147,7 @@ public class TransactionProxy implements DOMStoreReadWriteTransaction {
             }
         }
 
-        return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths);
+        return new ThreePhaseCommitCohortProxy(actorContext, cohortPaths, identifier, executor);
     }
 
     @Override
index e0cdd3cc2b6876ff1e34a155b9e4f18336f0de7a..611064169655185fe0423bfc879b751d625a538b 100644 (file)
@@ -9,5 +9,14 @@
 package org.opendaylight.controller.cluster.datastore.messages;
 
 public class CreateTransaction {
+    private final String transactionId;
 
+    public CreateTransaction(String transactionId){
+
+        this.transactionId = transactionId;
+    }
+
+    public String getTransactionId() {
+        return transactionId;
+    }
 }
index 4faf9d370d56a4dc4bb3edf97de469fb33f95362..46b7194c84c1f13b9503623d43ad1e2aef7a50aa 100644 (file)
@@ -11,13 +11,20 @@ package org.opendaylight.controller.cluster.datastore.messages;
 import akka.actor.ActorPath;
 
 public class CreateTransactionReply {
-  private final ActorPath transactionPath;
+    private final ActorPath transactionPath;
+    private final String transactionId;
 
-  public CreateTransactionReply(ActorPath transactionPath) {
-    this.transactionPath = transactionPath;
-  }
+    public CreateTransactionReply(ActorPath transactionPath,
+        String transactionId) {
+        this.transactionPath = transactionPath;
+        this.transactionId = transactionId;
+    }
 
-  public ActorPath getTransactionPath() {
-    return transactionPath;
-  }
+    public ActorPath getTransactionPath() {
+        return transactionPath;
+    }
+
+    public String getTransactionId() {
+        return transactionId;
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/Monitor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/Monitor.java
new file mode 100644 (file)
index 0000000..567f14a
--- /dev/null
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorRef;
+
+public class Monitor {
+    private final ActorRef actorRef;
+
+    public Monitor(ActorRef actorRef){
+
+        this.actorRef = actorRef;
+    }
+
+    public ActorRef getActorRef() {
+        return actorRef;
+    }
+}
index 74c858e4a6b329ccb6536ddd7f4a1d42bec2885e..dfefc5ed579b36163f882485f38f577b53832b8e 100644 (file)
@@ -77,7 +77,7 @@ public class BasicIntegrationTest extends AbstractActorTest {
 
                     Assert.assertNotNull(transactionChain);
 
-                    transactionChain.tell(new CreateTransaction(), getRef());
+                    transactionChain.tell(new CreateTransaction("txn-1"), getRef());
 
                     final ActorSelection transaction =
                         new ExpectMsg<ActorSelection>("CreateTransactionReply") {
@@ -152,6 +152,10 @@ public class BasicIntegrationTest extends AbstractActorTest {
 
                     Assert.assertTrue(preCommitDone);
 
+                    // FIXME : When we commit on the cohort it "kills" the Transaction.
+                    // This in turn kills the child of Transaction as well.
+                    // The order in which we receive the terminated event for both
+                    // these actors is not fixed which may cause this test to fail
                     cohort.tell(new CommitTransaction(), getRef());
 
                     final Boolean terminatedCohort =
index be7be1723d668a56645d1d0d19c51267dc7f1969..33b5d956115c7c4be6b20d47df82d4b11cce8c65 100644 (file)
@@ -17,6 +17,8 @@ import java.util.List;
 
 public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{
 
+    private ActorRef dataChangeListenerActor = getSystem().actorOf(Props.create(DoNothingActor.class));
+
     private static class MockDataChangeListener implements
         AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> {
 
@@ -36,7 +38,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{
         DataChangeListenerRegistrationProxy proxy =
             new DataChangeListenerRegistrationProxy(
                 getSystem().actorSelection(actorRef.path()),
-                listener);
+                listener, dataChangeListenerActor);
 
         Assert.assertEquals(listener, proxy.getInstance());
 
@@ -50,7 +52,7 @@ public class DataChangeListenerRegistrationProxyTest extends AbstractActorTest{
         DataChangeListenerRegistrationProxy proxy =
             new DataChangeListenerRegistrationProxy(
                 getSystem().actorSelection(actorRef.path()),
-                new MockDataChangeListener());
+                new MockDataChangeListener(), dataChangeListenerActor);
 
         proxy.close();
 
index 3a74a4ca7656dc01ae650d011595335474297de1..5f82b40140eb11074c03419121ebd511c08d59e4 100644 (file)
@@ -39,7 +39,7 @@ public class DistributedDataStoreTest extends AbstractActorTest{
         // Make CreateTransactionReply as the default response. Will need to be
         // tuned if a specific test requires some other response
         mockActorContext.setExecuteShardOperationResponse(
-            new CreateTransactionReply(doNothingActorRef.path()));
+            new CreateTransactionReply(doNothingActorRef.path(), "txn-1 "));
     }
 
     @org.junit.After
index b9ab8a3282df0ebb1455c73eee8c162575ab0af5..ed447e004fd9b91a7ad7da1561fdb83bf3cbb2d7 100644 (file)
@@ -18,6 +18,7 @@ import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListene
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class ShardTest extends AbstractActorTest {
@@ -47,9 +48,10 @@ public class ShardTest extends AbstractActorTest {
                         }
                     }.get(); // this extracts the received message
 
-                    assertTrue(out.matches(
-                        "akka:\\/\\/test\\/user\\/testCreateTransactionChain\\/\\$.*"));
-                    // Will wait for the rest of the 3 seconds
+                    assertEquals("Unexpected transaction path " + out,
+                        "akka://test/user/testCreateTransactionChain/$a",
+                        out);
+
                     expectNoMsg();
                 }
 
@@ -115,7 +117,7 @@ public class ShardTest extends AbstractActorTest {
                         new UpdateSchemaContext(TestModel.createTestContext()),
                         getRef());
 
-                    subject.tell(new CreateTransaction(),
+                    subject.tell(new CreateTransaction("txn-1"),
                         getRef());
 
                     final String out = new ExpectMsg<String>("match hint") {
@@ -132,9 +134,9 @@ public class ShardTest extends AbstractActorTest {
                         }
                     }.get(); // this extracts the received message
 
-                    assertTrue(out.matches(
-                        "akka:\\/\\/test\\/user\\/testCreateTransaction\\/\\$.*"));
-                    // Will wait for the rest of the 3 seconds
+                    assertEquals("Unexpected transaction path " + out,
+                        "akka://test/user/testCreateTransaction/shard-txn-1",
+                        out);
                     expectNoMsg();
                 }
 
index bc3a1046566b2259012916c85e70ae933d921b3d..b07cbfd87ce268e3707064d81c2c5596af683af5 100644 (file)
@@ -14,7 +14,6 @@ import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 public class ShardTransactionChainTest extends AbstractActorTest {
 
@@ -34,7 +33,7 @@ public class ShardTransactionChainTest extends AbstractActorTest {
       new Within(duration("1 seconds")) {
         protected void run() {
 
-          subject.tell(new CreateTransaction(), getRef());
+          subject.tell(new CreateTransaction("txn-1"), getRef());
 
           final String out = new ExpectMsg<String>("match hint") {
             // do not put code outside this method, will run afterwards
@@ -47,8 +46,11 @@ public class ShardTransactionChainTest extends AbstractActorTest {
             }
           }.get(); // this extracts the received message
 
-          assertTrue(out.matches("akka:\\/\\/test\\/user\\/testCreateTransaction\\/\\$.*"));
-          // Will wait for the rest of the 3 seconds
+            assertEquals("Unexpected transaction path " + out,
+                "akka://test/user/testCreateTransaction/shard-txn-1",
+                out);
+
+            // Will wait for the rest of the 3 seconds
           expectNoMsg();
         }
 
@@ -88,4 +90,4 @@ public class ShardTransactionChainTest extends AbstractActorTest {
       };
     }};
   }
-}
\ No newline at end of file
+}
index af3da575714c57d3b39786313a089d63d6419615..8ff785c8797eecb166c0381ab3e7a92980fa8396 100644 (file)
@@ -14,6 +14,8 @@ import org.opendaylight.controller.cluster.datastore.utils.MessageCollectorActor
 import org.opendaylight.controller.cluster.datastore.utils.MockActorContext;
 
 import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static org.junit.Assert.assertNotNull;
 
@@ -23,6 +25,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
     private Props props;
     private ActorRef actorRef;
     private MockActorContext actorContext;
+    private ExecutorService executor = Executors.newSingleThreadExecutor();
 
     @Before
     public void setUp(){
@@ -32,7 +35,7 @@ public class ThreePhaseCommitCohortProxyTest extends AbstractActorTest {
 
         proxy =
             new ThreePhaseCommitCohortProxy(actorContext,
-                Arrays.asList(actorRef.path()));
+                Arrays.asList(actorRef.path()), "txn-1", executor);
 
     }
 
index 8b70e00da923d92a806c59530c2111b5cfe87b7d..a8df49f5ca4514ceb9ea1dfc25809cf789647150 100644 (file)
@@ -23,21 +23,26 @@ import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 import org.opendaylight.yangtools.yang.data.impl.schema.ImmutableNodes;
 
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 public class TransactionProxyTest extends AbstractActorTest {
 
+    private ExecutorService transactionExecutor =
+        Executors.newSingleThreadExecutor();
+
     @Test
     public void testRead() throws Exception {
         final Props props = Props.create(DoNothingActor.class);
         final ActorRef actorRef = getSystem().actorOf(props);
 
         final MockActorContext actorContext = new MockActorContext(this.getSystem());
-        actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+        actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
         actorContext.setExecuteRemoteOperationResponse("message");
 
         TransactionProxy transactionProxy =
             new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY);
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
 
 
         ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
@@ -63,12 +68,12 @@ public class TransactionProxyTest extends AbstractActorTest {
         final ActorRef actorRef = getSystem().actorOf(props);
 
         final MockActorContext actorContext = new MockActorContext(this.getSystem());
-        actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+        actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
         actorContext.setExecuteRemoteOperationResponse("message");
 
         TransactionProxy transactionProxy =
             new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY);
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
 
 
         ListenableFuture<Optional<NormalizedNode<?, ?>>> read =
@@ -94,12 +99,12 @@ public class TransactionProxyTest extends AbstractActorTest {
         final ActorRef actorRef = getSystem().actorOf(props);
 
         final MockActorContext actorContext = new MockActorContext(this.getSystem());
-        actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+        actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
         actorContext.setExecuteRemoteOperationResponse("message");
 
         TransactionProxy transactionProxy =
             new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY);
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
 
         transactionProxy.write(TestModel.TEST_PATH,
             ImmutableNodes.containerNode(TestModel.NAME_QNAME));
@@ -126,12 +131,12 @@ public class TransactionProxyTest extends AbstractActorTest {
         final ActorRef actorRef = getSystem().actorOf(props);
 
         final MockActorContext actorContext = new MockActorContext(this.getSystem());
-        actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+        actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
         actorContext.setExecuteRemoteOperationResponse("message");
 
         TransactionProxy transactionProxy =
             new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY);
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
 
         transactionProxy.merge(TestModel.TEST_PATH,
             ImmutableNodes.containerNode(TestModel.NAME_QNAME));
@@ -158,12 +163,12 @@ public class TransactionProxyTest extends AbstractActorTest {
         final ActorRef actorRef = getSystem().actorOf(props);
 
         final MockActorContext actorContext = new MockActorContext(this.getSystem());
-        actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+        actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
         actorContext.setExecuteRemoteOperationResponse("message");
 
         TransactionProxy transactionProxy =
             new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY);
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
 
         transactionProxy.delete(TestModel.TEST_PATH);
 
@@ -189,12 +194,12 @@ public class TransactionProxyTest extends AbstractActorTest {
         final ActorRef doNothingActorRef = getSystem().actorOf(props);
 
         final MockActorContext actorContext = new MockActorContext(this.getSystem());
-        actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(doNothingActorRef.path()));
+        actorContext.setExecuteShardOperationResponse(createTransactionReply(doNothingActorRef));
         actorContext.setExecuteRemoteOperationResponse(new ReadyTransactionReply(doNothingActorRef.path()));
 
         TransactionProxy transactionProxy =
             new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY);
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
 
 
         DOMStoreThreePhaseCommitCohort ready = transactionProxy.ready();
@@ -213,12 +218,11 @@ public class TransactionProxyTest extends AbstractActorTest {
         final ActorRef doNothingActorRef = getSystem().actorOf(props);
 
         final MockActorContext actorContext = new MockActorContext(this.getSystem());
-        actorContext.setExecuteShardOperationResponse(
-            new CreateTransactionReply(doNothingActorRef.path()));
+        actorContext.setExecuteShardOperationResponse( createTransactionReply(doNothingActorRef) );
 
         TransactionProxy transactionProxy =
             new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY);
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
 
         Assert.assertNotNull(transactionProxy.getIdentifier());
     }
@@ -229,12 +233,12 @@ public class TransactionProxyTest extends AbstractActorTest {
         final ActorRef actorRef = getSystem().actorOf(props);
 
         final MockActorContext actorContext = new MockActorContext(this.getSystem());
-        actorContext.setExecuteShardOperationResponse(new CreateTransactionReply(actorRef.path()));
+        actorContext.setExecuteShardOperationResponse(createTransactionReply(actorRef));
         actorContext.setExecuteRemoteOperationResponse("message");
 
         TransactionProxy transactionProxy =
             new TransactionProxy(actorContext,
-                TransactionProxy.TransactionType.READ_ONLY);
+                TransactionProxy.TransactionType.READ_ONLY, transactionExecutor);
 
         transactionProxy.close();
 
@@ -253,4 +257,8 @@ public class TransactionProxyTest extends AbstractActorTest {
 
         Assert.assertTrue(listMessages.get(0) instanceof CloseTransaction);
     }
+
+    private CreateTransactionReply createTransactionReply(ActorRef actorRef){
+        return new CreateTransactionReply(actorRef.path(), "txn-1");
+    }
 }