Skip to content

Connector Guide - AWS EMR - Using Delta Lake

If you are using AWS EMR with Delta Lake, you can use the following instructions to connect to the cluster and run Spark jobs.

  1. SSH to your EMR master node:

    Bash
    ssh your_user@<emr-master-node>
    

  2. If you are using JWT for authentication, then you will have to pass the JWT token to the EMR cluster. You can do this by either passing the JWT token directly as a command-line argument or using a file path containing the JWT token.

    • To pass the JWT token directly as a command-line argument, use the following configuration when connecting to the cluster:

      Bash
       --conf "spark.hadoop.privacera.jwt.token.str=<your-jwt-token>"
      

    • To use the file path containing the JWT token, use the following configuration:

      Bash
      --conf "spark.hadoop.privacera.jwt.token=<path-to-jwt-token-file>"
      

  3. Connecting to Apache Spark Cluster

If you are using JWT, then add the --conf option from above with JWT while connecting to the cluster

  • If you are using OLAC, connect to pyspark as below

    Bash
    1
    2
    3
    pyspark \
      --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
    

  • If you are using FGAC or OLAC_FGAC, update the spark.sql.extensions as below:

    Bash
    1
    2
    3
    pyspark \
      --conf "spark.sql.extensions=com.privacera.spark.agent.SparkSQLExtension,io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
    

  • Run spark read/write

    Bash
    1
    2
    3
    4
    df = spark.read.format("delta").option("header", "true").option("inferSchema", "true").load("s3a://${S3_BUCKET}/${DELTA_FILE}")
    df.show(5)
    
    df.write.format("delta").option("header", "true").mode("overwrite").save("s3a://${S3_BUCKET}/${DELTA_FILE}")
    

  • If you are using OLAC, connect to spark-shell as below

    Bash
    1
    2
    3
    spark-shell \
      --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
    

  • If you are using FGAC or OLAC_FGAC, update the spark.sql.extensions as below:

    Bash
    1
    2
    3
    spark-shell \
      --conf "spark.sql.extensions=com.privacera.spark.agent.SparkSQLExtension,io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
    

  • Run spark read/write

    Bash
    1
    2
    3
    4
    val df = spark.read.format("delta").option("header", "true").option("inferSchema", "true").load("s3a://${S3_BUCKET}/${DELTA_FILE}")
    df.show(5)
    
    df.write.format("delta").option("header", "true").mode("overwrite").save("s3a://${S3_BUCKET}/${DELTA_FILE}")
    

When using Spark SQL, the query retrieves the metadata from AWS Glue catalog or Hive Metastore, which provides the location of the data in S3. The access to these files is controlled by Privacera.

For running SQL commands, the cluster should have access to the AWS Glue catalog or Hive Metastore.

  • If you are using OLAC, connect to spark-sql as below

    Bash
    1
    2
    3
    spark-sql \
      --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
    

  • If you are using FGAC or OLAC_FGAC, update the spark.sql.extensions as below:

    Bash
    1
    2
    3
    spark-sql \
      --conf "spark.sql.extensions=com.privacera.spark.agent.SparkSQLExtension,io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
    

  • Run spark sql query

    Bash
    1
    2
    3
    4
    5
    6
    7
    8
    9
    CREATE TABLE IF NOT EXISTS <db_name>.<table_name> (
        id INT,
        name STRING,
        age INT,
        city STRING)
        USING DELTA
        LOCATION 's3://${S3_BUCKET}/${TABLE_PATH}';
    
    SELECT * FROM <db_name>.<table_name>;
    

Comments