blob: cddae75a0679fb4df5f7058d65261f7fd63d9cf5 [file] [log] [blame]
#!/usr/bin/env python3
"""systemctl: subset of systemctl used for image construction
Enable, mask, or apply presets to systemd units under an image root (--root).
"""
import argparse
import fnmatch
import os
import re
import sys
from collections import namedtuple
from itertools import chain
from pathlib import Path
# Version number of this systemctl stand-in.
version = 1.0

# Absolute filesystem root; symlink targets are written relative to it.
ROOT = Path("/")

# Directories, relative to the image root, that may contain systemd data.
SYSCONFDIR = Path("etc")
BASE_LIBDIR = Path("lib")
LIBDIR = Path("usr") / "lib"

# Ordered search path of systemd directories; main() appends the entries.
locations = []
class SystemdFile():
    """Class representing a single systemd configuration file

    The unit file itself is parsed first.  Afterwards, for every entry of
    the module-level ``locations`` list in turn, the ``*.conf`` drop-ins
    found in ``system/<unit>.d`` (and, when an instance unit name is given,
    ``system/<instance unit>.d``) are parsed in sorted path order.  All
    values end up in ``self.sections`` as ``{section: {key: [words]}}``.
    """

    # Keys for which an empty assignment ("Key=") discards earlier values.
    _clearable_keys = ['WantedBy']

    # Compiled once for the class instead of on every _parse() call.
    _skip_re = re.compile(r"^\s*([#;]|$)")
    _section_re = re.compile(r"^\s*\[(?P<section>.*)\]")
    # The key stops at the first "=", so "Environment=FOO=bar" is parsed as
    # key "Environment" with value "FOO=bar".
    _kv_re = re.compile(r"^\s*(?P<key>[^\s=]+)\s*=\s*(?P<value>.*)")

    def __init__(self, root, path, instance_unit_name):
        """Parse a unit file and its drop-in configuration

        Args:
            root: A pathlib.Path object pointing to the image root
            path: A pathlib.Path object pointing to the unit file
            instance_unit_name: Name of the instantiated unit whose
                ``.d`` directory should also be read, or a false value
        """
        self.sections = dict()
        self._parse(root, path)

        dirname = path.name + ".d"
        for location in locations:
            system_dir = root / location / "system"
            files = (system_dir / dirname).glob("*.conf")
            if instance_unit_name:
                inst_dirname = instance_unit_name + ".d"
                files = chain(files, (system_dir / inst_dirname).glob("*.conf"))
            for dropin in sorted(files):
                self._parse(root, dropin)

    def _parse(self, root, path):
        """Parse a systemd syntax configuration file

        Comment and blank lines are skipped.  Lines that are not
        ``key=value`` assignments, and assignments that appear before any
        ``[Section]`` header, are ignored instead of aborting the parse.

        Args:
            root: A pathlib.Path object pointing to the image root
            path: A pathlib.Path object pointing to the file
        """
        section = None

        # An absolute symlink inside the image points at a path that is only
        # valid once the image is booted.  Path.resolve() has not raised for
        # missing targets by default since Python 3.6, so test for the
        # dangling link explicitly and retry relative to the image root.
        if path.is_symlink() and not path.exists():
            link_target = Path(os.readlink(str(path)))
            if link_target.is_absolute():
                path = root / link_target.relative_to(link_target.anchor)

        with path.open() as f:
            for line in f:
                if self._skip_re.match(line):
                    continue

                line = line.strip()
                m = self._section_re.match(line)
                if m:
                    section = self.sections.setdefault(m.group('section'), dict())
                    continue

                # Join continuation lines: the trailing backslash is replaced
                # by a space so it never leaks into the value.
                while line.endswith("\\"):
                    continuation = f.readline()
                    line = line[:-1] + " " + continuation.strip()
                    if not continuation:
                        # End of file right after a backslash; stop here
                        # rather than looping forever on empty reads.
                        break

                m = self._kv_re.match(line)
                if m is None or section is None:
                    continue

                k = m.group('key')
                v = m.group('value')

                # If we come across a "key=" line for a "clearable key", then
                # forget all preceding assignments. This works because we are
                # processing files in correct parse order.
                if k in self._clearable_keys and not v:
                    section.pop(k, None)
                    continue

                section.setdefault(k, list()).extend(v.split())

    def get(self, section, prop):
        """Get a property from section

        Args:
            section: Section to retrieve property from
            prop: Property to retrieve

        Returns:
            List representing all properties of type prop in section.

        Raises:
            KeyError: if ``section`` or ``prop`` not found
        """
        return self.sections[section][prop]
