
Data Pipelines and ETL: From Raw Data to Insights

📚 APIs & Data Engineering · ⏱️ 18 min read · 🎓 Grade 9


What is ETL?

ETL stands for Extract, Transform, Load - the three-stage process of building data pipelines (a miniature end-to-end sketch follows this list):

  • Extract: Retrieve data from source systems (APIs, databases, files)
  • Transform: Clean, validate, and reshape data into useful format
  • Load: Move processed data to target system (warehouse, database)
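
Before looking at each stage in detail, here is the whole idea in miniature: a toy pipeline whose three functions mirror the three stages. The input rows and output file name are invented for illustration.

import json

def extract():
    # A real pipeline would call an API or read a file here
    return [{'name': ' asha ', 'marks': '91'}, {'name': 'RAVI', 'marks': '78'}]

def transform(rows):
    # Clean whitespace and case, convert marks from text to numbers
    return [{'name': r['name'].strip().title(), 'marks': int(r['marks'])} for r in rows]

def load(rows, path):
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(rows, f, indent=2)

load(transform(extract()), 'students_clean.json')  # Extract -> Transform -> Load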

ETL Pipeline Architecture


Data Sources (APIs, DB, Files)
            ↓
        [EXTRACT]
            ↓
       [TRANSFORM]  - Clean, validate, enrich, aggregate
            ↓
         [LOAD]
            ↓
Data Warehouse / Analytics DB / Data Lake
            ↓
        [ANALYZE]   - Reports, Dashboards, ML models

Stage 1: Extract


import requests
import json
import csv
from datetime import datetime

# Extract from REST API
def extract_from_api(api_url, params=None):
    """Fetch data from an API."""
    try:
        response = requests.get(api_url, params=params, timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as e:
        print(f"Error extracting from API: {e}")
        return None

# Extract from CSV file
def extract_from_csv(file_path):
    """Read a CSV file into a list of dictionaries."""
    data = []
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            data = list(reader)
    except Exception as e:
        print(f"Error reading CSV: {e}")
    return data

# Extract from JSON file
def extract_from_json(file_path):
    """Read a JSON file."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        print(f"Error reading JSON: {e}")
        return []

# Extract from a database
from pymongo import MongoClient

def extract_from_mongodb(connection_string, db_name, collection_name, query=None):
    """Extract documents from MongoDB."""
    client = MongoClient(connection_string)
    db = client[db_name]
    collection = db[collection_name]
    return list(collection.find(query or {}))

# Example: Extract Census data from India
census_data = extract_from_api(
    'https://api.data.gov.in/census/2021',
    params={'state': 'Maharashtra', 'limit': 1000}
)

Stage 2: Transform


import pandas as pd
from datetime import datetime
import re

def clean_and_validate_data(data):
    """Remove duplicates, handle missing values, drop incomplete rows."""
    df = pd.DataFrame(data)
    # Remove duplicates
    df = df.drop_duplicates()
    # Handle missing values
    df['email'] = df['email'].fillna('')
    df['phone'] = df['phone'].fillna('N/A')
    df['age'] = pd.to_numeric(df['age'], errors='coerce')  # CSV values arrive as strings
    df['age'] = df['age'].fillna(df['age'].median())       # Fill with median
    # Remove rows where critical fields are missing
    df = df.dropna(subset=['name', 'id'])
    return df.to_dict('records')

def standardize_data(data):
    """Convert data to a consistent format."""
    for record in data:
        # Standardize names to title case
        if 'name' in record:
            record['name'] = record['name'].title().strip()
        # Standardize phone to 10 digits (India format)
        if 'phone' in record:
            phone = re.sub(r'\D', '', record['phone'])  # strip non-digits
            record['phone'] = phone[-10:] if len(phone) >= 10 else phone
        # Standardize email
        if 'email' in record:
            record['email'] = record['email'].lower().strip()
        # Convert dates
        if 'date_of_birth' in record:
            try:
                record['date_of_birth'] = datetime.strptime(
                    record['date_of_birth'], '%d-%m-%Y'
                ).isoformat()
            except (ValueError, TypeError):
                record['date_of_birth'] = None
    return data

def enrich_data(data):
    """Add calculated fields."""
    for record in data:
        # Calculate age from date of birth
        if record.get('date_of_birth'):
            try:
                dob = datetime.fromisoformat(record['date_of_birth'])
                record['age'] = (datetime.now() - dob).days // 365
            except ValueError:
                pass
        # Add processing timestamp
        record['processed_at'] = datetime.utcnow().isoformat()
        # Add a data quality score (25 points per populated field)
        score = 0
        required_fields = ['name', 'email', 'phone', 'age']
        for field in required_fields:
            if record.get(field):
                score += 25
        record['data_quality_score'] = score
    return data

def validate_data(data):
    """Check data meets requirements; return valid and invalid lists."""
    valid = []
    invalid = []
    for record in data:
        errors = []
        # Validate email format
        if record.get('email') and not re.match(r'^[\w.-]+@[\w.-]+\.\w+$', record['email']):
            errors.append('Invalid email')
        # Validate age
        if 'age' in record:
            try:
                age = int(record['age'])
                if age < 0 or age > 150:
                    errors.append('Age out of range')
            except (ValueError, TypeError):
                errors.append('Age not numeric')
        if errors:
            record['validation_errors'] = errors
            invalid.append(record)
        else:
            valid.append(record)
    return valid, invalid

# Complete transform example
def transform_pipeline(raw_data):
    """Execute the complete transformation."""
    print("Starting transformation...")
    # Step 1: Clean
    cleaned = clean_and_validate_data(raw_data)
    print(f"Cleaned data: {len(cleaned)} records")
    # Step 2: Standardize
    standardized = standardize_data(cleaned)
    # Step 3: Enrich
    enriched = enrich_data(standardized)
    # Step 4: Validate
    valid, invalid = validate_data(enriched)
    print(f"Valid: {len(valid)}, Invalid: {len(invalid)}")
    return valid, invalid

Stage 3: Load


def load_to_csv(data, file_path):
    """Save to a CSV file."""
    if not data:
        return
    df = pd.DataFrame(data)
    df.to_csv(file_path, index=False, encoding='utf-8')
    print(f"Loaded {len(data)} records to {file_path}")

def load_to_json(data, file_path):
    """Save to a JSON file."""
    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    print(f"Loaded {len(data)} records to {file_path}")

def load_to_mongodb(data, connection_string, db_name, collection_name):
    """Save to MongoDB."""
    client = MongoClient(connection_string)
    db = client[db_name]
    collection = db[collection_name]
    if data:
        # Use insert_many for a batch insert
        result = collection.insert_many(data)
        print(f"Loaded {len(result.inserted_ids)} records to MongoDB")

def load_to_postgres(data, connection_string, table_name):
    """Save to PostgreSQL."""
    import psycopg2
    conn = psycopg2.connect(connection_string)
    cursor = conn.cursor()
    df = pd.DataFrame(data)
    # Clear existing rows (simplified; assumes the table already exists)
    cursor.execute(f"TRUNCATE TABLE {table_name}")
    # Insert row by row, with one placeholder per column
    placeholders = ', '.join(['%s'] * len(df.columns))
    for _, row in df.iterrows():
        cursor.execute(
            f"INSERT INTO {table_name} VALUES ({placeholders})",
            tuple(row)
        )
    conn.commit()
    cursor.close()
    conn.close()
    print(f"Loaded {len(data)} records to PostgreSQL")

Real Example: Election Results Pipeline (India)


class ElectionResultsPipeline:
    """ETL pipeline for Indian election results."""

    def __init__(self):
        self.raw_data = []
        self.cleaned_data = []
        self.results_db = 'election_results'

    def extract(self, election_commission_url):
        """Extract from the Election Commission of India API."""
        print("Extracting election data...")
        # Fetch data from ECI
        response = requests.get(election_commission_url)
        self.raw_data = response.json()['results']
        print(f"Extracted {len(self.raw_data)} constituencies")
        return self.raw_data

    def transform(self):
        """Transform raw election data."""
        print("Transforming election data...")
        transformed = []
        for constituency in self.raw_data:
            # Extract and standardize
            record = {
                'constituency_id': constituency['code'],
                'constituency_name': constituency['name'].title(),
                'state': constituency['state'],
                'total_voters': int(constituency['total_voters']),
                'voter_turnout': float(constituency['turnout_percent']),
                'results': []
            }
            # Process candidate results
            for candidate in constituency['candidates']:
                record['results'].append({
                    'candidate_name': candidate['name'].title(),
                    'party': candidate['party'],
                    'votes': int(candidate['votes']),
                    'vote_share': float(candidate['vote_share']),
                    'status': candidate['status']  # Won/Lost/Trailing
                })
            # Sort by votes (winner first)
            record['results'].sort(key=lambda x: x['votes'], reverse=True)
            # Add calculated fields
            total_votes_polled = sum(r['votes'] for r in record['results'])
            record['total_votes_polled'] = total_votes_polled
            record['winning_party'] = record['results'][0]['party']
            record['winning_margin'] = (
                record['results'][0]['votes'] - record['results'][1]['votes']
                if len(record['results']) > 1 else 0
            )
            transformed.append(record)
        self.cleaned_data = transformed
        print(f"Transformed {len(transformed)} constituencies")
        return transformed

    def load(self, output_file):
        """Load to a database/file."""
        print("Loading election results...")
        # Load to JSON
        with open(output_file, 'w') as f:
            json.dump(self.cleaned_data, f, indent=2)
        print(f"Loaded {len(self.cleaned_data)} results to {output_file}")

    def run_pipeline(self, api_url, output_file):
        """Execute the complete ETL pipeline."""
        self.extract(api_url)
        self.transform()
        self.load(output_file)

# Usage
pipeline = ElectionResultsPipeline()
pipeline.run_pipeline(
    'https://api.eci.gov.in/election/2024/results',
    'election_results_2024.json'
)

Batch vs Streaming Pipelines


# BATCH PIPELINE - Process all data at once
def batch_pipeline(input_file, output_file, batch_size=1000):
    """Process data in batches."""
    data = extract_from_csv(input_file)
    # Process in batches
    for i in range(0, len(data), batch_size):
        batch = data[i:i + batch_size]
        valid, invalid = transform_pipeline(batch)  # returns (valid, invalid)
        load_to_csv(valid, f"{output_file}_part_{i}.csv")

# STREAMING PIPELINE - Process data continuously
from kafka import KafkaConsumer, KafkaProducer
import json

def streaming_pipeline():
    """Process data as it arrives."""
    # Consume from a Kafka topic
    consumer = KafkaConsumer(
        'raw_data_topic',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )
    # Produce to a results topic
    producer = KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    for message in consumer:
        raw_record = message.value
        # Transform immediately
        cleaned = clean_and_validate_data([raw_record])
        if not cleaned:  # record was dropped during cleaning
            continue
        transformed = enrich_data(cleaned)[0]
        # Load immediately
        producer.send('results_topic', value=transformed)

Data Quality and Monitoring


class PipelineMonitor:
    def __init__(self):
        self.stats = {
            'extracted': 0,
            'valid': 0,
            'invalid': 0,
            'loaded': 0,
            'errors': []
        }

    def log_extraction(self, count):
        self.stats['extracted'] = count

    def log_validation(self, valid_count, invalid_count):
        self.stats['valid'] = valid_count
        self.stats['invalid'] = invalid_count

    def log_error(self, error_msg):
        self.stats['errors'].append({
            'timestamp': datetime.utcnow().isoformat(),
            'message': error_msg
        })

    def log_load(self, count):
        self.stats['loaded'] = count

    def get_report(self):
        success_rate = (
            self.stats['valid'] / self.stats['extracted'] * 100
            if self.stats['extracted'] > 0 else 0
        )
        return {
            'extracted': self.stats['extracted'],
            'processed': self.stats['valid'] + self.stats['invalid'],
            'loaded': self.stats['loaded'],
            'success_rate': f"{success_rate:.2f}%",
            'errors': len(self.stats['errors'])
        }
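
To close the loop, here is a short usage sketch showing how the monitor wraps the stages defined earlier. The file names are hypothetical, the counts in the final comment are invented, and the input CSV is assumed to have the columns the transform functions expect (name, id, email, phone, age):

monitor = PipelineMonitor()

raw = extract_from_csv('customers.csv')
monitor.log_extraction(len(raw))

valid, invalid = transform_pipeline(raw)
monitor.log_validation(len(valid), len(invalid))

load_to_json(valid, 'customers_clean.json')
monitor.log_load(len(valid))

print(monitor.get_report())
# e.g. {'extracted': 1000, 'processed': 1000, 'loaded': 968,
#       'success_rate': '96.80%', 'errors': 0}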

Practice Problems

  1. Design an ETL pipeline to process Census 2021 data from CSV to MongoDB
  2. Create a data cleaning function that handles 10 common data quality issues
  3. Build a streaming pipeline using Kafka to process live election results
  4. Write validation rules for a student enrollment database
  5. Compare batch vs streaming approaches for stock market price updates

Key Takeaways

  • ETL pipelines move data through Extract → Transform → Load stages
  • Extraction retrieves data from diverse sources (APIs, files, databases)
  • Transformation cleans, validates, standardizes, and enriches data
  • Loading moves processed data to target systems efficiently
  • Data quality is critical - validate and monitor at each stage
  • Batch pipelines process accumulated data periodically
  • Streaming pipelines process data continuously as it arrives
  • Monitoring and error handling ensure pipeline reliability

Under the Hood: Data Pipelines and ETL

Here is what separates someone who merely USES technology from someone who UNDERSTANDS it: knowing what happens behind the screen. When you tap "Send" on a WhatsApp message, do you know what journey that message takes? When you search something on Google, do you know how it finds the answer among billions of web pages in less than a second? When UPI processes a payment, what makes sure the money goes to the right person?

Understanding data pipelines and ETL gives you the ability to answer these questions. More importantly, it gives you the foundation to BUILD things, not just use things other people built. India's tech industry employs over 5 million people, and companies like Infosys, TCS, Wipro, and thousands of startups are all built on the concepts we are about to explore.

This is not just theory for exams. This is how the real world works. Let us get into it.

Database Design: Normalisation and Relationships

Good database design prevents data duplication and inconsistency. This is called normalisation. Consider an e-commerce database:

-- BAD design (denormalised — data repeated everywhere)
-- If customer moves city, you must update EVERY order row!

-- GOOD design (normalised — each fact stored once)
CREATE TABLE customers (
    id    SERIAL PRIMARY KEY,
    name  TEXT NOT NULL,
    email TEXT UNIQUE,
    city  TEXT
);

CREATE TABLE products (
    id       SERIAL PRIMARY KEY,
    name     TEXT NOT NULL,
    price    DECIMAL(10,2),
    category TEXT
);

CREATE TABLE orders (
    id          SERIAL PRIMARY KEY,
    customer_id INTEGER REFERENCES customers(id),
    product_id  INTEGER REFERENCES products(id),
    quantity    INTEGER,
    order_date  TIMESTAMP DEFAULT NOW()
);

-- JOIN to reconstruct the full picture
SELECT c.name, p.name AS product, o.quantity, (p.price * o.quantity) AS total
FROM orders o
JOIN customers c ON o.customer_id = c.id
JOIN products p ON o.product_id = p.id
WHERE o.order_date > '2025-01-01';

The REFERENCES keyword creates a foreign key — a link between tables. This is a relational database: data is stored in related tables, and JOINs combine them. The tradeoff: normalised databases are consistent and space-efficient, but JOINs can be slow on very large datasets. This is why companies like Flipkart use a mix of SQL databases (for transactions) and NoSQL databases like MongoDB or Cassandra (for product catalogs and recommendations).
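
To see the normalised design in action without installing a database server, here is a minimal sketch using Python's built-in sqlite3 module. The schema is adapted to SQLite's types, and the sample rows are made up:

import sqlite3

conn = sqlite3.connect(':memory:')  # throwaway in-memory database
cur = conn.cursor()

# SQLite versions of the normalised tables above
cur.execute("CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT, city TEXT)")
cur.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, price REAL)")
cur.execute("""CREATE TABLE orders (
                   id INTEGER PRIMARY KEY,
                   customer_id INTEGER REFERENCES customers(id),
                   product_id INTEGER REFERENCES products(id),
                   quantity INTEGER)""")

# Made-up sample rows
cur.execute("INSERT INTO customers VALUES (1, 'Asha', 'Pune')")
cur.execute("INSERT INTO products VALUES (1, 'Keyboard', 1499.0)")
cur.execute("INSERT INTO orders VALUES (1, 1, 1, 2)")

# The JOIN reconstructs the full picture from the separate tables
rows = cur.execute("""
    SELECT c.name, p.name, o.quantity, p.price * o.quantity AS total
    FROM orders o
    JOIN customers c ON o.customer_id = c.id
    JOIN products p  ON o.product_id = p.id""").fetchall()
print(rows)  # [('Asha', 'Keyboard', 2, 2998.0)]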

Did You Know?

🚀 ISRO is the world's 4th largest space agency, powered by Indian engineers. With a budget smaller than some Hollywood blockbusters, ISRO does things that would cost other countries 10x more. The Mangalyaan (Mars Orbiter Mission) proved India could reach Mars for the cost of a film. Chandrayaan-3 succeeded where others failed. This is efficiency and engineering brilliance that the world studies.

🏥 AI-powered healthcare diagnosis is being developed in India. Indian startups and research labs are building AI systems that can detect cancer, tuberculosis, and retinopathy from images — better than human doctors in some cases. These systems are being deployed in rural clinics across India, bringing world-class healthcare to millions who otherwise could not afford it.

🌾 Agriculture technology is transforming Indian farming. Drones with computer vision scan crop health. IoT sensors in soil measure moisture and nutrients. AI models predict yields and optimal planting times. Companies like Ninjacart and SoilCompanion are using these technologies to help farmers earn 2-3x more. This is computer science changing millions of lives in real-time.

💰 India has more coding experts per capita than most Western countries. India hosts platforms like CodeChef, which has over 15 million users worldwide. Indians dominate competitive programming rankings. Companies like Flipkart and Razorpay are building world-class engineering cultures. The talent is real, and if you stick with computer science, you will be part of this story.

Real-World System Design: Swiggy's Architecture

When you order food on Swiggy, here is what happens behind the scenes in about 2 seconds: your location is geocoded (algorithms), nearby restaurants are queried from a spatial index (data structures), menu prices are pulled from a database (SQL), delivery time is estimated using ML models trained on historical data (AI), the order is placed in a distributed message queue (Kafka), a delivery partner is assigned using a matching algorithm (optimization), and real-time tracking begins using WebSocket connections (networking). EVERY concept in your CS curriculum is being used simultaneously to deliver your biryani.

The Process: How ETL Pipelines Work in Production

In professional engineering, implementing ETL pipelines requires a systematic approach that balances correctness, performance, and maintainability:

Step 1: Requirements Analysis and Design Trade-offs
Start with a clear specification: what does this system need to do? What are the performance requirements (latency, throughput)? What about reliability (how often can it fail)? What constraints exist (memory, disk, network)? Engineers create detailed design documents, often including complexity analysis (how does the system scale as data grows?).

Step 2: Architecture and System Design
Design the system architecture: what components exist? How do they communicate? Where are the critical paths? Use design patterns (proven solutions to common problems) to avoid reinventing the wheel. For distributed systems, consider: how do we handle failures? How do we ensure consistency across multiple servers? These questions determine the entire architecture.

Step 3: Implementation with Code Review and Testing
Write the code following the architecture. But here is the thing — it is not a solo activity. Other engineers read and critique the code (code review). They ask: is this maintainable? Are there subtle bugs? Can we optimize this? Meanwhile, automated tests verify every piece of functionality, from unit tests (testing individual functions) to integration tests (testing how components work together).
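
As a small taste of that testing culture, here is a minimal unit test for the standardize_data function defined earlier in this chapter, written with Python's built-in unittest module (one option among several):

import unittest

class TestStandardizeData(unittest.TestCase):
    def test_phone_and_email_are_normalised(self):
        records = [{'name': '  asha rao ', 'phone': '+91 98765-43210',
                    'email': 'Asha@Example.COM'}]
        result = standardize_data(records)[0]
        self.assertEqual(result['name'], 'Asha Rao')        # title case, trimmed
        self.assertEqual(result['phone'], '9876543210')     # last 10 digits only
        self.assertEqual(result['email'], 'asha@example.com')

if __name__ == '__main__':
    unittest.main()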

Step 4: Performance Optimization and Profiling
Measure where the system is slow. Use profilers (tools that measure where time is spent). Optimize the bottlenecks. Sometimes this means algorithmic improvements (choosing a smarter algorithm). Sometimes it means system-level improvements (using caching, adding more servers, optimizing database queries). Always profile before and after to prove the optimization worked.
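
Profiling is easier to grasp with an example. Here is a minimal sketch using Python's built-in cProfile on a deliberately naive transform; the function and data are invented for demonstration:

import cProfile
import pstats

def slow_transform(records):
    # Deliberately naive O(n^2) duplicate removal, for demonstration only
    seen, unique = [], []
    for r in records:
        if r not in seen:       # linear scan makes this quadratic overall
            seen.append(r)
            unique.append(r)
    return unique

records = [{'id': i % 500} for i in range(2000)]
cProfile.run('slow_transform(records)', 'profile.out')

# Print the five functions where the most time was spent
stats = pstats.Stats('profile.out')
stats.sort_stats('cumulative').print_stats(5)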

Step 5: Deployment, Monitoring, and Iteration
Deploy gradually, not all at once. Run A/B tests (comparing two versions) to ensure the new system is better. Once live, monitor relentlessly: metrics dashboards, logs, traces. If issues arise, implement circuit breakers and graceful degradation (keeping the system partially functional rather than crashing completely). Then iterate — version 2.0 will be better than 1.0 based on lessons learned.


Algorithm Complexity and Big-O Notation

Big-O notation describes how an algorithm's performance scales with input size. This is THE most important concept for coding interviews:

BIG-O COMPARISON (n = 1,000,000 elements):

O(1)        Constant       1 operation            Hash table lookup
O(log n)    Logarithmic    20 operations          Binary search
O(n)        Linear         1,000,000 ops          Linear search
O(n log n)  Linearithmic   20,000,000 ops         Merge sort, Quick sort
O(n²)       Quadratic      1,000,000,000,000 ops  Bubble sort, Selection sort
O(2ⁿ)       Exponential    astronomically large   Brute-force subsets

Time at 1 billion ops/sec:
O(n log n): 0.02 seconds   ← perfectly usable
O(n²):      ~17 minutes    ← completely unusable for a web request!
O(2ⁿ):      longer than the age of the universe

# Python example: Merge Sort (O(n log n))
def merge_sort(arr):
    if len(arr) <= 1:
        return arr
    mid = len(arr) // 2
    left = merge_sort(arr[:mid])    # Sort left half
    right = merge_sort(arr[mid:])   # Sort right half
    return merge(left, right)       # Merge the sorted halves

def merge(left, right):
    result = []
    i = j = 0
    while i < len(left) and j < len(right):
        if left[i] <= right[j]:
            result.append(left[i]); i += 1
        else:
            result.append(right[j]); j += 1
    result.extend(left[i:])
    result.extend(right[j:])
    return result

This matters in the real world. India's Aadhaar system must search through 1.4 billion biometric records for every authentication request. At O(n), that would take seconds per request. With the right data structures (hash tables, B-trees), it takes milliseconds. The algorithm choice is the difference between a working system and an unusable one.
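
You can feel this difference on your own machine. Here is a small sketch (the sizes and counts are arbitrary) comparing a linear scan against a hash-table lookup with Python's timeit:

import timeit

n = 1_000_000
ids_list = list(range(n))   # linear search: O(n)
ids_set = set(ids_list)     # hash table: O(1) on average

target = n - 1  # worst case for the linear scan
print(timeit.timeit(lambda: target in ids_list, number=10))  # slow: scans up to n items
print(timeit.timeit(lambda: target in ids_set, number=10))   # fast: one hash lookup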

Real Story from India

The India Stack Revolution

Until the early 1990s, India's economy was largely closed. Indians could not easily send money abroad or access international services. But starting in 1991, India opened its economy. Young engineers in Bangalore, Hyderabad, and Chennai saw this as an opportunity, and companies like Infosys, TCS, and Wipro grew into global software powerhouses serving the world.

Fast forward to 2008. India had a problem: 500 million Indians had no formal identity. No bank account, no passport, no way to access government services. The government decided: let us use technology to solve this. UIDAI (Unique Identification Authority of India) was created, and engineers designed Aadhaar.

Aadhaar collects fingerprints and iris scans from every Indian, stores them in massive databases using sophisticated encryption, and allows anyone (even a street vendor) to verify identity instantly. Today, 1.4 billion Indians have Aadhaar. On top of Aadhaar, engineers built UPI (digital payments), Jan Dhan (bank accounts), and ONDC (open e-commerce network).

This entire stack — Aadhaar, UPI, Jan Dhan, ONDC — is called the India Stack. It is considered the most advanced digital infrastructure in the world. Governments and companies everywhere are trying to copy it. And it was built by Indian engineers using computer science concepts that you are learning right now.

Production Engineering: ETL at Scale

Understanding ETL at an academic level is necessary but not sufficient. Let us examine how these concepts manifest in production environments where failure has real consequences.

Consider India's UPI system processing 10+ billion transactions monthly. The architecture must guarantee: atomicity (a transfer either completes fully or not at all — no half-transfers), consistency (balances always add up correctly across all banks), isolation (concurrent transactions on the same account do not interfere), and durability (once confirmed, a transaction survives any failure). These are the ACID properties, and violating any one of them in a payment system would cause financial chaos for millions of people.
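
Here is a minimal sketch of atomicity using Python's built-in sqlite3; the accounts table and balances are invented. The transfer either commits fully or rolls back, never halfway:

import sqlite3

conn = sqlite3.connect(':memory:')  # throwaway database for the demo
conn.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER)")
conn.executemany("INSERT INTO accounts VALUES (?, ?)", [(1, 1000), (2, 500)])
conn.commit()

def transfer(conn, src, dst, amount):
    try:
        with conn:  # transaction: commits on success, rolls back on any exception
            conn.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?",
                         (amount, src))
            (balance,) = conn.execute("SELECT balance FROM accounts WHERE id = ?",
                                      (src,)).fetchone()
            if balance < 0:
                raise ValueError("insufficient funds")  # undoes BOTH updates
            conn.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?",
                         (amount, dst))
    except ValueError as e:
        print(f"Transfer failed and was rolled back: {e}")

transfer(conn, 1, 2, 5000)  # too large: no half-transfer happens
print(conn.execute("SELECT id, balance FROM accounts").fetchall())
# [(1, 1000), (2, 500)]  (balances unchanged)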

At scale, you also face the thundering herd problem: what happens when a million users check their exam results at the same time? (CBSE result day, anyone?) Without rate limiting, connection pooling, caching, and graceful degradation, the system crashes. Good engineering means designing for the worst case while optimising for the common case. Companies like NPCI (the organisation behind UPI) invest heavily in load testing — simulating peak traffic to identify bottlenecks before they affect real users.
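
One of those defences, rate limiting, fits in a few lines. Here is a minimal token-bucket sketch; the rate and capacity numbers are arbitrary:

import time

class TokenBucket:
    """Allow bursts up to `capacity`, then at most `rate` requests per second."""
    def __init__(self, rate, capacity):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last = time.monotonic()

    def allow(self):
        now = time.monotonic()
        # Refill tokens based on elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False  # caller should reject or queue the request

limiter = TokenBucket(rate=100, capacity=200)  # e.g. 100 req/s with bursts of 200
if not limiter.allow():
    print("429 Too Many Requests")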

Monitoring and observability become critical at scale. You need metrics (how many requests per second? what is the 99th percentile latency?), logs (what happened when something went wrong?), and traces (how did a single request flow through 15 different microservices?). Tools like Prometheus, Grafana, ELK Stack, and Jaeger are standard in Indian tech companies. When Hotstar streams IPL to 50 million concurrent users, their engineering team watches these dashboards in real-time, ready to intervene if any metric goes anomalous.
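
Computing that 99th-percentile latency from raw samples takes only the standard library. In this sketch the latency data is randomly generated:

import random
import statistics

# Simulated request latencies in milliseconds (exponential, mean 20 ms)
latencies = [random.expovariate(1 / 20) for _ in range(10_000)]

cuts = statistics.quantiles(latencies, n=100)  # 99 cut points between percentiles
print(f"p50: {cuts[49]:.1f} ms   p99: {cuts[98]:.1f} ms")
# p99 is what users complain about, even when the average looks fine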

The career implications are clear: engineers who understand both the theory (from chapters like this one) AND the practice (from building real systems) command the highest salaries and most interesting roles. India's top engineering talent earns ₹50-100+ LPA at companies like Google, Microsoft, and Goldman Sachs, or builds their own startups. The foundation starts here.

Checkpoint: Test Your Understanding 🎯

Before moving forward, ensure you can answer these:

Question 1: Explain the tradeoffs in ETL pipeline design. What is better: speed or reliability? Can we have both? Why or why not?

Answer: Good engineers understand that there are always tradeoffs, and what is optimal depends on requirements. A batch analytics job can favour throughput and simply re-run failed batches overnight; a payment pipeline must favour reliability even at some cost in latency. Redundancy and careful design can buy you more of both, but never for free. The first question to ask: is this a real-time system or batch processing?

Question 2: How would you test if your ETL implementation is correct and performant? What would you measure?

Answer: Correctness testing (unit tests on each transform function, known-input/known-output fixtures, record counts in versus out), performance benchmarking (throughput in records per second and memory use on realistic data volumes), edge-case handling, and failure scenarios, exactly as professional engineers do.

Question 3: If an ETL pipeline fails in a production system (like UPI), what happens? How would you design to prevent or recover from failures?

Answer: Redundancy, failover systems, circuit breakers, graceful degradation, and idempotent retries (so a re-run never double-processes a record): these are real concerns at scale.

Key Vocabulary

Here are important terms from this chapter that you should know:

JOIN: Combines rows from two or more tables using a related column, typically a foreign key.
Index: A data structure (often a B-tree) that lets the database find rows quickly without scanning the whole table.
Normalisation: Designing tables so each fact is stored exactly once, preventing duplication and inconsistency.
Transaction: A group of database operations that either all succeed or all fail together.
ACID: Atomicity, Consistency, Isolation, Durability: the four guarantees a reliable transaction system provides.

💡 Interview-Style Problem

Here is a problem that frequently appears in technical interviews at companies like Google, Amazon, and Flipkart: "Design a URL shortener like bit.ly. How would you generate unique short codes? How would you handle millions of redirects per second? What database would you use and why? How would you track click analytics?"

Think about: hash functions for generating short codes, read-heavy workload (99% redirects, 1% creates) suggesting caching, database choice (Redis for cache, PostgreSQL for persistence), and horizontal scaling with consistent hashing. Try sketching the system architecture on paper before looking up solutions. The ability to think through system design problems is the single most valuable skill for senior engineering roles.
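
As a starting point for the short-code question, here is a minimal base62 encoder. The counter-based scheme shown is one common choice, not the only one:

ALPHABET = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

def encode_base62(n):
    """Turn an auto-incrementing integer ID into a short code."""
    if n == 0:
        return ALPHABET[0]
    code = []
    while n:
        n, rem = divmod(n, 62)
        code.append(ALPHABET[rem])
    return ''.join(reversed(code))

print(encode_base62(125_000_000))  # a 5-character code; 62^7 covers ~3.5 trillion URLs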

Where This Takes You

The knowledge you have gained about data pipelines and ETL is directly applicable to: competitive programming (Codeforces, CodeChef — India has the 2nd largest competitive programming community globally), open-source contribution (India is the 2nd largest contributor on GitHub), placement preparation (these concepts form 60% of technical interview questions), and building real products (every startup needs engineers who understand these fundamentals).

India's tech ecosystem offers incredible opportunities. Freshers at top companies earn ₹15-50 LPA; experienced engineers at FAANG companies in India earn ₹50 LPA to ₹1 Cr+. But more importantly, the problems being solved in India — digital payments for 1.4 billion people, healthcare AI for rural areas, agricultural tech for 150 million farmers — are some of the most impactful engineering challenges in the world. The fundamentals you are building will be the tools you use to tackle them.

