From 9c0508d8b591e356f145d5d1a277c10965a647bb Mon Sep 17 00:00:00 2001 From: Moiz Raja Date: Mon, 23 Jun 2014 20:21:26 -0700 Subject: [PATCH] Implement DistributedDataStore#registerDataChangeListener Change-Id: I153c76b923dff7845321699d556f30f2ecadec57 Signed-off-by: Moiz Raja --- .../cluster/datastore/DataChangeListener.java | 30 +++ .../datastore/DistributedDataStore.java | 144 +++++++++++---- .../cluster/datastore/ListenerProxy.java | 28 +++ .../datastore/ListenerRegistrationProxy.java | 8 + .../controller/cluster/datastore/Shard.java | 172 ++++++++++-------- .../cluster/datastore/ShardManager.java | 4 + .../messages/RegisterChangeListener.java | 40 ++-- .../datastore/DistributedDataStoreTest.java | 4 +- .../cluster/datastore/ShardTest.java | 4 +- 9 files changed, 307 insertions(+), 127 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerProxy.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java new file mode 100644 index 0000000000..ba09d04025 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListener.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.Props; +import akka.actor.UntypedActor; +import akka.japi.Creator; + +public class DataChangeListener extends UntypedActor { + @Override public void onReceive(Object message) throws Exception { + throw new UnsupportedOperationException("onReceive"); + } + + public static Props props() { + return Props.create(new Creator() { + @Override + public DataChangeListener create() throws Exception { + return new DataChangeListener(); + } + + }); + + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java index c87f1abb21..29fc259bb7 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DistributedDataStore.java @@ -9,7 +9,14 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.ActorSystem; +import akka.util.Timeout; +import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; +import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; +import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; +import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; +import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.sal.core.spi.data.DOMStore; @@ -20,39 +27,114 @@ import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.opendaylight.yangtools.yang.model.api.SchemaContext; +import org.opendaylight.yangtools.yang.model.api.SchemaContextListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; + +import static akka.pattern.Patterns.ask; /** * */ -public class DistributedDataStore implements DOMStore { - private final ActorRef shardManager; - - public DistributedDataStore(ActorSystem actorSystem, String type) { - shardManager = actorSystem.actorOf(ShardManager.props(type)); - } - - @Override - public >> ListenerRegistration registerChangeListener(InstanceIdentifier path, L listener, AsyncDataBroker.DataChangeScope scope) { - return new ListenerRegistrationProxy(); - } - - @Override - public DOMStoreTransactionChain createTransactionChain() { - return new TransactionChainProxy(); - } - - @Override - public DOMStoreReadTransaction newReadOnlyTransaction() { - return new TransactionProxy(); - } - - @Override - public DOMStoreWriteTransaction newWriteOnlyTransaction() { - return new TransactionProxy(); - } - - @Override - public DOMStoreReadWriteTransaction newReadWriteTransaction() { - return new TransactionProxy(); - } +public class DistributedDataStore implements DOMStore, SchemaContextListener { + + private static final Logger + LOG = LoggerFactory.getLogger(DistributedDataStore.class); + + final FiniteDuration ASK_DURATION = Duration.create(5, TimeUnit.SECONDS); + final Duration AWAIT_DURATION = Duration.create(5, TimeUnit.SECONDS); + + private final ActorRef shardManager; + private final ActorSystem actorSystem; + private final String type; + + + public DistributedDataStore(ActorSystem actorSystem, String type) { + this.actorSystem = actorSystem; + this.type = type; + shardManager = actorSystem.actorOf(ShardManager.props(type)); + } + + @Override + public >> ListenerRegistration registerChangeListener( + InstanceIdentifier path, L listener, + AsyncDataBroker.DataChangeScope scope) { + + ActorSelection primary = findPrimary(); + + ActorRef dataChangeListenerActor = actorSystem.actorOf(DataChangeListener.props()); + + Object result = + getResult(primary, new RegisterChangeListener(path, dataChangeListenerActor.path(), + AsyncDataBroker.DataChangeScope.BASE), ASK_DURATION); + + RegisterChangeListenerReply reply = (RegisterChangeListenerReply) result; + return new ListenerRegistrationProxy(reply.getListenerRegistrationPath()); + } + + private ActorSelection findPrimary() { + Object result = getResult(shardManager, new FindPrimary(Shard.DEFAULT_NAME), ASK_DURATION); + + if(result instanceof PrimaryFound){ + PrimaryFound found = (PrimaryFound) result; + LOG.error("Primary found {}", found.getPrimaryPath()); + + return actorSystem.actorSelection(found.getPrimaryPath()); + } + throw new RuntimeException("primary was not found"); + } + + private Object getResult(ActorRef actor, Object message, FiniteDuration duration){ + Future future = + ask(actor, message, new Timeout(duration)); + + try { + return Await.result(future, AWAIT_DURATION); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private Object getResult(ActorSelection actor, Object message, FiniteDuration duration){ + Future future = + ask(actor, message, new Timeout(duration)); + + try { + return Await.result(future, AWAIT_DURATION); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + + @Override + public DOMStoreTransactionChain createTransactionChain() { + return new TransactionChainProxy(); + } + + @Override + public DOMStoreReadTransaction newReadOnlyTransaction() { + return new TransactionProxy(); + } + + @Override + public DOMStoreWriteTransaction newWriteOnlyTransaction() { + return new TransactionProxy(); + } + + @Override + public DOMStoreReadWriteTransaction newReadWriteTransaction() { + return new TransactionProxy(); + } + + @Override public void onGlobalContextUpdated(SchemaContext schemaContext) { + shardManager.tell(new UpdateSchemaContext(schemaContext), null); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerProxy.java new file mode 100644 index 0000000000..7c38ee5acb --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerProxy.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ + +package org.opendaylight.controller.cluster.datastore; + +import akka.actor.ActorSelection; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +public class ListenerProxy implements AsyncDataChangeListener>{ + private final ActorSelection listenerRegistrationActor; + + public ListenerProxy(ActorSelection listenerRegistrationActor) { + this.listenerRegistrationActor = listenerRegistrationActor; + } + + @Override public void onDataChanged( + AsyncDataChangeEvent> change) { + throw new UnsupportedOperationException("onDataChanged"); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerRegistrationProxy.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerRegistrationProxy.java index c2fc8c0277..a548a885eb 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerRegistrationProxy.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ListenerRegistrationProxy.java @@ -8,6 +8,7 @@ package org.opendaylight.controller.cluster.datastore; +import akka.actor.ActorPath; import org.opendaylight.yangtools.concepts.ListenerRegistration; /** @@ -17,6 +18,13 @@ import org.opendaylight.yangtools.concepts.ListenerRegistration; * The ListenerRegistrationProxy talks to a remote ListenerRegistration actor. */ public class ListenerRegistrationProxy implements ListenerRegistration { + private final ActorPath listenerRegistrationPath; + + public ListenerRegistrationProxy(ActorPath listenerRegistrationPath) { + + this.listenerRegistrationPath = listenerRegistrationPath; + } + @Override public Object getInstance() { throw new UnsupportedOperationException("getInstance"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index d75edc7922..5b4f7ef898 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -9,6 +9,7 @@ package org.opendaylight.controller.cluster.datastore; import akka.actor.ActorRef; +import akka.actor.ActorSelection; import akka.actor.Props; import akka.event.Logging; import akka.event.LoggingAdapter; @@ -46,89 +47,112 @@ import java.util.concurrent.Executors; */ public class Shard extends UntypedProcessor { - public static final String DEFAULT_NAME = "default"; + public static final String DEFAULT_NAME = "default"; - private final ListeningExecutorService storeExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2)); + private final ListeningExecutorService storeExecutor = + MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2)); - private final InMemoryDOMDataStore store; + private final InMemoryDOMDataStore store; - private final Map modificationToCohort = new HashMap<>(); + private final Map + modificationToCohort = new HashMap<>(); - private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); + private final LoggingAdapter log = + Logging.getLogger(getContext().system(), this); - private Shard(String name){ - store = new InMemoryDOMDataStore(name, storeExecutor); - } - - public static Props props(final String name) { - return Props.create(new Creator() { + private Shard(String name) { + store = new InMemoryDOMDataStore(name, storeExecutor); + } - @Override - public Shard create() throws Exception { - return new Shard(name); - } + public static Props props(final String name) { + return Props.create(new Creator() { - }); - } + @Override + public Shard create() throws Exception { + return new Shard(name); + } - @Override - public void onReceive(Object message) throws Exception { - if (message instanceof CreateTransactionChain) { - createTransactionChain(); - } else if (message instanceof RegisterChangeListener) { - registerChangeListener((RegisterChangeListener) message); - } else if (message instanceof UpdateSchemaContext) { - updateSchemaContext((UpdateSchemaContext) message); - } else if (message instanceof ForwardedCommitTransaction ) { - handleForwardedCommit((ForwardedCommitTransaction) message); - } else if (message instanceof Persistent){ - commit((Persistent) message); + }); } - } - - private void commit(Persistent message) { - Modification modification = (Modification) message.payload(); - DOMStoreThreePhaseCommitCohort cohort = modificationToCohort.remove(modification); - if(cohort == null){ - log.error("Could not find cohort for modification : " + modification); - return; + + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof CreateTransactionChain) { + createTransactionChain(); + } else if (message instanceof RegisterChangeListener) { + registerChangeListener((RegisterChangeListener) message); + } else if (message instanceof UpdateSchemaContext) { + updateSchemaContext((UpdateSchemaContext) message); + } else if (message instanceof ForwardedCommitTransaction) { + handleForwardedCommit((ForwardedCommitTransaction) message); + } else if (message instanceof Persistent) { + commit((Persistent) message); + } } - final ListenableFuture future = cohort.commit(); - final ActorRef sender = getSender(); - final ActorRef self = getSelf(); - future.addListener(new Runnable() { - @Override - public void run() { - try { - future.get(); - sender.tell(new CommitTransactionReply(), self); - } catch (InterruptedException | ExecutionException e) { - log.error(e, "An exception happened when committing"); + + private void commit(Persistent message) { + Modification modification = (Modification) message.payload(); + DOMStoreThreePhaseCommitCohort cohort = + modificationToCohort.remove(modification); + if (cohort == null) { + log.error( + "Could not find cohort for modification : " + modification); + return; } - } - }, getContext().dispatcher()); - } - - private void handleForwardedCommit(ForwardedCommitTransaction message) { - log.info("received forwarded transaction"); - modificationToCohort.put(message.getModification(), message.getCohort()); - getSelf().forward(Persistent.create(message.getModification()), getContext()); - } - - private void updateSchemaContext(UpdateSchemaContext message) { - store.onGlobalContextUpdated(message.getSchemaContext()); - } - - private void registerChangeListener(RegisterChangeListener registerChangeListener) { - org.opendaylight.yangtools.concepts.ListenerRegistration>> registration = - store.registerChangeListener(registerChangeListener.getPath(), registerChangeListener.getListener(), registerChangeListener.getScope()); - ActorRef listenerRegistration = getContext().actorOf(ListenerRegistration.props(registration)); - getSender().tell(new RegisterChangeListenerReply(listenerRegistration.path()), getSelf()); - } - - private void createTransactionChain() { - DOMStoreTransactionChain chain = store.createTransactionChain(); - ActorRef transactionChain = getContext().actorOf(ShardTransactionChain.props(chain)); - getSender().tell(new CreateTransactionChainReply(transactionChain.path()), getSelf()); - } + final ListenableFuture future = cohort.commit(); + final ActorRef sender = getSender(); + final ActorRef self = getSelf(); + future.addListener(new Runnable() { + @Override + public void run() { + try { + future.get(); + sender.tell(new CommitTransactionReply(), self); + } catch (InterruptedException | ExecutionException e) { + log.error(e, "An exception happened when committing"); + } + } + }, getContext().dispatcher()); + } + + private void handleForwardedCommit(ForwardedCommitTransaction message) { + log.info("received forwarded transaction"); + modificationToCohort + .put(message.getModification(), message.getCohort()); + getSelf().forward(Persistent.create(message.getModification()), + getContext()); + } + + private void updateSchemaContext(UpdateSchemaContext message) { + store.onGlobalContextUpdated(message.getSchemaContext()); + } + + private void registerChangeListener( + RegisterChangeListener registerChangeListener) { + + ActorSelection listenerRegistrationActor = getContext() + .system().actorSelection(registerChangeListener.getDataChangeListenerPath()); + + AsyncDataChangeListener> + listener = new ListenerProxy(listenerRegistrationActor); + + org.opendaylight.yangtools.concepts.ListenerRegistration>> + registration = + store.registerChangeListener(registerChangeListener.getPath(), + listener, registerChangeListener.getScope()); + ActorRef listenerRegistration = + getContext().actorOf(ListenerRegistration.props(registration)); + getSender() + .tell(new RegisterChangeListenerReply(listenerRegistration.path()), + getSelf()); + } + + private void createTransactionChain() { + DOMStoreTransactionChain chain = store.createTransactionChain(); + ActorRef transactionChain = + getContext().actorOf(ShardTransactionChain.props(chain)); + getSender() + .tell(new CreateTransactionChainReply(transactionChain.path()), + getSelf()); + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java index 8d8527a240..4e2369d375 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/ShardManager.java @@ -19,6 +19,7 @@ import akka.japi.Creator; import org.opendaylight.controller.cluster.datastore.messages.FindPrimary; import org.opendaylight.controller.cluster.datastore.messages.PrimaryFound; import org.opendaylight.controller.cluster.datastore.messages.PrimaryNotFound; +import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import java.util.HashMap; import java.util.List; @@ -92,6 +93,9 @@ public class ShardManager extends UntypedActor { } else { getSender().tell(new PrimaryNotFound(shardName), getSelf()); } + } else if(message instanceof UpdateSchemaContext){ + // FIXME : Notify all local shards of a context change + getContext().system().actorSelection(defaultShardPath).forward(message, getContext()); } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java index 0123a70147..7c9e4f0665 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/messages/RegisterChangeListener.java @@ -8,32 +8,34 @@ package org.opendaylight.controller.cluster.datastore.messages; +import akka.actor.ActorPath; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.yangtools.yang.data.api.InstanceIdentifier; -import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; public class RegisterChangeListener { - private final InstanceIdentifier path; - private final AsyncDataChangeListener> listener; - private final AsyncDataBroker.DataChangeScope scope; + private final InstanceIdentifier path; + private final ActorPath dataChangeListenerPath; + private final AsyncDataBroker.DataChangeScope scope; - public RegisterChangeListener(InstanceIdentifier path, AsyncDataChangeListener> listener, AsyncDataBroker.DataChangeScope scope) { - this.path = path; - this.listener = listener; - this.scope = scope; - } + public RegisterChangeListener(InstanceIdentifier path, + ActorPath dataChangeListenerPath, + AsyncDataBroker.DataChangeScope scope) { + this.path = path; + this.dataChangeListenerPath = dataChangeListenerPath; + this.scope = scope; + } - public InstanceIdentifier getPath() { - return path; - } + public InstanceIdentifier getPath() { + return path; + } - public AsyncDataChangeListener> getListener() { - return listener; - } - public AsyncDataBroker.DataChangeScope getScope() { - return scope; - } + public AsyncDataBroker.DataChangeScope getScope() { + return scope; + } + + public ActorPath getDataChangeListenerPath() { + return dataChangeListenerPath; + } } diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java index 45492fd714..2a9356e63d 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/DistributedDataStoreTest.java @@ -1,6 +1,7 @@ package org.opendaylight.controller.cluster.datastore; import junit.framework.Assert; +import org.opendaylight.controller.md.cluster.datastore.model.TestModel; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; @@ -19,6 +20,7 @@ public class DistributedDataStoreTest extends AbstractActorTest{ @org.junit.Before public void setUp() throws Exception { distributedDataStore = new DistributedDataStore(getSystem(), "config"); + distributedDataStore.onGlobalContextUpdated(TestModel.createTestContext()); } @org.junit.After @@ -29,7 +31,7 @@ public class DistributedDataStoreTest extends AbstractActorTest{ @org.junit.Test public void testRegisterChangeListener() throws Exception { ListenerRegistration registration = - distributedDataStore.registerChangeListener(InstanceIdentifier.builder().build(), new AsyncDataChangeListener>() { + distributedDataStore.registerChangeListener(TestModel.TEST_PATH, new AsyncDataChangeListener>() { @Override public void onDataChanged(AsyncDataChangeEvent> change) { throw new UnsupportedOperationException("onDataChanged"); diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java index a9d8042ce2..48365fa1a0 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/ShardTest.java @@ -63,7 +63,7 @@ public class ShardTest extends AbstractActorTest{ subject.tell(new UpdateSchemaContext(TestModel.createTestContext()), getRef()); - subject.tell(new RegisterChangeListener(InstanceIdentifier.builder().build(), noOpDataChangeListener() , AsyncDataBroker.DataChangeScope.BASE), getRef()); + subject.tell(new RegisterChangeListener(TestModel.TEST_PATH, getRef().path() , AsyncDataBroker.DataChangeScope.BASE), getRef()); final String out = new ExpectMsg("match hint") { // do not put code outside this method, will run afterwards @@ -97,4 +97,4 @@ public class ShardTest extends AbstractActorTest{ } }; } -} \ No newline at end of file +} -- 2.36.6