2 * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved.
4 * This program and the accompanying materials are made available under the
5 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
6 * and is available at http://www.eclipse.org/legal/epl-v10.html
8 package org.opendaylight.openflowplugin.applications.frm.impl;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import java.util.Optional;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.atomic.AtomicLong;
18 import javax.annotation.PostConstruct;
19 import javax.annotation.PreDestroy;
20 import javax.inject.Inject;
21 import javax.inject.Singleton;
22 import org.eclipse.jdt.annotation.NonNull;
23 import org.opendaylight.mdsal.binding.api.DataBroker;
24 import org.opendaylight.mdsal.binding.api.ReadTransaction;
25 import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
26 import org.opendaylight.mdsal.binding.api.RpcProviderService;
27 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
28 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
29 import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
30 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
31 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
32 import org.opendaylight.openflowplugin.applications.frm.BundleMessagesCommiter;
33 import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
34 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesCommiter;
35 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
36 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesProperty;
37 import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
38 import org.opendaylight.openflowplugin.applications.frm.nodeconfigurator.NodeConfiguratorImpl;
39 import org.opendaylight.openflowplugin.applications.frm.recovery.OpenflowServiceRecoveryHandler;
40 import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
41 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager;
42 import org.opendaylight.serviceutils.srm.RecoverableListener;
43 import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupService;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.SalMeterService;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.forwardingrules.manager.config.rev160511.ForwardingRulesManagerConfig;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.SalTableService;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
58 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
63 * forwardingrules-manager org.opendaylight.openflowplugin.applications.frm.impl
 * Manager and central point of the whole module. It contains the ActiveNodeHolder and
 * provides all RPC services.
