From c38bb5d90ebdb8e867cecc5b0dea0bfafe8ae621 Mon Sep 17 00:00:00 2001 From: Tom Pantelis Date: Tue, 11 Aug 2015 00:05:38 -0400 Subject: [PATCH] Bug 4105: Add EntityOwnershipListenerActor and support Change-Id: Idbeef3e23ab45a11afe5fce56a55fe5d6945729a Signed-off-by: Tom Pantelis --- .../EntityOwnershipListenerActor.java | 68 +++++++++ .../EntityOwnershipListenerSupport.java | 112 ++++++++++++++ .../messages/EntityOwnershipChanged.java | 45 ++++++ .../EntityOwnershipListenerActorTest.java | 72 +++++++++ .../EntityOwnershipListenerSupportTest.java | 142 ++++++++++++++++++ 5 files changed, 439 insertions(+) create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerActor.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupport.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/EntityOwnershipChanged.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerActorTest.java create mode 100644 opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupportTest.java diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerActor.java new file mode 100644 index 0000000000..f62f7492f6 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerActor.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.entityownership; + +import akka.actor.Props; +import akka.japi.Creator; +import com.google.common.base.Preconditions; +import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.EntityOwnershipChanged; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * An actor which is responsible for notifying an EntityOwnershipListener of changes. + * + * @author Thomas Pantelis + */ +class EntityOwnershipListenerActor extends AbstractUntypedActor { + private static final Logger LOG = LoggerFactory.getLogger(EntityOwnershipListenerActor.class); + + private final EntityOwnershipListener listener; + + private EntityOwnershipListenerActor(EntityOwnershipListener listener) { + this.listener = listener; + } + + @Override + protected void handleReceive(Object message) { + if(message instanceof EntityOwnershipChanged) { + onEntityOwnershipChanged((EntityOwnershipChanged)message); + } + } + + private void onEntityOwnershipChanged(EntityOwnershipChanged change) { + LOG.debug("Notifying EntityOwnershipListener {}: {}", listener, change); + + try { + listener.ownershipChanged(change.getEntity(), change.wasOwner(), change.isOwner()); + } catch (Exception e) { + LOG.error("Error notifying listener {}", listener, e); + } + } + + static Props props(EntityOwnershipListener listener) { + return Props.create(new EntityOwnershipListenerCreator(listener)); + } + + private static final class EntityOwnershipListenerCreator implements Creator { + private static final long serialVersionUID = 1L; + + private final EntityOwnershipListener listener; + + EntityOwnershipListenerCreator(EntityOwnershipListener listener) { + this.listener = Preconditions.checkNotNull(listener); + } + + @Override + public EntityOwnershipListenerActor create() { + return new EntityOwnershipListenerActor(listener); + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupport.java new file mode 100644 index 0000000000..5220ea29e2 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupport.java @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.entityownership; + +import akka.actor.ActorContext; +import akka.actor.ActorRef; +import akka.actor.PoisonPill; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import java.util.Collection; +import java.util.IdentityHashMap; +import java.util.Map; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.EntityOwnershipChanged; +import org.opendaylight.controller.md.sal.common.api.clustering.Entity; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages EntityOwnershipListener registrations and notifications for the EntityOwnershipShard. + * + * @author Thomas Pantelis + */ +class EntityOwnershipListenerSupport { + private static final Logger LOG = LoggerFactory.getLogger(EntityOwnershipListenerSupport.class); + + private final ActorContext actorContext; + private final Map listenerActorMap = new IdentityHashMap<>(); + private final Multimap entityListenerMap = HashMultimap.create(); + + EntityOwnershipListenerSupport(ActorContext actorContext) { + this.actorContext = actorContext; + } + + void addEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) { + LOG.debug("Adding EntityOwnershipListener {} for {}", listener, entity); + + if(entityListenerMap.put(entity, listener)) { + ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener); + if(listenerEntry == null) { + listenerActorMap.put(listener, new ListenerActorRefEntry()); + } else { + listenerEntry.referenceCount++; + } + } + } + + void removeEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) { + LOG.debug("Removing EntityOwnershipListener {} for {}", listener, entity); + + if(entityListenerMap.remove(entity, listener)) { + ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener); + + LOG.debug("Found {}", listenerEntry); + + listenerEntry.referenceCount--; + if(listenerEntry.referenceCount <= 0) { + listenerActorMap.remove(listener); + + if(listenerEntry.actorRef != null) { + LOG.debug("Killing EntityOwnershipListenerActor {}", listenerEntry.actorRef); + listenerEntry.actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender()); + } + } + } + } + + void notifyEntityOwnershipListeners(Entity entity, boolean wasOwner, boolean isOwner) { + Collection listeners = entityListenerMap.get(entity); + if(listeners.isEmpty()) { + return; + } + + EntityOwnershipChanged changed = new EntityOwnershipChanged(entity, wasOwner, isOwner); + for(EntityOwnershipListener listener: listeners) { + ActorRef listenerActor = listenerActorFor(listener); + + LOG.debug("Notifying EntityOwnershipListenerActor {} with {}", listenerActor,changed); + + listenerActor.tell(changed, ActorRef.noSender()); + } + } + + private ActorRef listenerActorFor(EntityOwnershipListener listener) { + return listenerActorMap.get(listener).actorFor(listener); + } + + private class ListenerActorRefEntry { + ActorRef actorRef; + int referenceCount = 1; + + ActorRef actorFor(EntityOwnershipListener listener) { + if(actorRef == null) { + actorRef = actorContext.actorOf(EntityOwnershipListenerActor.props(listener)); + + LOG.debug("Created EntityOwnershipListenerActor {} for listener {}", actorRef, listener); + } + + return actorRef; + } + + @Override + public String toString() { + return "ListenerActorRefEntry [actorRef=" + actorRef + ", referenceCount=" + referenceCount + "]"; + } + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/EntityOwnershipChanged.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/EntityOwnershipChanged.java new file mode 100644 index 0000000000..86fd9f70cd --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/EntityOwnershipChanged.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.entityownership.messages; + +import com.google.common.base.Preconditions; +import org.opendaylight.controller.md.sal.common.api.clustering.Entity; + +/** + * Message encapsulating an entity ownership change. + * + * @author Thomas Pantelis + */ +public class EntityOwnershipChanged { + private final Entity entity; + private final boolean wasOwner; + private final boolean isOwner; + + public EntityOwnershipChanged(Entity entity, boolean wasOwner, boolean isOwner) { + this.entity = Preconditions.checkNotNull(entity, "entity can't be null"); + this.wasOwner = wasOwner; + this.isOwner = isOwner; + } + + public Entity getEntity() { + return entity; + } + + public boolean wasOwner() { + return wasOwner; + } + + public boolean isOwner() { + return isOwner; + } + + @Override + public String toString() { + return "EntityOwnershipChanged [entity=" + entity + ", wasOwner=" + wasOwner + ", isOwner=" + isOwner + "]"; + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerActorTest.java new file mode 100644 index 0000000000..50ed12f21b --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerActorTest.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.entityownership; + +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import akka.actor.ActorRef; +import akka.testkit.TestActorRef; +import org.junit.After; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.AbstractActorTest; +import org.opendaylight.controller.cluster.datastore.entityownership.messages.EntityOwnershipChanged; +import org.opendaylight.controller.cluster.raft.TestActorFactory; +import org.opendaylight.controller.md.sal.common.api.clustering.Entity; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; + +/** + * Unit tests for EntityOwnershipListenerActor. + * + * @author Thomas Pantelis + */ +public class EntityOwnershipListenerActorTest extends AbstractActorTest { + private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); + + @After + public void tearDown() { + actorFactory.close(); + } + + @Test + public void testOnEntityOwnershipChanged() { + EntityOwnershipListener mockListener = mock(EntityOwnershipListener.class); + + TestActorRef listenerActor = actorFactory.createTestActor( + EntityOwnershipListenerActor.props(mockListener), actorFactory.generateActorId("listener")); + + Entity entity = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id1"))); + boolean wasOwner = false; + boolean isOwner = true; + listenerActor.tell(new EntityOwnershipChanged(entity, wasOwner, isOwner), ActorRef.noSender()); + + verify(mockListener, timeout(5000)).ownershipChanged(entity, wasOwner, isOwner); + } + + @Test + public void testOnEntityOwnershipChangedWithListenerEx() { + EntityOwnershipListener mockListener = mock(EntityOwnershipListener.class); + + Entity entity1 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id1"))); + doThrow(new RuntimeException("mock")).when(mockListener).ownershipChanged(entity1, false, true); + Entity entity2 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id2"))); + doNothing().when(mockListener).ownershipChanged(entity2, true, false); + + TestActorRef listenerActor = actorFactory.createTestActor( + EntityOwnershipListenerActor.props(mockListener), actorFactory.generateActorId("listener")); + + listenerActor.tell(new EntityOwnershipChanged(entity1, false, true), ActorRef.noSender()); + listenerActor.tell(new EntityOwnershipChanged(entity2, true, false), ActorRef.noSender()); + + verify(mockListener, timeout(5000)).ownershipChanged(entity2, true, false); + } +} diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupportTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupportTest.java new file mode 100644 index 0000000000..dbc5c2c0c0 --- /dev/null +++ b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupportTest.java @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2015 Brocade Communications Systems, Inc. and others. All rights reserved. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License v1.0 which accompanies this distribution, + * and is available at http://www.eclipse.org/legal/epl-v10.html + */ +package org.opendaylight.controller.cluster.datastore.entityownership; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import akka.actor.ActorRef; +import akka.actor.Props; +import akka.actor.UntypedActorContext; +import akka.testkit.JavaTestKit; +import akka.testkit.TestActorRef; +import com.google.common.util.concurrent.Uninterruptibles; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Test; +import org.opendaylight.controller.cluster.datastore.AbstractActorTest; +import org.opendaylight.controller.cluster.raft.TestActorFactory; +import org.opendaylight.controller.cluster.raft.utils.DoNothingActor; +import org.opendaylight.controller.md.sal.common.api.clustering.Entity; +import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier; +import scala.collection.Iterator; +import scala.collection.immutable.Iterable; + +/** + * Unit tests for EntityOwnershipListenerSupport. + * + * @author Thomas Pantelis + */ +public class EntityOwnershipListenerSupportTest extends AbstractActorTest { + private final TestActorFactory actorFactory = new TestActorFactory(getSystem()); + + @After + public void tearDown() { + actorFactory.close(); + } + + @Test + public void testNotifyEntityOwnershipListeners() { + TestActorRef actor = actorFactory.createTestActor( + Props.create(DoNothingActor.class), actorFactory.generateActorId("test")); + + UntypedActorContext actorContext = actor.underlyingActor().getContext(); + EntityOwnershipListenerSupport support = new EntityOwnershipListenerSupport(actorContext); + + EntityOwnershipListener mockListener1 = mock(EntityOwnershipListener.class, "EntityOwnershipListener1"); + EntityOwnershipListener mockListener2 = mock(EntityOwnershipListener.class, "EntityOwnershipListener2"); + Entity entity1 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id1"))); + Entity entity2 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id2"))); + + // Add EntityOwnershipListener registrations. + + support.addEntityOwnershipListener(entity1, mockListener1); + support.addEntityOwnershipListener(entity2, mockListener1); + support.addEntityOwnershipListener(entity1, mockListener2); + + // Notify entity1 changed and verify both listeners are notified. + + support.notifyEntityOwnershipListeners(entity1, false, true); + + verify(mockListener1, timeout(5000)).ownershipChanged(entity1, false, true); + verify(mockListener2, timeout(5000)).ownershipChanged(entity1, false, true); + assertEquals("# of listener actors", 2, actorContext.children().size()); + + // Notify entity2 changed and verify only mockListener1 is notified. + + support.notifyEntityOwnershipListeners(entity2, false, true); + + verify(mockListener1, timeout(5000)).ownershipChanged(entity2, false, true); + verify(mockListener2, never()).ownershipChanged(entity2, false, true); + assertEquals("# of listener actors", 2, actorContext.children().size()); + + // Notify entity3 changed and verify neither listener is notified. + + Entity entity3 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id3"))); + support.notifyEntityOwnershipListeners(entity3, false, true); + + Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + + verify(mockListener1, never()).ownershipChanged(entity3, false, true); + verify(mockListener2, never()).ownershipChanged(entity3, false, true); + + reset(mockListener1, mockListener2); + + // Unregister mockListener1 for entity1, issue a change and verify only mockListener2 is notified. + + support.removeEntityOwnershipListener(entity1, mockListener1); + + support.notifyEntityOwnershipListeners(entity1, false, true); + + verify(mockListener2, timeout(5000)).ownershipChanged(entity1, false, true); + verify(mockListener1, never()).ownershipChanged(entity1, false, true); + + // Completely unregister both listeners and verify their listener actors are destroyed. + + Iterable listenerActors = actorContext.children(); + assertEquals("# of listener actors", 2, listenerActors.size()); + + Iterator iter = listenerActors.iterator(); + ActorRef listenerActor1 = iter.next(); + ActorRef listenerActor2 = iter.next(); + + JavaTestKit kit1 = new JavaTestKit(getSystem()); + kit1.watch(listenerActor1); + + JavaTestKit kit2 = new JavaTestKit(getSystem()); + kit2.watch(listenerActor2); + + support.removeEntityOwnershipListener(entity2, mockListener1); + support.removeEntityOwnershipListener(entity1, mockListener2); + + kit1.expectTerminated(JavaTestKit.duration("3 seconds"), listenerActor1); + kit2.expectTerminated(JavaTestKit.duration("3 seconds"), listenerActor2); + assertEquals("# of listener actors", 0, actorContext.children().size()); + + // Re-register mockListener1 for entity1 and verify it is notified. + + reset(mockListener1, mockListener2); + + support.addEntityOwnershipListener(entity1, mockListener1); + + support.notifyEntityOwnershipListeners(entity1, false, true); + + verify(mockListener1, timeout(5000)).ownershipChanged(entity1, false, true); + verify(mockListener2, never()).ownershipChanged(entity2, false, true); + + // Quickly register and unregister mockListener2 - expecting no exceptions. + + support.addEntityOwnershipListener(entity1, mockListener2); + support.removeEntityOwnershipListener(entity1, mockListener2); + } +} -- 2.36.6