フリーキーズ | 独学プログラミング

Pythonのthreading入門!マルチスレッド処理で効率的なプログラムを作ろう

リンドくん

リンドくん

たなべ先生、Pythonでプログラムを作ってるんですけど、時間のかかる処理があると画面が固まっちゃうんです...

たなべ

たなべ

あー、それはよくある悩みだね!
そんなときにマルチスレッド処理を使うと、複数の作業を同時に進められるんだよ。Pythonのthreadingモジュールを使えば、意外と簡単に実現できるんだ。

プログラミングを学んでいると、「処理に時間がかかって画面が固まってしまう」「複数の作業を同時に進めたい」といった場面に遭遇することがあるのではないでしょうか?

そんなときに活躍するのがマルチスレッド処理です。
Pythonではthreadingモジュールを使うことで、複数の処理を並行して実行できるようになります。

この記事では、threading初心者の方でも理解できるよう、基本概念から実践的な使い方まで、段階的に解説していきます。

プログラミング学習でお悩みの方へ

HackATAは、エンジニアを目指す方のためのプログラミング学習コーチングサービスです。 経験豊富な現役エンジニアがあなたの学習をサポートします。

✓ 質問し放題

✓ β版公開中(2025年内の特別割引)

HackATAの詳細を見る

なぜマルチスレッド処理が必要なのか?

リンドくん

リンドくん

そもそも、なぜマルチスレッドって必要なんですか?普通に上から順番に処理すればいいんじゃ...

たなべ

たなべ

いい質問だね!例えば、大きなファイルをダウンロードしながら、同時に他の作業もしたい場合を考えてみよう。
従来の方法だと、ダウンロードが終わるまで他の処理が一切できないんだ。

従来の処理方式の問題点

通常のPythonプログラムはシーケンシャル(順次)処理で動作します。
つまり、一つの処理が完了するまで次の処理に進めません。

import time

def long_task(name):
    print(f"{name}の処理を開始...")
    time.sleep(3)  # 3秒間の重い処理をシミュレート
    print(f"{name}の処理が完了!")

# 従来の方式
print("=== 従来の処理方式 ===")
start_time = time.time()

long_task("タスク1")
long_task("タスク2") 
long_task("タスク3")

end_time = time.time()
print(f"総実行時間: {end_time - start_time:.2f}秒")

このコードを実行すると、約9秒かかってしまいます。それぞれの処理が順番に実行されるためです。

マルチスレッドによる解決

マルチスレッド処理を使用すると、複数の処理を並行して実行できます。

import threading
import time

def long_task(name):
    print(f"{name}の処理を開始...")
    time.sleep(3)
    print(f"{name}の処理が完了!")

# マルチスレッド方式
print("=== マルチスレッド処理方式 ===")
start_time = time.time()

# スレッドを作成
thread1 = threading.Thread(target=long_task, args=("タスク1",))
thread2 = threading.Thread(target=long_task, args=("タスク2",))
thread3 = threading.Thread(target=long_task, args=("タスク3",))

# スレッドを開始
thread1.start()
thread2.start()
thread3.start()

# すべてのスレッドの完了を待機
thread1.join()
thread2.join()
thread3.join()

end_time = time.time()
print(f"総実行時間: {end_time - start_time:.2f}秒")

このコードでは、3つの処理が同時に実行されるため、短い時間で完了します。
簡単に高速化が実現できるのです!

マルチスレッド処理のメリットは以下の通りです。

  • 処理時間の短縮 - 複数の処理を並行実行できる
  • ユーザビリティの向上 - 重い処理中でも画面が固まらない
  • リソースの有効活用 - CPUやI/Oの待機時間を無駄にしない

threadingモジュールの基本的な使い方

リンドくん

リンドくん

マルチスレッドのメリットは分かりました!でも、実際にはどう書けばいいんでしょうか?

たなべ

たなべ

threadingモジュールの基本は意外とシンプルなんだ。
スレッドの作成、開始、完了待機という3つのステップを覚えておけば大丈夫だよ。

スレッドの基本的な作成方法

Pythonでスレッドを作成する方法は主に2つあります。

方法1 関数を指定してスレッド作成

import threading
import time

def worker_function(worker_id, work_time):
    print(f"ワーカー{worker_id}が作業開始")
    time.sleep(work_time)
    print(f"ワーカー{worker_id}が作業完了")

