X-Git-Url: https://git.opendaylight.org/gerrit/gitweb?a=blobdiff_plain;f=applications%2Fforwardingrules-manager%2Fsrc%2Fmain%2Fjava%2Forg%2Fopendaylight%2Fopenflowplugin%2Fapplications%2Ffrm%2Fimpl%2FAbstractListeningCommiter.java;h=731232da9a71f552e1fa6ffca14518d5cafc4b88;hb=777c94332871b8c34f56f7f2010de1536cb759ba;hp=2fa433f0ca7bf51cb898f57fb5ee00a4004b2beb;hpb=474d93306f20da5b9de690aec208a98ece39d2f6;p=openflowplugin.git diff --git a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/AbstractListeningCommiter.java b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/AbstractListeningCommiter.java index 2fa433f0ca..731232da9a 100644 --- a/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/AbstractListeningCommiter.java +++ b/applications/forwardingrules-manager/src/main/java/org/opendaylight/openflowplugin/applications/frm/impl/AbstractListeningCommiter.java @@ -1,5 +1,5 @@ -/** - * Copyright (c) 2014 Cisco Systems, Inc. and others. All rights reserved. +/* + * Copyright (c) 2014, 2017 Cisco Systems, Inc. and others. All rights reserved. * * This program and the accompanying materials are made available under the * terms of the Eclipse Public License v1.0 which accompanies this distribution, @@ -7,98 +7,142 @@ */ package org.opendaylight.openflowplugin.applications.frm.impl; -import com.google.common.base.Preconditions; -import org.opendaylight.controller.md.sal.binding.api.DataObjectModification; -import org.opendaylight.controller.md.sal.binding.api.DataTreeModification; -import org.opendaylight.controller.md.sal.common.api.data.AsyncDataChangeEvent; +import static java.util.Objects.requireNonNull; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Collection; +import org.eclipse.jdt.annotation.Nullable; +import org.opendaylight.mdsal.binding.api.DataBroker; +import org.opendaylight.mdsal.binding.api.DataObjectModification; +import org.opendaylight.mdsal.binding.api.DataTreeIdentifier; +import org.opendaylight.mdsal.binding.api.DataTreeModification; +import org.opendaylight.mdsal.common.api.LogicalDatastoreType; import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesCommiter; import org.opendaylight.openflowplugin.applications.frm.ForwardingRulesManager; +import org.opendaylight.openflowplugin.applications.frm.NodeConfigurator; +import org.opendaylight.serviceutils.srm.RecoverableListener; import org.opendaylight.yang.gen.v1.urn.opendaylight.flow.inventory.rev130819.FlowCapableNode; +import org.opendaylight.yangtools.concepts.ListenerRegistration; import org.opendaylight.yangtools.yang.binding.DataObject; import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; - /** - * AbstractChangeListner implemented basic {@link AsyncDataChangeEvent} processing for - * flow node subDataObject (flows, groups and meters). - * - * @author Vaclav Demcak - * + * AbstractChangeListner implemented basic {@link org.opendaylight.mdsal.binding.api.DataTreeModification} + * processing for flow node subDataObject (flows, groups and meters). */ -public abstract class AbstractListeningCommiter implements ForwardingRulesCommiter { +public abstract class AbstractListeningCommiter + implements ForwardingRulesCommiter, RecoverableListener { private static final Logger LOG = LoggerFactory.getLogger(AbstractListeningCommiter.class); + final ForwardingRulesManager provider; + NodeConfigurator nodeConfigurator; + protected final DataBroker dataBroker; + protected final ListenerRegistrationHelper registrationHelper; + protected ListenerRegistration listenerRegistration; - protected ForwardingRulesManager provider; - - protected final Class clazz; + @SuppressFBWarnings(value = "MC_OVERRIDABLE_METHOD_CALL_IN_CONSTRUCTOR", justification = "See FIXME below") + protected AbstractListeningCommiter(final ForwardingRulesManager provider, final DataBroker dataBroker, + final ListenerRegistrationHelper registrationHelper) { + this.provider = requireNonNull(provider, "ForwardingRulesManager can not be null!"); + this.nodeConfigurator = requireNonNull(provider.getNodeConfigurator(), "NodeConfigurator can not be null!"); + this.dataBroker = requireNonNull(dataBroker, "DataBroker can not be null!"); + this.registrationHelper = requireNonNull(registrationHelper, "registrationHelper can not be null!"); - public AbstractListeningCommiter (ForwardingRulesManager provider, Class clazz) { - this.provider = Preconditions.checkNotNull(provider, "ForwardingRulesManager can not be null!"); - this.clazz = Preconditions.checkNotNull(clazz, "Class can not be null!"); + // FIXME: this may start listening on an uninitialized object: clean up the lifecycle here + registerListener(); + provider.addRecoverableListener(this); } + @SuppressWarnings("checkstyle:IllegalCatch") @Override - public void onDataTreeChanged(Collection> changes) { - Preconditions.checkNotNull(changes, "Changes may not be null!"); + public void onDataTreeChanged(final Collection> changes) { + LOG.trace("Received data changes :{}", requireNonNull(changes, "Changes may not be null!")); for (DataTreeModification change : changes) { final InstanceIdentifier key = change.getRootPath().getRootIdentifier(); final DataObjectModification mod = change.getRootNode(); final InstanceIdentifier nodeIdent = key.firstIdentifierOf(FlowCapableNode.class); - - if (preConfigurationCheck(nodeIdent)) { - switch (mod.getModificationType()) { - case DELETE: - remove(key, mod.getDataBefore(), nodeIdent); - break; - case SUBTREE_MODIFIED: - update(key, mod.getDataBefore(), mod.getDataAfter(), nodeIdent); - break; - case WRITE: - if (mod.getDataBefore() == null) { - add(key, mod.getDataAfter(), nodeIdent); - } else { - update(key, mod.getDataBefore(), mod.getDataAfter(), nodeIdent); - } - break; - default: - throw new IllegalArgumentException("Unhandled modification type " + mod.getModificationType()); - } - } - else{ - if (provider.getConfiguration().isStaleMarkingEnabled()) { - LOG.info("Stale-Marking ENABLED and switch {} is NOT connected, storing stale entities", - nodeIdent.toString()); - // Switch is NOT connected + try { + if (preConfigurationCheck(nodeIdent)) { switch (mod.getModificationType()) { case DELETE: - createStaleMarkEntity(key, mod.getDataBefore(), nodeIdent); + remove(key, mod.getDataBefore(), nodeIdent); break; case SUBTREE_MODIFIED: + update(key, mod.getDataBefore(), mod.getDataAfter(), nodeIdent); break; case WRITE: + if (mod.getDataBefore() == null) { + add(key, mod.getDataAfter(), nodeIdent); + } else { + update(key, mod.getDataBefore(), mod.getDataAfter(), nodeIdent); + } break; default: - throw new IllegalArgumentException("Unhandled modification type " + mod.getModificationType()); + throw new + IllegalArgumentException("Unhandled modification type " + + mod.getModificationType()); + } + } else { + if (provider.isStaleMarkingEnabled()) { + LOG.info("Stale-Marking ENABLED and switch {} is NOT connected, storing stale entities", + nodeIdent.toString()); + // Switch is NOT connected + switch (mod.getModificationType()) { + case DELETE: + createStaleMarkEntity(key, mod.getDataBefore(), nodeIdent); + break; + case SUBTREE_MODIFIED: + break; + case WRITE: + break; + default: + throw new + IllegalArgumentException("Unhandled modification type " + + mod.getModificationType()); + } } } + } catch (RuntimeException e) { + LOG.error("Failed to handle event {} key {} due to error ", mod.getModificationType(), key, e); } } } + @Override + public final void registerListener() { + final DataTreeIdentifier treeId = + DataTreeIdentifier.create(LogicalDatastoreType.CONFIGURATION, getWildCardPath()); + Futures.addCallback(registrationHelper.checkedRegisterListener(treeId, this), + new FutureCallback>() { + @Override + public void onSuccess( + @Nullable final ListenerRegistration flowListenerRegistration) { + LOG.info("{} registered successfully", flowListenerRegistration.getInstance()); + listenerRegistration = flowListenerRegistration; + } + + @Override + public void onFailure(final Throwable throwable) { + LOG.error("Registration failed ", throwable); + } + }, MoreExecutors.directExecutor()); + } + /** * Method return wildCardPath for Listener registration - * and for identify the correct KeyInstanceIdentifier from data; + * and for identify the correct KeyInstanceIdentifier from data. */ protected abstract InstanceIdentifier getWildCardPath(); private boolean preConfigurationCheck(final InstanceIdentifier nodeIdent) { - Preconditions.checkNotNull(nodeIdent, "FlowCapableNode ident can not be null!"); + requireNonNull(nodeIdent, "FlowCapableNode identifier can not be null!"); // In single node cluster, node should be in local cache before we get any flow/group/meter // data change event from data store. So first check should pass. // In case of 3-node cluster, when shard leader changes, clustering will send blob of data @@ -106,20 +150,8 @@ public abstract class AbstractListeningCommiter implement // should get populated. But to handle a scenario where flow request comes before the blob // of config/operational data gets processes, it won't find node in local cache and it will // skip the flow/group/meter operational. This requires an addition check, where it reads - // node from operational data store and if it's present it calls flowNodeConnected to explictly + // node from operational data store and if it's present it calls flowNodeConnected to explicitly // trigger the event of new node connected. - - if(!provider.isNodeOwner(nodeIdent)) { return false; } - - if (!provider.isNodeActive(nodeIdent)) { - if (provider.checkNodeInOperationalDataStore(nodeIdent)) { - provider.getFlowNodeReconciliation().flowNodeConnected(nodeIdent); - return true; - } else { - return false; - } - } - return true; + return provider.isNodeOwner(nodeIdent); } -} - +} \ No newline at end of file