Bug 4105: Add EntityOwnershipListenerActor and support 95/26795/1
authorTom Pantelis <tpanteli@brocade.com>
Tue, 11 Aug 2015 04:05:38 +0000 (00:05 -0400)
committerTom Pantelis <tpanteli@brocade.com>
Thu, 10 Sep 2015 19:12:47 +0000 (15:12 -0400)
Change-Id: Idbeef3e23ab45a11afe5fce56a55fe5d6945729a
Signed-off-by: Tom Pantelis <tpanteli@brocade.com>
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerActor.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupport.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/EntityOwnershipChanged.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerActorTest.java [new file with mode: 0644]
opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupportTest.java [new file with mode: 0644]

diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerActor.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerActor.java
new file mode 100644 (file)
index 0000000..f62f749
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import akka.actor.Props;
+import akka.japi.Creator;
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.cluster.common.actor.AbstractUntypedActor;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.EntityOwnershipChanged;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An actor which is responsible for notifying an EntityOwnershipListener of changes.
+ *
+ * @author Thomas Pantelis
+ */
+class EntityOwnershipListenerActor extends AbstractUntypedActor {
+    private static final Logger LOG = LoggerFactory.getLogger(EntityOwnershipListenerActor.class);
+
+    private final EntityOwnershipListener listener;
+
+    private EntityOwnershipListenerActor(EntityOwnershipListener listener) {
+        this.listener = listener;
+    }
+
+    @Override
+    protected void handleReceive(Object message) {
+        if(message instanceof EntityOwnershipChanged) {
+            onEntityOwnershipChanged((EntityOwnershipChanged)message);
+        }
+    }
+
+    private void onEntityOwnershipChanged(EntityOwnershipChanged change) {
+        LOG.debug("Notifying EntityOwnershipListener {}: {}", listener, change);
+
+        try {
+            listener.ownershipChanged(change.getEntity(), change.wasOwner(), change.isOwner());
+        } catch (Exception e) {
+            LOG.error("Error notifying listener {}", listener, e);
+        }
+    }
+
+    static Props props(EntityOwnershipListener listener) {
+        return Props.create(new EntityOwnershipListenerCreator(listener));
+    }
+
+    private static final class EntityOwnershipListenerCreator implements Creator<EntityOwnershipListenerActor> {
+        private static final long serialVersionUID = 1L;
+
+        private final EntityOwnershipListener listener;
+
+        EntityOwnershipListenerCreator(EntityOwnershipListener listener) {
+            this.listener = Preconditions.checkNotNull(listener);
+        }
+
+        @Override
+        public EntityOwnershipListenerActor create() {
+            return new EntityOwnershipListenerActor(listener);
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupport.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupport.java
new file mode 100644 (file)
index 0000000..5220ea2
--- /dev/null
@@ -0,0 +1,112 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import akka.actor.ActorContext;
+import akka.actor.ActorRef;
+import akka.actor.PoisonPill;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import java.util.Collection;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.EntityOwnershipChanged;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages EntityOwnershipListener registrations and notifications for the EntityOwnershipShard.
+ *
+ * @author Thomas Pantelis
+ */
+class EntityOwnershipListenerSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(EntityOwnershipListenerSupport.class);
+
+    private final ActorContext actorContext;
+    private final Map<EntityOwnershipListener, ListenerActorRefEntry> listenerActorMap = new IdentityHashMap<>();
+    private final Multimap<Entity, EntityOwnershipListener> entityListenerMap = HashMultimap.create();
+
+    EntityOwnershipListenerSupport(ActorContext actorContext) {
+        this.actorContext = actorContext;
+    }
+
+    void addEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) {
+        LOG.debug("Adding EntityOwnershipListener {} for {}", listener, entity);
+
+        if(entityListenerMap.put(entity, listener)) {
+            ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
+            if(listenerEntry == null) {
+                listenerActorMap.put(listener, new ListenerActorRefEntry());
+            } else {
+                listenerEntry.referenceCount++;
+            }
+        }
+    }
+
+    void removeEntityOwnershipListener(Entity entity, EntityOwnershipListener listener) {
+        LOG.debug("Removing EntityOwnershipListener {} for {}", listener, entity);
+
+        if(entityListenerMap.remove(entity, listener)) {
+            ListenerActorRefEntry listenerEntry = listenerActorMap.get(listener);
+
+            LOG.debug("Found {}", listenerEntry);
+
+            listenerEntry.referenceCount--;
+            if(listenerEntry.referenceCount <= 0) {
+                listenerActorMap.remove(listener);
+
+                if(listenerEntry.actorRef != null) {
+                    LOG.debug("Killing EntityOwnershipListenerActor {}", listenerEntry.actorRef);
+                    listenerEntry.actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
+                }
+            }
+        }
+    }
+
+    void notifyEntityOwnershipListeners(Entity entity, boolean wasOwner, boolean isOwner) {
+        Collection<EntityOwnershipListener> listeners = entityListenerMap.get(entity);
+        if(listeners.isEmpty()) {
+            return;
+        }
+
+        EntityOwnershipChanged changed = new EntityOwnershipChanged(entity, wasOwner, isOwner);
+        for(EntityOwnershipListener listener: listeners) {
+            ActorRef listenerActor = listenerActorFor(listener);
+
+            LOG.debug("Notifying EntityOwnershipListenerActor {} with {}", listenerActor,changed);
+
+            listenerActor.tell(changed, ActorRef.noSender());
+        }
+    }
+
+    private ActorRef listenerActorFor(EntityOwnershipListener listener) {
+        return listenerActorMap.get(listener).actorFor(listener);
+    }
+
+    private class ListenerActorRefEntry {
+        ActorRef actorRef;
+        int referenceCount = 1;
+
+        ActorRef actorFor(EntityOwnershipListener listener) {
+            if(actorRef == null) {
+                actorRef = actorContext.actorOf(EntityOwnershipListenerActor.props(listener));
+
+                LOG.debug("Created EntityOwnershipListenerActor {} for listener {}", actorRef, listener);
+            }
+
+            return actorRef;
+        }
+
+        @Override
+        public String toString() {
+            return "ListenerActorRefEntry [actorRef=" + actorRef + ", referenceCount=" + referenceCount + "]";
+        }
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/EntityOwnershipChanged.java b/opendaylight/md-sal/sal-distributed-datastore/src/main/java/org/opendaylight/controller/cluster/datastore/entityownership/messages/EntityOwnershipChanged.java
new file mode 100644 (file)
index 0000000..86fd9f7
--- /dev/null
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership.messages;
+
+import com.google.common.base.Preconditions;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+
+/**
+ * Message encapsulating an entity ownership change.
+ *
+ * @author Thomas Pantelis
+ */
+public class EntityOwnershipChanged {
+    private final Entity entity;
+    private final boolean wasOwner;
+    private final boolean isOwner;
+
+    public EntityOwnershipChanged(Entity entity, boolean wasOwner, boolean isOwner) {
+        this.entity = Preconditions.checkNotNull(entity, "entity can't be null");
+        this.wasOwner = wasOwner;
+        this.isOwner = isOwner;
+    }
+
+    public Entity getEntity() {
+        return entity;
+    }
+
+    public boolean wasOwner() {
+        return wasOwner;
+    }
+
+    public boolean isOwner() {
+        return isOwner;
+    }
+
+    @Override
+    public String toString() {
+        return "EntityOwnershipChanged [entity=" + entity + ", wasOwner=" + wasOwner + ", isOwner=" + isOwner + "]";
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerActorTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerActorTest.java
new file mode 100644 (file)
index 0000000..50ed12f
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import akka.actor.ActorRef;
+import akka.testkit.TestActorRef;
+import org.junit.After;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.datastore.entityownership.messages.EntityOwnershipChanged;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+
+/**
+ * Unit tests for EntityOwnershipListenerActor.
+ *
+ * @author Thomas Pantelis
+ */
+public class EntityOwnershipListenerActorTest extends AbstractActorTest {
+    private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+
+    @After
+    public void tearDown() {
+        actorFactory.close();
+    }
+
+    @Test
+    public void testOnEntityOwnershipChanged() {
+        EntityOwnershipListener mockListener = mock(EntityOwnershipListener.class);
+
+        TestActorRef<EntityOwnershipListenerActor> listenerActor = actorFactory.createTestActor(
+                EntityOwnershipListenerActor.props(mockListener), actorFactory.generateActorId("listener"));
+
+        Entity entity = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id1")));
+        boolean wasOwner = false;
+        boolean isOwner = true;
+        listenerActor.tell(new EntityOwnershipChanged(entity, wasOwner, isOwner), ActorRef.noSender());
+
+        verify(mockListener, timeout(5000)).ownershipChanged(entity, wasOwner, isOwner);
+    }
+
+    @Test
+    public void testOnEntityOwnershipChangedWithListenerEx() {
+        EntityOwnershipListener mockListener = mock(EntityOwnershipListener.class);
+
+        Entity entity1 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id1")));
+        doThrow(new RuntimeException("mock")).when(mockListener).ownershipChanged(entity1, false, true);
+        Entity entity2 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id2")));
+        doNothing().when(mockListener).ownershipChanged(entity2, true, false);
+
+        TestActorRef<EntityOwnershipListenerActor> listenerActor = actorFactory.createTestActor(
+                EntityOwnershipListenerActor.props(mockListener), actorFactory.generateActorId("listener"));
+
+        listenerActor.tell(new EntityOwnershipChanged(entity1, false, true), ActorRef.noSender());
+        listenerActor.tell(new EntityOwnershipChanged(entity2, true, false), ActorRef.noSender());
+
+        verify(mockListener, timeout(5000)).ownershipChanged(entity2, true, false);
+    }
+}
diff --git a/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupportTest.java b/opendaylight/md-sal/sal-distributed-datastore/src/test/java/org/opendaylight/controller/cluster/datastore/entityownership/EntityOwnershipListenerSupportTest.java
new file mode 100644 (file)
index 0000000..dbc5c2c
--- /dev/null
@@ -0,0 +1,142 @@
+/*
+ * Copyright (c) 2015 Brocade Communications Systems, Inc. and others.  All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+package org.opendaylight.controller.cluster.datastore.entityownership;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.actor.UntypedActorContext;
+import akka.testkit.JavaTestKit;
+import akka.testkit.TestActorRef;
+import com.google.common.util.concurrent.Uninterruptibles;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
+import org.junit.Test;
+import org.opendaylight.controller.cluster.datastore.AbstractActorTest;
+import org.opendaylight.controller.cluster.raft.TestActorFactory;
+import org.opendaylight.controller.cluster.raft.utils.DoNothingActor;
+import org.opendaylight.controller.md.sal.common.api.clustering.Entity;
+import org.opendaylight.controller.md.sal.common.api.clustering.EntityOwnershipListener;
+import org.opendaylight.yangtools.yang.common.QName;
+import org.opendaylight.yangtools.yang.data.api.YangInstanceIdentifier;
+import scala.collection.Iterator;
+import scala.collection.immutable.Iterable;
+
+/**
+ * Unit tests for EntityOwnershipListenerSupport.
+ *
+ * @author Thomas Pantelis
+ */
+public class EntityOwnershipListenerSupportTest extends AbstractActorTest {
+    private final TestActorFactory actorFactory = new TestActorFactory(getSystem());
+
+    @After
+    public void tearDown() {
+        actorFactory.close();
+    }
+
+    @Test
+    public void testNotifyEntityOwnershipListeners() {
+        TestActorRef<DoNothingActor> actor = actorFactory.createTestActor(
+                Props.create(DoNothingActor.class), actorFactory.generateActorId("test"));
+
+        UntypedActorContext actorContext = actor.underlyingActor().getContext();
+        EntityOwnershipListenerSupport support = new EntityOwnershipListenerSupport(actorContext);
+
+        EntityOwnershipListener mockListener1 = mock(EntityOwnershipListener.class, "EntityOwnershipListener1");
+        EntityOwnershipListener mockListener2 = mock(EntityOwnershipListener.class, "EntityOwnershipListener2");
+        Entity entity1 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id1")));
+        Entity entity2 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id2")));
+
+        // Add EntityOwnershipListener registrations.
+
+        support.addEntityOwnershipListener(entity1, mockListener1);
+        support.addEntityOwnershipListener(entity2, mockListener1);
+        support.addEntityOwnershipListener(entity1, mockListener2);
+
+        // Notify entity1 changed and verify both listeners are notified.
+
+        support.notifyEntityOwnershipListeners(entity1, false, true);
+
+        verify(mockListener1, timeout(5000)).ownershipChanged(entity1, false, true);
+        verify(mockListener2, timeout(5000)).ownershipChanged(entity1, false, true);
+        assertEquals("# of listener actors", 2, actorContext.children().size());
+
+        // Notify entity2 changed and verify only mockListener1 is notified.
+
+        support.notifyEntityOwnershipListeners(entity2, false, true);
+
+        verify(mockListener1, timeout(5000)).ownershipChanged(entity2, false, true);
+        verify(mockListener2, never()).ownershipChanged(entity2, false, true);
+        assertEquals("# of listener actors", 2, actorContext.children().size());
+
+        // Notify entity3 changed and verify neither listener is notified.
+
+        Entity entity3 = new Entity("test", YangInstanceIdentifier.of(QName.create("test", "id3")));
+        support.notifyEntityOwnershipListeners(entity3, false, true);
+
+        Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+
+        verify(mockListener1, never()).ownershipChanged(entity3, false, true);
+        verify(mockListener2, never()).ownershipChanged(entity3, false, true);
+
+        reset(mockListener1, mockListener2);
+
+        // Unregister mockListener1 for entity1, issue a change and verify only mockListener2 is notified.
+
+        support.removeEntityOwnershipListener(entity1, mockListener1);
+
+        support.notifyEntityOwnershipListeners(entity1, false, true);
+
+        verify(mockListener2, timeout(5000)).ownershipChanged(entity1, false, true);
+        verify(mockListener1, never()).ownershipChanged(entity1, false, true);
+
+        // Completely unregister both listeners and verify their listener actors are destroyed.
+
+        Iterable<ActorRef> listenerActors = actorContext.children();
+        assertEquals("# of listener actors", 2, listenerActors.size());
+
+        Iterator<ActorRef> iter = listenerActors.iterator();
+        ActorRef listenerActor1 = iter.next();
+        ActorRef listenerActor2 = iter.next();
+
+        JavaTestKit kit1 = new JavaTestKit(getSystem());
+        kit1.watch(listenerActor1);
+
+        JavaTestKit kit2 = new JavaTestKit(getSystem());
+        kit2.watch(listenerActor2);
+
+        support.removeEntityOwnershipListener(entity2, mockListener1);
+        support.removeEntityOwnershipListener(entity1, mockListener2);
+
+        kit1.expectTerminated(JavaTestKit.duration("3 seconds"), listenerActor1);
+        kit2.expectTerminated(JavaTestKit.duration("3 seconds"), listenerActor2);
+        assertEquals("# of listener actors", 0, actorContext.children().size());
+
+        // Re-register mockListener1 for entity1 and verify it is notified.
+
+        reset(mockListener1, mockListener2);
+
+        support.addEntityOwnershipListener(entity1, mockListener1);
+
+        support.notifyEntityOwnershipListeners(entity1, false, true);
+
+        verify(mockListener1, timeout(5000)).ownershipChanged(entity1, false, true);
+        verify(mockListener2, never()).ownershipChanged(entity2, false, true);
+
+        // Quickly register and unregister mockListener2 - expecting no exceptions.
+
+        support.addEntityOwnershipListener(entity1, mockListener2);
+        support.removeEntityOwnershipListener(entity1, mockListener2);
+    }
+}