server: prepare for non-blocking discovery
Add deferred signal handling capability, in preparation for
receiving signals during service discovery.
Add a "busy" exception for method calls during service
discovery.
Change-Id: Id52efbb4f57837c0f449025cfdcb17965c757220
Signed-off-by: Brad Bishop <bradleyb@fuzziesquirrel.com>
diff --git a/obmc/mapper/server.py b/obmc/mapper/server.py
index 0120de5..a9afee6 100644
--- a/obmc/mapper/server.py
+++ b/obmc/mapper/server.py
@@ -27,6 +27,14 @@
import obmc.dbuslib.enums
+class MapperBusyException(dbus.exceptions.DBusException):
+ _dbus_error_name = 'org.freedesktop.DBus.Error.ObjectPathInUse'
+
+ def __init__(self):
+ super(MapperBusyException, self).__init__(
+ 'busy processing bus traffic')
+
+
class MapperNotFoundException(dbus.exceptions.DBusException):
_dbus_error_name = obmc.mapper.MAPPER_NOT_FOUND
@@ -219,6 +227,7 @@
self.manager = Manager(bus, obmc.dbuslib.bindings.OBJ_PREFIX)
self.unique = bus.get_unique_name()
self.bus_map = {}
+ self.defer_signals = {}
self.bus_map[self.unique] = obmc.mapper.MAPPER_NAME
# add my object mananger instance
@@ -254,6 +263,19 @@
self.service = dbus.service.BusName(
obmc.mapper.MAPPER_NAME, self.bus)
+ def discovery_callback(self, owner, items):
+ if owner in self.defer_signals:
+ self.add_items(owner, items)
+ pending = self.defer_signals[owner]
+ del self.defer_signals[owner]
+
+ for x in pending:
+ x()
+
+ def discovery_error(self, owner, path, e):
+ if owner in self.defer_signals:
+ raise e
+
def cache_get(self, path):
cache_entry = self.cache.get(path, {})
if cache_entry is None:
@@ -270,28 +292,47 @@
new = list(set(old).union([dbus.BUS_DAEMON_IFACE + '.ObjectManager']))
self.update_interfaces(path, owner, old, new)
+ def defer_signal(self, owner, callback):
+ self.defer_signals.setdefault(owner, []).append(callback)
+
def interfaces_added_handler(self, path, iprops, **kw):
path = str(path)
owner = str(kw['sender'])
interfaces = self.get_signal_interfaces(owner, iprops.iterkeys())
- if interfaces:
+ if not interfaces:
+ return
+
+ if owner not in self.defer_signals:
self.add_new_objmgr(str(kw['sender_path']), owner)
cache_entry = self.cache_get(path)
old = self.interfaces_get(cache_entry, owner)
new = list(set(interfaces).union(old))
new = {x: iprops[x] for x in new}
self.update_interfaces(path, owner, old, new)
+ else:
+ self.defer_signal(
+ owner,
+ lambda: self.interfaces_added_handler(
+ path, iprops, **kw))
def interfaces_removed_handler(self, path, interfaces, **kw):
path = str(path)
owner = str(kw['sender'])
interfaces = self.get_signal_interfaces(owner, interfaces)
- if interfaces:
+ if not interfaces:
+ return
+
+ if owner not in self.defer_signals:
self.add_new_objmgr(str(kw['sender_path']), owner)
cache_entry = self.cache_get(path)
old = self.interfaces_get(cache_entry, owner)
new = list(set(old).difference(interfaces))
self.update_interfaces(path, owner, old, new)
+ else:
+ self.defer_signal(
+ owner,
+ lambda: self.interfaces_removed_handler(
+ path, interfaces, **kw))
def properties_changed_handler(self, interface, new, old, **kw):
owner = str(kw['sender'])
@@ -303,12 +344,18 @@
if associations is None:
return
- associations = [
- (str(x), str(y), str(z)) for x, y, z in associations]
- self.update_associations(
- path, owner,
- self.index_get_associations(path, [owner]),
- associations)
+ if owner not in self.defer_signals:
+ associations = [
+ (str(x), str(y), str(z)) for x, y, z in associations]
+ self.update_associations(
+ path, owner,
+ self.index_get_associations(path, [owner]),
+ associations)
+ else:
+ self.defer_signal(
+ owner,
+ lambda: self.properties_changed_handler(
+ interface, new, old, **kw))
def process_new_owner(self, owned_name, owner):
# unique name
@@ -337,6 +384,11 @@
if valid and new:
self.process_new_owner(owned_name, new)
if valid and old:
+ # discard any unhandled signals
+ # or in progress discovery
+ if old in self.defer_signals:
+ del self.defer_signals[old]
+
self.process_old_owner(owned_name, old)
def update_interfaces(self, path, owner, old, new):
@@ -444,6 +496,9 @@
@dbus.service.method(obmc.mapper.MAPPER_IFACE, 's', 'a{sas}')
def GetObject(self, path):
+ if len(self.defer_signals):
+ raise MapperBusyException()
+
o = self.cache_get(path)
if not o:
raise MapperNotFoundException(path)
@@ -451,6 +506,9 @@
@dbus.service.method(obmc.mapper.MAPPER_IFACE, 'si', 'as')
def GetSubTreePaths(self, path, depth):
+ if len(self.defer_signals):
+ raise MapperBusyException()
+
try:
return self.cache.iterkeys(path, depth)
except KeyError:
@@ -458,6 +516,9 @@
@dbus.service.method(obmc.mapper.MAPPER_IFACE, 'si', 'a{sa{sas}}')
def GetSubTree(self, path, depth):
+ if len(self.defer_signals):
+ raise MapperBusyException()
+
try:
return {x: y for x, y in self.cache.dataitems(path, depth)}
except KeyError:
@@ -601,6 +662,9 @@
@dbus.service.method(obmc.mapper.MAPPER_IFACE, 's', 'a{sa{sas}}')
def GetAncestors(self, path):
+ if len(self.defer_signals):
+ raise MapperBusyException()
+
elements = filter(bool, path.split('/'))
paths = []
objs = {}