Jak zoptymalizować wydajność aplikacji Python przy użyciu asynchroniczności i programowania współbieżnego

0
9
Rate this post

Nawigacja:

Dlaczego aplikacja „muli”? Krótkie wprowadzenie do wydajności w Pythonie

Główne źródła wolnego działania: I/O, blokujące operacje, CPU

Wydajność aplikacji Python zwykle nie zabija jeden „magiczny” problem, tylko zestaw małych decyzji, które sumują się w spowolnienie. Typowo spowalniają:

  • blokujące operacje I/O – sieć, bazy danych, system plików, kolejki, zewnętrzne API,
  • intensywne obliczenia CPU – przetwarzanie dużych datasetów, analiza obrazów, szyfrowanie, ML,
  • nieoptymalna architektura – sekwencyjne zależności, zbędne oczekiwanie, brak cache’u.

Jeżeli aplikacja wysyła setki żądań HTTP, ale obsługuje je po kolei, to nawet szybkie API zewnętrzne sprawi wrażenie, jakby komunikowało się gołębiami pocztowymi. Z drugiej strony, jeżeli aplikacja robi ciężkie obliczenia w jednym wątku, to nawet najlepsza asynchroniczność nie pomoże – procesor po prostu ma za dużo do policzenia.

Optymalizacja wydajności w Pythonie z użyciem asynchroniczności i programowania współbieżnego polega na tym, aby:

  • nie marnować czasu na czekanie (asynchroniczne I/O),
  • wykorzystać wiele rdzeni CPU, gdy liczenie jest wąskim gardłem (multiprocessing),
  • czytelnie zaprojektować przepływ zadań, tak aby nie wykonywać pracy dwa razy i nie trzymać blokujących operacji w środku krytycznych ścieżek.

Pojęcie I/O-bound vs CPU-bound na przykładach

Kluczowe rozróżnienie: czy aplikacja jest ograniczona przez operacje I/O (I/O-bound), czy przez moc procesora (CPU-bound).

Przykłady zadań I/O-bound:

  • pobieranie danych z wielu API zewnętrznych,
  • obsługa dużej liczby jednoczesnych połączeń HTTP (serwer webowy),
  • odpytywanie bazy danych, wysyłanie zapytań do Elasticsearch,
  • czytanie i zapisywanie dużej liczby plików (logi, importy, eksporty).

Przykłady zadań CPU-bound:

  • kompresja/dekompresja plików,
  • analiza lub transformacja obrazów,
  • trenowanie modeli ML,
  • ciężkie agregacje na dużych strukturach danych w pamięci.

Asynchroniczność w Pythonie (moduł asyncio i biblioteki „async-first”) przynosi ogromne zyski dla zadań I/O-bound, bo pozwala wykonywać inne operacje w czasie oczekiwania na sieć lub dysk. Natomiast dla zadań CPU-bound potrzebne jest inne podejście: multiprocessing, delegacja do C/NumPy czy wydzielenie osobnych usług.

Gdzie asynchroniczność naprawdę pomaga, a gdzie jest zbędnym gadżetem

Asynchroniczność w Pythonie nie jest magicznym przyspieszaczem wszystkiego. Działa najlepiej tam, gdzie dominują:

  • częste, krótkie zapytania HTTP lub do bazy,
  • duża liczba jednoczesnych klientów (API, websockety),
  • operacje, które długo czekają na odpowiedź, ale niewiele liczą.

Jeżeli logika biznesowa sprowadza się do: „weź dane z trzech usług, złóż w JSON i zwróć”, to dobrze zaprojektowana asynchroniczność potrafi skrócić czas odpowiedzi z sekund do ułamków sekundy, głównie dzięki równoległemu zapytaniu do usług.

Z kolei tam, gdzie:

  • przetwarzasz duże macierze w NumPy,
  • odpalasz intensywny kod C/CPP/PyTorch,
  • wykonujesz złożone analizy danych w pamięci,

asynchroniczność nie przyspieszy obliczeń. Może co najwyżej pomóc w obsłudze otoczenia (I/O i koordynacja), ale CPU nadal „robi swoje” w jednym wątku na proces. W takich sytuacjach trzeba sięgnąć po inne narzędzia współbieżności.

GIL w skrócie: co faktycznie blokuje, a czego nie dotyka

Python (CPython) ma Global Interpreter Lock (GIL), który pozwala naraz wykonywać Pythonowy bytecode tylko jednemu wątkowi w procesie. To powoduje wiele nieporozumień:

  • wątki Pythonowe nie skalują obliczeń CPU na wiele rdzeni,
  • GIL nie blokuje operacji, które „schodzą” do C i zwalniają GIL (np. I/O, część bibliotek numerycznych),
  • GIL nie uniemożliwia równoległego I/O, jeśli kod jest poprawnie napisany.

Asynchroniczność w Pythonie (asyncio) działa w jednym wątku, ale wykonuje zadania „naprzemiennie”, oddając sterowanie przy każdym await. Dzięki temu w czasie oczekiwania na I/O pętla może obsłużyć inne zadania. GIL nie jest tu specjalnym problemem, bo i tak nie próbujemy wykonywać Pythonowego kodu równolegle.

Myślenie o przepływie zadań zamiast magicznych przyspieszaczy

Optymalizacja aplikacji Python przy użyciu asynchroniczności i programowania współbieżnego zaczyna się na poziomie sposobu myślenia. Zamiast szukać „szybkiej flagi” w konfiguracji, lepiej rozpisać sobie:

  • jakie fragmenty kodu czekają na I/O,
  • które fragmenty liczą intensywnie,
  • które operacje można wykon(y)wać równolegle,
  • co można zbuforować, cache’ować, skleić w jedno większe zapytanie.

