Skip to content
Snippets Groups Projects
Commit 631419e7 authored by Manuela Kuhn's avatar Manuela Kuhn
Browse files

Added logutils module

parent 77730ded
No related merge requests found
#
# Copyright (C) 2010-2013 Vinay Sajip. See LICENSE.txt for details.
#
"""
The logutils package provides a set of handlers for the Python standard
library's logging package.
Some of these handlers are out-of-scope for the standard library, and
so they are packaged here. Others are updated versions which have
appeared in recent Python releases, but are usable with older versions
of Python, and so are packaged here.
"""
import logging
from string import Template
__version__ = '0.3.3'
class NullHandler(logging.Handler):
"""
This handler does nothing. It's intended to be used to avoid the
"No handlers could be found for logger XXX" one-off warning. This is
important for library code, which may contain code to log events. If a user
of the library does not configure logging, the one-off warning might be
produced; to avoid this, the library developer simply needs to instantiate
a NullHandler and add it to the top-level logger of the library module or
package.
"""
def handle(self, record):
"""
Handle a record. Does nothing in this class, but in other
handlers it typically filters and then emits the record in a
thread-safe way.
"""
pass
def emit(self, record):
"""
Emit a record. This does nothing and shouldn't be called during normal
processing, unless you redefine :meth:`~logutils.NullHandler.handle`.
"""
pass
def createLock(self):
"""
Since this handler does nothing, it has no underlying I/O to protect
against multi-threaded access, so this method returns `None`.
"""
self.lock = None
class PercentStyle(object):
default_format = '%(message)s'
asctime_format = '%(asctime)s'
def __init__(self, fmt):
self._fmt = fmt or self.default_format
def usesTime(self):
return self._fmt.find(self.asctime_format) >= 0
def format(self, record):
return self._fmt % record.__dict__
class StrFormatStyle(PercentStyle):
default_format = '{message}'
asctime_format = '{asctime}'
def format(self, record):
return self._fmt.format(**record.__dict__)
class StringTemplateStyle(PercentStyle):
default_format = '${message}'
asctime_format = '${asctime}'
def __init__(self, fmt):
self._fmt = fmt or self.default_format
self._tpl = Template(self._fmt)
def usesTime(self):
fmt = self._fmt
return fmt.find('$asctime') >= 0 or fmt.find(self.asctime_format) >= 0
def format(self, record):
return self._tpl.substitute(**record.__dict__)
_STYLES = {
'%': PercentStyle,
'{': StrFormatStyle,
'$': StringTemplateStyle
}
class Formatter(logging.Formatter):
"""
Subclasses Formatter in Pythons earlier than 3.2 in order to give
3.2 Formatter behaviour with respect to allowing %-, {} or $-
formatting.
"""
def __init__(self, fmt=None, datefmt=None, style='%'):
"""
Initialize the formatter with specified format strings.
Initialize the formatter either with the specified format string, or a
default as described above. Allow for specialized date formatting with
the optional datefmt argument (if omitted, you get the ISO8601 format).
Use a style parameter of '%', '{' or '$' to specify that you want to
use one of %-formatting, :meth:`str.format` (``{}``) formatting or
:class:`string.Template` formatting in your format string.
"""
if style not in _STYLES:
raise ValueError('Style must be one of: %s' % ','.join(
_STYLES.keys()))
self._style = _STYLES[style](fmt)
self._fmt = self._style._fmt
self.datefmt = datefmt
def usesTime(self):
"""
Check if the format uses the creation time of the record.
"""
return self._style.usesTime()
def formatMessage(self, record):
return self._style.format(record)
def format(self, record):
"""
Format the specified record as text.
The record's attribute dictionary is used as the operand to a
string formatting operation which yields the returned string.
Before formatting the dictionary, a couple of preparatory steps
are carried out. The message attribute of the record is computed
using LogRecord.getMessage(). If the formatting string uses the
time (as determined by a call to usesTime(), formatTime() is
called to format the event time. If there is exception information,
it is formatted using formatException() and appended to the message.
"""
record.message = record.getMessage()
if self.usesTime():
record.asctime = self.formatTime(record, self.datefmt)
s = self.formatMessage(record)
if record.exc_info:
# Cache the traceback text to avoid converting it multiple times
# (it's constant anyway)
if not record.exc_text:
record.exc_text = self.formatException(record.exc_info)
if record.exc_text:
if s[-1:] != "\n":
s = s + "\n"
s = s + record.exc_text
return s
class BraceMessage(object):
def __init__(self, fmt, *args, **kwargs):
self.fmt = fmt
self.args = args
self.kwargs = kwargs
self.str = None
def __str__(self):
if self.str is None:
self.str = self.fmt.format(*self.args, **self.kwargs)
return self.str
class DollarMessage(object):
def __init__(self, fmt, **kwargs):
self.fmt = fmt
self.kwargs = kwargs
self.str = None
def __str__(self):
if self.str is None:
self.str = Template(self.fmt).substitute(**self.kwargs)
return self.str
def hasHandlers(logger):
"""
See if a logger has any handlers.
"""
rv = False
while logger:
if logger.handlers:
rv = True
break
elif not logger.propagate:
break
else:
logger = logger.parent
return rv
#
# Copyright (C) 2010-2013 Vinay Sajip. See LICENSE.txt for details.
#
import logging
import logutils
class LoggerAdapter(object):
"""
An adapter for loggers which makes it easier to specify contextual
information in logging output.
"""
def __init__(self, logger, extra):
"""
Initialize the adapter with a logger and a dict-like object which
provides contextual information. This constructor signature allows
easy stacking of LoggerAdapters, if so desired.
You can effectively pass keyword arguments as shown in the
following example:
adapter = LoggerAdapter(someLogger, dict(p1=v1, p2="v2"))
"""
self.logger = logger
self.extra = extra
def process(self, msg, kwargs):
"""
Process the logging message and keyword arguments passed in to
a logging call to insert contextual information. You can either
manipulate the message itself, the keyword args or both. Return
the message and kwargs modified (or not) to suit your needs.
Normally, you'll only need to override this one method in a
LoggerAdapter subclass for your specific needs.
"""
kwargs["extra"] = self.extra
return msg, kwargs
#
# Boilerplate convenience methods
#
def debug(self, msg, *args, **kwargs):
"""
Delegate a debug call to the underlying logger.
"""
self.log(logging.DEBUG, msg, *args, **kwargs)
def info(self, msg, *args, **kwargs):
"""
Delegate an info call to the underlying logger.
"""
self.log(logging.INFO, msg, *args, **kwargs)
def warning(self, msg, *args, **kwargs):
"""
Delegate a warning call to the underlying logger.
"""
self.log(logging.WARNING, msg, *args, **kwargs)
warn = warning
def error(self, msg, *args, **kwargs):
"""
Delegate an error call to the underlying logger.
"""
self.log(logging.ERROR, msg, *args, **kwargs)
def exception(self, msg, *args, **kwargs):
"""
Delegate an exception call to the underlying logger.
"""
kwargs["exc_info"] = 1
self.log(logging.ERROR, msg, *args, **kwargs)
def critical(self, msg, *args, **kwargs):
"""
Delegate a critical call to the underlying logger.
"""
self.log(logging.CRITICAL, msg, *args, **kwargs)
def log(self, level, msg, *args, **kwargs):
"""
Delegate a log call to the underlying logger, after adding
contextual information from this adapter instance.
"""
if self.isEnabledFor(level):
msg, kwargs = self.process(msg, kwargs)
self.logger._log(level, msg, args, **kwargs)
def isEnabledFor(self, level):
"""
Is this logger enabled for level 'level'?
"""
if self.logger.manager.disable >= level:
return False
return level >= self.getEffectiveLevel()
def setLevel(self, level):
"""
Set the specified level on the underlying logger.
"""
self.logger.setLevel(level)
def getEffectiveLevel(self):
"""
Get the effective level for the underlying logger.
"""
return self.logger.getEffectiveLevel()
def hasHandlers(self):
"""
See if the underlying logger has any handlers.
"""
return logutils.hasHandlers(self.logger)
#
# Copyright (C) 2010-2013 Vinay Sajip. All rights reserved.
#
import ctypes
import logging
import os
try:
unicode
except NameError:
unicode = None
class ColorizingStreamHandler(logging.StreamHandler):
"""
A stream handler which supports colorizing of console streams
under Windows, Linux and Mac OS X.
:param strm: The stream to colorize - typically ``sys.stdout``
or ``sys.stderr``.
"""
# color names to indices
color_map = {
'black': 0,
'red': 1,
'green': 2,
'yellow': 3,
'blue': 4,
'magenta': 5,
'cyan': 6,
'white': 7,
}
#levels to (background, foreground, bold/intense)
if os.name == 'nt':
level_map = {
logging.DEBUG: (None, 'blue', True),
logging.INFO: (None, 'white', False),
logging.WARNING: (None, 'yellow', True),
logging.ERROR: (None, 'red', True),
logging.CRITICAL: ('red', 'white', True),
}
else:
"Maps levels to colour/intensity settings."
level_map = {
logging.DEBUG: (None, 'blue', False),
logging.INFO: (None, 'black', False),
logging.WARNING: (None, 'yellow', False),
logging.ERROR: (None, 'red', False),
logging.CRITICAL: ('red', 'white', True),
}
csi = '\x1b['
reset = '\x1b[0m'
@property
def is_tty(self):
"Returns true if the handler's stream is a terminal."
isatty = getattr(self.stream, 'isatty', None)
return isatty and isatty()
def emit(self, record):
try:
message = self.format(record)
stream = self.stream
if unicode and isinstance(message, unicode):
enc = getattr(stream, 'encoding', 'utf-8')
message = message.encode(enc, 'replace')
if not self.is_tty:
stream.write(message)
else:
self.output_colorized(message)
stream.write(getattr(self, 'terminator', '\n'))
self.flush()
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
if os.name != 'nt':
def output_colorized(self, message):
"""
Output a colorized message.
On Linux and Mac OS X, this method just writes the
already-colorized message to the stream, since on these
platforms console streams accept ANSI escape sequences
for colorization. On Windows, this handler implements a
subset of ANSI escape sequence handling by parsing the
message, extracting the sequences and making Win32 API
calls to colorize the output.
:param message: The message to colorize and output.
"""
self.stream.write(message)
else:
import re
ansi_esc = re.compile(r'\x1b\[((?:\d+)(?:;(?:\d+))*)m')
nt_color_map = {
0: 0x00, # black
1: 0x04, # red
2: 0x02, # green
3: 0x06, # yellow
4: 0x01, # blue
5: 0x05, # magenta
6: 0x03, # cyan
7: 0x07, # white
}
def output_colorized(self, message):
"""
Output a colorized message.
On Linux and Mac OS X, this method just writes the
already-colorized message to the stream, since on these
platforms console streams accept ANSI escape sequences
for colorization. On Windows, this handler implements a
subset of ANSI escape sequence handling by parsing the
message, extracting the sequences and making Win32 API
calls to colorize the output.
:param message: The message to colorize and output.
"""
parts = self.ansi_esc.split(message)
write = self.stream.write
h = None
fd = getattr(self.stream, 'fileno', None)
if fd is not None:
fd = fd()
if fd in (1, 2): # stdout or stderr
h = ctypes.windll.kernel32.GetStdHandle(-10 - fd)
while parts:
text = parts.pop(0)
if text:
write(text)
if parts:
params = parts.pop(0)
if h is not None:
params = [int(p) for p in params.split(';')]
color = 0
for p in params:
if 40 <= p <= 47:
color |= self.nt_color_map[p - 40] << 4
elif 30 <= p <= 37:
color |= self.nt_color_map[p - 30]
elif p == 1:
color |= 0x08 # foreground intensity on
elif p == 0: # reset to default color
color = 0x07
else:
pass # error condition ignored
ctypes.windll.kernel32.SetConsoleTextAttribute(h, color)
def colorize(self, message, record):
"""
Colorize a message for a logging event.
This implementation uses the ``level_map`` class attribute to
map the LogRecord's level to a colour/intensity setting, which is
then applied to the whole message.
:param message: The message to colorize.
:param record: The ``LogRecord`` for the message.
"""
if record.levelno in self.level_map:
bg, fg, bold = self.level_map[record.levelno]
params = []
if bg in self.color_map:
params.append(str(self.color_map[bg] + 40))
if fg in self.color_map:
params.append(str(self.color_map[fg] + 30))
if bold:
params.append('1')
if params:
message = ''.join((self.csi, ';'.join(params),
'm', message, self.reset))
return message
def format(self, record):
"""
Formats a record for output.
This implementation colorizes the message line, but leaves
any traceback unolorized.
"""
message = logging.StreamHandler.format(self, record)
if self.is_tty:
# Don't colorize any traceback
parts = message.split('\n', 1)
parts[0] = self.colorize(parts[0], record)
message = '\n'.join(parts)
return message
This diff is collapsed.
#
# Copyright (C) 2010-2013 Vinay Sajip. See LICENSE.txt for details.
#
import logging
class HTTPHandler(logging.Handler):
"""
A class which sends records to a Web server, using either GET or
POST semantics.
:param host: The Web server to connect to.
:param url: The URL to use for the connection.
:param method: The HTTP method to use. GET and POST are supported.
:param secure: set to True if HTTPS is to be used.
:param credentials: Set to a username/password tuple if desired. If
set, a Basic authentication header is sent. WARNING:
if using credentials, make sure `secure` is `True`
to avoid sending usernames and passwords in
cleartext over the wire.
"""
def __init__(self, host, url, method="GET", secure=False, credentials=None):
"""
Initialize an instance.
"""
logging.Handler.__init__(self)
method = method.upper()
if method not in ["GET", "POST"]:
raise ValueError("method must be GET or POST")
self.host = host
self.url = url
self.method = method
self.secure = secure
self.credentials = credentials
def mapLogRecord(self, record):
"""
Default implementation of mapping the log record into a dict
that is sent as the CGI data. Overwrite in your class.
Contributed by Franz Glasner.
:param record: The record to be mapped.
"""
return record.__dict__
def emit(self, record):
"""
Emit a record.
Send the record to the Web server as a percent-encoded dictionary
:param record: The record to be emitted.
"""
try:
import http.client, urllib.parse
host = self.host
if self.secure:
h = http.client.HTTPSConnection(host)
else:
h = http.client.HTTPConnection(host)
url = self.url
data = urllib.parse.urlencode(self.mapLogRecord(record))
if self.method == "GET":
if (url.find('?') >= 0):
sep = '&'
else:
sep = '?'
url = url + "%c%s" % (sep, data)
h.putrequest(self.method, url)
# support multiple hosts on one IP address...
# need to strip optional :port from host, if present
i = host.find(":")
if i >= 0:
host = host[:i]
h.putheader("Host", host)
if self.method == "POST":
h.putheader("Content-type",
"application/x-www-form-urlencoded")
h.putheader("Content-length", str(len(data)))
if self.credentials:
import base64
s = ('u%s:%s' % self.credentials).encode('utf-8')
s = 'Basic ' + base64.b64encode(s).strip()
h.putheader('Authorization', s)
h.endheaders(data if self.method == "POST" else None)
h.getresponse() #can't do anything with the result
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
#
# Copyright (C) 2010-2013 Vinay Sajip. See LICENSE.txt for details.
#
"""
This module contains classes which help you work with queues. A typical
application is when you want to log from performance-critical threads, but
where the handlers you want to use are slow (for example,
:class:`~logging.handlers.SMTPHandler`). In that case, you can create a queue,
pass it to a :class:`QueueHandler` instance and use that instance with your
loggers. Elsewhere, you can instantiate a :class:`QueueListener` with the same
queue and some slow handlers, and call :meth:`~QueueListener.start` on it.
This will start monitoring the queue on a separate thread and call all the
configured handlers *on that thread*, so that your logging thread is not held
up by the slow handlers.
Note that as well as in-process queues, you can use these classes with queues
from the :mod:`multiprocessing` module.
**N.B.** This is part of the standard library since Python 3.2, so the
version here is for use with earlier Python versions.
"""
import logging
try:
import Queue as queue
except ImportError:
import queue
import threading
class QueueHandler(logging.Handler):
"""
This handler sends events to a queue. Typically, it would be used together
with a multiprocessing Queue to centralise logging to file in one process
(in a multi-process application), so as to avoid file write contention
between processes.
:param queue: The queue to send `LogRecords` to.
"""
def __init__(self, queue):
"""
Initialise an instance, using the passed queue.
"""
logging.Handler.__init__(self)
self.queue = queue
def enqueue(self, record):
"""
Enqueue a record.
The base implementation uses :meth:`~queue.Queue.put_nowait`. You may
want to override this method if you want to use blocking, timeouts or
custom queue implementations.
:param record: The record to enqueue.
"""
self.queue.put_nowait(record)
def prepare(self, record):
"""
Prepares a record for queuing. The object returned by this method is
enqueued.
The base implementation formats the record to merge the message
and arguments, and removes unpickleable items from the record
in-place.
You might want to override this method if you want to convert
the record to a dict or JSON string, or send a modified copy
of the record while leaving the original intact.
:param record: The record to prepare.
"""
# The format operation gets traceback text into record.exc_text
# (if there's exception data), and also puts the message into
# record.message. We can then use this to replace the original
# msg + args, as these might be unpickleable. We also zap the
# exc_info attribute, as it's no longer needed and, if not None,
# will typically not be pickleable.
self.format(record)
record.msg = record.message
record.args = None
record.exc_info = None
return record
def emit(self, record):
"""
Emit a record.
Writes the LogRecord to the queue, preparing it for pickling first.
:param record: The record to emit.
"""
try:
self.enqueue(self.prepare(record))
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
class QueueListener(object):
"""
This class implements an internal threaded listener which watches for
LogRecords being added to a queue, removes them and passes them to a
list of handlers for processing.
:param record: The queue to listen to.
:param handlers: The handlers to invoke on everything received from
the queue.
"""
_sentinel = None
def __init__(self, queue, *handlers):
"""
Initialise an instance with the specified queue and
handlers.
"""
self.queue = queue
self.handlers = handlers
self._stop = threading.Event()
self._thread = None
def dequeue(self, block):
"""
Dequeue a record and return it, optionally blocking.
The base implementation uses :meth:`~queue.Queue.get`. You may want to
override this method if you want to use timeouts or work with custom
queue implementations.
:param block: Whether to block if the queue is empty. If `False` and
the queue is empty, an :class:`~queue.Empty` exception
will be thrown.
"""
return self.queue.get(block)
def start(self):
"""
Start the listener.
This starts up a background thread to monitor the queue for
LogRecords to process.
"""
self._thread = t = threading.Thread(target=self._monitor)
t.setDaemon(True)
t.start()
def prepare(self , record):
"""
Prepare a record for handling.
This method just returns the passed-in record. You may want to
override this method if you need to do any custom marshalling or
manipulation of the record before passing it to the handlers.
:param record: The record to prepare.
"""
return record
def handle(self, record):
"""
Handle a record.
This just loops through the handlers offering them the record
to handle.
:param record: The record to handle.
"""
record = self.prepare(record)
for handler in self.handlers:
handler.handle(record)
def _monitor(self):
"""
Monitor the queue for records, and ask the handler
to deal with them.
This method runs on a separate, internal thread.
The thread will terminate if it sees a sentinel object in the queue.
"""
q = self.queue
has_task_done = hasattr(q, 'task_done')
while not self._stop.isSet():
try:
record = self.dequeue(True)
if record is self._sentinel:
break
self.handle(record)
if has_task_done:
q.task_done()
except queue.Empty:
pass
# There might still be records in the queue.
while True:
try:
record = self.dequeue(False)
if record is self._sentinel:
break
self.handle(record)
if has_task_done:
q.task_done()
except queue.Empty:
break
def enqueue_sentinel(self):
"""
Writes a sentinel to the queue to tell the listener to quit. This
implementation uses ``put_nowait()``. You may want to override this
method if you want to use timeouts or work with custom queue
implementations.
"""
self.queue.put_nowait(self._sentinel)
def stop(self):
"""
Stop the listener.
This asks the thread to terminate, and then waits for it to do so.
Note that if you don't call this before your application exits, there
may be some records still left on the queue, which won't be processed.
"""
self._stop.set()
self.enqueue_sentinel()
self._thread.join()
self._thread = None
#
# Copyright (C) 2011-2013 Vinay Sajip. See LICENSE.txt for details.
#
"""
This module contains classes which help you work with Redis queues.
"""
from logutils.queue import QueueHandler, QueueListener
try:
import cPickle as pickle
except ImportError:
import pickle
class RedisQueueHandler(QueueHandler):
"""
A QueueHandler implementation which pushes pickled
records to a Redis queue using a specified key.
:param key: The key to use for the queue. Defaults to
"python.logging".
:param redis: If specified, this instance is used to
communicate with a Redis instance.
:param limit: If specified, the queue is restricted to
have only this many elements.
"""
def __init__(self, key='python.logging', redis=None, limit=0):
if redis is None:
from redis import Redis
redis = Redis()
self.key = key
assert limit >= 0
self.limit = limit
QueueHandler.__init__(self, redis)
def enqueue(self, record):
s = pickle.dumps(vars(record))
self.queue.rpush(self.key, s)
if self.limit:
self.queue.ltrim(self.key, -self.limit, -1)
class RedisQueueListener(QueueListener):
"""
A QueueListener implementation which fetches pickled
records from a Redis queue using a specified key.
:param key: The key to use for the queue. Defaults to
"python.logging".
:param redis: If specified, this instance is used to
communicate with a Redis instance.
"""
def __init__(self, *handlers, **kwargs):
redis = kwargs.get('redis')
if redis is None:
from redis import Redis
redis = Redis()
self.key = kwargs.get('key', 'python.logging')
QueueListener.__init__(self, redis, *handlers)
def dequeue(self, block):
"""
Dequeue and return a record.
"""
if block:
s = self.queue.blpop(self.key)[1]
else:
s = self.queue.lpop(self.key)
if not s:
record = None
else:
record = pickle.loads(s)
return record
def enqueue_sentinel(self):
self.queue.rpush(self.key, '')
#
# Copyright (C) 2010-2013 Vinay Sajip. See LICENSE.txt for details.
#
import logging
from logging.handlers import BufferingHandler
class TestHandler(BufferingHandler):
"""
This handler collects records in a buffer for later inspection by
your unit test code.
:param matcher: The :class:`~logutils.testing.Matcher` instance to
use for matching.
"""
def __init__(self, matcher):
# BufferingHandler takes a "capacity" argument
# so as to know when to flush. As we're overriding
# shouldFlush anyway, we can set a capacity of zero.
# You can call flush() manually to clear out the
# buffer.
BufferingHandler.__init__(self, 0)
self.formatted = []
self.matcher = matcher
def shouldFlush(self):
"""
Should the buffer be flushed?
This returns `False` - you'll need to flush manually, usually after
your unit test code checks the buffer contents against your
expectations.
"""
return False
def emit(self, record):
"""
Saves the `__dict__` of the record in the `buffer` attribute,
and the formatted records in the `formatted` attribute.
:param record: The record to emit.
"""
self.formatted.append(self.format(record))
self.buffer.append(record.__dict__)
def flush(self):
"""
Clears out the `buffer` and `formatted` attributes.
"""
BufferingHandler.flush(self)
self.formatted = []
def matches(self, **kwargs):
"""
Look for a saved dict whose keys/values match the supplied arguments.
Return `True` if found, else `False`.
:param kwargs: A set of keyword arguments whose names are LogRecord
attributes and whose values are what you want to
match in a stored LogRecord.
"""
result = False
for d in self.buffer:
if self.matcher.matches(d, **kwargs):
result = True
break
#if not result:
# print('*** matcher failed completely on %d records' % len(self.buffer))
return result
def matchall(self, kwarglist):
"""
Accept a list of keyword argument values and ensure that the handler's
buffer of stored records matches the list one-for-one.
Return `True` if exactly matched, else `False`.
:param kwarglist: A list of keyword-argument dictionaries, each of
which will be passed to :meth:`matches` with the
corresponding record from the buffer.
"""
if self.count != len(kwarglist):
result = False
else:
result = True
for d, kwargs in zip(self.buffer, kwarglist):
if not self.matcher.matches(d, **kwargs):
result = False
break
return result
@property
def count(self):
"""
The number of records in the buffer.
"""
return len(self.buffer)
class Matcher(object):
"""
This utility class matches a stored dictionary of
:class:`logging.LogRecord` attributes with keyword arguments
passed to its :meth:`~logutils.testing.Matcher.matches` method.
"""
_partial_matches = ('msg', 'message')
"""
A list of :class:`logging.LogRecord` attribute names which
will be checked for partial matches. If not in this list,
an exact match will be attempted.
"""
def matches(self, d, **kwargs):
"""
Try to match a single dict with the supplied arguments.
Keys whose values are strings and which are in self._partial_matches
will be checked for partial (i.e. substring) matches. You can extend
this scheme to (for example) do regular expression matching, etc.
Return `True` if found, else `False`.
:param kwargs: A set of keyword arguments whose names are LogRecord
attributes and whose values are what you want to
match in a stored LogRecord.
"""
result = True
for k in kwargs:
v = kwargs[k]
dv = d.get(k)
if not self.match_value(k, dv, v):
#print('*** matcher failed: %s, %r, %r' % (k, dv, v))
result = False
break
return result
def match_value(self, k, dv, v):
"""
Try to match a single stored value (dv) with a supplied value (v).
Return `True` if found, else `False`.
:param k: The key value (LogRecord attribute name).
:param dv: The stored value to match against.
:param v: The value to compare with the stored value.
"""
if type(v) != type(dv):
result = False
elif type(dv) is not str or k not in self._partial_matches:
result = (v == dv)
else:
result = dv.find(v) >= 0
#if not result:
# print('*** matcher failed on %s: %r vs. %r' % (k, dv, v))
return result
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment