Implement DistributedDataStore#registerDataChangeListener 48/8348/1
authorMoiz Raja <moraja@cisco.com>
Tue, 24 Jun 2014 03:21:26 +0000 (20:21 -0700)
committerMoiz Raja <moraja@cisco.com>
Wed, 25 Jun 2014 18:53:51 +0000 (11:53 -0700)
Change-Id: I91ac204f2578c43734a30029a0f68dc9ddd775cb
Signed-off-by: Moiz Raja <moraja@cisco.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerProxy.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerRegistrationProxy.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java
new file mode 100644 (file)
index 0000000..ba09d04
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Props;
+import akka.actor.UntypedActor;
+import akka.japi.Creator;
+
+public class DataChangeListener extends UntypedActor {
+    @Override public void onReceive(Object message) throws Exception {
+        throw new UnsupportedOperationException("onReceive");
+    }
+
+    public static Props props() {
+        return Props.create(new Creator<DataChangeListener>() {
+            @Override
+            public DataChangeListener create() throws Exception {
+                return new DataChangeListener();
+            }
+
+        });
+
+    }
+}
index c87f1abb21c74cf585d2fcfe6a4057b0c7143e99..29fc259bb75935d25f13dcb8f7a0865f74f145cd 100644 (file)
@@ -9,7 +9,14 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
+import akka.util.Timeout;
+import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
+import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.controller.sal.core.spi.data.DOMStore;
@@ -20,39 +27,114 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
 import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.model.api.SchemaContext;
+import org.opendaylight.yangtools.yang.model.api.SchemaContextListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+
+import static akka.pattern.Patterns.ask;
 
 /**
  *
  */
