フリーキーズ | 独学プログラミング

Pythonのmultiprocessingで並列処理入門!初心者でもわかるCPU並列処理の基本とサンプルコード

リンドくん

リンドくん

たなべ先生、Pythonで重い処理をするとすごく時間がかかってしまうんです...何か解決方法ありませんか?

たなべ

たなべ

実は、Pythonにはmultiprocessingという強力な機能があるんだ。これを使うと、複数のCPUコアを同時に使って処理を高速化できるよ。
今日はその使い方を一緒に学んでいこう!

プログラミングを学んでいると、「このデータ処理、すごく時間がかかるなあ...」と感じたことはありませんか?
特に大量のデータを扱ったり、複雑な計算を行ったりする際に、処理の完了を待っている時間がもったいないと思うことは多いことでしょう。

そんな時に役立つのが、multiprocessing(マルチプロセッシング)という技術です。
これは複数のCPUコアを同時に活用することで、処理速度を劇的に向上させることができる仕組みです。

この記事では、Python初心者の方でも理解できるよう、multiprocessingの基本概念から実際の使い方まで、ステップバイステップで解説していきます。

プログラミング学習でお悩みの方へ

HackATAは、エンジニアを目指す方のためのプログラミング学習コーチングサービスです。 経験豊富な現役エンジニアがあなたの学習をサポートします。

✓ 質問し放題

✓ β版公開中(2025年内の特別割引)

HackATAの詳細を見る

multiprocessingとは?並列処理の基本概念を理解しよう

リンドくん

リンドくん

そもそも「並列処理」って何なんですか?普通の処理とどう違うんでしょうか?

たなべ

たなべ

普通の処理は一つずつ順番に作業を進めるけど、並列処理は複数の作業を同時に進められるんだ。
まるで料理で、一人で作るより複数人で分担して作る方が早いのと同じだよ。

並列処理の基本概念

通常のプログラムはシーケンシャル処理、あるいは逐次処理と呼ばれる方法で動作します。これは、タスクを一つずつ順番に実行していく方式です。
例えば、1000個のデータを処理する場合、1番目から1000番目まで順番に処理していきます。

一方、並列処理では複数のプロセス(処理の単位)を同時に実行することで、全体の処理時間を短縮できます。
先ほどの例でいえば、1000個のデータを4つのプロセスに分けて、それぞれが250個ずつ処理することで、理論上は4倍の速度で処理が完了します。

multiprocessingとthreadingの違い

Pythonには並列処理を実現する方法として、multiprocessingthreadingの2つがあります。これらの違いを理解することは重要です。

  • multiprocessing = 複数のプロセスを使用して並列処理を行う
  • threading = 複数のスレッドを使用して並列処理を行う

特にPythonでは、GIL(Global Interpreter Lock)という仕組みがあるため、CPU集約的な処理ではmultiprocessingの方が効果的です。
GILによって、threadingではCPU処理が実質的に並列化されないことが多いのです。

いつmultiprocessingを使うべきか

multiprocessingが効果的なケースは以下のような場面です。

  • 大量の数値計算(機械学習の訓練、統計解析など)
  • 画像・動画処理(フィルタ処理、変換など)
  • データの変換・加工(CSVファイルの大量処理など)
  • 独立性のあるタスク(各処理が他に依存しない場合)

逆に、ファイルの読み書きやネットワーク通信が中心の処理では、multiprocessingよりもasynciothreadingの方が適している場合があります。

基本的な使い方とシンプルなサンプルコード

リンドくん

リンドくん

実際にはどうやって使うんですか?難しそうですが...

たなべ

たなべ

心配しないで!基本的な使い方は意外とシンプルなんだよ。
まずは簡単な例から始めてみよう。

最もシンプルな例

まず、multiprocessingを使わない通常の処理と比較してみましょう。

import time

def heavy_calculation(n):
    """重い計算処理のシミュレーション"""
    result = 0
    for i in range(n * 1000000):
        result += i
    return result

# 通常の処理(シーケンシャル)
start_time = time.time()
results = []
for i in range(1, 5):
    result = heavy_calculation(i)
    results.append(result)
    
end_time = time.time()
print(f"通常の処理時間: {end_time - start_time:.2f}秒")

次に、同じ処理をmultiprocessingで並列化してみます。

import multiprocessing
import time

def heavy_calculation(n):
    """重い計算処理のシミュレーション"""
    result = 0
    for i in range(n * 1000000):
        result += i
    return result

if __name__ == '__main__':
    # 並列処理
    start_time = time.time()
    
    with multiprocessing.Pool() as pool:
        results = pool.map(heavy_calculation, [1, 2, 3, 4])
    
    end_time = time.time()
    print(f"並列処理時間: {end_time - start_time:.2f}秒")
    print(f"結果: {results}")

Pool.map()の使い方

Pool.map()は、multiprocessingで最もよく使われるメソッドの一つです。使い方は以下の通りです。

# 基本的な構文
with multiprocessing.Pool() as pool:
    results = pool.map(関数名, 引数のリスト)