Dopiero wtedy sensownie dobiera się narzędzia: asyncio, wątki, procesy, kolejki. Dodanie „async” do kilku funkcji bez zrozumienia przepływu zadań zwykle kończy się tylko bardziej złożonym kodem – bez realnego zysku.

Podstawy współbieżności: procesy, wątki, asynchroniczność – kto tu rządzi?

Różnice między równoległością a współbieżnością

Dwa słowa, które często są wrzucane do jednego worka: parallelism (równoległość) i concurrency (współbieżność).

  • Współbieżność – umiejętność obsługi wielu zadań naraz, ale niekoniecznie w tym samym momencie na wielu rdzeniach; raczej „przełączanie się” między zadaniami.
  • Równoległość – faktyczne wykonywanie wielu zadań w tym samym czasie na wielu rdzeniach CPU.

asyncio to narzędzie do współbieżności. Multiprocessing to narzędzie do równoległości. Threading w Pythonie to hybryda, ograniczona GIL dla zadań CPU, ale użyteczna dla części zadań I/O.

Procesy vs wątki: kiedy które podejście ma sens

W Pythonie mamy trzy główne „dźwignie”:

  • procesy (multiprocessing, ProcessPoolExecutor) – osobne interpretery Pythona, osobna pamięć, brak współdzielonego GIL,
  • wątki (threading, ThreadPoolExecutor) – współdzielony proces, współdzielona pamięć, ale jeden GIL,
  • asynchroniczność (asyncio) – jeden wątek, pętla zdarzeń, przełączanie się między zadaniami na poziomie aplikacji.

Najprostsze reguły:

  • dużo I/O, mało CPU – asyncio + ewentualnie trochę wątków,
  • dużo CPU, mało I/O – multiprocessing,
  • kod legacy, biblioteki blokujące, ale nie bardzo CPU-heavy – wątki w połączeniu z asyncio.

Model asynchroniczny: event loop i jeden wątek, wielu klientów

Model async opiera się na pętli zdarzeń (event loop). To główny „koordynator”:

  • utrzymuje listę zadań do wykonania,
  • przekazuje im sterowanie, gdy mogą się ruszyć dalej (np. I/O zakończone),
  • odbiera sterowanie, gdy zadanie „czeka” (await na operację I/O).

Całość działa w jednym wątku, bez preempcji. Oznacza to, że:

  • zadanie musi samo oddać sterowanie przez await,
  • jeżeli zrobisz długą, blokującą pętlę for bez żadnego await, blokujesz całą pętlę zdarzeń.

To dlatego w kodzie async time.sleep() jest jak wciśnięcie pauzy na całym serwerze – zamiast niego należy używać await asyncio.sleep().

Jak Python „pogodził” GIL z multiprocessingiem i threadingiem

Strategia CPythona:

  • wątki korzystają z jednego GIL – więc kod CPU-bound w wątkach nie skaluje się liniowo z liczbą rdzeni,
  • procesy mają własny GIL – dzięki temu multiprocessing pozwala faktycznie liczyć równolegle na wielu rdzeniach,
  • wiele bibliotek I/O (np. sieć, część baz danych) zwalnia GIL podczas czekania, dzięki czemu inne wątki mogą działać.

W praktyce do skalowania CPU używa się procesów, a do współbieżnego I/O – asyncio + ewentualnie wątki tam, gdzie trzeba owinąć kod blokujący.

Prosty schemat wyboru modelu dla problemu

Szybki „routing” problemów:

  • „Mam API, które robi głównie zapytania do innych usług/bazy” → asyncio / framework async (FastAPI, Starlette).
  • „Mam pipeline przetwarzania danych (CPU-heavy)” → multiprocessing, job queue (Celery, RQ), ewentualnie osobne mikrousługi.
  • „Mam legacy kod z requests i blokującym I/O, ale chcę serwer async” → asyncio + run_in_executor + ThreadPoolExecutor.
  • „Mam skrypt CLI, który odpytuje 1000 URL-i” → asyncio + httpx/aiohttp.
Kolorowy kod w Pythonie wyświetlony na monitorze podczas programowania
Źródło: Pexels | Autor: Nemuel Sereti

Wprowadzenie do asyncio: event loop, zadania, korutyny

Czym jest event loop i jak zarządza wykonaniem zadań

Event loop to serce asynchroniczności w Pythonie:

  • startuje i kończy zadania (coroutines),
  • komunikuje się z systemem operacyjnym (epoll/kqueue/IOCP),
  • decuduje, które zadanie może być wykonywane w danym momencie.

Z perspektywy programisty istotne są trzy komendy:

  • asyncio.run(main()) – uruchomienie pętli zdarzeń z główną korutyną,
  • asyncio.create_task(coro()) – odpalenie korutyny jako osobnego zadania,
  • await something() – oddanie sterowania do event loopa i kontynuacja po zakończeniu oczekiwanej operacji.

Korutyny, async def, await – co technicznie oznaczają

Funkcja zdefiniowana jako:

async def fetch_data():
    ...

nie zwraca wyniku od razu. Zwraca obiekt coroutine, który dopiero po await faktycznie wykonuje się do końca. Oznacza to, że:

  • fetch_data() (bez await) tworzy korutynę,
  • await fetch_data() – wykonuje ją i czeka na rezultat,
  • asyncio.create_task(fetch_data()) – odpala ją w tle (jako task), a bieżąca korutyna idzie dalej.

await działa tylko wewnątrz funkcji oznaczonej async def. Próba użycia await w zwykłej funkcji zakończy się błędem składni.

Task vs zwykła korutyna – różnica w zarządzaniu życiem zadania

Zwykła korutyna wymaga jawnego await, np.:

result = await fetch_data()

Task (obiekt asyncio.Task) to korutyna „odpalona” w tle:

task = asyncio.create_task(fetch_data())
# ... inny kod ...
result = await task

Różnice:

Jeśli chcesz pójść krok dalej, pomocny może być też wpis: Efektywne iteratory w Pythonie.

  • task startuje od razu po utworzeniu,
  • może działać równolegle z innymi korutynami,
  • można go anulować (task.cancel()),
  • bez await task wyjątki wewnątrz mogą być „zagubione” lub zalogowane z opóźnieniem.

Równoległe pobieranie wielu adresów URL – prosty przykład

Przykład: sekwencyjne vs asynchroniczne pobieranie danych

Najpierw wersja „łopatologiczna”, blokująca, z użyciem requests:

import time
import requests

URLS = [
    "https://example.com",
    "https://httpbin.org/delay/1",
    "https://python.org",
]

def fetch(url: str) -> str:
    response = requests.get(url)
    response.raise_for_status()
    return response.text

def main():
    start = time.perf_counter()
    for url in URLS:
        html = fetch(url)
        print(f"Pobrano {len(html)} bajtów z {url}")
    elapsed = time.perf_counter() - start
    print(f"Czas: {elapsed:.2f}s")

if __name__ == "__main__":
    main()

Przy kilku wolniejszych serwisach skończy się to sumą czasów odpowiedzi. Teraz wariant asynchroniczny z aiohttp:

import asyncio
import time
import aiohttp

URLS = [
    "https://example.com",
    "https://httpbin.org/delay/1",
    "https://python.org",
]

async def fetch(session: aiohttp.ClientSession, url: str) -> str:
    async with session.get(url) as resp:
        resp.raise_for_status()
        return await resp.text()

async def main():
    start = time.perf_counter()
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(fetch(session, url)) for url in URLS]
        for url, task in zip(URLS, tasks):
            html = await task
            print(f"Pobrano {len(html)} bajtów z {url}")
    elapsed = time.perf_counter() - start
    print(f"Czas: {elapsed:.2f}s")

if __name__ == "__main__":
    asyncio.run(main())

Całkowity czas wykonania zbliży się do najwolniejszego pojedynczego żądania, a nie do ich sumy. W realnym API różnica bywa drastyczna, gdy serwer musi pogadać z kilkoma mikrousługami na raz.

Grupowanie zadań: asyncio.gather, as_completed i limity równoległości

Przy większej liczbie zadań asyncio.create_task szybko zamienia się w bałagan. Na szczęście event loop ma wbudowane „kombajny”:

  • asyncio.gather – uruchamia zestaw korutyn równolegle i zwraca wyniki w oryginalnej kolejności,
  • asyncio.as_completed – zwraca wyniki w miarę kończenia się zadań, przydatne przy streamowaniu postępów.
async def fetch_many(urls: list[str]) -> list[str]:
    async with aiohttp.ClientSession() as session:
        coros = [fetch(session, url) for url in urls]
        return await asyncio.gather(*coros)

Przy dużej liczbie URL-i trzeba jednak ograniczyć równoległość, bo system (lub zewnętrzne API) się obrazi. Najprostszy sposób: asyncio.Semaphore.

sem = asyncio.Semaphore(20)

async def fetch_limited(session, url):
    async with sem:
        return await fetch(session, url)

async def fetch_many_limited(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(fetch_limited(session, u)) for u in urls]
        return await asyncio.gather(*tasks)

W efekcie w locie działa maksymalnie 20 zapytań HTTP. Reszta grzecznie czeka w kolejce.

Anulowanie zadań i timeouty – nieśmiertelne źródło wycieków

Zadania async mają jeszcze jedną zaletę: można je odwołać. Jeżeli użytkownik przerwał request lub górny limit czasu minął, bez sensu dociągać wszystko do końca.

async def worker():
    try:
        await asyncio.sleep(10)
        print("Skończone")
    except asyncio.CancelledError:
        print("Anulowano worker")
        # sprzątanie, zamykanie połączeń itd.
        raise

async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Task faktycznie przerwany")

Timeouty ogarnia się wygodnie przez asyncio.wait_for lub narzędzia zewnętrzne (np. w httpx):

async def call_with_timeout():
    try:
        return await asyncio.wait_for(fetch_data(), timeout=2.0)
    except asyncio.TimeoutError:
        # logowanie, fallback, circuit breaker
        return None

Projektowanie kodu pod asynchroniczność: architektura, wzorce i antywzorce

Oddzielanie logiki biznesowej od warstwy I/O

Najczęstszy błąd przy „przerabianiu” projektu na async to wsadzenie async wszędzie. Dużo rozsądniej jest wyraźnie rozdzielić:

  • warstwę I/O – funkcje, które faktycznie wykonują operacje zewnętrzne (HTTP, baza, pliki),
  • czystą logikę biznesową – funkcje operujące na danych w pamięci.

Logika biznesowa powinna być możliwie „zwykła”, najlepiej testowalna bez event loopa. Warstwa I/O może być async (dla serwera) albo mieć wariant sync (dla batchowych skryptów). Przy okazji łatwiej później wymienić klienta HTTP czy bazę, bo nie jest wpleciona w każdy zakamarek kodu.

Interfejsy portów I/O: adaptery sync i async

Przydatny schemat to prosty interfejs dla konkretnego portu, np. „usługa wysyłająca maile”:

from abc import ABC, abstractmethod

class Mailer(ABC):
    @abstractmethod
    def send_welcome(self, email: str) -> None:
        ...

class AsyncMailer(ABC):
    @abstractmethod
    async def send_welcome(self, email: str) -> None:
        ...

Implementacje mogą używać innych bibliotek, ale reszta systemu zna tylko interfejs. Dzięki temu:

  • logikę biznesową można wstrzykiwać w wersji sync lub async,
  • testy mogą używać in-memory fake’ów,
  • migracja z sync na async nie wymaga przepisywania całej aplikacji za jednym zamachem.

Nie mieszaj warstw: „async wszędzie” vs „async na krawędziach”

Dwie główne strategie:

  • async na krawędziach – tylko endpointy HTTP i adaptery I/O są async; wnętrze aplikacji pozostaje synchroniczne,
  • async end-to-end – logika biznesowa również jest async (np. bo często odpala równoległe zapytania I/O).

Pierwsze podejście jest prostsze przy migracji legacy. Drugie ma większy potencjał wydajności, gdy warstwa biznesowa intensywnie gada z zewnętrznymi usługami. Jak zwykle: coś za coś.

Antywzorzec: „async jak dekoracja świąteczna”

Często spotykany kod:

async def get_user_with_orders(user_id: int):
    user = await get_user(user_id)     # <-- async
    orders = await get_orders(user_id) # <-- async
    return {"user": user, "orders": orders}

Na pierwszy rzut oka w porządku, ale obie operacje są zupełnie niezależne. Lepiej puścić je równolegle:

async def get_user_with_orders(user_id: int):
    user_task = asyncio.create_task(get_user(user_id))
    orders_task = asyncio.create_task(get_orders(user_id))
    user = await user_task
    orders = await orders_task
    return {"user": user, "orders": orders}

Albo od razu:

async def get_user_with_orders(user_id: int):
    user, orders = await asyncio.gather(
        get_user(user_id),
        get_orders(user_id),
    )
    return {"user": user, "orders": orders}

Różnica jest szczególnie widoczna przy bardziej rozgałęzionych wywołaniach mikrousług.

Antywzorzec: wywoływanie kodu blokującego prosto z korutyny

Jeżeli w środku korutyny znajdzie się blokujące:

  • time.sleep,
  • requests.get,
  • długi zapis do pliku / obróbka obrazu,

cała pętla zdarzeń stoi. Z punktu widzenia serwera HTTP – każdy klient czeka, aż ten jeden nieszczęsny fragment się skończy.

Wyjście: użyć run_in_executor lub dedykowanego klienta async. Szybki wrapper:

import asyncio
import requests

def fetch_sync(url: str) -> str:
    return requests.get(url).text

async def fetch_in_thread(url: str) -> str:
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, fetch_sync, url)

Na zewnątrz appka oferuje async API, w środku nadal może żyć trochę kodu „z epoki kamienia łupanego”.

Asynchroniczne I/O w praktyce: HTTP, bazy danych, pliki

HTTP: aiohttp, httpx i wzorce obsługi błędów

Kluczowe biblioteki:

  • aiohttp – dojrzała, szybka, bardzo elastyczna,
  • httpx – nowocześniejszy interfejs, sync + async w jednym, wygodne timeouty i retry.

Prosty klient API w httpx:

import httpx

class ApiClient:
    def __init__(self, base_url: str):
        self._client = httpx.AsyncClient(base_url=base_url, timeout=5.0)

    async def get_user(self, user_id: int) -> dict:
        resp = await self._client.get(f"/users/{user_id}")
        resp.raise_for_status()
        return resp.json()

    async def close(self):
        await self._client.aclose()

Przy większych systemach dochodzą retry i mechanizmy typu circuit breaker:

async def safe_get_user(client: ApiClient, user_id: int) -> dict | None:
    try:
        return await client.get_user(user_id)
    except httpx.RequestError as exc:
        # logowanie, metryki
        return None

Bazy danych: connection pooling i transakcje async

Dla większości popularnych baz istnieją już natywne biblioteki async:

  • PostgreSQL – asyncpg,
  • MySQL – np. aiomysql,
  • MongoDB – oficjalny driver ma async (motor),
  • Redis – redis-py ma wsparcie async.

Ważny element to connection pool. Trzymanie jednego połączenia na request to prosta droga do zajechania bazy.

import asyncpg

async def create_pool(dsn: str):
    return await asyncpg.create_pool(dsn, min_size=5, max_size=20)

async def get_user(pool, user_id: int):
    async with pool.acquire() as conn:
        row = await conn.fetchrow("SELECT * FROM users WHERE id=$1", user_id)
        return dict(row) if row else None

Transakcje:

async def transfer(pool, from_id: int, to_id: int, amount: int):
    async with pool.acquire() as conn:
        async with conn.transaction():
            await conn.execute(
                "UPDATE accounts SET balance = balance - $1 WHERE id=$2",
                amount, from_id
            )
            await conn.execute(
                "UPDATE accounts SET balance = balance + $1 WHERE id=$2",
                amount, to_id
            )

Bez transakcji przy współbieżnym obciążeniu szybko widać „dziury” w danych.

Pliki i system lokalny: kiedy async ma sens

Asynchroniczny dostęp do lokalnego systemu plików jest ograniczony przez API systemu operacyjnego. W CPythonie wiele operacji plikowych i tak wykonuje się w wątku, więc różnica bywa mniejsza niż przy HTTP lub bazie.

Jeżeli jednak:

  • masz dużo małych plików,
  • serwer HTTP streamuje upload/download,