# スレッドを作成
thread = threading.Thread(target=worker_function, args=(1, 2))

# スレッドを開始
thread.start()

# メインスレッドで他の処理を実行
print("メインスレッドで別の作業中...")
time.sleep(1)
print("メインスレッドの作業完了")

# スレッドの完了を待機
thread.join()
print("すべての処理が完了しました")

方法2 Threadクラスを継承してスレッド作成

import threading
import time

class WorkerThread(threading.Thread):
    def __init__(self, worker_id, work_time):
        super().__init__()
        self.worker_id = worker_id
        self.work_time = work_time
    
    def run(self):
        print(f"ワーカー{self.worker_id}が作業開始")
        time.sleep(self.work_time)
        print(f"ワーカー{self.worker_id}が作業完了")

# スレッドオブジェクトを作成
worker = WorkerThread(1, 2)

# スレッドを開始
worker.start()

# スレッドの完了を待機
worker.join()

重要なメソッドの説明

threadingモジュールでよく使う重要なメソッドを押さえておきましょう。

  • start() - スレッドの実行を開始します
  • join() - スレッドの完了を待機します
  • is_alive() - スレッドが実行中かどうかを確認します
import threading
import time

def background_task():
    for i in range(5):
        print(f"バックグラウンド処理: {i+1}/5")
        time.sleep(1)

# スレッドを作成・開始
thread = threading.Thread(target=background_task)
thread.start()

# スレッドの状態を確認
while thread.is_alive():
    print("まだ処理中です...")
    time.sleep(0.5)

print("バックグラウンド処理が完了しました!")

このように、threadingモジュールの基本的な使い方は非常にシンプルです。
作成 → 開始 → 待機という流れを覚えておけば、様々な場面で応用できます。

実践的なマルチスレッド処理の例

リンドくん

リンドくん

基本は分かったんですけど、実際のプログラムではどんな場面で使うんですか?

たなべ

たなべ

いい質問だね!よくあるのはWebからのデータ取得ファイル処理なんだ。
実際に役立つ例を見てみよう。

例1 複数のWebサイトから同時にデータを取得

import threading
import requests
import time

def fetch_website(url, results, index):
    """指定されたURLからデータを取得する関数"""
    try:
        print(f"取得開始: {url}")
        response = requests.get(url, timeout=5)
        results[index] = {
            'url': url,
            'status': response.status_code,
            'size': len(response.content)
        }
        print(f"取得完了: {url} (ステータス: {response.status_code})")
    except Exception as e:
        print(f"エラー: {url} - {str(e)}")
        results[index] = {'url': url, 'error': str(e)}

def fetch_multiple_websites(urls):
    """複数のWebサイトから並行してデータを取得"""
    results = [None] * len(urls)
    threads = []
    
    start_time = time.time()
    
    # 各URLに対してスレッドを作成
    for i, url in enumerate(urls):
        thread = threading.Thread(target=fetch_website, args=(url, results, i))
        threads.append(thread)
        thread.start()
    
    # すべてのスレッドの完了を待機
    for thread in threads:
        thread.join()
    
    end_time = time.time()
    
    print(f"\n=== 結果 ===")
    for result in results:
        if result and 'error' not in result:
            print(f"{result['url']}: {result['size']}バイト")
        elif result:
            print(f"{result['url']}: エラー - {result['error']}")
    
    print(f"総実行時間: {end_time - start_time:.2f}秒")

# 使用例
urls = [
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/2', 
    'https://httpbin.org/delay/1'
]

fetch_multiple_websites(urls)

例2 ファイル処理のマルチスレッド化

import threading
import os
import time

def process_file(filename, results, lock):
    """ファイルを処理する関数(例:行数をカウント)"""
    try:
        if not os.path.exists(filename):
            # デモ用にファイルを作成
            with open(filename, 'w', encoding='utf-8') as f:
                for i in range(1000):
                    f.write(f"これは{filename}の{i+1}行目です。\n")
        
        # ファイルの行数をカウント
        with open(filename, 'r', encoding='utf-8') as f:
            line_count = sum(1 for line in f)
        
        # 結果を安全に保存(ロックを使用)
        with lock:
            results.append({
                'filename': filename,
                'lines': line_count,
                'status': 'success'
            })
            print(f"処理完了: {filename} ({line_count}行)")
    
    except Exception as e:
        with lock:
            results.append({
                'filename': filename,
                'error': str(e),
                'status': 'error'
            })
            print(f"エラー: {filename} - {str(e)}")

