laravel-data-chunking-large-datasets

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Data Chunking for Large Datasets

大型数据集的分块处理

Process large datasets efficiently by breaking them into manageable chunks to reduce memory consumption and improve performance.
通过将大型数据集拆分为可管理的小块来高效处理,从而减少内存消耗并提升性能。

The Problem: Memory Exhaustion

问题:内存耗尽

php
// BAD: Loading all records into memory
$users = User::all(); // Could be millions of records!

foreach ($users as $user) {
    $user->sendNewsletter();
}

// BAD: Even with select, still loads everything
$emails = User::pluck('email'); // Array of millions of emails

foreach ($emails as $email) {
    Mail::to($email)->send(new Newsletter());
}
php
// 错误示例:将所有记录加载到内存中
$users = User::all(); // 可能包含数百万条记录!

foreach ($users as $user) {
    $user->sendNewsletter();
}

// 错误示例:即使使用select,仍会加载所有数据
$emails = User::pluck('email'); // 包含数百万个邮箱的数组

foreach ($emails as $email) {
    Mail::to($email)->send(new Newsletter());
}

Solution: Chunking Methods

解决方案:分块处理方法

1. Basic Chunking with
chunk()

1. 使用
chunk()
进行基础分块

php
// Process 100 records at a time
User::chunk(100, function ($users) {
    foreach ($users as $user) {
        $user->calculateStatistics();
        $user->save();
    }
});

// With conditions
User::where('active', true)
    ->chunk(200, function ($users) {
        foreach ($users as $user) {
            ProcessUserJob::dispatch($user);
        }
    });
php
// 每次处理100条记录
User::chunk(100, function ($users) {
    foreach ($users as $user) {
        $user->calculateStatistics();
        $user->save();
    }
});

// 带条件的分块
User::where('active', true)
    ->chunk(200, function ($users) {
        foreach ($users as $user) {
            ProcessUserJob::dispatch($user);
        }
    });

2. Chunk By ID for Safer Updates

2. 使用chunkById()实现更安全的更新

php
// Prevents issues when modifying records during iteration
User::where('newsletter_sent', false)
    ->chunkById(100, function ($users) {
        foreach ($users as $user) {
            $user->update(['newsletter_sent' => true]);
            Mail::to($user)->send(new Newsletter());
        }
    });

// With custom column
Payment::where('processed', false)
    ->chunkById(100, function ($payments) {
        foreach ($payments as $payment) {
            $payment->process();
        }
    }, 'payment_id'); // Custom ID column
php
// 避免迭代过程中修改记录时出现问题
User::where('newsletter_sent', false)
    ->chunkById(100, function ($users) {
        foreach ($users as $user) {
            $user->update(['newsletter_sent' => true]);
            Mail::to($user)->send(new Newsletter());
        }
    });

// 使用自定义列
Payment::where('processed', false)
    ->chunkById(100, function ($payments) {
        foreach ($payments as $payment) {
            $payment->process();
        }
    }, 'payment_id'); // 自定义ID列

3. Lazy Collections for Memory Efficiency

3. 使用Lazy Collection优化内存效率

php
// Uses PHP generators, minimal memory footprint
User::where('created_at', '>=', now()->subDays(30))
    ->lazy()
    ->each(function ($user) {
        $user->recalculateScore();
    });

// With chunking size control
User::lazy(100)->each(function ($user) {
    ProcessRecentUser::dispatch($user);
});

// Filter and map with lazy collections
$results = User::lazy()
    ->filter(fn($user) => $user->hasActiveSubscription())
    ->map(fn($user) => [
        'id' => $user->id,
        'revenue' => $user->calculateRevenue(),
    ])
    ->take(1000);
php
// 使用PHP生成器,内存占用极小
User::where('created_at', '>=', now()->subDays(30))
    ->lazy()
    ->each(function ($user) {
        $user->recalculateScore();
    });

// 控制分块大小
User::lazy(100)->each(function ($user) {
    ProcessRecentUser::dispatch($user);
});

// 对Lazy Collection进行过滤和映射
$results = User::lazy()
    ->filter(fn($user) => $user->hasActiveSubscription())
    ->map(fn($user) => [
        'id' => $user->id,
        'revenue' => $user->calculateRevenue(),
    ])
    ->take(1000);

4. Cursor for Forward-Only Iteration

4. 使用Cursor进行仅正向迭代

php
// Most memory-efficient for simple forward iteration
foreach (User::where('active', true)->cursor() as $user) {
    $user->updateLastSeen();
}