asynchroniczny interfejs (np. aiofiles) porządkuje kod i ułatwia zarządzanie:

import aiofiles

async def read_file(path: str) -> str:
    async with aiofiles.open(path, mode="r") as f:
        return await f.read()

Przy cięższych operacjach typu kompresja, obróbka wideo i tak lepiej wrzucić to do osobnych procesów.

Jeśli chcesz pogłębić temat i zobaczyć więcej przykładów z tej niszy, zajrzyj na Sprzęt komputerowy i Oprogramowanie.

Kolejki zadań i messaging: integracja z asyncio

W świecie mikrousług królowa jest jedna: kolejka. Niezależnie czy to RabbitMQ, Kafka, czy Redis, model async świetnie pasuje do workerów obsługujących tysiące małych komunikatów.

Przykładowy „manualny” worker na Redisie (w uproszczeniu):

import asyncio
import aioredis

async def worker():
    redis = aioredis.from_url("redis://localhost")
    while True:
        _queue, payload = await redis.brpop("jobs")  # blokujące read, ale async
        asyncio.create_task(handle_job(payload))

async def handle_job(payload: bytes):
    # parsowanie JSON, praca I/O
    await asyncio.sleep(0)  # placeholder

asyncio.run(worker())

W realnym systemie wchodzi obsługa retry, backoff, deduplikacja. Sama konstrukcja z asyncio.create_task pozwala przetwarzać wiele jobów równolegle w jednym procesie.

Zbliżenie ekranu komputera z kolorowym kodem w edytorze programu
Źródło: Pexels | Autor: Mathews Jumba

Wątki i multiprocessing obok asyncio – kiedy mieszać modele

Delegowanie zadań CPU-bound do procesów

Pętla zdarzeń świetnie radzi sobie z I/O, ale na długie obliczenia reaguje alergicznie. Jeżeli w środku endpointu trzeba przeliczyć np. duży model ML albo wygenerować skomplikowany raport PDF, multiprocessing bywa nieunikniony.

Procesy obliczeniowe z asyncio: ProcessPoolExecutor i współdzielone struktury

Najprostszy sposób zepchnięcia ciężkich obliczeń z pętli zdarzeń to ProcessPoolExecutor. Pod spodem siedzi klasyczny multiprocessing, ale API ładnie wpasowuje się w asyncio.

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_heavy(x: int) -> int:
    # symulacja ciężkiego algorytmu
    s = 0
    for i in range(10_000_000):
        s += (i * x) % 17
    return s

async def handle_request(x: int) -> int:
    loop = asyncio.get_running_loop()
    # None => globalny executor procesów
    return await loop.run_in_executor(None, cpu_heavy, x)

Gdy ciężkie zadania są częste, lepiej jawnie stworzyć pulę:

executor = ProcessPoolExecutor(max_workers=4)

async def handle_request(x: int) -> int:
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, cpu_heavy, x)

Kilka praktycznych uwag:

  • funkcja przekazywana do procesów musi być picklowalna (nie lambda z lokalnego scope, nie metoda instancji bez kombinacji),
  • inicjalizacja dużych modeli / słowników lepiej w if __name__ == "__main__" albo w hooku startowym procesu,
  • pula procesów to zasób – w żywych serwerach zamykaj ją przy shutdownie.

Oddzielny serwis obliczeniowy zamiast lokalnego multiprocessing

Gdy ciężkie zadania zaczynają dominować, rozdzielenie serwera HTTP i „kalkulatora” pomaga zachować porządek:

  • serwer HTTP – lekki, obsługuje I/O, kolejkuje zadania,
  • worker CPU – osobny proces / kontener, odbiera joby z kolejki (np. Redis, RabbitMQ) i mieli procesor.

Z punktu widzenia kodu async scenariusz wygląda tak:

  1. endpoint FastAPI wrzuca komunikat do kolejki,
  2. worker (często już nie-async, bo i tak siedzi w CPU) wykonuje zadanie,
  3. wynik odkłada do cache / bazy,
  4. HTTP zwraca od razu ID joba, a klient dopytuje o status.

Znikają problemy z GIL-em, limitami RAM jednego procesu i restartami serwera przy większych rolloutach. Kosztem jest oczywiście dodatkowa infrastruktura.

Łączenie asyncio z ThreadPoolExecutor

Zadania I/O, dla których nie ma async API (stare biblioteki, sterowniki), da się „odasynchronicznić” przy użyciu wątków:

import asyncio
import time

def legacy_io_op(path: str) -> str:
    time.sleep(2)  # blokujące czekanie na I/O
    return f"data from {path}"

async def call_legacy(path: str) -> str:
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, legacy_io_op, path)

W odróżnieniu od procesów, wątki dzielą pamięć z głównym procesem, więc:

  • łatwiej przekazywać złożone obiekty (bez picklowania),
  • ryzyko deadlocków przy źle użytych lockach rośnie w tempie wykładniczym.

Sensowny kompromis to trzymanie puli wątków tylko dla konkretnej biblioteki (np. sterownik SOAP) i dokładna obserwacja metryk czasu odpowiedzi. Gdy wątki zaczną „puchnąć” – to sygnał do migrowania na natywne async.

Modele hybrydowe: pętla zdarzeń per proces

Na produkcji często kończy się z układem:

  • kilka procesów workers (np. uvicorn + gunicorn),
  • w każdym procesie własna pętla zdarzeń asyncio,
  • w środku wątki lub procesy pomocnicze (executory).