class Presets():
    """Class representing all systemd presets

    Preset files (``*.preset``) are collected from ``<location>/<scope>``
    for every entry of the module-level ``locations`` list and parsed into
    an ordered list of ``(action, unit_name)`` directives.
    """

    _skip_re = re.compile(r"^\s*([#;]|$)")
    _directive_re = re.compile(r"^\s*(?P<action>enable|disable)\s+(?P<unit_name>(.+))")
    _Directive = namedtuple("Directive", "action unit_name")

    def __init__(self, scope, root):
        """Collect and parse all presets

        Args:
            scope: Preset directory name below each location
            root: A pathlib.Path object pointing to the image root
        """
        self.directives = list()
        self._collect_presets(scope, root)

    def _parse_presets(self, presets):
        """Parse presets out of a set of preset files

        Args:
            presets: Iterable of pathlib.Path objects, in parse order

        Exits the program if a line is neither a directive, a comment nor
        blank.
        """
        for preset in presets:
            with preset.open() as f:
                for line in f:
                    m = self._directive_re.match(line)
                    if m:
                        # Trailing blanks (or a "\r" from CRLF files) would
                        # become part of the pattern and keep it from ever
                        # matching a unit name.
                        directive = self._Directive(
                            action=m.group('action'),
                            unit_name=m.group('unit_name').rstrip())
                        self.directives.append(directive)
                    elif self._skip_re.match(line):
                        pass
                    else:
                        sys.exit("Unparsed preset line in {}".format(preset))

    def _collect_presets(self, scope, root):
        """Collect list of preset files

        A file name found in an earlier location hides files of the same
        name in later locations; the surviving files are parsed in sorted
        file-name order.
        """
        presets = dict()
        for location in locations:
            paths = (root / location / scope).glob("*.preset")
            for path in paths:
                # earlier names override later ones
                if path.name not in presets:
                    presets[path.name] = path

        self._parse_presets([v for k, v in sorted(presets.items())])

    def state(self, unit_name):
        """Return state of preset for unit_name

        The first directive whose glob pattern matches decides.

        Args:
            unit_name: name of the unit

        Returns:
            None: no matching preset
            `enable`: unit_name is enabled
            `disable`: unit_name is disabled
        """
        for directive in self.directives:
            if fnmatch.fnmatch(unit_name, directive.unit_name):
                return directive.action

        return None
def add_link(path, target):
    """Create the symlink ``path`` -> ``target`` unless a link already exists

    Missing parent directories of ``path`` are created first.  When
    ``path`` is already a symlink it is left untouched; otherwise the
    equivalent ``ln -s`` command is printed and the link is created.

    Args:
        path: A pathlib.Path object naming the link to create
        target: What the link should point to
    """
    parent = path.parent
    try:
        parent.mkdir(parents=True)
    except FileExistsError:
        # The directory is already there, which is all we need.
        pass

    if path.is_symlink():
        return

    print("ln -s {} {}".format(target, path))
    path.symlink_to(target)
class SystemdUnitNotFoundError(Exception):
    """Error describing a unit that could not be located

    Attributes:
        path: the root path the lookup was performed under
        unit: name of the unit that was looked up
    """

    def __init__(self, path, unit):
        # Keep (path, unit) as args so the exception stays copyable and
        # picklable with this two-argument constructor.
        super().__init__(path, unit)
        self.path = path
        self.unit = unit

    def __str__(self):
        # Readable message for tracebacks and logs instead of a tuple repr.
        return "unit {} not found under {}".format(self.unit, self.path)
