__author__ = 'Martin Lauko, Dusan Madar'
__copyright__ = "Copyright(c) 2015, Cisco Systems, Inc."
__version__ = "0.1"
__status__ = "alpha"
# nfq -> NetFilterQueue
# nsh -> Network Service Headers
# fwd -> forwarder/forwarding
-# tun -> tunnel
-# sp -> Service Path
-# spi -> Service Path Id
# acl -> access list
# ace -> access list entry
+# `rule(s)` and `chain(s)` can be used interchangeably in ip(6)tables context
logger = logging.getLogger('classifier')
logger.setLevel(logging.DEBUG)
+# silence `requests` module logging
+requests_logger = logging.getLogger('requests')
+requests_logger.setLevel(logging.WARNING)
+
#: constants
-NFQ = 'NFQUEUE'
-NFQ_NUMBER = 2
IPV4 = 4
IPv6 = 6
+NFQ_NUMBER = 2
#: ACE items to ip(6)tables flags/types mapping
'destination-port-range': '--dport'}
}
+#: IP version to NSH next protocol mapping
+ipv_2_next_protocol = {4: 0x1, # IPv4
+ 6: 0x2, # IPv6
+ 10: 0x3} # Ethernet
+
def run_cmd(cmd):
"""
"""
cmd = [str(cmd_part) for cmd_part in cmd]
+ logger.debug('Executing command: %s', ' '.join(cmd))
try:
- logger.debug('Executing command: `%s`', ' '.join(cmd))
+ process = subprocess.Popen(cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ _, err = process.communicate()
+
+ if process.returncode != 0:
+ err = err.strip()
+ logger.exception(err.decode())
- subprocess.check_call(cmd)
- except subprocess.CalledProcessError:
+ except OSError:
logger.exception('Command execution failed')
iptables = 'iptables'
ip6tables = 'ip6tables'
- if IPV4 in ipv and IPv6 in ipv:
+ if (IPV4 in ipv) and (IPv6 in ipv):
ip_tables = (iptables, ip6tables)
elif IPV4 in ipv:
ip_tables = (iptables,)
How it works:
1. sfc_agent receives an ACL and passes it for processing
- 2. the RSP referenced by ACL is requested from ODL
+ 2. the RSP (its SFF locator) referenced by ACL is requested from ODL
3. if the RSP exists in the ODL iptables rules for it are applied
After this process is over, every packet successfully matched to an
- iptables rule will be NSH encapsulated and traverses appropriate RSP.
+ iptables rule (i.e. successfully classified) will be NSH encapsulated
+ and forwarded to a related SFF, which knows how to traverse the RSP.
Rules are created using appropriate iptables command. If the ACE rule
is MAC address related both iptables and ip6tabeles rules re issued.
If ACE rule is IPv4 address related, only iptables rules are issued,
same for IPv6.
- ACL RULES
+ ACL RULES FOR
----------------------------------
MAC iptables, ip6tables
IPv4 iptables
IPv6 ip6tables
- Information regarding redirection to RSP(s) are stored as a simple RSP
- to SFF mapping (rsp_id -> sff); which is represented as
- a dictionary:
-
- {rsp_id: {'ip': <ip>,
- 'port': <port>,
- 'starting-index': <starting-index>,
- 'transport-type': <transport-type>,
- 'chains': {'chain_name': (<ipv>,)}, ...},
+ Information regarding already registered RSP(s) are stored in an
+ internal data-store, which is represented as a dictionary:
+
+ {rsp_id: {'name': <rsp_name>,
+ 'chains': {'chain_name': (<ipv>,),
+ ...
+ },
+ 'sff': {'ip': <ip>,
+ 'port': <port>,
+ 'starting-index': <starting-index>,
+ 'transport-type': <transport-type>
+ },
+ },
...
}
Where:
- - ip: SFF IP
- - port: SFF port
- - starting-index: index given to packet at first RSP hop
- - transport-type:
- - chains: dict of iptables chains (rules) related to the RSP
+ - name: RSP name
+ - chains: dict of iptables rules/chains related to the RSP
+ - SFF:
+ - ip: SFF IP
+ - port: SFF port
+ - starting-index: index given to packet at first RSP hop
+ - transport-type:
"""
# internal data-store
self.rsp_2_sff = {}
- self.nfq = NetfilterQueue()
+ # NFQ for classified packets, initialized by packet_collector()
+ self.nfq = None
# socket used to forward NSH encapsulated packets
self.fwd_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# identifiers of the currently processed RSP, set by process_acl()
- # these will be different for each processed ACL
+ # these will be different for each processed ACL/ACE
self.rsp_id = None
self.rsp_acl = None
self.rsp_ace = None
# IP version of the currently processed RSP, set by parse_ace()
# this attribute serves as run_iptables_cmd() 'ipv' argument
- self.rsp_ipv = (IPV4, IPv6)
+ self.rsp_ipv = None
+
+ # currently processed RSP mark, set by parse_ace()
+ # this attribute serves as the ip(6)tables mark argument
+ self.rsp_mark = None
def _get_current_ip_version(self, ip):
"""
return ip.version
- def _fetch_rsp_from_odl(self, rsp_name):
+ def _get_rsp_by_name(self, rsp_name):
+ """
+ Retrieve RSP data from the data-store based on its name
+
+ :param rsp_name: RSP name
+ :type rsp_name: str
+
+ :return tuple (rsp_id, rsp_data) or None
+
+ """
+ for rsp_id, rsp_data in self.rsp_2_sff.items():
+ if rsp_name == rsp_data['name']:
+ return rsp_id, rsp_data
+ else:
+ return None
+
+ def _fetch_rsp_first_hop_from_odl(self, rsp_name):
"""
Fetch RSPs' forwarding parameters (SFF locator) from ODL
{'name': rsp_name}
}
- logger.info('Requesting RSP "%s" from ODL', rsp_name)
- rsp_data = requests.post(url=url,
- data=json.dumps(data),
- auth=(USERNAME, PASSWORD),
- headers={'content-type': 'application/json'})
+ headers = {'content-type': 'application/json'}
+
+ try:
+ logger.info('Requesting SFF for RSP "%s" from ODL', rsp_name)
+ rsp_data = requests.post(url=url,
+ timeout=5,
+ headers=headers,
+ data=json.dumps(data),
+ auth=(USERNAME, PASSWORD))
+ except requests.exceptions.Timeout:
+ logger.exception('Failed to get RSP "%s" from ODL: timeout',
+ rsp_name)
+ return
if not rsp_data.content:
logger.warning('RSP "%s" not found in ODL', rsp_name)
- return None
+ return
rsp_json = rsp_data.json()
return rsp_json['output']['rendered-service-path-first-hop']
- def _process_packet(self, packet):
+ def _compose_packet_mark(self):
"""
- Main NFQ callback for each queued packet.
- Drop the packet if RSP is unknown, pass it for processing otherwise.
+ Compose packet mark as a combination of the RSP ID and IP version
- :param packet: packet to process
- :type packet: `:class:netfilterqueue.Packet`
+ :return int
"""
- try:
- # packet mark contains a RSP identifier
- rsp_id = packet.get_mark()
+ return int(str(self.rsp_id) + "%02d" % sum(self.rsp_ipv))
+
+ def _decompose_packet_mark(self, mark):
+ """
+ Decompose packet mark to the RSP ID and IP version
- logger.debug('NFQ received a %s, marked "%d"', packet, rsp_id)
+ :param mark: packet mark
+ :type mark: int
- if rsp_id in self.rsp_2_sff:
- self.forward_packet(packet, rsp_id)
- else:
- logger.warning('Dropping packet as it did\'t match any rule')
- packet.drop()
+ :return tuple
- except:
- logger.exception('NFQ failed to receive a packet')
+ """
+ mark = str(mark)
+
+ rsp_id = int(mark[:-2])
+ ipv = int(mark[-2:])
+
+ return rsp_id, ipv
- def forward_packet(self, packet, rsp_id):
+ def forward_packet(self, packet, rsp_id, ipv):
"""
Encapsulate given packet with NSH and forward it to SFF related with
currently matched RSP
:type packet: `:class:netfilterqueue.Packet`
:param rsp_id: RSP identifier
:type rsp_id: int
+ :param ipv: IP version
+ :type ipv: int
"""
- fwd_to = self.rsp_2_sff[rsp_id]
+ fwd_to = self.rsp_2_sff[rsp_id]['sff']
+ next_protocol = ipv_2_next_protocol[ipv]
- # TODO: get context headers from ODL, using default values for now
+ # NOTES:
+ # so far metadata are not supported -> just sending an empty ctx_header
+ # tunnel_id (0x0500) is hard-coded, will it be always the same?
ctx_header = CONTEXTHEADER(0, 0, 0, 0)
- # TODO: tunnel_id (0x0500) is hard-coded
vxlan_header = VXLANGPE(int('00000100', 2), 0, 0x894F, 0x0500, 64)
- base_header = BASEHEADER(0x1, int('01000000', 2), 0x6, 0x1, 0x1,
- rsp_id, fwd_to['starting-index'])
+ base_header = BASEHEADER(0x1, int('01000000', 2), 0x6,
+ 0x1, next_protocol, rsp_id,
+ fwd_to['starting-index'])
nsh_encapsulation = build_packet(vxlan_header, base_header, ctx_header)
nsh_packet = nsh_encapsulation + packet.get_payload()
self.fwd_socket.sendto(nsh_packet, (fwd_to['ip'], fwd_to['port']))
+ def process_packet(self, packet):
+ """
+ Main NFQ callback for each classified packet.
+ Drop the packet if RSP is unknown, pass it for processing otherwise.
+
+ :param packet: packet to process
+ :type packet: `:class:netfilterqueue.Packet`
+
+ """
+ try:
+ mark = packet.get_mark()
+ rsp_id, ipv = self._decompose_packet_mark(mark)
+
+ logger.debug('NFQ received a %s, marked "%d"', packet, mark)
+
+ if rsp_id in self.rsp_2_sff:
+ self.forward_packet(packet, rsp_id, ipv)
+ else:
+ logger.warning('Dropping packet as it did\'t match any rule')
+ packet.drop()
+
+ except:
+ logger.exception('NFQ failed to receive a packet')
+
+ def packet_collector(self):
+ """
+ Main NFQ related method. Configure the queue and wait for packets.
+
+ NOTE: NetfilterQueue.run() blocs!
+
+ """
+ try:
+ self.nfq = NetfilterQueue()
+
+ logger.info('Binding to NFQ queue number "%s"', NFQ_NUMBER)
+ self.nfq.bind(NFQ_NUMBER, self.process_packet)
+ except:
+ msg = ('Failed to bind to the NFQ queue number "%s". '
+ 'HINT: try to run command `sudo iptables -L` to check if '
+ 'the required queue is available.' % NFQ_NUMBER)
+
+ logger.exception(msg)
+ raise
+
+ try:
+ logger.info('Starting NFQ - waiting for packets ...')
+ self.nfq.run()
+ except:
+ logger.exception('Failed to start NFQ')
+ raise
+
+ def collect_packets(self):
+ """
+ Start a thread for classified packets collection
+ """
+ nfq_thread = threading.Thread(target=self.packet_collector)
+ nfq_thread.daemon = True
+ nfq_thread.start()
+
def parse_ace(self, ace_matches):
"""
Parse given Access List Entries (ACE) matches and put together an
if source_mac in ace_matches:
ace_rule_cmd.extend(['-m', 'mac', '--mac-source'])
ace_rule_cmd.append(ace_matches[source_mac])
+ self.rsp_ipv = (IPV4, IPv6)
- ace_rule_cmd.extend(['-j', 'MARK', '--set-mark', self.rsp_id])
+ self.rsp_mark = self._compose_packet_mark()
+ ace_rule_cmd.extend(['-j', 'MARK', '--set-mark', self.rsp_mark])
return ace_rule_cmd
def process_acl(self, acl_data):
"""
- Parse ACL data and create iptables rules accordingly
+ Parse ACL data and create/remove ip(6)tables rules accordingly.
+
+ To be able to create/remove an ip(6)tables rule/chain these attributes
+ must be set (i.e. not None):
+ self.rsp_chain, self.rsp_ipv + self.rsp_id for creating a rule/chain
:param acl_data: ACL
:type acl_data: dict
"""
for acl in acl_data['access-list']:
-
- self.rsp_acl = acl['acl-name']
+ self.rsp_acl = acl['acl-name'].upper()
for ace in acl['access-list-entries']:
+ if 'delete' in ace:
+ self.remove_acl_rsps()
+ return
+
rsp_name = (ace['actions']
['service-function-acl:rendered-service-path'])
- rsp = self._fetch_rsp_from_odl(rsp_name)
- if rsp is None:
+ sff_data = self._fetch_rsp_first_hop_from_odl(rsp_name)
+ if sff_data is None:
continue
- self.rsp_id = rsp['path-id']
- self.rsp_ace = ace['rule-name']
+ # NOTE: assuming that RSP IDs are unique
+ rsp_id = sff_data['path-id']
+ if rsp_id in self.rsp_2_sff:
+ logger.warning('RSP "%s" already exists', rsp_id)
+ continue
+
+ self.rsp_id = rsp_id
+ self.rsp_ace = ace['rule-name'].upper()
self.rsp_chain = '-'.join((self.rsp_acl,
self.rsp_ace,
'RSP',
- str(rsp['path-id'])))
+ str(rsp_id)))
- # `self.rsp_ipv` is set by this
+ # `self.rsp_ipv` and `self.rsp_mark` are set by this
ace_rule_cmd = self.parse_ace(ace['matches'])
- # TODO: test - this will very probably not work properly
- if 'delete' in ace:
- self.remove_rsp()
- return
-
- self.create_rsp(rsp)
+ self.create_rsp(rsp_name, sff_data)
run_iptables_cmd(ace_rule_cmd, self.rsp_ipv)
def register_rsp(self):
"""
Create iptables rules for the current ACL -> ACE -> RSP
- In other words: create an iptables chain for the given RSP, direct all
- incoming packets through this chain and send matched packets (matching
- is mark based) to the NetfilterQueue.
+ In other words: create an iptables chain for the current RSP, mark
+ traversing packets and redirect them to the NFQ.
+
+ Packet mark (which must be an integer) is a combination of the RSP ID
+ and the IP version for which an ip(6)tables rule/chain exists.
+ IP version is described by the last two mark digits, i.e. 04 -> IPv4,
+ 06 -> IPv6, 10 -> IPv4 and IPv6.
+
+ For example a '104' mark describes RSP "1" for which an IPv4 iptables
+ rule exists. Mark 5010 describes RSP "50" for which both an IPv4 and
+ an IPv6 ip(6)tables rules exists.
+
"""
logger.debug('Creating iptables rule for ACL "%s", ACE "%s", RSP "%s"',
self.rsp_acl, self.rsp_ace, self.rsp_id)
'-j', self.rsp_chain],
self.rsp_ipv)
- # append [-A] a redirection of matched packets to the NetfilterQueue
+ # append [-A] packet marking and redirection to the NFQ
run_iptables_cmd(['-A', self.rsp_chain,
- '-m', 'mark', '--mark', self.rsp_id,
- '-j', NFQ, '--queue-num', NFQ_NUMBER],
+ '-m', 'mark', '--mark', self.rsp_mark,
+ '-j', 'NFQUEUE', '--queue-num', NFQ_NUMBER],
self.rsp_ipv)
- def create_rsp(self, rsp):
+ def create_rsp(self, rsp_name, sff_data):
"""
Create iptables rules for the current RSP and add it to the data-store
- :param rsp: RSPs' SFF description
- :type rsp: dict
+ :param rsp_name: RSP name
+ :type rsp_name: str
+ :param sff_data: RSPs' SFF description
+ :type sff_data: dict
"""
self.register_rsp()
if self.rsp_id not in self.rsp_2_sff:
- rsp.pop('path-id')
- self.rsp_2_sff[self.rsp_id] = rsp
+ self.rsp_2_sff[self.rsp_id] = {'name': rsp_name}
+
+ if 'sff' not in self.rsp_2_sff[self.rsp_id]:
+ sff_data.pop('path-id')
+ self.rsp_2_sff[self.rsp_id]['sff'] = sff_data
if 'chains' not in self.rsp_2_sff[self.rsp_id]:
self.rsp_2_sff[self.rsp_id]['chains'] = {}
self.rsp_2_sff[self.rsp_id]['chains'][self.rsp_chain] = self.rsp_ipv
- def unregister_rsp(self, log=True):
+ def unregister_rsp(self):
"""
Remove iptables rules for the current RSP
-
- :param log: flag to log the message regarding removing RSP
- :type log: bool
-
"""
- if log:
- logger.debug('Removing iptables rule for ACL "%s", ACE "%s", RSP '
- '"%s"', self.rsp_acl, self.rsp_ace, self.rsp_id)
-
# delete [-D] the jump to the chain
run_iptables_cmd(['-D', 'PREROUTING',
'-j', self.rsp_chain],
run_iptables_cmd(['-X', self.rsp_chain],
self.rsp_ipv)
- def remove_rsp(self):
+ def remove_rsp(self, rsp_name):
"""
- Remove iptables rules for the current RSP and remove it from the
- data-store
+ Remove ip(6)tables rules/chains for a given RSP and remove it from the
+ data-store as well; state (return) if the removal was succesfull.
+
+ :param rsp_name: RSP name
+ :type rsp_name: str
+
+ :return bool
+
+ """
+ try:
+ rsp_id, rsp_data = self._get_rsp_by_name(rsp_name)
+ except TypeError:
+ return False
+
+ logger.debug('Removing iptables rules for RSP "%s"', rsp_id)
+
+ for chain_name, ipv in rsp_data['chains'].items():
+ self.rsp_chain = chain_name
+ self.rsp_ipv = ipv
+
+ self.unregister_rsp()
+
+ del self.rsp_2_sff[rsp_id]
+ return True
+
+ def remove_acl_rsps(self):
+ """
+ Remove ip(6)tables rules/chains related to the current ACL
"""
- self.unregister_rsp()
- del self.rsp_2_sff[self.rsp_id]
+ rsps_to_remove = []
+
+ for rsp_id in self.rsp_2_sff:
+ rsp = self.rsp_2_sff[rsp_id]
+ rsp_chains = rsp['chains']
+
+ chains_to_remove = [chain_name for chain_name in rsp_chains.keys()
+ if self.rsp_acl in chain_name]
+
+ if not chains_to_remove:
+ continue
+
+ logger.debug('Removing iptables rule for ACL "%s"', self.rsp_acl)
+
+ for chain_name in chains_to_remove:
+ self.rsp_chain = chain_name
+ self.rsp_ipv = rsp_chains.pop(chain_name)
+
+ self.unregister_rsp()
+
+ if not rsp_chains:
+ rsps_to_remove.append(rsp_id)
+
+ for rsp_id in rsps_to_remove:
+ del self.rsp_2_sff[rsp_id]
def remove_all_rsps(self):
"""
- Remove iptables rules for ALL registered RSPs and clear the data-store
+ Remove ip(6)tables rules for ALL registered RSPs and clear the
+ data-store
"""
if not self.rsp_2_sff:
return
- logger.debug('Removing iptables rule(s) for ALL RSPs')
+ logger.debug('Removing created iptables rule(s) for ALL RSPs')
for rsp_id in self.rsp_2_sff:
rsp = self.rsp_2_sff[rsp_id]
self.rsp_ipv = ipv
self.rsp_chain = chain
- self.unregister_rsp(log=False)
+ self.unregister_rsp()
self.rsp_2_sff = {}
- def packet_collector(self):
- """
- Main NFQ related method. Configure the queue and wait for packets.
-
- NOTE: NetfilterQueue.run() blocs!
-
+ def nfq_running(self):
"""
- try:
- logger.info('Binding to NFQ queue number "%s"', NFQ_NUMBER)
- self.nfq.bind(NFQ_NUMBER, self._process_packet)
- except:
- msg = ('Failed to bind to the NFQ queue number "%s". '
- 'HINT: try to run command `sudo iptables -L` to check if '
- 'the required queue is available.' % NFQ_NUMBER)
+ Check if the NFQ is running
- logger.exception(msg)
- raise
-
- try:
- logger.info('Starting NFQ - waiting for packets ...')
- self.nfq.run()
- except:
- logger.exception('Failed to start NFQ')
- raise
+ :return bool
- def collect_packets(self):
- """
- Start a thread for NFQ packets collection
"""
- nfq_thread = threading.Thread(target=self.packet_collector)
- nfq_thread.daemon = True
- nfq_thread.start()
+ if self.nfq is None:
+ return False
+ else:
+ return True
def start_classifier():
"""
nfq_classifier = NfqClassifier()
- nfq_classifier.remove_all_rsps()
- nfq_classifier.nfq.unbind()
+ if nfq_classifier.nfq_running():
+ nfq_classifier.remove_all_rsps()