Każdy proces odpowiada za określoną część ruchu (inny port, inny shard, inny tenancy). Priorytet: nie wpuszczać CPU-bound do głównej pętli – tam tylko I/O, delegacje i sklejanie wyników.

Async w popularnych frameworkach webowych (FastAPI, Django, Flask)

FastAPI: async jako „tryb domyślny”

FastAPI zostało zaprojektowane pod async od początku. Każdy endpoint może być async def lub zwykłym def. Silnik ASGI (uvicorn / hypercorn) rozróżnia oba przypadki i odpowiednio je obsługuje.

from fastapi import FastAPI

app = FastAPI()

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    user = await fetch_user_from_db(user_id)
    return user

@app.get("/health")
def healthcheck():
    # prosta, synchroniczna funkcja
    return {"status": "ok"}

Kilka praktyk, które ratują skórę przy większym ruchu:

  • wszystkie długie wywołania HTTP/DB – wyłącznie async (httpx, asyncpg, itp.),
  • ciężkie zadania CPU – delegowane do procesów / kolejki, nie wykonywane w endpointach,
  • zasoby współdzielone (connection pool, klienci HTTP) inicjalizowane w eventach startup / shutdown.
@app.on_event("startup")
async def startup():
    app.state.db_pool = await asyncpg.create_pool(dsn="...")

@app.on_event("shutdown")
async def shutdown():
    await app.state.db_pool.close()

Dzięki temu każdy request korzysta z tych samych, długo żyjących obiektów, a GC nie dostaje ataku paniki przy tysiącach połączeń.

Django: przejście z WSGI na ASGI i widoki async

Django przez lata było czysto synchroniczne. Od wersji 3.0 wchodzi wsparcie dla ASGI i async views, ale ekosystem jeszcze długo będzie mieszanką stylów.

Typowy widok async:

from django.http import JsonResponse
from django.views import View

class UserView(View):
    async def get(self, request, user_id):
        user = await get_user_async(user_id)
        return JsonResponse(user)

Pułapka: ORM Django nadal jest synchroniczny (poza eksperymentalnymi gałęziami). Wywołanie User.objects.get() w widoku async zablokuje pętlę zdarzeń. Rozwiązania są zasadniczo dwa:

Jeśli interesują Cię konkrety i przykłady, rzuć okiem na: Algorytmiczne wykrywanie mowy nienawiści – skuteczność a wolność słowa.

  • traktować Django jako warstwę sync i trzymać async „na krawędziach” (np. w zewnętrznych serwisach),
  • opakować ORM w wątki za pomocą sync_to_async.
from asgiref.sync import sync_to_async
from myapp.models import User

async def get_user(user_id: int):
    return await sync_to_async(User.objects.get)(pk=user_id)

To nie jest magiczny przyspieszacz – po prostu przenosi blokujące zapytania do osobnych wątków, żeby pętla zdarzeń nie wisiała. Dobrze sprawdza się przy stopniowej migracji dużych monolitów.

Flask: „z natury sync”, ale da się współżyć z async

Flask projektowano w czasach WSGI. Dopiero nowsze wersje dodają wsparcie dla endpointów async def. W praktyce:

  • Flask i tak wywoła korutynę jak zwykłą funkcję,
  • za prawdziwy „async gain” odpowiada dopiero uruchomienie w ASGI (np. Quart, orjson + hypercorn, itp.),
  • w wielu przypadkach sensowniej zostawić Flaska w pełni sync, a async wynieść do dedykowanego serwisu.
from flask import Flask, jsonify

app = Flask(__name__)

@app.get("/data")
async def data():
    # w klasycznym WSGI to "async" jest głównie ozdobne
    result = await fetch_async()
    return jsonify(result)

Jeżeli projekt startuje od zera i ma mieć duże obciążenie I/O, łatwiej od razu sięgnąć po FastAPI / Starlette niż walczyć z pół-async w Flasku.

ASGI jako wspólny mianownik

FastAPI, nowe Django, Starlette, a nawet niektóre frameworki RPC – wszystko to bazuje na ASGI, czyli „async WSGI”. Dzięki wspólnemu protokołowi:

  • łatwiej mieszać frameworki (np. montować aplikację Starlette pod ścieżką w Django),
  • serwery takie jak uvicorn / hypercorn obsługują różne aplikacje w ten sam sposób,
  • middleware (logowanie, metryki, autoryzacja) można pisać raz.
async def simple_asgi_app(scope, receive, send):
    assert scope["type"] == "http"
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"text/plain")],
    })
    await send({
        "type": "http.response.body",
        "body": b"Hello, async world!",
    })

Znajomość ASGI pomaga zrozumieć, co się dzieje „pod spodem” frameworka, gdy request wędruje przez stos middlewarów do widoku i z powrotem.

Typowe błędy i pułapki w asynchroniczności i współbieżności

Blokowanie pętli zdarzeń przez przypadek

Najczęstszy grzech główny: w środku korutyny ląduje blokująca operacja, bo „to tylko mały kawałek, nic się nie stanie”. Po czym w logach pojawiają się timeouty, a CPU stoi na 5%.

Podejrzane miejsca:

  • stare biblioteki HTTP (requests),
  • duże operacje na plikach (zip, PDF, obrazy),
  • ciężkie obliczenia w Pandas / NumPy wołane prosto z endpointu.

Szybki detektor to metryka event loop lag – różnica między planowanym a faktycznym czasem wykonania prostego zadania (np. loop.call_later). Gdy zaczyna rosnąć powyżej pojedynczych milisekund, coś blokuje pętlę.

Zapominanie o await – „martwe korutyny”

Kolejna klasyka:

async def send_email():
    ...