def process_multiple_files(filenames):
    """複数のファイルを並行処理"""
    results = []
    threads = []
    lock = threading.Lock()  # スレッド間でのデータ共有を安全にする
    
    start_time = time.time()
    
    # 各ファイルに対してスレッドを作成
    for filename in filenames:
        thread = threading.Thread(target=process_file, args=(filename, results, lock))
        threads.append(thread)
        thread.start()
    
    # すべてのスレッドの完了を待機
    for thread in threads:
        thread.join()
    
    end_time = time.time()
    
    print(f"\n=== 処理結果 ===")
    total_lines = 0
    for result in results:
        if result['status'] == 'success':
            print(f"{result['filename']}: {result['lines']}行")
            total_lines += result['lines']
        else:
            print(f"{result['filename']}: エラー")
    
    print(f"総行数: {total_lines}行")
    print(f"総実行時間: {end_time - start_time:.2f}秒")
    
    # 作成したファイルを削除
    for filename in filenames:
        if os.path.exists(filename):
            os.remove(filename)

# 使用例
filenames = ['file1.txt', 'file2.txt', 'file3.txt', 'file4.txt']
process_multiple_files(filenames)

例3 リアルタイム処理のシミュレーション

import threading
import time
import random
from datetime import datetime

class DataProcessor:
    def __init__(self):
        self.is_running = False
        self.data_queue = []
        self.lock = threading.Lock()
    
    def data_generator(self):
        """データを生成し続けるスレッド"""
        while self.is_running:
            data = f"データ_{datetime.now().strftime('%H%M%S')}_{random.randint(1, 100)}"
            with self.lock:
                self.data_queue.append(data)
                print(f"データ生成: {data}")
            time.sleep(1)
    
    def data_consumer(self):
        """データを処理し続けるスレッド"""
        while self.is_running:
            with self.lock:
                if self.data_queue:
                    data = self.data_queue.pop(0)
                    print(f"データ処理: {data}")
            time.sleep(1.5)  # 処理時間をシミュレート
    
    def start_processing(self, duration=10):
        """処理を開始"""
        self.is_running = True
        
        # データ生成スレッドを開始
        generator_thread = threading.Thread(target=self.data_generator)
        generator_thread.start()
        
        # データ処理スレッドを開始
        consumer_thread = threading.Thread(target=self.data_consumer)
        consumer_thread.start()
        
        # 指定時間後に停止
        time.sleep(duration)
        self.is_running = False
        
        # スレッドの終了を待機
        generator_thread.join()
        consumer_thread.join()
        
        print(f"\n処理完了。残りデータ数: {len(self.data_queue)}")

# 使用例
processor = DataProcessor()
processor.start_processing(duration=8)

これらの例では、実際の開発現場でよく遭遇する場面でのマルチスレッド活用法を示しています。
Webからのデータ取得、ファイル処理、リアルタイム処理など、様々な用途でthreadingモジュールを活用できることが分かるのではないでしょうか?

スレッド間でのデータ共有と同期処理

リンドくん

リンドくん

複数のスレッドが同じデータを触るときって、何か気をつけることはありますか?

たなべ

たなべ

とてもいい質問だね!これはスレッドセーフティという重要な概念なんだ。
複数のスレッドが同じデータを変更すると、予期しない結果になることがあるんだよ。

データ競合の問題

複数のスレッドが同じ変数を同時に変更しようとすると、データ競合(レースコンディション)と呼ばれる問題が発生する可能性があります。

import threading
import time

# 危険な例データ競合が発生する可能性
shared_counter = 0

def unsafe_increment():
    global shared_counter
    for _ in range(100000):
        shared_counter += 1  # 複数スレッドが同時に実行すると問題

# 複数スレッドで実行
threads = []
for i in range(5):
    thread = threading.Thread(target=unsafe_increment)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"期待値: 500000, 実際の値: {shared_counter}")
# 実際の値は期待値より小さくなることが多い

Lockを使った安全なデータ共有

この問題を解決するために、threading.Lock()を使用します。

import threading
import time

# 安全な例:Lockを使用
shared_counter = 0
counter_lock = threading.Lock()

