2 * Copyright (c) 2020 Ericsson India Global Services Pvt Ltd. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.frm.impl;
10 import static org.opendaylight.openflowplugin.applications.frm.util.FrmUtil.getInventoryConfigDataStoreStatus;
12 import com.google.common.util.concurrent.ListenableFuture;
13 import com.google.common.util.concurrent.ListeningExecutorService;
14 import com.google.common.util.concurrent.MoreExecutors;
15 import com.google.common.util.concurrent.ThreadFactoryBuilder;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.TimeUnit;
18 import javax.inject.Inject;
19 import javax.inject.Singleton;
20 import org.opendaylight.mdsal.binding.api.ClusteredDataTreeChangeListener;
21 import org.opendaylight.mdsal.binding.api.DataBroker;
22 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
23 import org.opendaylight.openflowplugin.common.wait.SimpleTaskRetryLooper;
24 import org.opendaylight.yangtools.concepts.ListenerRegistration;
25 import org.opendaylight.yangtools.yang.binding.DataObject;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
31 public class ListenerRegistrationHelper {
32 private static final Logger LOG = LoggerFactory.getLogger(ListenerRegistrationHelper.class);
33 private static final long INVENTORY_CHECK_TIMER = 1;
34 private final String operational = "OPERATIONAL";
35 private final ListeningExecutorService listeningExecutorService;
36 private final DataBroker dataBroker;
39 public ListenerRegistrationHelper(final DataBroker dataBroker) {
40 this.dataBroker = dataBroker;
41 listeningExecutorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(
42 new ThreadFactoryBuilder()
43 .setNameFormat("frm-listener" + "%d")
45 .setUncaughtExceptionHandler((thread, ex) -> LOG.error("Uncaught exception {}", thread, ex))
49 public <T extends DataObject, L extends ClusteredDataTreeChangeListener<T>>
50 ListenableFuture<ListenerRegistration<L>>
51 checkedRegisterListener(DataTreeIdentifier<T> treeId, L listener) {
52 return listeningExecutorService.submit(() -> {
53 while (! getInventoryConfigDataStoreStatus().equals(operational)) {
55 LOG.debug("Retrying for datastore to become operational for listener {}", listener);
56 Thread.sleep(INVENTORY_CHECK_TIMER * 1000);
57 } catch (InterruptedException e) {
58 LOG.info("registerDataTreeChangeListener thread is interrupted");
59 Thread.currentThread().interrupt();
62 SimpleTaskRetryLooper looper = new SimpleTaskRetryLooper(ForwardingRulesManagerImpl.STARTUP_LOOP_TICK,
63 ForwardingRulesManagerImpl.STARTUP_LOOP_MAX_RETRIES);
64 return looper.loopUntilNoException(() -> dataBroker.registerDataTreeChangeListener(treeId, listener));
68 public void close() throws Exception {
69 MoreExecutors.shutdownAndAwaitTermination(listeningExecutorService, 5, TimeUnit.SECONDS);