Threads¶
ThreadPoolExcecutor¶
Simple example using threads to do something
#!/usr/bin/env python3
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import uuid
import random
def something(payload):
print(f'Completing task with payload {payload}...', file=sys.stderr)
time.sleep(random.uniform(1, 5))
data = str(uuid.uuid4())
print(f'Got data {data} for payload {payload}', file=sys.stderr)
return str(uuid.uuid4())
def do_stuff(n):
with ThreadPoolExecutor(max_workers=None) as executor:
results = [executor.submit(something, i) for i in range(1, n)]
for future in as_completed(results):
yield future.result()
for stuff in do_stuff(11):
print(stuff)
Context ThreadPool¶
class ScopedThreadPool(ThreadPoolExecutor):
"""
Wrapper around ThreadPoolExecutor that ensures all futures are completed before exiting the context
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.futures = []
def submit(self, *args, **kwargs):
future = super().submit(*args, **kwargs)
self.futures.append(future)
return future
def __exit__(self, *args, **kwargs):
while True:
done, pending = wait(self.futures)
# may have futures that creates more futures
if len(done) == len(self.futures):
break
result = super().__exit__(*args, **kwargs)
if exceptions := [f.exception() for f in self.futures if f.exception()]:
if len(exceptions) == 1:
raise exceptions[0]
raise BaseExceptionGroup('failed', exceptions)
return result
with ScopedThreadPool() as executor:
executor.submit(func)
with ScopedThreadPool() as executor:
for region in {'ap-southeast-2', 'us-east-1'}:
@executor.submit
def job(region=region):
print(region)