DataFrames & SQL in Databricks: Reading, Writing, and Transforming Data
This is where things get real. So far we've set up our environment, understood clusters, learned how Spark works under the hood, and connected to cloud storage. Now we actually touch data. In this article you'll learn how to read data in multiple formats, transform it using PySpark and SQL, and write results back to storage. These are the operations you'll use every single day as a data engineer.

A DataFrame is Spark's primary data structure — a distributed table with named columns and a defined schema. If you've used Pandas, the concept is familiar. But there are critical differences:

| | Pandas DataFrame | Spark DataFrame |
|---|---|---|
| Where data lives | Single machine's RAM | Distributed across the cluster |
| Data size limit | Your machine's RAM | Practically unlimited |
| Execution | Immediate | Lazy (runs on action) |
| API | `pandas` | `pyspark.sql` |
| Best for | Small data that fits in memory | Large-scale, distributed workloads |

Filtering rows:

```python
from pyspark.sql.functions import col

# Filter with a column expression
df.filter(col("amount") > 1000).show()

# Filter with SQL string (often cleaner)
df.filter("amount > 1000 AND region = 'North'").show()

# .where() is an alias for .filter()
df.where("region = 'South'").show()

# Multiple conditions
df.filter((col("amount") > 500) & (col("region").isin(["North", "South"]))).show()
```

Adding and modifying columns:

```python
from pyspark.sql.functions import col, upper, round, when, lit

# Add a new column
df = df.withColumn("amount_eur", col("amount") * 0.92)

# Modify an existing column
df = df.withColumn("product", upper(col("product")))

# Round a numeric column
df = df.withColumn("amount", round(col("amount"), 2))

# Conditional column (like IF/CASE)
df = df.withColumn("tier",
    when(col("amount") >= 5000, "Premium")
    .when(col("amount") >= 1000, "Standard")
    .otherwise("Basic")
)

# Add a constant column
df = df.withColumn("currency", lit("USD"))
```

Aggregations:

```python
from pyspark.sql.functions import sum, avg, count, max, min, countDistinct

# Total revenue by region
df.groupBy("region") \
    .agg(sum("amount").alias("total_revenue")) \
    .show()

# Multiple aggregations at once
df.groupBy("region", "product") \
    .agg(
        sum("amount").alias("total_revenue"),
        avg("amount").alias("avg_order_value"),
        count("order_id").alias("total_orders"),
        countDistinct("customer_id").alias("unique_customers")
    ) \
    .orderBy("total_revenue", ascending=False) \
    .show()
```

Joins:

```python
# Load a second DataFrame
customers = spark.read.table("customers_silver")

# Inner join (default)
result = df.join(customers, on="customer_id", how="inner")

# Left join
result = df.join(customers, on="customer_id", how="left")

# Join on multiple columns
result = df.join(customers, on=["customer_id", "region"], how="inner")

# Join on different column names
result = df.join(
    customers,
    df.customer_id == customers.id,
    how="inner"
).drop(customers.id)  # drop the duplicate column
```

⚠️ Joins are expensive in Spark — they trigger a shuffle (data redistribution across workers). Avoid joining unnecessarily, and filter before joining when possible.

Deduplicating and handling nulls:

```python
# Drop fully duplicate rows
df = df.dropDuplicates()

# Drop duplicates based on specific columns (keeps the first occurrence)
df = df.dropDuplicates(["order_id"])

# Drop rows with any null value
df = df.dropna()

# Drop rows where specific columns are null
df = df.dropna(subset=["order_id", "amount"])

# Fill nulls with a default value
df = df.fillna({"amount": 0.0, "region": "Unknown"})
```

Everything you can do with PySpark DataFrames, you can also do with SQL. In Databricks, SQL is a first-class citizen.
To run SQL against a DataFrame, register it as a temporary view:

```python
df.createOrReplaceTempView("sales")

# Now query it with SQL
result = spark.sql("""
    SELECT
        region,
        product,
        SUM(amount) AS total_revenue,
        COUNT(order_id) AS total_orders,
        AVG(amount) AS avg_order_value
    FROM sales
    WHERE order_date >= '2024-01-01'
    GROUP BY region, product
    ORDER BY total_revenue DESC
""")

result.show()
```

