Connector Guide - AWS EMR - Using Delta Lake
If you are using AWS EMR with Delta Lake, you can use the following instructions to connect to the cluster and run Spark jobs.
SSH to your EMR master node:
Bash
ssh your_user@<emr-master-node>
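For example, the default SSH user on Amazon EMR is hadoop and the connection uses the EC2 key pair chosen at cluster creation; the key file and host name below are placeholders.
Bash
# Connect with the EC2 key pair selected at cluster creation (key file and host are placeholders)
ssh -i ~/my-emr-keypair.pem hadoop@<emr-master-node>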
If you are using JWT for authentication, you must pass the JWT token to the EMR cluster. You can do this either by passing the token directly as a command-line argument or by providing the path to a file that contains the token.
To pass the JWT token directly as a command-line argument, use the following configuration when connecting to the cluster:
Bash
--conf "spark.hadoop.privacera.jwt.token.str=<your-jwt-token>"
To use the file path containing the JWT token, use the following configuration:
Bash
--conf "spark.hadoop.privacera.jwt.token=<path-to-jwt-token-file>"
Connecting to Apache Spark Cluster
If you are using JWT, add the --conf option from above with the JWT token while connecting to the cluster, as shown in the sketch below.
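For example, an OLAC pyspark session with the JWT token passed inline could be launched as follows; the Delta Lake options are the same as in the sections below, and the token value is a placeholder.
Bash
# Sketch: OLAC pyspark session with the JWT token passed inline (token value is a placeholder)
pyspark \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
--conf "spark.hadoop.privacera.jwt.token.str=<your-jwt-token>"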
If you are using OLAC, connect to pyspark as below:
Bash
pyspark \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
If you are using FGAC or OLAC_FGAC, update the spark.sql.extensions as below:
Bash
pyspark \
--conf "spark.sql.extensions=com.privacera.spark.agent.SparkSQLExtension,io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
Run a Spark read/write:
Python
df = spark.read.format("delta").option("header", "true").option("inferSchema", "true").load("s3a://${S3_BUCKET}/${DELTA_FILE}")
df.show(5)
df.write.format("delta").option("header", "true").mode("overwrite").save("s3a://${S3_BUCKET}/${DELTA_FILE}")
If you are using OLAC, connect to spark-shell as below:
Bash
spark-shell \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
If you are using FGAC or OLAC_FGAC, update the spark.sql.extensions as below:
Bash
spark-shell \
--conf "spark.sql.extensions=com.privacera.spark.agent.SparkSQLExtension,io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
Run a Spark read/write:
Scala
val df = spark.read.format("delta").option("header", "true").option("inferSchema", "true").load("s3a://${S3_BUCKET}/${DELTA_FILE}")
df.show(5)
df.write.format("delta").option("header", "true").mode("overwrite").save("s3a://${S3_BUCKET}/${DELTA_FILE}")
When using Spark SQL, the query retrieves metadata from the AWS Glue Data Catalog or Hive Metastore, which provides the location of the data in S3. Access to these files is controlled by Privacera.
To run SQL commands, the cluster must have access to the AWS Glue Data Catalog or Hive Metastore.
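As a minimal sketch, on Amazon EMR the Glue Data Catalog can be used as the metastore either through the cluster's configuration classifications or by setting the metastore client factory when launching the session; the example below assumes the cluster's IAM role already has the required Glue permissions.
Bash
# Sketch: point this spark-sql session at the AWS Glue Data Catalog
# (assumes the EMR cluster's IAM role is allowed to call Glue)
spark-sql \
--conf "spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"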
If you are using OLAC, connect to spark-sql as below:
Bash
spark-sql \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
If you are using FGAC or OLAC_FGAC, update the spark.sql.extensions as below:
Bash
spark-sql \
--conf "spark.sql.extensions=com.privacera.spark.agent.SparkSQLExtension,io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
Run a Spark SQL query:
SQL
CREATE TABLE IF NOT EXISTS <db_name>.<table_name> (
  id INT,
  name STRING,
  age INT,
  city STRING)
USING DELTA
LOCATION 's3://${S3_BUCKET}/${TABLE_PATH}';
SELECT * FROM <db_name>.<table_name>;