--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import com.google.common.base.Optional;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EventListener;
+import java.util.Map.Entry;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.cluster.datastore.messages.ListenerRegistrationMessage;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class AbstractDataListenerSupport<L extends EventListener, R extends ListenerRegistrationMessage,
+ D extends DelayedListenerRegistration<L, R>, LR extends ListenerRegistration<L>>
+ extends LeaderLocalDelegateFactory<R, LR, Optional<DataTreeCandidate>> {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final ArrayList<D> delayedListenerRegistrations = new ArrayList<>();
+ private final ArrayList<D> delayedListenerOnAllRegistrations = new ArrayList<>();
+ private final Collection<ActorSelection> actors = new ArrayList<>();
+
+ protected AbstractDataListenerSupport(Shard shard) {
+ super(shard);
+ }
+
+ @Override
+ void onLeadershipChange(boolean isLeader, boolean hasLeader) {
+ log.debug("{}: onLeadershipChange, isLeader: {}, hasLeader : {}", persistenceId(), isLeader, hasLeader);
+
+ final EnableNotification msg = new EnableNotification(isLeader);
+ for(ActorSelection dataChangeListener : actors) {
+ dataChangeListener.tell(msg, getSelf());
+ }
+
+ if(hasLeader) {
+ for(D reg : delayedListenerOnAllRegistrations) {
+ reg.createDelegate(this);
+ }
+
+ delayedListenerOnAllRegistrations.clear();
+ delayedListenerOnAllRegistrations.trimToSize();
+ }
+
+ if(isLeader) {
+ for(D reg : delayedListenerRegistrations) {
+ reg.createDelegate(this);
+ }
+
+ delayedListenerRegistrations.clear();
+ delayedListenerRegistrations.trimToSize();
+ }
+ }
+
+ @Override
+ void onMessage(R message, boolean isLeader, boolean hasLeader) {
+ log.debug("{}: {} for {}, leader: {}", persistenceId(), logName(), message.getPath(), isLeader);
+
+ final ListenerRegistration<L> registration;
+ if((hasLeader && message.isRegisterOnAllInstances()) || isLeader) {
+ final Entry<LR, Optional<DataTreeCandidate>> res = createDelegate(message);
+ registration = res.getKey();
+ } else {
+ log.debug("{}: Shard is not the leader - delaying registration", persistenceId());
+
+ D delayedReg = newDelayedListenerRegistration(message);
+ if(message.isRegisterOnAllInstances()) {
+ delayedListenerOnAllRegistrations.add(delayedReg);
+ } else {
+ delayedListenerRegistrations.add(delayedReg);
+ }
+
+ registration = delayedReg;
+ }
+
+ ActorRef registrationActor = newRegistrationActor(registration);
+
+ log.debug("{}: {} sending reply, listenerRegistrationPath = {} ", persistenceId(), logName(),
+ registrationActor.path());
+
+ tellSender(newRegistrationReplyMessage(registrationActor));
+ }
+
+ protected Logger log() {
+ return log;
+ }
+
+ protected void addListenerActor(ActorSelection actor) {
+ actors.add(actor);
+ }
+
+ protected abstract D newDelayedListenerRegistration(R message);
+
+ protected abstract ActorRef newRegistrationActor(ListenerRegistration<L> registration);
+
+ protected abstract Object newRegistrationReplyMessage(ActorRef registrationActor);
+
+ protected abstract String logName();
+}
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import com.google.common.base.Optional;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map.Entry;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-final class DataChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterChangeListener,
- DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
- Optional<DataTreeCandidate>> {
- private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerSupport.class);
- private final List<DelayedListenerRegistration> delayedListenerRegistrations = new ArrayList<>();
- private final List<ActorSelection> dataChangeListeners = new ArrayList<>();
- private final List<DelayedListenerRegistration> delayedRegisterOnAllListeners = new ArrayList<>();
+final class DataChangeListenerSupport extends AbstractDataListenerSupport<
+ AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>, RegisterChangeListener,
+ DelayedDataChangeListenerRegistration, DataChangeListenerRegistration<
+ AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>> {
DataChangeListenerSupport(final Shard shard) {
super(shard);
}
- @Override
- void onLeadershipChange(final boolean isLeader, boolean hasLeader) {
- LOG.debug("onLeadershipChange, isLeader: {}, hasLeader : {}", isLeader, hasLeader);
-
- for (ActorSelection dataChangeListener : dataChangeListeners) {
- dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
- }
-
- if(hasLeader) {
- for (DelayedListenerRegistration reg : delayedRegisterOnAllListeners) {
- registerDelayedListeners(reg);
- }
- delayedRegisterOnAllListeners.clear();
- }
-
- if (isLeader) {
- for (DelayedListenerRegistration reg: delayedListenerRegistrations) {
- registerDelayedListeners(reg);
- }
-
- delayedListenerRegistrations.clear();
- }
- }
-
- private void registerDelayedListeners(DelayedListenerRegistration reg) {
- if(!reg.isClosed()) {
- final Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
- Optional<DataTreeCandidate>> res = createDelegate(reg.getRegisterChangeListener());
- reg.setDelegate(res.getKey());
- getShard().getDataStore().notifyOfInitialData(res.getKey(), res.getValue());
- }
- }
-
- @Override
- void onMessage(final RegisterChangeListener message, final boolean isLeader, boolean hasLeader) {
-
- LOG.debug("{}: registerDataChangeListener for {}, isLeader: {}, hasLeader : {}",
- persistenceId(), message.getPath(), isLeader, hasLeader);
-
- final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
- NormalizedNode<?, ?>>> registration;
- if ((hasLeader && message.isRegisterOnAllInstances()) || isLeader) {
- final Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
- Optional<DataTreeCandidate>> res = createDelegate(message);
- registration = res.getKey();
-
- getShard().getDataStore().notifyOfInitialData(res.getKey(), res.getValue());
- } else {
- LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
-
- DelayedListenerRegistration delayedReg = new DelayedListenerRegistration(message);
- if(message.isRegisterOnAllInstances()) {
- delayedRegisterOnAllListeners.add(delayedReg);
- } else {
- delayedListenerRegistrations.add(delayedReg);
- }
- registration = delayedReg;
- }
-
- ActorRef listenerRegistration = createActor(DataChangeListenerRegistrationActor.props(registration));
-
- LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
- persistenceId(), listenerRegistration.path());
-
- tellSender(new RegisterChangeListenerReply(listenerRegistration));
- }
-
@Override
Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
Optional<DataTreeCandidate>> createDelegate(final RegisterChangeListener message) {
// Now store a reference to the data change listener so it can be notified
// at a later point if notifications should be enabled or disabled
if(!message.isRegisterOnAllInstances()) {
- dataChangeListeners.add(dataChangeListenerPath);
+ addListenerActor(dataChangeListenerPath);
}
AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> listener =
new DataChangeListenerProxy(dataChangeListenerPath);
- LOG.debug("{}: Registering for path {}", persistenceId(), message.getPath());
+ log().debug("{}: Registering for path {}", persistenceId(), message.getPath());
- return getShard().getDataStore().registerChangeListener(message.getPath(), listener,
- message.getScope());
+ Entry<DataChangeListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>>,
+ Optional<DataTreeCandidate>> regEntry = getShard().getDataStore().registerChangeListener(
+ message.getPath(), listener, message.getScope());
+
+ getShard().getDataStore().notifyOfInitialData(regEntry.getKey(), regEntry.getValue());
+
+ return regEntry;
+ }
+
+ @Override
+ protected DelayedDataChangeListenerRegistration newDelayedListenerRegistration(RegisterChangeListener message) {
+ return new DelayedDataChangeListenerRegistration(message);
+ }
+
+ @Override
+ protected ActorRef newRegistrationActor(
+ ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> registration) {
+ return createActor(DataChangeListenerRegistrationActor.props(registration));
+ }
+
+ @Override
+ protected Object newRegistrationReplyMessage(ActorRef registrationActor) {
+ return new RegisterChangeListenerReply(registrationActor);
+ }
+
+ @Override
+ protected String logName() {
+ return "registerDataChangeListener";
}
}
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataTreeChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
private void doRegistration(final ActorRef shard, final YangInstanceIdentifier path) {
Future<Object> future = actorContext.executeOperationAsync(shard,
- new RegisterDataTreeChangeListener(path, dataChangeListenerActor),
+ new RegisterDataTreeChangeListener(path, dataChangeListenerActor,
+ getInstance() instanceof ClusteredDOMDataTreeChangeListener),
actorContext.getDatastoreContext().getShardInitializationTimeout());
future.onComplete(new OnComplete<Object>(){
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import com.google.common.base.Optional;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Map.Entry;
import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterDataTreeChangeListener,
- ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> {
- private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerSupport.class);
- private final ArrayList<DelayedDataTreeListenerRegistration> delayedRegistrations = new ArrayList<>();
- private final Collection<ActorSelection> actors = new ArrayList<>();
+final class DataTreeChangeListenerSupport extends AbstractDataListenerSupport<DOMDataTreeChangeListener,
+ RegisterDataTreeChangeListener, DelayedDataTreeListenerRegistration, ListenerRegistration<DOMDataTreeChangeListener>> {
DataTreeChangeListenerSupport(final Shard shard) {
super(shard);
}
@Override
- void onLeadershipChange(final boolean isLeader, boolean hasLeader) {
- final EnableNotification msg = new EnableNotification(isLeader);
- for (ActorSelection dataChangeListener : actors) {
- dataChangeListener.tell(msg, getSelf());
- }
-
- if (isLeader) {
- for (DelayedDataTreeListenerRegistration reg : delayedRegistrations) {
- reg.createDelegate(this);
- }
- delayedRegistrations.clear();
- delayedRegistrations.trimToSize();
- }
- }
-
- @Override
- void onMessage(final RegisterDataTreeChangeListener registerTreeChangeListener, final boolean isLeader, boolean hasLeader) {
- LOG.debug("{}: registerTreeChangeListener for {}, leader: {}", persistenceId(), registerTreeChangeListener.getPath(), isLeader);
-
- final ListenerRegistration<DOMDataTreeChangeListener> registration;
- if (!isLeader) {
- LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId());
-
- DelayedDataTreeListenerRegistration delayedReg =
- new DelayedDataTreeListenerRegistration(registerTreeChangeListener);
- delayedRegistrations.add(delayedReg);
- registration = delayedReg;
- } else {
- final Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> res =
- createDelegate(registerTreeChangeListener);
- registration = res.getKey();
- getShard().getDataStore().notifyOfInitialData(registerTreeChangeListener.getPath(),
- registration.getInstance(), res.getValue());
- }
-
- ActorRef listenerRegistration = createActor(DataTreeChangeListenerRegistrationActor.props(registration));
-
- LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
- persistenceId(), listenerRegistration.path());
-
- tellSender(new RegisterDataTreeChangeListenerReply(listenerRegistration));
- }
-
- @Override
- Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> createDelegate(final RegisterDataTreeChangeListener message) {
+ Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> createDelegate(
+ final RegisterDataTreeChangeListener message) {
ActorSelection dataChangeListenerPath = selectActor(message.getDataTreeChangeListenerPath());
// Notify the listener if notifications should be enabled or not
// Now store a reference to the data change listener so it can be notified
// at a later point if notifications should be enabled or disabled
- actors.add(dataChangeListenerPath);
+ if(!message.isRegisterOnAllInstances()) {
+ addListenerActor(dataChangeListenerPath);
+ }
DOMDataTreeChangeListener listener = new ForwardingDataTreeChangeListener(dataChangeListenerPath);
- LOG.debug("{}: Registering for path {}", persistenceId(), message.getPath());
+ log().debug("{}: Registering for path {}", persistenceId(), message.getPath());
+
+ Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> regEntry =
+ getShard().getDataStore().registerTreeChangeListener(message.getPath(), listener);
+
+ getShard().getDataStore().notifyOfInitialData(message.getPath(),
+ regEntry.getKey().getInstance(), regEntry.getValue());
+
+ return regEntry;
+ }
+
+ @Override
+ protected DelayedDataTreeListenerRegistration newDelayedListenerRegistration(RegisterDataTreeChangeListener message) {
+ return new DelayedDataTreeListenerRegistration(message);
+ }
- return getShard().getDataStore().registerTreeChangeListener(message.getPath(), listener);
+ @Override
+ protected ActorRef newRegistrationActor(ListenerRegistration<DOMDataTreeChangeListener> registration) {
+ return createActor(DataTreeChangeListenerRegistrationActor.props(registration));
+ }
+
+ @Override
+ protected Object newRegistrationReplyMessage(ActorRef registrationActor) {
+ return new RegisterDataTreeChangeListenerReply(registrationActor);
+ }
+
+ @Override
+ protected String logName() {
+ return "registerTreeChangeListener";
}
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
+import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+
+final class DelayedDataChangeListenerRegistration extends DelayedListenerRegistration<
+ AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>, RegisterChangeListener> {
+
+ DelayedDataChangeListenerRegistration(final RegisterChangeListener registerChangeListener) {
+ super(registerChangeListener);
+ }
+}
\ No newline at end of file
*/
package org.opendaylight.controller.cluster.datastore;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import java.util.Map.Entry;
-import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
-import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
/**
* Intermediate proxy registration returned to the user when we cannot
* instantiate the registration immediately. It provides a bridge to
* a real registration which may materialize at some point in the future.
*/
-final class DelayedDataTreeListenerRegistration implements ListenerRegistration<DOMDataTreeChangeListener> {
- private final RegisterDataTreeChangeListener registerTreeChangeListener;
- private volatile ListenerRegistration<DOMDataTreeChangeListener> delegate;
- @GuardedBy("this")
- private boolean closed;
+final class DelayedDataTreeListenerRegistration
+ extends DelayedListenerRegistration<DOMDataTreeChangeListener, RegisterDataTreeChangeListener> {
DelayedDataTreeListenerRegistration(final RegisterDataTreeChangeListener registerTreeChangeListener) {
- this.registerTreeChangeListener = Preconditions.checkNotNull(registerTreeChangeListener);
- }
-
- synchronized void createDelegate(final LeaderLocalDelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> factory) {
- if (!closed) {
- final Entry<ListenerRegistration<DOMDataTreeChangeListener>, Optional<DataTreeCandidate>> res =
- factory.createDelegate(registerTreeChangeListener);
- this.delegate = res.getKey();
- factory.getShard().getDataStore().notifyOfInitialData(registerTreeChangeListener.getPath(),
- this.delegate.getInstance(), res.getValue());
- }
- }
-
- @Override
- public DOMDataTreeChangeListener getInstance() {
- final ListenerRegistration<DOMDataTreeChangeListener> d = delegate;
- return d == null ? null : d.getInstance();
- }
-
- @Override
- public synchronized void close() {
- if (!closed) {
- closed = true;
- if (delegate != null) {
- delegate.close();
- }
- }
+ super(registerTreeChangeListener);
}
}
/*
- * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 which accompanies this distribution,
*/
package org.opendaylight.controller.cluster.datastore;
-import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
-import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import com.google.common.base.Optional;
+import java.util.EventListener;
+import java.util.Map.Entry;
+import javax.annotation.concurrent.GuardedBy;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
-import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
-final class DelayedListenerRegistration implements
- ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>>> {
+abstract class DelayedListenerRegistration<L extends EventListener, R> implements ListenerRegistration<L> {
+ private final R registrationMessage;
+ private volatile ListenerRegistration<L> delegate;
- private volatile boolean closed;
+ @GuardedBy("this")
+ private boolean closed;
- private final RegisterChangeListener registerChangeListener;
-
- private volatile ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
- NormalizedNode<?, ?>>> delegate;
-
- DelayedListenerRegistration(final RegisterChangeListener registerChangeListener) {
- this.registerChangeListener = registerChangeListener;
+ protected DelayedListenerRegistration(R registrationMessage) {
+ this.registrationMessage = registrationMessage;
}
- void setDelegate( final ListenerRegistration<AsyncDataChangeListener<YangInstanceIdentifier,
- NormalizedNode<?, ?>>> registration) {
- this.delegate = registration;
+ R getRegistrationMessage() {
+ return registrationMessage;
}
- boolean isClosed() {
- return closed;
+ ListenerRegistration<L> getDelegate() {
+ return delegate;
}
- RegisterChangeListener getRegisterChangeListener() {
- return registerChangeListener;
+ synchronized <LR extends ListenerRegistration<L>> void createDelegate(
+ final LeaderLocalDelegateFactory<R, LR, Optional<DataTreeCandidate>> factory) {
+ if (!closed) {
+ final Entry<LR, Optional<DataTreeCandidate>> res = factory.createDelegate(registrationMessage);
+ this.delegate = res.getKey();
+ }
}
@Override
- public AsyncDataChangeListener<YangInstanceIdentifier, NormalizedNode<?, ?>> getInstance() {
- return delegate != null ? delegate.getInstance() : null;
+ public L getInstance() {
+ final ListenerRegistration<L> d = delegate;
+ return d == null ? null : (L)d.getInstance();
}
@Override
- public void close() {
- closed = true;
- if(delegate != null) {
- delegate.close();
+ public synchronized void close() {
+ if (!closed) {
+ closed = true;
+ if (delegate != null) {
+ delegate.close();
+ }
}
}
-}
\ No newline at end of file
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+public interface ListenerRegistrationMessage {
+ YangInstanceIdentifier getPath();
+
+ boolean isRegisterOnAllInstances();
+}
import org.opendaylight.controller.protobuff.messages.registration.ListenerRegistrationMessages;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
-public class RegisterChangeListener implements SerializableMessage {
+public class RegisterChangeListener implements SerializableMessage, ListenerRegistrationMessage {
public static final Class<ListenerRegistrationMessages.RegisterChangeListener> SERIALIZABLE_CLASS =
ListenerRegistrationMessages.RegisterChangeListener.class;
this.registerOnAllInstances = registerOnAllInstances;
}
+ @Override
public YangInstanceIdentifier getPath() {
return path;
}
return dataChangeListener.path();
}
+ @Override
public boolean isRegisterOnAllInstances() {
return registerOnAllInstances;
}
* Request a {@link org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener} registration be made on the shard
* leader.
*/
-public final class RegisterDataTreeChangeListener implements Externalizable {
+public final class RegisterDataTreeChangeListener implements Externalizable, ListenerRegistrationMessage {
private static final long serialVersionUID = 1L;
private ActorRef dataTreeChangeListenerPath;
private YangInstanceIdentifier path;
+ private boolean registerOnAllInstances;
- public RegisterDataTreeChangeListener(final YangInstanceIdentifier path, final ActorRef dataTreeChangeListenerPath) {
+ public RegisterDataTreeChangeListener(final YangInstanceIdentifier path, final ActorRef dataTreeChangeListenerPath,
+ final boolean registerOnAllInstances) {
this.path = Preconditions.checkNotNull(path);
this.dataTreeChangeListenerPath = Preconditions.checkNotNull(dataTreeChangeListenerPath);
+ this.registerOnAllInstances = registerOnAllInstances;
}
+ @Override
public YangInstanceIdentifier getPath() {
return path;
}
return dataTreeChangeListenerPath;
}
+ @Override
+ public boolean isRegisterOnAllInstances() {
+ return registerOnAllInstances;
+ }
+
@Override
public void writeExternal(final ObjectOutput out) throws IOException {
out.writeObject(dataTreeChangeListenerPath);
SerializationUtils.serializePath(path, out);
+ out.writeBoolean(registerOnAllInstances);
}
@Override
public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
dataTreeChangeListenerPath = (ActorRef) in.readObject();
path = SerializationUtils.deserializePath(in);
+ registerOnAllInstances = in.readBoolean();
}
}
import org.opendaylight.controller.cluster.datastore.identifiers.ShardIdentifier;
import org.opendaylight.controller.cluster.datastore.modification.MutableCompositeModification;
import org.opendaylight.controller.cluster.datastore.modification.WriteModification;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
import org.opendaylight.controller.cluster.raft.utils.InMemoryJournal;
import org.opendaylight.controller.cluster.raft.utils.InMemorySnapshotStore;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
shardJournalRecoveryLogBatchSize(3).shardSnapshotBatchCount(5000).
shardHeartbeatIntervalInMillis(100);
+ protected final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+
@Before
public void setUp() {
InMemorySnapshotStore.clear();
public void tearDown() {
InMemorySnapshotStore.clear();
InMemoryJournal.clear();
+ actorFactory.close();
}
protected DatastoreContext newDatastoreContext() {
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.cluster.datastore.utils.DoNothingActor;
import org.opendaylight.controller.md.cluster.datastore.model.TestModel;
+import org.opendaylight.controller.md.sal.dom.api.ClusteredDOMDataTreeChangeListener;
import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
import scala.concurrent.ExecutionContextExecutor;
RegisterDataTreeChangeListener registerMsg = expectMsgClass(timeout, RegisterDataTreeChangeListener.class);
Assert.assertEquals("getPath", path, registerMsg.getPath());
+ Assert.assertEquals("isRegisterOnAllInstances", false, registerMsg.isRegisterOnAllInstances());
reply(new RegisterDataTreeChangeListenerReply(getRef()));
}};
}
+ @Test(timeout=10000)
+ public void testSuccessfulRegistrationForClusteredListener() {
+ new JavaTestKit(getSystem()) {{
+ ActorContext actorContext = new ActorContext(getSystem(), getRef(),
+ mock(ClusterWrapper.class), mock(Configuration.class));
+
+ ClusteredDOMDataTreeChangeListener mockClusteredListener = mock(ClusteredDOMDataTreeChangeListener.class);
+
+ final DataTreeChangeListenerProxy<ClusteredDOMDataTreeChangeListener> proxy =
+ new DataTreeChangeListenerProxy<>(actorContext, mockClusteredListener);
+
+ final YangInstanceIdentifier path = YangInstanceIdentifier.of(TestModel.TEST_QNAME);
+ new Thread() {
+ @Override
+ public void run() {
+ proxy.init("shard-1", path);
+ }
+
+ }.start();
+
+ FiniteDuration timeout = duration("5 seconds");
+ FindLocalShard findLocalShard = expectMsgClass(timeout, FindLocalShard.class);
+ Assert.assertEquals("getShardName", "shard-1", findLocalShard.getShardName());
+
+ reply(new LocalShardFound(getRef()));
+
+ RegisterDataTreeChangeListener registerMsg = expectMsgClass(timeout, RegisterDataTreeChangeListener.class);
+ Assert.assertEquals("getPath", path, registerMsg.getPath());
+ Assert.assertEquals("isRegisterOnAllInstances", true, registerMsg.isRegisterOnAllInstances());
+ }};
+ }
+
@Test(timeout=10000)
public void testLocalShardNotFound() {
new JavaTestKit(getSystem()) {{
int expectedEvents, boolean isLeader) {
MockDataTreeChangeListener listener = new MockDataTreeChangeListener(expectedEvents);
ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener));
- support.onMessage(new RegisterDataTreeChangeListener(path, dclActor), isLeader, true);
+ support.onMessage(new RegisterDataTreeChangeListener(path, dclActor, false), isLeader, true);
return listener;
}
final ActorRef dclActor = getSystem().actorOf(DataTreeChangeListenerActor.props(listener),
"testRegisterDataTreeChangeListener-DataTreeChangeListener");
- shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor), getRef());
+ shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, false), getRef());
final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("3 seconds"),
RegisterDataTreeChangeListenerReply.class);
assertEquals("Got first ElectionTimeout", true,
onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
- shard.tell(new RegisterDataTreeChangeListener(path, dclActor), getRef());
+ shard.tell(new RegisterDataTreeChangeListener(path, dclActor, false), getRef());
final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
RegisterDataTreeChangeListenerReply.class);
assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
}
@Test
- public void testClusteredDataChangeListernerDelayedRegistration() throws Exception {
+ public void testClusteredDataChangeListenerDelayedRegistration() throws Exception {
new ShardTestKit(getSystem()) {{
- dataStoreContextBuilder.persistent(false);
- final CountDownLatch onFirstElectionTimeout = new CountDownLatch(1);
- final CountDownLatch onChangeListenerRegistered = new CountDownLatch(1);
- final Creator<Shard> creator = new Creator<Shard>() {
- private static final long serialVersionUID = 1L;
- boolean firstElectionTimeout = true;
-
- @Override
- public Shard create() throws Exception {
- return new Shard(newShardBuilder()) {
- @Override
- public void onReceiveCommand(final Object message) throws Exception {
- if(message instanceof ElectionTimeout && firstElectionTimeout) {
- firstElectionTimeout = false;
- final ActorRef self = getSelf();
- new Thread() {
- @Override
- public void run() {
- Uninterruptibles.awaitUninterruptibly(
- onChangeListenerRegistered, 5, TimeUnit.SECONDS);
- self.tell(message, self);
- }
- }.start();
-
- onFirstElectionTimeout.countDown();
- } else {
- super.onReceiveCommand(message);
- }
- }
- };
- }
- };
+ String testName = "testClusteredDataChangeListenerDelayedRegistration";
+ dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
- "testDataChangeListenerOnFollower-DataChangeListener");
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+ actorFactory.generateActorId(testName + "-DataChangeListener"));
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- Props.create(new DelegatingShardCreator(creator)).withDispatcher(Dispatchers.DefaultDispatcherId()).
- withDispatcher(Dispatchers.DefaultDispatcherId()),"testDataChangeListenerOnFollower");
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(testName + "-shard"));
- assertEquals("Got first ElectionTimeout", true,
- onFirstElectionTimeout.await(5, TimeUnit.SECONDS));
-
- shard.tell(new FindLeader(), getRef());
- final FindLeaderReply findLeadeReply =
- expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
- assertNull("Expected the shard not to be the leader", findLeadeReply.getLeaderActor());
+ waitUntilNoLeader(shard);
final YangInstanceIdentifier path = TestModel.TEST_PATH;
shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
RegisterChangeListenerReply.class);
- assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+ assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- onChangeListenerRegistered.countDown();
+ shard.tell(new ElectionTimeout(), ActorRef.noSender());
listener.waitForChangeEvents();
-
- dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
}};
}
@Test
- public void testClusteredDataChangeListernerRegistration() throws Exception {
- dataStoreContextBuilder.persistent(false).build();
+ public void testClusteredDataChangeListenerRegistration() throws Exception {
new ShardTestKit(getSystem()) {{
- final ShardIdentifier member1ShardID = ShardIdentifier.builder().memberName("member-1")
- .shardName("inventory").type("config").build();
-
- final ShardIdentifier member2ShardID = ShardIdentifier.builder().memberName("member-2")
- .shardName("inventory").type("config").build();
- final Creator<Shard> followerShardCreator = new Creator<Shard>() {
- private static final long serialVersionUID = 1L;
+ String testName = "testClusteredDataChangeListenerRegistration";
+ final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
+ actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
+
+ final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
+ actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
+
+ final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
+ Shard.builder().id(followerShardID).
+ datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
+ peerAddresses(Collections.singletonMap(leaderShardID.toString(),
+ "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
+
+ final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
+ Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
+ peerAddresses(Collections.singletonMap(followerShardID.toString(),
+ "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
+
+ leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
+ String leaderPath = waitUntilLeader(followerShard);
+ assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
- @Override
- public Shard create() throws Exception {
- return new Shard(Shard.builder().id(member1ShardID).datastoreContext(newDatastoreContext()).
- peerAddresses(Collections.singletonMap(member2ShardID.toString(),
- "akka://test/user/" + member2ShardID.toString())).schemaContext(SCHEMA_CONTEXT)) {
- @Override
- public void onReceiveCommand(final Object message) throws Exception {
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
+ final MockDataChangeListener listener = new MockDataChangeListener(1);
+ final ActorRef dclActor = actorFactory.createActor(DataChangeListener.props(listener),
+ actorFactory.generateActorId(testName + "-DataChangeListener"));
- if(!(message instanceof ElectionTimeout)) {
- super.onReceiveCommand(message);
- }
- }
- };
- }
- };
+ followerShard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
+ final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterChangeListenerReply.class);
+ assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
- final Creator<Shard> leaderShardCreator = new Creator<Shard>() {
- private static final long serialVersionUID = 1L;
+ writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
- @Override
- public Shard create() throws Exception {
- return new Shard(Shard.builder().id(member2ShardID).datastoreContext(newDatastoreContext()).
- peerAddresses(Collections.singletonMap(member1ShardID.toString(),
- "akka://test/user/" + member1ShardID.toString())).schemaContext(SCHEMA_CONTEXT)) {};
- }
- };
+ listener.waitForChangeEvents();
+ }};
+ }
+ @Test
+ public void testClusteredDataTreeChangeListenerDelayedRegistration() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ String testName = "testClusteredDataTreeChangeListenerDelayedRegistration";
+ dataStoreContextBuilder.shardElectionTimeoutFactor(1000);
- final TestActorRef<Shard> shard = TestActorRef.create(getSystem(),
- Props.create(new DelegatingShardCreator(followerShardCreator)),
- member1ShardID.toString());
+ final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+ actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
- final TestActorRef<Shard> shardLeader = TestActorRef.create(getSystem(),
- Props.create(new DelegatingShardCreator(leaderShardCreator)).withDispatcher(Dispatchers.DefaultDispatcherId()),
- member2ShardID.toString());
- // Sleep to let election happen
- Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+ final TestActorRef<Shard> shard = actorFactory.createTestActor(
+ newShardBuilder().props().withDispatcher(Dispatchers.DefaultDispatcherId()),
+ actorFactory.generateActorId(testName + "-shard"));
- shard.tell(new FindLeader(), getRef());
- final FindLeaderReply findLeaderReply =
- expectMsgClass(duration("5 seconds"), FindLeaderReply.class);
- assertEquals("Shard leader does not match", shardLeader.path().toString(), findLeaderReply.getLeaderActor());
+ waitUntilNoLeader(shard);
final YangInstanceIdentifier path = TestModel.TEST_PATH;
- final MockDataChangeListener listener = new MockDataChangeListener(1);
- final ActorRef dclActor = getSystem().actorOf(DataChangeListener.props(listener),
- "testDataChangeListenerOnFollower-DataChangeListener");
- shard.tell(new RegisterChangeListener(path, dclActor, AsyncDataBroker.DataChangeScope.BASE, true), getRef());
- final RegisterChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
- RegisterChangeListenerReply.class);
- assertNotNull("getListenerRegistratioznPath", reply.getListenerRegistrationPath());
+ shard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+ final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeChangeListenerReply.class);
+ assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
writeToStore(shard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+ shard.tell(new ElectionTimeout(), ActorRef.noSender());
+
listener.waitForChangeEvents();
+ }};
+ }
- dclActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
- shard.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ @Test
+ public void testClusteredDataTreeChangeListenerRegistration() throws Exception {
+ new ShardTestKit(getSystem()) {{
+ String testName = "testClusteredDataTreeChangeListenerRegistration";
+ final ShardIdentifier followerShardID = ShardIdentifier.builder().memberName(
+ actorFactory.generateActorId(testName + "-follower")).shardName("inventory").type("config").build();
+
+ final ShardIdentifier leaderShardID = ShardIdentifier.builder().memberName(
+ actorFactory.generateActorId(testName + "-leader")).shardName("inventory").type("config").build();
+
+ final TestActorRef<Shard> followerShard = actorFactory.createTestActor(
+ Shard.builder().id(followerShardID).
+ datastoreContext(dataStoreContextBuilder.shardElectionTimeoutFactor(1000).build()).
+ peerAddresses(Collections.singletonMap(leaderShardID.toString(),
+ "akka://test/user/" + leaderShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), followerShardID.toString());
+
+ final TestActorRef<Shard> leaderShard = actorFactory.createTestActor(
+ Shard.builder().id(leaderShardID).datastoreContext(newDatastoreContext()).
+ peerAddresses(Collections.singletonMap(followerShardID.toString(),
+ "akka://test/user/" + followerShardID.toString())).schemaContext(SCHEMA_CONTEXT).props().
+ withDispatcher(Dispatchers.DefaultDispatcherId()), leaderShardID.toString());
+
+ leaderShard.tell(new ElectionTimeout(), ActorRef.noSender());
+ String leaderPath = waitUntilLeader(followerShard);
+ assertEquals("Shard leader path", leaderShard.path().toString(), leaderPath);
+
+ final YangInstanceIdentifier path = TestModel.TEST_PATH;
+ final MockDataTreeChangeListener listener = new MockDataTreeChangeListener(1);
+ final ActorRef dclActor = actorFactory.createActor(DataTreeChangeListenerActor.props(listener),
+ actorFactory.generateActorId(testName + "-DataTreeChangeListener"));
+
+ followerShard.tell(new RegisterDataTreeChangeListener(TestModel.TEST_PATH, dclActor, true), getRef());
+ final RegisterDataTreeChangeListenerReply reply = expectMsgClass(duration("5 seconds"),
+ RegisterDataTreeChangeListenerReply.class);
+ assertNotNull("getListenerRegistrationPath", reply.getListenerRegistrationPath());
+
+ writeToStore(followerShard, path, ImmutableNodes.containerNode(TestModel.TEST_QNAME));
+
+ listener.waitForChangeEvents();
}};
}
}
- public void waitUntilLeader(ActorRef shard) {
+ public String waitUntilLeader(ActorRef shard) {
FiniteDuration duration = Duration.create(100, TimeUnit.MILLISECONDS);
for(int i = 0; i < 20 * 5; i++) {
Future<Object> future = Patterns.ask(shard, new FindLeader(), new Timeout(duration));
try {
FindLeaderReply resp = (FindLeaderReply)Await.result(future, duration);
if(resp.getLeaderActor() != null) {
- return;
+ return resp.getLeaderActor();
}
} catch(TimeoutException e) {
} catch(Exception e) {
}
Assert.fail("Leader not found for shard " + shard.path());
+ return null;
}
public void waitUntilNoLeader(ActorRef shard) {