// With lazy() for additional collection methods
User::where('verified', true)
    ->cursor()
    ->filter(fn($user) => $user->hasCompletedProfile())
    ->each(fn($user) => SendWelcomeEmail::dispatch($user));
php
// 简单正向迭代时内存效率最高
foreach (User::where('active', true)->cursor() as $user) {
    $user->updateLastSeen();
}

// 结合lazy()使用更多集合方法
User::where('verified', true)
    ->cursor()
    ->filter(fn($user) => $user->hasCompletedProfile())
    ->each(fn($user) => SendWelcomeEmail::dispatch($user));

Real-World Examples

实际应用示例

Export Large CSV

导出大型CSV文件

php
class ExportUsersCommand extends Command
{
    public function handle()
    {
        $file = fopen('users.csv', 'w');

        // Write headers
        fputcsv($file, ['ID', 'Name', 'Email', 'Created At']);

        // Process in chunks to avoid memory issues
        User::select('id', 'name', 'email', 'created_at')
            ->chunkById(500, function ($users) use ($file) {
                foreach ($users as $user) {
                    fputcsv($file, [
                        $user->id,
                        $user->name,
                        $user->email,
                        $user->created_at->toDateTimeString(),
                    ]);
                }

                // Optional: Show progress
                $this->info("Processed up to ID: {$users->last()->id}");
            });

        fclose($file);
        $this->info('Export completed!');
    }
}
php
class ExportUsersCommand extends Command
{
    public function handle()
    {
        $file = fopen('users.csv', 'w');

        // 写入表头
        fputcsv($file, ['ID', '姓名', '邮箱', '创建时间']);

        // 分块处理避免内存问题
        User::select('id', 'name', 'email', 'created_at')
            ->chunkById(500, function ($users) use ($file) {
                foreach ($users as $user) {
                    fputcsv($file, [
                        $user->id,
                        $user->name,
                        $user->email,
                        $user->created_at->toDateTimeString(),
                    ]);
                }

                // 可选:显示进度
                $this->info("已处理至ID: {$users->last()->id}");
            });

        fclose($file);
        $this->info('导出完成!');
    }
}

Batch Email Campaign

批量邮件营销

php
class SendCampaignJob implements ShouldQueue
{
    public function handle()
    {
        $campaign = Campaign::find($this->campaignId);

        // Process subscribers in chunks
        $campaign->subscribers()
            ->where('unsubscribed', false)
            ->chunkById(50, function ($subscribers) use ($campaign) {
                foreach ($subscribers as $subscriber) {
                    SendCampaignEmail::dispatch($campaign, $subscriber)
                        ->onQueue('emails')
                        ->delay(now()->addSeconds(rand(1, 10)));
                }

                // Prevent rate limiting
                sleep(2);
            });
    }
}
php
class SendCampaignJob implements ShouldQueue
{
    public function handle()
    {
        $campaign = Campaign::find($this->campaignId);

        // 分块处理订阅者
        $campaign->subscribers()
            ->where('unsubscribed', false)
            ->chunkById(50, function ($subscribers) use ($campaign) {
                foreach ($subscribers as $subscriber) {
                    SendCampaignEmail::dispatch($campaign, $subscriber)
                        ->onQueue('emails')
                        ->delay(now()->addSeconds(rand(1, 10)));
                }

                // 避免触发频率限制
                sleep(2);
            });
    }
}

Data Migration/Transformation

数据迁移/转换

php
class MigrateUserData extends Command
{
    public function handle()
    {
        $bar = $this->output->createProgressBar(User::count());

        User::with(['profile', 'settings'])
            ->chunkById(100, function ($users) use ($bar) {
                DB::transaction(function () use ($users, $bar) {
                    foreach ($users as $user) {
                        // Complex transformation
                        $newData = $this->transformUserData($user);

                        NewUserModel::create($newData);

                        $bar->advance();
                    }
                });
            });

        $bar->finish();
        $this->newLine();
        $this->info('Migration completed!');
    }
}
php
class MigrateUserData extends Command
{
    public function handle()
    {
        $bar = $this->output->createProgressBar(User::count());

        User::with(['profile', 'settings'])
            ->chunkById(100, function ($users) use ($bar) {
                DB::transaction(function () use ($users, $bar) {
                    foreach ($users as $user) {
                        // 复杂数据转换
                        $newData = $this->transformUserData($user);

                        NewUserModel::create($newData);

                        $bar->advance();
                    }
                });
            });

        $bar->finish();
        $this->newLine();
        $this->info('迁移完成!');
    }
}

