# Data Pipeline Specification

**Strategic Decision:** This implementation follows ADR-007: Idempotent Data Pipeline.

This document specifies the end-to-end data flow for high-volume CSV files and complex XLSX templates, using event-time watermarking for performance.
## Pipeline Overview
```mermaid
graph TB
    subgraph Ingestion["Ingestion Layer"]
        Excel[Excel Upload]
        Manual[Manual Entry]
        API[API Import]
    end

    subgraph ETL["ETL Pipeline"]
        Extract[Extract]
        Transform[Transform]
        Load[Load]

        Extract --> Parse[Parse Excel Structure]
        Parse --> Map[Schema Mapping]
        Map --> Validate[Data Validation]
        Validate --> Enrich[Data Enrichment]
        Enrich --> Calculate[Calculate Derived Metrics]
        Calculate --> Load
    end

    subgraph Storage["Storage Layer"]
        Staging[(Staging Tables)]
        Production[(Production Tables)]
        Metrics[(Metrics Cache)]
    end

    subgraph Consumption["Consumption Layer"]
        API_Out[REST API]
        Dashboard[Dashboard]
        Reports[Reports]
    end

    Excel --> Extract
    Manual --> Transform
    API --> Transform
    Transform --> Validate
    Load --> Staging
    Staging --> Production
    Production --> Metrics
    Metrics --> API_Out
    Metrics --> Dashboard
    Production --> Reports

    style ETL fill:#fff3e0
    style Metrics fill:#fce4ec,stroke:#e91e63,stroke-width:2px
```
## Excel Structure Analysis

Based on the uploaded reference images, the Excel workbook contains:
### Sheet 1: Material & Conversion

Sections:

1. **Part Identification** (Rows 1-5)
    - Part No., Drawing No.
    - Time period columns (SOP, 12M, 24M, 36M)
2. **R.M SPECIFICATION** (Rows 6-20)
    - Material grade (SAE4140)
    - Dimensions (Diameter, Length, Gap, Thread)
    - Weights (Gross, Net, Scrap)
    - Costs (RM Rate, Raw Material Cost)
3. **CONVERSION** (Rows 21-40)
    - Operations: Cutting, Sizing, Turning, Threading, Bending, Marking
    - Outside processes: Heat treatment, Gauging, Coating
4. **SUMMARY** (Rows 41-45)
    - Raw Material + Conversion total
    - Rejection %, QCC %, Overhead %, Profit %
### Sheet 2: Detailed Operations

For each operation:

- Total Distance
- Cutting Speed
- Feed / Rev
- RPM
- Feed /mm
- Cut Time
- Setup Time
- Loading / Unloading
- Total Time
- Efficiency Factor (EFFI. AT 80%)
- Machine Hour Rate (MHR)
- Cost
### Sheet 3: Tooling
- Tool name and cost
- Tool life (pieces)
- Cost per piece calculation
## Excel Parser Implementation

### Detection & Identification
```mermaid
flowchart TD
    Start[Excel File Uploaded] --> Read[Read All Sheets]
    Read --> Detect{Detect Sheet Type}

    Detect -->|Has RM SPECIFICATION| Material[Material Sheet]
    Detect -->|Has Operation Details| Ops[Operations Sheet]
    Detect -->|Has Tooling table| Tool[Tooling Sheet]
    Detect -->|Has Packing section| Pack[Packing Sheet]

    Material --> MapM[Map Material Fields]
    Ops --> MapO[Map Operation Fields]
    Tool --> MapT[Map Tooling Fields]
    Pack --> MapP[Map Packing Fields]

    MapM --> Merge[Merge All Data]
    MapO --> Merge
    MapT --> Merge
    MapP --> Merge

    Merge --> Validate[Run Validations]
    Validate --> Store[Store in Database]

    style Detect fill:#fff3e0
    style Merge fill:#e3f2fd
    style Store fill:#c8e6c9
```
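A minimal sketch of the detection step, assuming openpyxl and that the marker strings from the flowchart appear verbatim somewhere in each sheet's top-left region; the marker table and helper name are illustrative, not a fixed API.

```python
from openpyxl import load_workbook

# Marker strings taken from the flowchart above; assumed to appear in the sheets.
SHEET_MARKERS = {
    "material": "R.M SPECIFICATION",
    "operations": "Operation Details",
    "tooling": "Tooling",
    "packing": "Packing",
}

def detect_sheet_type(sheet, scan_rows=50, scan_cols=10):
    """Scan the top-left region of a sheet for a known marker string."""
    for row in sheet.iter_rows(max_row=scan_rows, max_col=scan_cols, values_only=True):
        for value in row:
            if not isinstance(value, str):
                continue
            for sheet_type, marker in SHEET_MARKERS.items():
                if marker.lower() in value.lower():
                    return sheet_type
    return "unknown"

workbook = load_workbook("part_costing.xlsx", data_only=True)
sheet_types = {name: detect_sheet_type(workbook[name]) for name in workbook.sheetnames}
```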
### Field Mapping Configuration
```yaml
# field_mappings.yaml
material_section:
  source_row_start: 6
  source_row_end: 20
  fields:
    - source: "R.M SPECIFICATION"
      target: "material_grade"
      cell: "B7"
    - source: "Diameter (mm)"
      target: "diameter_mm"
      cell: "B8"
      type: decimal
    - source: "Length (mm)"
      target: "length_mm"
      cell: "B9"
      type: decimal
    - source: "Gross Weight (Kg)"
      target: "gross_weight_kg"
      cell: "D13"
      type: decimal
    - source: "Raw Material Rate (Rs./kg)"
      target: "rm_rate_per_kg"
      cell: "D14"
      type: decimal
    - source: "Final Raw Material Cost (Rs.)"
      target: "raw_material_cost"
      cell: "D19"
      type: decimal
      calculated: true

conversion_section:
  source_row_start: 21
  source_row_end: 40
  operations:
    - name: "Cutting"
      cost_cell: "D22"
    - name: "Sizing & Chemfhering"
      cost_cell: "D23"
    - name: "Turning"
      cost_cell: "D24"
    # ... etc.

operation_details:
  for_each_operation:
    fields:
      - source: "Cutting speed"
        target: "cutting_speed"
      - source: "Feed / Rev."
        target: "feed_per_rev"
      - source: "Cut Time"
        target: "cut_time_hrs"
      - source: "Setup"
        target: "setup_time_hrs"
      - source: "EFFI. AT 80%"
        target: "efficiency_factor"
      - source: "Cost"
        target: "operation_cost"
```
## ETL Process Details

### 1. Extract Phase
```python
# Pseudo-code for Excel extraction
from openpyxl import load_workbook

class ExcelExtractor:
    def extract(self, file_path):
        # data_only=True returns cached cell values instead of formula strings
        workbook = load_workbook(file_path, data_only=True)
        data = {
            'part_info': self.extract_part_info(workbook),
            'material': self.extract_material_section(workbook),
            'operations': self.extract_operations(workbook),
            'tooling': self.extract_tooling(workbook),
            'packing': self.extract_packing(workbook),
        }
        return data

    def extract_material_section(self, wb):
        sheet = wb['Material & Conversion']
        # Use named cells or cell references from config
        return {
            'material_grade': sheet['D7'].value,  # SAE4140
            'diameter_mm': sheet['D8'].value,     # 21
            'length_mm': sheet['D9'].value,       # 297
            'gross_weight': sheet['D13'].value,   # 2.304
            'rm_rate': sheet['D14'].value,        # 99.5
            # ... extract all fields
        }
```
### 2. Transform Phase
```mermaid
sequenceDiagram
    participant E as Extracted Data
    participant T as Transformer
    participant V as Validator
    participant C as Calculator
    participant M as Mapper

    E->>T: Raw Excel data
    T->>T: Normalize units
    T->>T: Clean data types
    T->>V: Validate data

    alt Validation Fails
        V->>E: Return errors
    else Validation Passes
        V->>C: Continue
    end

    C->>C: Calculate derived fields
    C->>C: Apply formulas
    C->>M: Map to schema
    M->>M: Create entities
    M-->>T: Transformed data
```
#### Transformation Rules
| Raw Field | Transformation | Target Field |
|---|---|---|
| "Diameter (mm)" | Parse decimal | diameter_mm |
| "SAE4140" | Lookup material_id | material_id (UUID) |
| "Cutting" | Lookup operation_id | operation_id (UUID) |
| "0.87" (hrs) | Convert to minutes | setup_time_minutes |
| "EFFI. AT 80%" | Parse percentage | efficiency_percent = 0.80 |
| "YOY Reduction" | Create time dimension | time_period_id |
### 3. Load Phase
```mermaid
flowchart LR
    Transform[Transformed Data] --> Staging[Staging Tables]
    Staging --> Validate{Validate<br/>Business Rules}
    Validate -->|Pass| Production[Production Tables]
    Validate -->|Fail| Error[Error Log]
    Production --> UpdateMetrics[Update Metrics Cache]
    UpdateMetrics --> Notify[Notify Completion]
    Error --> Alert[Alert User]

    style Production fill:#c8e6c9
    style Error fill:#ffebee
```
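A minimal sketch of the staging-then-promote flow in the diagram, assuming a psycopg2-style connection; the `staging_parts`, `parts`, and `import_errors` tables are hypothetical names for illustration.

```python
def load_batch(conn):
    """Promote staged rows that pass business rules; log the rest (per the diagram)."""
    with conn:  # psycopg2-style: commit on success, roll back on exception
        with conn.cursor() as cur:
            # Business-rule gate: gross weight must exceed net weight.
            cur.execute(
                """INSERT INTO import_errors (part_no, reason)
                   SELECT part_no, 'gross_weight <= net_weight'
                   FROM staging_parts WHERE gross_weight_kg <= net_weight_kg"""
            )
            # Promote only the rows that passed validation.
            cur.execute(
                """INSERT INTO parts (part_no, gross_weight_kg, net_weight_kg)
                   SELECT part_no, gross_weight_kg, net_weight_kg
                   FROM staging_parts WHERE gross_weight_kg > net_weight_kg"""
            )
            cur.execute("TRUNCATE staging_parts")
```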
## Data Quality & Validation

### Validation Rules
```mermaid
graph TD
    Input[Input Data] --> V1{"Material Grade<br/>Exists?"}
    V1 -->|No| E1[Error: Unknown Material]
    V1 -->|Yes| V2{"Gross Weight<br/>> Net Weight?"}
    V2 -->|No| E2[Error: Invalid Weights]
    V2 -->|Yes| V3{"Yield %<br/>in range?"}
    V3 -->|No| E3[Warning: Unusual Yield]
    V3 -->|Yes| V4{"Operation Times<br/>Reasonable?"}
    V4 -->|No| E4[Warning: Check Times]
    V4 -->|Yes| V5{"Costs<br/>Calculated?"}
    V5 -->|No| E5[Error: Missing Costs]
    V5 -->|Yes| Pass[✓ Validation Passed]

    style Pass fill:#c8e6c9
    style E1 fill:#ffebee
    style E2 fill:#ffebee
    style E5 fill:#ffebee
```
### Validation Checks
| Check Type | Rule | Severity | Action |
|---|---|---|---|
| Required Fields | Material grade not null | Error | Block import |
| Data Type | Weights are numeric | Error | Block import |
| Business Logic | Gross > Net weight | Error | Block import |
| Range Check | 50% < Yield < 100% | Warning | Allow with flag |
| Referential Integrity | Material exists in master | Error | Block import |
| Formula Validation | Recalculate and compare | Warning | Show difference |
| Duplicate Check | Same part + period | Warning | Overwrite/Version |
| Watermark Check | source_hash unchanged | Skip | Fast-exit (no re-calculation) |
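A minimal sketch of how a few of these checks might be encoded, with severity deciding whether the import is blocked; the rule set shown is partial and the names are illustrative.

```python
from dataclasses import dataclass

@dataclass
class Finding:
    check: str
    severity: str  # "error" blocks the import, "warning" allows it with a flag
    message: str

def validate_part(part, material_master):
    findings = []
    if not part.get("material_grade"):
        findings.append(Finding("required", "error", "Material grade is null"))
    elif part["material_grade"] not in material_master:
        findings.append(Finding("referential", "error", "Material not in master"))
    gross, net = part.get("gross_weight_kg"), part.get("net_weight_kg")
    if gross is not None and net is not None and gross <= net:
        findings.append(Finding("business", "error", "Gross weight must exceed net"))
    yield_pct = part.get("yield_percent")
    if yield_pct is not None and not (50 < yield_pct < 100):
        findings.append(Finding("range", "warning", f"Unusual yield: {yield_pct}%"))
    return findings

def is_blocked(findings):
    return any(f.severity == "error" for f in findings)
```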
## Watermarking & Incremental Loads
To ensure high performance for multi-megabyte Excel files, the pipeline implements an event-time watermarking strategy.
### 1. Data Integrity Watermarks

The system tracks three critical markers for every part:

- `source_fingerprint`: an MD5 hash of the Excel sheet content.
- `ingested_at`: system wall-clock time of physical file arrival.
- `calculation_waterline`: the specific `master_version_id` of the raw-material rates used for the last calculation.
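A minimal sketch of the watermark record and fingerprint computation, assuming the MD5-over-sheet-content approach described above; the canonical serialization is an illustrative choice.

```python
import hashlib
import json
from dataclasses import dataclass
from datetime import datetime, timezone

@dataclass
class Watermark:
    source_fingerprint: str     # MD5 hash of the sheet content
    ingested_at: datetime       # wall-clock arrival time
    calculation_waterline: str  # master_version_id of the rates last used

def fingerprint_sheet(sheet):
    """Hash a sheet's cell values in a canonical order."""
    rows = [list(row) for row in sheet.iter_rows(values_only=True)]
    canonical = json.dumps(rows, default=str)
    return hashlib.md5(canonical.encode()).hexdigest()

def make_watermark(sheet, master_version_id):
    return Watermark(
        source_fingerprint=fingerprint_sheet(sheet),
        ingested_at=datetime.now(timezone.utc),
        calculation_waterline=master_version_id,
    )
```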
### 2. Idempotency Flow
```mermaid
flowchart LR
    A[Upload] --> B{Hash Change?}
    B -->|No| C[Skip Parsing - Return Cache]
    B -->|Yes| D[Trigger Incremental Sync]
    D --> E{Rate Changed?}
    E -->|No| F[Process Geometry Only]
    E -->|Yes| G[Full Recalculation]

    style C fill:#c8e6c9
    style G fill:#ffebee
```
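A minimal sketch of the decision logic in this flow, assuming the watermark record from the previous section; the outcome labels mirror the three branches of the diagram.

```python
def classify_upload(fingerprint, stored, master_version_id):
    """Map the flowchart above onto its three outcomes.

    fingerprint: MD5 of the uploaded sheet; stored: the previous Watermark
    (or None for a new part); master_version_id: current rate-master version.
    """
    if stored and fingerprint == stored.source_fingerprint:
        return "skip"           # hash unchanged: return cached result
    if stored and master_version_id == stored.calculation_waterline:
        return "geometry_only"  # content changed, rate masters did not
    return "full_recalc"        # new part, or the rates changed too
```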
## Incremental Updates

### Change Detection
```mermaid
stateDiagram-v2
    [*] --> CheckExisting
    CheckExisting --> NewPart: No existing data
    CheckExisting --> ExistingPart: Data found

    NewPart --> InsertNew
    ExistingPart --> CompareValues

    CompareValues --> NoChange: Same values
    CompareValues --> HasChanges: Values differ

    NoChange --> [*]
    HasChanges --> CreateVersion
    InsertNew --> [*]
    CreateVersion --> [*]
```
Strategy:

1. Hash the input data
2. Compare with the existing hash
3. If different → create a new version
4. If same → skip processing

A sketch of this hash-and-compare step follows.
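A minimal sketch of the strategy, hashing the normalized record (rather than raw bytes) so that cosmetic spreadsheet changes do not create spurious versions; SHA-256 and the function names are illustrative choices.

```python
import hashlib
import json

def record_hash(record: dict) -> str:
    """Canonical JSON (sorted keys) so field order cannot change the hash."""
    canonical = json.dumps(record, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode()).hexdigest()

def plan_write(record: dict, existing_hash):
    """Decide between the three outcomes of the state diagram above."""
    new_hash = record_hash(record)
    if existing_hash is None:
        return "insert_new"
    if new_hash == existing_hash:
        return "skip"
    return "create_version"
```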
## Batch vs Real-Time Processing

### Batch Processing (Recommended for Excel Import)
```mermaid
gantt
    title Excel Import Batch Pipeline
    dateFormat HH:mm

    section Upload
    Upload Excel        :upload, 00:00, 1m

    section Parse
    Extract Sheets      :extract, after upload, 2m
    Validate Structure  :validate, after extract, 1m

    section Transform
    Map Fields          :map, after validate, 3m
    Calculate Metrics   :calc, after map, 5m

    section Load
    Stage Data          :stage, after calc, 2m
    Load Production     :load, after stage, 3m
    Update Cache        :cache, after load, 2m

    section Notify
    Send Notification   :notify, after cache, 1m
```
Total time: ~20 minutes for a complex part with 10+ operations.
## Error Handling & Recovery

### Error Taxonomy
```mermaid
graph TB
    Error[Import Error] --> Type{Error Type}
    Type -->|Parsing| P1[Invalid Excel Format]
    Type -->|Validation| V1[Data Quality Issue]
    Type -->|Calculation| C1[Formula Error]
    Type -->|System| S1[Database Error]

    P1 --> Action1[Return detailed parsing error]
    V1 --> Action2[Show validation report]
    C1 --> Action3[Highlight failed calculations]
    S1 --> Action4[Retry + alert admin]

    Action1 --> Log[Log Error]
    Action2 --> Log
    Action3 --> Log
    Action4 --> Log
    Log --> Notify[Notify User]

    style Error fill:#ffebee
    style Log fill:#fff3e0
```
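A minimal sketch of the "retry + alert admin" branch for system errors, using exponential backoff; `alert_admin` is an illustrative stub, and the exception type stands in for whatever the database driver actually raises.

```python
import time

def with_retries(operation, max_attempts=3, base_delay_secs=1.0):
    """Retry transient system errors with exponential backoff, then alert."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except OSError as exc:  # stand-in for transient DB/IO errors
            if attempt == max_attempts:
                alert_admin(f"Import failed after {attempt} attempts: {exc}")
                raise
            time.sleep(base_delay_secs * 2 ** (attempt - 1))

def alert_admin(message: str) -> None:
    print(f"[ALERT] {message}")  # illustrative stub; wire to real alerting
```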
### Partial Success Handling

- **Scenario:** 10 parts uploaded, 2 fail validation
- **Behavior:**
    - Import the 8 successful parts
    - Return an error report for the 2 failed parts
    - Allow the user to fix and re-upload the failures (see the sketch below)
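A minimal sketch of this per-part isolation, reusing `validate_part` and `is_blocked` from the validation sketch above; `import_one` is an illustrative callable for the actual write.

```python
def import_batch(parts, material_master, import_one):
    """Import each part independently so one failure cannot sink the batch."""
    imported, failed = [], []
    for part in parts:
        findings = validate_part(part, material_master)
        if is_blocked(findings):
            failed.append({"part": part["part_no"],
                           "errors": [f.message for f in findings]})
            continue
        import_one(part)
        imported.append(part["part_no"])
    return {"imported": imported, "failed": failed}
```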
## Performance Optimization

### Strategies
| Optimization | Approach | Benefit |
|---|---|---|
| Parallel Processing | Process multiple parts concurrently | 3-5x speedup |
| Bulk Insert | Batch database writes | 10x faster |
| Caching | Cache material/operation masters | Avoid lookups |
| Lazy Calculation | Calculate metrics on-demand | Faster import |
| Async Processing | Background job for large imports | Non-blocking UI |
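A minimal sketch combining two of these strategies, concurrent per-part transformation and batched writes; worker and batch sizes are illustrative, and a process pool would replace the thread pool if the transforms are CPU-bound.

```python
from concurrent.futures import ThreadPoolExecutor

def process_parts(parts, transform, bulk_insert, workers=4, batch_size=500):
    """Transform parts concurrently, then write them in large batches."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        transformed = list(pool.map(transform, parts))
    for i in range(0, len(transformed), batch_size):
        bulk_insert(transformed[i:i + batch_size])  # one round-trip per batch
```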
### Pipeline Performance
```mermaid
graph LR
    S1[Single-threaded] -->|20 min| R1[20 parts/hour]
    S2[Parallel Processing] -->|4 min| R2[100 parts/hour]
    S3[+ Bulk Insert] -->|2 min| R3[200 parts/hour]
    S4[+ Caching] -->|1 min| R4[400 parts/hour]

    style R4 fill:#c8e6c9
```
## Monitoring & Observability

### Pipeline Metrics
```mermaid
graph TD
    subgraph KeyMetrics["Key Metrics"]
        M1[Import Success Rate]
        M2[Average Processing Time]
        M3[Error Rate by Type]
        M4[Data Quality Score]
    end

    subgraph Alerting["Alerting"]
        A1["Error Rate > 10%"]
        A2["Processing Time > 30min"]
        A3[Validation Failures]
    end

    M1 --> Dashboard
    M2 --> Dashboard
    M3 --> A1
    M3 --> A3
    M2 --> A2
    M4 --> Dashboard

    A1 --> Alert[Send Alert]
    A2 --> Alert
    A3 --> Alert
```
## API Endpoints for Pipeline

### Import API
```http
POST /api/v1/import/excel
Content-Type: multipart/form-data

file: <excel-file>
options: {
  "validate_only": false,
  "create_version": true,
  "overwrite_existing": false
}
```

Response:

```json
{
  "job_id": "uuid",
  "status": "processing",
  "estimated_time_mins": 15
}
```

```http
GET /api/v1/import/jobs/{job_id}
```

Response:

```json
{
  "job_id": "uuid",
  "status": "completed",
  "parts_imported": 10,
  "parts_failed": 2,
  "errors": [...],
  "processing_time_secs": 245
}
```
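A minimal client-side sketch of this flow using the requests library, polling the job endpoint until the import settles; the base URL and poll interval are illustrative.

```python
import time

import requests

BASE = "https://example.internal/api/v1"  # illustrative base URL

def import_excel(path):
    """Upload a workbook, then poll the job endpoint until it finishes."""
    with open(path, "rb") as f:
        resp = requests.post(
            f"{BASE}/import/excel",
            files={"file": f},
            data={"options": '{"validate_only": false, "create_version": true}'},
        )
    resp.raise_for_status()
    job_id = resp.json()["job_id"]
    while True:
        job = requests.get(f"{BASE}/import/jobs/{job_id}").json()
        if job["status"] in ("completed", "failed"):
            return job
        time.sleep(5)
```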
## Future Enhancements

- **ML-based Field Detection** - auto-detect Excel structure
- **Smart Defaults** - suggest missing values based on similar parts
- **Version Diffing** - visual diff between versions
- **Template Validation** - validate against approved templates
- **Real-time Collaboration** - multiple users editing simultaneously