この方法では、引数のリストの各要素に対して指定した関数を並列実行し、結果をリストで返します。

プロセス数の指定

デフォルトでは、CPUのコア数と同じ数のプロセスが作成されますが、明示的に指定することも可能です。

# プロセス数を明示的に指定
with multiprocessing.Pool(processes=2) as pool:
    results = pool.map(heavy_calculation, [1, 2, 3, 4])

適切なプロセス数は、CPUのコア数や処理内容によって変わります。
一般的には、CPUのコア数と同じか、少し少ない数を指定するのが効果的です。

活用例とパフォーマンス比較

リンドくん

リンドくん

もう少し実用的な例も見てみたいです!実際の開発でどんな場面で使えるんでしょうか?

たなべ

たなべ

そうだね!実際のプロジェクトでよくあるデータ処理画像処理の例を見てみよう。
パフォーマンスの違いも数字で確認できるよ。

大量データの処理例

CSVファイルの処理など、実際の業務でよくある例を見てみましょう。

import multiprocessing
import time
import random

def process_data_chunk(data_chunk):
    """データチャンクの処理"""
    processed = []
    for item in data_chunk:
        # 複雑な処理のシミュレーション
        result = item ** 2 + item ** 0.5 + random.random()
        processed.append(result)
    return processed

def split_list(data, chunk_size):
    """リストを指定されたサイズのチャンクに分割"""
    for i in range(0, len(data), chunk_size):
        yield data[i:i + chunk_size]

if __name__ == '__main__':
    # 大量のデータを生成
    large_data = list(range(1, 100001))  # 10万個のデータ
    
    # シーケンシャル処理
    start_time = time.time()
    sequential_result = process_data_chunk(large_data)
    sequential_time = time.time() - start_time
    
    # 並列処理
    start_time = time.time()
    
    # データを4つのチャンクに分割
    chunk_size = len(large_data) // 4
    data_chunks = list(split_list(large_data, chunk_size))
    
    with multiprocessing.Pool() as pool:
        parallel_results = pool.map(process_data_chunk, data_chunks)
    
    # 結果をまとめる
    parallel_result = []
    for chunk_result in parallel_results:
        parallel_result.extend(chunk_result)
    
    parallel_time = time.time() - start_time
    
    print(f"シーケンシャル処理時間: {sequential_time:.2f}秒")
    print(f"並列処理時間: {parallel_time:.2f}秒")
    print(f"高速化倍率: {sequential_time / parallel_time:.2f}倍")

ファイル処理の並列化

複数のファイルを処理する場合の例です。

import multiprocessing
import os
import time

def process_file(filename):
    """ファイル処理のシミュレーション"""
    try:
        with open(filename, 'r', encoding='utf-8') as f:
            content = f.read()
            # 処理のシミュレーション(文字数をカウント)
            char_count = len(content)
            word_count = len(content.split())
            
        return {
            'filename': filename,
            'char_count': char_count,
            'word_count': word_count
        }
    except Exception as e:
        return {
            'filename': filename,
            'error': str(e)
        }

if __name__ == '__main__':
    # 処理対象のファイルリスト(実際のファイルパスに変更してください)
    file_list = ['file1.txt', 'file2.txt', 'file3.txt', 'file4.txt']
    
    # 並列処理でファイルを処理
    with multiprocessing.Pool() as pool:
        results = pool.map(process_file, file_list)
    
    # 結果の表示
    for result in results:
        if 'error' in result:
            print(f"{result['filename']}: エラー - {result['error']}")
        else:
            print(f"{result['filename']}: 文字数={result['char_count']}, 単語数={result['word_count']}")

パフォーマンス測定のベストプラクティス

正確なパフォーマンス測定を行うためのポイントをご紹介します。

import multiprocessing
import time
from functools import wraps

