From 189586eeeeeeb8a9b6ed7398450d198f1864c307 Mon Sep 17 00:00:00 2001 From: Robert Varga Date: Fri, 27 Mar 2015 12:10:04 +0100 Subject: [PATCH] CDS: Move DataChangeListener into a support class This patch follows up the pattern set by DataTreeChangeListener and moves the related code out. Change-Id: Ib945dc308c2264f21c0a7df8152c1f9f8f908a43 Signed-off-by: Robert Varga --- .../datastore/DataChangeListenerSupport.java | 101 ++++++++++++++ .../DelayedListenerRegistration.java | 55 ++++++++ .../controller/cluster/datastore/Shard.java | 125 +----------------- 3 files changed, 161 insertions(+), 120 deletions(-) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java new file mode 100644 index 0000000000..7a033cf21f --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DataChangeListenerSupport.java @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; +import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; +import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class DataChangeListenerSupport extends LeaderLocalDelegateFactory>>> { + private static final Logger LOG = LoggerFactory.getLogger(DataChangeListenerSupport.class); + private final List delayedListenerRegistrations = new ArrayList<>(); + private final List dataChangeListeners = new ArrayList<>(); + private final Shard shard; + + DataChangeListenerSupport(final Shard shard) { + this.shard = Preconditions.checkNotNull(shard); + } + + @Override + void onLeadershipChange(final boolean isLeader) { + for (ActorSelection dataChangeListener : dataChangeListeners) { + dataChangeListener.tell(new EnableNotification(isLeader), shard.getSelf()); + } + + if (isLeader) { + for (DelayedListenerRegistration reg: delayedListenerRegistrations) { + if(!reg.isClosed()) { + reg.setDelegate(createDelegate(reg.getRegisterChangeListener())); + } + } + + delayedListenerRegistrations.clear(); + } + } + + @Override + void onMessage(final RegisterChangeListener message, final boolean isLeader) { + + LOG.debug("{}: registerDataChangeListener for {}, leader: {}", shard.persistenceId(), message.getPath(), isLeader); + + ListenerRegistration>> registration; + if (isLeader) { + registration = createDelegate(message); + } else { + LOG.debug("{}: Shard is not the leader - delaying registration", shard.persistenceId()); + + DelayedListenerRegistration delayedReg = new DelayedListenerRegistration(message); + delayedListenerRegistrations.add(delayedReg); + registration = delayedReg; + } + + ActorRef listenerRegistration = shard.getContext().actorOf( + DataChangeListenerRegistration.props(registration)); + + LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ", + shard.persistenceId(), listenerRegistration.path()); + + shard.getSender().tell(new RegisterChangeListenerReply(listenerRegistration), shard.getSelf()); + } + + @Override + ListenerRegistration>> createDelegate( + final RegisterChangeListener message) { + ActorSelection dataChangeListenerPath = shard.getContext().system().actorSelection( + message.getDataChangeListenerPath()); + + // Notify the listener if notifications should be enabled or not + // If this shard is the leader then it will enable notifications else + // it will not + dataChangeListenerPath.tell(new EnableNotification(true), shard.getSelf()); + + // Now store a reference to the data change listener so it can be notified + // at a later point if notifications should be enabled or disabled + dataChangeListeners.add(dataChangeListenerPath); + + AsyncDataChangeListener> listener = + new DataChangeListenerProxy(dataChangeListenerPath); + + LOG.debug("{}: Registering for path {}", shard.persistenceId(), message.getPath()); + + return shard.getDataStore().registerChangeListener(message.getPath(), listener, + message.getScope()); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java new file mode 100644 index 0000000000..8eb595df03 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/DelayedListenerRegistration.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore; + +import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; +import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; + +final class DelayedListenerRegistration implements + ListenerRegistration>> { + + private volatile boolean closed; + + private final RegisterChangeListener registerChangeListener; + + private volatile ListenerRegistration>> delegate; + + DelayedListenerRegistration(final RegisterChangeListener registerChangeListener) { + this.registerChangeListener = registerChangeListener; + } + + void setDelegate( final ListenerRegistration>> registration) { + this.delegate = registration; + } + + boolean isClosed() { + return closed; + } + + RegisterChangeListener getRegisterChangeListener() { + return registerChangeListener; + } + + @Override + public AsyncDataChangeListener> getInstance() { + return delegate != null ? delegate.getInstance() : null; + } + + @Override + public void close() { + closed = true; + if(delegate != null) { + delegate.close(); + } + } +} \ No newline at end of file diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java index 6868cc15cd..9cd52b219a 100644 --- a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/Shard.java @@ -18,13 +18,11 @@ import akka.serialization.Serialization; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import java.io.IOException; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -50,12 +48,10 @@ import org.opendaylight.controller.cluster.datastore.messages.CommitTransactionR import org.opendaylight.controller.cluster.datastore.messages.CreateSnapshot; import org.opendaylight.controller.cluster.datastore.messages.CreateTransaction; import org.opendaylight.controller.cluster.datastore.messages.CreateTransactionReply; -import org.opendaylight.controller.cluster.datastore.messages.EnableNotification; import org.opendaylight.controller.cluster.datastore.messages.ForwardedReadyTransaction; import org.opendaylight.controller.cluster.datastore.messages.PeerAddressResolved; import org.opendaylight.controller.cluster.datastore.messages.ReadyTransactionReply; import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListener; -import org.opendaylight.controller.cluster.datastore.messages.RegisterChangeListenerReply; import org.opendaylight.controller.cluster.datastore.messages.RegisterDataTreeChangeListener; import org.opendaylight.controller.cluster.datastore.messages.UpdateSchemaContext; import org.opendaylight.controller.cluster.datastore.modification.Modification; @@ -72,13 +68,11 @@ import org.opendaylight.controller.cluster.raft.messages.AppendEntriesReply; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationByteStringPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.CompositeModificationPayload; import org.opendaylight.controller.cluster.raft.protobuff.client.messages.Payload; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeListener; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStore; import org.opendaylight.controller.md.sal.dom.store.impl.InMemoryDOMDataStoreFactory; import org.opendaylight.controller.sal.core.spi.data.DOMStoreThreePhaseCommitCohort; import org.opendaylight.controller.sal.core.spi.data.DOMStoreTransaction; import org.opendaylight.controller.sal.core.spi.data.DOMStoreWriteTransaction; -import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; import org.opendaylight.yangtools.yang.data.api.schema.NormalizedNode; import org.opendaylight.yangtools.yang.model.api.SchemaContext; @@ -108,11 +102,6 @@ public class Shard extends RaftActor { private final ShardStats shardMBean; - private final List dataChangeListeners = Lists.newArrayList(); - - private final List delayedListenerRegistrations = - Lists.newArrayList(); - private DatastoreContext datastoreContext; private SchemaContext schemaContext; @@ -143,6 +132,7 @@ public class Shard extends RaftActor { private final String txnDispatcherPath; private final DataTreeChangeListenerSupport treeChangeSupport = new DataTreeChangeListenerSupport(this); + private final DataChangeListenerSupport changeSupport = new DataChangeListenerSupport(this); protected Shard(final ShardIdentifier name, final Map peerAddresses, final DatastoreContext datastoreContext, final SchemaContext schemaContext) { @@ -274,7 +264,7 @@ public class Shard extends RaftActor { } else if (CloseTransactionChain.SERIALIZABLE_CLASS.isInstance(message)) { closeTransactionChain(CloseTransactionChain.fromSerializable(message)); } else if (message instanceof RegisterChangeListener) { - registerChangeListener((RegisterChangeListener) message); + changeSupport.onMessage((RegisterChangeListener) message, isLeader()); } else if (message instanceof RegisterDataTreeChangeListener) { treeChangeSupport.onMessage((RegisterDataTreeChangeListener) message, isLeader()); } else if (message instanceof UpdateSchemaContext) { @@ -653,58 +643,7 @@ public class Shard extends RaftActor { store.onGlobalContextUpdated(schemaContext); } - private void registerChangeListener(final RegisterChangeListener registerChangeListener) { - - LOG.debug("{}: registerDataChangeListener for {}", persistenceId(), registerChangeListener.getPath()); - - ListenerRegistration>> registration; - if(isLeader()) { - registration = doChangeListenerRegistration(registerChangeListener); - } else { - LOG.debug("{}: Shard is not the leader - delaying registration", persistenceId()); - - DelayedListenerRegistration delayedReg = - new DelayedListenerRegistration(registerChangeListener); - delayedListenerRegistrations.add(delayedReg); - registration = delayedReg; - } - - ActorRef listenerRegistration = getContext().actorOf( - DataChangeListenerRegistration.props(registration)); - - LOG.debug("{}: registerDataChangeListener sending reply, listenerRegistrationPath = {} ", - persistenceId(), listenerRegistration.path()); - - getSender().tell(new RegisterChangeListenerReply(listenerRegistration), getSelf()); - } - - private ListenerRegistration>> doChangeListenerRegistration( - final RegisterChangeListener registerChangeListener) { - - ActorSelection dataChangeListenerPath = getContext().system().actorSelection( - registerChangeListener.getDataChangeListenerPath()); - - // Notify the listener if notifications should be enabled or not - // If this shard is the leader then it will enable notifications else - // it will not - dataChangeListenerPath.tell(new EnableNotification(true), getSelf()); - - // Now store a reference to the data change listener so it can be notified - // at a later point if notifications should be enabled or disabled - dataChangeListeners.add(dataChangeListenerPath); - - AsyncDataChangeListener> listener = - new DataChangeListenerProxy(dataChangeListenerPath); - - LOG.debug("{}: Registering for path {}", persistenceId(), registerChangeListener.getPath()); - - return store.registerChangeListener(registerChangeListener.getPath(), listener, - registerChangeListener.getScope()); - } - - private boolean isMetricsCaptureEnabled(){ + private boolean isMetricsCaptureEnabled() { CommonConfig config = new CommonConfig(getContext().system().settings().config()); return config.isMetricCaptureEnabled(); } @@ -831,24 +770,11 @@ public class Shard extends RaftActor { @Override protected void onStateChanged() { boolean isLeader = isLeader(); + changeSupport.onLeadershipChange(isLeader); treeChangeSupport.onLeadershipChange(isLeader); - for (ActorSelection dataChangeListener : dataChangeListeners) { - dataChangeListener.tell(new EnableNotification(isLeader), getSelf()); - } - - if(isLeader) { - for(DelayedListenerRegistration reg: delayedListenerRegistrations) { - if(!reg.isClosed()) { - reg.setDelegate(doChangeListenerRegistration(reg.getRegisterChangeListener())); - } - } - - delayedListenerRegistrations.clear(); - } - // If this actor is no longer the leader close all the transaction chains - if(!isLeader) { + if (!isLeader) { if(LOG.isDebugEnabled()) { LOG.debug( "{}: onStateChanged: Closing all transaction chains because shard {} is no longer the leader", @@ -902,45 +828,4 @@ public class Shard extends RaftActor { ShardStats getShardMBean() { return shardMBean; } - - private static class DelayedListenerRegistration implements - ListenerRegistration>> { - - private volatile boolean closed; - - private final RegisterChangeListener registerChangeListener; - - private volatile ListenerRegistration>> delegate; - - DelayedListenerRegistration(final RegisterChangeListener registerChangeListener) { - this.registerChangeListener = registerChangeListener; - } - - void setDelegate( final ListenerRegistration>> registration) { - this.delegate = registration; - } - - boolean isClosed() { - return closed; - } - - RegisterChangeListener getRegisterChangeListener() { - return registerChangeListener; - } - - @Override - public AsyncDataChangeListener> getInstance() { - return delegate != null ? delegate.getInstance() : null; - } - - @Override - public void close() { - closed = true; - if(delegate != null) { - delegate.close(); - } - } - } } -- 2.36.6