-public class DistributedDataStore implements DOMStore {
-  private final ActorRef shardManager;
-
-  public DistributedDataStore(ActorSystem actorSystem, String type) {
-    shardManager = actorSystem.actorOf(ShardManager.props(type));
-  }
-
-  @Override
-  public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(InstanceIdentifier path, L listener, AsyncDataBroker.DataChangeScope scope) {
-    return new ListenerRegistrationProxy();
-  }
-
-  @Override
-  public DOMStoreTransactionChain createTransactionChain() {
-    return new TransactionChainProxy();
-  }
-
-  @Override
-  public DOMStoreReadTransaction newReadOnlyTransaction() {
-    return new TransactionProxy();
-  }
-
-  @Override
-  public DOMStoreWriteTransaction newWriteOnlyTransaction() {
-    return new TransactionProxy();
-  }
-
-  @Override
-  public DOMStoreReadWriteTransaction newReadWriteTransaction() {
-    return new TransactionProxy();
-  }
+public class DistributedDataStore implements DOMStore, SchemaContextListener {
+
+    private static final Logger
+        LOG = LoggerFactory.getLogger(DistributedDataStore.class);
+
+    final FiniteDuration ASK_DURATION = Duration.create(5, TimeUnit.SECONDS);
+    final Duration AWAIT_DURATION = Duration.create(5, TimeUnit.SECONDS);
+
+    private final ActorRef shardManager;
+    private final ActorSystem actorSystem;
+    private final String type;
+
+
+    public DistributedDataStore(ActorSystem actorSystem, String type) {
+        this.actorSystem = actorSystem;
+        this.type = type;
+        shardManager = actorSystem.actorOf(ShardManager.props(type));
+    }
+
+    @Override
+    public <L extends AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> ListenerRegistration<L> registerChangeListener(
+        InstanceIdentifier path, L listener,
+        AsyncDataBroker.DataChangeScope scope) {
+
+        ActorSelection primary = findPrimary();
+
+        ActorRef dataChangeListenerActor = actorSystem.actorOf(DataChangeListener.props());
+
+        Object result =
+            getResult(primary, new RegisterChangeListener(path, dataChangeListenerActor.path(),
+                AsyncDataBroker.DataChangeScope.BASE), ASK_DURATION);
+
+        RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result;
+        return new ListenerRegistrationProxy(reply.getListenerRegistrationPath());
+    }
+
+    private ActorSelection findPrimary() {
+        Object result = getResult(shardManager, new FindPrimary(Shard.DEFAULT_NAME), ASK_DURATION);
+
+        if(result instanceof PrimaryFound){
+            PrimaryFound found = (PrimaryFound) result;
+            LOG.error("Primary found {}", found.getPrimaryPath());
+
+            return actorSystem.actorSelection(found.getPrimaryPath());
+        }
+        throw new RuntimeException("primary was not found");
+    }
+
+    private Object getResult(ActorRef actor, Object message, FiniteDuration duration){
+        Future<Object> future =
+            ask(actor, message, new Timeout(duration));
+
+        try {
+            return Await.result(future, AWAIT_DURATION);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Object getResult(ActorSelection actor, Object message, FiniteDuration duration){
+        Future<Object> future =
+            ask(actor, message, new Timeout(duration));
+
+        try {
+            return Await.result(future, AWAIT_DURATION);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+
+    @Override
+    public DOMStoreTransactionChain createTransactionChain() {
+        return new TransactionChainProxy();
+    }
+
+    @Override
+    public DOMStoreReadTransaction newReadOnlyTransaction() {
+        return new TransactionProxy();
+    }
+
+    @Override
+    public DOMStoreWriteTransaction newWriteOnlyTransaction() {
+        return new TransactionProxy();
+    }
+
+    @Override
+    public DOMStoreReadWriteTransaction newReadWriteTransaction() {
+        return new TransactionProxy();
+    }
+
+    @Override public void onGlobalContextUpdated(SchemaContext schemaContext) {
+        shardManager.tell(new UpdateSchemaContext(schemaContext), null);
+    }
 }
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerProxy.java
new file mode 100644 (file)
index 0000000..7c38ee5
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2014 Cisco Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorSelection;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+public class ListenerProxy implements AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>{
+    private final ActorSelection listenerRegistrationActor;
+
+    public ListenerProxy(ActorSelection listenerRegistrationActor) {
+        this.listenerRegistrationActor = listenerRegistrationActor;
+    }
+
+    @Override public void onDataChanged(
+        AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> change) {
+        throw new UnsupportedOperationException("onDataChanged");
+    }
+}
index c2fc8c0277472108abc4f72b5012445ef4937807..a548a885eb2c73600e013ccf3a0cd46fe34950d9 100644 (file)
@@ -8,6 +8,7 @@
 
 package org.opendaylight.controller.cluster.datastore;
 
+import akka.actor.ActorPath;
 import org.opendaylight.yangtools.concepts.ListenerRegistration;
 
 /**
@@ -17,6 +18,13 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration;
  * The ListenerRegistrationProxy talks to a remote ListenerRegistration actor.
  */
 public class ListenerRegistrationProxy implements ListenerRegistration {
+    private final ActorPath listenerRegistrationPath;
+
+    public ListenerRegistrationProxy(ActorPath listenerRegistrationPath) {
+
+        this.listenerRegistrationPath = listenerRegistrationPath;
+    }
+
     @Override
     public Object getInstance() {
         throw new UnsupportedOperationException("getInstance");
index d75edc7922f54ea46d72e4fd62fc4bc0e0aa3143..5b4f7ef8989711dffdf4cdd8fbe7d483165f23a7 100644 (file)
@@ -9,6 +9,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
 import akka.actor.Props;
 import akka.event.Logging;
 import akka.event.LoggingAdapter;
@@ -46,89 +47,112 @@ import java.util.concurrent.Executors;
  */
 public class Shard extends UntypedProcessor {
 
-  public static final String DEFAULT_NAME = "default";
+    public static final String DEFAULT_NAME = "default";
 
-  private final ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
+    private final ListeningExecutorService storeExecutor =
+        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
 
-  private final InMemoryDOMDataStore store;
+    private final InMemoryDOMDataStore store;
 
-  private final Map<Modification, DOMStoreThreePhaseCommitCohort> modificationToCohort = new HashMap<>();
+    private final Map<Modification, DOMStoreThreePhaseCommitCohort>
+        modificationToCohort = new HashMap<>();
 
-  private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
+    private final LoggingAdapter log =
+        Logging.getLogger(getContext().system(), this);
 
-  private Shard(String name){
-    store = new InMemoryDOMDataStore(name, storeExecutor);
-  }
-
-  public static Props props(final String name) {
-    return Props.create(new Creator<Shard>() {
+    private Shard(String name) {
+        store = new InMemoryDOMDataStore(name, storeExecutor);
+    }
 
-      @Override
-      public Shard create() throws Exception {
-        return new Shard(name);
-      }
+    public static Props props(final String name) {
+        return Props.create(new Creator<Shard>() {
 
-    });
-  }
+            @Override
+            public Shard create() throws Exception {
+                return new Shard(name);
+            }
 
-  @Override
-  public void onReceive(Object message) throws Exception {
-    if (message instanceof CreateTransactionChain) {
-      createTransactionChain();
-    } else if (message instanceof RegisterChangeListener) {
-      registerChangeListener((RegisterChangeListener) message);
-    } else if (message instanceof UpdateSchemaContext) {
-      updateSchemaContext((UpdateSchemaContext) message);
-    } else if (message instanceof ForwardedCommitTransaction ) {
-      handleForwardedCommit((ForwardedCommitTransaction) message);
-    } else if (message instanceof Persistent){
-      commit((Persistent) message);
+        });
     }
-  }
-
-  private void commit(Persistent message) {
-    Modification modification = (Modification) message.payload();
-    DOMStoreThreePhaseCommitCohort cohort = modificationToCohort.remove(modification);
-    if(cohort == null){
-      log.error("Could not find cohort for modification : " + modification);
-      return;
+
+    @Override
+    public void onReceive(Object message) throws Exception {
+        if (message instanceof CreateTransactionChain) {
+            createTransactionChain();
+        } else if (message instanceof RegisterChangeListener) {
+            registerChangeListener((RegisterChangeListener) message);
+        } else if (message instanceof UpdateSchemaContext) {
+            updateSchemaContext((UpdateSchemaContext) message);
+        } else if (message instanceof ForwardedCommitTransaction) {
+            handleForwardedCommit((ForwardedCommitTransaction) message);
+        } else if (message instanceof Persistent) {
+            commit((Persistent) message);
+        }
     }
-    final ListenableFuture<Void> future = cohort.commit();
-    final ActorRef sender = getSender();
-    final ActorRef self = getSelf();
-    future.addListener(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          future.get();
-          sender.tell(new CommitTransactionReply(), self);
-        } catch (InterruptedException | ExecutionException e) {
-          log.error(e, "An exception happened when committing");
+
+    private void commit(Persistent message) {
+        Modification modification = (Modification) message.payload();
+        DOMStoreThreePhaseCommitCohort cohort =
+            modificationToCohort.remove(modification);
+        if (cohort == null) {
+            log.error(
+                "Could not find cohort for modification : " + modification);
+            return;
         }
-      }
-    }, getContext().dispatcher());
-  }
-
-  private void handleForwardedCommit(ForwardedCommitTransaction message) {
-    log.info("received forwarded transaction");
-    modificationToCohort.put(message.getModification(), message.getCohort());
-    getSelf().forward(Persistent.create(message.getModification()), getContext());
-  }
-
-  private void updateSchemaContext(UpdateSchemaContext message) {
-    store.onGlobalContextUpdated(message.getSchemaContext());
-  }
-
-  private void registerChangeListener(RegisterChangeListener registerChangeListener) {
-    org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>> registration =
-            store.registerChangeListener(registerChangeListener.getPath(), registerChangeListener.getListener(), registerChangeListener.getScope());
-    ActorRef listenerRegistration = getContext().actorOf(ListenerRegistration.props(registration));
-    getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf());
-  }
-
-  private void createTransactionChain() {
-    DOMStoreTransactionChain chain = store.createTransactionChain();
-    ActorRef transactionChain = getContext().actorOf(ShardTransactionChain.props(chain));
-    getSender().tell(new CreateTransactionChainReply(transactionChain.path()), getSelf());
-  }
+        final ListenableFuture<Void> future = cohort.commit();
+        final ActorRef sender = getSender();
+        final ActorRef self = getSelf();
+        future.addListener(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    future.get();
+                    sender.tell(new CommitTransactionReply(), self);
+                } catch (InterruptedException | ExecutionException e) {
+                    log.error(e, "An exception happened when committing");
+                }
+            }
+        }, getContext().dispatcher());
+    }
+
+    private void handleForwardedCommit(ForwardedCommitTransaction message) {
+        log.info("received forwarded transaction");
+        modificationToCohort
+            .put(message.getModification(), message.getCohort());
+        getSelf().forward(Persistent.create(message.getModification()),
+            getContext());
+    }
+
+    private void updateSchemaContext(UpdateSchemaContext message) {
+        store.onGlobalContextUpdated(message.getSchemaContext());
+    }
+
+    private void registerChangeListener(
+        RegisterChangeListener registerChangeListener) {
+
+        ActorSelection listenerRegistrationActor = getContext()
+            .system().actorSelection(registerChangeListener.getDataChangeListenerPath());
+
+        AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>
+            listener = new ListenerProxy(listenerRegistrationActor);
+
+        org.opendaylight.yangtools.concepts.ListenerRegistration<AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>>
+            registration =
+            store.registerChangeListener(registerChangeListener.getPath(),
+                listener, registerChangeListener.getScope());
+        ActorRef listenerRegistration =
+            getContext().actorOf(ListenerRegistration.props(registration));
+        getSender()
+            .tell(new RegisterChangeListenerReply(listenerRegistration.path()),
+                getSelf());
+    }
+
+    private void createTransactionChain() {
+        DOMStoreTransactionChain chain = store.createTransactionChain();
+        ActorRef transactionChain =
+            getContext().actorOf(ShardTransactionChain.props(chain));
+        getSender()
+            .tell(new CreateTransactionChainReply(transactionChain.path()),
+                getSelf());
+    }
 }
index 8d8527a240fab4ebdeed00aae39882f108944b19..4e2369d3758596bd1217670f8f3ec5a2438db36d 100644 (file)
@@ -19,6 +19,7 @@ import akka.japi.Creator;
 import org.opendaylight.controller.cluster.datastore.messages.FindPrimary;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound;
 import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound;
+import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
 
 import java.util.HashMap;
 import java.util.List;
@@ -92,6 +93,9 @@ public class ShardManager extends UntypedActor {
       } else {
         getSender().tell(new PrimaryNotFound(shardName), getSelf());
       }
+    } else if(message instanceof UpdateSchemaContext){
+        // FIXME : Notify all local shards of a context change
+        getContext().system().actorSelection(defaultShardPath).forward(message, getContext());
     }
   }
 
index 0123a701471e29ef5a81ad41dc3fb2304e4cbf62..7c9e4f0665a2710e2ed4b28f4792e6a043a48800 100644 (file)
@@ -8,32 +8,34 @@
 
 package org.opendaylight.controller.cluster.datastore.messages;
 
+import akka.actor.ActorPath;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
 import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
 
 public class RegisterChangeListener {
-  private final InstanceIdentifier path;
-  private final AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener;
-  private final AsyncDataBroker.DataChangeScope scope;
+    private final InstanceIdentifier path;
+    private final ActorPath dataChangeListenerPath;
+    private final AsyncDataBroker.DataChangeScope scope;
 
 
-  public RegisterChangeListener(InstanceIdentifier path, AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> listener, AsyncDataBroker.DataChangeScope scope) {
-    this.path = path;
-    this.listener = listener;
-    this.scope = scope;
-  }
+    public RegisterChangeListener(InstanceIdentifier path,
+        ActorPath dataChangeListenerPath,
+        AsyncDataBroker.DataChangeScope scope) {
+        this.path = path;
+        this.dataChangeListenerPath = dataChangeListenerPath;
+        this.scope = scope;
+    }
 
-  public InstanceIdentifier getPath() {
-    return path;
-  }
+    public InstanceIdentifier getPath() {
+        return path;
+    }
 
-  public AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>> getListener() {
-    return listener;
-  }
 
-  public AsyncDataBroker.DataChangeScope getScope() {
-    return scope;
-  }
+    public AsyncDataBroker.DataChangeScope getScope() {
+        return scope;
+    }
+
+    public ActorPath getDataChangeListenerPath() {
+        return dataChangeListenerPath;
+    }
 }
index 45492fd71495cbe235b852a055aca0fe5818cc99..2a9356e63d495d15d20b6be45960e5ac9a6bc697 100644 (file)
@@ -1,6 +1,7 @@
 package org.opendaylight.controller.cluster.datastore;
 
 import junit.framework.Assert;
+import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent;
 import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
@@ -19,6 +20,7 @@ public class DistributedDataStoreTest extends AbstractActorTest{
     @org.junit.Before
     public void setUp() throws Exception {
         distributedDataStore = new DistributedDataStore(getSystem(), "config");
+        distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext());
     }
 
     @org.junit.After
@@ -29,7 +31,7 @@ public class DistributedDataStoreTest extends AbstractActorTest{
     @org.junit.Test
     public void testRegisterChangeListener() throws Exception {
         ListenerRegistration registration =
-                distributedDataStore.registerChangeListener(InstanceIdentifier.builder().build(), new AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>() {
+                distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener<InstanceIdentifier, NormalizedNode<?, ?>>() {
             @Override
             public void onDataChanged(AsyncDataChangeEvent<InstanceIdentifier, NormalizedNode<?, ?>> change) {
                 throw new UnsupportedOperationException("onDataChanged");
index a9d8042ce238ffdc3be211a7625f50be957dc9f1..48365fa1a06a90c87131ab6bda0e71db78595ee6 100644 (file)
@@ -63,7 +63,7 @@ public class ShardTest extends AbstractActorTest{
 
           subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef());
 
-          subject.tell(new RegisterChangeListener(InstanceIdentifier.builder().build(), noOpDataChangeListener() , AsyncDataBroker.DataChangeScope.BASE), getRef());
+          subject.tell(new RegisterChangeListener(TestModel.TEST_PATH, getRef().path() , AsyncDataBroker.DataChangeScope.BASE), getRef());
 
           final String out = new ExpectMsg<String>("match hint") {
             // do not put code outside this method, will run afterwards
@@ -97,4 +97,4 @@ public class ShardTest extends AbstractActorTest{
       }
     };
   }
-}
\ No newline at end of file
+}