Concurrency HOWTO

There are many outstanding resources, both online and in print, that do an excellent job of introducing concurrency. This HOWTO builds on them by walking you through how to apply that knowledge using Python.

Python supports the following concurrency models directly:

  • free-threading (stdlib, C-API)

  • isolated threads, AKA CSP/actor model (stdlib*, C-API)

  • coroutines, AKA async/await (language, stdlib, C-API)

  • multi-processing (stdlib)

  • distributed, e.g. SMP (stdlib (limited))

In this document, we’ll look at how to take advantage of Python’s concurrency support. The overall focus is on the following:

Note

Before you reach for concurrency to solve a problem, always make sure it is the right tool for the job. There are many cases where concurrency simply isn’t applicable or will only complicate the solution. An in-depth discussion of this point is outside the scope of this document.

Note

Free-threading is one of the oldest concurrency models, fundamental to operating systems, and widely supported in programming languages. However, it is generally considered perilous and not human-friendly. Other concurrency models have demonstrated better usability and newer programming languages typically avoid exposing threads directly. Take that into consideration before reaching for threads and look at the alternatives first.

Note

Python supports other concurrency models indirectly through community-maintained PyPI packages. One well-known example is dask, which supports “distributed” computing.

Quick reference

Terminology

We’ll be using the following terms and ideas throughout:

task (logical thread)
a cohesive linear sequence of abstract steps in a program;
effectively, a mini-program;
the logical equivalent of executed instructions corresponding to code;
also known as “logical process”
physical thread (OS thread)
where the actual code for a logical thread runs on the CPU (and operating system);
we avoid using plain “thread” for this, to avoid ambiguity
Python thread
the Python runtime running in a physical thread
particularly the portion of the runtime state active in the physical thread
concurrency (multitasking)
a program with multiple logical threads running simultaneously
(not necessarily in parallel)
parallelism (multi-core)

running a program’s multiple logical threads on multiple physical threads (CPU cores)

For convenience, here is a summary of what we’ll cover later.

Concurrency Primitives

primitive        used with        purpose

High-level App Examples

workload (app): grep
    per-request inputs:    N filenames (stdin); file bytes x N (disk)
    per-request outputs:   M matches (stdout)
    N core tasks:          1+ per file
    core task:             time: ~ file size; mem: small

Each has side-by-side implementations for the different models:

workload (app)    side-by-side examples
grep              by concurrency model



Python Concurrency Models

As mentioned, there are essentially five concurrency models that Python supports directly:

free threading (stdlib: threading)
    using multiple physical threads in the same process, with no isolation between them

isolated threads, i.e. multiple interpreters (stdlib: interpreters)
    threads, often physical, with strict isolation between them (e.g. CSP and actor model)

coroutines, AKA async/await (stdlib: asyncio)
    switching between logical threads is explicitly controlled by each one

multi-processing (stdlib: multiprocessing)
    using multiple isolated processes

distributed (stdlib: limited)
    multiprocessing across multiple computers

There are tradeoffs to each, whether in performance or complexity. We’ll take a look at those tradeoffs in detail later.

Before that, we’ll review various comparisons of the concurrency models, and we’ll briefly talk about critical caveats for specific models.

Comparison tables

The following tables provide a detailed look with side-by-side comparisons.

key characteristics

                        scale          multi-core   races     overhead
free-threading          small-medium   yes*         yes       very low
multiple interpreters   small-medium   yes          limited   low+
coroutines              small-medium   no           no        low
multi-processing        small          yes          limited   medium
distributed             large          yes          limited   medium

overhead details

                        memory     startup    cross-task    management   system
free threading          very low   very low   none          very low     none
multiple interpreters   low*       medium*    low           very low     none
coroutines              low        low        none          low          none
multi-processing        medium     medium     medium        medium       low
distributed             medium+    medium+    medium-high   medium       low-medium

complexity

                        parallel   shared mem   shared I/O   shared env   cross thread   sync                 tracking   compat   extra LOC
free-threading          yes*       all          all          yes          high           explicit                        yes      low?
multiple interpreters   yes        limited      all          yes          low            implicit             ???        yes      low?
coroutines              no         all          all          yes          low-med?       implicit             ???        no       low-med
multi-processing        yes        limited      no           no?          low            implicit +optional   ???        yes      low-med?
distributed             yes        limited      no           no?          low            implicit +optional   ???        yes      medium?

exposure

                          academic research   academic curriculum   industry      examples       Python history
free-threading            very high           high                  high          high           0.9?
isolated threads          high                low?                  low-medium?   low-medium?    2.2
  (multiple interpreters)
coroutines                medium-high?        medium?               medium?       medium-high?   3.3-3.5 (2.2)
multi-processing          ???                 low?                  low-medium?   low?           2.6
distributed               medium-high?        low?                  medium?       medium?        n/a

Critical caveats

Here are some important details to consider, specific to individual concurrency models in Python.

Data races and non-deterministic scheduling (free-threading)

The principal caveat for physical threads is that each thread shares the full memory of the process with all its other threads. Combined with their non-deterministic scheduling (and parallel execution), threads expose programs to a significant risk of races.

The potential consequences of a race are data corruption and invalidated expectations of data consistency. In each case, the non-deterministic scheduling of threads means it is both hard to reproduce races and to track down where a race happened. These qualities make these bugs especially frustrating and worth diligently avoiding.

Python threads are light wrappers around physical threads and thus have the same caveats. The majority of data in a Python program is mutable, and all of the program’s data is subject to modification by any thread at any moment. This requires extra effort to synchronize reads and writes. Furthermore, given the maximally-broad scope of the data involved, it’s difficult to be sure all possible races have been dealt with, especially as a code base changes over time.
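
To make the problem concrete, here is a minimal sketch (the names are illustrative) of the classic data race, several threads updating one shared counter, along with the usual fix of guarding the update with a lock:

import threading

counter = 0
lock = threading.Lock()

def unsafe_increment(n):
    # Racy: the read-modify-write of "counter" can interleave with
    # other threads, so some updates may be lost.
    global counter
    for _ in range(n):
        counter += 1

def safe_increment(n):
    # Holding the lock makes each read-modify-write atomic with
    # respect to the other threads.
    global counter
    for _ in range(n):
        with lock:
            counter += 1

threads = [threading.Thread(target=safe_increment, args=(100_000,))
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # always 400000 here; with unsafe_increment() it may be less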

The other concurrency models essentially don’t have this problem. In the case of coroutines, explicit cooperative scheduling eliminates the risk of a simultaneous read-write or write-write. It also means program logic can rely on memory consistency between synchronization points (await).

With the remaining concurrency models, data is never shared between logical threads unless done explicitly (typically at the existing inherent points of synchronization). By default that shared data is either read-only or managed in a thread-safe way. Most notably, the opt-in sharing means the set of shared data to manage is explicitly defined (and often small) instead of covering all memory in the process.

The Global Interpreter Lock (GIL)

While physical threads are the direct route to multi-core parallelism, Python’s threads have always had an extra wrinkle that gets in the way: the global interpreter lock (GIL).

The GIL is a very efficient tool for keeping the Python implementation simple, which is an important constraint for the project. In fact, it protects Python’s maintainers and users from a large category of concurrency problems that one must normally face when threads are involved.

The big tradeoff is that the bytecode interpreter, which executes your Python code, only runs while holding the GIL. That means only one thread can be running Python code at a time. Threads will take short turns, so none have to wait too long, but it still prevents any actual parallelism of CPU-bound code.

That said, the Python runtime (and extension modules) can release the GIL when the thread is doing slow or long-running work unrelated to Python, like a blocking IO operation.
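
As a rough illustration of both points, here is a minimal sketch (the workloads are stand-ins) showing why threads still help with blocking I/O under the GIL, while pure Python CPU work gains nothing:

import threading
import time

def cpu_bound(n):
    # Pure Python work: the GIL is held the whole time, so running
    # this in several threads does not reduce the total time.
    return sum(i * i for i in range(n))

def io_bound(seconds):
    # time.sleep() stands in for a blocking I/O call; the GIL is
    # released while waiting, so the waits overlap across threads.
    time.sleep(seconds)

threads = [threading.Thread(target=io_bound, args=(1.0,)) for _ in range(4)]
start = time.monotonic()
for t in threads:
    t.start()
for t in threads:
    t.join()
# Roughly 1 second rather than 4: the blocking waits overlapped.
print(f'4 overlapping waits took {time.monotonic() - start:.1f}s')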

There is also an ongoing effort to eliminate the GIL: PEP 703. Any attempt to remove the GIL necessarily involves some slowdown to single-threaded performance and extra maintenance burden to the Python project and extension module maintainers. However, there is sufficient interest in unlocking full multi-core parallelism to justify the current experiment.

You can also move from free-threading to isolated threads by using multiple interpreters. Each interpreter has its own GIL. Thus, if you want multi-core parallelism, run a different interpreter in each thread. Their isolation means that each can run unblocked in that thread.

Thread isolation and multiple interpreters

As just noted, races effectively stop being a problem if the memory used by each physical thread is isolated from the others. That isolation can also help with the other caveats related to physical threads. In Python you can get this isolation by using multiple interpreters.

In this context, an “interpreter” represents nearly all the capability and state of the Python runtime, for its C-API and to execute Python code. The full runtime supports multiple interpreters and includes some state that all interpreters share. Most importantly, the state of each interpreter is effectively isolated from the others.

That isolation includes things like sys.modules. By default, interpreters mostly don’t share any data (including objects) at all. Anything that gets shared is done on a strictly opt-in basis. That means programmers wouldn’t need to worry about possible races with any data in the program. They would only need to worry about data that was explicitly shared.

Interpreters themselves are not specific to any thread, but instead each physical thread has (at most) one interpreter active at any given moment. Each interpreter can be associated in this way with any number of threads. Since each interpreter is isolated from the others, any thread using one interpreter is thus isolated from threads using any other interpreter.

Using multiple interpreters is fairly straightforward, as sketched below:

  1. create a new interpreter

  2. switch the current thread to use that interpreter

  3. call exec(), but targeting the new interpreter

  4. switch back

Note that no threads were involved in those steps; running code in an interpreter happens relative to the current thread, and new threads aren’t implicitly started.
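
Here is a minimal sketch of that sequence, assuming a PEP 734-style interpreters module (the exact import path depends on your Python version; see the stdlib note below):

# Assumes a PEP 734-style module is importable, e.g. the one shipped
# in CPython's sources as test.support.interpreters.
import test.support.interpreters as interpreters

# 1. create a new interpreter
interp = interpreters.create()

# 2-4. run code in that interpreter; the switch to the new
# interpreter and back happens inside exec(), in the current
# thread -- no new OS thread is started.
interp.exec("print('hello from an isolated interpreter')")

If you do want multi-core parallelism, run that same exec() call in its own threading.Thread, one interpreter per thread.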

Multi-processing and distributed computing provide similar isolation, though with some tradeoffs.

A stdlib module for using multiple interpreters

While use of multiple interpreters has been part of Python’s C-API for decades, the feature hasn’t been exposed to Python code through the stdlib. PEP 734 proposes changing that by adding a new interpreters module.

In the meantime, an implementation of that PEP is available for Python 3.13+ on PyPI: interpreters-pep-734.

Improving performance for multiple interpreters

The long-running effort to improve Python’s implementation of multiple interpreters has focused on isolation and stability; very little has been done to improve performance. This has the most impact on:

  • how much memory each interpreter uses (i.e. how many can run at the same time)

  • how long it takes to create a new interpreter

It also impacts how efficiently data/objects can be passed between interpreters, and how effectively objects can be shared.

As the work on isolation wraps up, improvements will shift to focus on performance and memory usage. Thus, the overhead of using multiple interpreters will drastically decrease over time.

Shared resources

Aside from memory, all physical threads in a process share the following resources:

  • command line arguments (“argv”)

  • env vars

  • current working directory

  • signals, IPC, etc.

  • open I/O resources (file descriptors, sockets, etc.)

When relevant, these must be managed in a thread-safe way.
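
For example, the working directory belongs to the whole process, so one thread changing it affects every other thread. A minimal sketch of serializing such a change with a lock (where it can’t simply be avoided):

import os
import threading

# The current working directory is process-wide state, so serialize
# any temporary changes to it.
_cwd_lock = threading.Lock()

def in_directory(dirname, func, *args):
    with _cwd_lock:
        old = os.getcwd()
        os.chdir(dirname)
        try:
            return func(*args)
        finally:
            os.chdir(old)

Where possible, passing absolute paths around is simpler than changing the directory at all.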

Tracing execution

TBD

Coroutines are contagious

Coroutines can be an effective mechanism for letting a program’s non-blocking code run while simultaneously waiting for blocking code to finish. The tricky part is that the underlying machinery (the event loop) relies on each coroutine explicitly yielding control at the appropriate moments.

Normal functions do not follow this pattern, so they cannot take advantage of that cooperative scheduling to avoid blocking the program. Thus, coroutines and non-coroutines don’t mix well. While there are tools for wrapping a normal function so it acts like a coroutine, such functions often end up converted into coroutines instead. At that point, if any non-async code relies on the function, then either you’ll need to convert that other code into a coroutine too, or you’ll need to keep the original non-async implementation around alongside the new, almost identical async one.

You can see how that can proliferate, leading to possible extra maintenance/development costs.
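
For instance, once one function becomes a coroutine, everything that calls it has to change as well; wrapping the original blocking function with asyncio.to_thread() is one way to bridge the two worlds without a full rewrite. A small sketch (the functions are illustrative):

import asyncio
import time

def fetch_blocking(url):
    # An existing, ordinary (blocking) function.
    time.sleep(1)  # stands in for slow I/O
    return f'data from {url}'

async def fetch(url):
    # The async wrapper; every caller must now be async too.
    return await asyncio.to_thread(fetch_blocking, url)

async def main():
    # main() had to become a coroutine just to await fetch().
    results = await asyncio.gather(fetch('a'), fetch('b'))
    print(results)

asyncio.run(main())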

Processes consume extra resources

When using multi-processing for concurrency, keep in mind that the operating system will assign a certain set of limited resources to each process. For example, each process has its own PID and handle to the executable. You can run only so many processes before you run out of these resources. Concurrency in a single process doesn’t have this problem, and a distributed program can work around it.

Using multiprocessing for distributed computing

Not only does the multiprocessing module support concurrency with multiple local processes, it can also support a distributed model using remote computers. That said, consider first looking into tools that have been designed specifically for distributed computing, like dask.
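
The relevant piece is multiprocessing.managers: a manager can listen on a network address and serve proxy objects to clients on other machines. A bare-bones sketch (the host, port, and authkey are placeholders):

# server.py -- shares a queue with remote clients
import queue
from multiprocessing.managers import BaseManager

tasks = queue.Queue()

class QueueManager(BaseManager):
    pass

QueueManager.register('get_tasks', callable=lambda: tasks)
manager = QueueManager(address=('', 50000), authkey=b'change-me')
manager.get_server().serve_forever()

# client.py -- runs on another machine
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

QueueManager.register('get_tasks')
manager = QueueManager(address=('server.example.com', 50000),
                       authkey=b'change-me')
manager.connect()
manager.get_tasks().put('a work item')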

Resilience to crashes

A process can crash if it does something it shouldn’t, like try to access memory outside what the OS has provided it. If your program is running in multiple processes (incl. distributed) then you can more easily recover from a crash in any one process. Recovering from a crash when using free-threading, multiple interpreters, or coroutines isn’t nearly so easy.
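
A minimal sketch of that kind of recovery with a child process (the crash is simulated here with os.abort()):

import multiprocessing as mp
import os

def risky_task():
    # Simulate a hard crash, e.g. a segfault in an extension module.
    os.abort()

if __name__ == '__main__':
    proc = mp.Process(target=risky_task)
    proc.start()
    proc.join()
    if proc.exitcode != 0:
        # Only the worker died; the parent survives and can log,
        # retry, or move on.
        print('worker crashed with exit code', proc.exitcode)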

High-level APIs

Also note that Python’s stdlib provides various higher-level APIs that support these concurrency models in various contexts:

                        concurrent.futures   socketserver   http.server
free-threading          yes                  yes            yes
multiple interpreters   (pending)
coroutines              ???
multi-processing        yes
distributed             ???
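
As a quick taste of that layer, the same submit-and-collect pattern in concurrent.futures works for threads and processes (and, eventually, the pending interpreter support) just by swapping the executor class. A small sketch:

from concurrent.futures import ThreadPoolExecutor  # or ProcessPoolExecutor

def work(n):
    return n * n

if __name__ == '__main__':
    # Swapping in ProcessPoolExecutor changes the concurrency model
    # without changing the surrounding code.
    with ThreadPoolExecutor(max_workers=4) as pool:
        for result in pool.map(work, range(10)):
            print(result)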



Designing A Program For Concurrency

Whether you are starting a new project using concurrency or refactoring an existing one to use it, it’s important to design for concurrency before taking one more step. Doing so will save you a lot of headache later.

  1. decide if your program might benefit from concurrency

  2. break down your *logical* program into distinct tasks

  3. determine which tasks could run at the same time

  4. identify the other concurrency-related characteristics of your program

  5. decide which concurrency model fits best

  6. go for it!

At each step you should be continuously asking yourself if concurrency is still a good fit for your program.

Some problems are obviously not solvable with concurrency. For the rest, even if you could use concurrency, it might not provide much value. Furthermore, even if it seems like it would provide meaningful value, the additional costs in performance, complexity, or maintainability might outweigh that benefit.

Thus, when you’re thinking of solving a problem using concurrency, it’s crucial that you understand the problem well.

Getting started

How can concurrency help?

TBD

How can concurrency hurt?

TBD

Analyze your problem

Identifying the logical tasks in your program

TBD

The concurrency characteristics of your program

TBD

Other considerations

TBD

Pick a concurrency model

TBD



Python Concurrency Primitives

TBD

Group A

primitive 1

TBD

Group B

primitive 1

TBD



Python Concurrency Workload Examples

Below, we have a series of examples of how to implement the most common Python workloads that take advantage of concurrency. For each workload, you will find an implementation for each of the concurrency models.

The implementations are meant to accurately demonstrate how best to solve the problem using the given concurrency model. The examples for the workload are presented side-by-side, for easier comparison. The examples for threads, multiprocessing, and multiple interpreters will use concurrent.futures when that is the better approach. Performance comparisons are not included here.

Here’s a summary of the examples, by workload:

workload: grep
    req in:         N filenames (stdin); file bytes x N (disk)
    req out:        M matches (stdout)
    N core tasks:   1+ per file
    core task:      time: ~ file size; mem: small


Note

Each example is implemented as a basic command line tool, but can be easily adapted to run as a web service.

Workload: grep

This is a basic Python implementation of the Unix grep tool. We read from one or more files and report the lines that match (or don’t match) the given regular expression.

This represents a workload involving a mix of moderate IO and CPU work.

For full example code see the side-by-side implementations below.

Design and analysis

Design steps from above:

  1. concurrency fits?

    Yes! There is potentially a bunch of work happening at the same time, and we want results as fast as possible.

  2. identify logical tasks

    At a high level, the application works like this:

    1. handle args (including compile regex)

    2. if recursive, walk tree to find filenames

    3. for each file, yield each match

    4. print each match

    5. exit with 0 if matched and 1 otherwise

    At step 3 we do the following for each file:

    a. open the file
    b. iterate over the lines
    c. apply the regex to each line
    d. yield each match
    e. close the file

  3. select concurrent tasks

    Concurrent work happens at step 3. Sub-steps a, b, and e are IO-intensive. Sub-step c is CPU-intensive. The simplest approach would be one concurrent worker per file. Relative to a strictly sequential approach, there’s extra complexity here in managing the workers, fanning out the work to them, and merging the results back into a single iterator.

    If we were worried about any particularly large file or sufficiently large regular expression, we could take things further. That would involve splitting up step 3 even further by breaking the file into chunks that are divided up among multiple workers. However, doing so would introduce extra complexity that might not pay for itself.

  4. concurrency-related characteristics

    TBD

  5. pick best model

    TBD

Here are additional key constraints and considerations:

  • there’s usually a limit to how many files can be open concurrently, so we’ll have to be careful not to process too many at once

  • the order of the yielded/printed matches must match the order of the requested files and the order of each file’s lines

High-level code

With the initial design and analysis done, let’s move on to code. We’ll start with the high-level code corresponding to the application’s five top-level tasks we identified earlier.

Most of the high-level code has nothing to do with concurrency. The part that does is the call to search().

 1def main(regex=regex, filenames=filenames):
 2    # step 1
 3    regex = re.compile(regex)
 4    # step 2
 5    filenames = resolve_filenames(filenames, recursive)
 6    # step 3
 7    matches = search(filenames, regex, opts)
 8
 9    # step 4
10
11    if hasattr(type(matches), '__aiter__'):
12        async def iter_and_show(matches=matches):
13            matches = type(matches).__aiter__(matches)
14
15            # Handle the first match.
16            async for filename, line in matches:
17                if opts.quiet:
18                    return 0
19                elif opts.filesonly:
20                    print(filename)
21                else:
22                    async for second in matches:
23                        print(f'{filename}: {line}')
24                        filename, line = second
25                        print(f'{filename}: {line}')
26                        break
27                    else:
28                        print(line)
29                break
30            else:
31                return 1
32
33            # Handle the remaining matches.
34            if opts.filesonly:
35                async for filename, _ in matches:
36                    print(filename)
37            else:
38                async for filename, line in matches:
39                    print(f'{filename}: {line}')
40
41            return 0
42        return asyncio.run(iter_and_show())
43    else:
44        matches = iter(matches)
45
46        # Handle the first match.
47        for filename, line in matches:
48            if opts.quiet:
49                return 0
50            elif opts.filesonly:
51                print(filename)
52            else:
53                for second in matches:
54                    print(f'{filename}: {line}')
55                    filename, line = second
56                    print(f'{filename}: {line}')
57                    break
58                else:
59                    print(line)
60            break
61        else:
62            return 1
63
64        # Handle the remaining matches.
65        if opts.filesonly:
66            for filename, _ in matches:
67                print(filename)
68        else:
69            for filename, line in matches:
70                print(f'{filename}: {line}')
71
72        return 0
73rc = main()
74
75# step 5
76sys.exit(rc)

The search() function that gets called returns an iterator (or async iterator) that yields the matches, which then get printed. In the code above, note each line that consumes that iterator (the loops over matches).

Here’s the search function for a non-concurrent implementation:

1def search_sequential(filenames, regex, opts):
2    for filename in filenames:
3        lines = iter_lines(filename)
4        yield from search_lines(lines, regex, opts, filename)

iter_lines() is a straightforward helper that opens the file and yields each line.
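
For reference, here is that helper as it appears in the full examples below (reading from stdin when the filename is '-'):

def iter_lines(filename):
    if filename == '-':
        yield from sys.stdin
    else:
        with open(filename) as infile:
            yield from infile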

search_lines() is a sequential-search helper used by all the example implementations here:

 1def search_lines(lines, regex, opts, filename):
 2    try:
 3        if opts.filesonly:
 4            if opts.invert:
 5                for line in lines:
 6                    m = regex.search(line)
 7                    if m:
 8                        break
 9                else:
10                    yield (filename, None)
11            else:
12                for line in lines:
13                    m = regex.search(line)
14                    if m:
15                        yield (filename, None)
16                        break
17        else:
18            assert not opts.invert, opts
19            for line in lines:
20                m = regex.search(line)
21                if not m:
22                    continue
23                if line.endswith(os.linesep):
24                    line = line[:-len(os.linesep)]
25                yield (filename, line)
26    except UnicodeDecodeError:
27        # It must be a binary file.
28        return

Concurrent Code

Now let’s look at how concurrency actually fits in. We’ll start with an example using threads. However, the pattern is essentially the same for all the concurrency models.

 1def search_using_threads(filenames, regex, opts):
 2    matches_by_file = queue.Queue()
 3
 4    def do_background():
 5        MAX_FILES = 10
 6        MAX_MATCHES = 100
 7
 8        # Make sure we don't have too many threads at once,
 9        # i.e. too many files open at once.
10        counter = threading.Semaphore(MAX_FILES)
11
12        def search_file(filename, matches):
13            lines = iter_lines(filename)
14            for match in search_lines(lines, regex, opts, filename):
15                matches.put(match)  # blocking
16            matches.put(None)  # blocking
17            # Let a new thread start.
18            counter.release()
19
20        for filename in filenames:
21            # Prepare for the file.
22            matches = queue.Queue(MAX_MATCHES)
23            matches_by_file.put(matches)
24
25            # Start a thread to process the file.
26            t = threading.Thread(target=search_file, args=(filename, matches))
27            counter.acquire()
28            t.start()
29        matches_by_file.put(None)
30
31    background = threading.Thread(target=do_background)
32    background.start()
33
34    # Yield the results as they are received, in order.
35    matches = matches_by_file.get()  # blocking
36    while matches is not None:
37        match = matches.get()  # blocking
38        while match is not None:
39            yield match
40            match = matches.get()  # blocking
41        matches = matches_by_file.get()  # blocking
42
43    background.join()

We loop over the filenames and start a thread for each one. Each thread sends the matches it finds back on a queue.

We want to start yielding matches as soon as possible, so we also use a background thread to run the code that loops over the filenames.

We use a queue of queues (matches_by_file) to make sure we get results back in the right order, regardless of when the worker threads provide them.

The operating system will only let us have so many files open at once, so we limit how many workers are running at a time (MAX_FILES).

If the workers find matches substantially faster than we can consume them, we may end up using more memory than we need to. To avoid any backlog, we limit how many matches can be queued up for any given file (MAX_MATCHES).

One notable point is that the actual files are not opened until we need to iterate over the lines. This is mostly so we can avoid passing an open file to a concurrency worker; instead we pass the filename, which is much simpler.

Finally, we have to manage the workers manually. If we used concurrent.futures, it would take care of that for us.
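
For a rough idea of what that looks like, here is a hypothetical sketch of the same fan-out using concurrent.futures.ThreadPoolExecutor. Note that, unlike the queue-based version above, it buffers each file’s matches in a list before yielding them:

from concurrent.futures import ThreadPoolExecutor

def search_using_futures(filenames, regex, opts):
    MAX_FILES = 10

    def search_file(filename):
        # Each worker opens its own file (via iter_lines()) and
        # collects that file's matches.
        lines = iter_lines(filename)
        return list(search_lines(lines, regex, opts, filename))

    # The pool caps how many files are processed (and open) at once,
    # and map() yields results in submission order, which preserves
    # the required file ordering.
    with ThreadPoolExecutor(max_workers=MAX_FILES) as pool:
        for file_matches in pool.map(search_file, filenames):
            yield from file_matches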

Here are some things we don’t do but might be worth doing:

  • stop iteration when requested (or for ctrl-C)

  • split up each file between multiple workers

Recall that the search() function returns an iterator that yields all the matches. Concurrency may be happening as long as that iterator hasn’t been exhausted. That means it is happening more or less the entire time we loop over the matches to print them in main() (in the high-level code above).

Side-by-side

Here are the implementations for the different concurrency models, side-by-side for easy comparison: sequential, threads, multiple interpreters, coroutines, multiple processes, and concurrent.futures.

sequential:
  1import os
  2import os.path
  3import re
  4import sys
  5
  6
  7def search(filenames, regex, opts):
  8    for filename in filenames:
  9        # iter_lines() opens the file too.
 10        lines = iter_lines(filename)
 11        yield from search_lines(
 12                            lines, regex, opts, filename)
 13
 14
 15def iter_lines(filename):
 16    if filename == '-':
 17        yield from sys.stdin
 18    else:
 19        with open(filename) as infile:
 20            yield from infile
 21
 22
 23def search_lines(lines, regex, opts, filename):
 24    try:
 25        if opts.filesonly:
 26            if opts.invert:
 27                for line in lines:
 28                    m = regex.search(line)
 29                    if m:
 30                        break
 31                else:
 32                    yield (filename, None)
 33            else:
 34                for line in lines:
 35                    m = regex.search(line)
 36                    if m:
 37                        yield (filename, None)
 38                        break
 39        else:
 40            assert not opts.invert, opts
 41            for line in lines:
 42                m = regex.search(line)
 43                if not m:
 44                    continue
 45                if line.endswith(os.linesep):
 46                    line = line[:-len(os.linesep)]
 47                yield (filename, line)
 48    except UnicodeDecodeError:
 49        # It must be a binary file.
 50        return
 51
 52
 53def resolve_filenames(filenames, recursive=False):
 54    for filename in filenames:
 55        assert isinstance(filename, str), repr(filename)
 56        if filename == '-':
 57            yield '-'
 58        elif not os.path.isdir(filename):
 59            yield filename
 60        elif recursive:
 61            for d, _, files in os.walk(filename):
 62                for base in files:
 63                    yield os.path.join(d, base)
 64
 65
 66if __name__ == '__main__':
 67    # Parse the args.
 68    import argparse
 69    ap = argparse.ArgumentParser(prog='grep')
 70
 71    ap.add_argument('-r', '--recursive',
 72                    action='store_true')
 73    ap.add_argument('-L', '--files-without-match',
 74                    dest='filesonly',
 75                    action='store_const', const='invert')
 76    ap.add_argument('-l', '--files-with-matches',
 77                    dest='filesonly',
 78                    action='store_const', const='match')
 79    ap.add_argument('-q', '--quiet', action='store_true')
 80    ap.set_defaults(invert=False)
 81
 82    reopts = ap.add_mutually_exclusive_group(required=True)
 83    reopts.add_argument('-e', '--regexp', dest='regex',
 84                        metavar='REGEX')
 85    reopts.add_argument('regex', nargs='?',
 86                        metavar='REGEX')
 87
 88    ap.add_argument('files', nargs='+', metavar='FILE')
 89
 90    opts = ap.parse_args()
 91    ns = vars(opts)
 92
 93    regex = ns.pop('regex')
 94    filenames = ns.pop('files')
 95    recursive = ns.pop('recursive')
 96    if opts.filesonly:
 97        if opts.filesonly == 'invert':
 98            opts.invert = True
 99        else:
100            assert opts.filesonly == 'match', opts
101            opts.invert = False
102    opts.filesonly = bool(opts.filesonly)
103
104    def main(regex=regex, filenames=filenames):
105        # step 1
106        regex = re.compile(regex)
107        # step 2
108        filenames = resolve_filenames(filenames, recursive)
109        # step 3
110        matches = search(filenames, regex, opts)
111        matches = iter(matches)
112
113        # step 4
114
115        # Handle the first match.
116        for filename, line in matches:
117            if opts.quiet:
118                return 0
119            elif opts.filesonly:
120                print(filename)
121            else:
122                for second in matches:
123                    print(f'{filename}: {line}')
124                    filename, line = second
125                    print(f'{filename}: {line}')
126                    break
127                else:
128                    print(line)
129            break
130        else:
131            return 1
132
133        # Handle the remaining matches.
134        if opts.filesonly:
135            for filename, _ in matches:
136                print(filename)
137        else:
138            for filename, line in matches:
139                print(f'{filename}: {line}')
140
141        return 0
142    rc = main()
143
144    # step 5
145    sys.exit(rc)
threads:
  1import os
  2import os.path
  3import re
  4import sys
  5
  6import queue
  7import threading
  8
  9
 10def search(filenames, regex, opts):
 11    matches_by_file = queue.Queue()
 12
 13    def do_background():
 14        MAX_FILES = 10
 15        MAX_MATCHES = 100
 16
 17        # Make sure we don't have too many threads at once,
 18        # i.e. too many files open at once.
 19        counter = threading.Semaphore(MAX_FILES)
 20
 21        def search_file(filename, matches):
 22            lines = iter_lines(filename)
 23            for match in search_lines(
 24                            lines, regex, opts, filename):
 25                matches.put(match)  # blocking
 26            matches.put(None)  # blocking
 27            # Let a new thread start.
 28            counter.release()
 29
 30        for filename in filenames:
 31            # Prepare for the file.
 32            matches = queue.Queue(MAX_MATCHES)
 33            matches_by_file.put(matches)
 34
 35            # Start a thread to process the file.
 36            t = threading.Thread(target=search_file,
 37                                 args=(filename, matches))
 38            counter.acquire()
 39            t.start()
 40        matches_by_file.put(None)
 41
 42    background = threading.Thread(target=do_background)
 43    background.start()
 44
 45    # Yield the results as they are received, in order.
 46    matches = matches_by_file.get()  # blocking
 47    while matches is not None:
 48        match = matches.get()  # blocking
 49        while match is not None:
 50            yield match
 51            match = matches.get()  # blocking
 52        matches = matches_by_file.get()  # blocking
 53
 54    background.join()
 55
 56
 57def iter_lines(filename):
 58    if filename == '-':
 59        yield from sys.stdin
 60    else:
 61        with open(filename) as infile:
 62            yield from infile
 63
 64
 65def search_lines(lines, regex, opts, filename):
 66    try:
 67        if opts.filesonly:
 68            if opts.invert:
 69                for line in lines:
 70                    m = regex.search(line)
 71                    if m:
 72                        break
 73                else:
 74                    yield (filename, None)
 75            else:
 76                for line in lines:
 77                    m = regex.search(line)
 78                    if m:
 79                        yield (filename, None)
 80                        break
 81        else:
 82            assert not opts.invert, opts
 83            for line in lines:
 84                m = regex.search(line)
 85                if not m:
 86                    continue
 87                if line.endswith(os.linesep):
 88                    line = line[:-len(os.linesep)]
 89                yield (filename, line)
 90    except UnicodeDecodeError:
 91        # It must be a binary file.
 92        return
 93
 94
 95def resolve_filenames(filenames, recursive=False):
 96    for filename in filenames:
 97        assert isinstance(filename, str), repr(filename)
 98        if filename == '-':
 99            yield '-'
100        elif not os.path.isdir(filename):
101            yield filename
102        elif recursive:
103            for d, _, files in os.walk(filename):
104                for base in files:
105                    yield os.path.join(d, base)
106
107
108if __name__ == '__main__':
109    # Parse the args.
110    import argparse
111    ap = argparse.ArgumentParser(prog='grep')
112
113    ap.add_argument('-r', '--recursive',
114                    action='store_true')
115    ap.add_argument('-L', '--files-without-match',
116                    dest='filesonly',
117                    action='store_const', const='invert')
118    ap.add_argument('-l', '--files-with-matches',
119                    dest='filesonly',
120                    action='store_const', const='match')
121    ap.add_argument('-q', '--quiet', action='store_true')
122    ap.set_defaults(invert=False)
123
124    reopts = ap.add_mutually_exclusive_group(required=True)
125    reopts.add_argument('-e', '--regexp', dest='regex',
126                        metavar='REGEX')
127    reopts.add_argument('regex', nargs='?',
128                        metavar='REGEX')
129
130    ap.add_argument('files', nargs='+', metavar='FILE')
131
132    opts = ap.parse_args()
133    ns = vars(opts)
134
135    regex = ns.pop('regex')
136    filenames = ns.pop('files')
137    recursive = ns.pop('recursive')
138    if opts.filesonly:
139        if opts.filesonly == 'invert':
140            opts.invert = True
141        else:
142            assert opts.filesonly == 'match', opts
143            opts.invert = False
144    opts.filesonly = bool(opts.filesonly)
145
146    def main(regex=regex, filenames=filenames):
147        # step 1
148        regex = re.compile(regex)
149        # step 2
150        filenames = resolve_filenames(filenames, recursive)
151        # step 3
152        matches = search(filenames, regex, opts)
153        matches = iter(matches)
154
155        # step 4
156
157        # Handle the first match.
158        for filename, line in matches:
159            if opts.quiet:
160                return 0
161            elif opts.filesonly:
162                print(filename)
163            else:
164                for second in matches:
165                    print(f'{filename}: {line}')
166                    filename, line = second
167                    print(f'{filename}: {line}')
168                    break
169                else:
170                    print(line)
171            break
172        else:
173            return 1
174
175        # Handle the remaining matches.
176        if opts.filesonly:
177            for filename, _ in matches:
178                print(filename)
179        else:
180            for filename, line in matches:
181                print(f'{filename}: {line}')
182
183        return 0
184    rc = main()
185
186    # step 5
187    sys.exit(rc)
multiple interpreters:
  1import os
  2import os.path
  3import re
  4import sys
  5
  6import test.support.interpreters as interpreters
  7import test.support.interpreters.queues as interp_queues
  8import types
  9import queue
 10import threading
 11
 12
 13def search(filenames, regex, opts):
 14    matches_by_file = queue.Queue()
 15
 16    def do_background():
 17        MAX_FILES = 10
 18        MAX_MATCHES = 100
 19        new_queue = interpreters.queues.create
 20
 21        def new_interpreter():
 22            interp = interpreters.create()
 23            interp.exec(f"""if True:
 24                with open({__file__!r}) as infile:
 25                    text = infile.read()
 26                ns = dict()
 27                exec(text, ns, ns)
 28                prep_interpreter = ns['prep_interpreter']
 29                del ns, text
 30
 31                search_file = prep_interpreter(
 32                    {regex.pattern!r},
 33                    {regex.flags},
 34                    {tuple(vars(opts).items())},
 35                )
 36                """)
 37            return interp
 38
 39        ready_workers = queue.Queue(MAX_FILES)
 40        workers = []
 41
 42        def next_worker():
 43            if len(workers) < MAX_FILES:
 44                interp = new_interpreter()
 45                workers.append(interp)
 46                ready_workers.put(interp)
 47            return ready_workers.get()  # blocking
 48
 49        def do_work(filename, matches, interp):
 50            interp.prepare_main(matches=matches)
 51            interp.exec(
 52                    f'search_file({filename!r}, matches)')
 53            # Let a new thread start.
 54            ready_workers.put(interp)
 55
 56        for filename in filenames:
 57            # Prepare for the file.
 58            matches = interp_queues.create(MAX_MATCHES)
 59            matches_by_file.put(matches)
 60            interp = next_worker()
 61
 62            # Start a thread to process the file.
 63            t = threading.Thread(
 64                target=do_work,
 65                args=(filename, matches, interp),
 66            )
 67            t.start()
 68        matches_by_file.put(None)
 69
 70    background = threading.Thread(target=do_background)
 71    background.start()
 72
 73    # Yield the results as they are received, in order.
 74    matches = matches_by_file.get()  # blocking
 75    while matches is not None:
 76        match = matches.get()  # blocking
 77        while match is not None:
 78            yield match
 79            match = matches.get()  # blocking
 80        matches = matches_by_file.get()  # blocking
 81
 82    background.join()
 83
 84
 85def prep_interpreter(regex_pat, regex_flags, opts):
 86    regex = re.compile(regex_pat, regex_flags)
 87    opts = types.SimpleNamespace(**dict(opts))
 88
 89    def search_file(filename, matches):
 90        lines = iter_lines(filename)
 91        for match in search_lines(
 92                            lines, regex, opts, filename):
 93            matches.put(match)  # blocking
 94        matches.put(None)  # blocking
 95    return search_file
 96
 97
 98def iter_lines(filename):
 99    if filename == '-':
100        yield from sys.stdin
101    else:
102        with open(filename) as infile:
103            yield from infile
104
105
106def search_lines(lines, regex, opts, filename):
107    try:
108        if opts.filesonly:
109            if opts.invert:
110                for line in lines:
111                    m = regex.search(line)
112                    if m:
113                        break
114                else:
115                    yield (filename, None)
116            else:
117                for line in lines:
118                    m = regex.search(line)
119                    if m:
120                        yield (filename, None)
121                        break
122        else:
123            assert not opts.invert, opts
124            for line in lines:
125                m = regex.search(line)
126                if not m:
127                    continue
128                if line.endswith(os.linesep):
129                    line = line[:-len(os.linesep)]
130                yield (filename, line)
131    except UnicodeDecodeError:
132        # It must be a binary file.
133        return
134
135
136def resolve_filenames(filenames, recursive=False):
137    for filename in filenames:
138        assert isinstance(filename, str), repr(filename)
139        if filename == '-':
140            yield '-'
141        elif not os.path.isdir(filename):
142            yield filename
143        elif recursive:
144            for d, _, files in os.walk(filename):
145                for base in files:
146                    yield os.path.join(d, base)
147
148
149if __name__ == '__main__':
150    # Parse the args.
151    import argparse
152    ap = argparse.ArgumentParser(prog='grep')
153
154    ap.add_argument('-r', '--recursive',
155                    action='store_true')
156    ap.add_argument('-L', '--files-without-match',
157                    dest='filesonly',
158                    action='store_const', const='invert')
159    ap.add_argument('-l', '--files-with-matches',
160                    dest='filesonly',
161                    action='store_const', const='match')
162    ap.add_argument('-q', '--quiet', action='store_true')
163    ap.set_defaults(invert=False)
164
165    reopts = ap.add_mutually_exclusive_group(required=True)
166    reopts.add_argument('-e', '--regexp', dest='regex',
167                        metavar='REGEX')
168    reopts.add_argument('regex', nargs='?',
169                        metavar='REGEX')
170
171    ap.add_argument('files', nargs='+', metavar='FILE')
172
173    opts = ap.parse_args()
174    ns = vars(opts)
175
176    regex = ns.pop('regex')
177    filenames = ns.pop('files')
178    recursive = ns.pop('recursive')
179    if opts.filesonly:
180        if opts.filesonly == 'invert':
181            opts.invert = True
182        else:
183            assert opts.filesonly == 'match', opts
184            opts.invert = False
185    opts.filesonly = bool(opts.filesonly)
186
187    def main(regex=regex, filenames=filenames):
188        # step 1
189        regex = re.compile(regex)
190        # step 2
191        filenames = resolve_filenames(filenames, recursive)
192        # step 3
193        matches = search(filenames, regex, opts)
194        matches = iter(matches)
195
196        # step 4
197
198        # Handle the first match.
199        for filename, line in matches:
200            if opts.quiet:
201                return 0
202            elif opts.filesonly:
203                print(filename)
204            else:
205                for second in matches:
206                    print(f'{filename}: {line}')
207                    filename, line = second
208                    print(f'{filename}: {line}')
209                    break
210                else:
211                    print(line)
212            break
213        else:
214            return 1
215
216        # Handle the remaining matches.
217        if opts.filesonly:
218            for filename, _ in matches:
219                print(filename)
220        else:
221            for filename, line in matches:
222                print(f'{filename}: {line}')
223
224        return 0
225    rc = main()
226
227    # step 5
228    sys.exit(rc)
coroutines:
  1import os
  2import os.path
  3import re
  4import sys
  5
  6import asyncio
  7
  8
  9async def search(filenames, regex, opts):
 10    matches_by_file = asyncio.Queue()
 11
 12    async def do_background():
 13        MAX_FILES = 10
 14        MAX_MATCHES = 100
 15
 16        # Make sure we don't have too many coros at once,
 17        # i.e. too many files open at once.
 18        counter = asyncio.Semaphore(MAX_FILES)
 19
 20        async def search_file(filename, matches):
 21            # iter_lines() opens the file too.
 22            lines = iter_lines(filename)
 23            async for match in search_lines(
 24                            lines, regex, opts, filename):
 25                await matches.put(match)
 26            await matches.put(None)
 27            # Let a new coroutine start.
 28            counter.release()
 29
 30        async with asyncio.TaskGroup() as tg:
 31            for filename in filenames:
 32                # Prepare for the file.
 33                matches = asyncio.Queue(MAX_MATCHES)
 34                await matches_by_file.put(matches)
 35
 36                # Start a coroutine to process the file.
 37                tg.create_task(
 38                    search_file(filename, matches),
 39                )
 40                await counter.acquire()
 41            await matches_by_file.put(None)
 42
 43    background = asyncio.create_task(do_background())
 44
 45    # Yield the results as they are received, in order.
 46    matches = await matches_by_file.get()  # blocking
 47    while matches is not None:
 48        match = await matches.get()  # blocking
 49        while match is not None:
 50            yield match
 51            match = await matches.get()  # blocking
 52        matches = await matches_by_file.get()  # blocking
 53
 54    await asyncio.wait([background])
 55
 56
 57async def iter_lines(filename):
 58    if filename == '-':
 59        infile = sys.stdin
 60        line = await read_line_async(infile)
 61        while line:
 62            yield line
 63            line = await read_line_async(infile)
 64    else:
 65        # XXX Open using async?
 66        with open(filename) as infile:
 67            line = await read_line_async(infile)
 68            while line:
 69                yield line
 70                line = await read_line_async(infile)
 71
 72
 73async def read_line_async(infile):
 74    # XXX Do this async!
 75    # maybe make use of asyncio.to_thread()
 76    # or loop.run_in_executor()?
 77    return infile.readline()
 78
 79
 80async def search_lines(lines, regex, opts, filename):
 81    try:
 82        if opts.filesonly:
 83            if opts.invert:
 84                async for line in lines:
 85                    m = regex.search(line)
 86                    if m:
 87                        break
 88                else:
 89                    yield (filename, None)
 90            else:
 91                async for line in lines:
 92                    m = regex.search(line)
 93                    if m:
 94                        yield (filename, None)
 95                        break
 96        else:
 97            assert not opts.invert, opts
 98            async for line in lines:
 99                m = regex.search(line)
100                if not m:
101                    continue
102                if line.endswith(os.linesep):
103                    line = line[:-len(os.linesep)]
104                yield (filename, line)
105    except UnicodeDecodeError:
106        # It must be a binary file.
107        return
108
109
110def resolve_filenames(filenames, recursive=False):
111    for filename in filenames:
112        assert isinstance(filename, str), repr(filename)
113        if filename == '-':
114            yield '-'
115        elif not os.path.isdir(filename):
116            yield filename
117        elif recursive:
118            for d, _, files in os.walk(filename):
119                for base in files:
120                    yield os.path.join(d, base)
121
122
123if __name__ == '__main__':
124    # Parse the args.
125    import argparse
126    ap = argparse.ArgumentParser(prog='grep')
127
128    ap.add_argument('-r', '--recursive',
129                    action='store_true')
130    ap.add_argument('-L', '--files-without-match',
131                    dest='filesonly',
132                    action='store_const', const='invert')
133    ap.add_argument('-l', '--files-with-matches',
134                    dest='filesonly',
135                    action='store_const', const='match')
136    ap.add_argument('-q', '--quiet', action='store_true')
137    ap.set_defaults(invert=False)
138
139    reopts = ap.add_mutually_exclusive_group(required=True)
140    reopts.add_argument('-e', '--regexp', dest='regex',
141                        metavar='REGEX')
142    reopts.add_argument('regex', nargs='?',
143                        metavar='REGEX')
144
145    ap.add_argument('files', nargs='+', metavar='FILE')
146
147    opts = ap.parse_args()
148    ns = vars(opts)
149
150    regex = ns.pop('regex')
151    filenames = ns.pop('files')
152    recursive = ns.pop('recursive')
153    if opts.filesonly:
154        if opts.filesonly == 'invert':
155            opts.invert = True
156        else:
157            assert opts.filesonly == 'match', opts
158            opts.invert = False
159    opts.filesonly = bool(opts.filesonly)
160
161    async def main(regex=regex, filenames=filenames):
162        # step 1
163        regex = re.compile(regex)
164        # step 2
165        filenames = resolve_filenames(filenames, recursive)
166        # step 3
167        matches = search(filenames, regex, opts)
168        matches = type(matches).__aiter__(matches)
169
170        # step 4
171
172        # Handle the first match.
173        async for filename, line in matches:
174            if opts.quiet:
175                return 0
176            elif opts.filesonly:
177                print(filename)
178            else:
179                async for second in matches:
180                    print(f'{filename}: {line}')
181                    filename, line = second
182                    print(f'{filename}: {line}')
183                    break
184                else:
185                    print(line)
186            break
187        else:
188            return 1
189
190        # Handle the remaining matches.
191        if opts.filesonly:
192            async for filename, _ in matches:
193                print(filename)
194        else:
195            async for filename, line in matches:
196                print(f'{filename}: {line}')
197
198        return 0
199    rc = asyncio.run(main())
200
201    # step 5
202    sys.exit(rc)
multiple processes:
import os
import os.path
import re
import sys

import multiprocessing
import queue
import threading


def search(filenames, regex, opts):
    matches_by_file = queue.Queue()

    def do_background():
        MAX_FILES = 10
        MAX_MATCHES = 100

        # Make sure we don't have too many procs at once,
        # i.e. too many files open at once.
        counter = threading.Semaphore(MAX_FILES)
        finished = multiprocessing.Queue()
        active = {}
        done = False

        def monitor_tasks():
            while not done:
                try:
                    index = finished.get(timeout=0.1)
                except queue.Empty:
                    continue
                proc = active.pop(index)
                proc.join(0.1)
                if proc.is_alive():
                    # It's taking too long to terminate.
                    # We can wait for it at the end.
                    active[index] = proc
                # Let a new process start.
                counter.release()
        monitor = threading.Thread(target=monitor_tasks)
        monitor.start()

        for index, filename in enumerate(filenames):
            # Prepare for the file.
            matches = multiprocessing.Queue(MAX_MATCHES)
            matches_by_file.put(matches)

            # Start a subprocess to process the file.
            proc = multiprocessing.Process(
                target=search_file,
                args=(filename, matches, regex, opts,
                      index, finished),
            )
            counter.acquire(blocking=True)
            active[index] = proc
            proc.start()
        matches_by_file.put(None)
        # Wait for all remaining tasks to finish.
        done = True
        monitor.join()
        for proc in active.values():
            proc.join()

    background = threading.Thread(target=do_background)
    background.start()

    # Yield the results as they are received, in order.
    matches = matches_by_file.get()  # blocking
    while matches is not None:
        match = matches.get()  # blocking
        while match is not None:
            yield match
            match = matches.get()  # blocking
        matches = matches_by_file.get()  # blocking

    background.join()


def search_file(filename, matches, regex, opts,
                index, finished):
    lines = iter_lines(filename)
    for match in search_lines(lines, regex, opts, filename):
        matches.put(match)  # blocking
    matches.put(None)  # blocking
    # Let a new process start.
    finished.put(index)


def iter_lines(filename):
    if filename == '-':
        yield from sys.stdin
    else:
        with open(filename) as infile:
            yield from infile


def search_lines(lines, regex, opts, filename):
    try:
        if opts.filesonly:
            if opts.invert:
                for line in lines:
                    m = regex.search(line)
                    if m:
                        break
                else:
                    yield (filename, None)
            else:
                for line in lines:
                    m = regex.search(line)
                    if m:
                        yield (filename, None)
                        break
        else:
            assert not opts.invert, opts
            for line in lines:
                m = regex.search(line)
                if not m:
                    continue
                if line.endswith(os.linesep):
                    line = line[:-len(os.linesep)]
                yield (filename, line)
    except UnicodeDecodeError:
        # It must be a binary file.
        return


def resolve_filenames(filenames, recursive=False):
    for filename in filenames:
        assert isinstance(filename, str), repr(filename)
        if filename == '-':
            yield '-'
        elif not os.path.isdir(filename):
            yield filename
        elif recursive:
            for d, _, files in os.walk(filename):
                for base in files:
                    yield os.path.join(d, base)


if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')

    # Parse the args.
    import argparse
    ap = argparse.ArgumentParser(prog='grep')

    ap.add_argument('-r', '--recursive',
                    action='store_true')
    ap.add_argument('-L', '--files-without-match',
                    dest='filesonly',
                    action='store_const', const='invert')
    ap.add_argument('-l', '--files-with-matches',
                    dest='filesonly',
                    action='store_const', const='match')
    ap.add_argument('-q', '--quiet', action='store_true')
    ap.set_defaults(invert=False)

    reopts = ap.add_mutually_exclusive_group(required=True)
    reopts.add_argument('-e', '--regexp', dest='regex',
                        metavar='REGEX')
    reopts.add_argument('regex', nargs='?',
                        metavar='REGEX')

    ap.add_argument('files', nargs='+', metavar='FILE')

    opts = ap.parse_args()
    ns = vars(opts)

    regex = ns.pop('regex')
    filenames = ns.pop('files')
    recursive = ns.pop('recursive')
    if opts.filesonly:
        if opts.filesonly == 'invert':
            opts.invert = True
        else:
            assert opts.filesonly == 'match', opts
            opts.invert = False
    opts.filesonly = bool(opts.filesonly)

    def main(regex=regex, filenames=filenames):
        # step 1
        regex = re.compile(regex)
        # step 2
        filenames = resolve_filenames(filenames, recursive)
        # step 3
        matches = search(filenames, regex, opts)
        matches = iter(matches)

        # step 4

        # Handle the first match.
        for filename, line in matches:
            if opts.quiet:
                return 0
            elif opts.filesonly:
                print(filename)
            else:
                for second in matches:
                    print(f'{filename}: {line}')
                    filename, line = second
                    print(f'{filename}: {line}')
                    break
                else:
                    print(line)
            break
        else:
            return 1

        # Handle the remaining matches.
        if opts.filesonly:
            for filename, _ in matches:
                print(filename)
        else:
            for filename, line in matches:
                print(f'{filename}: {line}')

        return 0
    rc = main()

    # step 5
    sys.exit(rc)
concurrent.futures
import os
import os.path
import re
import sys

from concurrent.futures import ThreadPoolExecutor
import queue
import threading


def search(filenames, regex, opts):
    matches_by_file = queue.Queue()

    def do_background():
        MAX_FILES = 10
        MAX_MATCHES = 100

        def search_file(filename, matches):
            lines = iter_lines(filename)
            for match in search_lines(
                            lines, regex, opts, filename):
                matches.put(match)  # blocking
            matches.put(None)  # blocking

        with ThreadPoolExecutor(MAX_FILES) as workers:
            for filename in filenames:
                # Prepare for the file.
                matches = queue.Queue(MAX_MATCHES)
                matches_by_file.put(matches)

                # Start a thread to process the file.
                workers.submit(
                            search_file, filename, matches)
            matches_by_file.put(None)

    background = threading.Thread(target=do_background)
    background.start()

    # Yield the results as they are received, in order.
    matches = matches_by_file.get()  # blocking
    while matches is not None:
        match = matches.get()  # blocking
        while match is not None:
            yield match
            match = matches.get()  # blocking
        matches = matches_by_file.get()  # blocking

    background.join()


def iter_lines(filename):
    if filename == '-':
        yield from sys.stdin
    else:
        with open(filename) as infile:
            yield from infile


def search_lines(lines, regex, opts, filename):
    try:
        if opts.filesonly:
            if opts.invert:
                for line in lines:
                    m = regex.search(line)
                    if m:
                        break
                else:
                    yield (filename, None)
            else:
                for line in lines:
                    m = regex.search(line)
                    if m:
                        yield (filename, None)
                        break
        else:
            assert not opts.invert, opts
            for line in lines:
                m = regex.search(line)
                if not m:
                    continue
                if line.endswith(os.linesep):
                    line = line[:-len(os.linesep)]
                yield (filename, line)
    except UnicodeDecodeError:
        # It must be a binary file.
        return


def resolve_filenames(filenames, recursive=False):
    for filename in filenames:
        assert isinstance(filename, str), repr(filename)
        if filename == '-':
            yield '-'
        elif not os.path.isdir(filename):
            yield filename
        elif recursive:
            for d, _, files in os.walk(filename):
                for base in files:
                    yield os.path.join(d, base)


if __name__ == '__main__':
    # Parse the args.
    import argparse
    ap = argparse.ArgumentParser(prog='grep')

    ap.add_argument('-r', '--recursive',
                    action='store_true')
    ap.add_argument('-L', '--files-without-match',
                    dest='filesonly',
                    action='store_const', const='invert')
    ap.add_argument('-l', '--files-with-matches',
                    dest='filesonly',
                    action='store_const', const='match')
    ap.add_argument('-q', '--quiet', action='store_true')
    ap.set_defaults(invert=False)

    reopts = ap.add_mutually_exclusive_group(required=True)
    reopts.add_argument('-e', '--regexp', dest='regex',
                        metavar='REGEX')
    reopts.add_argument('regex', nargs='?',
                        metavar='REGEX')

    ap.add_argument('files', nargs='+', metavar='FILE')

    opts = ap.parse_args()
    ns = vars(opts)

    regex = ns.pop('regex')
    filenames = ns.pop('files')
    recursive = ns.pop('recursive')
    if opts.filesonly:
        if opts.filesonly == 'invert':
            opts.invert = True
        else:
            assert opts.filesonly == 'match', opts
            opts.invert = False
    opts.filesonly = bool(opts.filesonly)

    def main(regex=regex, filenames=filenames):
        # step 1
        regex = re.compile(regex)
        # step 2
        filenames = resolve_filenames(filenames, recursive)
        # step 3
        matches = search(filenames, regex, opts)
        matches = iter(matches)

        # step 4

        # Handle the first match.
        for filename, line in matches:
            if opts.quiet:
                return 0
            elif opts.filesonly:
                print(filename)
            else:
                for second in matches:
                    print(f'{filename}: {line}')
                    filename, line = second
                    print(f'{filename}: {line}')
                    break
                else:
                    print(line)
            break
        else:
            return 1

        # Handle the remaining matches.
        if opts.filesonly:
            for filename, _ in matches:
                print(filename)
        else:
            for filename, line in matches:
                print(f'{filename}: {line}')

        return 0
    rc = main()

    # step 5
    sys.exit(rc)

Model-specific details

Here are some implementation-specific details we had to deal with.

threads:

interpreters:

multiprocessing:

asyncio:

concurrent.futures

For threads, multiprocessing, and multiple interpreters*, you can also use concurrent.futures:

def search_using_threads_cf(filenames, regex, opts):
    matches_by_file = queue.Queue()

    def do_background():
        MAX_FILES = 10
        MAX_MATCHES = 100

        def search_file(filename, matches):
            lines = iter_lines(filename)
            for match in search_lines(lines, regex, opts, filename):
                matches.put(match)  # blocking
            matches.put(None)  # blocking

        with ThreadPoolExecutor(MAX_FILES) as workers:
            for filename in filenames:
                # Prepare for the file.
                matches = queue.Queue(MAX_MATCHES)
                matches_by_file.put(matches)

                # Start a thread to process the file.
                workers.submit(search_file, filename, matches)
            matches_by_file.put(None)

    background = threading.Thread(target=do_background)
    background.start()

    # Yield the results as they are received, in order.
    matches = matches_by_file.get()  # blocking
    while matches is not None:
        match = matches.get()  # blocking
        while match is not None:
            yield match
            match = matches.get()  # blocking
        matches = matches_by_file.get()  # blocking

    background.join()

For processes, use concurrent.futures.ProcessPoolExecutor. For interpreters, use InterpreterPoolExecutor. In both cases you must use a queue type the workers can actually share (a plain queue.Queue only works between threads in the same interpreter), and there are a few other minor differences.
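
For example, here is a minimal sketch of what a process-based variant might look like. It is not one of the full examples above: it assumes a module-level search_file(filename, matches, regex, opts) helper (like the one in the multiprocessing example, minus the index/finished bookkeeping), and it uses a manager-backed queue, since a plain multiprocessing.Queue cannot be passed as an argument to ProcessPoolExecutor.submit().

from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import queue
import threading


def search_using_processes_cf(filenames, regex, opts):
    matches_by_file = queue.Queue()

    def do_background():
        MAX_FILES = 10
        MAX_MATCHES = 100

        # Worker processes can't see this process's queue.Queue, and a
        # multiprocessing.Queue can't be handed to submit(), so give
        # each worker a manager-backed queue instead.
        manager = multiprocessing.Manager()

        with ProcessPoolExecutor(MAX_FILES) as workers:
            for filename in filenames:
                # Prepare for the file.
                matches = manager.Queue(MAX_MATCHES)
                matches_by_file.put(matches)

                # Start a subprocess to process the file.
                workers.submit(search_file, filename, matches,
                               regex, opts)
            matches_by_file.put(None)

    background = threading.Thread(target=do_background)
    background.start()

    # Yield the results as they are received, in order.
    matches = matches_by_file.get()  # blocking
    while matches is not None:
        match = matches.get()  # blocking
        while match is not None:
            yield match
            match = matches.get()  # blocking
        matches = matches_by_file.get()  # blocking

    background.join()

Note that everything passed to submit() must be picklable; compiled regex patterns and argparse.Namespace objects are, so the compiled regex and opts can be sent as-is. The interpreters variant would follow the same shape, with InterpreterPoolExecutor and a queue that can be shared between interpreters.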


Workload 2: …

TBD

Design and analysis

Design steps from above:

  1. concurrency fits?

    TBD

  2. identify logical tasks

    TBD

  3. select concurrent tasks

    TBD

  4. concurrency-related characteristics

    TBD

  5. pick best model

    TBD

Here are additional key constraints and considerations:

High-level code

# …

Side-by-side

Here are the implementations for the different concurrency models, side-by-side for easy comparison:

sequential

# sequential 2
...

threads

import threading

def task():
    ...

t = threading.Thread(target=task)
t.start()

...

multiple interpreters

# subinterpreters 2
...

coroutines

# async 2
...

multiple processes

import multiprocessing

def task():
    ...

...

concurrent.futures

# concurrent.futures 2
...

Workload 3: …

TBD

Design and analysis

Design steps from above:

  1. concurrency fits?

    TBD

  2. identify logical tasks

    TBD

  3. select concurrent tasks

    TBD

  4. concurrency-related characteristics

    TBD

  5. pick best model

    TBD

Here are additional key constraints and considerations:

High-level code

# …

Side-by-side

Here are the implementations for the different concurrency models, side-by-side for easy comparison:

sequential

# sequential 3
...

threads

import threading

def task():
    ...

t = threading.Thread(target=task)
t.start()

...

multiple interpreters

# subinterpreters 3
...

coroutines

# async 3
...

multiple processes

import multiprocessing

def task():
    ...

...

concurrent.futures

# concurrent.futures 3
...