Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
#
# Copyright (C) 2010-2013 Vinay Sajip. See LICENSE.txt for details.
#
"""
This module contains classes which help you work with queues. A typical
application is when you want to log from performance-critical threads, but
where the handlers you want to use are slow (for example,
:class:`~logging.handlers.SMTPHandler`). In that case, you can create a queue,
pass it to a :class:`QueueHandler` instance and use that instance with your
loggers. Elsewhere, you can instantiate a :class:`QueueListener` with the same
queue and some slow handlers, and call :meth:`~QueueListener.start` on it.
This will start monitoring the queue on a separate thread and call all the
configured handlers *on that thread*, so that your logging thread is not held
up by the slow handlers.
Note that as well as in-process queues, you can use these classes with queues
from the :mod:`multiprocessing` module.
**N.B.** This is part of the standard library since Python 3.2, so the
version here is for use with earlier Python versions.
"""
import copy
import logging
try:
    import Queue as queue
except ImportError:
    import queue
import threading
class QueueHandler(logging.Handler):
    """
    This handler sends events to a queue. Typically, it would be used together
    with a multiprocessing Queue to centralise logging to file in one process
    (in a multi-process application), so as to avoid file write contention
    between processes.

    :param queue: The queue to send `LogRecords` to.
    """

    def __init__(self, queue):
        """
        Initialise an instance, using the passed queue.

        :param queue: The queue that prepared records are put on.
        """
        logging.Handler.__init__(self)
        self.queue = queue

    def enqueue(self, record):
        """
        Enqueue a record.

        The base implementation uses :meth:`~queue.Queue.put_nowait`. You may
        want to override this method if you want to use blocking, timeouts or
        custom queue implementations.

        :param record: The record to enqueue.
        """
        self.queue.put_nowait(record)

    def prepare(self, record):
        """
        Prepares a record for queuing. The object returned by this method is
        enqueued.

        The base implementation formats the record to merge the message
        and arguments, and then returns a shallow copy of the record from
        which the items that are typically unpickleable (``args`` and
        ``exc_info``) have been removed. The record passed in keeps its
        original ``msg``, ``args`` and ``exc_info``, so other handlers
        attached to the same logger still see them.

        You might want to override this method if you want to convert
        the record to a dict or JSON string, or send a differently modified
        copy of the record.

        :param record: The record to prepare.
        :return: The copy of the record which should be enqueued.
        """
        # The format operation gets traceback text into record.exc_text
        # (if there's exception data), and also puts the message into
        # record.message. We can then use this to replace the original
        # msg + args, as these might be unpickleable.
        self.format(record)
        # Work on a copy: clearing args/exc_info on the caller's record would
        # strip them for every handler that runs after this one.
        prepared = copy.copy(record)
        prepared.msg = record.message
        prepared.args = None
        # exc_info is no longer needed (its text is in exc_text) and, if not
        # None, will typically not be pickleable.
        prepared.exc_info = None
        return prepared

    def emit(self, record):
        """
        Emit a record.

        Writes the LogRecord to the queue, preparing it for pickling first.
        Errors raised while preparing or enqueuing are passed to
        :meth:`handleError`; ``KeyboardInterrupt`` and ``SystemExit`` are
        always propagated.

        :param record: The record to emit.
        """
        try:
            self.enqueue(self.prepare(record))
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            # Only ordinary errors are reported via handleError; other
            # BaseException subclasses (e.g. GeneratorExit) must propagate.
            self.handleError(record)
class QueueListener(object):
    """
    This class implements an internal threaded listener which watches for
    LogRecords being added to a queue, removes them and passes them to a
    list of handlers for processing.

    :param queue: The queue to listen to.
    :param handlers: The handlers to invoke on everything received from
                     the queue.
    """
    # Object which, when read from the queue, tells the listener to quit.
    _sentinel = None

    def __init__(self, queue, *handlers):
        """
        Initialise an instance with the specified queue and
        handlers.

        :param queue: The queue to listen to.
        :param handlers: The handlers to invoke on everything received from
                         the queue.
        """
        self.queue = queue
        self.handlers = handlers
        self._stop = threading.Event()
        self._thread = None

    def dequeue(self, block):
        """
        Dequeue a record and return it, optionally blocking.

        The base implementation uses :meth:`~queue.Queue.get`. You may want to
        override this method if you want to use timeouts or work with custom
        queue implementations.

        :param block: Whether to block if the queue is empty. If `False` and
                      the queue is empty, an :class:`~queue.Empty` exception
                      will be thrown.
        :return: The item read from the queue.
        """
        return self.queue.get(block)

    def start(self):
        """
        Start the listener.

        This starts up a background thread to monitor the queue for
        LogRecords to process.
        """
        # Reset the stop flag so that a listener which was stopped earlier
        # blocks on the queue again instead of exiting straight away.
        self._stop.clear()
        self._thread = t = threading.Thread(target=self._monitor)
        t.daemon = True
        t.start()

    def prepare(self, record):
        """
        Prepare a record for handling.

        This method just returns the passed-in record. You may want to
        override this method if you need to do any custom marshalling or
        manipulation of the record before passing it to the handlers.

        :param record: The record to prepare.
        :return: The record to pass to the handlers.
        """
        return record

    def handle(self, record):
        """
        Handle a record.

        This just loops through the handlers offering them the record
        to handle.

        :param record: The record to handle.
        """
        record = self.prepare(record)
        for handler in self.handlers:
            handler.handle(record)

    def _monitor(self):
        """
        Monitor the queue for records, and ask the handler
        to deal with them.

        This method runs on a separate, internal thread.
        The thread will terminate if it sees a sentinel object in the queue.
        """
        q = self.queue
        has_task_done = hasattr(q, 'task_done')
        while not self._stop.is_set():
            try:
                record = self.dequeue(True)
                if record is self._sentinel:
                    # The sentinel was counted by put() like any other item,
                    # so acknowledge it or q.join() would never return.
                    if has_task_done:
                        q.task_done()
                    break
                self.handle(record)
                if has_task_done:
                    q.task_done()
            except queue.Empty:
                # Only reachable if dequeue() is overridden to use a timeout;
                # go round again and re-check the stop flag.
                pass
        # There might still be records in the queue.
        while True:
            try:
                record = self.dequeue(False)
                if record is self._sentinel:
                    if has_task_done:
                        q.task_done()
                    break
                self.handle(record)
                if has_task_done:
                    q.task_done()
            except queue.Empty:
                break

    def enqueue_sentinel(self):
        """
        Writes a sentinel to the queue to tell the listener to quit. This
        implementation uses ``put_nowait()``.  You may want to override this
        method if you want to use timeouts or work with custom queue
        implementations.
        """
        self.queue.put_nowait(self._sentinel)

    def stop(self):
        """
        Stop the listener.

        This asks the thread to terminate, and then waits for it to do so.
        Note that if you don't call this before your application exits, there
        may be some records still left on the queue, which won't be processed.

        Calling this when the listener is not running (never started, or
        already stopped) does nothing.
        """
        if self._thread is None:
            return
        try:
            # Enqueue the sentinel *before* raising the stop flag: a worker
            # which notices the flag then always finds the sentinel while
            # draining, rather than exiting first and leaving a stale
            # sentinel behind for the next start().
            self.enqueue_sentinel()
        finally:
            # Set the flag even if the sentinel could not be enqueued, so
            # the worker still winds down once it next checks the flag.
            self._stop.set()
        self._thread.join()
        self._thread = None