Cleanup Old Records

清理旧记录

php
class CleanupOldLogs extends Command
{
    public function handle()
    {
        $deletedCount = 0;

        ActivityLog::where('created_at', '<', now()->subMonths(6))
            ->chunkById(1000, function ($logs) use (&$deletedCount) {
                $ids = $logs->pluck('id')->toArray();

                // Batch delete for efficiency
                ActivityLog::whereIn('id', $ids)->delete();

                $deletedCount += count($ids);

                $this->info("Deleted {$deletedCount} records so far...");

                // Give database a breather
                usleep(100000); // 100ms
            });

        $this->info("Total deleted: {$deletedCount}");
    }
}
php
class CleanupOldLogs extends Command
{
    public function handle()
    {
        $deletedCount = 0;

        ActivityLog::where('created_at', '<', now()->subMonths(6))
            ->chunkById(1000, function ($logs) use (&$deletedCount) {
                $ids = $logs->pluck('id')->toArray();

                // 批量删除提升效率
                ActivityLog::whereIn('id', $ids)->delete();

                $deletedCount += count($ids);

                $this->info("已删除 {$deletedCount} 条记录...");

                // 给数据库留出缓冲时间
                usleep(100000); // 100毫秒
            });

        $this->info("总计删除:{$deletedCount}");
    }
}

Choosing the Right Method

选择合适的方法

MethodUse CaseMemory UsageNotes
chunk()
General processingModerateMay skip/duplicate if modifying filter columns
chunkById()
Updates during iterationModerateSafer for modifications
lazy()
Large result processingLowReturns LazyCollection
cursor()
Simple forward iterationLowestReturns Generator
each()
Simple operationsHigh (loads all)Avoid for large datasets
方法适用场景内存占用注意事项
chunk()
通用处理中等如果修改过滤列可能会跳过/重复记录
chunkById()
迭代过程中更新数据中等对数据修改操作更安全
lazy()
大型结果集处理返回LazyCollection对象
cursor()
简单正向迭代最低返回Generator对象
each()
简单操作高(加载全部)处理大型数据集时需避免使用

Performance Optimization Tips

性能优化技巧

1. Select Only Needed Columns

1. 仅选择需要的列

php
User::select('id', 'email', 'name')
    ->chunkById(100, function ($users) {
        // Process with minimal data
    });
php
User::select('id', 'email', 'name')
    ->chunkById(100, function ($users) {
        // 使用最少的数据进行处理
    });

2. Use Indexes

2. 使用索引

php
// Ensure indexed columns in where clauses
User::where('status', 'active') // status should be indexed
    ->where('created_at', '>', $date) // created_at should be indexed
    ->chunkById(200, function ($users) {
        // Process efficiently
    });
php
// 确保WHERE子句中的列已建立索引
User::where('status', 'active') // status列应建立索引
    ->where('created_at', '>', $date) // created_at列应建立索引
    ->chunkById(200, function ($users) {
        // 高效处理数据
    });

3. Disable Eloquent Events When Appropriate

3. 必要时禁用Eloquent事件

php
User::withoutEvents(function () {
    User::chunkById(500, function ($users) {
        foreach ($users as $user) {
            $user->update(['processed' => true]);
        }
    });
});
php
User::withoutEvents(function () {
    User::chunkById(500, function ($users) {
        foreach ($users as $user) {
            $user->update(['processed' => true]);
        }
    });
});

4. Use Raw Queries for Bulk Updates

4. 使用原生查询进行批量更新

php
// Instead of updating each record
User::chunkById(100, function ($users) {
    $ids = $users->pluck('id')->toArray();

    // Bulk update with raw query
    DB::table('users')
        ->whereIn('id', $ids)
        ->update([
            'last_processed_at' => now(),
            'processing_count' => DB::raw('processing_count + 1'),
        ]);
});
php
// 替代逐条更新
User::chunkById(100, function ($users) {
    $ids = $users->pluck('id')->toArray();

    // 使用原生查询批量更新
    DB::table('users')
        ->whereIn('id', $ids)
        ->update([
            'last_processed_at' => now(),
            'processing_count' => DB::raw('processing_count + 1'),
        ]);
});

5. Queue Large Operations

5. 将大型操作放入队列

php
class ProcessLargeDataset extends Command
{
    public function handle()
    {
        User::chunkById(100, function ($users) {
            ProcessUserBatch::dispatch($users->pluck('id'))
                ->onQueue('heavy-processing');
        });
    }
}

