# Data Chunking for Large Datasets
Process large datasets efficiently by breaking them into manageable chunks to reduce memory consumption and improve performance.
## The Problem: Memory Exhaustion
```php
// BAD: Loading all records into memory
$users = User::all(); // Could be millions of records!
foreach ($users as $user) {
    $user->sendNewsletter();
}

// BAD: Even with a narrower select, pluck() still loads everything
$emails = User::pluck('email'); // Collection of millions of emails
foreach ($emails as $email) {
    Mail::to($email)->send(new Newsletter());
}
```

## Solution: Chunking Methods
### 1. Basic Chunking with chunk()
```php
// Process 100 records at a time
User::chunk(100, function ($users) {
    foreach ($users as $user) {
        $user->calculateStatistics();
        $user->save();
    }
});

// With conditions
User::where('active', true)
    ->chunk(200, function ($users) {
        foreach ($users as $user) {
            ProcessUserJob::dispatch($user);
        }
    });
```

### 2. Chunk By ID for Safer Updates
```php
// Prevents issues when modifying records during iteration
User::where('newsletter_sent', false)
    ->chunkById(100, function ($users) {
        foreach ($users as $user) {
            $user->update(['newsletter_sent' => true]);
            Mail::to($user)->send(new Newsletter());
        }
    });

// With a custom ID column
Payment::where('processed', false)
    ->chunkById(100, function ($payments) {
        foreach ($payments as $payment) {
            $payment->process();
        }
    }, 'payment_id'); // Custom ID column
```

### 3. Lazy Collections for Memory Efficiency
```php
// Uses PHP generators, minimal memory footprint
User::where('created_at', '>=', now()->subDays(30))
    ->lazy()
    ->each(function ($user) {
        $user->recalculateScore();
    });

// With chunk size control
User::lazy(100)->each(function ($user) {
    ProcessRecentUser::dispatch($user);
});

// Filter and map with lazy collections
$results = User::lazy()
    ->filter(fn($user) => $user->hasActiveSubscription())
    ->map(fn($user) => [
        'id' => $user->id,
        'revenue' => $user->calculateRevenue(),
    ])
    ->take(1000);
```
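One caveat: plain `lazy()` pages through the filtered query just like `chunk()`, so mutating a filter column mid-iteration can skip rows. Laravel also ships `lazyById()`, the lazy counterpart of `chunkById()`, which pages by primary key. A minimal sketch:

```php
// lazyById() pages by primary key (like chunkById()), so updating the
// column used in the where clause does not skip rows mid-iteration
User::where('newsletter_sent', false)
    ->lazyById(200)
    ->each(function ($user) {
        $user->update(['newsletter_sent' => true]);
        Mail::to($user)->send(new Newsletter());
    });
```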
### 4. Cursor for Forward-Only Iteration
```php
// Most memory-efficient for simple forward iteration
foreach (User::where('active', true)->cursor() as $user) {
    $user->updateLastSeen();
}

// cursor() returns a LazyCollection, so collection methods work too
User::where('verified', true)
    ->cursor()
    ->filter(fn($user) => $user->hasCompletedProfile())
    ->each(fn($user) => SendWelcomeEmail::dispatch($user));
```
## Real-World Examples
### Export Large CSV

```php
class ExportUsersCommand extends Command
{
    public function handle()
    {
        $file = fopen('users.csv', 'w');

        // Write headers
        fputcsv($file, ['ID', 'Name', 'Email', 'Created At']);

        // Process in chunks to avoid memory issues
        User::select('id', 'name', 'email', 'created_at')
            ->chunkById(500, function ($users) use ($file) {
                foreach ($users as $user) {
                    fputcsv($file, [
                        $user->id,
                        $user->name,
                        $user->email,
                        $user->created_at->toDateTimeString(),
                    ]);
                }

                // Optional: Show progress
                $this->info("Processed up to ID: {$users->last()->id}");
            });

        fclose($file);
        $this->info('Export completed!');
    }
}
```

### Batch Email Campaign
```php
class SendCampaignJob implements ShouldQueue
{
    public function handle()
    {
        $campaign = Campaign::find($this->campaignId);

        // Process subscribers in chunks
        $campaign->subscribers()
            ->where('unsubscribed', false)
            ->chunkById(50, function ($subscribers) use ($campaign) {
                foreach ($subscribers as $subscriber) {
                    SendCampaignEmail::dispatch($campaign, $subscriber)
                        ->onQueue('emails')
                        ->delay(now()->addSeconds(rand(1, 10)));
                }

                // Prevent rate limiting
                sleep(2);
            });
    }
}
```

### Data Migration/Transformation
```php
class MigrateUserData extends Command
{
    public function handle()
    {
        $bar = $this->output->createProgressBar(User::count());

        User::with(['profile', 'settings'])
            ->chunkById(100, function ($users) use ($bar) {
                DB::transaction(function () use ($users, $bar) {
                    foreach ($users as $user) {
                        // Complex transformation
                        $newData = $this->transformUserData($user);
                        NewUserModel::create($newData);
                        $bar->advance();
                    }
                });
            });

        $bar->finish();
        $this->newLine();
        $this->info('Migration completed!');
    }
}
```

### Cleanup Old Records
```php
class CleanupOldLogs extends Command
{
    public function handle()
    {
        $deletedCount = 0;

        ActivityLog::where('created_at', '<', now()->subMonths(6))
            ->chunkById(1000, function ($logs) use (&$deletedCount) {
                $ids = $logs->pluck('id')->toArray();

                // Batch delete for efficiency
                ActivityLog::whereIn('id', $ids)->delete();
                $deletedCount += count($ids);

                $this->info("Deleted {$deletedCount} records so far...");

                // Give the database a breather
                usleep(100000); // 100ms
            });

        $this->info("Total deleted: {$deletedCount}");
    }
}
```

## Choosing the Right Method
| Method | Use Case | Memory Usage | Notes |
|---|---|---|---|
| `chunk()` | General processing | Moderate | May skip/duplicate records if modifying filter columns |
| `chunkById()` | Updates during iteration | Moderate | Safer for modifications |
| `lazy()` | Large result processing | Low | Returns a `LazyCollection` |
| `cursor()` | Simple forward iteration | Lowest | Returns a generator-backed `LazyCollection` |
| `all()` / `get()` | Simple operations | High (loads all) | Avoid for large datasets |
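The gulf between the last row and the streaming methods comes down to PHP generators. A plain-PHP sketch (no Laravel; the fetch functions are hypothetical stand-ins) of the two shapes:

```php
// A buffering fetch materializes every row before iteration begins,
// like all()/get(); a generator yields one row at a time, like cursor().
function fetchAll(int $count): array {
    $rows = [];
    for ($i = 1; $i <= $count; $i++) {
        $rows[] = ['id' => $i]; // every row held in memory at once
    }
    return $rows;
}

function stream(int $count): Generator {
    for ($i = 1; $i <= $count; $i++) {
        yield ['id' => $i]; // only one row alive at a time
    }
}

// Both iterate identically, but peak memory differs enormously:
foreach (stream(1_000_000) as $row) {
    // process $row
}
```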
## Performance Optimization Tips

### 1. Select Only Needed Columns

```php
User::select('id', 'email', 'name')
    ->chunkById(100, function ($users) {
        // Process with minimal data
    });
```

### 2. Use Indexes
```php
// Ensure columns used in where clauses are indexed
User::where('status', 'active') // status should be indexed
    ->where('created_at', '>', $date) // created_at should be indexed
    ->chunkById(200, function ($users) {
        // Process efficiently
    });
```

### 3. Disable Eloquent Events When Appropriate
```php
User::withoutEvents(function () {
    User::chunkById(500, function ($users) {
        foreach ($users as $user) {
            $user->update(['processed' => true]);
        }
    });
});
```

### 4. Use Raw Queries for Bulk Updates
```php
// Instead of updating each record individually
User::chunkById(100, function ($users) {
    $ids = $users->pluck('id')->toArray();

    // Bulk update with a single query per chunk
    DB::table('users')
        ->whereIn('id', $ids)
        ->update([
            'last_processed_at' => now(),
            'processing_count' => DB::raw('processing_count + 1'),
        ]);
});
```
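Taking that idea one step further: when no per-row PHP logic is involved at all, the chunk loop can sometimes be dropped entirely in favor of a single mass-update statement. A sketch reusing the same (hypothetical) columns:

```php
// One UPDATE statement touches every matching row
// without loading any of them into PHP
DB::table('users')
    ->where('active', true)
    ->update([
        'last_processed_at' => now(),
        'processing_count' => DB::raw('processing_count + 1'),
    ]);
```

Note this bypasses Eloquent events and model accessors, so it only fits when those are not needed.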
### 5. Queue Large Operations
```php
class ProcessLargeDataset extends Command
{
    public function handle()
    {
        User::chunkById(100, function ($users) {
            ProcessUserBatch::dispatch($users->pluck('id'))
                ->onQueue('heavy-processing');
        });
    }
}

class ProcessUserBatch implements ShouldQueue
{
    public function __construct(
        public Collection $userIds
    ) {}

    public function handle()
    {
        User::whereIn('id', $this->userIds)
            ->get()
            ->each(fn($user) => $user->process());
    }
}
```
## Testing Chunked Operations
```php
test('processes all active users in chunks', function () {
    // Create test data
    User::factory()->count(150)->create(['active' => true]);
    User::factory()->count(50)->create(['active' => false]);

    $processed = [];

    User::where('active', true)
        ->chunkById(50, function ($users) use (&$processed) {
            foreach ($users as $user) {
                $processed[] = $user->id;
            }
        });

    expect($processed)->toHaveCount(150);
    expect(count(array_unique($processed)))->toBe(150);
});

test('handles empty datasets gracefully', function () {
    $callCount = 0;

    User::where('id', '<', 0) // No results
        ->chunk(100, function ($users) use (&$callCount) {
            $callCount++;
        });

    expect($callCount)->toBe(0);
});
```
## Common Pitfalls
- **Modifying filter columns during `chunk()`**

  ```php
  // WRONG: May skip records
  User::where('processed', false)
      ->chunk(100, function ($users) {
          foreach ($users as $user) {
              $user->update(['processed' => true]); // Changes the WHERE condition!
          }
      });

  // CORRECT: Use chunkById()
  User::where('processed', false)
      ->chunkById(100, function ($users) {
          foreach ($users as $user) {
              $user->update(['processed' => true]);
          }
      });
  ```

- **Not handling chunk callback returns**

  ```php
  // Return false to stop chunking
  User::chunk(100, function ($users) {
      foreach ($users as $user) {
          if ($user->hasIssue()) {
              return false; // Stop processing
          }
          $user->process();
      }
  });
  ```

- **Ignoring database connection limits**

  ```php
  // Consider connection timeouts for long operations
  DB::connection()->getPdo()->setAttribute(PDO::ATTR_TIMEOUT, 3600);

  User::chunkById(100, function ($users) {
      // Long-running process
  });
  ```
Remember: When dealing with large datasets, always think about memory usage, query efficiency, and processing time. Chunk your data appropriately!