Delta Lake Operations
This tutorial introduces common Delta Lake operations on Databricks, including the following:
- Create a table.
- Upsert to a table.
- Read from a table.
- Write to a table.
- Update a table.
- Delete from a table.
- Display table history.
- Optimize a table.
- Query an earlier version of a table (time travel).
Create a table
All tables created on Databricks use Delta Lake by default.
# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_table"
df.write.saveAsTable(table_name)
The preceding operations create a new managed table by using the schema that was inferred from the data. For information about available options when you create a Delta table, see the Databricks documentation for CREATE TABLE.
For managed tables, Databricks determines the location for the data. To get the location, you can use the DESCRIBE DETAIL statement, for example:
display(spark.sql('DESCRIBE DETAIL people_table'))
You can also use the DeltaTableBuilder API in Delta Lake to create tables. Compared to the DataFrameWriter APIs, this API makes it easier to specify additional information like column comments, table properties, and generated columns.
from delta.tables import DeltaTable

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
.tableName("default.people_table") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.execute()
# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.property("description", "table with people data") \
.location("/tmp/delta/people10m") \
.execute()
Upsert to a table
To merge a set of updates and insertions into an existing Delta table, you use the MERGE INTO statement.
For example, the following statement takes data from the source table and merges it into the target Delta table. When there is a matching row in both tables, Delta Lake uses the provided expression to update the data column. Delta Lake inserts a new row whenever there isn’t a matching row. This operation is known as an upsert.
CREATE OR REPLACE TEMP VIEW people_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+0000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+0000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+0000', '567-89-0123', 89900);
MERGE INTO people_table
USING people_updates
ON people_table.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
If you specify *, the merge updates or inserts all columns in the target table. This assumes that the source table has the same columns as the target table; otherwise the query throws an analysis error.
You must specify a value for every column in your table when you perform an INSERT operation (for example, when there is no matching row in the existing dataset). However, you do not need to update all values.
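The same upsert can also be expressed with the Delta Lake Python API. The following is a minimal sketch that assumes the people_updates temp view from the preceding example already exists:
from delta.tables import DeltaTable

# Target Delta table and source DataFrame (the temp view defined above).
target = DeltaTable.forName(spark, "people_table")
updates = spark.table("people_updates")

# Upsert: update all columns on match, insert new rows otherwise.
target.alias("t") \
  .merge(updates.alias("u"), "t.id = u.id") \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()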
To see the results, query the table.
SELECT * FROM people_table WHERE id >= 9999998
Read a table
You access data in Delta tables by the table name or the table path, as shown in the following examples:
people_df = spark.read.table(table_name)
display(people_df)

# or, by path (for example, the location returned by DESCRIBE DETAIL):
table_path = "/tmp/delta/people-10m"
people_df = spark.read.load(table_path)
display(people_df)
Write to a table
To atomically add new data to an existing Delta table, use append mode as in the following example:
df.write.mode("append").saveAsTable("people_table1")
To atomically replace all the data in a table, use overwrite mode as in the following example:
df.write.mode("overwrite").saveAsTable("people_table1")
Update a table
You can update data that matches a predicate in a Delta table. For example, in a table named people_table or at the path /tmp/delta/people-10m, to change an abbreviation in the gender column from M or F to Male or Female, you can run the following:
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Delete from a table
You can remove data that matches a predicate from a Delta table. For instance, in a table named people_table or at the path /tmp/delta/people-10m, to delete all rows for people with a value in the birthDate column from before 1955, you can run the following:
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1955-01-01')
Display table history
To view the history of a table, use the DESCRIBE HISTORY statement, which provides provenance information, including the table version, operation, user, and so on, for each write to a table.
SQL:
DESCRIBE HISTORY people_table
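The same information is available through the Delta Lake Python API; a minimal sketch:
from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, "people_table")
display(deltaTable.history())   # full history of the table
display(deltaTable.history(1))  # the most recent operation only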
Optimize a table
After you have performed multiple changes to a table, you might have a lot of small files. To improve the speed of read queries, you can use OPTIMIZE to compact small files into larger ones:
SQL:
OPTIMIZE people_table
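In Delta Lake 2.0 and above, the same compaction can also be triggered from the Python API; a minimal sketch:
from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, "people_table")
# Compact small files into larger ones, equivalent to OPTIMIZE.
deltaTable.optimize().executeCompaction()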
Query an earlier version of the table (time travel)
Delta Lake time travel allows you to query an older snapshot of a Delta table.
DataFrameReader options allow you to create a DataFrame from a Delta table that is fixed to a specific version of the table, for example in Python:
df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_table")
display(df1)
or, alternatively:
df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_table")
display(df2)
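Time travel is also available in SQL through the TIMESTAMP AS OF and VERSION AS OF clauses, which you can invoke from Python as well:
# Query the first version of the table by using SQL time travel syntax.
df3 = spark.sql('SELECT * FROM people_table VERSION AS OF 0')
display(df3)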