Merge "Bug 5543 - Bo: Update JUnit tests part_11"
[openflowplugin.git] / applications / forwardingrules-sync / src / main / java / org / opendaylight / openflowplugin / applications / frsync / impl / ForwardingRulesSyncProvider.java
1 /**
2  * Copyright (c) 2016 Cisco Systems, Inc. and others.  All rights reserved.
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6  * and is available at http://www.eclipse.org/legal/epl-v10.html
7  */
8
9 package org.opendaylight.openflowplugin.applications.frsync.impl;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.util.concurrent.ListeningExecutorService;
13 import com.google.common.util.concurrent.MoreExecutors;
14 import com.google.common.util.concurrent.ThreadFactoryBuilder;
15 import java.util.concurrent.ExecutorService;
16 import java.util.concurrent.Executors;
17 import java.util.concurrent.TimeUnit;
18 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
19 import org.opendaylight.controller.md.sal.binding.api.DataTreeIdentifier;
20 import org.opendaylight.controller.md.sal.common.api.data.LogicalDatastoreType;
21 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
22 import org.opendaylight.controller.sal.binding.api.BindingAwareProvider;
23 import org.opendaylight.controller.sal.binding.api.RpcConsumerRegistry;
24 import org.opendaylight.openflowplugin.applications.frsync.NodeListener;
25 import org.opendaylight.openflowplugin.applications.frsync.SyncPlanPushStrategy;
26 import org.opendaylight.openflowplugin.applications.frsync.SyncReactor;
27 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeCachedDao;
28 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeDao;
29 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeOdlDao;
30 import org.opendaylight.openflowplugin.applications.frsync.dao.FlowCapableNodeSnapshotDao;
31 import org.opendaylight.openflowplugin.applications.frsync.impl.strategy.SyncPlanPushStrategyFlatBatchImpl;
32 import org.opendaylight.openflowplugin.applications.frsync.util.RetryRegistry;
33 import org.opendaylight.openflowplugin.applications.frsync.util.SemaphoreKeeperGuavaImpl;
34 import org.opendaylight.yang.gen.v1.urn.opendaylight.flat.batch.service.rev160321.SalFlatBatchService;
35 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
36 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.Nodes;
37 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
38 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.SalTableService;
39 import org.opendaylight.yangtools.concepts.ListenerRegistration;
40 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 /**
45  * Top provider of forwarding rules synchronization functionality.
46  */
47 public class ForwardingRulesSyncProvider implements AutoCloseable, BindingAwareProvider {
48
49     private static final Logger LOG = LoggerFactory.getLogger(ForwardingRulesSyncProvider.class);
50
51     private final DataBroker dataService;
52     private final SalTableService salTableService;
53     private final SalFlatBatchService flatBatchService;
54
55     /** Wildcard path to flow-capable-node augmentation of inventory node. */
56     private static final InstanceIdentifier<FlowCapableNode> FLOW_CAPABLE_NODE_WC_PATH =
57             InstanceIdentifier.create(Nodes.class).child(Node.class).augmentation(FlowCapableNode.class);
58     /** Wildcard path to node (not flow-capable-node augmentation) of inventory node. */
59     private static final InstanceIdentifier<Node> NODE_WC_PATH =
60             InstanceIdentifier.create(Nodes.class).child(Node.class);
61
62     private final DataTreeIdentifier<FlowCapableNode> nodeConfigDataTreePath;
63     private final DataTreeIdentifier<Node> nodeOperationalDataTreePath;
64
65     private ListenerRegistration<NodeListener> dataTreeConfigChangeListener;
66     private ListenerRegistration<NodeListener> dataTreeOperationalChangeListener;
67
68     private final ListeningExecutorService syncThreadPool;
69
70     public ForwardingRulesSyncProvider(final BindingAwareBroker broker,
71                                        final DataBroker dataBroker,
72                                        final RpcConsumerRegistry rpcRegistry) {
73         Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");
74         this.dataService = Preconditions.checkNotNull(dataBroker, "DataBroker can not be null!");
75         this.salTableService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalTableService.class),
76                 "RPC SalTableService not found.");
77         this.flatBatchService = Preconditions.checkNotNull(rpcRegistry.getRpcService(SalFlatBatchService.class),
78                 "RPC SalFlatBatchService not found.");
79
80         nodeConfigDataTreePath = new DataTreeIdentifier<>(LogicalDatastoreType.CONFIGURATION, FLOW_CAPABLE_NODE_WC_PATH);
81         nodeOperationalDataTreePath = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, NODE_WC_PATH);
82
83         final ExecutorService executorService= Executors.newCachedThreadPool(new ThreadFactoryBuilder()
84                 .setNameFormat(SyncReactorFutureDecorator.FRM_RPC_CLIENT_PREFIX + "%d")
85                 .setDaemon(false)
86                 .setUncaughtExceptionHandler((thread, e) -> LOG.error("Uncaught exception {}", thread, e))
87                 .build());
88         syncThreadPool = MoreExecutors.listeningDecorator(executorService);
89
90         broker.registerProvider(this);
91     }
92
93     @Override
94     public void onSessionInitiated(final BindingAwareBroker.ProviderContext providerContext) {
95         final TableForwarder tableForwarder = new TableForwarder(salTableService);
96
97         final SyncPlanPushStrategy syncPlanPushStrategy = new SyncPlanPushStrategyFlatBatchImpl()
98                 .setFlatBatchService(flatBatchService)
99                 .setTableForwarder(tableForwarder);
100
101         final RetryRegistry retryRegistry = new RetryRegistry();
102
103         final SyncReactor syncReactorImpl = new SyncReactorImpl(syncPlanPushStrategy);
104         final SyncReactor syncReactorRetry = new SyncReactorRetryDecorator(syncReactorImpl, retryRegistry);
105         final SyncReactor syncReactorGuard = new SyncReactorGuardDecorator(syncReactorRetry,
106                 new SemaphoreKeeperGuavaImpl<InstanceIdentifier<FlowCapableNode>>(1, true));
107
108         final SyncReactor reactor = new SyncReactorFutureZipDecorator(syncReactorGuard, syncThreadPool);
109
110         final FlowCapableNodeSnapshotDao configSnapshot = new FlowCapableNodeSnapshotDao();
111         final FlowCapableNodeSnapshotDao operationalSnapshot = new FlowCapableNodeSnapshotDao();
112         final FlowCapableNodeDao configDao = new FlowCapableNodeCachedDao(configSnapshot,
113                 new FlowCapableNodeOdlDao(dataService, LogicalDatastoreType.CONFIGURATION));
114         final FlowCapableNodeDao operationalDao = new FlowCapableNodeCachedDao(operationalSnapshot,
115                 new FlowCapableNodeOdlDao(dataService, LogicalDatastoreType.OPERATIONAL));
116
117         final NodeListener<FlowCapableNode> nodeListenerConfig =
118                 new SimplifiedConfigListener(reactor, configSnapshot, operationalDao);
119         final NodeListener<Node> nodeListenerOperational =
120                 new SimplifiedOperationalRetryListener(reactor, operationalSnapshot, configDao, retryRegistry);
121
122         dataTreeConfigChangeListener =
123                 dataService.registerDataTreeChangeListener(nodeConfigDataTreePath, nodeListenerConfig);
124         dataTreeOperationalChangeListener =
125                 dataService.registerDataTreeChangeListener(nodeOperationalDataTreePath, nodeListenerOperational);
126
127         LOG.info("ForwardingRulesSync has started.");
128     }
129
130     public void close() throws InterruptedException {
131         if (dataTreeConfigChangeListener != null) {
132             dataTreeConfigChangeListener.close();
133             dataTreeConfigChangeListener = null;
134         }
135
136         if (dataTreeOperationalChangeListener != null) {
137             dataTreeOperationalChangeListener.close();
138             dataTreeOperationalChangeListener = null;
139         }
140
141         syncThreadPool.shutdown();
142     }
143 }