*/
package org.opendaylight.controller.md.sal.binding.api;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.TimeUnit;
import org.opendaylight.yangtools.yang.binding.Notification;
* notification has or has not been seen.
*/
public interface NotificationPublishService extends BindingService {
+
+ /**
+ * Well-known value indicating that the binding-aware implementation is currently not
+ * able to accept a notification.
+ */
+ ListenableFuture<Object> REJECTED = Futures.immediateFailedFuture(new NotificationRejectedException("Rejected due to resource constraints."));
+
/**
* Publishes a notification to subscribed listeners. This initiates
* the process of sending the notification, but delivery to the
* listeners can happen asynchronously, potentially after a call to
* this method returns.
*
- * This method is guaranteed not to block.
+ * Still guaranteed not to block. Returns Listenable Future which will complete once.
*
* @param notification
* the notification to publish.
- * @return true if the notification was accepted for processing, false otherwise
+ * @return A listenable future which will report completion when the service has finished
+ * propagating the notification to its immediate registrants, or {@value #REJECTED} if resource
+ * constraints prevent
* @throws NullPointerException if the notification is null
*/
- boolean offerNotification(Notification notification);
+ ListenableFuture<? extends Object> offerNotification(Notification notification);
/**
* Publishes a notification to subscribed listeners. This initiates
* @param timeout how long to wait before giving up, in units of unit
* @param unit a TimeUnit determining how to interpret the
* timeout parameter
- * @return true if the notification was accepted for processing, false otherwise
+ * @return A listenable future which will report completion when the service has finished
+ * propagating the notification to its immediate registrants, or {@value #REJECTED} if resource
+ * constraints prevent
* @throws InterruptedException if interrupted while waiting
* @throws NullPointerException if the notification or unit is null
* @throws IllegalArgumentException if timeout is negative.
*/
- boolean offerNotification(Notification notification, int timeout, TimeUnit unit)
- throws InterruptedException;
+ ListenableFuture<? extends Object> offerNotification(Notification notification, int timeout, TimeUnit unit)
+ throws InterruptedException;
+
}
--- /dev/null
+/*
+ * Copyright (c) 2015 Cisco Systems, Inc. and others. All rights reserved.
+ *
+ * This program and the accompanying materials are made available under the
+ * terms of the Eclipse Public License v1.0 which accompanies this distribution,
+ * and is available at http://www.eclipse.org/legal/epl-v10.html
+ */
+
+package org.opendaylight.controller.md.sal.binding.api;
+
+/**
+ * <p/>
+ * This exception indicates that given notification can not be processed by corresponding mechanism.
+ * More info can be provided in message.
+ * <p/>
+ * <p>
+ * Expected use: {@link NotificationPublishService}
+ * </p>
+ */
+public class NotificationRejectedException extends Exception {
+
+ private static final long serialVersionUID = 3722954527834860394L;
+
+ public NotificationRejectedException(String message) {
+ super(message);
+ }
+}
}
@Override
- public boolean offerNotification(final Notification notification) {
- final ListenableFuture<?> listenableFuture = domPublishService.offerNotification(toDomNotification(notification));
- return !DOMNotificationPublishService.REJECTED.equals(listenableFuture);
+ public ListenableFuture<? extends Object> offerNotification(final Notification notification) {
+ ListenableFuture<?> offerResult = domPublishService.offerNotification(toDomNotification(notification));
+ return DOMNotificationPublishService.REJECTED.equals(offerResult)
+ ? NotificationPublishService.REJECTED
+ : offerResult;
}
@Override
- public boolean offerNotification(final Notification notification, final int timeout, final TimeUnit unit) throws InterruptedException {
- final ListenableFuture<?> listenableFuture =
- domPublishService.offerNotification(toDomNotification(notification), timeout, unit);
- return !DOMNotificationPublishService.REJECTED.equals(listenableFuture);
+ public ListenableFuture<? extends Object> offerNotification(final Notification notification, final int timeout, final TimeUnit unit) throws InterruptedException {
+ ListenableFuture<?> offerResult = domPublishService.offerNotification(toDomNotification(notification), timeout, unit);
+ return DOMNotificationPublishService.REJECTED.equals(offerResult)
+ ? NotificationPublishService.REJECTED
+ : offerResult;
}
private DOMNotification toDomNotification(final Notification notification) {
package org.opendaylight.controller.md.sal.binding.impl.test;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.junit.Assert;
import org.junit.Test;
+import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.controller.md.sal.binding.test.AbstractNotificationBrokerTest;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.OpendaylightMdsalListTestListener;
import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.controller.md.sal.test.list.rev140701.TwoLevelListChanged;
import org.opendaylight.yangtools.concepts.ListenerRegistration;
import org.opendaylight.yangtools.yang.binding.YangModuleInfo;
import org.opendaylight.yangtools.yang.binding.util.BindingReflections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ForwardedNotificationAdapterTest extends AbstractNotificationBrokerTest {
+ private static final Logger LOG = LoggerFactory.getLogger(ForwardedNotificationAdapterTest.class);
+
@Override
protected Iterable<YangModuleInfo> getModuleInfos() throws Exception {
return ImmutableSet.of(BindingReflections.getModuleInfo(TwoLevelListChanged.class));
final TestNotifListener testNotifListener = new TestNotifListener(latch);
final ListenerRegistration<TestNotifListener> listenerRegistration = getNotificationService()
.registerNotificationListener(testNotifListener);
- assertTrue(getNotificationPublishService().offerNotification(testData));
+ try {
+ getNotificationPublishService().offerNotification(testData).get(1, TimeUnit.SECONDS);
+ } catch (ExecutionException | TimeoutException e) {
+ LOG.error("Notification delivery failed", e);
+ Assert.fail("notification should be delivered");
+ }
latch.await();
assertTrue(testNotifListener.getReceivedNotifications().size() == 1);
final TestNotifListener testNotifListener = new TestNotifListener(latch);
final ListenerRegistration<TestNotifListener> listenerRegistration = getNotificationService()
.registerNotificationListener(testNotifListener);
- assertTrue(getNotificationPublishService().offerNotification(testData, 5, TimeUnit.SECONDS));
+ assertNotSame(NotificationPublishService.REJECTED,
+ getNotificationPublishService().offerNotification(testData, 5, TimeUnit.SECONDS));
latch.await();
assertTrue(testNotifListener.getReceivedNotifications().size() == 1);