def handler():
    send_email()  # zapomniane `await`

W efekcie:

  • korutyna nie jest wykonywana,
  • Python wyrzuca ostrzeżenie RuntimeWarning: coroutine '...' was never awaited,
  • logika biznesowa „magicznie” nie działa.

Rozwiązania:

  • trzymać się zasady: korutyna wołana tylko z await lub przez asyncio.create_task,
  • w kodzie sync – korzystać z helperów typu anyio.from_thread.run albo dedykowanych adapterów frameworka (np. sync_to_async).

Tworzenie tasków bez nadzoru

asyncio.create_task to potężne narzędzie. A potężne narzędzia lubią boleśnie gryźć.

async def fire_and_forget():
    asyncio.create_task(do_something())  # i nigdy nie sprawdzamy wyniku

Jeżeli do_something rzuci wyjątek, trafi on do logów lub – w starszych wersjach Pythona – przeleci prawie niezauważony. Dodatkowo trudno kontrolować liczbę równoległych zadań.

Prostszy, bezpieczniejszy wariant to własny „scheduler”:

async def run_limited(coros, limit: int = 10):
    sem = asyncio.Semaphore(limit)

    async def worker(coro):
        async with sem:
            return await coro

    tasks = [asyncio.create_task(worker(c)) for c in coros]
    return await asyncio.gather(*tasks)

Każde zadanie jest:

  • objęte limitem równoległości,
  • zwracane do wywołującego – można zareagować na wyjątek.

Wycieki połączeń i sesji – brak zamykania zasobów

Async ułatwia równoległe I/O, ale także przyspiesza „wysysanie” limitów zewnętrznych usług. Najczęstsze objawy:

  • niezamknięte httpx.AsyncClient / aiohttp.ClientSession,
  • zapomniane pool.release / brak async with pool.acquire(),
  • uchwyty do plików otwierane bez kontekstu (aiofiles.open).

Dobra praktyka to:

  • używanie async with dosłownie wszędzie, gdzie jest dostępne,
  • trzymanie długowiecznych klientów (HTTP, DB) na poziomie aplikacji, nie per-request,
  • testy „ciśnieniowe” na devie – np. kilkaset równoległych requestów i obserwacja liczby połączeń do bazy.

Wyścigi danych: współdzielone struktury bez synchronizacji

Async nie zwalnia z myślenia o współdzielonym stanie. Dwa taski modyfikujące ten sam słownik / listę nadal mogą wywołać katastrofę logiczną, nawet jeśli GIL stoi na straży spójności pamięci.

counter = 0

async def inc():
    global counter
    tmp = counter
    await asyncio.sleep(0)  # przełączenie kontekstu
    counter = tmp + 1

Przy wielu wywołaniach licznik skończy z błędną wartością. Rozwiązania:

  • asyncio.Lock wokół modyfikacji,
  • model „aktorów” – jeden task zarządza danym stanem, inni tylko wysyłają mu komunikaty,
  • unikanie współdzielonego mutable state w miarę możliwości (np. zamiast tego baza / cache).
  • Najczęściej zadawane pytania (FAQ)

    Jak sprawdzić, czy moja aplikacja Python jest I/O-bound czy CPU-bound?

    Najprościej: uruchom profilowanie i popatrz, gdzie proces faktycznie spędza czas. Jeśli większość czasu to czekanie na odpowiedzi z sieci, bazy danych, systemu plików – masz typowy przypadek I/O-bound. Jeśli natomiast CPU jest „przyklejony” na 90–100% i główne funkcje to przetwarzanie danych, pętle, obliczenia, to problem jest CPU-bound.

    W praktyce pomaga kilka prostych testów: jeśli po zwiększeniu liczby jednoczesnych zapytań (np. w locust/jmeter) serwer szybko przestaje wyrabiać, ale CPU jest stosunkowo luźne – najpewniej blokuje I/O. Jeśli za to każdy request długo „mieli” i użycie CPU rośnie proporcjonalnie do obciążenia, wąskim gardłem jest moc obliczeniowa.

    Kiedy w Pythonie używać asyncio, a kiedy multiprocessing?

    asyncio opłaca się, gdy obsługujesz dużo I/O: API, bazy danych, websockety, kolejki, komunikację między usługami. Gdy logika biznesowa polega głównie na „pobierz coś z kilku miejsc, sklej i zwróć”, model asynchroniczny potrafi dramatycznie skrócić czas odpowiedzi dzięki równoległym zapytaniom.

    multiprocessing ma sens, gdy kod jest CPU-bound: kompresja, przetwarzanie obrazów, ML, ciężkie agregacje. Wtedy każdy proces używa osobnego rdzenia i faktycznie zyskujesz równoległość. Próba „przyspieszenia” takiego kodu samym asyncio zwykle kończy się wyłącznie bardziej skomplikowanym kodem bez realnego zysku.

    Czy asynchroniczność w Pythonie omija GIL i przyspiesza obliczenia CPU?

    Nie. asyncio nie omija GIL w tym sensie, że cały kod Pythona wciąż wykonuje się w jednym wątku. Asynchroniczność tylko lepiej wykorzystuje czas oczekiwania na I/O, przełączając się między zadaniami. Jeśli funkcja intensywnie liczy i nie robi żadnego await, blokuje całą pętlę zdarzeń.

    GIL jest realnym problemem dla zadań CPU-bound w wątkach, ale nie przeszkadza aż tak dla I/O-bound: wiele operacji I/O zwalnia GIL na czas czekania. Do przyspieszania obliczeń CPU używa się procesów (multiprocessing), kodu natywnego (C/C++, numba), bibliotek numerycznych (NumPy, PyTorch), które potrafią pracować poza GIL.

    Czy w Pythonie lepiej używać wątków czy async do I/O?

    Dla nowego kodu, który ma obsługiwać dużo połączeń równocześnie, zwykle lepszy jest model async (asyncio, FastAPI, aiohttp). Mniej narzutu na przełączanie, łatwiejsza kontrola nad przepływem zadań i bardzo dobra skalowalność przy dużej liczbie klientów.

    Wątki są przydatne, gdy masz biblioteki blokujące, bez wersji async (np. klient do jakiejś bazy lub zewnętrznego API) albo gdy przerabiasz istniejący kod i nie chcesz przepisywać wszystkiego na async. Typowy wzorzec: główny serwer async, a fragmenty blokujące owijane w ThreadPoolExecutor.

    Jakie są najczęstsze błędy przy przechodzeniu na async w Pythonie?

    Najbardziej klasyczne wpadki to:

  • mieszanie time.sleep() z kodem async (blokuje całą pętlę zdarzeń zamiast oddać sterowanie),
  • wołanie synchronicznych, blokujących klientów HTTP/DB bezpośrednio z async def,
  • dodanie async/await „na ślepo” bez analizy, które operacje faktycznie są I/O-bound.