class ProcessUserBatch implements ShouldQueue
{
    public function __construct(
        public Collection $userIds
    ) {}

    public function handle()
    {
        User::whereIn('id', $this->userIds)
            ->get()
            ->each(fn($user) => $user->process());
    }
}
php
class ProcessLargeDataset extends Command
{
    public function handle()
    {
        User::chunkById(100, function ($users) {
            ProcessUserBatch::dispatch($users->pluck('id'))
                ->onQueue('heavy-processing');
        });
    }
}

class ProcessUserBatch implements ShouldQueue
{
    public function __construct(
        public Collection $userIds
    ) {}

    public function handle()
    {
        User::whereIn('id', $this->userIds)
            ->get()
            ->each(fn($user) => $user->process());
    }
}

Testing Chunked Operations

测试分块操作

php
test('processes all active users in chunks', function () {
    // Create test data
    User::factory()->count(150)->create(['active' => true]);
    User::factory()->count(50)->create(['active' => false]);

    $processed = [];

    User::where('active', true)
        ->chunkById(50, function ($users) use (&$processed) {
            foreach ($users as $user) {
                $processed[] = $user->id;
            }
        });

    expect($processed)->toHaveCount(150);
    expect(count(array_unique($processed)))->toBe(150);
});

test('handles empty datasets gracefully', function () {
    $callCount = 0;

    User::where('id', '<', 0) // No results
        ->chunk(100, function ($users) use (&$callCount) {
            $callCount++;
        });

    expect($callCount)->toBe(0);
});
php
test('分块处理所有活跃用户', function () {
    // 创建测试数据
    User::factory()->count(150)->create(['active' => true]);
    User::factory()->count(50)->create(['active' => false]);

    $processed = [];

    User::where('active', true)
        ->chunkById(50, function ($users) use (&$processed) {
            foreach ($users as $user) {
                $processed[] = $user->id;
            }
        });

    expect($processed)->toHaveCount(150);
    expect(count(array_unique($processed)))->toBe(150);
});

test('优雅处理空数据集', function () {
    $callCount = 0;

    User::where('id', '<', 0) // 无匹配结果
        ->chunk(100, function ($users) use (&$callCount) {
            $callCount++;
        });

    expect($callCount)->toBe(0);
});

Common Pitfalls

常见陷阱

  1. Modifying filter columns during chunk()
    php
    // WRONG: May skip records
    User::where('processed', false)
        ->chunk(100, function ($users) {
            foreach ($users as $user) {
                $user->update(['processed' => true]); // Changes the WHERE condition!
            }
        });
    
    // CORRECT: Use chunkById()
    User::where('processed', false)
        ->chunkById(100, function ($users) {
            foreach ($users as $user) {
                $user->update(['processed' => true]);
            }
        });
  2. Not handling chunk callback returns
    php
    // Return false to stop chunking
    User::chunk(100, function ($users) {
        foreach ($users as $user) {
            if ($user->hasIssue()) {
                return false; // Stop processing
            }
            $user->process();
        }
    });
  3. Ignoring database connection limits
    php
    // Consider connection timeouts for long operations
    DB::connection()->getPdo()->setAttribute(PDO::ATTR_TIMEOUT, 3600);
    
    User::chunkById(100, function ($users) {
        // Long running process
    });
Remember: When dealing with large datasets, always think about memory usage, query efficiency, and processing time. Chunk your data appropriately!
  1. 在chunk()中修改过滤列
    php
    // 错误:可能会跳过记录
    User::where('processed', false)
        ->chunk(100, function ($users) {
            foreach ($users as $user) {
                $user->update(['processed' => true]); // 修改了WHERE条件!
            }
        });
    
    // 正确:使用chunkById()
    User::where('processed', false)
        ->chunkById(100, function ($users) {
            foreach ($users as $user) {
                $user->update(['processed' => true]);
            }
        });
  2. 未处理分块回调的返回值
    php
    // 返回false可停止分块处理
    User::chunk(100, function ($users) {
        foreach ($users as $user) {
            if ($user->hasIssue()) {
                return false; // 停止处理
            }
            $user->process();
        }
    });
  3. 忽略数据库连接限制
    php
    // 考虑长时操作的连接超时问题
    DB::connection()->getPdo()->setAttribute(PDO::ATTR_TIMEOUT, 3600);
    
    User::chunkById(100, function ($users) {
        // 长时间运行的处理逻辑
    });
记住:处理大型数据集时,始终要考虑内存占用、查询效率和处理时间。合理对数据进行分块处理!