blob: 6f17b6acc7822e743105bb96f555dbf27acc5eab [file] [log] [blame]
"""
BitBake 'msg' implementation
Message handling infrastructure for bitbake
"""
# Copyright (C) 2006 Richard Purdie
#
# SPDX-License-Identifier: GPL-2.0-only
#
import sys
import copy
import logging
import logging.config
import os
from itertools import groupby
import bb
import bb.event
class BBLogFormatter(logging.Formatter):
"""Formatter which ensures that our 'plain' messages (logging.INFO + 1) are used as is"""
DEBUG3 = logging.DEBUG - 2
DEBUG2 = logging.DEBUG - 1
DEBUG = logging.DEBUG
VERBOSE = logging.INFO - 1
NOTE = logging.INFO
PLAIN = logging.INFO + 1
VERBNOTE = logging.INFO + 2
ERROR = logging.ERROR
WARNING = logging.WARNING
CRITICAL = logging.CRITICAL
levelnames = {
DEBUG3 : 'DEBUG',
DEBUG2 : 'DEBUG',
DEBUG : 'DEBUG',
VERBOSE: 'NOTE',
NOTE : 'NOTE',
PLAIN : '',
VERBNOTE: 'NOTE',
WARNING : 'WARNING',
ERROR : 'ERROR',
CRITICAL: 'ERROR',
}
color_enabled = False
BASECOLOR, BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = list(range(29,38))
COLORS = {
DEBUG3 : CYAN,
DEBUG2 : CYAN,
DEBUG : CYAN,
VERBOSE : BASECOLOR,
NOTE : BASECOLOR,
PLAIN : BASECOLOR,
VERBNOTE: BASECOLOR,
WARNING : YELLOW,
ERROR : RED,
CRITICAL: RED,
}
BLD = '\033[1;%dm'
STD = '\033[%dm'
RST = '\033[0m'
def getLevelName(self, levelno):
try:
return self.levelnames[levelno]
except KeyError:
self.levelnames[levelno] = value = 'Level %d' % levelno
return value
def format(self, record):
record.levelname = self.getLevelName(record.levelno)
if record.levelno == self.PLAIN:
msg = record.getMessage()
else:
if self.color_enabled:
record = self.colorize(record)
msg = logging.Formatter.format(self, record)
if hasattr(record, 'bb_exc_formatted'):
msg += '\n' + ''.join(record.bb_exc_formatted)
elif hasattr(record, 'bb_exc_info'):
etype, value, tb = record.bb_exc_info
formatted = bb.exceptions.format_exception(etype, value, tb, limit=5)
msg += '\n' + ''.join(formatted)
return msg
def colorize(self, record):
color = self.COLORS[record.levelno]
if self.color_enabled and color is not None:
record = copy.copy(record)
record.levelname = "".join([self.BLD % color, record.levelname, self.RST])
record.msg = "".join([self.STD % color, record.msg, self.RST])
return record
def enable_color(self):
self.color_enabled = True
def __repr__(self):
return "%s fmt='%s' color=%s" % (self.__class__.__name__, self._fmt, "True" if self.color_enabled else "False")
class BBLogFilter(object):
def __init__(self, handler, level, debug_domains):
self.stdlevel = level
self.debug_domains = debug_domains
loglevel = level
for domain in debug_domains:
if debug_domains[domain] < loglevel:
loglevel = debug_domains[domain]
handler.setLevel(loglevel)
handler.addFilter(self)
def filter(self, record):
if record.levelno >= self.stdlevel:
return True
if record.name in self.debug_domains and record.levelno >= self.debug_domains[record.name]:
return True
return False
class LogFilterGEQLevel(logging.Filter):
def __init__(self, level):
self.strlevel = str(level)
self.level = stringToLevel(level)
def __repr__(self):
return "%s level >= %s (%d)" % (self.__class__.__name__, self.strlevel, self.level)
def filter(self, record):
return (record.levelno >= self.level)
class LogFilterLTLevel(logging.Filter):
def __init__(self, level):
self.strlevel = str(level)
self.level = stringToLevel(level)
def __repr__(self):
return "%s level < %s (%d)" % (self.__class__.__name__, self.strlevel, self.level)
def filter(self, record):
return (record.levelno < self.level)
# Message control functions
#
loggerDefaultLogLevel = BBLogFormatter.NOTE
loggerDefaultDomains = {}
def init_msgconfig(verbose, debug, debug_domains=None):
"""
Set default verbosity and debug levels config the logger
"""
if debug:
bb.msg.loggerDefaultLogLevel = BBLogFormatter.DEBUG - debug + 1
elif verbose:
bb.msg.loggerDefaultLogLevel = BBLogFormatter.VERBOSE
else:
bb.msg.loggerDefaultLogLevel = BBLogFormatter.NOTE
bb.msg.loggerDefaultDomains = {}
if debug_domains:
for (domainarg, iterator) in groupby(debug_domains):
dlevel = len(tuple(iterator))
bb.msg.loggerDefaultDomains["BitBake.%s" % domainarg] = logging.DEBUG - dlevel + 1
def constructLogOptions():
return loggerDefaultLogLevel, loggerDefaultDomains
def addDefaultlogFilter(handler, cls = BBLogFilter, forcelevel=None):
level, debug_domains = constructLogOptions()
if forcelevel is not None:
level = forcelevel
cls(handler, level, debug_domains)
def stringToLevel(level):
try:
return int(level)
except ValueError:
pass
try:
return getattr(logging, level)
except AttributeError:
pass
return getattr(BBLogFormatter, level)
#
# Message handling functions
#
def fatal(msgdomain, msg):
if msgdomain:
logger = logging.getLogger("BitBake.%s" % msgdomain)
else:
logger = logging.getLogger("BitBake")
logger.critical(msg)
sys.exit(1)
def logger_create(name, output=sys.stderr, level=logging.INFO, preserve_handlers=False, color='auto'):
"""Standalone logger creation function"""
logger = logging.getLogger(name)
console = logging.StreamHandler(output)
format = bb.msg.BBLogFormatter("%(levelname)s: %(message)s")
if color == 'always' or (color == 'auto' and output.isatty()):
format.enable_color()
console.setFormatter(format)
if preserve_handlers:
logger.addHandler(console)
else:
logger.handlers = [console]
logger.setLevel(level)
return logger
def has_console_handler(logger):
for handler in logger.handlers:
if isinstance(handler, logging.StreamHandler):
if handler.stream in [sys.stderr, sys.stdout]:
return True
return False
def mergeLoggingConfig(logconfig, userconfig):
logconfig = copy.deepcopy(logconfig)
userconfig = copy.deepcopy(userconfig)
# Merge config with the default config
if userconfig.get('version') != logconfig['version']:
raise BaseException("Bad user configuration version. Expected %r, got %r" % (logconfig['version'], userconfig.get('version')))
# Set some defaults to make merging easier
userconfig.setdefault("loggers", {})
# If a handler, formatter, or filter is defined in the user
# config, it will replace an existing one in the default config
for k in ("handlers", "formatters", "filters"):
logconfig.setdefault(k, {}).update(userconfig.get(k, {}))
seen_loggers = set()
for name, l in logconfig["loggers"].items():
# If the merge option is set, merge the handlers and
# filters. Otherwise, if it is False, this logger won't get
# add to the set of seen loggers and will replace the
# existing one
if l.get('bitbake_merge', True):
ulogger = userconfig["loggers"].setdefault(name, {})
ulogger.setdefault("handlers", [])
ulogger.setdefault("filters", [])
# Merge lists
l.setdefault("handlers", []).extend(ulogger["handlers"])
l.setdefault("filters", []).extend(ulogger["filters"])
# Replace other properties if present
if "level" in ulogger:
l["level"] = ulogger["level"]
if "propagate" in ulogger:
l["propagate"] = ulogger["propagate"]
seen_loggers.add(name)
# Add all loggers present in the user config, but not any that
# have already been processed
for name in set(userconfig["loggers"].keys()) - seen_loggers:
logconfig["loggers"][name] = userconfig["loggers"][name]
return logconfig
def setLoggingConfig(defaultconfig, userconfigfile=None):
logconfig = copy.deepcopy(defaultconfig)
if userconfigfile:
with open(os.path.normpath(userconfigfile), 'r') as f:
if userconfigfile.endswith('.yml') or userconfigfile.endswith('.yaml'):
import yaml
userconfig = yaml.load(f)
elif userconfigfile.endswith('.json') or userconfigfile.endswith('.cfg'):
import json
userconfig = json.load(f)
else:
raise BaseException("Unrecognized file format: %s" % userconfigfile)
if userconfig.get('bitbake_merge', True):
logconfig = mergeLoggingConfig(logconfig, userconfig)
else:
# Replace the entire default config
logconfig = userconfig
# Convert all level parameters to integers in case users want to use the
# bitbake defined level names
for h in logconfig["handlers"].values():
if "level" in h:
h["level"] = bb.msg.stringToLevel(h["level"])
for l in logconfig["loggers"].values():
if "level" in l:
l["level"] = bb.msg.stringToLevel(l["level"])
conf = logging.config.dictConfigClass(logconfig)
conf.configure()
# The user may have specified logging domains they want at a higher debug
# level than the standard.
for name, l in logconfig["loggers"].items():
if not name.startswith("BitBake."):
continue
if not "level" in l:
continue
curlevel = bb.msg.loggerDefaultDomains.get(name)
# Note: level parameter should already be a int because of conversion
# above
newlevel = int(l["level"])
if curlevel is None or newlevel < curlevel:
bb.msg.loggerDefaultDomains[name] = newlevel
# TODO: I don't think that setting the global log level should be necessary
#if newlevel < bb.msg.loggerDefaultLogLevel:
# bb.msg.loggerDefaultLogLevel = newlevel
return conf