blob: 9444d13535024fac7abdfc244dba88945cd60018 [file] [log] [blame]
# Contributors Listed Below - COPYRIGHT 2016
# [+] International Business Machines Corp.
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
import xml.etree.ElementTree as ET
import dbus
class IntrospectionNodeParser:
def __init__(self, data, tag_match=bool, intf_match=bool):
self.data = data
self.cache = {}
self.tag_match = tag_match
self.intf_match = intf_match
def parse_args(self):
return [x.attrib for x in self.data.findall('arg')]
def parse_children(self):
return [x.attrib['name'] for x in self.data.findall('node')]
def parse_method_or_signal(self):
name = self.data.attrib['name']
return name, self.parse_args()
def parse_interface(self):
iface = {}
iface['method'] = {}
iface['signal'] = {}
for node in self.data:
if node.tag not in ['method', 'signal']:
continue
if not self.tag_match(node.tag):
continue
p = IntrospectionNodeParser(
node, self.tag_match, self.intf_match)
n, element = p.parse_method_or_signal()
iface[node.tag][n] = element
return iface
def parse_node(self):
if self.cache:
return self.cache
self.cache['interfaces'] = {}
self.cache['children'] = []
for node in self.data:
if node.tag == 'interface':
p = IntrospectionNodeParser(
node, self.tag_match, self.intf_match)
name = p.data.attrib['name']
if not self.intf_match(name):
continue
self.cache['interfaces'][name] = p.parse_interface()
elif node.tag == 'node':
self.cache['children'] = self.parse_children()
return self.cache
def get_interfaces(self):
return self.parse_node()['interfaces']
def get_children(self):
return self.parse_node()['children']
def recursive_binding(self):
return any('/' in s for s in self.get_children())
class IntrospectionParser:
def __init__(self, name, bus, tag_match=bool, intf_match=bool):
self.name = name
self.bus = bus
self.tag_match = tag_match
self.intf_match = intf_match
def _introspect(self, path):
try:
obj = self.bus.get_object(self.name, path, introspect=False)
iface = dbus.Interface(obj, dbus.INTROSPECTABLE_IFACE)
data = iface.Introspect()
except dbus.DBusException:
return None
return IntrospectionNodeParser(
ET.fromstring(data),
self.tag_match,
self.intf_match)
def _discover_flat(self, path, parser):
items = {}
interfaces = parser.get_interfaces().keys()
if interfaces:
items[path] = {}
items[path]['interfaces'] = interfaces
return items
def introspect(self, path='/', parser=None):
items = {}
if not parser:
parser = self._introspect(path)
if not parser:
return {}
items.update(self._discover_flat(path, parser))
if path != '/':
path += '/'
if parser.recursive_binding():
callback = self._discover_flat
else:
callback = self.introspect
for k in parser.get_children():
parser = self._introspect(path + k)
if not parser:
continue
items.update(callback(path + k, parser))
return items
def find_dbus_interfaces(conn, service, path, match):
class _FindInterfaces(object):
def __init__(self):
self.results = {}
@staticmethod
def _introspect(path):
obj = conn.get_object(service, path, introspect=False)
iface = dbus.Interface(obj, dbus.INTROSPECTABLE_IFACE)
return iface.Introspect()
@staticmethod
def _get_managed_objects(om):
obj = conn.get_object(service, om, introspect=False)
iface = dbus.Interface(
obj, dbus.BUS_DAEMON_IFACE + '.ObjectManager')
return iface.GetManagedObjects()
@staticmethod
def _to_path(elements):
return '/' + '/'.join(elements)
@staticmethod
def _to_path_elements(path):
return filter(bool, path.split('/'))
def __call__(self, path):
self.results = {}
self._find_interfaces(path)
return self.results
@staticmethod
def _match(iface):
return iface == dbus.BUS_DAEMON_IFACE + '.ObjectManager' \
or match(iface)
def _find_interfaces(self, path):
path_elements = self._to_path_elements(path)
path = self._to_path(path_elements)
root = ET.fromstring(self._introspect(path))
ifaces = filter(
self._match,
[x.attrib.get('name') for x in root.findall('interface')])
self.results[path] = ifaces
if dbus.BUS_DAEMON_IFACE + '.ObjectManager' in ifaces:
objs = self._get_managed_objects(path)
for k, v in objs.iteritems():
self.results[k] = v
else:
children = filter(
bool,
[x.attrib.get('name') for x in root.findall('node')])
children = [
self._to_path(
path_elements + self._to_path_elements(x))
for x in sorted(children)]
for child in children:
if child not in self.results:
self._find_interfaces(child)
return _FindInterfaces()(path)