class SystemdUnit():
    """A unit (plain, template or template instance) inside an image root

    Provides the ``enable`` and ``mask`` operations by creating symlinks
    below ``<root>/etc/systemd/system``.
    """

    def __init__(self, root, unit):
        """
        Args:
            root: A pathlib.Path object pointing to the image root
            unit: Name of the unit, e.g. ``foo.service`` or ``foo@bar.service``
        """
        self.root = root
        self.unit = unit
        self.config = None

    def _path_for_unit(self, unit):
        """Return the path of ``unit`` in the first location providing it

        Raises:
            SystemdUnitNotFoundError: if no location contains the unit
        """
        for location in locations:
            path = self.root / location / "system" / unit
            # exists() follows symlinks, so also accept a symlink whose
            # target cannot be resolved from here.
            if path.exists() or path.is_symlink():
                return path

        raise SystemdUnitNotFoundError(self.root, unit)

    def _process_deps(self, config, service, location, prop, dirstem):
        """Link ``service`` into the ``.wants``/``.requires`` directories

        Args:
            config: Parsed unit configuration (SystemdFile)
            service: Name of the link to create
            location: Path of the unit file below ``self.root``
            prop: ``[Install]`` key listing the dependent units
            dirstem: Directory suffix, e.g. ``wants`` or ``requires``
        """
        systemdir = self.root / SYSCONFDIR / "systemd" / "system"

        # Link targets are written as seen from inside the image.
        target = ROOT / location.relative_to(self.root)
        try:
            dependents = config.get('Install', prop)
        except KeyError:
            # Nothing declared for this property.
            return

        for dependent in dependents:
            wants = systemdir / "{}.{}".format(dependent, dirstem) / service
            add_link(wants, target)

    def enable(self, caller_unit=None, _visited=None):
        """Enable the unit by creating its ``[Install]`` symlinks

        Handles ``WantedBy``, ``RequiredBy``, ``Also`` and ``Alias``.
        Units found as symlinks are treated as aliases and ignored; a
        template without a usable ``DefaultInstance`` is skipped.

        Args:
            caller_unit: Unit whose ``Also=`` triggered this call, if any;
                it is not enabled again from here.
            _visited: Internal set of unit names already handled during the
                current top-level call.  It stops ``Also=`` cycles of any
                length from recursing without bound.

        Raises:
            SystemdUnitNotFoundError: if the unit file cannot be found
        """
        if _visited is None:
            _visited = set()
        _visited.add(self.unit)

        # if we're enabling an instance, first extract the actual instance
        # then figure out what the template unit is
        template = re.match(r"[^@]+@(?P<instance>[^\.]*)\.", self.unit)
        instance_unit_name = None
        if template:
            instance = template.group('instance')
            if instance != "":
                instance_unit_name = self.unit
            # count is passed by keyword; positional use is deprecated.
            unit = re.sub(r"@[^\.]*\.", "@.", self.unit, count=1)
        else:
            instance = None
            unit = self.unit

        path = self._path_for_unit(unit)

        if path.is_symlink():
            # ignore aliases
            return

        config = SystemdFile(self.root, path, instance_unit_name)
        if instance == "":
            try:
                default_instance = config.get('Install', 'DefaultInstance')[0]
            except (KeyError, IndexError):
                # no (or an empty) default instance, so nothing to enable
                return

            service = self.unit.replace("@.",
                                        "@{}.".format(default_instance))
        else:
            service = self.unit

        self._process_deps(config, service, path, 'WantedBy', 'wants')
        self._process_deps(config, service, path, 'RequiredBy', 'requires')

        try:
            also_units = config.get('Install', 'Also')
        except KeyError:
            also_units = []

        for also in also_units:
            if also == caller_unit or also in _visited:
                continue
            try:
                SystemdUnit(self.root, also).enable(unit, _visited)
            except SystemdUnitNotFoundError as e:
                sys.exit("Error: Systemctl also enable issue with  %s (%s)" % (service, e.unit))

        systemdir = self.root / SYSCONFDIR / "systemd" / "system"
        target = ROOT / path.relative_to(self.root)
        try:
            aliases = config.get('Install', 'Alias')
        except KeyError:
            aliases = []

        for dest in aliases:
            add_link(systemdir / dest, target)

    def mask(self):
        """Mask the unit by linking it to ``/dev/null``"""
        systemdir = self.root / SYSCONFDIR / "systemd" / "system"
        add_link(systemdir / self.unit, "/dev/null")