71 public final class ForwardingRulesManagerImpl implements ForwardingRulesManager {
72 private static final Logger LOG = LoggerFactory.getLogger(ForwardingRulesManagerImpl.class);
74 private static final int FRM_RECONCILIATION_PRIORITY = Integer.getInteger("frm.reconciliation.priority", 1);
75 private static final String SERVICE_NAME = "FRM";
77 private final AtomicLong txNum = new AtomicLong();
78 private final DataBroker dataService;
79 private final SalFlowService salFlowService;
80 private final SalGroupService salGroupService;
81 private final SalMeterService salMeterService;
82 private final SalTableService salTableService;
83 private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
84 private final SalBundleService salBundleService;
85 private final AutoCloseable configurationServiceRegistration;
86 private final MastershipChangeServiceManager mastershipChangeServiceManager;
87 private final RpcProviderService rpcProviderService;
88 private ForwardingRulesCommiter<Flow> flowListener;
89 private ForwardingRulesCommiter<Group> groupListener;
90 private ForwardingRulesCommiter<Meter> meterListener;
91 private ForwardingRulesCommiter<TableFeatures> tableListener;
92 private BundleMessagesCommiter<Flow> bundleFlowListener;
93 private BundleMessagesCommiter<Group> bundleGroupListener;
94 private FlowNodeReconciliation nodeListener;
95 private NotificationRegistration reconciliationNotificationRegistration;
96 private FlowNodeConnectorInventoryTranslatorImpl flowNodeConnectorInventoryTranslatorImpl;
97 private DeviceMastershipManager deviceMastershipManager;
98 private final ReconciliationManager reconciliationManager;
99 private DevicesGroupRegistry devicesGroupRegistry;
100 private NodeConfigurator nodeConfigurator;
101 private final ArbitratorReconcileService arbitratorReconciliationManager;
102 private boolean disableReconciliation;
103 private boolean staleMarkingEnabled;
104 private int reconciliationRetryCount;
105 private boolean isBundleBasedReconciliationEnabled;
106 private final OpenflowServiceRecoveryHandler openflowServiceRecoveryHandler;
107 private final ServiceRecoveryRegistry serviceRecoveryRegistry;
108 private final FlowGroupCacheManager flowGroupCacheManager;
109 private final ListenerRegistrationHelper registrationHelper;
/**
 * Creates the manager, resolves every RPC service it needs and subscribes it to configuration
 * property changes.
 *
 * <p>All arguments are validated and all final fields are assigned <em>before</em> this instance
 * is handed to {@code configurationService.registerListener}. The listener registration is the
 * last step so that a validation failure cannot leave a dangling registration behind, and so
 * that a callback issued during registration never sees a half-built object.
 *
 * @param dataBroker data broker used for datastore access, must not be null
 * @param rpcRegistry registry the SAL RPC services are looked up from, must not be null
 * @param rpcProviderService RPC provider service passed on to the device mastership manager
 * @param config initial values for the reconciliation related settings, must not be null
 * @param mastershipChangeServiceManager mastership manager passed on to the device mastership manager
 * @param clusterSingletonService cluster singleton provider, must not be null
 * @param configurationService service this instance registers with as a property listener,
 *        must not be null
 * @param reconciliationManager reconciliation framework the node listener is registered with
 * @param openflowServiceRecoveryHandler recovery handler, must not be null
 * @param serviceRecoveryRegistry recovery registry, must not be null
 * @param flowGroupCacheManager cache manager handed to the node reconciliation
 * @param registrationHelper helper handed to the forwarders, must not be null
 * @throws NullPointerException if a mandatory argument or a looked-up RPC service is null
 * @throws IllegalArgumentException if {@code rpcRegistry} is null
 */
public ForwardingRulesManagerImpl(final DataBroker dataBroker,
        final RpcConsumerRegistry rpcRegistry,
        final RpcProviderService rpcProviderService,
        final ForwardingRulesManagerConfig config,
        final MastershipChangeServiceManager mastershipChangeServiceManager,
        final ClusterSingletonServiceProvider clusterSingletonService,
        final ConfigurationService configurationService,
        final ReconciliationManager reconciliationManager,
        final OpenflowServiceRecoveryHandler openflowServiceRecoveryHandler,
        final ServiceRecoveryRegistry serviceRecoveryRegistry,
        final FlowGroupCacheManager flowGroupCacheManager,
        final ListenerRegistrationHelper registrationHelper) {
    requireNonNull(config, "ForwardingRulesManagerConfig can not be null!");
    requireNonNull(configurationService, "ConfigurationService can not be null!");

    dataService = requireNonNull(dataBroker, "DataBroker can not be null!");
    this.registrationHelper = requireNonNull(registrationHelper, "RegistrationHelper cannot be null");
    clusterSingletonServiceProvider = requireNonNull(clusterSingletonService,
            "ClusterSingletonService provider can not be null");
    this.openflowServiceRecoveryHandler = requireNonNull(openflowServiceRecoveryHandler,
            "Openflow service recovery handler cannot be null");
    this.serviceRecoveryRegistry = requireNonNull(serviceRecoveryRegistry,
            "Service recovery registry cannot be null");
    this.reconciliationManager = reconciliationManager;
    this.rpcProviderService = rpcProviderService;
    this.mastershipChangeServiceManager = mastershipChangeServiceManager;
    this.flowGroupCacheManager = flowGroupCacheManager;

    // Kept as checkArgument so a null registry still surfaces as IllegalArgumentException.
    Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");

    salFlowService = requireNonNull(rpcRegistry.getRpcService(SalFlowService.class),
            "RPC SalFlowService not found.");
    salGroupService = requireNonNull(rpcRegistry.getRpcService(SalGroupService.class),
            "RPC SalGroupService not found.");
    salMeterService = requireNonNull(rpcRegistry.getRpcService(SalMeterService.class),
            "RPC SalMeterService not found.");
    salTableService = requireNonNull(rpcRegistry.getRpcService(SalTableService.class),
            "RPC SalTableService not found.");
    salBundleService = requireNonNull(rpcRegistry.getRpcService(SalBundleService.class),
            "RPC SalBundleService not found.");
    arbitratorReconciliationManager =
            requireNonNull(rpcRegistry.getRpcService(ArbitratorReconcileService.class),
                    "ArbitratorReconciliationManager can not be null!");

    // Seed the mutable settings from the static configuration first ...
    disableReconciliation = config.getDisableReconciliation();
    staleMarkingEnabled = config.getStaleMarkingEnabled();
    reconciliationRetryCount = config.getReconciliationRetryCount().toJava();
    isBundleBasedReconciliationEnabled = config.getBundleBasedReconciliationEnabled();

    // ... and only then expose 'this' as a listener, so any property pushed by the
    // configuration service still overrides the seeded values, exactly as before.
    configurationServiceRegistration = configurationService.registerListener(this);
}
161 public void start() {
162 nodeConfigurator = new NodeConfiguratorImpl();
163 devicesGroupRegistry = new DevicesGroupRegistry();
164 nodeListener = new FlowNodeReconciliationImpl(this, dataService, SERVICE_NAME, FRM_RECONCILIATION_PRIORITY,
165 ResultState.DONOTHING, flowGroupCacheManager);
166 if (isReconciliationDisabled()) {
167 LOG.debug("Reconciliation is disabled by user");
169 reconciliationNotificationRegistration = reconciliationManager.registerService(nodeListener);
170 LOG.debug("Reconciliation is enabled by user and successfully registered to the reconciliation framework");
172 deviceMastershipManager = new DeviceMastershipManager(clusterSingletonServiceProvider, nodeListener,
173 dataService, mastershipChangeServiceManager, rpcProviderService,
174 new FrmReconciliationServiceImpl(this));
175 flowNodeConnectorInventoryTranslatorImpl = new FlowNodeConnectorInventoryTranslatorImpl(dataService);
177 bundleFlowListener = new BundleFlowForwarder(this);
178 bundleGroupListener = new BundleGroupForwarder(this);
179 flowListener = new FlowForwarder(this, dataService, registrationHelper);
180 groupListener = new GroupForwarder(this, dataService, registrationHelper);
181 meterListener = new MeterForwarder(this, dataService, registrationHelper);
182 tableListener = new TableForwarder(this, dataService, registrationHelper);
183 LOG.info("ForwardingRulesManager has started successfully.");
188 public void close() throws Exception {
189 configurationServiceRegistration.close();
191 if (flowListener != null) {
192 flowListener.close();
195 if (groupListener != null) {
196 groupListener.close();
197 groupListener = null;
199 if (meterListener != null) {
200 meterListener.close();
201 meterListener = null;
203 if (tableListener != null) {
204 tableListener.close();
205 tableListener = null;
207 if (nodeListener != null) {
208 nodeListener.close();
211 if (deviceMastershipManager != null) {
212 deviceMastershipManager.close();
214 if (reconciliationNotificationRegistration != null) {
215 reconciliationNotificationRegistration.close();
216 reconciliationNotificationRegistration = null;
221 public ReadTransaction getReadTransaction() {
222 return dataService.newReadOnlyTransaction();
226 public String getNewTransactionId() {
227 return "DOM-" + txNum.getAndIncrement();
231 public boolean isNodeActive(final InstanceIdentifier<FlowCapableNode> ident) {
232 return deviceMastershipManager.isNodeActive(ident.firstKeyOf(Node.class).getId());
236 public boolean checkNodeInOperationalDataStore(final InstanceIdentifier<FlowCapableNode> ident) {
237 boolean result = false;
238 InstanceIdentifier<Node> nodeIid = ident.firstIdentifierOf(Node.class);
239 try (ReadTransaction transaction = dataService.newReadOnlyTransaction()) {
240 ListenableFuture<Optional<Node>> future = transaction
241 .read(LogicalDatastoreType.OPERATIONAL, nodeIid);
242 Optional<Node> optionalDataObject = future.get();
243 if (optionalDataObject.isPresent()) {
246 LOG.debug("{}: Failed to read {}", Thread.currentThread().getStackTrace()[1], nodeIid);
248 } catch (ExecutionException | InterruptedException e) {
249 LOG.warn("Failed to read {} ", nodeIid, e);
256 public SalFlowService getSalFlowService() {
257 return salFlowService;
261 public SalGroupService getSalGroupService() {
262 return salGroupService;
266 public SalMeterService getSalMeterService() {
267 return salMeterService;
271 public SalTableService getSalTableService() {
272 return salTableService;
276 public DevicesGroupRegistry getDevicesGroupRegistry() {
277 return devicesGroupRegistry;
281 public SalBundleService getSalBundleService() {
282 return salBundleService;
286 public ForwardingRulesCommiter<Flow> getFlowCommiter() {
291 public ForwardingRulesCommiter<Group> getGroupCommiter() {
292 return groupListener;
296 public ForwardingRulesCommiter<Meter> getMeterCommiter() {
297 return meterListener;
301 public ForwardingRulesCommiter<TableFeatures> getTableFeaturesCommiter() {
302 return tableListener;
306 public BundleMessagesCommiter<Flow> getBundleFlowListener() {
307 return bundleFlowListener;
311 public BundleMessagesCommiter<Group> getBundleGroupListener() {
312 return bundleGroupListener;
316 public ArbitratorReconcileService getArbitratorReconciliationManager() {
317 return arbitratorReconciliationManager;
321 public boolean isReconciliationDisabled() {
322 return disableReconciliation;
326 public boolean isStaleMarkingEnabled() {
327 return staleMarkingEnabled;
331 public int getReconciliationRetryCount() {
332 return reconciliationRetryCount;
336 public void addRecoverableListener(final RecoverableListener recoverableListener) {
337 serviceRecoveryRegistry.addRecoverableListener(openflowServiceRecoveryHandler.buildServiceRegistryKey(),
338 recoverableListener);
342 public FlowNodeConnectorInventoryTranslatorImpl getFlowNodeConnectorInventoryTranslatorImpl() {
343 return flowNodeConnectorInventoryTranslatorImpl;
347 public NodeConfigurator getNodeConfigurator() {
348 return nodeConfigurator;
351 public FlowNodeReconciliation getNodeListener() {
356 public boolean isBundleBasedReconciliationEnabled() {
357 return isBundleBasedReconciliationEnabled;
361 public boolean isNodeOwner(final InstanceIdentifier<FlowCapableNode> ident) {
362 return ident != null && deviceMastershipManager.isDeviceMastered(ident.firstKeyOf(Node.class).getId());
366 public void setDeviceMastershipManager(final DeviceMastershipManager deviceMastershipManager) {
367 this.deviceMastershipManager = deviceMastershipManager;
371 public void onPropertyChanged(@NonNull final String propertyName, @NonNull final String propertyValue) {
372 final ForwardingRulesProperty forwardingRulesProperty = ForwardingRulesProperty.forValue(propertyName);
373 if (forwardingRulesProperty != null) {
374 switch (forwardingRulesProperty) {
375 case DISABLE_RECONCILIATION:
376 disableReconciliation = Boolean.parseBoolean(propertyValue);
378 case STALE_MARKING_ENABLED:
379 staleMarkingEnabled = Boolean.parseBoolean(propertyValue);
381 case RECONCILIATION_RETRY_COUNT:
382 reconciliationRetryCount = Integer.parseInt(propertyValue);
384 case BUNDLE_BASED_RECONCILIATION_ENABLED:
385 isBundleBasedReconciliationEnabled = Boolean.parseBoolean(propertyValue);
388 LOG.warn("No forwarding rule property found.");