Python Interview Questions for Data Engineers

Key Insights

  • Data engineering interviews test both Python fundamentals and your ability to think about scale, reliability, and production-readiness—not just whether you can write code that works
  • The best candidates demonstrate awareness of tradeoffs: memory vs speed, ORM vs raw SQL, batch vs streaming—there’s rarely one right answer
  • Interviewers care less about memorized syntax and more about your approach to error handling, testing, and building maintainable pipelines

Core Python Fundamentals

Every data engineering interview starts here. These questions seem basic, but they reveal whether you truly understand Python or just copy-paste from Stack Overflow.

Common questions:

  • What’s the difference between a list, tuple, and set?
  • Explain mutability and why it matters for data pipelines
  • When would you use a dictionary comprehension?

The key insight interviewers want: you understand when to use each data structure, not just how.

# List comprehension for filtering and transforming data
raw_records = [
    {"user_id": 1, "amount": 100, "status": "completed"},
    {"user_id": 2, "amount": -50, "status": "pending"},
    {"user_id": 3, "amount": 200, "status": "completed"},
]

# Filter and transform in one pass
valid_transactions = [
    {"user": r["user_id"], "amount": r["amount"]}
    for r in raw_records
    if r["status"] == "completed" and r["amount"] > 0
]

# Dictionary merge operations (Python 3.9+)
defaults = {"timeout": 30, "retries": 3, "batch_size": 1000}
user_config = {"timeout": 60, "debug": True}
final_config = defaults | user_config  # user_config wins on conflicts

# Set operations for deduplication and comparison
yesterday_users = {1, 2, 3, 4, 5}
today_users = {4, 5, 6, 7, 8}

new_users = today_users - yesterday_users      # {6, 7, 8}
churned_users = yesterday_users - today_users  # {1, 2, 3}
retained_users = yesterday_users & today_users # {4, 5}

When asked about mutability, explain why it matters: mutable default arguments cause bugs, immutable data is safer for concurrent processing, and understanding this prevents subtle pipeline errors.
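
The classic mutable-default-argument pitfall is worth being able to demonstrate on a whiteboard. A minimal sketch (function names are illustrative):

```python
# BUG: the default list is created once, at function-definition time
def append_record_buggy(record, batch=[]):
    batch.append(record)
    return batch

first = append_record_buggy("a")
second = append_record_buggy("b")
# first and second are the SAME list: ["a", "b"] - state leaks across calls

# FIX: use None as a sentinel and build a fresh list per call
def append_record(record, batch=None):
    if batch is None:
        batch = []
    batch.append(record)
    return batch

assert append_record("a") == ["a"]
assert append_record("b") == ["b"]  # independent calls get independent lists
```

The fix generalizes: any mutable default (list, dict, set) should become a `None` sentinel.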

Data Manipulation with Pandas

Pandas questions dominate data engineering interviews. Interviewers want to see fluent, idiomatic code—not five lines where one would do.

Common questions:

  • How do you handle missing data?
  • Explain the difference between merge and join
  • When would you use transform vs apply vs agg?

import pandas as pd
import numpy as np

# Sample dataset
df = pd.DataFrame({
    "date": pd.date_range("2024-01-01", periods=10, freq="D"),
    "user_id": [1, 1, 2, 2, 2, 3, 3, 3, 3, 1],
    "category": ["A", "B", "A", "A", "B", "B", "B", "A", "A", "A"],
    "amount": [100, np.nan, 200, 150, np.nan, 300, 250, np.nan, 400, 50]
})

# Chained transformations - this is what interviewers want to see
result = (
    df
    .assign(
        amount_filled=lambda x: x.groupby("user_id")["amount"]
            .transform(lambda s: s.fillna(s.mean())),
        amount_normalized=lambda x: x["amount_filled"] / x.groupby("user_id")["amount_filled"]
            .transform("sum")
    )
    .query("category == 'A'")
    .groupby("user_id")
    .agg(
        total_amount=("amount_filled", "sum"),
        transaction_count=("amount_filled", "count"),
        first_transaction=("date", "min")
    )
    .reset_index()
)

# Window functions for running calculations
df["rolling_avg"] = (
    df.sort_values("date")
    .groupby("user_id")["amount"]
    .transform(lambda x: x.rolling(window=3, min_periods=1).mean())
)

# Pivot tables for reshaping
pivot = df.pivot_table(
    values="amount",
    index="user_id",
    columns="category",
    aggfunc=["sum", "count"],
    fill_value=0
)

The fillna strategy matters: forward fill for time series, mean/median for numerical features, or a sentinel value when missingness itself is informative. Explain your reasoning.
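
Those three strategies side by side, on a toy series (values are illustrative):

```python
import pandas as pd
import numpy as np

s = pd.Series([10.0, np.nan, 30.0, np.nan])

ffilled = s.ffill()                  # time series: carry the last observation forward
mean_filled = s.fillna(s.mean())     # numerical feature: impute the mean (20.0 here)
sentinel = s.fillna(-1)              # informative missingness: keep it visible
missing_flag = s.isna().astype(int)  # or record missingness explicitly before filling
```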

Working with Large Datasets

This separates junior from senior candidates. Anyone can process 1,000 rows. What happens with 100 million?

Common questions:

  • How do you process a file that doesn’t fit in memory?
  • When would you use Dask vs PySpark vs vanilla pandas?
  • Explain generators and why they matter for data pipelines

from typing import Iterator, Dict, Any
import sys

import csv

# Generator for streaming large files - memory efficient
def stream_records(filepath: str) -> Iterator[Dict[str, Any]]:
    """Yield records one at a time, never loading the full file into memory."""
    with open(filepath, "r", newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            yield row

# Processing with generators - constant memory regardless of file size
def process_large_file(filepath: str) -> Dict[str, float]:
    totals_by_category = {}
    for record in stream_records(filepath):
        category = record["category"]
        amount = float(record["amount"])
        totals_by_category[category] = totals_by_category.get(category, 0) + amount
    return totals_by_category

# Pandas chunking for when you need DataFrame operations
def chunked_aggregation(filepath: str) -> pd.DataFrame:
    chunks = []
    for chunk in pd.read_csv(filepath, chunksize=50000):
        # Process each chunk independently
        agg = chunk.groupby("category")["amount"].agg(["sum", "count"])
        chunks.append(agg)
    
    # Combine chunk results
    combined = pd.concat(chunks).groupby(level=0).sum()
    combined["mean"] = combined["sum"] / combined["count"]
    return combined

# Memory comparison - know these numbers
sample_list = list(range(1_000_000))
sample_generator = (x for x in range(1_000_000))

# Note: getsizeof measures the container only (the list's pointer array),
# not the int objects themselves - the true footprint is even larger
print(f"List memory: {sys.getsizeof(sample_list):,} bytes")      # ~8MB
print(f"Generator memory: {sys.getsizeof(sample_generator)} bytes")  # ~100-200 bytes, constant

When discussing Dask vs PySpark: Dask when you want pandas-like syntax and single-machine parallelism, PySpark when you need true distributed processing across a cluster. Know the tradeoffs.

SQL and Database Interactions

Data engineers live at the intersection of Python and SQL. You need both.

Common questions:

  • How do you prevent SQL injection?
  • Explain connection pooling and why it matters
  • When would you use an ORM vs raw SQL?

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker
from contextlib import contextmanager

# Connection setup with pooling
engine = create_engine(
    "postgresql://user:pass@localhost/db",
    pool_size=5,
    max_overflow=10,
    pool_pre_ping=True  # Verify connections before use
)
Session = sessionmaker(bind=engine)

@contextmanager
def get_session():
    """Context manager for safe session handling."""
    session = Session()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

# Parameterized queries - NEVER use string formatting
def get_user_transactions(user_id: int, min_amount: float) -> list:
    with get_session() as session:
        result = session.execute(
            text("""
                SELECT * FROM transactions 
                WHERE user_id = :user_id AND amount >= :min_amount
            """),
            {"user_id": user_id, "min_amount": min_amount}
        )
        return [dict(row._mapping) for row in result]

# Bulk insert pattern - much faster than individual inserts
def bulk_insert_records(records: list[dict]) -> None:
    with get_session() as session:
        session.execute(
            text("""
                INSERT INTO transactions (user_id, amount, category)
                VALUES (:user_id, :amount, :category)
            """),
            records  # Pass list of dicts directly
        )

ORM vs raw SQL opinion: use raw SQL for complex analytical queries and bulk operations, ORM for CRUD operations and when you need object relationships. Don’t be dogmatic.
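
For the ORM side of that tradeoff, a minimal SQLAlchemy model sketch (table and column names are illustrative; an in-memory SQLite database keeps it self-contained):

```python
from sqlalchemy import Column, Float, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Transaction(Base):
    __tablename__ = "transactions"
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, nullable=False)
    amount = Column(Float, nullable=False)
    category = Column(String)

# In-memory SQLite so the example runs anywhere
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

with Session() as session:
    session.add(Transaction(user_id=1, amount=100.0, category="A"))
    session.commit()
    rows = session.query(Transaction).filter_by(user_id=1).all()
```

For CRUD like this, the ORM buys you readable code and relationship handling; for a 50-line analytical query, raw SQL stays clearer.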

ETL Pipeline Patterns

Production pipelines fail. The question is how gracefully.

Common questions:

  • How do you make a pipeline idempotent?
  • Explain your error handling strategy
  • How do you implement retry logic?

import logging
import time
from functools import wraps
from typing import Callable, TypeVar

T = TypeVar("T")

# Configure logging properly
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Retry decorator with exponential backoff
def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    exceptions: tuple = (Exception,)
) -> Callable:
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        @wraps(func)
        def wrapper(*args, **kwargs) -> T:
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_retries - 1:
                        logger.error(f"Final attempt failed: {e}")
                        raise
                    delay = base_delay * (2 ** attempt)
                    logger.warning(f"Attempt {attempt + 1} failed, retrying in {delay}s: {e}")
                    time.sleep(delay)
            raise RuntimeError("Unreachable")
        return wrapper
    return decorator

@retry_with_backoff(max_retries=3, exceptions=(ConnectionError, TimeoutError))
def fetch_data_from_api(endpoint: str) -> dict:
    """Fetch with automatic retry on transient failures."""
    # API call implementation
    pass

# Idempotent upsert pattern
def upsert_records(records: list[dict], batch_date: str) -> None:
    """Idempotent load - safe to run multiple times."""
    with get_session() as session:
        # Delete existing records for this batch first
        session.execute(
            text("DELETE FROM daily_metrics WHERE batch_date = :batch_date"),
            {"batch_date": batch_date}
        )
        # Then insert fresh
        session.execute(
            text("""
                INSERT INTO daily_metrics (batch_date, metric_name, value)
                VALUES (:batch_date, :metric_name, :value)
            """),
            records
        )

Idempotency is crucial: if your pipeline runs twice, the result should be the same as running once. Delete-then-insert, upserts, or processing watermarks all achieve this.
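
The watermark approach mentioned above can be sketched without a database: persist the last processed timestamp and only take records newer than it. The state file and record shape here are assumptions for illustration:

```python
import json
from pathlib import Path

WATERMARK_FILE = Path("watermark.json")  # hypothetical state store

def load_watermark(default: str = "1970-01-01T00:00:00") -> str:
    """Return the last processed timestamp, or a default on first run."""
    if WATERMARK_FILE.exists():
        return json.loads(WATERMARK_FILE.read_text())["last_ts"]
    return default

def save_watermark(ts: str) -> None:
    WATERMARK_FILE.write_text(json.dumps({"last_ts": ts}))

def process_incremental(records: list[dict]) -> list[dict]:
    """Process only records newer than the watermark - safe to re-run."""
    watermark = load_watermark()
    # ISO-8601 strings compare correctly as plain strings
    new_records = [r for r in records if r["ts"] > watermark]
    if new_records:
        save_watermark(max(r["ts"] for r in new_records))
    return new_records
```

Running the pipeline twice on the same input processes the records once: the second run finds nothing newer than the watermark.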

Testing and Code Quality

Untested pipelines are broken pipelines waiting to happen.

Common questions:

  • How do you test a function that calls an external API?
  • What makes a good test fixture for data pipelines?
  • How do you assert DataFrame equality?

import pytest
import pandas as pd
from unittest.mock import Mock, patch

# Fixtures for reusable test data
@pytest.fixture
def sample_transactions() -> pd.DataFrame:
    return pd.DataFrame({
        "user_id": [1, 1, 2, 2],
        "amount": [100.0, 200.0, 150.0, 50.0],
        "category": ["A", "B", "A", "A"]
    })

@pytest.fixture
def mock_api_response() -> dict:
    return {"status": "success", "data": [{"id": 1, "value": 100}]}

# Testing with mocked external services
def test_fetch_and_transform(mock_api_response):
    with patch("mymodule.requests.get") as mock_get:
        mock_get.return_value = Mock(
            status_code=200,
            json=Mock(return_value=mock_api_response)
        )
        
        result = fetch_and_transform("http://api.example.com/data")
        
        assert len(result) == 1
        mock_get.assert_called_once()

# DataFrame equality assertions
def test_aggregation_logic(sample_transactions):
    result = aggregate_by_user(sample_transactions)
    
    expected = pd.DataFrame({
        "user_id": [1, 2],
        "total_amount": [300.0, 200.0]
    })
    
    pd.testing.assert_frame_equal(
        result.reset_index(drop=True),
        expected,
        check_dtype=False  # Be flexible on int vs float
    )

System Design Discussion Questions

These open-ended questions have no single right answer. Interviewers want to see your reasoning process.

Common questions:

  • How would you design a pipeline to process 10TB of daily logs?
  • What’s your approach to data validation?
  • How do you monitor pipeline health?

from pydantic import BaseModel, validator  # Pydantic v1 syntax; v2 renames this to field_validator
from typing import Optional
from datetime import datetime

# Schema validation with Pydantic
class TransactionRecord(BaseModel):
    user_id: int
    amount: float
    timestamp: datetime
    category: Optional[str] = None
    
    @validator("amount")
    def amount_must_be_positive(cls, v):
        if v <= 0:
            raise ValueError("Amount must be positive")
        return v
    
    @validator("user_id")
    def user_id_must_be_valid(cls, v):
        if v <= 0:
            raise ValueError("Invalid user_id")
        return v

# Simple data quality check framework
class DataQualityChecker:
    def __init__(self, df: pd.DataFrame):
        self.df = df
        self.issues = []
    
    def check_nulls(self, columns: list[str], threshold: float = 0.05) -> "DataQualityChecker":
        for col in columns:
            null_rate = self.df[col].isnull().mean()
            if null_rate > threshold:
                self.issues.append(f"Column {col} has {null_rate:.1%} nulls (threshold: {threshold:.1%})")
        return self
    
    def check_unique(self, columns: list[str]) -> "DataQualityChecker":
        if self.df.duplicated(subset=columns).any():
            self.issues.append(f"Duplicate records found on columns: {columns}")
        return self
    
    def raise_on_issues(self) -> None:
        if self.issues:
            raise ValueError(f"Data quality issues: {self.issues}")

# Usage in pipeline
def validate_and_load(df: pd.DataFrame) -> None:
    (DataQualityChecker(df)
        .check_nulls(["user_id", "amount"])
        .check_unique(["transaction_id"])
        .raise_on_issues())
    
    # Proceed with load...

For system design discussions, talk through tradeoffs explicitly. Batch processing is simpler but has latency. Streaming is real-time but complex. Schema-on-read is flexible but risky. There’s no universally correct answer—show that you understand the implications of each choice.
