output = new DataOutputStream(stream);
}
- public NormalizedNodeOutputStreamWriter(DataOutput output) throws IOException {
+ public NormalizedNodeOutputStreamWriter(DataOutput output) {
this.output = Preconditions.checkNotNull(output);
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.Props;
+import akka.japi.Creator;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
+import org.opendaylight.controller.cluster.datastore.messages.DataTreeChangedReply;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Proxy actor which acts as a facade to the user-provided listener. Responsible for decapsulating
+ * DataTreeChanged messages and dispatching their context to the user.
+ */
+final class DataTreeChangeListenerActor extends AbstractUntypedActor {
+ private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerActor.class);
+ private final DOMDataTreeChangeListener listener;
+ private boolean notificationsEnabled = false;
+
+ private DataTreeChangeListenerActor(final DOMDataTreeChangeListener listener) {
+ this.listener = Preconditions.checkNotNull(listener);
+ }
+
+ @Override
+ protected void handleReceive(final Object message) {
+ if (message instanceof DataTreeChanged) {
+ dataChanged((DataTreeChanged)message);
+ } else if (message instanceof EnableNotification) {
+ enableNotification((EnableNotification) message);
+ }
+ }
+
+ private void dataChanged(final DataTreeChanged message) {
+ // Do nothing if notifications are not enabled
+ if (!notificationsEnabled) {
+ LOG.debug("Notifications not enabled for listener {} - dropping change notification", listener);
+ return;
+ }
+
+ LOG.debug("Sending change notification {} to listener {}", message.getChanges(), listener);
+
+ try {
+ this.listener.onDataTreeChanged(message.getChanges());
+ } catch (Exception e) {
+ LOG.error("Error notifying listener {}", this.listener, e);
+ }
+
+ // TODO: do we really need this?
+ // It seems the sender is never null but it doesn't hurt to check. If the caller passes in
+ // a null sender (ActorRef.noSender()), akka translates that to the deadLetters actor.
+ if (getSender() != null && !getContext().system().deadLetters().equals(getSender())) {
+ getSender().tell(DataTreeChangedReply.getInstance(), getSelf());
+ }
+ }
+
+ private void enableNotification(final EnableNotification message) {
+ notificationsEnabled = message.isEnabled();
+ LOG.debug("{} notifications for listener {}", (notificationsEnabled ? "Enabled" : "Disabled"),
+ listener);
+ }
+
+ public static Props props(final DOMDataTreeChangeListener listener) {
+ return Props.create(new DataTreeChangeListenerCreator(listener));
+ }
+
+ private static final class DataTreeChangeListenerCreator implements Creator<DataTreeChangeListenerActor> {
+ private static final long serialVersionUID = 1L;
+ private final DOMDataTreeChangeListener listener;
+
+ DataTreeChangeListenerCreator(final DOMDataTreeChangeListener listener) {
+ this.listener = Preconditions.checkNotNull(listener);
+ }
+
+ @Override
+ public DataTreeChangeListenerActor create() {
+ return new DataTreeChangeListenerActor(listener);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import akka.actor.PoisonPill;
+import akka.dispatch.OnComplete;
+import com.google.common.base.Preconditions;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.exceptions.LocalShardNotFoundException;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.utils.ActorContext;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.AbstractListenerRegistration;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+
+/**
+ * Proxy class for holding required state to lazily instantiate a listener registration with an
+ * asynchronously-discovered actor.
+ *
+ * @param <T> listener type
+ */
+final class DataTreeChangeListenerProxy<T extends DOMDataTreeChangeListener> extends AbstractListenerRegistration<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerProxy.class);
+ private final ActorRef dataChangeListenerActor;
+ private final ActorContext actorContext;
+
+ @GuardedBy("this")
+ private ActorSelection listenerRegistrationActor;
+
+ public DataTreeChangeListenerProxy(final ActorContext actorContext, final T listener) {
+ super(listener);
+ this.actorContext = Preconditions.checkNotNull(actorContext);
+ this.dataChangeListenerActor = actorContext.getActorSystem().actorOf(
+ DataTreeChangeListenerActor.props(getInstance()).withDispatcher(actorContext.getNotificationDispatcherPath()));
+ }
+
+ @Override
+ protected synchronized void removeRegistration() {
+ if (listenerRegistrationActor != null) {
+ listenerRegistrationActor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), ActorRef.noSender());
+ listenerRegistrationActor = null;
+ }
+
+ dataChangeListenerActor.tell(PoisonPill.getInstance(), ActorRef.noSender());
+ }
+
+ void init(final String shardName, final YangInstanceIdentifier treeId) {
+ Future<ActorRef> findFuture = actorContext.findLocalShardAsync(shardName);
+ findFuture.onComplete(new OnComplete<ActorRef>() {
+ @Override
+ public void onComplete(final Throwable failure, final ActorRef shard) {
+ if (failure instanceof LocalShardNotFoundException) {
+ LOG.debug("No local shard found for {} - DataTreeChangeListener {} at path {} " +
+ "cannot be registered", shardName, getInstance(), treeId);
+ } else if (failure != null) {
+ LOG.error("Failed to find local shard {} - DataTreeChangeListener {} at path {} " +
+ "cannot be registered: {}", shardName, getInstance(), treeId, failure);
+ } else {
+ doRegistration(shard, treeId);
+ }
+ }
+ }, actorContext.getClientDispatcher());
+ }
+
+ private void setListenerRegistrationActor(final ActorSelection actor) {
+ if (actor == null) {
+ LOG.debug("Ignoring null actor on {}", this);
+ return;
+ }
+
+ synchronized (this) {
+ if (!isClosed()) {
+ this.listenerRegistrationActor = actor;
+ return;
+ }
+ }
+
+ // This registration has already been closed, notify the actor
+ actor.tell(CloseDataTreeChangeListenerRegistration.getInstance(), null);
+ }
+
+ private void doRegistration(final ActorRef shard, final YangInstanceIdentifier path) {
+
+ Future<Object> future = actorContext.executeOperationAsync(shard,
+ new RegisterDataTreeChangeListener(path, dataChangeListenerActor),
+ actorContext.getDatastoreContext().getShardInitializationTimeout());
+
+ future.onComplete(new OnComplete<Object>(){
+ @Override
+ public void onComplete(final Throwable failure, final Object result) {
+ if (failure != null) {
+ LOG.error("Failed to register DataTreeChangeListener {} at path {}",
+ getInstance(), path.toString(), failure);
+ } else {
+ RegisterDataTreeChangeListenerReply reply = (RegisterDataTreeChangeListenerReply) result;
+ setListenerRegistrationActor(actorContext.actorSelection(
+ reply.getListenerRegistrationPath().path()));
+ }
+ }
+ }, actorContext.getClientDispatcher());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.japi.Creator;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistration;
+import org.opendaylight.controller.cluster.datastore.messages.CloseDataTreeChangeListenerRegistrationReply;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+/**
+ * Actor co-located with a shard. It exists only to terminate the registration when
+ * asked to do so via {@link CloseDataTreeChangeListenerRegistration}.
+ */
+public final class DataTreeChangeListenerRegistrationActor extends AbstractUntypedActor {
+ private final ListenerRegistration<DOMDataTreeChangeListener> registration;
+
+ public DataTreeChangeListenerRegistrationActor(final ListenerRegistration<DOMDataTreeChangeListener> registration) {
+ this.registration = Preconditions.checkNotNull(registration);
+ }
+
+ @Override
+ protected void handleReceive(Object message) throws Exception {
+ if (message instanceof CloseDataTreeChangeListenerRegistration) {
+ registration.close();
+ getSender().tell(CloseDataTreeChangeListenerRegistrationReply.getInstance(), getSelf());
+ getSelf().tell(PoisonPill.getInstance(), getSelf());
+ }
+ }
+
+ public static Props props(final ListenerRegistration<DOMDataTreeChangeListener> registration) {
+ return Props.create(new DataTreeChangeListenerRegistrationCreator(registration));
+ }
+
+ private static final class DataTreeChangeListenerRegistrationCreator implements Creator<DataTreeChangeListenerRegistrationActor> {
+ private static final long serialVersionUID = 1L;
+ final ListenerRegistration<DOMDataTreeChangeListener> registration;
+
+ DataTreeChangeListenerRegistrationCreator(ListenerRegistration<DOMDataTreeChangeListener> registration) {
+ this.registration = Preconditions.checkNotNull(registration);
+ }
+
+ @Override
+ public DataTreeChangeListenerRegistrationActor create() {
+ return new DataTreeChangeListenerRegistrationActor(registration);
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.datastore.messages.EnableNotification;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListenerReply;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class DataTreeChangeListenerSupport extends LeaderLocalDelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>> {
+ private static final Logger LOG = LoggerFactory.getLogger(DataTreeChangeListenerSupport.class);
+ private final ArrayList<DelayedDataTreeListenerRegistration> delayedRegistrations = new ArrayList<>();
+ private final Collection<ActorSelection> actors = new ArrayList<>();
+ private final Shard shard;
+
+ DataTreeChangeListenerSupport(final Shard shard) {
+ this.shard = Preconditions.checkNotNull(shard);
+ }
+
+ @Override
+ void onLeadershipChange(final boolean isLeader) {
+ if (isLeader) {
+ for (DelayedDataTreeListenerRegistration reg : delayedRegistrations) {
+ reg.createDelegate(this);
+ }
+ delayedRegistrations.clear();
+ delayedRegistrations.trimToSize();
+ }
+
+ final EnableNotification msg = new EnableNotification(isLeader);
+ for (ActorSelection dataChangeListener : actors) {
+ dataChangeListener.tell(msg, shard.getSelf());
+ }
+ }
+
+ @Override
+ void onMessage(final RegisterDataTreeChangeListener registerTreeChangeListener, final boolean isLeader) {
+ LOG.debug("{}: registerTreeChangeListener for {}, leader: {}", shard.persistenceId(), registerTreeChangeListener.getPath(), isLeader);
+
+ final ListenerRegistration<DOMDataTreeChangeListener> registration;
+ if (!isLeader) {
+ LOG.debug("{}: Shard is not the leader - delaying registration", shard.persistenceId());
+
+ DelayedDataTreeListenerRegistration delayedReg =
+ new DelayedDataTreeListenerRegistration(registerTreeChangeListener);
+ delayedRegistrations.add(delayedReg);
+ registration = delayedReg;
+ } else {
+ registration = createDelegate(registerTreeChangeListener);
+ }
+
+ ActorRef listenerRegistration = shard.getContext().actorOf(
+ DataTreeChangeListenerRegistrationActor.props(registration));
+
+ LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ",
+ shard.persistenceId(), listenerRegistration.path());
+
+ shard.getSender().tell(new RegisterDataTreeChangeListenerReply(listenerRegistration), shard.getSelf());
+ }
+
+ @Override
+ ListenerRegistration<DOMDataTreeChangeListener> createDelegate(final RegisterDataTreeChangeListener message) {
+ ActorSelection dataChangeListenerPath = shard.getContext().system().actorSelection(
+ message.getDataTreeChangeListenerPath().path());
+
+ // Notify the listener if notifications should be enabled or not
+ // If this shard is the leader then it will enable notifications else
+ // it will not
+ dataChangeListenerPath.tell(new EnableNotification(true), shard.getSelf());
+
+ // Now store a reference to the data change listener so it can be notified
+ // at a later point if notifications should be enabled or disabled
+ actors.add(dataChangeListenerPath);
+
+ DOMDataTreeChangeListener listener = new ForwardingDataTreeChangeListener(dataChangeListenerPath);
+
+ LOG.debug("{}: Registering for path {}", shard.persistenceId(), message.getPath());
+
+ return shard.getDataStore().registerTreeChangeListener(message.getPath(), listener);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import javax.annotation.concurrent.GuardedBy;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.concepts.ListenerRegistration;
+
+/**
+ * Intermediate proxy registration returned to the user when we cannot
+ * instantiate the registration immediately. It provides a bridge to
+ * a real registration which may materialize at some point in the future.
+ */
+final class DelayedDataTreeListenerRegistration implements ListenerRegistration<DOMDataTreeChangeListener> {
+ private final RegisterDataTreeChangeListener registerTreeChangeListener;
+ private volatile ListenerRegistration<DOMDataTreeChangeListener> delegate;
+ @GuardedBy("this")
+ private boolean closed;
+
+ DelayedDataTreeListenerRegistration(final RegisterDataTreeChangeListener registerTreeChangeListener) {
+ this.registerTreeChangeListener = Preconditions.checkNotNull(registerTreeChangeListener);
+ }
+
+ synchronized void createDelegate(final DelegateFactory<RegisterDataTreeChangeListener, ListenerRegistration<DOMDataTreeChangeListener>> factory) {
+ if (!closed) {
+ this.delegate = factory.createDelegate(registerTreeChangeListener);
+ }
+ }
+
+ @Override
+ public DOMDataTreeChangeListener getInstance() {
+ final ListenerRegistration<DOMDataTreeChangeListener> d = delegate;
+ return d == null ? null : d.getInstance();
+ }
+
+ @Override
+ public synchronized void close() {
+ if (!closed) {
+ closed = true;
+ if (delegate != null) {
+ delegate.close();
+ }
+ }
+ }
+}
+
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+/**
+ * Base class for factories instantiating delegates.
+ *
+ * <D> delegate type
+ * <M> message type
+ */
+abstract class DelegateFactory<M, D> {
+ abstract D createDelegate(M message);
+}
import org.opendaylight.controller.cluster.datastore.utils.Dispatchers;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataBroker;
import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
import org.opendaylight.controller.sal.core.spi.data.DOMStore;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreReadWriteTransaction;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransactionChain;
+import org.opendaylight.controller.sal.core.spi.data.DOMStoreTreeChangePublisher;
import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
*
*/
public class DistributedDataStore implements DOMStore, SchemaContextListener,
- DatastoreContextConfigAdminOverlay.Listener, AutoCloseable {
+ DatastoreContextConfigAdminOverlay.Listener, DOMStoreTreeChangePublisher, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(DistributedDataStore.class);
private static final String UNKNOWN_TYPE = "unknown";
return listenerRegistrationProxy;
}
+ @Override
+ public <L extends DOMDataTreeChangeListener> ListenerRegistration<L> registerTreeChangeListener(YangInstanceIdentifier treeId, L listener) {
+ Preconditions.checkNotNull(treeId, "treeId should not be null");
+ Preconditions.checkNotNull(listener, "listener should not be null");
+
+ final String shardName = ShardStrategyFactory.getStrategy(treeId).findShard(treeId);
+ LOG.debug("Registering tree listener: {} for tree: {} shard: {}", listener, treeId, shardName);
+
+ final DataTreeChangeListenerProxy<L> listenerRegistrationProxy =
+ new DataTreeChangeListenerProxy<L>(actorContext, listener);
+ listenerRegistrationProxy.init(shardName, treeId);
+
+ return listenerRegistrationProxy;
+ }
+
@Override
public DOMStoreTransactionChain createTransactionChain() {
return new TransactionChainProxy(actorContext);
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+import com.google.common.base.Preconditions;
+import akka.actor.ActorRef;
+import akka.actor.ActorSelection;
+import java.util.Collection;
+import org.opendaylight.controller.cluster.datastore.messages.DataTreeChanged;
+import org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * Internal implementation of a {@link DOMDataTreeChangeListener} which
+ * encapsulates received notifications into a {@link DataTreeChanged}
+ * message and forwards them towards the client's {@link DataTreeChangeListenerActor}.
+ */
+final class ForwardingDataTreeChangeListener implements DOMDataTreeChangeListener {
+ private final ActorSelection actor;
+
+ ForwardingDataTreeChangeListener(final ActorSelection actor) {
+ this.actor = Preconditions.checkNotNull(actor, "actor should not be null");
+ }
+
+ @Override
+ public void onDataTreeChanged(Collection<DataTreeCandidate> changes) {
+ actor.tell(new DataTreeChanged(changes), ActorRef.noSender());
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore;
+
+/**
+ * Base class for factories instantiating delegates which are local to the
+ * shard leader.
+ *
+ * <D> delegate type
+ * <M> message type
+ */
+abstract class LeaderLocalDelegateFactory<M, D> extends DelegateFactory<M, D> {
+ /**
+ * Invoked whenever the local shard's leadership role changes.
+ *
+ * @param isLeader true if the shard has become leader, false if it has
+ * become a follower.
+ */
+ abstract void onLeadershipChange(boolean isLeader);
+ abstract void onMessage(M message, boolean isLeader);
+}
import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply;
+import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener;
import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext;
import org.opendaylight.controller.cluster.datastore.modification.Modification;
import org.opendaylight.controller.cluster.datastore.modification.ModificationPayload;
private final String txnDispatcherPath;
+ private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this);
+
protected Shard(final ShardIdentifier name, final Map<String, String> peerAddresses,
final DatastoreContext datastoreContext, final SchemaContext schemaContext) {
super(name.toString(), new HashMap<>(peerAddresses), Optional.of(datastoreContext.getShardRaftConfig()));
store = InMemoryDOMDataStoreFactory.create(name.toString(), null,
datastoreContext.getDataStoreProperties());
- if(schemaContext != null) {
+ if (schemaContext != null) {
store.onGlobalContextUpdated(schemaContext);
}
closeTransactionChain(CloseTransactionChain.fromSerializable(message));
} else if (message instanceof RegisterChangeListener) {
registerChangeListener((RegisterChangeListener) message);
+ } else if (message instanceof RegisterDataTreeChangeListener) {
+ treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader());
} else if (message instanceof UpdateSchemaContext) {
updateSchemaContext((UpdateSchemaContext) message);
} else if (message instanceof PeerAddressResolved) {
@Override
protected void onStateChanged() {
boolean isLeader = isLeader();
+ treeChangeSupport.onLeadershipChange(isLeader);
+
for (ActorSelection dataChangeListener : dataChangeListeners) {
dataChangeListener.tell(new EnableNotification(isLeader), getSelf());
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+
+public final class CloseDataTreeChangeListenerRegistration implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private static final CloseDataTreeChangeListenerRegistration INSTANCE = new CloseDataTreeChangeListenerRegistration();
+
+ private CloseDataTreeChangeListenerRegistration() {
+ }
+
+ public static CloseDataTreeChangeListenerRegistration getInstance() {
+ return INSTANCE;
+ }
+
+ private Object readResolve() throws ObjectStreamException {
+ return INSTANCE;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+
+public final class CloseDataTreeChangeListenerRegistrationReply implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private static final CloseDataTreeChangeListenerRegistrationReply INSTANCE = new CloseDataTreeChangeListenerRegistrationReply();
+
+ private CloseDataTreeChangeListenerRegistrationReply() {
+ // Use getInstance() instead
+ }
+
+ public static CloseDataTreeChangeListenerRegistrationReply getInstance() {
+ return INSTANCE;
+ }
+
+ private Object readResolve() throws ObjectStreamException {
+ return INSTANCE;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import org.opendaylight.yangtools.yang.data.api.schema.tree.DataTreeCandidate;
+
+/**
+ * A message about a DataTree having been changed. The message is not
+ * serializable on purpose. For delegating the change across cluster nodes,
+ * this needs to be intercepted by a local agent and forwarded as
+ * a {@link DataTreeDelta}.
+ */
+public final class DataTreeChanged {
+ private final Collection<DataTreeCandidate> changes;
+
+ public DataTreeChanged(final Collection<DataTreeCandidate> changes) {
+ this.changes = Preconditions.checkNotNull(changes);
+ }
+
+ /**
+ * Return the data changes.
+ *
+ * @return Change events
+ */
+ public Collection<DataTreeCandidate> getChanges() {
+ return changes;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import java.io.ObjectStreamException;
+import java.io.Serializable;
+
+public final class DataTreeChangedReply implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private static final DataTreeChangedReply INSTANCE = new DataTreeChangedReply();
+
+ private DataTreeChangedReply() {
+ // Use getInstance() instead
+ }
+
+ public static DataTreeChangedReply getInstance() {
+ return INSTANCE;
+ }
+
+ private Object readResolve() throws ObjectStreamException {
+ return INSTANCE;
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.opendaylight.controller.cluster.datastore.utils.SerializationUtils;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Request a {@link org.opendaylight.controller.md.sal.dom.api.DOMDataTreeChangeListener} registration be made on the shard
+ * leader.
+ */
+public final class RegisterDataTreeChangeListener implements Externalizable {
+ private static final long serialVersionUID = 1L;
+ private ActorRef dataTreeChangeListenerPath;
+ private YangInstanceIdentifier path;
+
+ public RegisterDataTreeChangeListener(final YangInstanceIdentifier path, final ActorRef dataTreeChangeListenerPath) {
+ this.path = Preconditions.checkNotNull(path);
+ this.dataTreeChangeListenerPath = Preconditions.checkNotNull(dataTreeChangeListenerPath);
+ }
+
+ public YangInstanceIdentifier getPath() {
+ return path;
+ }
+
+ public ActorRef getDataTreeChangeListenerPath() {
+ return dataTreeChangeListenerPath;
+ }
+
+ @Override
+ public void writeExternal(final ObjectOutput out) throws IOException {
+ out.writeObject(dataTreeChangeListenerPath);
+ SerializationUtils.serializePath(path, out);
+ }
+
+ @Override
+ public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException {
+ dataTreeChangeListenerPath = (ActorRef) in.readObject();
+ path = SerializationUtils.deserializePath(in);
+ }
+}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.messages;
+
+import akka.actor.ActorRef;
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+
+/**
+ * Successful reply to a {@link RegisterDataTreeChangeListener} request.
+ */
+public final class RegisterDataTreeChangeListenerReply implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private final ActorRef listenerRegistrationPath;
+
+ public RegisterDataTreeChangeListenerReply(final ActorRef listenerRegistrationPath) {
+ this.listenerRegistrationPath = Preconditions.checkNotNull(listenerRegistrationPath);
+ }
+
+ public ActorRef getListenerRegistrationPath() {
+ return listenerRegistrationPath;
+ }
+}