/*
 * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 which accompanies this distribution,
 * and is available at http://www.eclipse.org/legal/epl-v10.html
 */
8 package org.opendaylight.openflowplugin.applications.frm.impl;
10 import static java.util.Objects.requireNonNull;
12 import com.google.common.annotations.VisibleForTesting;
13 import com.google.common.base.Preconditions;
14 import com.google.common.util.concurrent.ListenableFuture;
15 import java.util.Optional;
16 import java.util.concurrent.ExecutionException;
17 import java.util.concurrent.atomic.AtomicLong;
18 import javax.annotation.PostConstruct;
19 import javax.annotation.PreDestroy;
20 import javax.inject.Inject;
21 import javax.inject.Singleton;
22 import org.eclipse.jdt.annotation.NonNull;
23 import org.opendaylight.mdsal.binding.api.DataBroker;
24 import org.opendaylight.mdsal.binding.api.ReadTransaction;
25 import org.opendaylight.mdsal.binding.api.RpcConsumerRegistry;
26 import org.opendaylight.mdsal.binding.api.RpcProviderService;
27 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
28 import org.opendaylight.mdsal.singleton.common.api.ClusterSingletonServiceProvider;
29 import org.opendaylight.openflowplugin.api.openflow.FlowGroupCacheManager;
30 import org.opendaylight.openflowplugin.api.openflow.configuration.ConfigurationService;
31 import org.opendaylight.openflowplugin.api.openflow.mastership.MastershipChangeServiceManager;
32 import org.opendaylight.openflowplugin.applications.frm.BundleMessagesCommiter;
33 import org.opendaylight.openflowplugin.applications.frm.FlowNodeReconciliation;
34 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesCommiter;
35 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager;
36 import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesProperty;
37 import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator;
38 import org.opendaylight.openflowplugin.applications.frm.nodeconfigurator.NodeConfiguratorImpl;
39 import org.opendaylight.openflowplugin.applications.frm.recovery.OpenflowServiceRecoveryHandler;
40 import org.opendaylight.openflowplugin.applications.reconciliation.NotificationRegistration;
41 import org.opendaylight.openflowplugin.applications.reconciliation.ReconciliationManager;
42 import org.opendaylight.serviceutils.srm.RecoverableListener;
43 import org.opendaylight.serviceutils.srm.ServiceRecoveryRegistry;
44 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode;
45 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.meters.Meter;
46 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.tables.table.Flow;
47 import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.service.rev130819.SalFlowService;
48 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.service.rev130918.SalGroupService;
49 import org.opendaylight.yang.gen.v1.urn.opendaylight.group.types.rev131018.groups.Group;
50 import org.opendaylight.yang.gen.v1.urn.opendaylight.inventory.rev130819.nodes.Node;
51 import org.opendaylight.yang.gen.v1.urn.opendaylight.meter.service.rev130918.SalMeterService;
52 import org.opendaylight.yang.gen.v1.urn.opendaylight.openflowplugin.extension.onf.bundle.service.rev170124.SalBundleService;
53 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.arbitrator.reconcile.service.rev180227.ArbitratorReconcileService;
54 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.app.forwardingrules.manager.config.rev160511.ForwardingRulesManagerConfig;
55 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.openflowplugin.rf.state.rev170713.ResultState;
56 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.service.rev131026.SalTableService;
57 import org.opendaylight.yang.gen.v1.urn.opendaylight.table.types.rev131026.table.features.TableFeatures;
58 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
/**
 * forwardingrules-manager org.opendaylight.openflowplugin.applications.frm.impl
 *
 * <p>Manager and middle point for the whole module. It contains ActiveNodeHolder and
 * provides all RPC services.
 */
