Concurrency HOWTO¶
There are many outstanding resources, both online and in print, that do an excellent job of introducing you to concurrency. This HOWTO builds on them by walking you through how to apply that knowledge using Python.
Python supports the following concurrency models directly:
free-threading (stdlib, C-API)
isolated threads, AKA CSP/actor model (stdlib*, C-API)
coroutines, AKA async/await (language, stdlib, C-API)
multi-processing (stdlib)
distributed, e.g. SMP (stdlib (limited))
In this document, we’ll look at how to take advantage of Python’s concurrency support, with an overall focus on the topics covered in the sections below.
Note
Before you reach for concurrency to solve your problem, always make sure it is the right tool for the job. There are many cases where concurrency simply isn’t applicable or will only complicate the solution. In-depth discussion of this point is outside the scope of this document.
Note
Free-threading is one of the oldest concurrency models, fundamental to operating systems, and widely supported in programming languages. However, it is generally considered perilous and not human-friendly. Other concurrency models have demonstrated better usability and newer programming languages typically avoid exposing threads directly. Take that into consideration before reaching for threads and look at the alternatives first.
Note
Python supports other concurrency models indirectly through community-maintained PyPI packages. One well-known example is dask, which supports “distributed” computing.
Quick reference¶
Terminology
We’ll be using the following terms and ideas throughout:
- task (logical thread)
- a cohesive, linear sequence of abstract steps in a program; effectively, a mini-program; the logical equivalent of the executed instructions corresponding to some code; also known as a “logical process”
- physical thread (OS thread)
- where the actual code for a logical thread runs, on the CPU and under the operating system; we avoid using plain “thread” for this, to avoid ambiguity
- Python thread
- the Python runtime running in a physical thread; particularly, the portion of the runtime state active in that physical thread (see threading.Thread)
- concurrency (multitasking)
- a program with multiple logical threads running simultaneously (not necessarily in parallel)
- parallelism (multi-core)
- running a program’s multiple logical threads on multiple physical threads (CPU cores)
For convenience, here is a summary of what we’ll cover later.
Concurrency Primitives
| primitive | used with | purpose |
|---|---|---|
| … | … | … |
High-level App Examples
| workload (app) | per-request inputs | per-request outputs | N core tasks | core task |
|---|---|---|---|---|
| grep | N filenames (stdin); file bytes x N (disk) | M matches (stdout) | 1+ per file | time: ~ file size; mem: small |
| … | … | … | … | … |
| … | … | … | … | … |
Each has side-by-side implementations for the different models:
| workload (app) | side-by-side examples |
|---|---|
Python Concurrency Models¶
As mentioned, there are essentially five concurrency models that Python supports directly:
| model | Python stdlib | description |
|---|---|---|
| free threading | threading | using multiple physical threads in the same process, with no isolation between them |
| isolated threads (multiple interpreters) | interpreters (PEP 734, pending) | threads, often physical, with strict isolation between them (e.g. CSP and actor model) |
| coroutines (async/await) | asyncio | switching between logical threads is explicitly controlled by each coroutine |
| multi-processing | multiprocessing | using multiple isolated processes |
| distributed | multiprocessing (limited) | multiprocessing across multiple computers |
There are tradeoffs to each, whether in performance or complexity. We’ll take a look at those tradeoffs in detail later.
Before that, we’ll review various comparisons of the concurrency models, and we’ll briefly talk about critical caveats for specific models.
Comparison tables¶
The following tables provide a detailed look with side-by-side comparisons.
key characteristics¶
| | scale | multi-core | | overhead |
|---|---|---|---|---|
| free-threading | small-medium | yes | | very low |
| multiple interpreters | small-medium | yes | limited | |
| coroutines | small-medium | no | no | low |
| multi-processing | small | yes | limited | medium |
| distributed | large | yes | limited | medium |
overhead details¶
| | memory | startup | cross-task | management | system |
|---|---|---|---|---|---|
| free threading | very low | very low | none | very low | none |
| multiple interpreters | low | very low | none | | |
| coroutines | low | low | none | low | none |
| multi-processing | medium | medium | medium | medium | low |
| distributed | medium+ | medium+ | medium-high | medium | low-medium |
complexity¶
| | parallel | shared mem | shared I/O | shared env | cross thread | sync | tracking | compat | extra LOC |
|---|---|---|---|---|---|---|---|---|---|
| free-threading | | all | all | yes | high | explicit | | yes | low? |
| multiple interpreters | yes | limited | all | yes | low | implicit | ??? | yes | low? |
| coroutines | no | all | all | yes | low-med? | implicit | ??? | no | low-med |
| multi-processing | yes | limited | no | no? | low | implicit +optional | ??? | yes | low-med? |
| distributed | yes | limited | no | no? | low | implicit +optional | ??? | yes | medium? |
exposure¶
| | academic research | academic curriculum | industry | examples | Python history |
|---|---|---|---|---|---|
| free-threading | very high | high | high | high | 0.9? |
| isolated threads (multiple interpreters) | high | low? | low-medium? | low-medium? | |
| coroutines | medium-high? | medium? | medium? | medium-high? | 3.3-3.5 (2.2) |
| multi-processing | ??? | low? | low-medium? | low? | 2.6 |
| distributed | medium-high? | low? | medium? | medium? | n/a |
Critical caveats¶
Here are some important details to consider, specific to individual concurrency models in Python.
Data races and non-deterministic scheduling (free-threading)¶
The principal caveat for physical threads is that each thread shares the full memory of the process with all its other threads. Combined with their non-deterministic scheduling (and parallel execution), threads expose programs to a significant risk of races.
The potential consequences of a race are data corruption and invalidated expectations of data consistency. In each case, the non-deterministic scheduling of threads means it is both hard to reproduce races and to track down where a race happened. These qualities make these bugs especially frustrating and worth diligently avoiding.
Python threads are light wrappers around physical threads and thus have the same caveats. The majority of data in a Python program is mutable, and all of the program’s data is subject to modification by any thread at any moment. This requires extra effort to synchronize reads and writes. Furthermore, given the maximally broad scope of the data involved, it’s difficult to be sure all possible races have been dealt with, especially as a code base changes over time.
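As a minimal illustration (not part of the grep example later in this document), the following sketch shows the kind of race involved and the synchronization effort it demands; without the lock, two threads can interleave the read-modify-write on the shared counter and lose updates:

import threading

counter = 0
lock = threading.Lock()

def safe_increment(n):
    global counter
    for _ in range(n):
        # Without the lock, "counter += 1" is a read-modify-write
        # that two threads can interleave, losing updates.
        with lock:
            counter += 1

threads = [threading.Thread(target=safe_increment, args=(100_000,))
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # always 400000 thanks to the lock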
The other concurrency models essentially don’t have this problem.
In the case of coroutines, explicit cooperative scheduling eliminates
the risk of a simultaneous read-write or write-write. It also means
program logic can rely on memory consistency between synchronization
points (await).
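For instance, in this small illustrative sketch the shared list can only change at the explicit await, so each coroutine can treat the code between awaits as effectively atomic:

import asyncio

results = []

async def worker(name):
    # Control can only switch to another coroutine at an await,
    # so these two appends always stay together.
    results.append(f'{name} started')
    results.append(f'{name} working')
    await asyncio.sleep(0)  # other coroutines may run here
    results.append(f'{name} done')

async def main():
    await asyncio.gather(worker('a'), worker('b'))

asyncio.run(main())
print(results)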
With the remaining concurrency models, data is never shared between logical threads unless done explicitly (typically at the existing inherent points of synchronization). By default that shared data is either read-only or managed in a thread-safe way. Most notably, the opt-in sharing means the set of shared data to manage is explicitly defined (and often small) instead of covering all memory in the process.
The Global Interpreter Lock (GIL)¶
While physical threads are the direct route to multi-core parallelism, Python’s threads have always had an extra wrinkle that gets in the way: the global interpreter lock (GIL).
The GIL is a very efficient tool for keeping the Python implementation simple, which is an important constraint for the project. In fact, it protects Python’s maintainers and users from a large category of concurrency problems that one must normally face when threads are involved.
The big tradeoff is that the bytecode interpreter, which executes your Python code, only runs while holding the GIL. That means only one thread can be running Python code at a time. Threads will take short turns, so none have to wait too long, but it still prevents any actual parallelism of CPU-bound code.
That said, the Python runtime (and extension modules) can release the GIL when the thread is doing slow or long-running work unrelated to Python, like a blocking IO operation.
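For example, I/O-bound work can still overlap under the GIL. In this hypothetical sketch (the URL is a placeholder), each thread releases the GIL while it waits on the network, so the downloads proceed concurrently:

import threading
import urllib.request

def fetch(url, results, index):
    # The GIL is released while the socket read blocks,
    # letting the other threads run in the meantime.
    with urllib.request.urlopen(url) as resp:
        results[index] = resp.read()

urls = ['https://example.com/'] * 3  # placeholder URLs
results = [None] * len(urls)
threads = [threading.Thread(target=fetch, args=(url, results, i))
           for i, url in enumerate(urls)]
for t in threads:
    t.start()
for t in threads:
    t.join()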
There is also an ongoing effort to eliminate the GIL: PEP 703. Any attempt to remove the GIL necessarily involves some slowdown to single-threaded performance and extra maintenance burden to the Python project and extension module maintainers. However, there is sufficient interest in unlocking full multi-core parallelism to justify the current experiment.
You can also move from free-threading to isolated threads by using multiple interpreters. Each interpreter has its own GIL. Thus, if you want multi-core parallelism, run a different interpreter in each thread. Their isolation means that each can run unblocked in that thread.
Thread isolation and multiple interpreters¶
As just noted, races effectively stop being a problem if the memory used by each physical thread is isolated from the memory used by the others. That isolation can also help with the other caveats related to physical threads. In Python you can get this isolation by using multiple interpreters.
In this context, an “interpreter” represents nearly all the capability and state of the Python runtime, for its C-API and to execute Python code. The full runtime supports multiple interpreters and includes some state that all interpreters share. Most importantly, the state of each interpreter is effectively isolated from the others.
That isolation includes things like sys.modules. By default,
interpreters mostly don’t share any data (including objects) at all.
Anything that gets shared is done on a strictly opt-in basis. That
means programmers wouldn’t need to worry about possible races with
any data in the program. They would only need to worry about data
that was explicitly shared.
Interpreters themselves are not specific to any thread, but instead each physical thread has (at most) one interpreter active at any given moment. Each interpreter can be associated in this way with any number of threads. Since each interpreter is isolated from the others, any thread using one interpreter is thus isolated from threads using any other interpreter.
Using multiple interpreters is fairly straight-forward:
create a new interpreter
switch the current thread to use that interpreter
call exec(), but targeting the new interpreter
switch back
Note that no threads were involved; running in an interpreter happens relative to the current thread. New threads aren’t implicitly involved.
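Here is a minimal sketch of those steps, assuming the PEP 734 interpreters module (imported below the same way the examples later in this document import it); Interpreter.exec() performs the switch, runs the code in the other interpreter, and switches back:

import test.support.interpreters as interpreters

interp = interpreters.create()  # create a new interpreter
interp.exec('print("hello from another interpreter")')
# exec() switched the current thread to the new interpreter, ran the
# code there, and then switched back; no new threads were started.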
Multi-processing and distributed computing provide similar isolation, though with some tradeoffs.
A stdlib module for using multiple interpreters¶
While use of multiple interpreters has been part of Python’s C-API
for decades, the feature hasn’t been exposed to Python code through
the stdlib. PEP 734 proposes changing that by adding a new
interpreters module.
In the meantime, an implementation of that PEP is available for Python 3.13+ on PyPI: interpreters-pep-734.
Improving performance for multiple interpreters¶
The long-running effort to improve Python’s implementation of multiple interpreters has focused on isolation and stability; very little has been done to improve performance. This has the most impact on:
how much memory each interpreter uses (i.e. how many can run at the same time)
how long it takes to create a new interpreter
It also impacts how efficiently data/objects can be passed between interpreters, and how effectively objects can be shared.
As the work on isolation wraps up, improvements will shift to focus on performance and memory usage. Thus, the overhead of using multiple interpreters will drastically decrease over time.
Tracing execution¶
TBD
Coroutines are contagious¶
Coroutines can be an effective mechanism for letting a program’s non-blocking code run while simultaneously waiting for blocking code to finish. The tricky part is that the underlying machinery (the event loop) relies on each coroutine explicitly yielding control at the appropriate moments.
Normal functions do not follow this pattern, so they cannot take advantage of that cooperative scheduling to avoid blocking the program. Thus, coroutines and non-coroutines don’t mix well. While there are tools for wrapping normal functions so they act like coroutines, such functions are often simply converted into coroutines instead. At that point, if any non-async code relies on the function, then either you’ll need to convert that other code into a coroutine as well or you’ll need to keep the original non-async implementation around alongside the new, almost identical async one.
You can see how that can proliferate, leading to possible extra maintenance/development costs.
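As a purely hypothetical illustration of that duplication, a blocking helper often ends up with a nearly identical async twin so that coroutine-based callers can await it without blocking the event loop:

import asyncio

def read_config(path):
    # Blocking version, used by normal (sync) callers.
    with open(path) as infile:
        return infile.read()

async def read_config_async(path):
    # Nearly identical async version, needed once a caller is a coroutine.
    # (Here the blocking work is simply pushed to a thread.)
    return await asyncio.to_thread(read_config, path)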
Processes consume extra resources¶
When using multi-processing for concurrency, keep in mind that the operating system will assign a certain set of limited resources to each process. For example, each process has its own PID and handle to the executable. You can run only so many processes before you run out of these resources. Concurrency in a single process doesn’t have this problem, and a distributed program can work around it.
Using multiprocessing for distributed computing¶
Not only does the multiprocessing module support concurrency
with multiple local processes, it can also support a distributed model
using remote computers. That said, consider first looking into tools
that have been designed specifically for distributed computing,
like dask.
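For reference, the building block the stdlib provides for this is multiprocessing.managers. The following is only a rough sketch (the hostname, port, and authkey are made up): a manager on one machine serves a shared queue that worker processes on other machines can connect to:

from multiprocessing.managers import BaseManager
import queue

task_queue = queue.Queue()

class QueueManager(BaseManager):
    pass

# On the "server" machine: expose the queue over the network.
QueueManager.register('get_tasks', callable=lambda: task_queue)
server = QueueManager(address=('', 50000), authkey=b'not-a-real-key')
# server.get_server().serve_forever()  # blocks; run this on the server

# On a "worker" machine: connect and pull work from the shared queue.
QueueManager.register('get_tasks')
client = QueueManager(address=('server.example.com', 50000),
                      authkey=b'not-a-real-key')
# client.connect()
# tasks = client.get_tasks()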
Resilience to crashes¶
A process can crash if it does something it shouldn’t, like try to access memory outside what the OS has provided it. If your program is running in multiple processes (incl. distributed) then you can more easily recover from a crash in any one process. Recovering from a crash when using free-threading, multiple interpreters, or coroutines isn’t nearly so easy.
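A rough sketch of what that recovery can look like with multiprocessing (hypothetical, not one of the later examples): the parent checks the worker’s exit code and can simply reschedule the work if the worker died:

import multiprocessing

def risky_work(n):
    # Stand-in for work that might crash the whole worker process
    # (e.g. a bug in an extension module).
    return n * n

if __name__ == '__main__':
    proc = multiprocessing.Process(target=risky_work, args=(10,))
    proc.start()
    proc.join()
    if proc.exitcode != 0:
        # The crash was contained to the worker process; the parent
        # is still running and can retry, or log and move on.
        retry = multiprocessing.Process(target=risky_work, args=(10,))
        retry.start()
        retry.join()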
High-level APIs¶
Also note that Python’s stdlib provides various higher-level APIs that support these concurrency models in various contexts:
| model | high-level APIs |
|---|---|
| free-threading | concurrent.futures.ThreadPoolExecutor |
| multiple interpreters | (pending) |
| coroutines | ??? |
| multi-processing | concurrent.futures.ProcessPoolExecutor |
| distributed | ??? |
Designing A Program For Concurrency¶
Whether you are starting a new project using concurrency or refactoring an existing one to use it, it’s important to design for concurrency before taking one more step. Doing so will save you a lot of headache later.
decide if your program might benefit from concurrency
identify the other concurrency-related characteristics of your program
go for it!
At each step you should be continuously asking yourself if concurrency is still a good fit for your program.
Some problems are obviously not solvable with concurrency. Otherwise, even if you could use concurrency, it might not provide much value. Furthermore, even if it seems like it would provide meaningful value, the additional costs in performance, complexity, or maintainability might outweigh that benefit.
Thus, when you’re thinking of solving a problem using concurrency, it’s crucial that you understand the problem well.
Getting started¶
How can concurrency help?¶
TBD
How can concurrency hurt?¶
TBD
Analyze your problem¶
Identifying the logical tasks in your program¶
TBD
The concurrency characteristics of your program¶
TBD
Other considerations¶
TBD
Pick a concurrency model¶
TBD
Python Concurrency Primitives¶
TBD
Group A¶
primitive 1¶
TBD
Group B¶
primitive 1¶
TBD
Python Concurrency Workload Examples¶
Below, we have a series of examples of how to implement the most common Python workloads that take advantage of concurrency. For each workload, you will find an implementation for each of the concurrency models.
The implementations are meant to accurately demonstrate how best
to solve the problem using the given concurrency model. The examples
for the workload are presented side-by-side, for easier comparison.
The examples for threads, multiprocessing, and multiple interpreters
will use concurrent.futures when that is the better approach.
Performance comparisons are not included here.
Here’s a summary of the examples, by workload:
| workload | req in | req out | N core tasks | core task |
|---|---|---|---|---|
| grep | N filenames (stdin); file bytes x N (disk) | M matches (stdout) | 1+ per file | time: ~ file size; mem: small |
| … | … | … | … | … |
| … | … | … | … | … |
Also see:
Note
Each example is implemented as a basic command line tool, but can be easily adapted to run as a web service.
Workload: grep¶
This is a basic Python implementation of the Linux grep tool.
We read from one or more files and report about lines that match
(or don’t match) the given regular expression.
This represents a workload involving a mix of moderate IO and CPU work.
For full example code see the side-by-side implementations below.
Design and analysis¶
Design steps from above:
concurrency fits?
Yes! There is potentially a bunch of work happening at the same time, and we want results as fast as possible.
identify logical tasks
At a high level, the application works like this:
handle args (including compile regex)
if recursive, walk tree to find filenames
for each file, yield each match
print each match
exit with 0 if matched and 1 otherwise
At step 3 we do the following for each file:
a. open the file
b. iterate over the lines
c. apply the regex to each line
d. yield each match
e. close the file
select concurrent tasks
Concurrent work happens at step 3. Sub-steps a, b, and e are IO-intensive. Sub-step c is CPU-intensive. The simplest approach would be one concurrent worker per file. Relative to a strictly sequential approach, there’s extra complexity here in managing the workers, fanning out the work to them, and merging the results back into a single iterator.
If we were worried about any particularly large file or sufficiently large regular expression, we could take things further. That would involve splitting up step 3 even further by breaking the file into chunks that are divided up among multiple workers. However, doing so would introduce extra complexity that might not pay for itself.
concurrency-related characteristics
TBD
pick best model
TBD
Here are additional key constraints and considerations:
there’s usually a limit to how many files can be open concurrently, so we’ll have to be careful not to process too many at once
the order of the yielded/printed matches must match the order of the requested files and the order of each file’s lines
High-level code¶
With the initial design and analysis done, let’s move on to code. We’ll start with the high-level code corresponding to the application’s five top-level tasks we identified earlier.
Most of the high-level code has nothing to do with concurrency.
The part that does, search(), is highlighted.
1def main(regex=regex, filenames=filenames):
2 # step 1
3 regex = re.compile(regex)
4 # step 2
5 filenames = resolve_filenames(filenames, recursive)
6 # step 3
7 matches = search(filenames, regex, opts)
8
9 # step 4
10
11 if hasattr(type(matches), '__aiter__'):
12 async def iter_and_show(matches=matches):
13 matches = type(matches).__aiter__(matches)
14
15 # Handle the first match.
16 async for filename, line in matches:
17 if opts.quiet:
18 return 0
19 elif opts.filesonly:
20 print(filename)
21 else:
22 async for second in matches:
23 print(f'{filename}: {line}')
24 filename, line = second
25 print(f'{filename}: {line}')
26 break
27 else:
28 print(line)
29 break
30 else:
31 return 1
32
33 # Handle the remaining matches.
34 if opts.filesonly:
35 async for filename, _ in matches:
36 print(filename)
37 else:
38 async for filename, line in matches:
39 print(f'{filename}: {line}')
40
41 return 0
42 return asyncio.run(iter_and_show())
43 else:
44 matches = iter(matches)
45
46 # Handle the first match.
47 for filename, line in matches:
48 if opts.quiet:
49 return 0
50 elif opts.filesonly:
51 print(filename)
52 else:
53 for second in matches:
54 print(f'{filename}: {line}')
55 filename, line = second
56 print(f'{filename}: {line}')
57 break
58 else:
59 print(line)
60 break
61 else:
62 return 1
63
64 # Handle the remaining matches.
65 if opts.filesonly:
66 for filename, _ in matches:
67 print(filename)
68 else:
69 for filename, line in matches:
70 print(f'{filename}: {line}')
71
72 return 0
73rc = main()
74
75# step 5
76sys.exit(rc)
The search() function that gets called returns an iterator
(or async iterator) that yields the matches, which get printed.
Here’s the high-level code again, but with highlighting on each line
that uses the iterator.
1def main(regex=regex, filenames=filenames):
2 # step 1
3 regex = re.compile(regex)
4 # step 2
5 filenames = resolve_filenames(filenames, recursive)
6 # step 3
7 matches = search(filenames, regex, opts)
8
9 # step 4
10
11 if hasattr(type(matches), '__aiter__'):
12 async def iter_and_show(matches=matches):
13 matches = type(matches).__aiter__(matches)
14
15 # Handle the first match.
16 async for filename, line in matches:
17 if opts.quiet:
18 return 0
19 elif opts.filesonly:
20 print(filename)
21 else:
22 async for second in matches:
23 print(f'{filename}: {line}')
24 filename, line = second
25 print(f'{filename}: {line}')
26 break
27 else:
28 print(line)
29 break
30 else:
31 return 1
32
33 # Handle the remaining matches.
34 if opts.filesonly:
35 async for filename, _ in matches:
36 print(filename)
37 else:
38 async for filename, line in matches:
39 print(f'{filename}: {line}')
40
41 return 0
42 return asyncio.run(iter_and_show())
43 else:
44 matches = iter(matches)
45
46 # Handle the first match.
47 for filename, line in matches:
48 if opts.quiet:
49 return 0
50 elif opts.filesonly:
51 print(filename)
52 else:
53 for second in matches:
54 print(f'{filename}: {line}')
55 filename, line = second
56 print(f'{filename}: {line}')
57 break
58 else:
59 print(line)
60 break
61 else:
62 return 1
63
64 # Handle the remaining matches.
65 if opts.filesonly:
66 for filename, _ in matches:
67 print(filename)
68 else:
69 for filename, line in matches:
70 print(f'{filename}: {line}')
71
72 return 0
73rc = main()
74
75# step 5
76sys.exit(rc)
Here’s the search function for a non-concurrent implementation:
1def search_sequential(filenames, regex, opts):
2 for filename in filenames:
3 lines = iter_lines(filename)
4 yield from search_lines(lines, regex, opts, filename)
iter_lines() is a straightforward helper that opens the file and yields each line.
search_lines() is a sequential-search helper used by all the
example implementations here:
1def search_lines(lines, regex, opts, filename):
2 try:
3 if opts.filesonly:
4 if opts.invert:
5 for line in lines:
6 m = regex.search(line)
7 if m:
8 break
9 else:
10 yield (filename, None)
11 else:
12 for line in lines:
13 m = regex.search(line)
14 if m:
15 yield (filename, None)
16 break
17 else:
18 assert not opts.invert, opts
19 for line in lines:
20 m = regex.search(line)
21 if not m:
22 continue
23 if line.endswith(os.linesep):
24 line = line[:-len(os.linesep)]
25 yield (filename, line)
26 except UnicodeDecodeError:
27 # It must be a binary file.
28 return
Concurrent Code¶
Now let's look at how concurrency actually fits in. We’ll start with an example using threads. However, the pattern is essentially the same for all the concurrency models.
1def search_using_threads(filenames, regex, opts):
2 matches_by_file = queue.Queue()
3
4 def do_background():
5 MAX_FILES = 10
6 MAX_MATCHES = 100
7
8 # Make sure we don't have too many threads at once,
9 # i.e. too many files open at once.
10 counter = threading.Semaphore(MAX_FILES)
11
12 def search_file(filename, matches):
13 lines = iter_lines(filename)
14 for match in search_lines(lines, regex, opts, filename):
15 matches.put(match) # blocking
16 matches.put(None) # blocking
17 # Let a new thread start.
18 counter.release()
19
20 for filename in filenames:
21 # Prepare for the file.
22 matches = queue.Queue(MAX_MATCHES)
23 matches_by_file.put(matches)
24
25 # Start a thread to process the file.
26 t = threading.Thread(target=search_file, args=(filename, matches))
27 counter.acquire()
28 t.start()
29 matches_by_file.put(None)
30
31 background = threading.Thread(target=do_background)
32 background.start()
33
34 # Yield the results as they are received, in order.
35 matches = matches_by_file.get() # blocking
36 while matches is not None:
37 match = matches.get() # blocking
38 while match is not None:
39 yield match
40 match = matches.get() # blocking
41 matches = matches_by_file.get() # blocking
42
43 background.join()
We loop over the filenames and start a thread for each one. Each one sends the matches it finds back using a queue.
We want to start yielding matches as soon as possible, so we also use a background thread to run the code that loops over the filenames.
We use a queue of queues (matches_by_file) to make sure
we get results back in the right order, regardless of when the worker
threads provide them.
The operating system will only let us have so many files open at once,
so we limit how many workers are running. (MAX_FILES)
If the workers find matches substantially faster than we can use them
then we may end up using more memory than we need to. To avoid any
backlog, we limit how many matches can be queued up for any given file.
(MAX_MATCHES)
One notable point is that the actual files are not opened until we need to iterate over the lines. For the most part, this is so we can avoid dealing with passing an open file to a concurrency worker. Instead we pass the filename, which is much simpler.
Finally, we have to manage the workers manually. If we used concurrent.futures, it would take care of that for us.
Here are some things we don’t do but might be worth doing:
stop iteration when requested (or for ctrl-C)
split up each file between multiple workers
…
Recall that the search() function returns an iterator that yields
all the matches. Concurrency may be happening as long as that iterator
hasn’t been exhausted. That means it is happening more or less the
entire time we loop over the matches to print them in main()
(in the high-level code above).
Side-by-side¶
Here are the implementations for the different concurrency models, side-by-side for easy comparison (main differences highlighted):
| sequential | threads | multiple interpreters | coroutines | multiple processes | concurrent.futures |
|---|---|---|---|---|---|
1import os
2import os.path
3import re
4import sys
5
6
7def search(filenames, regex, opts):
8 for filename in filenames:
9 # iter_lines() opens the file too.
10 lines = iter_lines(filename)
11 yield from search_lines(
12 lines, regex, opts, filename)
13
14
15def iter_lines(filename):
16 if filename == '-':
17 yield from sys.stdin
18 else:
19 with open(filename) as infile:
20 yield from infile
21
22
23def search_lines(lines, regex, opts, filename):
24 try:
25 if opts.filesonly:
26 if opts.invert:
27 for line in lines:
28 m = regex.search(line)
29 if m:
30 break
31 else:
32 yield (filename, None)
33 else:
34 for line in lines:
35 m = regex.search(line)
36 if m:
37 yield (filename, None)
38 break
39 else:
40 assert not opts.invert, opts
41 for line in lines:
42 m = regex.search(line)
43 if not m:
44 continue
45 if line.endswith(os.linesep):
46 line = line[:-len(os.linesep)]
47 yield (filename, line)
48 except UnicodeDecodeError:
49 # It must be a binary file.
50 return
51
52
53def resolve_filenames(filenames, recursive=False):
54 for filename in filenames:
55 assert isinstance(filename, str), repr(filename)
56 if filename == '-':
57 yield '-'
58 elif not os.path.isdir(filename):
59 yield filename
60 elif recursive:
61 for d, _, files in os.walk(filename):
62 for base in files:
63 yield os.path.join(d, base)
64
65
66if __name__ == '__main__':
67 # Parse the args.
68 import argparse
69 ap = argparse.ArgumentParser(prog='grep')
70
71 ap.add_argument('-r', '--recursive',
72 action='store_true')
73 ap.add_argument('-L', '--files-without-match',
74 dest='filesonly',
75 action='store_const', const='invert')
76 ap.add_argument('-l', '--files-with-matches',
77 dest='filesonly',
78 action='store_const', const='match')
79 ap.add_argument('-q', '--quiet', action='store_true')
80 ap.set_defaults(invert=False)
81
82 reopts = ap.add_mutually_exclusive_group(required=True)
83 reopts.add_argument('-e', '--regexp', dest='regex',
84 metavar='REGEX')
85 reopts.add_argument('regex', nargs='?',
86 metavar='REGEX')
87
88 ap.add_argument('files', nargs='+', metavar='FILE')
89
90 opts = ap.parse_args()
91 ns = vars(opts)
92
93 regex = ns.pop('regex')
94 filenames = ns.pop('files')
95 recursive = ns.pop('recursive')
96 if opts.filesonly:
97 if opts.filesonly == 'invert':
98 opts.invert = True
99 else:
100 assert opts.filesonly == 'match', opts
101 opts.invert = False
102 opts.filesonly = bool(opts.filesonly)
103
104 def main(regex=regex, filenames=filenames):
105 # step 1
106 regex = re.compile(regex)
107 # step 2
108 filenames = resolve_filenames(filenames, recursive)
109 # step 3
110 matches = search(filenames, regex, opts)
111 matches = iter(matches)
112
113 # step 4
114
115 # Handle the first match.
116 for filename, line in matches:
117 if opts.quiet:
118 return 0
119 elif opts.filesonly:
120 print(filename)
121 else:
122 for second in matches:
123 print(f'{filename}: {line}')
124 filename, line = second
125 print(f'{filename}: {line}')
126 break
127 else:
128 print(line)
129 break
130 else:
131 return 1
132
133 # Handle the remaining matches.
134 if opts.filesonly:
135 for filename, _ in matches:
136 print(filename)
137 else:
138 for filename, line in matches:
139 print(f'{filename}: {line}')
140
141 return 0
142 rc = main()
143
144 # step 5
145 sys.exit(rc)
|
1import os
2import os.path
3import re
4import sys
5
6import queue
7import threading
8
9
10def search(filenames, regex, opts):
11 matches_by_file = queue.Queue()
12
13 def do_background():
14 MAX_FILES = 10
15 MAX_MATCHES = 100
16
17 # Make sure we don't have too many threads at once,
18 # i.e. too many files open at once.
19 counter = threading.Semaphore(MAX_FILES)
20
21 def search_file(filename, matches):
22 lines = iter_lines(filename)
23 for match in search_lines(
24 lines, regex, opts, filename):
25 matches.put(match) # blocking
26 matches.put(None) # blocking
27 # Let a new thread start.
28 counter.release()
29
30 for filename in filenames:
31 # Prepare for the file.
32 matches = queue.Queue(MAX_MATCHES)
33 matches_by_file.put(matches)
34
35 # Start a thread to process the file.
36 t = threading.Thread(target=search_file,
37 args=(filename, matches))
38 counter.acquire()
39 t.start()
40 matches_by_file.put(None)
41
42 background = threading.Thread(target=do_background)
43 background.start()
44
45 # Yield the results as they are received, in order.
46 matches = matches_by_file.get() # blocking
47 while matches is not None:
48 match = matches.get() # blocking
49 while match is not None:
50 yield match
51 match = matches.get() # blocking
52 matches = matches_by_file.get() # blocking
53
54 background.join()
55
56
57def iter_lines(filename):
58 if filename == '-':
59 yield from sys.stdin
60 else:
61 with open(filename) as infile:
62 yield from infile
63
64
65def search_lines(lines, regex, opts, filename):
66 try:
67 if opts.filesonly:
68 if opts.invert:
69 for line in lines:
70 m = regex.search(line)
71 if m:
72 break
73 else:
74 yield (filename, None)
75 else:
76 for line in lines:
77 m = regex.search(line)
78 if m:
79 yield (filename, None)
80 break
81 else:
82 assert not opts.invert, opts
83 for line in lines:
84 m = regex.search(line)
85 if not m:
86 continue
87 if line.endswith(os.linesep):
88 line = line[:-len(os.linesep)]
89 yield (filename, line)
90 except UnicodeDecodeError:
91 # It must be a binary file.
92 return
93
94
95def resolve_filenames(filenames, recursive=False):
96 for filename in filenames:
97 assert isinstance(filename, str), repr(filename)
98 if filename == '-':
99 yield '-'
100 elif not os.path.isdir(filename):
101 yield filename
102 elif recursive:
103 for d, _, files in os.walk(filename):
104 for base in files:
105 yield os.path.join(d, base)
106
107
108if __name__ == '__main__':
109 # Parse the args.
110 import argparse
111 ap = argparse.ArgumentParser(prog='grep')
112
113 ap.add_argument('-r', '--recursive',
114 action='store_true')
115 ap.add_argument('-L', '--files-without-match',
116 dest='filesonly',
117 action='store_const', const='invert')
118 ap.add_argument('-l', '--files-with-matches',
119 dest='filesonly',
120 action='store_const', const='match')
121 ap.add_argument('-q', '--quiet', action='store_true')
122 ap.set_defaults(invert=False)
123
124 reopts = ap.add_mutually_exclusive_group(required=True)
125 reopts.add_argument('-e', '--regexp', dest='regex',
126 metavar='REGEX')
127 reopts.add_argument('regex', nargs='?',
128 metavar='REGEX')
129
130 ap.add_argument('files', nargs='+', metavar='FILE')
131
132 opts = ap.parse_args()
133 ns = vars(opts)
134
135 regex = ns.pop('regex')
136 filenames = ns.pop('files')
137 recursive = ns.pop('recursive')
138 if opts.filesonly:
139 if opts.filesonly == 'invert':
140 opts.invert = True
141 else:
142 assert opts.filesonly == 'match', opts
143 opts.invert = False
144 opts.filesonly = bool(opts.filesonly)
145
146 def main(regex=regex, filenames=filenames):
147 # step 1
148 regex = re.compile(regex)
149 # step 2
150 filenames = resolve_filenames(filenames, recursive)
151 # step 3
152 matches = search(filenames, regex, opts)
153 matches = iter(matches)
154
155 # step 4
156
157 # Handle the first match.
158 for filename, line in matches:
159 if opts.quiet:
160 return 0
161 elif opts.filesonly:
162 print(filename)
163 else:
164 for second in matches:
165 print(f'{filename}: {line}')
166 filename, line = second
167 print(f'{filename}: {line}')
168 break
169 else:
170 print(line)
171 break
172 else:
173 return 1
174
175 # Handle the remaining matches.
176 if opts.filesonly:
177 for filename, _ in matches:
178 print(filename)
179 else:
180 for filename, line in matches:
181 print(f'{filename}: {line}')
182
183 return 0
184 rc = main()
185
186 # step 5
187 sys.exit(rc)
|
1import os
2import os.path
3import re
4import sys
5
6import test.support.interpreters as interpreters
7import test.support.interpreters.queues as interp_queues
8import types
9import queue
10import threading
11
12
13def search(filenames, regex, opts):
14 matches_by_file = queue.Queue()
15
16 def do_background():
17 MAX_FILES = 10
18 MAX_MATCHES = 100
19 new_queue = interpreters.queues.create
20
21 def new_interpreter():
22 interp = interpreters.create()
23 interp.exec(f"""if True:
24 with open({__file__!r}) as infile:
25 text = infile.read()
26 ns = dict()
27 exec(text, ns, ns)
28 prep_interpreter = ns['prep_interpreter']
29 del ns, text
30
31 search_file = prep_interpreter(
32 {regex.pattern!r},
33 {regex.flags},
34 {tuple(vars(opts).items())},
35 )
36 """)
37 return interp
38
39 ready_workers = queue.Queue(MAX_FILES)
40 workers = []
41
42 def next_worker():
43 if len(workers) < MAX_FILES:
44 interp = new_interpreter()
45 workers.append(interp)
46 ready_workers.put(interp)
47 return ready_workers.get() # blocking
48
49 def do_work(filename, matches, interp):
50 interp.prepare_main(matches=matches)
51 interp.exec(
52 f'search_file({filename!r}, matches)')
53 # Let a new thread start.
54 ready_workers.put(interp)
55
56 for filename in filenames:
57 # Prepare for the file.
58 matches = interp_queues.create(MAX_MATCHES)
59 matches_by_file.put(matches)
60 interp = next_worker()
61
62 # Start a thread to process the file.
63 t = threading.Thread(
64 target=do_work,
65 args=(filename, matches, interp),
66 )
67 t.start()
68 matches_by_file.put(None)
69
70 background = threading.Thread(target=do_background)
71 background.start()
72
73 # Yield the results as they are received, in order.
74 matches = matches_by_file.get() # blocking
75 while matches is not None:
76 match = matches.get() # blocking
77 while match is not None:
78 yield match
79 match = matches.get() # blocking
80 matches = matches_by_file.get() # blocking
81
82 background.join()
83
84
85def prep_interpreter(regex_pat, regex_flags, opts):
86 regex = re.compile(regex_pat, regex_flags)
87 opts = types.SimpleNamespace(**dict(opts))
88
89 def search_file(filename, matches):
90 lines = iter_lines(filename)
91 for match in search_lines(
92 lines, regex, opts, filename):
93 matches.put(match) # blocking
94 matches.put(None) # blocking
95 return search_file
96
97
98def iter_lines(filename):
99 if filename == '-':
100 yield from sys.stdin
101 else:
102 with open(filename) as infile:
103 yield from infile
104
105
106def search_lines(lines, regex, opts, filename):
107 try:
108 if opts.filesonly:
109 if opts.invert:
110 for line in lines:
111 m = regex.search(line)
112 if m:
113 break
114 else:
115 yield (filename, None)
116 else:
117 for line in lines:
118 m = regex.search(line)
119 if m:
120 yield (filename, None)
121 break
122 else:
123 assert not opts.invert, opts
124 for line in lines:
125 m = regex.search(line)
126 if not m:
127 continue
128 if line.endswith(os.linesep):
129 line = line[:-len(os.linesep)]
130 yield (filename, line)
131 except UnicodeDecodeError:
132 # It must be a binary file.
133 return
134
135
136def resolve_filenames(filenames, recursive=False):
137 for filename in filenames:
138 assert isinstance(filename, str), repr(filename)
139 if filename == '-':
140 yield '-'
141 elif not os.path.isdir(filename):
142 yield filename
143 elif recursive:
144 for d, _, files in os.walk(filename):
145 for base in files:
146 yield os.path.join(d, base)
147
148
149if __name__ == '__main__':
150 # Parse the args.
151 import argparse
152 ap = argparse.ArgumentParser(prog='grep')
153
154 ap.add_argument('-r', '--recursive',
155 action='store_true')
156 ap.add_argument('-L', '--files-without-match',
157 dest='filesonly',
158 action='store_const', const='invert')
159 ap.add_argument('-l', '--files-with-matches',
160 dest='filesonly',
161 action='store_const', const='match')
162 ap.add_argument('-q', '--quiet', action='store_true')
163 ap.set_defaults(invert=False)
164
165 reopts = ap.add_mutually_exclusive_group(required=True)
166 reopts.add_argument('-e', '--regexp', dest='regex',
167 metavar='REGEX')
168 reopts.add_argument('regex', nargs='?',
169 metavar='REGEX')
170
171 ap.add_argument('files', nargs='+', metavar='FILE')
172
173 opts = ap.parse_args()
174 ns = vars(opts)
175
176 regex = ns.pop('regex')
177 filenames = ns.pop('files')
178 recursive = ns.pop('recursive')
179 if opts.filesonly:
180 if opts.filesonly == 'invert':
181 opts.invert = True
182 else:
183 assert opts.filesonly == 'match', opts
184 opts.invert = False
185 opts.filesonly = bool(opts.filesonly)
186
187 def main(regex=regex, filenames=filenames):
188 # step 1
189 regex = re.compile(regex)
190 # step 2
191 filenames = resolve_filenames(filenames, recursive)
192 # step 3
193 matches = search(filenames, regex, opts)
194 matches = iter(matches)
195
196 # step 4
197
198 # Handle the first match.
199 for filename, line in matches:
200 if opts.quiet:
201 return 0
202 elif opts.filesonly:
203 print(filename)
204 else:
205 for second in matches:
206 print(f'{filename}: {line}')
207 filename, line = second
208 print(f'{filename}: {line}')
209 break
210 else:
211 print(line)
212 break
213 else:
214 return 1
215
216 # Handle the remaining matches.
217 if opts.filesonly:
218 for filename, _ in matches:
219 print(filename)
220 else:
221 for filename, line in matches:
222 print(f'{filename}: {line}')
223
224 return 0
225 rc = main()
226
227 # step 5
228 sys.exit(rc)
|
1import os
2import os.path
3import re
4import sys
5
6import asyncio
7
8
9async def search(filenames, regex, opts):
10 matches_by_file = asyncio.Queue()
11
12 async def do_background():
13 MAX_FILES = 10
14 MAX_MATCHES = 100
15
16 # Make sure we don't have too many coros at once,
17 # i.e. too many files open at once.
18 counter = asyncio.Semaphore(MAX_FILES)
19
20 async def search_file(filename, matches):
21 # iter_lines() opens the file too.
22 lines = iter_lines(filename)
23 async for match in search_lines(
24 lines, regex, opts, filename):
25 await matches.put(match)
26 await matches.put(None)
27 # Let a new coroutine start.
28 counter.release()
29
30 async with asyncio.TaskGroup() as tg:
31 for filename in filenames:
32 # Prepare for the file.
33 matches = asyncio.Queue(MAX_MATCHES)
34 await matches_by_file.put(matches)
35
36 # Start a coroutine to process the file.
37 tg.create_task(
38 search_file(filename, matches),
39 )
40 await counter.acquire()
41 await matches_by_file.put(None)
42
43 background = asyncio.create_task(do_background())
44
45 # Yield the results as they are received, in order.
46 matches = await matches_by_file.get() # blocking
47 while matches is not None:
48 match = await matches.get() # blocking
49 while match is not None:
50 yield match
51 match = await matches.get() # blocking
52 matches = await matches_by_file.get() # blocking
53
54 await asyncio.wait([background])
55
56
57async def iter_lines(filename):
58 if filename == '-':
59 infile = sys.stdin
60 line = await read_line_async(infile)
61 while line:
62 yield line
63 line = await read_line_async(infile)
64 else:
65 # XXX Open using async?
66 with open(filename) as infile:
67 line = await read_line_async(infile)
68 while line:
69 yield line
70 line = await read_line_async(infile)
71
72
73async def read_line_async(infile):
74 # XXX Do this async!
75 # maybe make use of asyncio.to_thread()
76 # or loop.run_in_executor()?
77 return infile.readline()
78
79
80async def search_lines(lines, regex, opts, filename):
81 try:
82 if opts.filesonly:
83 if opts.invert:
84 async for line in lines:
85 m = regex.search(line)
86 if m:
87 break
88 else:
89 yield (filename, None)
90 else:
91 async for line in lines:
92 m = regex.search(line)
93 if m:
94 yield (filename, None)
95 break
96 else:
97 assert not opts.invert, opts
98 async for line in lines:
99 m = regex.search(line)
100 if not m:
101 continue
102 if line.endswith(os.linesep):
103 line = line[:-len(os.linesep)]
104 yield (filename, line)
105 except UnicodeDecodeError:
106 # It must be a binary file.
107 return
108
109
110def resolve_filenames(filenames, recursive=False):
111 for filename in filenames:
112 assert isinstance(filename, str), repr(filename)
113 if filename == '-':
114 yield '-'
115 elif not os.path.isdir(filename):
116 yield filename
117 elif recursive:
118 for d, _, files in os.walk(filename):
119 for base in files:
120 yield os.path.join(d, base)
121
122
123if __name__ == '__main__':
124 # Parse the args.
125 import argparse
126 ap = argparse.ArgumentParser(prog='grep')
127
128 ap.add_argument('-r', '--recursive',
129 action='store_true')
130 ap.add_argument('-L', '--files-without-match',
131 dest='filesonly',
132 action='store_const', const='invert')
133 ap.add_argument('-l', '--files-with-matches',
134 dest='filesonly',
135 action='store_const', const='match')
136 ap.add_argument('-q', '--quiet', action='store_true')
137 ap.set_defaults(invert=False)
138
139 reopts = ap.add_mutually_exclusive_group(required=True)
140 reopts.add_argument('-e', '--regexp', dest='regex',
141 metavar='REGEX')
142 reopts.add_argument('regex', nargs='?',
143 metavar='REGEX')
144
145 ap.add_argument('files', nargs='+', metavar='FILE')
146
147 opts = ap.parse_args()
148 ns = vars(opts)
149
150 regex = ns.pop('regex')
151 filenames = ns.pop('files')
152 recursive = ns.pop('recursive')
153 if opts.filesonly:
154 if opts.filesonly == 'invert':
155 opts.invert = True
156 else:
157 assert opts.filesonly == 'match', opts
158 opts.invert = False
159 opts.filesonly = bool(opts.filesonly)
160
161 async def main(regex=regex, filenames=filenames):
162 # step 1
163 regex = re.compile(regex)
164 # step 2
165 filenames = resolve_filenames(filenames, recursive)
166 # step 3
167 matches = search(filenames, regex, opts)
168 matches = type(matches).__aiter__(matches)
169
170 # step 4
171
172 # Handle the first match.
173 async for filename, line in matches:
174 if opts.quiet:
175 return 0
176 elif opts.filesonly:
177 print(filename)
178 else:
179 async for second in matches:
180 print(f'{filename}: {line}')
181 filename, line = second
182 print(f'{filename}: {line}')
183 break
184 else:
185 print(line)
186 break
187 else:
188 return 1
189
190 # Handle the remaining matches.
191 if opts.filesonly:
192 async for filename, _ in matches:
193 print(filename)
194 else:
195 async for filename, line in matches:
196 print(f'{filename}: {line}')
197
198 return 0
199 rc = asyncio.run(main())
200
201 # step 5
202 sys.exit(rc)
|
1import os
2import os.path
3import re
4import sys
5
6import multiprocessing
7import queue
8import threading
9
10
11def search(filenames, regex, opts):
12 matches_by_file = queue.Queue()
13
14 def do_background():
15 MAX_FILES = 10
16 MAX_MATCHES = 100
17
18 # Make sure we don't have too many procs at once,
19 # i.e. too many files open at once.
20 counter = threading.Semaphore(MAX_FILES)
21 finished = multiprocessing.Queue()
22 active = {}
23 done = False
24
25 def monitor_tasks():
26 while not done:
27 try:
28 index = finished.get(timeout=0.1)
29 except queue.Empty:
30 continue
31 proc = active.pop(index)
32 proc.join(0.1)
33 if proc.is_alive():
34 # It's taking too long to terminate.
35 # We can wait for it at the end.
36 active[index] = proc
37 # Let a new process start.
38 counter.release()
39 monitor = threading.Thread(target=monitor_tasks)
40 monitor.start()
41
42 for index, filename in enumerate(filenames):
43 # Prepare for the file.
44 matches = multiprocessing.Queue(MAX_MATCHES)
45 matches_by_file.put(matches)
46
47 # Start a subprocess to process the file.
48 proc = multiprocessing.Process(
49 target=search_file,
50 args=(filename, matches, regex, opts,
51 index, finished),
52 )
53 counter.acquire(blocking=True)
54 active[index] = proc
55 proc.start()
56 matches_by_file.put(None)
57 # Wait for all remaining tasks to finish.
58 done = True
59 monitor.join()
60 for proc in active.values():
61 proc.join()
62
63 background = threading.Thread(target=do_background)
64 background.start()
65
66 # Yield the results as they are received, in order.
67 matches = matches_by_file.get() # blocking
68 while matches is not None:
69 match = matches.get() # blocking
70 while match is not None:
71 yield match
72 match = matches.get() # blocking
73 matches = matches_by_file.get() # blocking
74
75 background.join()
76
77
78def search_file(filename, matches, regex, opts,
79 index, finished):
80 lines = iter_lines(filename)
81 for match in search_lines(lines, regex, opts, filename):
82 matches.put(match) # blocking
83 matches.put(None) # blocking
84 # Let a new process start.
85 finished.put(index)
86
87
88def iter_lines(filename):
89 if filename == '-':
90 yield from sys.stdin
91 else:
92 with open(filename) as infile:
93 yield from infile
94
95
96def search_lines(lines, regex, opts, filename):
97 try:
98 if opts.filesonly:
99 if opts.invert:
100 for line in lines:
101 m = regex.search(line)
102 if m:
103 break
104 else:
105 yield (filename, None)
106 else:
107 for line in lines:
108 m = regex.search(line)
109 if m:
110 yield (filename, None)
111 break
112 else:
113 assert not opts.invert, opts
114 for line in lines:
115 m = regex.search(line)
116 if not m:
117 continue
118 if line.endswith(os.linesep):
119 line = line[:-len(os.linesep)]
120 yield (filename, line)
121 except UnicodeDecodeError:
122 # It must be a binary file.
123 return
124
125
126def resolve_filenames(filenames, recursive=False):
127 for filename in filenames:
128 assert isinstance(filename, str), repr(filename)
129 if filename == '-':
130 yield '-'
131 elif not os.path.isdir(filename):
132 yield filename
133 elif recursive:
134 for d, _, files in os.walk(filename):
135 for base in files:
136 yield os.path.join(d, base)
137
138
139if __name__ == '__main__':
140 multiprocessing.set_start_method('spawn')
141
142 # Parse the args.
143 import argparse
144 ap = argparse.ArgumentParser(prog='grep')
145
146 ap.add_argument('-r', '--recursive',
147 action='store_true')
148 ap.add_argument('-L', '--files-without-match',
149 dest='filesonly',
150 action='store_const', const='invert')
151 ap.add_argument('-l', '--files-with-matches',
152 dest='filesonly',
153 action='store_const', const='match')
154 ap.add_argument('-q', '--quiet', action='store_true')
155 ap.set_defaults(invert=False)
156
157 reopts = ap.add_mutually_exclusive_group(required=True)
158 reopts.add_argument('-e', '--regexp', dest='regex',
159 metavar='REGEX')
160 reopts.add_argument('regex', nargs='?',
161 metavar='REGEX')
162
163 ap.add_argument('files', nargs='+', metavar='FILE')
164
165 opts = ap.parse_args()
166 ns = vars(opts)
167
168 regex = ns.pop('regex')
169 filenames = ns.pop('files')
170 recursive = ns.pop('recursive')
171 if opts.filesonly:
172 if opts.filesonly == 'invert':
173 opts.invert = True
174 else:
175 assert opts.filesonly == 'match', opts
176 opts.invert = False
177 opts.filesonly = bool(opts.filesonly)
178
179 def main(regex=regex, filenames=filenames):
180 # step 1
181 regex = re.compile(regex)
182 # step 2
183 filenames = resolve_filenames(filenames, recursive)
184 # step 3
185 matches = search(filenames, regex, opts)
186 matches = iter(matches)
187
188 # step 4
189
190 # Handle the first match.
191 for filename, line in matches:
192 if opts.quiet:
193 return 0
194 elif opts.filesonly:
195 print(filename)
196 else:
197 for second in matches:
198 print(f'{filename}: {line}')
199 filename, line = second
200 print(f'{filename}: {line}')
201 break
202 else:
203 print(line)
204 break
205 else:
206 return 1
207
208 # Handle the remaining matches.
209 if opts.filesonly:
210 for filename, _ in matches:
211 print(filename)
212 else:
213 for filename, line in matches:
214 print(f'{filename}: {line}')
215
216 return 0
217 rc = main()
218
219 # step 5
220 sys.exit(rc)
|
1import os
2import os.path
3import re
4import sys
5
6from concurrent.futures import ThreadPoolExecutor
7import queue
8import threading
9
10
11def search(filenames, regex, opts):
12 matches_by_file = queue.Queue()
13
14 def do_background():
15 MAX_FILES = 10
16 MAX_MATCHES = 100
17
18 def search_file(filename, matches):
19 lines = iter_lines(filename)
20 for match in search_lines(
21 lines, regex, opts, filename):
22 matches.put(match) # blocking
23 matches.put(None) # blocking
24
25 with ThreadPoolExecutor(MAX_FILES) as workers:
26 for filename in filenames:
27 # Prepare for the file.
28 matches = queue.Queue(MAX_MATCHES)
29 matches_by_file.put(matches)
30
31 # Start a thread to process the file.
32 workers.submit(
33 search_file, filename, matches)
34 matches_by_file.put(None)
35
36 background = threading.Thread(target=do_background)
37 background.start()
38
39 # Yield the results as they are received, in order.
40 matches = matches_by_file.get() # blocking
41 while matches is not None:
42 match = matches.get() # blocking
43 while match is not None:
44 yield match
45 match = matches.get() # blocking
46 matches = matches_by_file.get() # blocking
47
48 background.join()
49
50
51def iter_lines(filename):
52 if filename == '-':
53 yield from sys.stdin
54 else:
55 with open(filename) as infile:
56 yield from infile
57
58
59def search_lines(lines, regex, opts, filename):
60 try:
61 if opts.filesonly:
62 if opts.invert:
63 for line in lines:
64 m = regex.search(line)
65 if m:
66 break
67 else:
68 yield (filename, None)
69 else:
70 for line in lines:
71 m = regex.search(line)
72 if m:
73 yield (filename, None)
74 break
75 else:
76 assert not opts.invert, opts
77 for line in lines:
78 m = regex.search(line)
79 if not m:
80 continue
81 if line.endswith(os.linesep):
82 line = line[:-len(os.linesep)]
83 yield (filename, line)
84 except UnicodeDecodeError:
85 # It must be a binary file.
86 return
87
88
89def resolve_filenames(filenames, recursive=False):
90 for filename in filenames:
91 assert isinstance(filename, str), repr(filename)
92 if filename == '-':
93 yield '-'
94 elif not os.path.isdir(filename):
95 yield filename
96 elif recursive:
97 for d, _, files in os.walk(filename):
98 for base in files:
99 yield os.path.join(d, base)
100
101
102if __name__ == '__main__':
103 # Parse the args.
104 import argparse
105 ap = argparse.ArgumentParser(prog='grep')
106
107 ap.add_argument('-r', '--recursive',
108 action='store_true')
109 ap.add_argument('-L', '--files-without-match',
110 dest='filesonly',
111 action='store_const', const='invert')
112 ap.add_argument('-l', '--files-with-matches',
113 dest='filesonly',
114 action='store_const', const='match')
115 ap.add_argument('-q', '--quiet', action='store_true')
116 ap.set_defaults(invert=False)
117
118 reopts = ap.add_mutually_exclusive_group(required=True)
119 reopts.add_argument('-e', '--regexp', dest='regex',
120 metavar='REGEX')
121 reopts.add_argument('regex', nargs='?',
122 metavar='REGEX')
123
124 ap.add_argument('files', nargs='+', metavar='FILE')
125
126 opts = ap.parse_args()
127 ns = vars(opts)
128
129 regex = ns.pop('regex')
130 filenames = ns.pop('files')
131 recursive = ns.pop('recursive')
132 if opts.filesonly:
133 if opts.filesonly == 'invert':
134 opts.invert = True
135 else:
136 assert opts.filesonly == 'match', opts
137 opts.invert = False
138 opts.filesonly = bool(opts.filesonly)
139
140 def main(regex=regex, filenames=filenames):
141 # step 1
142 regex = re.compile(regex)
143 # step 2
144 filenames = resolve_filenames(filenames, recursive)
145 # step 3
146 matches = search(filenames, regex, opts)
147 matches = iter(matches)
148
149 # step 4
150
151 # Handle the first match.
152 for filename, line in matches:
153 if opts.quiet:
154 return 0
155 elif opts.filesonly:
156 print(filename)
157 else:
158 for second in matches:
159 print(f'{filename}: {line}')
160 filename, line = second
161 print(f'{filename}: {line}')
162 break
163 else:
164 print(line)
165 break
166 else:
167 return 1
168
169 # Handle the remaining matches.
170 if opts.filesonly:
171 for filename, _ in matches:
172 print(filename)
173 else:
174 for filename, line in matches:
175 print(f'{filename}: {line}')
176
177 return 0
178 rc = main()
179
180 # step 5
181 sys.exit(rc)
|
Model-specific details¶
Here are some implementation-specific details we had to deal with.
threads:
…
interpreters:
…
multiprocessing:
…
asyncio:
…
concurrent.futures¶
For threads, multiprocessing, and
multiple interpreters*,
you can also use concurrent.futures:
1def search_using_threads_cf(filenames, regex, opts):
2 matches_by_file = queue.Queue()
3
4 def do_background():
5 MAX_FILES = 10
6 MAX_MATCHES = 100
7
8 def search_file(filename, matches):
9 lines = iter_lines(filename)
10 for match in search_lines(lines, regex, opts, filename):
11 matches.put(match) # blocking
12 matches.put(None) # blocking
13
14 with ThreadPoolExecutor(MAX_FILES) as workers:
15 for filename in filenames:
16 # Prepare for the file.
17 matches = queue.Queue(MAX_MATCHES)
18 matches_by_file.put(matches)
19
20 # Start a thread to process the file.
21 workers.submit(search_file, filename, matches)
22 matches_by_file.put(None)
23
24 background = threading.Thread(target=do_background)
25 background.start()
26
27 # Yield the results as they are received, in order.
28 matches = matches_by_file.get() # blocking
29 while matches is not None:
30 match = matches.get() # blocking
31 while match is not None:
32 yield match
33 match = matches.get() # blocking
34 matches = matches_by_file.get() # blocking
35
36 background.join()
For processes, use concurrent.futures.ProcessPoolExecutor.
For interpreters, use InterpreterPoolExecutor.
In both cases you must use the proper queue type and there
are a few other minor differences.
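For example, the process-based variant might look roughly like this sketch (assumed, not part of the original examples; search_using_processes_cf is a made-up name): the worker must be a module-level, picklable function, the executor changes, and the per-file queues come from a multiprocessing manager so they can be passed across process boundaries:

from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import queue
import threading

def search_file(filename, matches, regex, opts):
    # Must live at module level so it can be pickled for the worker
    # processes; it reuses iter_lines() and search_lines() from above.
    lines = iter_lines(filename)
    for match in search_lines(lines, regex, opts, filename):
        matches.put(match)  # blocking
    matches.put(None)  # blocking

def search_using_processes_cf(filenames, regex, opts):
    MAX_FILES = 10
    MAX_MATCHES = 100
    manager = multiprocessing.Manager()
    matches_by_file = queue.Queue()

    def do_background():
        with ProcessPoolExecutor(MAX_FILES) as workers:
            for filename in filenames:
                # A manager queue (unlike queue.Queue) can be passed
                # to another process.
                matches = manager.Queue(MAX_MATCHES)
                matches_by_file.put(matches)
                workers.submit(search_file, filename, matches, regex, opts)
        matches_by_file.put(None)

    background = threading.Thread(target=do_background)
    background.start()

    # Yield the results as they are received, in order
    # (the same pattern as the threads version above).
    matches = matches_by_file.get()  # blocking
    while matches is not None:
        match = matches.get()  # blocking
        while match is not None:
            yield match
            match = matches.get()  # blocking
        matches = matches_by_file.get()  # blocking

    background.join()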
Workload 2: …¶
TBD
Design and analysis¶
Design steps from above:
concurrency fits?
TBD
identify logical tasks
TBD
select concurrent tasks
TBD
concurrency-related characteristics
TBD
pick best model
TBD
Here are additional key constraints and considerations:
…
High-level code¶
# …
Side-by-side¶
Here are the implementations for the different concurrency models, side-by-side for easy comparison:

| sequential | threads | multiple interpreters | coroutines | multiple processes | concurrent.futures |
|---|---|---|---|---|---|
1# sequential
2...
|
1import threading
2
3def task():
4 ...
5
6t = threading.Thread(target=task)
7t.start()
8
9...
|
1# subinterpreters
2...
|
1# async
2...
|
1import multiprocessing
2
3def task():
4 ...
5
6...
|
1# concurrent.futures
2...
|
Workload 3: …¶
TBD
Design and analysis¶
Design steps from above:
concurrency fits?
TBD
identify logical tasks
TBD
select concurrent tasks
TBD
concurrency-related characteristics
TBD
pick best model
TBD
Here are additional key constraints and considerations:
…
High-level code¶
# …
Side-by-side¶
Here are the implementations for the different concurrency models, side-by-side for easy comparison:

| sequential | threads | multiple interpreters | coroutines | multiple processes | concurrent.futures |
|---|---|---|---|---|---|
1# sequential
2...
|
1import threading
2
3def task():
4 ...
5
6t = threading.Thread(target=task)
7t.start()
8
9...
|
1# subinterpreters
2...
|
1# async
2...
|
1import multiprocessing
2
3def task():
4 ...
5
6...
|
1# concurrent.futures
2...
|