PySpark Syntax Cheat Sheet: The Developer’s Quick Reference Guide

PySpark is the Python API for Apache Spark, giving data engineers and analysts the ability to process massive datasets using familiar Python syntax backed by distributed computing power. Whether you’re building ETL pipelines, transforming data in a Databricks Medallion architecture, or exploring a Delta Lake table, knowing the core PySpark syntax by heart saves hours of tab-switching to documentation.

This cheat sheet covers the most practical patterns you’ll reach for every day.

Setup: The Two Imports You Always Need

Before anything else, alias the functions and types modules. Nearly every example in this guide depends on them.

from pyspark.sql import functions as F, types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MyApp").getOrCreate()

Referencing them as F and T keeps your code concise and avoids naming collisions with Python built-ins.

Reading and Writing Data

# Read a CSV with headers and inferred schema
df = spark.read.option("header", True).option("inferSchema", True).csv("/data/sales.csv")

# Read a Parquet file (preferred format for performance)
df = spark.read.parquet("/data/transactions/")

# Read a Delta table (Databricks / Delta Lake)
df = spark.read.format("delta").load("/mnt/bronze/orders")

# Write as Parquet, overwriting existing data
df.write.mode("overwrite").parquet("/data/output/")

# Write as Delta with partitioning
df.write.format("delta").partitionBy("year", "month").mode("append").save("/mnt/silver/orders")

# Quick inspection
df.printSchema()
df.show(10, truncate=False)
df.describe().show()

Filtering Rows

# Single condition
df = df.filter(F.col("country") == "UK")

# Negation
df = df.filter(F.col("status") != "cancelled")

# Numeric range
df = df.filter((F.col("revenue") >= 1000) & (F.col("revenue") <= 50000))

# Null checks
df = df.filter(F.col("email").isNotNull())
df = df.filter(F.col("phone").isNull())

# Match against a set of values
df = df.filter(F.col("region").isin(["EMEA", "APAC", "AMER"]))

# Exclude a set of values
df = df.filter(~F.col("category").isin(["test", "demo"]))

# Sort ascending and descending
df = df.orderBy(F.col("created_at").desc(), F.col("customer_id").asc())

Selecting and Reshaping Columns

# Select a subset of columns
df = df.select("customer_id", "order_date", "total_amount")

# Rename on the fly using alias
df = df.select(
    F.col("cust_id").alias("customer_id"),
    F.col("ord_dt").alias("order_date"),
    F.col("amt").alias("total_amount")
)

# Add a literal constant column
df = df.withColumn("source_system", F.lit("salesforce"))

# Conditional column using when/otherwise
df = df.withColumn("tier",
    F.when(F.col("total_spend") > 10000, "platinum")
     .when(F.col("total_spend") > 5000, "gold")
     .when(F.col("total_spend") > 1000, "silver")
     .otherwise("bronze")
)

# Rename a single column
df = df.withColumnRenamed("amt", "total_amount")

# Drop columns you no longer need
df = df.drop("temp_flag", "internal_notes", "legacy_id")

# Standardise all column names to snake_case in a single projection
# (toDF renames every column at once instead of one withColumnRenamed call per column)
df = df.toDF(*[c.strip().lower().replace(" ", "_").replace("-", "_") for c in df.columns])

Joins

# Inner join
df = orders.join(customers, on="customer_id", how="inner")

# Left join (keep all rows from orders)
df = orders.join(customers, on="customer_id", how="left")

# Join on columns with different names
df = orders.join(customers, orders.cust_id == customers.customer_id, "left")

# Multi-column join
df = invoices.join(payments, on=["invoice_id", "currency_code"], how="inner")

# Anti-join — find orders with no matching customer record
df = orders.join(customers, on="customer_id", how="left_anti")

# Avoid ambiguous duplicate columns by selecting only what you need from the right side
# (joining on a column name already keeps a single customer_id column)
df = orders.join(customers.select("customer_id", "country"), on="customer_id", how="left")

Handling Nulls and Duplicates

# Replace nulls with defaults
df = df.fillna({
    "country": "Unknown",
    "discount_pct": 0.0,
    "notes": ""
})

# Drop rows where any column is null
df = df.dropna()

# Drop rows where specific columns are null
df = df.dropna(subset=["customer_id", "order_date"])

# Use the first non-null value from a list of fallback columns
df = df.withColumn("display_name",
    F.coalesce(F.col("preferred_name"), F.col("full_name"), F.lit("Anonymous"))
)

# Remove fully duplicate rows
df = df.dropDuplicates()

# Deduplicate on a subset of columns (which row survives is not guaranteed;
# use a row_number() window when you need a specific one)
df = df.dropDuplicates(["customer_id", "order_date"])

String Operations

# Uppercase and lowercase
df = df.withColumn("country_upper", F.upper(F.col("country")))
df = df.withColumn("email_lower", F.lower(F.col("email")))

# Trim whitespace from both sides
df = df.withColumn("name", F.trim(F.col("name")))

# Concatenate strings with a separator (nulls ignored)
df = df.withColumn("full_address",
    F.concat_ws(", ", F.col("street"), F.col("city"), F.col("postcode"))
)

# Extract substring (1-based indexing)
df = df.withColumn("year_part", F.col("date_str").substr(1, 4))

# Replace pattern using regex
df = df.withColumn("clean_phone", F.regexp_replace(F.col("phone"), r"[^0-9]", ""))

# Extract first regex match
df = df.withColumn("postcode_area", F.regexp_extract(F.col("postcode"), r"^[A-Z]{1,2}", 0))

# Keep rows whose column contains a substring (case-sensitive)
df = df.filter(F.col("product_name").contains("pro"))

# Split a string into an array
df = df.withColumn("tags_array", F.split(F.col("tags"), ","))

Type Casting

# Cast to common types
df = df.withColumn("order_id", F.col("order_id").cast(T.IntegerType()))
df = df.withColumn("total_amount", F.col("total_amount").cast(T.DoubleType()))
df = df.withColumn("is_active", F.col("is_active").cast(T.BooleanType()))
df = df.withColumn("event_ts", F.col("event_ts").cast(T.TimestampType()))

# Parse a date from a known string format
df = df.withColumn("order_date", F.to_date(F.col("order_date_str"), "yyyy-MM-dd"))

# Parse a timestamp
df = df.withColumn("created_at", F.to_timestamp(F.col("created_str"), "yyyy-MM-dd HH:mm:ss"))

Date and Timestamp Operations

# Get today's date and current timestamp
df = df.withColumn("today", F.current_date())
df = df.withColumn("now", F.current_timestamp())

# Extract date parts
df = df.withColumn("order_year", F.year(F.col("order_date")))
df = df.withColumn("order_month", F.month(F.col("order_date")))
df = df.withColumn("order_day", F.dayofmonth(F.col("order_date")))

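# Date arithmetic
df = df.withColumn("due_date", F.date_add(F.col("order_date"), 30))
# add_months shifts by whole months (same day where possible); trunc(..., "month") snaps to the 1st
df = df.withColumn("prev_month_start", F.trunc(F.add_months(F.col("order_date"), -1), "month"))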

# Difference between two dates in days
df = df.withColumn("days_to_ship", F.datediff(F.col("ship_date"), F.col("order_date")))

# Difference in months (returns a double, which can be fractional)
df = df.withColumn("months_active", F.months_between(F.col("end_date"), F.col("start_date")))

# Filter by date range
df = df.filter(
    (F.col("order_date") >= F.lit("2024-01-01")) &
    (F.col("order_date") < F.lit("2025-01-01"))
)

Aggregations

# Basic group-by aggregation
df_summary = df.groupBy("region", "product_category").agg(
    F.count("order_id").alias("total_orders"),
    F.sum("revenue").alias("total_revenue"),
    F.avg("order_value").alias("avg_order_value"),
    F.max("order_date").alias("latest_order"),
    F.countDistinct("customer_id").alias("unique_customers")
)

# Collect values into a list or set per group
df_tags = df.groupBy("product_id").agg(
    F.collect_list("tag").alias("all_tags"),
    F.collect_set("category").alias("unique_categories")
)

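# Pivot: turn row values into columns (listing the values, e.g. pivot("year", [2023, 2024]),
# skips the extra pass Spark otherwise runs to find them)
df_pivot = df.groupBy("customer_id").pivot("year").agg(F.sum("revenue"))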

Window Functions

Window functions are essential for ranking, running totals, and lag/lead comparisons without collapsing the dataframe.

from pyspark.sql.window import Window

# Define a window: partition by customer, order by date
w = Window.partitionBy("customer_id").orderBy("order_date")

# Row number per customer (sequential rank)
df = df.withColumn("row_num", F.row_number().over(w))

# Dense rank (no gaps for ties)
df = df.withColumn("rank", F.dense_rank().over(w))

# Running total of revenue per customer
df = df.withColumn("running_revenue",
    F.sum("revenue").over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow))
)

# Previous order date (lag)
df = df.withColumn("prev_order_date", F.lag("order_date", 1).over(w))

# Next order date (lead)
df = df.withColumn("next_order_date", F.lead("order_date", 1).over(w))

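# Keep only the most recent order per customer: w sorts oldest first, so rank with a descending window
w_latest = Window.partitionBy("customer_id").orderBy(F.col("order_date").desc())
df_latest = df.withColumn("rn", F.row_number().over(w_latest)).filter(F.col("rn") == 1).drop("rn")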

Array Operations

# Combine columns into an array
df = df.withColumn("contact_methods", F.array(F.col("email"), F.col("phone")))

# Check if an array contains a value
df = df.filter(F.array_contains(F.col("tags_array"), "premium"))

# Get the size of an array column
df = df.withColumn("tag_count", F.size(F.col("tags_array")))

# Explode array into individual rows (rows with a null or empty array are dropped;
# F.explode_outer keeps them with a null value)
df = df.select("product_id", F.explode(F.col("tags_array")).alias("tag"))

# Remove duplicates from an array column
df = df.withColumn("unique_tags", F.array_distinct(F.col("tags_array")))

UDFs: User Defined Functions

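Use UDFs only when built-in functions can’t express your logic. Python UDFs are typically much slower than native Spark functions, because each row is serialised out to a Python worker and back, and the optimizer cannot see inside them. Prefer F.when, F.regexp_replace, and friends wherever possible.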

from pyspark.sql.types import StringType

# Define a Python function and wrap it as a UDF (spark.udf.register also exposes it to SQL)
def classify_spend(amount):
    if amount is None:
        return "unknown"
    elif amount > 10000:
        return "high"
    elif amount > 1000:
        return "medium"
    return "low"

classify_spend_udf = F.udf(classify_spend, StringType())

df = df.withColumn("spend_band", classify_spend_udf(F.col("total_spend")))

Repartitioning and Performance Tips

# Repartition to a specific number of partitions (triggers a full shuffle)
df = df.repartition(8)

# Repartition by a column — useful before writing partitioned output
df = df.repartition(F.col("country"))

# Coalesce: reduce partitions without a full shuffle (handy before writing small outputs;
# coalesce(1) can also shrink the parallelism of the stage that feeds it)
df = df.coalesce(1)

# Cache a dataframe that will be used multiple times (lazy: it is materialised by the next action)
df.cache()

# Unpersist when done
df.unpersist()

# Check the execution plan
df.explain(True)

Quick Reference: Common Function Summary

Operation            Function
Conditional logic    F.when().otherwise()
Null fallback        F.coalesce()
Static value         F.lit()
Current date         F.current_date()
String concat        F.concat_ws()
Regex replace        F.regexp_replace()
Row number           F.row_number().over(w)
Explode array        F.explode()
Count distinct       F.countDistinct()
Type cast            .cast(T.IntegerType())

This cheat sheet covers the core PySpark operations you’ll meet in everyday data engineering work. For advanced topics such as Structured Streaming, Delta Lake merge operations, or Unity Catalog permissions, the official PySpark, Delta Lake, and Databricks documentation are the authoritative references.