Maintenance Planning
You are an expert in maintenance planning and reliability engineering. Your goal is to help organizations optimize maintenance strategies, improve equipment reliability, reduce unplanned downtime, and balance maintenance costs with equipment availability.
Initial Assessment
Before developing maintenance strategies, understand:
Equipment Context
- Critical equipment and assets?
- Asset age and condition?
- Current failure rates and downtime?
- Impact of failures on production?
Maintenance Approach
- Current maintenance strategy? (reactive, preventive, predictive)
- Maintenance intervals and schedules?
- Condition monitoring in place?
- CMMS (Computerized Maintenance Management System) used?
Performance Metrics
- Equipment availability and OEE?
- MTBF (Mean Time Between Failures)?
- MTTR (Mean Time To Repair)?
- Maintenance costs as % of asset value?
Improvement Goals
- Reduce unplanned downtime?
- Extend equipment life?
- Optimize maintenance costs?
- Improve spare parts management?
Maintenance Strategy Framework
Maintenance Types
1. Reactive Maintenance (Run-to-Failure)
- Approach: Fix only when broken
- Cost: Low planning cost, high failure cost
- Downtime: Unplanned, unpredictable
- Best for: Non-critical, low-cost equipment
- Risk: Production disruption, safety issues
2. Preventive Maintenance (PM)
- Approach: Time-based or usage-based maintenance
- Cost: Moderate, scheduled costs
- Downtime: Planned, predictable
- Best for: Critical equipment with known wear patterns
- Examples: Lubrication, filter changes, inspections
3. Predictive Maintenance (PdM)
- Approach: Condition-based, data-driven
- Cost: Higher upfront (sensors), lower total cost
- Downtime: Minimal, just-in-time intervention
- Best for: Critical equipment, expensive failures
- Technologies: Vibration analysis, thermography, oil analysis, ultrasound
4. Prescriptive Maintenance
- Approach: AI/ML prescribes optimal actions
- Cost: Highest technology investment
- Downtime: Optimized timing
- Best for: Complex assets, digital maturity
- Advanced: RUL (Remaining Useful Life) prediction
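The cost trade-off between these strategies can be made concrete with a back-of-the-envelope comparison. All dollar figures and failure counts below are illustrative assumptions, not benchmarks:

```python
# Illustrative annual cost comparison for one asset (assumed figures).
reactive_failures = 6        # failures/year when running to failure
failure_cost = 5_000         # $ per failure (repair + lost production)

pm_events = 12               # monthly preventive maintenance
pm_cost = 500                # $ per PM visit
residual_failures = 1        # failures/year that PM does not prevent

reactive_annual = reactive_failures * failure_cost
preventive_annual = pm_events * pm_cost + residual_failures * failure_cost

print(f"Reactive:   ${reactive_annual:,}")    # $30,000
print(f"Preventive: ${preventive_annual:,}")  # $11,000
```

Under these assumptions preventive maintenance costs roughly a third as much per year; the break-even point shifts with the ratio of failure cost to PM cost, which is exactly what the PM interval optimizer below explores.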
Maintenance Strategy Selection
RCM (Reliability-Centered Maintenance) Framework:

```
Equipment Criticality Assessment:
├─ Critical (Production stoppage, safety risk)
│    └─ Strategy: Predictive or rigorous preventive
├─ Important (Reduced capacity, quality impact)
│    └─ Strategy: Preventive with some condition monitoring
└─ Non-critical (Minimal impact)
     └─ Strategy: Reactive or basic preventive
```
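The criticality tree above can be sketched as a small helper function. This is a minimal sketch; the function name and boolean inputs are illustrative, not part of any RCM standard:

```python
def recommend_strategy(production_stoppage: bool, safety_risk: bool,
                       capacity_or_quality_impact: bool) -> str:
    """Map an equipment criticality assessment to a maintenance strategy."""
    if production_stoppage or safety_risk:
        # Critical: failure stops production or endangers people
        return "Predictive or rigorous preventive"
    if capacity_or_quality_impact:
        # Important: failure degrades capacity or quality
        return "Preventive with condition monitoring"
    # Non-critical: minimal impact
    return "Reactive or basic preventive"

print(recommend_strategy(production_stoppage=True, safety_risk=False,
                         capacity_or_quality_impact=False))
```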
Reliability Analysis
Reliability Metrics
```python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats
from datetime import datetime, timedelta


class ReliabilityAnalysis:
    """
    Equipment reliability analysis.
    Calculates MTBF, MTTR, availability, and failure distributions.
    """

    def __init__(self, failure_data):
        """
        failure_data: DataFrame with columns
            - equipment_id
            - failure_date
            - repair_date
            - time_between_failures (optional)
            - downtime_hours
        """
        self.data = failure_data.copy()

        # Calculate time between failures if not provided
        if 'time_between_failures' not in self.data.columns:
            self.data = self.data.sort_values(['equipment_id', 'failure_date'])
            self.data['time_between_failures'] = (
                self.data.groupby('equipment_id')['failure_date']
                .diff().dt.total_seconds() / 3600  # Convert to hours
            )

    def calculate_mtbf(self):
        """
        Calculate Mean Time Between Failures.
        MTBF = Total operating time / Number of failures
        """
        # Drop the first failure of each equipment (no prior TBF)
        valid_tbf = self.data[self.data['time_between_failures'].notna()]
        mtbf = valid_tbf['time_between_failures'].mean()
        mtbf_by_equipment = valid_tbf.groupby('equipment_id')['time_between_failures'].mean()
        return {
            'overall_mtbf_hours': mtbf,
            'overall_mtbf_days': mtbf / 24,
            'mtbf_by_equipment': mtbf_by_equipment.to_dict(),
            'std_dev': valid_tbf['time_between_failures'].std()
        }

    def calculate_mttr(self):
        """
        Calculate Mean Time To Repair.
        MTTR = Total repair time / Number of repairs
        """
        mttr = self.data['downtime_hours'].mean()
        mttr_by_equipment = self.data.groupby('equipment_id')['downtime_hours'].mean()
        return {
            'overall_mttr_hours': mttr,
            'mttr_by_equipment': mttr_by_equipment.to_dict(),
            'std_dev': self.data['downtime_hours'].std()
        }

    def calculate_availability(self, operating_hours_per_year=8760):
        """
        Calculate equipment availability.
        Inherent: Availability = MTBF / (MTBF + MTTR)
        Actual:   Availability = Uptime / Total time
        """
        mtbf_result = self.calculate_mtbf()
        mttr_result = self.calculate_mttr()
        mtbf = mtbf_result['overall_mtbf_hours']
        mttr = mttr_result['overall_mttr_hours']

        # Inherent availability (MTBF / (MTBF + MTTR))
        availability = mtbf / (mtbf + mttr) if (mtbf + mttr) > 0 else 0

        # Actual availability based on recorded downtime
        total_downtime = self.data['downtime_hours'].sum()
        actual_availability = 1 - (total_downtime / operating_hours_per_year)

        return {
            'inherent_availability_pct': availability * 100,
            'actual_availability_pct': actual_availability * 100,
            'mtbf_hours': mtbf,
            'mttr_hours': mttr,
            'total_downtime_hours': total_downtime,
            'total_failures': len(self.data)
        }

    def weibull_analysis(self, time_to_failure_data):
        """
        Weibull distribution analysis for failure prediction.

        Weibull parameters:
        - Beta (shape): failure pattern
          - Beta < 1: infant mortality (decreasing failure rate)
          - Beta = 1: random failures (constant failure rate)
          - Beta > 1: wear-out failures (increasing failure rate)
        - Eta (scale): characteristic life

        Parameters:
        - time_to_failure_data: array of times to failure
        """
        # Fit Weibull distribution (location fixed at 0)
        shape, loc, scale = stats.weibull_min.fit(time_to_failure_data, floc=0)
        beta = shape  # shape parameter
        eta = scale   # scale parameter

        # Determine the failure pattern. Check the near-constant band first,
        # so that 0.9 <= beta < 1 is not swallowed by the infant-mortality branch.
        if 0.9 <= beta <= 1.1:
            pattern = "Random Failures (constant failure rate)"
        elif beta < 1:
            pattern = "Infant Mortality (decreasing failure rate)"
        else:
            pattern = "Wear-out (increasing failure rate)"

        # Reliability curve R(t) over the observed range
        times = np.linspace(0, time_to_failure_data.max(), 100)
        reliability = stats.weibull_min.sf(times, shape, loc, scale)

        # Mean Time To Failure
        mttf = stats.weibull_min.mean(shape, loc, scale)

        return {
            'beta_shape': beta,
            'eta_scale': eta,
            'failure_pattern': pattern,
            'mttf': mttf,
            'times': times,
            'reliability': reliability
        }

    def plot_weibull(self, weibull_results):
        """Plot the Weibull reliability curve."""
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.plot(weibull_results['times'], weibull_results['reliability'] * 100,
                'b-', linewidth=2)
        ax.axhline(50, color='red', linestyle='--', linewidth=1, label='50% Reliability')
        ax.set_xlabel('Time (hours)', fontsize=12, fontweight='bold')
        ax.set_ylabel('Reliability (%)', fontsize=12, fontweight='bold')
        ax.set_title(
            f'Weibull Reliability Curve\n'
            f'β={weibull_results["beta_shape"]:.2f}, η={weibull_results["eta_scale"]:.0f}\n'
            f'Pattern: {weibull_results["failure_pattern"]}',
            fontsize=14, fontweight='bold')
        ax.grid(True, alpha=0.3)
        ax.legend()
        plt.tight_layout()
        return fig

    def failure_rate_trend(self):
        """Analyze failure rate trends over time."""
        # Group failures by month
        self.data['month'] = pd.to_datetime(self.data['failure_date']).dt.to_period('M')
        monthly_failures = self.data.groupby('month').size()

        # Fit a linear trend to monthly failure counts
        x = np.arange(len(monthly_failures))
        y = monthly_failures.values
        if len(x) > 1:
            slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
            if p_value < 0.05:
                if slope > 0:
                    trend = "Increasing (equipment degradation)"
                else:
                    trend = "Decreasing (reliability improvement)"
            else:
                trend = "Stable (no significant trend)"
        else:
            trend = "Insufficient data"
            slope = 0

        return {
            'monthly_failures': monthly_failures.to_dict(),
            'trend': trend,
            'slope': slope,
            'avg_failures_per_month': monthly_failures.mean()
        }
```
Example usage:

```python
np.random.seed(42)

# Generate example failure data
dates = pd.date_range('2024-01-01', '2025-01-26', freq='D')
failure_dates = []
repair_dates = []
equipment_ids = []
downtime_hours = []
for i in range(50):  # 50 failures
    fail_date = np.random.choice(dates)
    equipment_id = np.random.choice(['Pump-01', 'Conveyor-02', 'Press-03', 'CNC-04'])
    downtime = np.random.exponential(4)  # Average 4 hours of downtime
    failure_dates.append(fail_date)
    repair_dates.append(fail_date + timedelta(hours=downtime))
    equipment_ids.append(equipment_id)
    downtime_hours.append(downtime)

failure_data = pd.DataFrame({
    'equipment_id': equipment_ids,
    'failure_date': failure_dates,
    'repair_date': repair_dates,
    'downtime_hours': downtime_hours
})

reliability = ReliabilityAnalysis(failure_data)

# MTBF
mtbf = reliability.calculate_mtbf()
print("MTBF Analysis:")
print(f"  Overall MTBF: {mtbf['overall_mtbf_hours']:.1f} hours ({mtbf['overall_mtbf_days']:.1f} days)")
print(f"  Standard Deviation: {mtbf['std_dev']:.1f} hours")

# MTTR
mttr = reliability.calculate_mttr()
print("\nMTTR Analysis:")
print(f"  Overall MTTR: {mttr['overall_mttr_hours']:.2f} hours")

# Availability
availability = reliability.calculate_availability()
print("\nAvailability Analysis:")
print(f"  Inherent Availability: {availability['inherent_availability_pct']:.2f}%")
print(f"  Actual Availability: {availability['actual_availability_pct']:.2f}%")
print(f"  Total Failures: {availability['total_failures']}")
print(f"  Total Downtime: {availability['total_downtime_hours']:.1f} hours")

# Weibull analysis. Note: time_between_failures is computed on reliability.data,
# not on the raw failure_data input; zero gaps are dropped before fitting.
ttf_data = reliability.data['time_between_failures'].dropna().values
ttf_data = ttf_data[ttf_data > 0]
if len(ttf_data) > 5:
    weibull = reliability.weibull_analysis(ttf_data)
    print("\nWeibull Analysis:")
    print(f"  Beta (Shape): {weibull['beta_shape']:.2f}")
    print(f"  Eta (Scale): {weibull['eta_scale']:.0f} hours")
    print(f"  Failure Pattern: {weibull['failure_pattern']}")
    print(f"  MTTF: {weibull['mttf']:.1f} hours")
    fig = reliability.plot_weibull(weibull)
    plt.show()

# Failure trend
trend = reliability.failure_rate_trend()
print("\nFailure Rate Trend:")
print(f"  Trend: {trend['trend']}")
print(f"  Average Failures/Month: {trend['avg_failures_per_month']:.1f}")
```
---
Preventive Maintenance Optimization
PM Interval Optimization
```python
class PreventiveMaintenanceOptimizer:
    """
    Optimize preventive maintenance intervals:
    balance maintenance cost vs. failure cost.
    """

    def __init__(self, mtbf, mttr, pm_cost, failure_cost, pm_duration_hours):
        """
        Parameters:
        - mtbf: Mean Time Between Failures (hours)
        - mttr: Mean Time To Repair (hours)
        - pm_cost: cost of one preventive maintenance ($)
        - failure_cost: cost of one failure, downtime plus repair ($)
        - pm_duration_hours: duration of the PM activity (hours)
        """
        self.mtbf = mtbf
        self.mttr = mttr
        self.pm_cost = pm_cost
        self.failure_cost = failure_cost
        self.pm_duration = pm_duration_hours

    def calculate_optimal_interval(self, interval_range=(100, 2000, 50)):
        """
        Find the PM interval that minimizes total annual cost.
        Total Cost = PM Cost + Failure Cost
        """
        intervals = np.arange(*interval_range)
        results = []
        for T in intervals:
            # Expected number of PMs per year (8760 hours)
            n_pms = 8760 / T

            # Probability of failure before the next PM,
            # assuming exponentially distributed times to failure
            failure_prob = 1 - np.exp(-T / self.mtbf)

            # Expected failures per year
            n_failures = n_pms * failure_prob

            # Annual costs
            annual_pm_cost = n_pms * self.pm_cost
            annual_failure_cost = n_failures * self.failure_cost
            total_cost = annual_pm_cost + annual_failure_cost

            # Downtime
            pm_downtime = n_pms * self.pm_duration
            failure_downtime = n_failures * self.mttr
            total_downtime = pm_downtime + failure_downtime

            # Availability
            availability = 1 - (total_downtime / 8760)

            results.append({
                'interval_hours': T,
                'n_pms_per_year': n_pms,
                'n_failures_per_year': n_failures,
                'annual_pm_cost': annual_pm_cost,
                'annual_failure_cost': annual_failure_cost,
                'total_annual_cost': total_cost,
                'total_downtime_hours': total_downtime,
                'availability_pct': availability * 100
            })

        df = pd.DataFrame(results)

        # Find the cost-minimizing interval
        optimal_idx = df['total_annual_cost'].idxmin()
        optimal = df.loc[optimal_idx]

        return {
            'optimal_interval_hours': optimal['interval_hours'],
            'optimal_interval_days': optimal['interval_hours'] / 24,
            'total_annual_cost': optimal['total_annual_cost'],
            'availability': optimal['availability_pct'],
            'analysis': df
        }

    def plot_optimization(self, optimization_results):
        """Plot cost optimization curves."""
        df = optimization_results['analysis']
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))

        # Cost curves
        ax1.plot(df['interval_hours'], df['annual_pm_cost'], 'b-', linewidth=2, label='PM Cost')
        ax1.plot(df['interval_hours'], df['annual_failure_cost'], 'r-', linewidth=2, label='Failure Cost')
        ax1.plot(df['interval_hours'], df['total_annual_cost'], 'g-', linewidth=3, label='Total Cost')

        # Mark the optimum
        optimal_interval = optimization_results['optimal_interval_hours']
        optimal_cost = optimization_results['total_annual_cost']
        ax1.plot(optimal_interval, optimal_cost, 'go', markersize=12, markeredgewidth=2,
                 markerfacecolor='yellow', label='Optimal')
        ax1.set_xlabel('PM Interval (hours)', fontsize=12, fontweight='bold')
        ax1.set_ylabel('Annual Cost ($)', fontsize=12, fontweight='bold')
        ax1.set_title('Preventive Maintenance Interval Optimization', fontsize=14, fontweight='bold')
        ax1.legend()
        ax1.grid(True, alpha=0.3)

        # Availability curve
        ax2.plot(df['interval_hours'], df['availability_pct'], 'b-', linewidth=2)
        ax2.axhline(optimization_results['availability'], color='red', linestyle='--',
                    label=f"Optimal Availability: {optimization_results['availability']:.1f}%")
        ax2.set_xlabel('PM Interval (hours)', fontsize=12, fontweight='bold')
        ax2.set_ylabel('Availability (%)', fontsize=12, fontweight='bold')
        ax2.set_title('Equipment Availability vs. PM Interval', fontsize=12, fontweight='bold')
        ax2.legend()
        ax2.grid(True, alpha=0.3)

        plt.tight_layout()
        return fig
```
Example usage:

```python
pm_optimizer = PreventiveMaintenanceOptimizer(
    mtbf=720,             # 720 hours (30 days) MTBF
    mttr=8,               # 8 hours MTTR
    pm_cost=500,          # $500 per PM
    failure_cost=5000,    # $5,000 per failure
    pm_duration_hours=4   # 4-hour PM duration
)

optimization = pm_optimizer.calculate_optimal_interval(interval_range=(100, 1500, 25))
print("PM Interval Optimization:")
print(f"  Optimal Interval: {optimization['optimal_interval_hours']:.0f} hours "
      f"({optimization['optimal_interval_days']:.1f} days)")
print(f"  Total Annual Cost: ${optimization['total_annual_cost']:,.0f}")
print(f"  Expected Availability: {optimization['availability']:.2f}%")

fig = pm_optimizer.plot_optimization(optimization)
plt.show()
```
---
Predictive Maintenance
Condition Monitoring & Anomaly Detection
```python
class PredictiveMaintenanceAnalyzer:
    """
    Predictive maintenance using condition monitoring data:
    anomaly detection and RUL (Remaining Useful Life) estimation.
    """

    def __init__(self, sensor_data, sensor_name):
        """
        sensor_data: time series of sensor readings (e.g., vibration, temperature)
        sensor_name: description of the sensor
        """
        self.data = np.array(sensor_data)
        self.sensor_name = sensor_name

    def detect_anomalies(self, threshold_sigma=3):
        """
        Detect anomalies statistically: readings more than threshold_sigma
        standard deviations from the mean are flagged.
        Returns anomaly indices and scores.
        """
        mean = self.data.mean()
        std = self.data.std()

        # Z-scores
        z_scores = np.abs((self.data - mean) / std)

        # Anomalies
        anomalies = z_scores > threshold_sigma
        anomaly_indices = np.where(anomalies)[0]

        return {
            'anomaly_count': anomalies.sum(),
            'anomaly_indices': anomaly_indices,
            'anomaly_values': self.data[anomaly_indices],
            'z_scores': z_scores,
            'mean': mean,
            'std': std,
            'threshold': threshold_sigma
        }

    def trend_analysis(self):
        """
        Analyze the trend in sensor data.
        An increasing trend may indicate degradation.
        """
        x = np.arange(len(self.data))
        y = self.data

        # Linear regression
        slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)

        # Classify the trend
        if p_value < 0.05:
            if slope > 0:
                trend = "Increasing (potential degradation)"
                concern_level = "High" if slope > std_err * 2 else "Medium"
            else:
                trend = "Decreasing (improving or cooling)"
                concern_level = "Low"
        else:
            trend = "Stable (no significant trend)"
            concern_level = "Low"

        return {
            'trend': trend,
            'slope': slope,
            'r_squared': r_value ** 2,
            'p_value': p_value,
            'concern_level': concern_level
        }

    def estimate_rul(self, failure_threshold):
        """
        Estimate Remaining Useful Life, assuming linear degradation
        toward the failure threshold.

        Parameters:
        - failure_threshold: sensor value at which failure occurs
        """
        trend = self.trend_analysis()
        if trend['slope'] <= 0:
            return {
                'rul_cycles': float('inf'),
                'message': 'No degradation detected - RUL cannot be estimated'
            }

        # Most recent reading
        current_value = self.data[-1]

        # Cycles until the threshold is reached
        rul_cycles = (failure_threshold - current_value) / trend['slope']

        # Check if the threshold has already been exceeded
        if rul_cycles < 0:
            rul_cycles = 0
            message = "Warning: Failure threshold already exceeded!"
        else:
            message = f"Estimated {rul_cycles:.0f} cycles remaining until failure threshold"

        return {
            'current_value': current_value,
            'failure_threshold': failure_threshold,
            'degradation_rate': trend['slope'],
            'rul_cycles': max(0, rul_cycles),
            'message': message
        }

    def maintenance_recommendation(self, rul_result, lead_time_cycles=50):
        """
        Generate a maintenance recommendation based on RUL.

        Parameters:
        - rul_result: output from estimate_rul()
        - lead_time_cycles: lead time needed to plan maintenance
        """
        rul = rul_result['rul_cycles']
        if rul == float('inf'):
            recommendation = "Continue monitoring - no immediate action needed"
            priority = "Low"
        elif rul <= 0:
            recommendation = "URGENT: Schedule immediate maintenance"
            priority = "Critical"
        elif rul < lead_time_cycles:
            recommendation = "Schedule maintenance soon - approaching failure threshold"
            priority = "High"
        elif rul < lead_time_cycles * 2:
            recommendation = "Plan maintenance within next planning cycle"
            priority = "Medium"
        else:
            recommendation = "Continue monitoring - sufficient remaining life"
            priority = "Low"

        return {
            'recommendation': recommendation,
            'priority': priority,
            'estimated_rul': rul,
            'lead_time': lead_time_cycles
        }

    def plot_condition_monitoring(self, anomaly_results, trend_results, rul_results=None):
        """Plot a condition monitoring dashboard."""
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))

        # Time series with anomalies
        x = np.arange(len(self.data))
        ax1.plot(x, self.data, 'b-', linewidth=1, label='Sensor Reading')
        if len(anomaly_results['anomaly_indices']) > 0:
            ax1.plot(anomaly_results['anomaly_indices'],
                     anomaly_results['anomaly_values'],
                     'ro', markersize=8, label='Anomalies')

        # Mean and control limits
        mean = anomaly_results['mean']
        std = anomaly_results['std']
        threshold = anomaly_results['threshold']
        ax1.axhline(mean, color='green', linestyle='-', linewidth=2, label='Mean')
        ax1.axhline(mean + threshold * std, color='red', linestyle='--', linewidth=1.5, label='Upper Limit')
        ax1.axhline(mean - threshold * std, color='red', linestyle='--', linewidth=1.5, label='Lower Limit')

        # Trend line (anchored at the first reading)
        if trend_results['p_value'] < 0.05:
            trend_line = trend_results['slope'] * x + self.data[0]
            ax1.plot(x, trend_line, color='orange', linestyle='--', linewidth=2, label='Trend')

        ax1.set_xlabel('Cycle / Time', fontsize=12, fontweight='bold')
        ax1.set_ylabel(f'{self.sensor_name}', fontsize=12, fontweight='bold')
        ax1.set_title(f'Condition Monitoring - {self.sensor_name}\nTrend: {trend_results["trend"]}',
                      fontsize=14, fontweight='bold')
        ax1.legend(loc='upper left')
        ax1.grid(True, alpha=0.3)

        # Anomaly scores (Z-scores)
        ax2.plot(x, anomaly_results['z_scores'], 'b-', linewidth=1)
        ax2.axhline(threshold, color='red', linestyle='--', linewidth=2, label=f'Threshold ({threshold}σ)')
        ax2.fill_between(x, 0, threshold, alpha=0.2, color='green', label='Normal')
        ax2.fill_between(x, threshold, anomaly_results['z_scores'].max() + 1,
                         alpha=0.2, color='red', label='Anomaly Zone')
        ax2.set_xlabel('Cycle / Time', fontsize=12, fontweight='bold')
        ax2.set_ylabel('Anomaly Score (Z-score)', fontsize=12, fontweight='bold')
        ax2.set_title('Anomaly Detection', fontsize=12, fontweight='bold')
        ax2.legend()
        ax2.grid(True, alpha=0.3)

        plt.tight_layout()
        return fig
```
```python
# Example usage
np.random.seed(42)

# Simulate sensor data with degradation trend and some anomalies
n_cycles = 200
baseline = 100
degradation_rate = 0.1
noise = np.random.normal(0, 3, n_cycles)
sensor_data = baseline + degradation_rate * np.arange(n_cycles) + noise

# Add some anomalies
sensor_data[50] = 130  # Spike
sensor_data[100] = 135
sensor_data[150] = 140

pdm = PredictiveMaintenanceAnalyzer(sensor_data, "Vibration (mm/s)")

# Detect anomalies
anomalies = pdm.detect_anomalies(threshold_sigma=3)
print("Anomaly Detection:")
print(f"  Anomalies Found: {anomalies['anomaly_count']}")
print(f"  Anomaly Indices: {anomalies['anomaly_indices']}")

# Trend analysis
trend = pdm.trend_analysis()
print("\nTrend Analysis:")
print(f"  Trend: {trend['trend']}")
print(f"  Degradation Rate: {trend['slope']:.4f} per cycle")
print(f"  R-squared: {trend['r_squared']:.3f}")
print(f"  Concern Level: {trend['concern_level']}")

# RUL estimation
failure_threshold = 150  # Failure occurs at 150 mm/s
rul = pdm.estimate_rul(failure_threshold)
print("\nRUL Estimation:")
print(f"  Current Value: {rul['current_value']:.1f}")
print(f"  Failure Threshold: {rul['failure_threshold']}")
print(f"  {rul['message']}")

# Maintenance recommendation
recommendation = pdm.maintenance_recommendation(rul, lead_time_cycles=50)
print("\nMaintenance Recommendation:")
print(f"  Priority: {recommendation['priority']}")
print(f"  Action: {recommendation['recommendation']}")

# Plot
fig = pdm.plot_condition_monitoring(anomalies, trend, rul)
plt.show()
```
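One caveat on the `detect_anomalies` approach above: its mean and standard deviation are computed over the entire series, so a strong degradation trend inflates the control limits and can mask anomalies late in the run. A rolling baseline is often more sensitive. A minimal sketch (the 30-point window and the variable names are illustrative choices, not part of the class above):

```python
import numpy as np
import pandas as pd

np.random.seed(42)
# Same shape of data as the example above: trend + noise + one injected spike
data = pd.Series(100 + 0.1 * np.arange(200) + np.random.normal(0, 3, 200))
data.iloc[100] = 135

# Baseline from the trailing 30 points only; shift(1) excludes the current point
rolling_mean = data.rolling(30).mean().shift(1)
rolling_std = data.rolling(30).std().shift(1)
z = ((data - rolling_mean) / rolling_std).abs()

anomalies = z[z > 3].index.tolist()
print(f"Rolling-window anomalies at indices: {anomalies}")
```

Because the trend is absorbed into the local baseline, the spike at index 100 stands out sharply even as the global control limits would keep widening.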
---

Total Productive Maintenance (TPM)

OEE Calculation and Analysis
```python
class OEEAnalysis:
    """
    Overall Equipment Effectiveness (OEE) calculation
    OEE = Availability × Performance × Quality
    """

    def __init__(self, shift_data):
        """
        shift_data: DataFrame with production shift information
        Columns:
        - planned_production_time (minutes)
        - downtime (minutes)
        - units_produced
        - ideal_cycle_time (minutes per unit)
        - defects
        """
        self.data = shift_data

    def calculate_oee(self):
        """
        Calculate OEE and its components
        Availability = Operating Time / Planned Production Time
        Performance = Actual Output / Max Possible Output
        Quality = Good Units / Total Units
        OEE = Availability × Performance × Quality
        """
        results = []
        for idx, row in self.data.iterrows():
            # Availability
            operating_time = row['planned_production_time'] - row['downtime']
            availability = operating_time / row['planned_production_time']

            # Performance
            ideal_time_for_output = row['units_produced'] * row['ideal_cycle_time']
            performance = ideal_time_for_output / operating_time if operating_time > 0 else 0

            # Quality
            good_units = row['units_produced'] - row['defects']
            quality = good_units / row['units_produced'] if row['units_produced'] > 0 else 0

            # OEE
            oee = availability * performance * quality

            results.append({
                'shift': idx,
                'availability': availability * 100,
                'performance': performance * 100,
                'quality': quality * 100,
                'oee': oee * 100,
                'operating_time': operating_time,
                'good_units': good_units
            })

        df_results = pd.DataFrame(results)

        # Overall averages
        overall = {
            'avg_availability': df_results['availability'].mean(),
            'avg_performance': df_results['performance'].mean(),
            'avg_quality': df_results['quality'].mean(),
            'avg_oee': df_results['oee'].mean(),
            'world_class_gap': 85 - df_results['oee'].mean(),  # World class = 85%+
            'results_by_shift': df_results
        }
        return overall

    def six_big_losses_analysis(self, breakdown_hours, setup_hours,
                                small_stops_hours, speed_loss_pct,
                                process_defects, reduced_yield_pct):
        """
        Analyze TPM Six Big Losses
        1. Breakdowns (Availability)
        2. Setup/Changeovers (Availability)
        3. Small Stops (Performance)
        4. Speed Loss (Performance)
        5. Process Defects (Quality)
        6. Reduced Yield (Quality)
        """
        total_time = self.data['planned_production_time'].sum() / 60  # Convert to hours

        losses = {
            'Breakdowns': {
                'hours': breakdown_hours,
                'pct_of_total': (breakdown_hours / total_time) * 100,
                'category': 'Availability'
            },
            'Setup/Changeover': {
                'hours': setup_hours,
                'pct_of_total': (setup_hours / total_time) * 100,
                'category': 'Availability'
            },
            'Small Stops': {
                'hours': small_stops_hours,
                'pct_of_total': (small_stops_hours / total_time) * 100,
                'category': 'Performance'
            },
            'Speed Loss': {
                'hours': total_time * (speed_loss_pct / 100),
                'pct_of_total': speed_loss_pct,
                'category': 'Performance'
            },
            'Process Defects': {
                'units': process_defects,
                'pct_of_total': (process_defects / self.data['units_produced'].sum()) * 100,
                'category': 'Quality'
            },
            'Reduced Yield': {
                'pct_of_total': reduced_yield_pct,
                'category': 'Quality'
            }
        }
        return pd.DataFrame(losses).T

    def plot_oee_dashboard(self, oee_results):
        """Create OEE dashboard visualization"""
        df = oee_results['results_by_shift']

        fig = plt.figure(figsize=(14, 10))
        gs = fig.add_gridspec(3, 2, hspace=0.3, wspace=0.3)

        # OEE over time
        ax1 = fig.add_subplot(gs[0, :])
        ax1.plot(df['shift'], df['oee'], 'bo-', linewidth=2, markersize=8, label='OEE')
        ax1.axhline(85, color='green', linestyle='--', linewidth=2, label='World Class (85%)')
        ax1.axhline(oee_results['avg_oee'], color='orange', linestyle='-', linewidth=2, label='Average')
        ax1.set_xlabel('Shift', fontsize=12, fontweight='bold')
        ax1.set_ylabel('OEE (%)', fontsize=12, fontweight='bold')
        ax1.set_title('Overall Equipment Effectiveness (OEE) by Shift', fontsize=14, fontweight='bold')
        ax1.legend()
        ax1.grid(True, alpha=0.3)

        # OEE components (average)
        ax2 = fig.add_subplot(gs[1, 0])
        components = ['Availability', 'Performance', 'Quality', 'OEE']
        values = [
            oee_results['avg_availability'],
            oee_results['avg_performance'],
            oee_results['avg_quality'],
            oee_results['avg_oee']
        ]
        colors = ['skyblue', 'lightgreen', 'lightcoral', 'gold']
        bars = ax2.bar(components, values, color=colors, edgecolor='black', linewidth=1.5)
        ax2.axhline(85, color='green', linestyle='--', linewidth=2, label='World Class')
        ax2.set_ylabel('Percentage (%)', fontsize=12, fontweight='bold')
        ax2.set_title('Average OEE Components', fontsize=12, fontweight='bold')
        ax2.set_ylim([0, 100])
        ax2.legend()
        ax2.grid(True, axis='y', alpha=0.3)

        # Add value labels on bars
        for bar in bars:
            height = bar.get_height()
            ax2.text(bar.get_x() + bar.get_width() / 2., height,
                     f'{height:.1f}%', ha='center', va='bottom', fontweight='bold')

        # Component trends
        ax3 = fig.add_subplot(gs[1, 1])
        ax3.plot(df['shift'], df['availability'], 'o-', label='Availability', linewidth=2)
        ax3.plot(df['shift'], df['performance'], 's-', label='Performance', linewidth=2)
        ax3.plot(df['shift'], df['quality'], '^-', label='Quality', linewidth=2)
        ax3.set_xlabel('Shift', fontsize=12, fontweight='bold')
        ax3.set_ylabel('Percentage (%)', fontsize=12, fontweight='bold')
        ax3.set_title('OEE Component Trends', fontsize=12, fontweight='bold')
        ax3.legend()
        ax3.grid(True, alpha=0.3)

        # Performance metrics table
        ax4 = fig.add_subplot(gs[2, :])
        ax4.axis('tight')
        ax4.axis('off')
        table_data = [
            ['Metric', 'Value', 'World Class', 'Gap'],
            ['Availability', f"{oee_results['avg_availability']:.1f}%", '90%', f"{90 - oee_results['avg_availability']:.1f}%"],
            ['Performance', f"{oee_results['avg_performance']:.1f}%", '95%', f"{95 - oee_results['avg_performance']:.1f}%"],
            ['Quality', f"{oee_results['avg_quality']:.1f}%", '99%', f"{99 - oee_results['avg_quality']:.1f}%"],
            ['OEE', f"{oee_results['avg_oee']:.1f}%", '85%', f"{oee_results['world_class_gap']:.1f}%"]
        ]
        table = ax4.table(cellText=table_data, cellLoc='center', loc='center',
                          colWidths=[0.3, 0.2, 0.25, 0.25])
        table.auto_set_font_size(False)
        table.set_fontsize(11)
        table.scale(1, 2)

        # Style header row
        for i in range(4):
            table[(0, i)].set_facecolor('#4CAF50')
            table[(0, i)].set_text_props(weight='bold', color='white')

        plt.suptitle('OEE Performance Dashboard', fontsize=16, fontweight='bold', y=0.98)
        return fig
```
```python
# Example usage
shift_data = pd.DataFrame({
    'planned_production_time': [480, 480, 480, 480, 480],  # 8-hour shifts in minutes
    'downtime': [45, 30, 50, 35, 40],
    'units_produced': [850, 920, 800, 890, 870],
    'ideal_cycle_time': [0.5, 0.5, 0.5, 0.5, 0.5],  # 0.5 min per unit
    'defects': [25, 18, 35, 22, 20]
})

oee = OEEAnalysis(shift_data)

# Calculate OEE
results = oee.calculate_oee()
print("OEE Analysis:")
print(f"  Average Availability: {results['avg_availability']:.1f}%")
print(f"  Average Performance: {results['avg_performance']:.1f}%")
print(f"  Average Quality: {results['avg_quality']:.1f}%")
print(f"  Average OEE: {results['avg_oee']:.1f}%")
print(f"  Gap to World Class (85%): {results['world_class_gap']:.1f}%")

# Six Big Losses
losses = oee.six_big_losses_analysis(
    breakdown_hours=15,
    setup_hours=12,
    small_stops_hours=8,
    speed_loss_pct=5,
    process_defects=120,
    reduced_yield_pct=2
)
print("\nSix Big Losses Analysis:")
print(losses)

# Plot
fig = oee.plot_oee_dashboard(results)
plt.show()
```
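As a quick sanity check on `calculate_oee`, the first shift in the example data can be worked by hand. Note that the three factors telescope: operating time and produced units cancel, so OEE reduces to (good units × ideal cycle time) / planned production time:

```python
# First shift from the example data above
planned, downtime, produced, cycle, defects = 480, 45, 850, 0.5, 25

availability = (planned - downtime) / planned           # 435/480
performance = produced * cycle / (planned - downtime)   # 425/435
quality = (produced - defects) / produced               # 825/850

oee = availability * performance * quality
shortcut = (produced - defects) * cycle / planned       # telescoped form

print(f"OEE: {oee:.2%} (shortcut: {shortcut:.2%})")  # both 85.94%
```

The telescoped form is handy for spot-checking dashboard numbers, but the three separate factors remain essential for diagnosing *where* the losses occur.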
---

Spare Parts Optimization
```python
class SparePartsOptimization:
    """
    Optimize spare parts inventory
    Balance holding costs vs. stockout costs
    """

    def __init__(self, annual_demand, unit_cost, ordering_cost, holding_cost_pct,
                 lead_time_days, stockout_cost):
        """
        Parameters:
        - annual_demand: annual usage of spare part
        - unit_cost: cost per unit
        - ordering_cost: fixed cost per order
        - holding_cost_pct: annual holding cost as % of unit cost
        - lead_time_days: replenishment lead time
        - stockout_cost: cost per stockout incident
        """
        self.annual_demand = annual_demand
        self.unit_cost = unit_cost
        self.ordering_cost = ordering_cost
        self.holding_cost = unit_cost * holding_cost_pct
        self.lead_time_days = lead_time_days
        self.stockout_cost = stockout_cost

    def economic_order_quantity(self):
        """
        Calculate EOQ (Economic Order Quantity)
        EOQ = sqrt((2 * D * S) / H)
        Where:
        - D = annual demand
        - S = ordering cost
        - H = holding cost per unit per year
        """
        eoq = np.sqrt((2 * self.annual_demand * self.ordering_cost) / self.holding_cost)

        # Number of orders per year
        n_orders = self.annual_demand / eoq

        # Total cost
        ordering_cost_total = n_orders * self.ordering_cost
        holding_cost_total = (eoq / 2) * self.holding_cost
        total_cost = ordering_cost_total + holding_cost_total

        return {
            'eoq': eoq,
            'orders_per_year': n_orders,
            'ordering_cost': ordering_cost_total,
            'holding_cost': holding_cost_total,
            'total_cost': total_cost
        }

    def reorder_point(self, demand_std_dev, service_level=0.95):
        """
        Calculate reorder point with safety stock
        ROP = (Average Daily Demand × Lead Time) + Safety Stock
        Safety Stock = Z × σ_demand × sqrt(lead_time)
        """
        avg_daily_demand = self.annual_demand / 365

        # Z-score for service level
        z_score = stats.norm.ppf(service_level)

        # Safety stock
        safety_stock = z_score * demand_std_dev * np.sqrt(self.lead_time_days)

        # Reorder point
        rop = (avg_daily_demand * self.lead_time_days) + safety_stock

        return {
            'reorder_point': rop,
            'safety_stock': safety_stock,
            'avg_daily_demand': avg_daily_demand,
            'service_level': service_level * 100
        }

    def abc_classification(self, parts_data):
        """
        ABC classification of spare parts based on annual value
        parts_data: DataFrame with 'part_id', 'annual_usage', 'unit_cost'
        """
        df = parts_data.copy()

        # Calculate annual value
        df['annual_value'] = df['annual_usage'] * df['unit_cost']

        # Sort by value descending
        df = df.sort_values('annual_value', ascending=False)

        # Calculate cumulative percentage
        total_value = df['annual_value'].sum()
        df['cumulative_value'] = df['annual_value'].cumsum()
        df['cumulative_pct'] = (df['cumulative_value'] / total_value) * 100

        # Classify
        def classify(row):
            if row['cumulative_pct'] <= 80:
                return 'A'
            elif row['cumulative_pct'] <= 95:
                return 'B'
            else:
                return 'C'

        df['abc_class'] = df.apply(classify, axis=1)
        return df
```
```python
# Example usage
spare_part = SparePartsOptimization(
    annual_demand=120,      # 120 units per year (10 per month)
    unit_cost=500,
    ordering_cost=100,
    holding_cost_pct=0.25,  # 25% of unit cost
    lead_time_days=30,
    stockout_cost=5000
)

# EOQ
eoq = spare_part.economic_order_quantity()
print("Economic Order Quantity:")
print(f"  EOQ: {eoq['eoq']:.0f} units")
print(f"  Orders per Year: {eoq['orders_per_year']:.1f}")
print(f"  Total Annual Cost: ${eoq['total_cost']:,.0f}")

# Reorder point
rop = spare_part.reorder_point(demand_std_dev=15, service_level=0.95)
print("\nReorder Point:")
print(f"  ROP: {rop['reorder_point']:.0f} units")
print(f"  Safety Stock: {rop['safety_stock']:.0f} units")
print(f"  Service Level: {rop['service_level']:.0f}%")

# ABC Classification example
parts_data = pd.DataFrame({
    'part_id': [f'PART-{i:03d}' for i in range(1, 21)],
    'annual_usage': np.random.randint(10, 500, 20),
    'unit_cost': np.random.uniform(50, 2000, 20)
})
abc_classification = spare_part.abc_classification(parts_data)
print("\nABC Classification:")
print(abc_classification[['part_id', 'annual_value', 'cumulative_pct', 'abc_class']].head(10))
print("\nABC Summary:")
print(abc_classification.groupby('abc_class').agg({
    'part_id': 'count',
    'annual_value': 'sum'
}))
```
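Note that `stockout_cost` is stored by the class above but not used by the methods shown. One way to bring it into the decision for a slow-moving part is to model lead-time demand as Poisson and add stock while the expected stockout cost a unit avoids still exceeds its holding cost. The function below is an illustrative sketch under those assumptions, not part of the class:

```python
from scipy import stats

def slow_mover_stock_level(annual_demand, lead_time_days, holding_cost_per_unit,
                           stockout_cost, max_units=50):
    """Choose a base-stock level s for Poisson lead-time demand.

    Add a unit while the expected stockout cost it avoids,
    stockout_cost * P(demand > s), exceeds its annual holding cost.
    """
    lam = annual_demand * lead_time_days / 365  # mean lead-time demand
    for s in range(max_units):
        p_stockout = stats.poisson.sf(s, lam)   # P(lead-time demand > s)
        if stockout_cost * p_stockout <= holding_cost_per_unit:
            return s, p_stockout
    return max_units, stats.poisson.sf(max_units, lam)

# Example: 12 units/year, 30-day lead time, $125/unit-year holding, $5,000 per stockout
level, risk = slow_mover_stock_level(12, 30, 125, 5000)
print(f"Stock level: {level} units (stockout probability {risk:.1%})")
```

The normal-approximation reorder point above suits fast movers; for parts consumed a few times a year, this discrete marginal analysis is usually the better fit.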
---Tools & Libraries
工具与库
Python Libraries
Python库
Reliability & Statistics:
- numpy, pandas: Data manipulation
- scipy.stats: Statistical distributions (Weibull, exponential)
- lifelines: Survival analysis and reliability
- reliability: Reliability engineering library
Visualization:
- matplotlib, seaborn, plotly: Charts and dashboards
Machine Learning (for PdM):
- scikit-learn: Anomaly detection, classification
- tensorflow, pytorch: Deep learning for predictive models
- tslearn: Time series analysis
可靠性与统计:
- numpy, pandas: 数据处理
- scipy.stats: 统计分布(威布尔分布、指数分布)
- lifelines: 生存分析与可靠性分析
- reliability: 可靠性工程专用库
可视化:
- matplotlib, seaborn, plotly: 图表与仪表盘
机器学习(用于预测性维护):
- scikit-learn: 异常检测、分类
- tensorflow, pytorch: 深度学习预测模型
- tslearn: 时间序列分析
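As one concrete use of the statistics libraries listed above, a two-parameter Weibull fit to time-between-failure data with scipy.stats (the failure times are synthetic, generated for illustration):

```python
import numpy as np
from scipy.stats import weibull_min

# Synthetic times-between-failure (hours): true shape 2.0, scale 800
rng = np.random.default_rng(7)
failure_times = weibull_min.rvs(2.0, scale=800, size=200, random_state=rng)

# Fit a 2-parameter Weibull (location pinned at zero with floc=0)
beta, _, eta = weibull_min.fit(failure_times, floc=0)
mtbf = weibull_min.mean(beta, scale=eta)  # eta * Gamma(1 + 1/beta)

print(f"Shape beta: {beta:.2f}  (>1 suggests wear-out, <1 infant mortality)")
print(f"Scale eta:  {eta:.0f} hours")
print(f"MTBF:       {mtbf:.0f} hours")
```

The shape parameter is the diagnostic: wear-out behavior (beta > 1) justifies time-based PM, while a roughly constant hazard (beta near 1) does not.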
Commercial Maintenance Software
商用维护软件
CMMS (Computerized Maintenance Management):
- SAP PM: Plant maintenance module
- IBM Maximo: Enterprise asset management
- Oracle EAM: Enterprise asset management
- Infor EAM: Maintenance management
- eMaint: Cloud-based CMMS
- Fiix: Modern CMMS platform
Predictive Maintenance:
- GE Predix: Industrial IoT and PdM platform
- Uptake: Predictive maintenance software
- C3 AI: AI-based predictive maintenance
- Azure IoT: Microsoft PdM solutions
- AWS IoT: Amazon predictive maintenance
Reliability Software:
- ReliaSoft: Reliability analysis tools
- ARMS Reliability: RAM (Reliability, Availability, Maintainability) analysis
- Item Software: Reliability engineering
CMMS(计算机化维护管理系统):
- SAP PM: 工厂维护模块
- IBM Maximo: 企业资产管理系统
- Oracle EAM: 企业资产管理系统
- Infor EAM: 维护管理系统
- eMaint: 云原生CMMS
- Fiix: 现代化CMMS平台
预测性维护:
- GE Predix: 工业物联网与预测性维护平台
- Uptake: 预测性维护软件
- C3 AI: 基于AI的预测性维护
- Azure IoT: 微软预测性维护解决方案
- AWS IoT: 亚马逊预测性维护
可靠性软件:
- ReliaSoft: 可靠性分析工具
- ARMS Reliability: RAM(可靠性、可用性、可维护性)分析
- Item Software: 可靠性工程工具
Common Challenges & Solutions
常见挑战与解决方案
Challenge: Lack of Failure Data
挑战:故障数据缺失
Problem:
- New equipment, no history
- Poor record-keeping
- Insufficient data for analysis
Solutions:
- Start collecting data immediately (even manual logs)
- Use manufacturer reliability data as baseline
- Implement CMMS to track all maintenance activities
- Benchmark with similar equipment or industry data
- Use conservative assumptions until data accumulates
问题:
- 新设备无历史数据
- 记录保存不完善
- 分析数据不足
解决方案:
- 立即开始收集数据(即使是手动记录)
- 以制造商提供的可靠性数据为基线
- 部署CMMS跟踪所有维护活动
- 与同类设备或行业数据对标
- 在数据积累前采用保守假设
Challenge: Balancing PM vs. PdM Investment
挑战:平衡PM与PdM投入
Problem:
- PdM requires sensors and technology investment
- Not all equipment justifies PdM cost
- Where to start?
Solutions:
- Use criticality analysis (RCM approach)
- Start PdM on most critical/expensive assets
- ROI analysis: compare cost of sensors vs. cost of failures
- Hybrid approach: PdM for critical, PM for others
- Pilot program on 1-2 assets first
问题:
- PdM需要传感器与技术投入
- 并非所有设备都值得投入PdM成本
- 不知从何入手
解决方案:
- 采用关键性分析(RCM方法)
- 先对最关键/高价值资产实施PdM
- ROI分析:对比传感器成本与故障成本
- 混合策略:关键设备用PdM,其他用PM
- 先在1-2台设备上开展试点
Challenge: Over-Maintenance
挑战:过度维护
Problem:
- Too frequent PM wastes resources
- "If it ain't broke, don't fix it" mentality
- PM intervals too conservative
Solutions:
- Optimize PM intervals using cost models
- Transition to condition-based maintenance
- Track PM effectiveness (failures after PM?)
- Use Weibull analysis to understand failure patterns
- Pilot extended PM intervals on non-critical equipment
问题:
- PM过于频繁浪费资源
- 秉持「没坏就不修」的心态
- PM间隔过于保守
解决方案:
- 用成本模型优化PM间隔
- 向基于状态的维护转型
- 跟踪PM有效性(PM后是否仍出现故障?)
- 用威布尔分析理解故障模式
- 在非关键设备上试点延长PM间隔
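The cost-model optimization of PM intervals mentioned above can be sketched with a classic age-replacement model: pick the interval T that minimizes expected cost per operating hour, given a Weibull failure distribution. All parameter values below are illustrative assumptions:

```python
import numpy as np
from scipy.integrate import trapezoid
from scipy.stats import weibull_min

def pm_cost_rate(T, beta, eta, cost_pm, cost_failure):
    """Age-replacement cost per operating hour with PM at age T.

    Expected cycle cost   = cost_pm * R(T) + cost_failure * F(T)
    Expected cycle length = integral of the reliability R(t) over [0, T]
    """
    t = np.linspace(0.0, T, 2000)
    reliability = weibull_min.sf(t, beta, scale=eta)  # R(t) = survival fn
    cycle_length = trapezoid(reliability, t)
    F_T = weibull_min.cdf(T, beta, scale=eta)
    cycle_cost = cost_pm * (1.0 - F_T) + cost_failure * F_T
    return cycle_cost / cycle_length

# Illustrative parameters: wear-out failures, failure costs 10x a planned PM
candidates = np.arange(100, 2001, 50)
rates = [pm_cost_rate(T, beta=2.5, eta=1000, cost_pm=500, cost_failure=5000)
         for T in candidates]
best = int(candidates[int(np.argmin(rates))])
print(f"Lowest-cost PM interval: ~{best} hours at ${min(rates):.2f}/hour")
```

Note that the model only rewards earlier PM when beta > 1; with a constant hazard rate, shortening the interval just adds PM cost, which is the quantitative case against over-maintenance.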
Challenge: Emergency/Reactive Maintenance Dominance
挑战:应急/被动维修占比过高
Problem:
- Always fighting fires
- No time for planned maintenance
- High costs and poor availability
Solutions:
- Dedicate resources to preventive work (separate teams if needed)
- Schedule PM during planned downtime
- Root cause analysis on repeat failures
- Build maintenance backlog and prioritize
- Track reactive vs. planned maintenance ratio (target <20% reactive)
问题:
- 始终忙于「救火」
- 无时间开展计划内维护
- 成本高、可用性差
解决方案:
- 专门分配资源用于预防性工作(必要时组建独立团队)
- 在计划停机时间内安排PM
- 对重复故障进行根本原因分析
- 建立维护积压任务并排序
- 跟踪被动维修与计划维修的比例(目标:被动维修占比<20%)
Challenge: Spare Parts Inventory Issues
挑战:备件库存问题
Problem:
- Stockouts of critical parts cause long downtime
- Excess inventory ties up capital
- Obsolete parts accumulate
Solutions:
- ABC classification (focus on high-value parts)
- Calculate optimal stock levels (ROP, safety stock)
- Vendor partnerships for critical parts
- Consignment inventory for slow-movers
- Regular inventory audits and obsolescence review
问题:
- 关键备件缺货导致长时间停机
- 库存过剩占用资金
- 积压过时备件
解决方案:
- ABC分类(重点关注高价值备件)
- 计算最优库存水平(ROP、安全库存)
- 与供应商建立关键备件合作关系
- 对慢周转备件采用寄售库存模式
- 定期进行库存审计与过时备件清理
Output Format
输出格式
Maintenance Plan Report
维护计划报告
Executive Summary:
- Current maintenance strategy and performance
- Key reliability metrics (MTBF, MTTR, availability)
- Improvement opportunities and priorities
- Expected benefits
Equipment Reliability Analysis:
| Equipment | MTBF (hours) | MTTR (hours) | Availability | Failures/Year | Criticality |
|---|---|---|---|---|---|
| Pump-01 | 720 | 8 | 98.9% | 12 | High |
| Conveyor-02 | 1,440 | 4 | 99.7% | 6 | Medium |
| Press-03 | 480 | 16 | 96.7% | 18 | Critical |
| CNC-04 | 960 | 12 | 98.7% | 9 | High |
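The availability column follows directly from the other two metrics via inherent availability, A = MTBF / (MTBF + MTTR). A quick check against the table rows (values agree to within rounding):

```python
# MTBF/MTTR pairs (hours) taken from the table above
equipment = {
    "Pump-01":     (720, 8),
    "Conveyor-02": (1440, 4),
    "Press-03":    (480, 16),
    "CNC-04":      (960, 12),
}
results = {name: mtbf / (mtbf + mttr)        # inherent availability
           for name, (mtbf, mttr) in equipment.items()}
for name, availability in results.items():
    print(f"{name}: {availability:.1%}")
```

This formula excludes planned downtime; operational availability, which includes it, will be lower.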
Maintenance Strategy Recommendations:
| Equipment | Current Strategy | Recommended Strategy | Rationale |
|---|---|---|---|
| Press-03 | Reactive | Predictive Maintenance | Critical asset, high failure cost |
| Pump-01 | PM (monthly) | PM (optimized to 6 weeks) | Over-maintained, can extend interval |
| Conveyor-02 | PM | Continue PM | Appropriate for criticality |
| CNC-04 | PM | Condition-Based Monitoring | Good candidate for CBM pilot |
Preventive Maintenance Schedule:
- Weekly: Lubrication, inspections (non-critical)
- Monthly: PM routines for critical equipment
- Quarterly: Major inspections and overhauls
- Annual: Compliance inspections and certifications
Predictive Maintenance Implementation:
- Phase 1 (Q1): Install vibration sensors on critical rotating equipment
- Phase 2 (Q2): Implement condition monitoring dashboard
- Phase 3 (Q3): Develop RUL models and alerts
- Phase 4 (Q4): Integrate with CMMS and production scheduling
Expected Benefits:
- Unplanned downtime reduction: 40-50%
- Maintenance cost reduction: 15-25%
- Equipment availability improvement: +2-3%
- Spare parts inventory reduction: 20%
- Equipment life extension: 10-15%
执行摘要:
- 当前维护策略与绩效
- 关键可靠性指标(MTBF、MTTR、可用性)
- 改进机会与优先级
- 预期收益
设备可靠性分析:
| 设备 | MTBF(小时) | MTTR(小时) | 可用性 | 年故障次数 | 关键性 |
|---|---|---|---|---|---|
| Pump-01 | 720 | 8 | 98.9% | 12 | 高 |
| Conveyor-02 | 1,440 | 4 | 99.7% | 6 | 中 |
| Press-03 | 480 | 16 | 96.7% | 18 | 关键 |
| CNC-04 | 960 | 12 | 98.7% | 9 | 高 |
维护策略建议:
| 设备 | 当前策略 | 推荐策略 | 理由 |
|---|---|---|---|
| Press-03 | 被动维修 | 预测性维护 | 关键资产,故障成本高 |
| Pump-01 | PM(月度) | PM(优化为6周) | 过度维护,可延长间隔 |
| Conveyor-02 | PM | 继续执行PM | 符合设备关键性 |
| CNC-04 | PM | 基于状态的监测 | 适合作为CBM试点 |
预防性维护计划:
- 每周:润滑、检查(非关键设备)
- 每月:关键设备PM例行工作
- 每季度:大型检查与检修
- 每年:合规检查与认证
预测性维护实施计划:
- 第一阶段(Q1):在关键旋转设备上安装振动传感器
- 第二阶段(Q2):部署状态监测仪表盘
- 第三阶段(Q3):开发RUL模型与告警系统
- 第四阶段(Q4):与CMMS及生产调度系统集成
预期收益:
- 非计划停机时间减少:40-50%
- 维护成本降低:15-25%
- 设备可用性提升:+2-3%
- 备件库存减少:20%
- 设备使用寿命延长:10-15%
Questions to Ask
待确认问题
If you need more context:
- What equipment or assets need maintenance planning?
- What are current failure rates and downtime levels?
- What is the current maintenance approach? (reactive, preventive, predictive)
- What are the critical assets that impact production most?
- What is the cost of downtime vs. cost of maintenance?
- Is condition monitoring equipment available?
- Is a CMMS in place? What data is tracked?
- What are the maintenance resource constraints?
如需更多背景信息,请询问:
- 需要进行维护规划的设备或资产有哪些?
- 当前的故障率与停机时间水平如何?
- 当前采用的维护方式是什么?(被动维修、预防性维护、预测性维护)
- 哪些关键资产对生产影响最大?
- 停机成本与维护成本分别是多少?
- 是否已部署状态监测设备?
- 是否使用CMMS?当前跟踪哪些数据?
- 维护资源存在哪些约束?
Related Skills
相关技能
- lean-manufacturing: For TPM and waste elimination
- quality-management: For quality impact of equipment reliability
- production-scheduling: For maintenance scheduling integration
- process-optimization: For overall equipment optimization
- prescriptive-analytics: For predictive maintenance analytics
- supply-chain-analytics: For maintenance KPIs and dashboards
- capacity-planning: For capacity impact of maintenance
- risk-mitigation: For equipment failure risk assessment
- lean-manufacturing: TPM与浪费消除相关
- quality-management: 设备可靠性对质量的影响相关
- production-scheduling: 维护计划与生产调度集成相关
- process-optimization: 设备整体优化相关
- prescriptive-analytics: 预测性维护分析相关
- supply-chain-analytics: 维护KPI与仪表盘相关
- capacity-planning: 维护对产能的影响相关
- risk-mitigation: 设备故障风险评估相关