def collect_services(root):
    """Return the names of all non-directory entries in the unit directories

    Every ``<location>/system`` directory below ``root`` is scanned, for
    each entry of the module-level ``locations`` list.

    Args:
        root: A pathlib.Path object pointing to the image root

    Returns:
        A set of entry names.
    """
    found = set()
    for location in locations:
        system_dir = root / location / "system"
        found.update(
            entry.name for entry in system_dir.glob("*") if not entry.is_dir()
        )
    return found
def preset_all(root):
    """Enable every collected unit that the presets do not disable

    Units with an ``enable`` preset, and units without any matching
    preset, are enabled.  The program exits with an error message when a
    unit cannot be found while enabling it.

    Args:
        root: A pathlib.Path object pointing to the image root
    """
    preset_rules = Presets('system-preset', root)

    for name in collect_services(root):
        verdict = preset_rules.state(name)
        if verdict is not None and verdict != "enable":
            continue
        try:
            SystemdUnit(root, name).enable()
        except SystemdUnitNotFoundError:
            sys.exit("Error: Systemctl preset_all issue in %s" % name)

    # If we populate the systemd links we also create /etc/machine-id, which
    # allows systemd to boot with the filesystem read-only before generating
    # a real value and then committing it back.
    #
    # For the stateless configuration, where /etc is generated at runtime
    # (for example on a tmpfs), this script shouldn't run at all and we
    # allow systemd to completely populate /etc.
    machine_id = root / SYSCONFDIR / "machine-id"
    machine_id.touch()
def main():
    """Parse the command line and run the requested command

    Supported commands are ``enable``, ``mask`` and ``preset-all``.  The
    module-level ``locations`` search path is filled in before any command
    runs.

    Returns:
        0 after printing the help text when no command was given,
        otherwise None.  Errors end the program through ``sys.exit``.
    """
    if sys.version_info < (3, 4, 0):
        sys.exit("Python 3.4 or greater is required")

    parser = argparse.ArgumentParser()
    parser.add_argument('command', nargs='?',
                        choices=['enable', 'mask', 'preset-all'])
    parser.add_argument('service', nargs=argparse.REMAINDER)
    parser.add_argument('--root')
    parser.add_argument('--preset-mode', default='full',
                        choices=['full', 'enable-only', 'disable-only'])
    args = parser.parse_args()

    root = ROOT if not args.root else Path(args.root)

    locations.append(SYSCONFDIR / "systemd")
    # Handle the usrmerge case by ignoring /lib when it's a symlink
    base_libdir_is_link = (root / BASE_LIBDIR).is_symlink()
    if not base_libdir_is_link:
        locations.append(BASE_LIBDIR / "systemd")
    locations.append(LIBDIR / "systemd")

    command = args.command
    if not command:
        parser.print_help()
        return 0

    if command == "preset-all":
        if args.service:
            sys.exit("Too many arguments.")
        if args.preset_mode != "enable-only":
            sys.exit("Only enable-only is supported as preset-mode.")
        preset_all(root)
    elif command == "enable":
        for name in args.service:
            try:
                SystemdUnit(root, name).enable()
            except SystemdUnitNotFoundError as err:
                sys.exit("Error: Systemctl main enable issue in %s (%s)" % (name, err.unit))
    elif command == "mask":
        for name in args.service:
            try:
                SystemdUnit(root, name).mask()
            except SystemdUnitNotFoundError as err:
                sys.exit("Error: Systemctl main mask issue in %s (%s)" % (name, err.unit))
    else:
        # argparse restricts the choices, so this should not be reachable.
        raise RuntimeError()
# Run the command-line interface only when this file is executed as a
# script; the value returned by main() is not used as the exit status.
if __name__ == '__main__':
    main()