In notebooks, you can switch an entire cell to SQL mode:

```sql
%sql
SELECT
    region,
    DATE_TRUNC('month', order_date) AS month,
    SUM(amount) AS monthly_revenue
FROM sales
WHERE year(order_date) = 2024
GROUP BY region, month
ORDER BY month, monthly_revenue DESC
```

The result renders automatically as an interactive table — you can even switch to a chart view directly in the notebook.

Temp views disappear when the session ends. For persistent tables, use CREATE TABLE or save as Delta:

```sql
%sql
CREATE TABLE IF NOT EXISTS gold.monthly_revenue
USING DELTA
AS
SELECT
    region,
    DATE_TRUNC('month', order_date) AS month,
    SUM(amount) AS total_revenue
FROM sales_silver
GROUP BY region, month
```

Both PySpark and SQL produce the same result in Databricks — Spark executes them through the same engine. So how do you choose?

| Situation | Recommendation |
|---|---|
| Simple aggregations and filters | SQL — more readable |
| Complex multi-step transformations | PySpark — easier to chain and debug |
| Conditional logic (when/otherwise) | PySpark — cleaner than nested CASE WHEN |
| Window functions | Either — SQL is often cleaner |
| Building reusable pipeline functions | PySpark — easier to parameterize |
| Ad-hoc exploration | SQL — fast to write, and results display nicely |
| Working with nested/complex types | PySpark — more flexible |

In practice, experienced data engineers mix both freely. Use whatever makes the code clearest.

After transforming your data, you need to write it somewhere.
Write Parquet:

```python
df.write \
    .mode("overwrite") \
    .parquet("/mnt/processed/sales/")
```

Write Delta:

```python
df.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/mnt/processed/sales/")
```

Write modes:

| Mode | Behavior |
|---|---|
| `overwrite` | Replace existing data entirely |
| `append` | Add new data to existing data |
| `ignore` | Do nothing if data already exists |
| `error` (default) | Raise an error if data already exists |

Partition large tables on write:

```python
df.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .save("/mnt/processed/sales/")
```

Save as a table instead of a path:

```python
df.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("sales_silver")
```

This registers the table in the Databricks catalog — queryable by name from any notebook or the SQL Editor.

Let's put it all together. A realistic transformation pipeline:

```python
from pyspark.sql.functions import col, upper, round, when, year, month, sum, count

# 1. Read raw data
raw = spark.read.csv("/mnt/raw/sales.csv", header=True, inferSchema=True)

# 2. Clean and enrich
cleaned = raw \
    .dropDuplicates(["order_id"]) \
    .dropna(subset=["order_id", "amount", "customer_id"]) \
    .withColumn("product", upper(col("product"))) \
    .withColumn("amount", round(col("amount"), 2)) \
    .withColumn("tier",
        when(col("amount") >= 5000, "Premium")
        .when(col("amount") >= 1000, "Standard")
        .otherwise("Basic")
    ) \
    .withColumn("year", year(col("order_date"))) \
    .withColumn("month", month(col("order_date")))

# 3. Aggregate to gold
gold = cleaned \
    .groupBy("region", "year", "month") \
    .agg(
        sum("amount").alias("total_revenue"),
        count("order_id").alias("total_orders")
    )

# 4. Write Silver layer
cleaned.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .save("/mnt/processed/sales_silver/")

# 5. Write Gold layer
gold.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold.monthly_sales")

print("Pipeline complete.")
```

This is the skeleton of real data engineering work. In articles 9 and 10, we'll build this into a full Medallion Architecture pipeline.
Here's what you've learned in this article:

- A Spark DataFrame is a distributed table — similar to Pandas but built for scale
- Reading data: CSV, JSON, Parquet, and Delta all have their place — prefer Parquet/Delta in production
- Core transformations: `select`, `filter`, `withColumn`, `groupBy`, `join`, `dropDuplicates`, `fillna`
- SQL works side by side with PySpark — use whichever makes your code clearer
- Writing data: always use Delta format, choose your write mode carefully, and partition large tables
- Use `saveAsTable` to register tables in the catalog for easy SQL access

In the next article, we go deep on the technology that makes all of this reliable: Delta Lake.
