Delta Lake
- SSH to the EMR master node:

```bash
ssh hadoop@<emr-master-node>
```
- Connect to a Spark tool (pyspark, spark-shell, or spark-sql) by running the appropriate command below.
- If you are using OLAC, connect to pyspark as follows:
```bash
pyspark \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
```
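Optionally, you can confirm that the extension settings took effect from inside the session; a quick sanity check, not part of the setup:

```python
# Should print the extension class(es) passed via --conf above
print(spark.conf.get("spark.sql.extensions"))
```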
- If you are using FGAC or OLAC_FGAC, update spark.sql.extensions as follows:
```bash
pyspark \
  --conf "spark.sql.extensions=com.privacera.spark.agent.SparkSQLExtension,io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
```
- If JWT authorization is enabled on the cluster, append the following additional configuration to the command above:
```bash
--conf "spark.hadoop.privacera.jwt.token.str=<your-jwt-token>"
```
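Putting the pieces together, an OLAC pyspark launch with JWT enabled would look like the following sketch (replace <your-jwt-token> with the token issued for your cluster):

```bash
pyspark \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
  --conf "spark.hadoop.privacera.jwt.token.str=<your-jwt-token>"
```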
- Run a Spark read/write:
```python
df = spark.read.format("delta").load("s3a://${S3_BUCKET}/${DELTA_FILE}")
df.show(5)
df.write.format("delta").mode("overwrite").save("s3a://${S3_BUCKET}/${DELTA_FILE}")
```
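Since Delta versions every write, you can optionally verify the overwrite with time travel. A minimal sketch, assuming the path above already holds at least one committed version:

```python
# Read the table as of its first committed version (Delta time travel)
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("s3a://${S3_BUCKET}/${DELTA_FILE}")
df_v0.show(5)
```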
- If you are using OLAC, connect to spark-shell as follows:
```bash
spark-shell \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
```
- If you are using FGAC or OLAC_FGAC, update spark.sql.extensions as follows:
```bash
spark-shell \
  --conf "spark.sql.extensions=com.privacera.spark.agent.SparkSQLExtension,io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
```
- If JWT authorization is enabled on the cluster, append the following additional configuration to the command above:
```bash
--conf "spark.hadoop.privacera.jwt.token.str=<your-jwt-token>"
```
- Run a Spark read/write:
```scala
val df = spark.read.format("delta").load("s3a://${S3_BUCKET}/${DELTA_FILE}")
df.show(5)
df.write.format("delta").mode("overwrite").save("s3a://${S3_BUCKET}/${DELTA_FILE}")
```
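Optionally, the commit history of the table can be inspected from spark-shell with the DeltaTable API; a minimal sketch, assuming the same path as above:

```scala
import io.delta.tables.DeltaTable

// List the five most recent commits on the Delta table
val deltaTable = DeltaTable.forPath(spark, "s3a://${S3_BUCKET}/${DELTA_FILE}")
deltaTable.history(5).show(false)
```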
- If you are using OLAC, connect to spark-sql as follows:
```bash
spark-sql \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
```
- If you are using FGAC or OLAC_FGAC, update spark.sql.extensions as follows:
```bash
spark-sql \
  --conf "spark.sql.extensions=com.privacera.spark.agent.SparkSQLExtension,io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
```
- If JWT authorization is enabled on the cluster, append the following additional configuration to the command above:
```bash
--conf "spark.hadoop.privacera.jwt.token.str=<your-jwt-token>"
```
- Run a Spark SQL query:
```sql
CREATE TABLE IF NOT EXISTS <db_name>.<table_name> (
  id INT,
  name STRING,
  age INT,
  city STRING)
USING DELTA
LOCATION 's3://${S3_BUCKET}/${TABLE_PATH}';

SELECT * FROM <db_name>.<table_name>;
```
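To exercise the write path end to end, you can insert a couple of rows and then inspect the table's commit log; a minimal sketch with hypothetical sample values:

```sql
-- Hypothetical sample rows
INSERT INTO <db_name>.<table_name> VALUES
  (1, 'Alice', 30, 'Austin'),
  (2, 'Bob', 28, 'Boston');

-- Delta-specific: list the table's recent commits
DESCRIBE HISTORY <db_name>.<table_name>;
```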