Dobrym testem jest pytanie: „czy w tym miejscu czekam na coś z zewnątrz, czy tylko liczę?”. Jeśli czekasz – szukaj wersji async lub przenieś to do wątku. Jeśli liczysz – rozważ procesy albo optymalizację samego algorytmu, zamiast dorzucać kolejne await.

Czy asynchroniczne API (np. FastAPI) zawsze będzie szybsze niż klasyczny serwer WSGI?

Nie zawsze. Jeśli Twoje API robi głównie ciężkie obliczenia CPU w ramach jednego requestu, to sam fakt użycia async niewiele zmieni. W takim scenariuszu kluczowe jest skalowanie po procesach/workerach, a nie model asynchroniczny.

Asynchroniczne frameworki świecą, gdy każdy request wykonuje sporo zapytań do innych usług lub bazy, a pojedynczy worker ma obsłużyć dużą liczbę równoległych połączeń. Wtedy ten sam serwer może obsłużyć znacznie więcej klientów bez mnożenia procesów jak króliki.

Jak w praktyce podejść do optymalizacji wydajności w Pythonie krok po kroku?

Po pierwsze: zmierz, gdzie ucieka czas – proste profilery, logowanie czasu poszczególnych kroków, metryki z APM. Po drugie: rozdziel fragmenty I/O-bound i CPU-bound. Po trzecie: zobacz, co da się wykonywać równolegle zamiast sekwencyjnie.

Typowy scenariusz wygląda tak: łączysz kilka zapytań do usług/bazy w jedno, wprowadzasz async do obsługi I/O, cięższe obliczenia przenosisz do procesów lub osobnego serwisu, a na końcu dodajesz cache tam, gdzie dane nie muszą być liczone za każdym razem. Brzmi jak trochę roboty, ale efekt bywa lepszy niż „dokupmy dwa serwery i zobaczymy”.

Co warto zapamiętać

  • Wydajność rzadko zabija jeden „wielki” problem – zwykle spowalnia miks blokującego I/O, ciężkich obliczeń CPU i nieprzemyślanej architektury (sekwencyjne wywołania, brak cache, zbędne czekanie).
  • Kluczowe jest rozróżnienie zadań I/O-bound i CPU-bound: asynchroniczność świetnie maskuje opóźnienia sieci/dysku, ale nie przyspieszy kodu, który głównie mieli dane na procesorze.
  • Asynchroniczność (asyncio, biblioteki async-first) najbardziej opłaca się przy wielu krótkich zapytaniach HTTP/DB i dużej liczbie jednoczesnych klientów – np. gdy zamiast pytać trzy usługi po kolei, odpytujesz je równolegle.
  • Przy zadaniach CPU-bound (np. kompresja, przetwarzanie obrazów, trenowanie modeli ML) sensowniejsze jest użycie multiprocessing, bibliotek numerycznych w C/NumPy albo wydzielenie osobnych usług, bo samo „async” nie odblokuje dodatkowych rdzeni.
  • GIL blokuje równoległe wykonywanie Pythonowego bytecode’u w wielu wątkach, ale nie uniemożliwia równoległego I/O ani pracy bibliotek, które zwalniają GIL; wątki nadal mogą poprawić responsywność przy zadaniach I/O-bound.
  • asyncio daje współbieżność (sprytne przełączanie zadań w jednym wątku), multiprocessing – prawdziwą równoległość na wielu rdzeniach, a threading jest użyteczną hybrydą dla niektórych scenariuszy I/O, choć pozostaje pod butem GIL przy obliczeniach CPU.
Poprzedni artykułJak ukryć sumy częściowe i sumy końcowe w tabeli przestawnej, gdy przeszkadzają
Tadeusz Borkowski
Tadeusz Borkowski specjalizuje się w pracy z danymi i funkcjami Excela, szczególnie tam, gdzie liczy się precyzja obliczeń i spójność wyników. W NaukaExcel.pl tworzy instrukcje oparte na praktycznych przypadkach: rozliczeniach, zestawieniach i analizach porównawczych. Zanim opublikuje materiał, weryfikuje formuły na danych skrajnych, sprawdza zachowanie przy pustych komórkach i opisuje konsekwencje błędnych założeń. Stawia na odpowiedzialne podejście: jasne definicje, kontrolę błędów i rozwiązania, które da się utrzymać w dłuższym czasie.