def safe_increment():
    global shared_counter
    for _ in range(100000):
        with counter_lock:  # ロックを取得
            shared_counter += 1  # この部分は一度に1つのスレッドだけが実行

# 複数スレッドで実行
threads = []
for i in range(5):
    thread = threading.Thread(target=safe_increment)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print(f"期待値: 500000, 実際の値: {shared_counter}")
# 正確に500000になる

より高レベルな同期オブジェクト

threadingモジュールには、より高度な同期機能も用意されています。

Event スレッド間でのシグナル伝達

import threading
import time
import random

def waiter(event, name):
    """イベントを待機するスレッド"""
    print(f"{name}が待機開始...")
    event.wait()  # イベントがセットされるまで待機
    print(f"{name}が処理を開始!")

def setter(event):
    """イベントをセットするスレッド"""
    sleep_time = random.randint(3, 6)
    print(f"準備中...({sleep_time}秒後に開始)")
    time.sleep(sleep_time)
    print("準備完了!全スレッドに開始信号を送信")
    event.set()  # イベントをセット

# イベントオブジェクトを作成
start_event = threading.Event()

# 待機スレッドを複数作成
waiters = []
for i in range(3):
    thread = threading.Thread(target=waiter, args=(start_event, f"スレッド{i+1}"))
    waiters.append(thread)
    thread.start()

# セッタースレッドを作成
setter_thread = threading.Thread(target=setter, args=(start_event,))
setter_thread.start()

# すべてのスレッドの完了を待機
setter_thread.join()
for thread in waiters:
    thread.join()

Queue スレッド間でのデータ受け渡し

import threading
import queue
import time
import random

def producer(q, producer_id):
    """データを生成してキューに追加"""
    for i in range(5):
        item = f"Producer{producer_id}_Item{i+1}"
        q.put(item)
        print(f"生産: {item}")
        time.sleep(random.uniform(0.5, 1.5))
    
    # 終了シグナル
    q.put(None)

def consumer(q, consumer_id):
    """キューからデータを取得して処理"""
    while True:
        item = q.get()
        if item is None:
            q.put(None)  # 他のコンシューマーのために終了シグナルを戻す
            break
        
        print(f"消費者{consumer_id}が処理: {item}")
        time.sleep(random.uniform(0.8, 1.2))
        q.task_done()

# キューを作成
data_queue = queue.Queue()

# プロデューサーとコンシューマーを作成
producer_thread = threading.Thread(target=producer, args=(data_queue, 1))
consumer1 = threading.Thread(target=consumer, args=(data_queue, 1))
consumer2 = threading.Thread(target=consumer, args=(data_queue, 2))

# スレッドを開始
producer_thread.start()
consumer1.start()
consumer2.start()

# すべてのスレッドの完了を待機
producer_thread.join()
consumer1.join()
consumer2.join()

print("すべての処理が完了しました")

これらの同期機能を適切に使用することで、安全で効率的なマルチスレッドプログラムを作成できます。

注意点とベストプラクティス

リンドくん

リンドくん

マルチスレッドって便利ですけど、何か注意することはありますか?

たなべ

たなべ

そうだね、マルチスレッドには落とし穴もあるんだ。
特にPythonではGIL(Global Interpreter Lock)という制限があったり、デッドロックという問題もあるから気をつけないといけないよ。

PythonのGIL(Global Interpreter Lock)について

PythonにはGILという仕組みがあり、CPUバウンドな処理では期待したほどの性能向上が得られない場合があります。

import threading
import time

def cpu_bound_task(n):
    """CPUを集中的に使う処理"""
    result = 0
    for i in range(n):
        result += i * i
    return result

def io_bound_task():
    """I/Oを待つ処理"""
    time.sleep(2)  # I/O待機をシミュレート
    return "IO完了"

print("=== CPU集約的処理の比較 ===")
# シングルスレッド
start = time.time()
for _ in range(4):
    cpu_bound_task(1000000)
single_time = time.time() - start

