Delta Lake Operations

This tutorial introduces common Delta Lake operations on Databricks, including the following:

  • Create a table
  • Upsert to a table
  • Read a table
  • Write to a table
  • Update a table
  • Delete from a table
  • Display table history
  • Optimize a table
  • Query an earlier version of a table (time travel)

Create a table

All tables created on Databricks use Delta Lake by default.

# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_table"
df.write.saveAsTable(table_name)
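
To verify the write, you can read the table back, for example:

# Read the table back and inspect a few rows.
display(spark.table(table_name).limit(5))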

The preceding operations create a new managed table using the schema inferred from the data. See the Delta Lake documentation for the options available when you create a Delta table.

For managed tables, Databricks determines the location for the data. To get the location, you can use the DESCRIBE DETAIL statement, for example:

display(spark.sql('DESCRIBE DETAIL people_table'))

You can also use the DeltaTableBuilder API in Delta Lake to create tables. Compared to the DataFrameWriter APIs, this API makes it easier to specify additional information like column comments, table properties, and generated columns.

from delta.tables import DeltaTable

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people_table") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()

Upsert to a table

To merge a set of updates and insertions into an existing Delta table, you use the MERGE INTO statement. 

For example, the following statement takes data from the source table and merges it into the target Delta table. When there is a matching row in both tables, Delta Lake uses the provided expression to update the data column. Delta Lake inserts a new row whenever there isn’t a matching row. This operation is known as an upsert.

CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+0000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+0000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+0000', '567-89-0123', 89900);

MERGE INTO people_table
USING people_updates
ON people_table.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

If you specify *, MERGE updates or inserts all columns in the target table. This assumes the source table has the same columns as the target table; otherwise the query throws an analysis error.

You must specify a value for every column in your table when you perform an INSERT operation (for example, when there is no matching row in the existing dataset). However, you do not need to update all values.
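
The same upsert can be expressed with the Delta Lake Python API. A minimal sketch, assuming the people_updates temp view created above exists in the session:

from delta.tables import DeltaTable

target_table = DeltaTable.forName(spark, "people_table")
updates_df = spark.table("people_updates")

# Upsert: update rows that match on id, insert the rest.
(target_table.alias("target")
  .merge(updates_df.alias("source"), "target.id = source.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute())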

To see the results, query the table.

SELECT * FROM people_table WHERE id >= 9999998

Read a table

You access data in Delta tables by the table name or the table path, as shown in the following examples:

people_df = spark.read.table(table_name)
display(people_df)

# Or read by path (for example, the location used when creating the table above):
table_path = "/tmp/delta/people10m"
people_df = spark.read.load(table_path)
display(people_df)
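
Once loaded, the result is an ordinary DataFrame, so the usual transformations apply, for example (a sketch using the salary column from this dataset):

# Filter the table like any other DataFrame.
display(people_df.where("salary > 60000"))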

Write to a table

To atomically add new data to an existing Delta table, use append mode as in the following example:

df.write.mode("append").saveAsTable("people_table1")

To atomically replace all the data in a table, use overwrite mode as in the following example:

df.write.mode("overwrite").saveAsTable("people_table1")

Update a table

You can update data that matches a predicate in a Delta table. For example, in a table named people_table or at the path /tmp/delta/people10m, to change the abbreviations in the gender column from M and F to Male and Female, you can run the following:

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)
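
To target the table by name instead of by path, you can use DeltaTable.forName, for example:

# Look up the Delta table through the metastore instead of by path.
deltaTable = DeltaTable.forName(spark, "people_table")
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)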

Delete from a table

You can remove data that matches a predicate from a Delta table. For instance, in a table named people_table or at the path /tmp/delta/people10m, to delete all rows for people with a birthDate earlier than 1955, you can run the following:

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1955-01-01')
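
To confirm the rows are gone, count what remains under the predicate (a minimal check):

# Expect zero rows born before 1955 after the delete.
print(deltaTable.toDF().where("birthDate < '1955-01-01'").count())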

Display table history

To view the history of a table, use the DESCRIBE HISTORY statement, which provides provenance information, including the table version, operation, user, and so on, for each write to a table.

SQL:

DESCRIBE HISTORY people_table
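
The same information is available from Python through DeltaTable.history, for example:

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, "people_table")
display(deltaTable.history())     # full history
display(deltaTable.history(1))    # the most recent operation only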

Optimize a table

Once you have performed multiple changes to a table, you might have a lot of small files. To improve the speed of read queries, you can use OPTIMIZE to collapse small files into larger ones:

SQL:

OPTIMIZE people_table
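
From Python, you can issue the same statement with spark.sql:

# Equivalent compaction from Python.
spark.sql("OPTIMIZE people_table")

# Or, with the DeltaTable API (available in Delta Lake 2.0 and later):
from delta.tables import DeltaTable
DeltaTable.forName(spark, "people_table").optimize().executeCompaction()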

Query an earlier version of the table (time travel)

Delta Lake time travel allows you to query an older snapshot of a Delta table.

DataFrameReader options allow you to create a DataFrame from a Delta table that is pinned to a specific version of the table. The timestamp must fall between the table's first commit and its latest version, or the read fails. For example, in Python:

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_table")
display(df1)

or, alternatively, by version number:

df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_table")
display(df2)
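
The same options work when reading by path. A sketch using the /tmp/delta/people10m location from earlier:

df3 = spark.read.format('delta').option('versionAsOf', 0).load('/tmp/delta/people10m')
display(df3)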
