
Data Pipeline Specification

Strategic Decision: This implementation follows ADR-007: Idempotent Data Pipeline.

End-to-end data flow for high-volume CSV and complex XLSX templates, utilizing event-time watermarking for performance.


Pipeline Overview

graph TB
    subgraph "Ingestion Layer"
        Excel[Excel Upload]
        Manual[Manual Entry]
        API[API Import]
    end

    subgraph ETL["ETL Pipeline"]
        Extract[Extract]
        Transform[Transform]
        Load[Load]

        Extract --> Parse[Parse Excel Structure]
        Parse --> Map[Schema Mapping]
        Map --> Validate[Data Validation]
        Validate --> Enrich[Data Enrichment]
        Enrich --> Calculate[Calculate Derived Metrics]
        Calculate --> Load
    end

    subgraph "Storage Layer"
        Staging[(Staging Tables)]
        Production[(Production Tables)]
        Metrics[(Metrics Cache)]
    end

    subgraph "Consumption Layer"
        API_Out[REST API]
        Dashboard[Dashboard]
        Reports[Reports]
    end

    Excel --> Extract
    Manual --> Transform
    API --> Transform

    Load --> Staging
    Staging --> Production
    Production --> Metrics

    Metrics --> API_Out
    Metrics --> Dashboard
    Production --> Reports

    style ETL fill:#fff3e0
    style Metrics fill:#fce4ec,stroke:#e91e63,stroke-width:2px

Excel Structure Analysis

Based on the uploaded template images, the Excel workbook contains:

Sheet 1: Material & Conversion

Sections:

  1. Part Identification (Rows 1-5)
     • Part No., Drawing No.
     • Time period columns (SOP, 12M, 24M, 36M)

  2. R.M SPECIFICATION (Rows 6-20)
     • Material grade (SAE4140)
     • Dimensions (Diameter, Length, Gap, Thread)
     • Weights (Gross, Net, Scrap)
     • Costs (RM Rate, Raw Material Cost)

  3. CONVERSION (Rows 21-40)
     • Operations: Cutting, Sizing, Turning, Threading, Bending, Marking
     • Outside processes: Heat treatment, Gauging, Coating

  4. SUMMARY (Rows 41-45)
     • Raw Material + Conversion total
     • Rejection %, QCC %, Overhead %, Profit %

Sheet 2: Detailed Operations

For each operation:

  • Total Distance
  • Cutting Speed
  • Feed / Rev
  • RPM
  • Feed /mm
  • Cut Time
  • Setup Time
  • Loading / Unloading
  • Total Time
  • Efficiency Factor (EFFI. AT 80%)
  • Machine Hour Rate (MHR)
  • Cost
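These per-operation fields follow the standard turning-time relationships: RPM is derived from surface cutting speed and workpiece diameter, and cut time from distance and feed rate. A minimal sketch of the arithmetic, assuming metric units (m/min, mm) and hypothetical function names:

```python
import math

def spindle_rpm(cutting_speed_m_min: float, diameter_mm: float) -> float:
    """RPM from surface cutting speed (m/min) and workpiece diameter (mm)."""
    return (1000 * cutting_speed_m_min) / (math.pi * diameter_mm)

def cut_time_min(distance_mm: float, feed_per_rev_mm: float, rpm: float) -> float:
    """Machining time (min) = total distance / feed rate (mm/min)."""
    return distance_mm / (feed_per_rev_mm * rpm)

def operation_cost(total_time_hrs: float, efficiency: float, mhr: float) -> float:
    """Cost = effective machine hours (time scaled by efficiency) x MHR."""
    return (total_time_hrs / efficiency) * mhr
```

At EFFI. AT 80%, an operation that nominally takes 1 hour consumes 1.25 machine hours, which is why the efficiency factor appears before the MHR multiplication.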

Sheet 3: Tooling

  • Tool name and cost
  • Tool life (pieces)
  • Cost per piece calculation
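The cost-per-piece line above is a simple amortization of tool cost over rated tool life; a one-line sketch:

```python
def tool_cost_per_piece(tool_cost: float, tool_life_pieces: int) -> float:
    # Amortize the tool's purchase cost over its rated life in pieces.
    return tool_cost / tool_life_pieces
```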

Excel Parser Implementation

Detection & Identification

flowchart TD
    Start[Excel File Uploaded] --> Read[Read All Sheets]
    Read --> Detect{Detect Sheet Type}

    Detect -->|Has RM SPECIFICATION| Material[Material Sheet]
    Detect -->|Has Operation Details| Ops[Operations Sheet]
    Detect -->|Has Tooling table| Tool[Tooling Sheet]
    Detect -->|Has Packing section| Pack[Packing Sheet]

    Material --> MapM[Map Material Fields]
    Ops --> MapO[Map Operation Fields]
    Tool --> MapT[Map Tooling Fields]
    Pack --> MapP[Map Packing Fields]

    MapM --> Merge[Merge All Data]
    MapO --> Merge
    MapT --> Merge
    MapP --> Merge

    Merge --> Validate[Run Validations]
    Validate --> Store[Store in Database]

    style Detect fill:#fff3e0
    style Merge fill:#e3f2fd
    style Store fill:#c8e6c9

Field Mapping Configuration

# field_mappings.yaml

material_section:
  source_row_start: 6
  source_row_end: 20
  fields:
    - source: "R.M SPECIFICATION"
      target: "material_grade"
      cell: "B7"

    - source: "Diameter (mm)"
      target: "diameter_mm"
      cell: "B8"
      type: decimal

    - source: "Length (mm)"
      target: "length_mm"
      cell: "B9"
      type: decimal

    - source: "Gross Weight (Kg)"
      target: "gross_weight_kg"
      cell: "D13"
      type: decimal

    - source: "Raw Material Rate (Rs./kg)"
      target: "rm_rate_per_kg"
      cell: "D14"
      type: decimal

    - source: "Final Raw Material Cost (Rs.)"
      target: "raw_material_cost"
      cell: "D19"
      type: decimal
      calculated: true

conversion_section:
  source_row_start: 21
  source_row_end: 40
  operations:
    - name: "Cutting"
      cost_cell: "D22"
    - name: "Sizing & Chemfhering"
      cost_cell: "D23"
    - name: "Turning"
      cost_cell: "D24"
    # ... etc

operation_details:
  for_each_operation:
    fields:
      - source: "Cutting speed"
        target: "cutting_speed"
      - source: "Feed / Rev."
        target: "feed_per_rev"
      - source: "Cut Time"
        target: "cut_time_hrs"
      - source: "Setup"
        target: "setup_time_hrs"
      - source: "EFFI. AT 80%"
        target: "efficiency_factor"
      - source: "Cost"
        target: "operation_cost"

ETL Process Details

1. Extract Phase

# Excel extraction with openpyxl

from openpyxl import load_workbook

class ExcelExtractor:
    def extract(self, file_path):
        # data_only=True returns computed cell values instead of formulas
        workbook = load_workbook(file_path, data_only=True)

        data = {
            'part_info': self.extract_part_info(workbook),
            'material': self.extract_material_section(workbook),
            'operations': self.extract_operations(workbook),
            'tooling': self.extract_tooling(workbook),
            'packing': self.extract_packing(workbook),
        }

        return data

    def extract_material_section(self, wb):
        sheet = wb['Material & Conversion']

        # Use named cells or cell references from config
        return {
            'material_grade': sheet['D7'].value,  # SAE4140
            'diameter_mm': sheet['D8'].value,     # 21
            'length_mm': sheet['D9'].value,       # 297
            'gross_weight': sheet['D13'].value,   # 2.304
            'rm_rate': sheet['D14'].value,        # 99.5
            # ... extract all fields
        }

2. Transform Phase

sequenceDiagram
    participant E as Extracted Data
    participant T as Transformer
    participant V as Validator
    participant C as Calculator
    participant M as Mapper

    E->>T: Raw Excel data
    T->>T: Normalize units
    T->>T: Clean data types
    T->>V: Validate data

    alt Validation Fails
        V->>E: Return errors
    else Validation Passes
        V->>C: Continue
    end

    C->>C: Calculate derived fields
    C->>C: Apply formulas
    C->>M: Map to schema
    M->>M: Create entities
    M-->>T: Transformed data

Transformation Rules

| Raw Field | Transformation | Target Field |
|-----------|----------------|--------------|
| "Diameter (mm)" | Parse decimal | diameter_mm |
| "SAE4140" | Lookup material_id | material_id (UUID) |
| "Cutting" | Lookup operation_id | operation_id (UUID) |
| "0.87" (hrs) | Convert to minutes | setup_time_minutes |
| "EFFI. AT 80%" | Parse percentage | efficiency_percent = 0.80 |
| "YOY Reduction" | Create time dimension | time_period_id |
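The unit and percentage rules in the table reduce to small pure functions; a sketch with hypothetical names:

```python
import re

def hours_to_minutes(hours) -> float:
    """Convert a raw hours value (e.g. "0.87") to minutes."""
    return round(float(hours) * 60, 2)

def parse_percentage(label: str) -> float:
    """Extract the numeric part of a label like 'EFFI. AT 80%' as a fraction."""
    m = re.search(r"(\d+(?:\.\d+)?)\s*%", label)
    if not m:
        raise ValueError(f"No percentage found in {label!r}")
    return float(m.group(1)) / 100
```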

3. Load Phase

flowchart LR
    Transform[Transformed Data] --> Staging[Staging Tables]
    Staging --> Validate{Validate<br/>Business Rules}

    Validate -->|Pass| Production[Production Tables]
    Validate -->|Fail| Error[Error Log]

    Production --> UpdateMetrics[Update Metrics Cache]
    UpdateMetrics --> Notify[Notify Completion]

    Error --> Alert[Alert User]

    style Production fill:#c8e6c9
    style Error fill:#ffebee

Data Quality & Validation

Validation Rules

graph TD
    Input[Input Data] --> V1{Material Grade<br/>Exists?}
    V1 -->|No| E1[Error: Unknown Material]
    V1 -->|Yes| V2{Gross Weight<br/>> Net Weight?}

    V2 -->|No| E2[Error: Invalid Weights]
    V2 -->|Yes| V3{Yield %<br/>in range?}

    V3 -->|No| E3[Warning: Unusual Yield]
    V3 -->|Yes| V4{Operation Times<br/>Reasonable?}

    V4 -->|No| E4[Warning: Check Times]
    V4 -->|Yes| V5{Costs<br/>Calculated?}

    V5 -->|No| E5[Error: Missing Costs]
    V5 -->|Yes| Pass[✓ Validation Passed]

    style Pass fill:#c8e6c9
    style E1 fill:#ffebee
    style E2 fill:#ffebee
    style E5 fill:#ffebee

Validation Checks

| Check Type | Rule | Severity | Action |
|------------|------|----------|--------|
| Required Fields | Material grade not null | Error | Block import |
| Data Type | Weights are numeric | Error | Block import |
| Business Logic | Gross > Net weight | Error | Block import |
| Range Check | 50% < Yield < 100% | Warning | Allow with flag |
| Referential Integrity | Material exists in master | Error | Block import |
| Formula Validation | Recalculate and compare | Warning | Show difference |
| Duplicate Check | Same part + period | Warning | Overwrite/Version |
| Watermark Check | source_hash unchanged | Skip | Fast-exit (no re-calc) |
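The severity model above (errors block the import, warnings flag it) can be sketched as follows; the `Finding` type and field names are illustrative:

```python
from dataclasses import dataclass

@dataclass
class Finding:
    check: str
    severity: str  # "error" blocks the import, "warning" allows it with a flag

def validate_part(part: dict, known_materials: set) -> list:
    """Run a subset of the checks from the validation table."""
    findings = []
    if part.get("material_grade") not in known_materials:
        findings.append(Finding("material_exists", "error"))
    gross, net = part.get("gross_weight_kg"), part.get("net_weight_kg")
    if gross is not None and net is not None:
        if gross <= net:
            findings.append(Finding("gross_gt_net", "error"))
        elif not (0.5 < net / gross < 1.0):
            findings.append(Finding("yield_range", "warning"))
    return findings

def blocks_import(findings) -> bool:
    # Any error-severity finding blocks the import; warnings do not.
    return any(f.severity == "error" for f in findings)
```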

Watermarking & Incremental Loads

To ensure high performance for multi-megabyte Excel files, the pipeline implements an event-time watermarking strategy.

1. Data Integrity Watermarks

The system tracks three critical markers for every part:

  • source_fingerprint: An MD5 hash of the Excel sheet content.
  • ingested_at: System wall-clock time of physical file arrival.
  • calculation_waterline: The specific master_version_id of the Raw Material rates used for the last calculation.
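A minimal sketch of computing source_fingerprint over extracted rows, assuming cell values have already been read into Python lists (the canonical JSON serialization is an illustrative choice):

```python
import hashlib
import json

def source_fingerprint(rows) -> str:
    """MD5 of the sheet's cell values, canonically serialized so identical
    content always produces the same hash."""
    canonical = json.dumps([[str(c) for c in row] for row in rows],
                           separators=(",", ":"))
    return hashlib.md5(canonical.encode("utf-8")).hexdigest()

def needs_reprocessing(new_fp: str, stored_fp: str) -> bool:
    # Unchanged fingerprint means the cached result can be returned as-is.
    return new_fp != stored_fp
```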

2. Idempotency Flow

flowchart LR
    A[Upload] --> B{Hash Change?}
    B -->|No| C[Skip Parsing - Return Cache]
    B -->|Yes| D[Trigger Incremental Sync]

    D --> E{Rate Changed?}
    E -->|No| F[Process Geometry Only]
    E -->|Yes| G[Full Recalculation]

    style C fill:#c8e6c9
    style G fill:#ffebee


Incremental Updates

Change Detection

stateDiagram-v2
    [*] --> CheckExisting
    CheckExisting --> NewPart: No existing data
    CheckExisting --> ExistingPart: Data found

    NewPart --> InsertNew
    ExistingPart --> CompareValues

    CompareValues --> NoChange: Same values
    CompareValues --> HasChanges: Values differ

    NoChange --> [*]
    HasChanges --> CreateVersion
    InsertNew --> [*]
    CreateVersion --> [*]

Strategy:

  1. Hash the input data
  2. Compare with the existing hash
  3. If different → create a new version
  4. If same → skip processing
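The state diagram and strategy above reduce to a single decision function; a sketch (the returned labels are illustrative):

```python
def classify_change(existing_hash, new_hash):
    """Decide the pipeline action for an incoming part, per the state diagram."""
    if existing_hash is None:
        return "insert_new"        # no existing data for this part
    if existing_hash == new_hash:
        return "skip"              # same values: nothing to do
    return "create_version"       # values differ: version the change
```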


Batch vs Real-Time Processing

gantt
    title Excel Import Batch Pipeline
    dateFormat  HH:mm

    section Upload
    Upload Excel           :upload, 00:00, 1m

    section Parse
    Extract Sheets         :extract, after upload, 2m
    Validate Structure     :validate, after extract, 1m

    section Transform
    Map Fields             :map, after validate, 3m
    Calculate Metrics      :calc, after map, 5m

    section Load
    Stage Data             :stage, after calc, 2m
    Load Production        :load, after stage, 3m
    Update Cache           :cache, after load, 2m

    section Notify
    Send Notification      :notify, after cache, 1m

Total time: ~20 minutes for a complex part with 10+ operations


Error Handling & Recovery

Error Taxonomy

graph TB
    Error[Import Error] --> Type{Error Type}

    Type -->|Parsing| P1[Invalid Excel Format]
    Type -->|Validation| V1[Data Quality Issue]
    Type -->|Calculation| C1[Formula Error]
    Type -->|System| S1[Database Error]

    P1 --> Action1[Return detailed parsing error]
    V1 --> Action2[Show validation report]
    C1 --> Action3[Highlight failed calculations]
    S1 --> Action4[Retry + alert admin]

    Action1 --> Log[Log Error]
    Action2 --> Log
    Action3 --> Log
    Action4 --> Log

    Log --> Notify[Notify User]

    style Error fill:#ffebee
    style Log fill:#fff3e0

Partial Success Handling

  • Scenario: 10 parts uploaded, 2 fail validation
  • Behavior:
    • Import the 8 successful parts
    • Return an error report for the 2 failures
    • Allow the user to fix and re-upload the failed parts
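The partial-success behavior can be sketched as a batch loop that isolates per-part failures; the `import_one` callable and the result shape (modeled on the job-status response later in this document) are assumptions:

```python
def import_batch(parts, import_one):
    """Import each part independently; collect failures instead of aborting."""
    imported, failed = [], []
    for part in parts:
        try:
            import_one(part)
            imported.append(part["part_no"])
        except ValueError as exc:
            failed.append({"part_no": part["part_no"], "error": str(exc)})
    return {
        "parts_imported": len(imported),
        "parts_failed": len(failed),
        "errors": failed,
    }
```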

Performance Optimization

Strategies

| Optimization | Approach | Benefit |
|--------------|----------|---------|
| Parallel Processing | Process multiple parts concurrently | 3-5x speedup |
| Bulk Insert | Batch database writes | 10x faster |
| Caching | Cache material/operation masters | Avoid lookups |
| Lazy Calculation | Calculate metrics on-demand | Faster import |
| Async Processing | Background job for large imports | Non-blocking UI |
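Two of the optimizations above, parallel processing and bulk insert, can be sketched with the standard library; worker count and batch size are illustrative:

```python
from concurrent.futures import ThreadPoolExecutor

def process_parts_parallel(parts, process_one, workers=4):
    """Process parts concurrently; results come back in input order."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(process_one, parts))

def bulk_insert_batches(rows, batch_size=500):
    """Yield row batches sized for one multi-row INSERT each,
    instead of issuing one statement per row."""
    for i in range(0, len(rows), batch_size):
        yield rows[i:i + batch_size]
```

Threads suit I/O-bound parsing and database writes; for CPU-bound metric calculation, a process pool would be the analogous choice.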

Pipeline Performance

graph LR
    S1[Single-threaded] -->|20 min| R1[20 parts/hour]
    S2[Parallel Processing] -->|4 min| R2[100 parts/hour]
    S3[+ Bulk Insert] -->|2 min| R3[200 parts/hour]
    S4[+ Caching] -->|1 min| R4[400 parts/hour]

    style R4 fill:#c8e6c9

Monitoring & Observability

Pipeline Metrics

graph TD
    subgraph "Key Metrics"
        M1[Import Success Rate]
        M2[Average Processing Time]
        M3[Error Rate by Type]
        M4[Data Quality Score]
    end

    subgraph "Alerting"
        A1[Error Rate > 10%]
        A2[Processing Time > 30min]
        A3[Validation Failures]
    end

    M1 --> Dashboard
    M2 --> Dashboard
    M3 --> A1
    M3 --> A3
    M2 --> A2
    M4 --> Dashboard

    A1 --> Alert[Send Alert]
    A2 --> Alert
    A3 --> Alert

API Endpoints for Pipeline

Import API

POST /api/v1/import/excel
Content-Type: multipart/form-data

file: <excel-file>
options: {
  "validate_only": false,
  "create_version": true,
  "overwrite_existing": false
}

Response:
{
  "job_id": "uuid",
  "status": "processing",
  "estimated_time_mins": 15
}

GET /api/v1/import/jobs/{job_id}

Response:
{
  "job_id": "uuid",
  "status": "completed",
  "parts_imported": 10,
  "parts_failed": 2,
  "errors": [...],
  "processing_time_secs": 245
}
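A client would typically poll the job endpoint above until a terminal status. A sketch with the HTTP call injected as a callable so the logic is testable offline; the function name and the set of terminal statuses are assumptions based on the responses shown:

```python
import time

def wait_for_job(fetch_status, job_id, poll_secs=5, timeout_secs=1800):
    """Poll GET /api/v1/import/jobs/{job_id} until a terminal status.

    fetch_status is any callable returning the job JSON, e.g. a thin
    wrapper around an HTTP client.
    """
    deadline = time.monotonic() + timeout_secs
    while time.monotonic() < deadline:
        job = fetch_status(job_id)
        if job["status"] in ("completed", "failed"):
            return job
        time.sleep(poll_secs)
    raise TimeoutError(f"Import job {job_id} still processing after {timeout_secs}s")
```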

Future Enhancements

  1. ML-based Field Detection - Auto-detect Excel structure
  2. Smart Defaults - Suggest missing values based on similar parts
  3. Version Diffing - Visual diff between versions
  4. Template Validation - Validate against approved templates
  5. Real-time Collaboration - Multiple users editing simultaneously
