diff --git a/src/pages/latest/concurrency-control.mdx b/src/pages/latest/concurrency-control.mdx index 3d51d35..96891ed 100644 --- a/src/pages/latest/concurrency-control.mdx +++ b/src/pages/latest/concurrency-control.mdx @@ -7,16 +7,7 @@ DeltaLake provides ACID transaction guarantees between reads and writes. This means that: - For supported [storage systems](/latest/delta-storage), multiple writers across multiple clusters can simultaneously modify a table partition and see a consistent snapshot view of the table and there will be a serial order for these writes. -- Readers continue to see a consistent snapshot view of the table that the - - Databricks job started with, even when a table is modified during a job. - -- For supported [storage systems](delta-storage.md), multiple writers across - multiple clusters can simultaneously modify a table partition and see a - consistent snapshot view of the table and there will be a serial order for - these writes. -- Readers continue to see a consistent snapshot view of the table that the - ApacheSpark job started with, even when a table is modified during a job. +- Readers continue to see a consistent snapshot view of the table that the Apache Spark job started with, even when a table is modified during a job. In this article: diff --git a/src/pages/latest/delta-storage.mdx b/src/pages/latest/delta-storage.mdx index e067d93..83d53c9 100644 --- a/src/pages/latest/delta-storage.mdx +++ b/src/pages/latest/delta-storage.mdx @@ -116,67 +116,67 @@ to ensure that only one writer is able to create a file. This section explains how to quickly start reading and writing Delta tables on S3 using single-cluster mode. For a detailed explanation of the configuration, -see [\_](#setup-configuration-s3-multi-cluster). +see [Setup Configuration (S3 multi-cluster)](#setup-configuration-s3-multi-cluster). -#. Use the following command to launch a Spark shell with Delta Lake and S3 +1. Use the following command to launch a Spark shell with Delta Lake and S3 support (assuming you use Spark 3.2.1 which is pre-built for Hadoop 3.3.1): - + -```bash -bin/spark-shell \ - --packages io.delta:delta-core_2.12:$VERSION$,org.apache.hadoop:hadoop-aws:3.3.1 \ - --conf spark.hadoop.fs.s3a.access.key= \ - --conf spark.hadoop.fs.s3a.secret.key= -``` + ```bash + bin/spark-shell \ + --packages io.delta:delta-core_2.12:$VERSION$,org.apache.hadoop:hadoop-aws:3.3.1 \ + --conf spark.hadoop.fs.s3a.access.key= \ + --conf spark.hadoop.fs.s3a.secret.key= + ``` - + -#. Try out some basic Delta table operations on S3 (in Scala): +2. Try out some basic Delta table operations on S3 (in Scala): - + -```scala -// Create a Delta table on S3: -spark.range(5).write.format("delta").save("s3a:///") + ```scala + // Create a Delta table on S3: + spark.range(5).write.format("delta").save("s3a:///") -// Read a Delta table on S3: -spark.read.format("delta").load("s3a:///").show() -``` + // Read a Delta table on S3: + spark.read.format("delta").load("s3a:///").show() + ``` - + For other languages and more examples of Delta table operations, see the -[\_](quick-start.md) page. +[Quickstart](/latest/quick-start) page. #### Configuration (S3 single-cluster) Here are the steps to configure Delta Lake for S3. -#. Include `hadoop-aws` JAR in the classpath. +1. Include `hadoop-aws` JAR in the classpath. -Delta Lake needs the `org.apache.hadoop.fs.s3a.S3AFileSystem` class from the -`hadoop-aws` package, which implements Hadoop's `FileSystem` API for S3. 
Make -sure the version of this package matches the Hadoop version with which Spark was -built. + Delta Lake needs the `org.apache.hadoop.fs.s3a.S3AFileSystem` class from the + `hadoop-aws` package, which implements Hadoop's `FileSystem` API for S3. Make + sure the version of this package matches the Hadoop version with which Spark was + built. -#. Set up S3 credentials. +2. Set up S3 credentials. -We recommend using [IAM -roles](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html) for -authentication and authorization. But if you want to use keys, here is one way -is to set up the [Hadoop -configurations](https://spark.apache.org/docs/latest/configuration.html#custom-hadoophive-configuration) -(in Scala): + We recommend using [IAM + roles](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html) for + authentication and authorization. But if you want to use keys, here is one way + is to set up the [Hadoop + configurations](https://spark.apache.org/docs/latest/configuration.html#custom-hadoophive-configuration) + (in Scala): - + -```scala -sc.hadoopConfiguration.set("fs.s3a.access.key", "") -sc.hadoopConfiguration.set("fs.s3a.secret.key", "") -``` + ```scala + sc.hadoopConfiguration.set("fs.s3a.access.key", "") + sc.hadoopConfiguration.set("fs.s3a.secret.key", "") + ``` - + @@ -201,7 +201,7 @@ that S3 is lacking. #### Requirements (S3 multi-cluster) -- All of the requirements listed in [\_](#requirements-s3-single-cluster) +- All of the requirements listed in [Requirements (S3 single-cluster)](#requirements-s3-single-cluster) section - In additon to S3 credentials, you also need DynamoDB operating permissions @@ -210,118 +210,118 @@ that S3 is lacking. This section explains how to quickly start reading and writing Delta tables on S3 using multi-cluster mode. -#. Use the following command to launch a Spark shell with Delta Lake and S3 +1. Use the following command to launch a Spark shell with Delta Lake and S3 support (assuming you use Spark 3.2.1 which is pre-built for Hadoop 3.3.1): - + -```bash -bin/spark-shell \ - --packages io.delta:delta-core_2.12:$VERSION$,org.apache.hadoop:hadoop-aws:3.3.1,io.delta:delta-storage-s3-dynamodb:$VERSION$ \ - --conf spark.hadoop.fs.s3a.access.key= \ - --conf spark.hadoop.fs.s3a.secret.key= \ - --conf spark.delta.logStore.s3a.impl=io.delta.storage.S3DynamoDBLogStore \ - --conf spark.io.delta.storage.S3DynamoDBLogStore.ddb.region=us-west-2 -``` + ```bash + bin/spark-shell \ + --packages io.delta:delta-core_2.12:$VERSION$,org.apache.hadoop:hadoop-aws:3.3.1,io.delta:delta-storage-s3-dynamodb:$VERSION$ \ + --conf spark.hadoop.fs.s3a.access.key= \ + --conf spark.hadoop.fs.s3a.secret.key= \ + --conf spark.delta.logStore.s3a.impl=io.delta.storage.S3DynamoDBLogStore \ + --conf spark.io.delta.storage.S3DynamoDBLogStore.ddb.region=us-west-2 + ``` - + -#. Try out some basic Delta table operations on S3 (in Scala): +2. Try out some basic Delta table operations on S3 (in Scala): - + -```scala -// Create a Delta table on S3: -spark.range(5).write.format("delta").save("s3a:///") + ```scala + // Create a Delta table on S3: + spark.range(5).write.format("delta").save("s3a:///") -// Read a Delta table on S3: -spark.read.format("delta").load("s3a:///").show() -``` + // Read a Delta table on S3: + spark.read.format("delta").load("s3a:///").show() + ``` - + #### Setup Configuration (S3 multi-cluster) -#. Create the DynamoDB table. +1. Create the DynamoDB table. 
-You have the choice of creating the DynamoDB table yourself (recommended) or -having it created for you automatically. + You have the choice of creating the DynamoDB table yourself (recommended) or + having it created for you automatically. -- Creating the DynamoDB table yourself + - Creating the DynamoDB table yourself - This DynamoDB table will maintain commit metadata for multiple Delta tables, - and it is important that it is configured with the [Read/Write Capacity - Mode](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html) - (for example, on-demand or provisioned) that is right for your use cases. As - such, we strongly recommend that you create your DynamoDB table yourself. The - following example uses the AWS CLI. To learn more, see the - [create-table](https://docs.aws.amazon.com/cli/latest/reference/dynamodb/create-table.html) - command reference. + This DynamoDB table will maintain commit metadata for multiple Delta tables, + and it is important that it is configured with the [Read/Write Capacity + Mode](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html) + (for example, on-demand or provisioned) that is right for your use cases. As + such, we strongly recommend that you create your DynamoDB table yourself. The + following example uses the AWS CLI. To learn more, see the + [create-table](https://docs.aws.amazon.com/cli/latest/reference/dynamodb/create-table.html) + command reference. - + -```bash -aws dynamodb create-table \ - --region us-east-1 \ - --table-name delta_log \ - --attribute-definitions AttributeName=tablePath,AttributeType=S \ - AttributeName=fileName,AttributeType=S \ - --key-schema AttributeName=tablePath,KeyType=HASH \ - AttributeName=fileName,KeyType=RANGE \ - --billing-mode PAY_PER_REQUEST -``` + ```bash + aws dynamodb create-table \ + --region us-east-1 \ + --table-name delta_log \ + --attribute-definitions AttributeName=tablePath,AttributeType=S \ + AttributeName=fileName,AttributeType=S \ + --key-schema AttributeName=tablePath,KeyType=HASH \ + AttributeName=fileName,KeyType=RANGE \ + --billing-mode PAY_PER_REQUEST + ``` - + - - once you select a `table-name` and `region`, you will have to specify them in - each Spark session in order for this multi-cluster mode to work correctly. See - table below. - + + once you select a `table-name` and `region`, you will have to specify them in + each Spark session in order for this multi-cluster mode to work correctly. See + table below. + -- Automatic DynamoDB table creation + - Automatic DynamoDB table creation - Nonetheless, after specifying this `LogStore `implementation, if the default - DynamoDB table does not already exist, then it will be created for you - automatically. This default table supports 5 strongly consistent reads and 5 - writes per second. You may change these default values using the - table-creation-only configurations keys detailed in the table below. + Nonetheless, after specifying this `LogStore `implementation, if the default + DynamoDB table does not already exist, then it will be created for you + automatically. This default table supports 5 strongly consistent reads and 5 + writes per second. You may change these default values using the + table-creation-only configurations keys detailed in the table below. -#. Follow the configuration steps listed in -[\_](#configuration-s3-single-cluster) section. +2. 
Follow the configuration steps listed in +[Configuration (S3 single-cluster)](#configuration-s3-single-cluster) section. -#. Include the `delta-storage-s3-dynamodb` JAR in the classpath. +3. Include the `delta-storage-s3-dynamodb` JAR in the classpath. -#. Configure the `LogStore` implementation in your Spark session. +4. Configure the `LogStore` implementation in your Spark session. -First, configure this `LogStore` implementation for the scheme `s3`. You can -replicate this command for schemes `s3a` and `s3n` as well. + First, configure this `LogStore` implementation for the scheme `s3`. You can + replicate this command for schemes `s3a` and `s3n` as well. - + -```ini -spark.delta.logStore.s3.impl=io.delta.storage.S3DynamoDBLogStore -``` + ```ini + spark.delta.logStore.s3.impl=io.delta.storage.S3DynamoDBLogStore + ``` - + -Next, specify additional information necessary to instantiate the DynamoDB -client. You must instantiate the DynamoDB client with the same `tableName` and -`region` each Spark session for this multi-cluster mode to work correctly. A -list of per-session configurations and their defaults is given below: + Next, specify additional information necessary to instantiate the DynamoDB + client. You must instantiate the DynamoDB client with the same `tableName` and + `region` each Spark session for this multi-cluster mode to work correctly. A + list of per-session configurations and their defaults is given below: -| Configuration Key | Description | Default | -| ------------------------------------------------------------------- | ----------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ | -| spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName | The name of the DynamoDB table to use | delta_log | -| spark.io.delta.storage.S3DynamoDBLogStore.ddb.region | The region to be used by the client | us-east-1 | -| spark.io.delta.storage.S3DynamoDBLogStore.credentials.provider | The AWSCredentialsProvider\* used by the client | [DefaultAWSCredentialsProviderChain](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html) | -| spark.io.delta.storage.S3DynamoDBLogStore.provisionedThroughput.rcu | (Table-creation-only\*\*) Read Capacity Units | 5 | -| spark.io.delta.storage.S3DynamoDBLogStore.provisionedThroughput.wcu | (Table-creation-only\*\*) Write Capacity Units | 5 | + | Configuration Key | Description | Default | + | ------------------------------------------------------------------- | ----------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------ | + | spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName | The name of the DynamoDB table to use | delta_log | + | spark.io.delta.storage.S3DynamoDBLogStore.ddb.region | The region to be used by the client | us-east-1 | + | spark.io.delta.storage.S3DynamoDBLogStore.credentials.provider | The AWSCredentialsProvider\* used by the client | [DefaultAWSCredentialsProviderChain](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html) | + | spark.io.delta.storage.S3DynamoDBLogStore.provisionedThroughput.rcu | (Table-creation-only\*\*) Read Capacity Units | 5 | + | spark.io.delta.storage.S3DynamoDBLogStore.provisionedThroughput.wcu | 
(Table-creation-only\*\*) Write Capacity Units | 5 | -- \*For more details on AWS credential providers, see the [AWS - documentation](https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html). -- \*\*These configurations are only used when the given DynamoDB table doesn't - already exist and needs to be automatically created. + - \*For more details on AWS credential providers, see the [AWS + documentation](https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/credentials.html). + - \*\*These configurations are only used when the given DynamoDB table doesn't + already exist and needs to be automatically created. #### Production Configuration (S3 multi-cluster) @@ -329,104 +329,104 @@ By this point, this multi-cluster setup is fully operational. However, there is extra configuration you may do to improve performance and optimize storage when running in production. -#. Adjust your Read and Write Capacity Mode. - -If you are using the default DynamoDB table created for you by this `LogStore` -implementation, its default RCU and WCU might not be enough for your workloads. -You can [adjust the provisioned -throughput](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ProvisionedThroughput.html#ProvisionedThroughput.CapacityUnits.Modifying) -or [update to On-Demand -Mode](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.Basics.html#WorkingWithTables.Basics.UpdateTable). - -#. Cleanup old DynamoDB entries using Time to Live (TTL). - -Once a DynamoDB metadata entry is marked as complete, and after sufficient time -such that we can now rely on S3 alone to prevent accidental overwrites on its -corresponding Delta file, it is safe to delete that entry from DynamoDB. The -cheapest way to do this is using [DynamoDB's -TTL](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/TTL.html) -feature which is a free, automated means to delete items from your DynamoDB -table. - -Run the following command on your given DynamoDB table to enable TTL: - -```bash aws dynamodb update-time-to-live \ --region us-east-1 \ - --table-name delta_log \ --time-to-live-specification "Enabled=true, - AttributeName=commitTime" -``` - -#. Cleanup old AWS S3 temp files using S3 Lifecycle Expiration. - -In this `LogStore` implementation, a temp file is created containing a copy of -the metadata to be committed into the Delta log. Once that commit to the Delta -log is complete, and after the corresponding DynamoDB entry has been removed, it -is safe to delete this temp file. In practice, only the latest temp file will -ever be used during recovery of a failed commit. - -Here are two simple options for deleting these temp files. - -#. Delete manually using S3 CLI. - -This is the safest option. The following command will delete all but the latest temp file in your given `` and ``: - -```bash -aws s3 ls s3:////_delta_log/.tmp/ --recursive | awk 'NF>1{print $4}' | grep . | sort | head -n -1 | while read -r line ; do - echo "Removing ${line}" - aws s3 rm s3:////_delta_log/.tmp/${line} -done -``` - -#. Delete using an S3 Lifecycle Expiration Rule - -A more automated option is to use an [S3 Lifecycle Expiration rule](https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lifecycle-mgmt.html), with filter prefix pointing to the `/_delta_log/.tmp/` folder located in your table path, and an expiration value of 30 days. - - - It is important that you choose a sufficiently large expiration value. 
As - stated above, the latest temp file will be used during recovery of a failed - commit. If this temp file is deleted, then your DynamoDB table and S3 - `delta_table_path/_delta_log/.tmp/` folder will be out of sync. - - -There are a variety of ways to configuring a bucket lifecycle configuration, -described in AWS docs -[here](https://docs.aws.amazon.com/AmazonS3/latest/userguide/how-to-set-lifecycle-configuration-intro.html). - -One way to do this is using S3's `put-bucket-lifecycle-configuration` command. -See [S3 Lifecycle -Configuration](https://docs.aws.amazon.com/cli/latest/reference/s3api/put-bucket-lifecycle-configuration.html) -for details. An example rule and command invocation is given below: - - - -```json -// file://lifecycle.json -{ - "Rules": [ - { - "ID": "expire_tmp_files", - "Filter": { - "Prefix": "path/to/table/_delta_log/.tmp/" - }, - "Status": "Enabled", - "Expiration": { - "Days": 30 - } - } - ] -} -``` - - - -```bash aws s3api put-bucket-lifecycle-configuration \ --bucket my-bucket -\ --lifecycle-configuration file://lifecycle.json -``` - - - AWS S3 may have a limit on the number of rules per bucket. See - [PutBucketLifecycleConfiguration](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketLifecycleConfiguration.html) - for details. - +1. Adjust your Read and Write Capacity Mode. + + If you are using the default DynamoDB table created for you by this `LogStore` + implementation, its default RCU and WCU might not be enough for your workloads. + You can [adjust the provisioned + throughput](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ProvisionedThroughput.html#ProvisionedThroughput.CapacityUnits.Modifying) + or [update to On-Demand + Mode](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.Basics.html#WorkingWithTables.Basics.UpdateTable). + +2. Cleanup old DynamoDB entries using Time to Live (TTL). + + Once a DynamoDB metadata entry is marked as complete, and after sufficient time + such that we can now rely on S3 alone to prevent accidental overwrites on its + corresponding Delta file, it is safe to delete that entry from DynamoDB. The + cheapest way to do this is using [DynamoDB's + TTL](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/TTL.html) + feature which is a free, automated means to delete items from your DynamoDB + table. + + Run the following command on your given DynamoDB table to enable TTL: + + ```bash aws dynamodb update-time-to-live \ --region us-east-1 \ + --table-name delta_log \ --time-to-live-specification "Enabled=true, + AttributeName=commitTime" + ``` + +3. Cleanup old AWS S3 temp files using S3 Lifecycle Expiration. + + In this `LogStore` implementation, a temp file is created containing a copy of + the metadata to be committed into the Delta log. Once that commit to the Delta + log is complete, and after the corresponding DynamoDB entry has been removed, it + is safe to delete this temp file. In practice, only the latest temp file will + ever be used during recovery of a failed commit. + + Here are two simple options for deleting these temp files. + + 1. Delete manually using S3 CLI. + + This is the safest option. The following command will delete all but the latest temp file in your given `` and `
`: + + ```bash + aws s3 ls s3:////_delta_log/.tmp/ --recursive | awk 'NF>1{print $4}' | grep . | sort | head -n -1 | while read -r line ; do + echo "Removing ${line}" + aws s3 rm s3:////_delta_log/.tmp/${line} + done + ``` + + 2. Delete using an S3 Lifecycle Expiration Rule + + A more automated option is to use an [S3 Lifecycle Expiration rule](https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lifecycle-mgmt.html), with filter prefix pointing to the `/_delta_log/.tmp/` folder located in your table path, and an expiration value of 30 days. + + + It is important that you choose a sufficiently large expiration value. As + stated above, the latest temp file will be used during recovery of a failed + commit. If this temp file is deleted, then your DynamoDB table and S3 + `delta_table_path/_delta_log/.tmp/` folder will be out of sync. + + + There are a variety of ways to configuring a bucket lifecycle configuration, + described in AWS docs + [here](https://docs.aws.amazon.com/AmazonS3/latest/userguide/how-to-set-lifecycle-configuration-intro.html). + + One way to do this is using S3's `put-bucket-lifecycle-configuration` command. + See [S3 Lifecycle + Configuration](https://docs.aws.amazon.com/cli/latest/reference/s3api/put-bucket-lifecycle-configuration.html) + for details. An example rule and command invocation is given below: + + + + ```json + // file://lifecycle.json + { + "Rules": [ + { + "ID": "expire_tmp_files", + "Filter": { + "Prefix": "path/to/table/_delta_log/.tmp/" + }, + "Status": "Enabled", + "Expiration": { + "Days": 30 + } + } + ] + } + ``` + + + + ```bash aws s3api put-bucket-lifecycle-configuration \ --bucket my-bucket + \ --lifecycle-configuration file://lifecycle.json + ``` + + + AWS S3 may have a limit on the number of rules per bucket. See + [PutBucketLifecycleConfiguration](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutBucketLifecycleConfiguration.html) + for details. + ## Microsoft Azure storage @@ -473,37 +473,37 @@ along with Apache Spark 3.0 compiled and deployed with Hadoop 3.2. Here are the steps to configure Delta Lake on Azure Blob storage. -#. Include `hadoop-azure` JAR in the classpath. See the requirements above for +1. Include `hadoop-azure` JAR in the classpath. See the requirements above for version details. -#. Set up credentials. +2. Set up credentials. -You can set up your credentials in the [Spark configuration -property](https://spark.apache.org/docs/latest/configuration.html). + You can set up your credentials in the [Spark configuration + property](https://spark.apache.org/docs/latest/configuration.html). -We recommend that you use a SAS token. In Scala, you can use the following: + We recommend that you use a SAS token. In Scala, you can use the following: - + -```scala -spark.conf.set( - "fs.azure.sas...blob.core.windows.net", - "") -``` + ```scala + spark.conf.set( + "fs.azure.sas...blob.core.windows.net", + "") + ``` - + -Or you can specify an account access key: + Or you can specify an account access key: - + -```scala -spark.conf.set( - "fs.azure.account.key..blob.core.windows.net", - "") -``` + ```scala + spark.conf.set( + "fs.azure.account.key..blob.core.windows.net", + "") + ``` - + #### Usage (Azure Blob storage) @@ -541,25 +541,25 @@ along with Apache Spark 3.0 compiled and deployed with Hadoop 3.2. Here are the steps to configure Delta Lake on Azure Data Lake Storage Gen1. -#. Include `hadoop-azure-datalake` JAR in the classpath. See the requirements +1. Include `hadoop-azure-datalake` JAR in the classpath. 
See the requirements above for version details. -#. Set up Azure Data Lake Storage Gen1 credentials. +2. Set up Azure Data Lake Storage Gen1 credentials. -You can set the following [Hadoop -configurations](https://spark.apache.org/docs/latest/configuration.html#custom-hadoophive-configuration) -with your credentials (in Scala): + You can set the following [Hadoop + configurations](https://spark.apache.org/docs/latest/configuration.html#custom-hadoophive-configuration) + with your credentials (in Scala): - + -```scala -spark.conf.set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential") -spark.conf.set("dfs.adls.oauth2.client.id", "") -spark.conf.set("dfs.adls.oauth2.credential", "") -spark.conf.set("dfs.adls.oauth2.refresh.url", "https://login.microsoftonline.com//oauth2/token") -``` + ```scala + spark.conf.set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential") + spark.conf.set("dfs.adls.oauth2.client.id", "") + spark.conf.set("dfs.adls.oauth2.credential", "") + spark.conf.set("dfs.adls.oauth2.refresh.url", "https://login.microsoftonline.com//oauth2/token") + ``` - + #### Usage (ADLS Gen1) @@ -600,40 +600,40 @@ along with Apache Spark 3.0 compiled and deployed with Hadoop 3.2. Here are the steps to configure Delta Lake on Azure Data Lake Storage Gen1. -#. Include the JAR of the Maven artifact `hadoop-azure-datalake` in the +1. Include the JAR of the Maven artifact `hadoop-azure-datalake` in the classpath. See the [requirements](#azure-blob-storage) for version details. In addition, you may also have to include JARs for Maven artifacts `hadoop-azure` and `wildfly-openssl`. -#. Set up Azure Data Lake Storage Gen2 credentials. +2. Set up Azure Data Lake Storage Gen2 credentials. - + -```scala -spark.conf.set("fs.azure.account.auth.type..dfs.core.windows.net", "OAuth") -spark.conf.set("fs.azure.account.oauth.provider.type..dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider") -spark.conf.set("fs.azure.account.oauth2.client.id..dfs.core.windows.net", "") -spark.conf.set("fs.azure.account.oauth2.client.secret..dfs.core.windows.net","") -spark.conf.set("fs.azure.account.oauth2.client.endpoint..dfs.core.windows.net", "https://login.microsoftonline.com//oauth2/token") -``` + ```scala + spark.conf.set("fs.azure.account.auth.type..dfs.core.windows.net", "OAuth") + spark.conf.set("fs.azure.account.oauth.provider.type..dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider") + spark.conf.set("fs.azure.account.oauth2.client.id..dfs.core.windows.net", "") + spark.conf.set("fs.azure.account.oauth2.client.secret..dfs.core.windows.net","") + spark.conf.set("fs.azure.account.oauth2.client.endpoint..dfs.core.windows.net", "https://login.microsoftonline.com//oauth2/token") + ``` - + -where ``, ``, `` and -`` are details of the service principal we set as requirements -earlier. + where ``, ``, `` and + `` are details of the service principal we set as requirements + earlier. -#. Initialize the file system if needed +3. 
Initialize the file system if needed - + -```scala -spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true") -dbutils.fs.ls("abfss://@.dfs.core.windows.net/") -spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false") -``` + ```scala + spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true") + dbutils.fs.ls("abfss://@.dfs.core.windows.net/") + spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false") + ``` - + #### Usage (ADLS Gen2) @@ -673,18 +673,18 @@ reading and writing from GCS. ### Configuration (GCS) -#. For Delta Lake 1.2.0 and below, you must explicitly configure the LogStore +1. For Delta Lake 1.2.0 and below, you must explicitly configure the LogStore implementation for the scheme `gs`. - + -```ini -spark.delta.logStore.gs.impl=io.delta.storage.GCSLogStore -``` + ```ini + spark.delta.logStore.gs.impl=io.delta.storage.GCSLogStore + ``` - + -#. Include the JAR for `gcs-connector` in the classpath. See the +2. Include the JAR for `gcs-connector` in the classpath. See the [documentation](https://cloud.google.com/dataproc/docs/tutorials/gcs-connector-spark-tutorial) for details on how to configure Spark with GCS. @@ -722,22 +722,22 @@ concurrently reading and writing. ### Configuration (OCI) -#. Configure LogStore implementation for the scheme `oci`. +1. Configure LogStore implementation for the scheme `oci`. - + -```ini -spark.delta.logStore.oci.impl=io.delta.storage.OracleCloudLogStore -``` + ```ini + spark.delta.logStore.oci.impl=io.delta.storage.OracleCloudLogStore + ``` - + -#. Include the JARs for `delta-contribs` and `hadoop-oci-connector` in the +2. Include the JARs for `delta-contribs` and `hadoop-oci-connector` in the classpath. See [Using the HDFS Connector with Spark](https://docs.oracle.com/en-us/iaas/Content/API/SDKDocs/hdfsconnectorspark.htm) for details on how to configure Spark with OCI. -#. Set the OCI Object Store credentials as explained in the +3. Set the OCI Object Store credentials as explained in the [documentation](https://docs.oracle.com/en-us/iaas/Content/API/SDKDocs/hdfsconnector.htm). ### Usage (OCI) @@ -775,43 +775,43 @@ concurrently reading and writing. ### Configuration (IBM) -#. Configure LogStore implementation for the scheme `cos`. +1. Configure LogStore implementation for the scheme `cos`. - + -```ini -spark.delta.logStore.cos.impl=io.delta.storage.IBMCOSLogStore -``` + ```ini + spark.delta.logStore.cos.impl=io.delta.storage.IBMCOSLogStore + ``` - + -#. Include the JARs for `delta-contribs` and `Stocator` in the classpath. +2. Include the JARs for `delta-contribs` and `Stocator` in the classpath. -#. Configure `Stocator` with atomic write support by setting the following +3. Configure `Stocator` with atomic write support by setting the following properties in the Hadoop configuration. - + -```ini fs.stocator.scheme.list=cos -fs.cos.impl=com.ibm.stocator.fs.ObjectStoreFileSystem -fs.stocator.cos.impl=com.ibm.stocator.fs.cos.COSAPIClient -fs.stocator.cos.scheme=cos fs.cos.atomic.write=true -``` + ```ini fs.stocator.scheme.list=cos + fs.cos.impl=com.ibm.stocator.fs.ObjectStoreFileSystem + fs.stocator.cos.impl=com.ibm.stocator.fs.cos.COSAPIClient + fs.stocator.cos.scheme=cos fs.cos.atomic.write=true + ``` - + -#. Set up IBM COS credentials. The example below uses access keys with a service +4. Set up IBM COS credentials. 
The example below uses access keys with a service named `service` (in Scala): - + -```scala -sc.hadoopConfiguration.set("fs.cos.service.endpoint", "") -sc.hadoopConfiguration.set("fs.cos.service.access.key", "") -sc.hadoopConfiguration.set("fs.cos.service.secret.key", "") -``` + ```scala + sc.hadoopConfiguration.set("fs.cos.service.endpoint", "") + sc.hadoopConfiguration.set("fs.cos.service.access.key", "") + sc.hadoopConfiguration.set("fs.cos.service.secret.key", "") + ``` - + ### Usage (IBM) diff --git a/src/pages/latest/delta-streaming.mdx b/src/pages/latest/delta-streaming.mdx index db1cd6f..fd31ff5 100644 --- a/src/pages/latest/delta-streaming.mdx +++ b/src/pages/latest/delta-streaming.mdx @@ -3,6 +3,8 @@ title: Streaming Reads and Writes menu: docs --- +import initialSnapshotImg from '/static/images/delta/delta-initial-snapshot-data-drop.png'; + Delta Lake is deeply integrated with [Spark Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) through `readStream` and `writeStream`. Delta Lake overcomes many of the limitations typically associated with streaming systems and files, including: - Maintaining "exactly-once" processing with more than one stream (or concurrent batch jobs) @@ -83,7 +85,7 @@ If you update a `user_email` with the `UPDATE` statement, the file containing th You can use the following options to specify the starting point of the Delta Lake streaming source without processing the entire table. -- `startingVersion`: The Delta Lake version to start from. All table changes starting from this version (inclusive) will be read by the streaming source. You can obtain the commit versions from the `version` column of the [DESCRIBE HISTORY](https://delta.io/docs/spark/utilities#delta-history) command output. +- `startingVersion`: The Delta Lake version to start from. All table changes starting from this version (inclusive) will be read by the streaming source. You can obtain the commit versions from the `version` column of the [DESCRIBE HISTORY](/latest/delta-utility#delta-history) command output. - To return only the latest changes, specify `latest`. @@ -141,6 +143,12 @@ You can avoid the data drop issue by enabling the following option: With event time order enabled, the event time range of initial snapshot data is divided into time buckets. Each micro batch processes a bucket by filtering data within the time range. The `maxFilesPerTrigger` and `maxBytesPerTrigger` configuration options are still applicable to control the microbatch size but only in an approximate way due to the nature of the processing. +The graphic below shows this process: + +Initial Snapshot +
+
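+A minimal sketch of what enabling this might look like on the read side (this assumes the `withEventTimeOrder` reader option described above; the table paths, checkpoint location, and `eventTime` column are hypothetical placeholders):
+
+```scala
+// Sketch only: read a Delta table as a stream, asking for the initial snapshot
+// to be processed in event-time order rather than in the default file order.
+val events = spark.readStream
+  .format("delta")
+  .option("withEventTimeOrder", "true")     // assumed option name for the feature described above
+  .option("maxFilesPerTrigger", "100")      // still bounds micro-batch size, approximately
+  .load("/delta/events")                    // hypothetical table path
+  .withWatermark("eventTime", "10 seconds") // watermark on the event-time column used by the stateful query
+
+events.writeStream
+  .format("delta")
+  .option("checkpointLocation", "/delta/events_out/_checkpoint") // hypothetical checkpoint path
+  .start("/delta/events_out")
+```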
+ Notable information about this feature: - The data drop issue only happens when the initial Delta snapshot of a stateful streaming query is processed in the default order. @@ -273,7 +281,7 @@ The preceding example continuously updates a table that contains the aggregate n For applications with more lenient latency requirements, you can save computing resources with one-time triggers. Use these to update summary aggregation tables on a given schedule, processing only new data that has arrived since the last update. -## Idempotent table writes in `foreachBatch` +## Idempotent table writes in foreachBatch Available in Delta Lake 2.0.0 and above. @@ -290,14 +298,14 @@ Delta table uses the combination of `txnAppId` and `txnVersion` to identify dupl If a batch write is interrupted with a failure, rerunning the batch uses the same application and batch ID, which would help the runtime correctly identify duplicate writes and ignore them. Application ID (`txnAppId`) can be any user-generated unique string and does not have to be related to the stream ID. - + If you delete the streaming checkpoint and restart the query with a new checkpoint, you must provide a different `appId`; otherwise, writes from the restarted query will be ignored because it will contain the same `txnAppId` and the batch ID would start from 0. -The same `DataFrameWriter` options can be used to achieve the idempotent writes in non-Streaming job. For details [Idempotent writes](/latest/delta-batch/#idempotent-writes). +The same `DataFrameWriter` options can be used to achieve the idempotent writes in non-Streaming job. For details see [Idempotent writes](/latest/delta-batch/#idempotent-writes). diff --git a/src/pages/latest/porting.mdx b/src/pages/latest/porting.mdx index c885101..539a9c3 100644 --- a/src/pages/latest/porting.mdx +++ b/src/pages/latest/porting.mdx @@ -111,7 +111,7 @@ CONVERT TO DELTA events -For details, see [Convert a Parquet table to a Delta table](delta-utility.md#convert-to-delta). +For details, see [Convert a Parquet table to a Delta table](/latest/delta-utility#convert-to-delta). ## Migrate Delta Lake workloads to newer versions diff --git a/static/images/delta/delta-initial-snapshot-data-drop.png b/static/images/delta/delta-initial-snapshot-data-drop.png new file mode 100644 index 0000000..cdb0ce7 Binary files /dev/null and b/static/images/delta/delta-initial-snapshot-data-drop.png differ