def measure_time(func):
    """実行時間を測定するデコレータ"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        end = time.perf_counter()
        print(f"{func.__name__}の実行時間: {end - start:.4f}秒")
        return result
    return wrapper

@measure_time
def sequential_processing(data):
    """シーケンシャル処理"""
    return [x ** 2 for x in data]

@measure_time
def parallel_processing(data):
    """並列処理"""
    def square(x):
        return x ** 2
    
    with multiprocessing.Pool() as pool:
        return pool.map(square, data)

if __name__ == '__main__':
    test_data = list(range(1000000))  # 100万個のデータ
    
    # 複数回実行して平均を取る
    sequential_times = []
    parallel_times = []
    
    for i in range(3):
        print(f"\n=== 実行 {i+1} ===")
        start = time.perf_counter()
        sequential_processing(test_data.copy())
        sequential_times.append(time.perf_counter() - start)
        
        start = time.perf_counter()
        parallel_processing(test_data.copy())
        parallel_times.append(time.perf_counter() - start)
    
    print(f"\n=== 平均実行時間 ===")
    print(f"シーケンシャル: {sum(sequential_times)/len(sequential_times):.4f}秒")
    print(f"並列処理: {sum(parallel_times)/len(parallel_times):.4f}秒")

注意点とトラブルシューティング

リンドくん

リンドくん

multiprocessingを使う時に気をつけることはありますか?何かハマりやすいポイントがあったら教えてください。

たなべ

たなべ

実は、いくつか注意すべきポイントがあるんだ。
特にメモリ使用量オーバーヘッドについて理解しておくことが大切だよ。

if name == 'main': の重要性

multiprocessingを使用する際、必ず以下のように書く必要があります

if __name__ == '__main__':
    # multiprocessingのコードをここに書く

これを書かないと、Windowsでプログラムが無限ループに陥る可能性があります。これは、新しいプロセスが作成される際に、スクリプト全体が再実行されるためです。

メモリ使用量に注意

multiprocessingでは、各プロセスが独立したメモリ空間を持ちます。そのため、大きなデータを扱う場合はメモリ使用量が増加します。

import multiprocessing
import sys

def memory_heavy_task(data):
    """メモリを大量に使用するタスクの例"""
    # 大きなリストを作成
    large_list = [x * 2 for x in data]
    return sum(large_list)

if __name__ == '__main__':
    # 大きなデータセット
    big_data = list(range(1000000))
    
    print(f"元データのサイズ: {sys.getsizeof(big_data)} bytes")
    
    # プロセス数を制限してメモリ使用量をコントロール
    with multiprocessing.Pool(processes=2) as pool:  # プロセス数を制限
        results = pool.map(memory_heavy_task, [big_data])

オーバーヘッドを考慮した設計

並列処理には、プロセス作成や通信のオーバーヘッドが発生します。軽い処理では、並列化によってむしろ遅くなる場合があります。

import multiprocessing
import time

def light_task(x):
    """軽い処理"""
    return x * 2

def heavy_task(x):
    """重い処理"""
    time.sleep(0.1)  # 重い処理のシミュレーション
    return x * 2

if __name__ == '__main__':
    data = list(range(10))
    
    # 軽い処理の場合
    print("=== 軽い処理 ===")
    
    start = time.time()
    sequential_light = [light_task(x) for x in data]
    print(f"シーケンシャル: {time.time() - start:.4f}秒")
    
    start = time.time()
    with multiprocessing.Pool() as pool:
        parallel_light = pool.map(light_task, data)
    print(f"並列処理: {time.time() - start:.4f}秒")
    
    # 重い処理の場合
    print("\n=== 重い処理 ===")
    
    start = time.time()
    sequential_heavy = [heavy_task(x) for x in data]
    print(f"シーケンシャル: {time.time() - start:.4f}秒")
    
    start = time.time()
    with multiprocessing.Pool() as pool:
        parallel_heavy = pool.map(heavy_task, data)
    print(f"並列処理: {time.time() - start:.4f}秒")

エラーハンドリング

並列処理でのエラーハンドリングも重要なポイントです。

import multiprocessing

def risky_task(x):
    """エラーが発生する可能性のあるタスク"""
    if x == 3:
        raise ValueError(f"値 {x} でエラーが発生しました")
    return x * 2

def safe_task(x):
    """エラーハンドリングを含むタスク"""
    try:
        if x == 3:
            raise ValueError(f"値 {x} でエラーが発生しました")
        return {'value': x, 'result': x * 2, 'error': None}
    except Exception as e:
        return {'value': x, 'result': None, 'error': str(e)}

if __name__ == '__main__':
    data = [1, 2, 3, 4, 5]
    
    # エラーハンドリングありの安全な実装
    with multiprocessing.Pool() as pool:
        results = pool.map(safe_task, data)
    
    for result in results:
        if result['error']:
            print(f"値 {result['value']}: エラー - {result['error']}")
        else:
            print(f"値 {result['value']}: 結果 - {result['result']}")

まとめ

リンドくん

リンドくん

multiprocessingってすごく便利ですね!これで重い処理も怖くありません。

たなべ

たなべ

そうだね!ただし、適材適所で使うことが大切だよ。
まずは今日学んだ基本から始めて、徐々に複雑な処理にチャレンジしてみよう。

この記事では、Python multiprocessingの基本から実践的な活用例まで、初心者の方でも理解できるよう段階的に解説してきました。

multiprocessingの主なメリットをまとめましょう。

  • 処理速度の大幅な向上 → 複数のCPUコアを活用することで、計算集約的なタスクを高速化できます
  • スケーラビリティ → データ量が増えても、並列処理によって処理時間の増加を抑制できます
  • 簡単な実装Pool.map()を使えば、わずか数行のコードで並列処理を実現できます

multiprocessingは、現代のマルチコアCPUの性能を最大限に活かすための強力な技術です。
データサイエンス、機械学習、画像処理など、様々な分野で活用されています。

今回学んだ基本的な使い方から始めて、プロジェクトの要件に応じてより高度な機能(ProcessクラスやQueueなど)も学んでいけば、さらに効率的なプログラムを作成できるようになるでしょう。

この記事をシェア

関連するコンテンツ