# マルチスレッド
start = time.time()
threads = []
for _ in range(4):
    thread = threading.Thread(target=cpu_bound_task, args=(1000000,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()
multi_time = time.time() - start

print(f"シングルスレッド: {single_time:.2f}秒")
print(f"マルチスレッド: {multi_time:.2f}秒")
print(f"改善率: {single_time/multi_time:.2f}倍")

print("\n=== I/O集約的処理の比較 ===")
# シングルスレッド
start = time.time()
for _ in range(4):
    io_bound_task()
single_io_time = time.time() - start

# マルチスレッド
start = time.time()
threads = []
for _ in range(4):
    thread = threading.Thread(target=io_bound_task)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()
multi_io_time = time.time() - start

print(f"シングルスレッド: {single_io_time:.2f}秒")
print(f"マルチスレッド: {multi_io_time:.2f}秒")
print(f"改善率: {single_io_time/multi_io_time:.2f}倍")

重要なポイント

  • I/Oバウンドな処理(ファイル読み書き、ネットワーク通信など)ではマルチスレッドが効果的
  • CPUバウンドな処理では、multiprocessingモジュールの使用を検討する

デッドロックの回避

複数のロックを使用する際は、デッドロックに注意が必要です。

import threading
import time

# 危険な例:デッドロックが発生する可能性
lock1 = threading.Lock()
lock2 = threading.Lock()

def task1():
    with lock1:
        print("Task1がlock1を取得")
        time.sleep(0.1)
        with lock2:
            print("Task1がlock2を取得")

def task2():
    with lock2:
        print("Task2がlock2を取得")
        time.sleep(0.1)
        with lock1:
            print("Task2がlock1を取得")

# 安全な例:ロックの順序を統一
def safe_task1():
    with lock1:
        print("SafeTask1がlock1を取得")
        time.sleep(0.1)
        with lock2:
            print("SafeTask1がlock2を取得")

def safe_task2():
    with lock1:  # 同じ順序でロックを取得
        print("SafeTask2がlock1を取得")
        time.sleep(0.1)
        with lock2:
            print("SafeTask2がlock2を取得")

ベストプラクティス

  1. 適切な粒度でスレッドを使用する
# 良い例:I/Oバウンドな処理
def download_file(url):
    # ネットワークI/O
    pass

# 悪い例:軽すぎる処理
def add_numbers(a, b):
    return a + b  # スレッド作成のオーバーヘッドの方が大きい
  1. リソースの適切な管理
import threading
from contextlib import contextmanager

@contextmanager
def managed_thread(target, args=()):
    thread = threading.Thread(target=target, args=args)
    try:
        thread.start()
        yield thread
    finally:
        thread.join()

# 使用例
def worker():
    print("作業中...")
    time.sleep(1)

with managed_thread(worker):
    print("スレッドが実行中...")
# スレッドは自動的に終了を待機される
  1. ThreadPoolExecutorの活用
from concurrent.futures import ThreadPoolExecutor
import time

def task(n):
    time.sleep(1)
    return n * n

# より簡潔で安全な書き方
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(task, i) for i in range(5)]
    results = [future.result() for future in futures]
    print(f"結果: {results}")

これらのベストプラクティスを守ることで、安全で効率的なマルチスレッドプログラムを作成できます。

まとめ

リンドくん

リンドくん

threadingについて色々学べました!でも実際に使うときはどこから始めればいいでしょうか?

たなべ

たなべ

まずはシンプルな例から始めるのがおすすめだよ。自分のプロジェクトで「時間のかかる処理があるな」と感じたら、それをマルチスレッド化してみるといいね。
最初は基本的なThreadクラスから始めて、慣れてきたらThreadPoolExecutorも使ってみよう!

この記事では、Pythonのthreadingモジュールを使ったマルチスレッド処理について、基本概念から実践的な活用方法まで幅広く解説してきました。

重要なポイントをおさらいしましょう。

  • マルチスレッドの価値 - I/Oバウンドな処理で大幅な時間短縮が可能
  • 基本的な使い方 - Threadクラスの作成、開始、完了待機の3ステップ
  • 実践的な活用例 - Web データ取得、ファイル処理、リアルタイム処理など
  • 安全なデータ共有 - LockEventQueueを使った同期処理
  • 注意点 - GILの制限、デッドロックの回避、適切な粒度での使用

threadingは、プログラムのパフォーマンス向上ユーザビリティの改善に大きく貢献する技術です。
特に現代のWeb開発やデータ処理では、非同期処理は必須のスキルとなっています。

マルチスレッド処理をマスターすることで、あなたのPythonプログラミングスキルは確実に向上し、より実用的で効率的なアプリケーションを開発できるようになるでしょう。

この記事をシェア

関連するコンテンツ