71 public class ForwardingRulesManagerImpl implements ForwardingRulesManager {
72 private static final Logger LOG = LoggerFactory.getLogger(ForwardingRulesManagerImpl.class);
74 static final int STARTUP_LOOP_TICK = 1000;
75 static final int STARTUP_LOOP_MAX_RETRIES = 240;
76 private static final int FRM_RECONCILIATION_PRIORITY = Integer.getInteger("frm.reconciliation.priority", 1);
77 private static final String SERVICE_NAME = "FRM";
79 private final AtomicLong txNum = new AtomicLong();
80 private final DataBroker dataService;
81 private final SalFlowService salFlowService;
82 private final SalGroupService salGroupService;
83 private final SalMeterService salMeterService;
84 private final SalTableService salTableService;
85 private final ClusterSingletonServiceProvider clusterSingletonServiceProvider;
86 private final SalBundleService salBundleService;
87 private final AutoCloseable configurationServiceRegistration;
88 private final MastershipChangeServiceManager mastershipChangeServiceManager;
89 private final RpcProviderService rpcProviderService;
90 private ForwardingRulesCommiter<Flow> flowListener;
91 private ForwardingRulesCommiter<Group> groupListener;
92 private ForwardingRulesCommiter<Meter> meterListener;
93 private ForwardingRulesCommiter<TableFeatures> tableListener;
94 private BundleMessagesCommiter<Flow> bundleFlowListener;
95 private BundleMessagesCommiter<Group> bundleGroupListener;
96 private FlowNodeReconciliation nodeListener;
97 private NotificationRegistration reconciliationNotificationRegistration;
98 private FlowNodeConnectorInventoryTranslatorImpl flowNodeConnectorInventoryTranslatorImpl;
99 private DeviceMastershipManager deviceMastershipManager;
100 private final ReconciliationManager reconciliationManager;
101 private DevicesGroupRegistry devicesGroupRegistry;
102 private NodeConfigurator nodeConfigurator;
103 private final ArbitratorReconcileService arbitratorReconciliationManager;
104 private boolean disableReconciliation;
105 private boolean staleMarkingEnabled;
106 private int reconciliationRetryCount;
107 private boolean isBundleBasedReconciliationEnabled;
108 private final OpenflowServiceRecoveryHandler openflowServiceRecoveryHandler;
109 private final ServiceRecoveryRegistry serviceRecoveryRegistry;
110 private final FlowGroupCacheManager flowGroupCacheManager;
111 private final ListenerRegistrationHelper registrationHelper;
/**
 * Creates the manager: seeds the reconciliation settings from {@code config}, stores the collaborators,
 * resolves the required RPC services and finally subscribes to configuration changes.
 *
 * <p>Registration with the {@link ConfigurationService} is deliberately the last step. Registering earlier
 * would publish {@code this} before the arguments are validated, so a failing precondition would leave a
 * half-constructed listener registered with no handle available to remove it.
 *
 * @param dataBroker data broker used for datastore access; must not be null
 * @param rpcRegistry registry the Sal*Service and ArbitratorReconcileService RPCs are looked up from;
 *        must not be null
 * @param rpcProviderService stored and later handed to the {@link DeviceMastershipManager}
 * @param config source of the initial reconciliation settings
 * @param mastershipChangeServiceManager stored and later handed to the {@link DeviceMastershipManager}
 * @param clusterSingletonService cluster singleton provider; must not be null
 * @param configurationService service this instance registers itself with as a listener
 * @param reconciliationManager framework the node listener is registered with in {@code start()}
 * @param openflowServiceRecoveryHandler supplies the service registry key; must not be null
 * @param serviceRecoveryRegistry registry used by {@code addRecoverableListener}; must not be null
 * @param flowGroupCacheManager stored and later handed to the node reconciliation implementation
 * @param registrationHelper helper handed to the forwarders created in {@code start()}; must not be null
 * @throws NullPointerException if a required argument is null or a required RPC service lookup returns null
 * @throws IllegalArgumentException if {@code rpcRegistry} is null
 */
public ForwardingRulesManagerImpl(final DataBroker dataBroker,
        final RpcConsumerRegistry rpcRegistry,
        final RpcProviderService rpcProviderService,
        final ForwardingRulesManagerConfig config,
        final MastershipChangeServiceManager mastershipChangeServiceManager,
        final ClusterSingletonServiceProvider clusterSingletonService,
        final ConfigurationService configurationService,
        final ReconciliationManager reconciliationManager,
        final OpenflowServiceRecoveryHandler openflowServiceRecoveryHandler,
        final ServiceRecoveryRegistry serviceRecoveryRegistry,
        final FlowGroupCacheManager flowGroupCacheManager,
        final ListenerRegistrationHelper registrationHelper) {
    // Initial settings; onPropertyChanged() may overwrite them once the listener is registered below.
    disableReconciliation = config.getDisableReconciliation();
    staleMarkingEnabled = config.getStaleMarkingEnabled();
    reconciliationRetryCount = config.getReconciliationRetryCount().toJava();
    isBundleBasedReconciliationEnabled = config.getBundleBasedReconciliationEnabled();

    this.registrationHelper = requireNonNull(registrationHelper, "RegistrationHelper cannot be null");
    this.dataService = requireNonNull(dataBroker, "DataBroker can not be null!");
    this.clusterSingletonServiceProvider = requireNonNull(clusterSingletonService,
            "ClusterSingletonService provider can not be null");
    this.reconciliationManager = reconciliationManager;
    this.rpcProviderService = rpcProviderService;
    this.mastershipChangeServiceManager = mastershipChangeServiceManager;
    this.flowGroupCacheManager = flowGroupCacheManager;

    Preconditions.checkArgument(rpcRegistry != null, "RpcConsumerRegistry can not be null !");

    this.salFlowService = requireNonNull(rpcRegistry.getRpcService(SalFlowService.class),
            "RPC SalFlowService not found.");
    this.salGroupService = requireNonNull(rpcRegistry.getRpcService(SalGroupService.class),
            "RPC SalGroupService not found.");
    this.salMeterService = requireNonNull(rpcRegistry.getRpcService(SalMeterService.class),
            "RPC SalMeterService not found.");
    this.salTableService = requireNonNull(rpcRegistry.getRpcService(SalTableService.class),
            "RPC SalTableService not found.");
    this.salBundleService = requireNonNull(rpcRegistry.getRpcService(SalBundleService.class),
            "RPC SalBundleService not found.");
    this.openflowServiceRecoveryHandler = requireNonNull(openflowServiceRecoveryHandler,
            "Openflow service recovery handler cannot be null");
    this.serviceRecoveryRegistry = requireNonNull(serviceRecoveryRegistry,
            "Service recovery registry cannot be null");
    this.arbitratorReconciliationManager =
            requireNonNull(rpcRegistry.getRpcService(ArbitratorReconcileService.class),
                    "ArbitratorReconciliationManager can not be null!");

    // Publish 'this' only after every check above has passed.
    this.configurationServiceRegistration = configurationService.registerListener(this);
}
163 public void start() {
164 nodeConfigurator = new NodeConfiguratorImpl();
165 this.devicesGroupRegistry = new DevicesGroupRegistry();
166 this.nodeListener = new FlowNodeReconciliationImpl(this, dataService, SERVICE_NAME, FRM_RECONCILIATION_PRIORITY,
167 ResultState.DONOTHING, flowGroupCacheManager);
168 if (this.isReconciliationDisabled()) {
169 LOG.debug("Reconciliation is disabled by user");
171 this.reconciliationNotificationRegistration = reconciliationManager.registerService(this.nodeListener);
172 LOG.debug("Reconciliation is enabled by user and successfully registered to the reconciliation framework");
174 this.deviceMastershipManager = new DeviceMastershipManager(clusterSingletonServiceProvider, this.nodeListener,
175 dataService, mastershipChangeServiceManager, rpcProviderService,
176 new FrmReconciliationServiceImpl(this));
177 flowNodeConnectorInventoryTranslatorImpl = new FlowNodeConnectorInventoryTranslatorImpl(dataService);
179 this.bundleFlowListener = new BundleFlowForwarder(this);
180 this.bundleGroupListener = new BundleGroupForwarder(this);
181 this.flowListener = new FlowForwarder(this, dataService, registrationHelper);
182 this.groupListener = new GroupForwarder(this, dataService, registrationHelper);
183 this.meterListener = new MeterForwarder(this, dataService, registrationHelper);
184 this.tableListener = new TableForwarder(this, dataService, registrationHelper);
185 LOG.info("ForwardingRulesManager has started successfully.");
190 public void close() throws Exception {
191 configurationServiceRegistration.close();
193 if (this.flowListener != null) {
194 this.flowListener.close();
195 this.flowListener = null;
197 if (this.groupListener != null) {
198 this.groupListener.close();
199 this.groupListener = null;
201 if (this.meterListener != null) {
202 this.meterListener.close();
203 this.meterListener = null;
205 if (this.tableListener != null) {
206 this.tableListener.close();
207 this.tableListener = null;
209 if (this.nodeListener != null) {
210 this.nodeListener.close();
211 this.nodeListener = null;
213 if (deviceMastershipManager != null) {
214 deviceMastershipManager.close();
216 if (this.reconciliationNotificationRegistration != null) {
217 this.reconciliationNotificationRegistration.close();
218 this.reconciliationNotificationRegistration = null;
223 public ReadTransaction getReadTransaction() {
224 return dataService.newReadOnlyTransaction();
228 public String getNewTransactionId() {
229 return "DOM-" + txNum.getAndIncrement();
233 public boolean isNodeActive(final InstanceIdentifier<FlowCapableNode> ident) {
234 return deviceMastershipManager.isNodeActive(ident.firstKeyOf(Node.class).getId());
238 public boolean checkNodeInOperationalDataStore(final InstanceIdentifier<FlowCapableNode> ident) {
239 boolean result = false;
240 InstanceIdentifier<Node> nodeIid = ident.firstIdentifierOf(Node.class);
241 try (ReadTransaction transaction = dataService.newReadOnlyTransaction()) {
242 ListenableFuture<Optional<Node>> future = transaction
243 .read(LogicalDatastoreType.OPERATIONAL, nodeIid);
244 Optional<Node> optionalDataObject = future.get();
245 if (optionalDataObject.isPresent()) {
248 LOG.debug("{}: Failed to read {}", Thread.currentThread().getStackTrace()[1], nodeIid);
250 } catch (ExecutionException | InterruptedException e) {
251 LOG.warn("Failed to read {} ", nodeIid, e);
258 public SalFlowService getSalFlowService() {
259 return salFlowService;
263 public SalGroupService getSalGroupService() {
264 return salGroupService;
268 public SalMeterService getSalMeterService() {
269 return salMeterService;
273 public SalTableService getSalTableService() {
274 return salTableService;
278 public DevicesGroupRegistry getDevicesGroupRegistry() {
279 return this.devicesGroupRegistry;
283 public SalBundleService getSalBundleService() {
284 return salBundleService;
288 public ForwardingRulesCommiter<Flow> getFlowCommiter() {
293 public ForwardingRulesCommiter<Group> getGroupCommiter() {
294 return groupListener;
298 public ForwardingRulesCommiter<Meter> getMeterCommiter() {
299 return meterListener;
303 public ForwardingRulesCommiter<TableFeatures> getTableFeaturesCommiter() {
304 return tableListener;
308 public BundleMessagesCommiter<Flow> getBundleFlowListener() {
309 return bundleFlowListener;
313 public BundleMessagesCommiter<Group> getBundleGroupListener() {
314 return bundleGroupListener;
318 public ArbitratorReconcileService getArbitratorReconciliationManager() {
319 return arbitratorReconciliationManager;
323 public boolean isReconciliationDisabled() {
324 return disableReconciliation;
328 public boolean isStaleMarkingEnabled() {
329 return staleMarkingEnabled;
333 public int getReconciliationRetryCount() {
334 return reconciliationRetryCount;
338 public void addRecoverableListener(final RecoverableListener recoverableListener) {
339 serviceRecoveryRegistry.addRecoverableListener(openflowServiceRecoveryHandler.buildServiceRegistryKey(),
340 recoverableListener);
344 public FlowNodeConnectorInventoryTranslatorImpl getFlowNodeConnectorInventoryTranslatorImpl() {
345 return flowNodeConnectorInventoryTranslatorImpl;
349 public NodeConfigurator getNodeConfigurator() {
350 return nodeConfigurator;
353 public FlowNodeReconciliation getNodeListener() {
358 public boolean isBundleBasedReconciliationEnabled() {
359 return isBundleBasedReconciliationEnabled;
363 public boolean isNodeOwner(final InstanceIdentifier<FlowCapableNode> ident) {
364 return ident != null && deviceMastershipManager.isDeviceMastered(ident.firstKeyOf(Node.class).getId());
368 public void setDeviceMastershipManager(final DeviceMastershipManager deviceMastershipManager) {
369 this.deviceMastershipManager = deviceMastershipManager;
373 public void onPropertyChanged(@NonNull final String propertyName, @NonNull final String propertyValue) {
374 final ForwardingRulesProperty forwardingRulesProperty = ForwardingRulesProperty.forValue(propertyName);
375 if (forwardingRulesProperty != null) {
376 switch (forwardingRulesProperty) {
377 case DISABLE_RECONCILIATION:
378 disableReconciliation = Boolean.valueOf(propertyValue);
380 case STALE_MARKING_ENABLED:
381 staleMarkingEnabled = Boolean.valueOf(propertyValue);
383 case RECONCILIATION_RETRY_COUNT:
384 reconciliationRetryCount = Integer.parseInt(propertyValue);
386 case BUNDLE_BASED_RECONCILIATION_ENABLED:
387 isBundleBasedReconciliationEnabled = Boolean.valueOf(propertyValue);
390 LOG.warn("No forwarding rule property found.");