Skip to content

Connector Guide - AWS EMR - Using Apache IceBerg

  1. SSH to your EMR master node:

    Bash
    ssh your_user@<emr-master-node>
    

  2. If you are using JWT for authentication, then you will have to pass the JWT token to the EMR cluster. You can do this by either passing the JWT token directly as a command-line argument or using a file path containing the JWT token.

    • To pass the JWT token directly as a command-line argument, use the following configuration when connecting to the cluster:

      Bash
       --conf "spark.hadoop.privacera.jwt.token.str=<your-jwt-token>"
      

    • To use the file path containing the JWT token, use the following configuration:

      Bash
      --conf "spark.hadoop.privacera.jwt.token=<path-to-jwt-token-file>"
      

  3. Connecting to Apache Spark Cluster

If you are using JWT, then add the --conf option from above with JWT while connecting to the cluster

  • For Hadoop catalog,

    Bash
    1
    2
    3
    4
    5
    pyspark \
      --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
      --conf "spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog" \
      --conf "spark.sql.catalog.hadoop_catalog.type=hadoop" \
      --conf "spark.sql.catalog.hadoop_catalog.warehouse=s3://${S3_BUCKET}/${S3_OBJECT_PATH}"
    

  • For Glue catalog,

    Bash
    1
    2
    3
    4
    5
    6
    7
    pyspark \
      --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
      --conf "spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog" \
      --conf "spark.sql.catalog.glue_catalog.warehouse=s3://${S3_BUCKET}/${S3_OBJECT_PATH}" \
      --conf "spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog" \
      --conf "spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO" \
      --conf "spark.sql.catalog.glue_catalog.s3.client-factory-impl=com.privacera.iceberg.aws.s3.PrivaceraAwsClientFactory"
    

  • If you are using FGAC or OLAC_FGAC, update the spark.sql.extensions property in above command as follows:

    Bash
    --conf "spark.sql.extensions=com.privacera.spark.agent.SparkSQLExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    

  • Create table and read

    Bash
    ## Create a DataFrame.
    data = spark.createDataFrame([
        ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
        ],["id", "creation_date", "last_update_time"])
    
    ## Write a DataFrame as a Iceberg dataset to the Amazon S3 location.
    spark.sql("""CREATE TABLE IF NOT EXISTS <my_catalog>.<db_name>.<table_name> (id string,
        creation_date string,
        last_update_time string)
        USING iceberg
        location 's3://${S3_BUCKET}/${TABLE_PATH}'""")
    
    data.writeTo("<my_catalog>.<db_name>.<table_name>").append()
    
    df = spark.read.format("iceberg").load("<my_catalog>.<db_name>.<table_name")
    df.show()
    

  • For Hadoop catalog,

    Bash
    1
    2
    3
    4
    5
    spark-shell \
      --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
      --conf "spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog" \
      --conf "spark.sql.catalog.hadoop_catalog.type=hadoop" \
      --conf "spark.sql.catalog.hadoop_catalog.warehouse=s3://${S3_BUCKET}/${S3_OBJECT_PATH}"
    

  • For Glue catalog,

    Bash
    1
    2
    3
    4
    5
    6
    7
    spark-shell \
      --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
      --conf "spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog" \
      --conf "spark.sql.catalog.glue_catalog.warehouse=s3://${S3_BUCKET}/${S3_OBJECT_PATH}" \
      --conf "spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog" \
      --conf "spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO" \
      --conf "spark.sql.catalog.glue_catalog.s3.client-factory-impl=com.privacera.iceberg.aws.s3.PrivaceraAwsClientFactory"
    

  • If you are using FGAC or OLAC_FGAC, update the spark.sql.extensions property in above command as follows:

    Bash
    --conf "spark.sql.extensions=com.privacera.spark.agent.SparkSQLExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    

  • Create table and read

    Bash
    ## Create a DataFrame.
    val data = Seq(
        ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
        ).toDF("id", "creation_date", "last_update_time")
    
    ## Write a DataFrame as a Iceberg dataset to the Amazon S3 location.
    spark.sql("""CREATE TABLE IF NOT EXISTS <my_catalog>.<db_name>.<table_name> (id string,
        creation_date string,
        last_update_time string)
        USING iceberg
        location 's3://${S3_BUCKET}/${TABLE_PATH}'""")
    
    data.writeTo("<my_catalog>.<db_name>.<table_name>").append()
    
    val df = spark.read.format("iceberg").load("<my_catalog>.<db_name>.<table_name")
    df.show()
    

When using Spark SQL, the query retrieves the metadata from AWS Glue catalog or Hive Metastore, which provides the location of the data in S3. The access to these files is controlled by Privacera.

For running SQL commands, the cluster should have access to the AWS Glue catalog or Hive Metastore.

  • For Hadoop catalog,

    Bash
    1
    2
    3
    4
    5
    spark-sql \
      --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
      --conf "spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog" \
      --conf "spark.sql.catalog.hadoop_catalog.type=hadoop" \
      --conf "spark.sql.catalog.hadoop_catalog.warehouse=s3://${S3_BUCKET}/${S3_OBJECT_PATH}"
    

  • For Glue catalog,

    Bash
    1
    2
    3
    4
    5
    6
    7
    spark-sql \
      --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
      --conf "spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog" \
      --conf "spark.sql.catalog.glue_catalog.warehouse=s3://${S3_BUCKET}/${S3_OBJECT_PATH}" \
      --conf "spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog" \
      --conf "spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO" \
      --conf "spark.sql.catalog.glue_catalog.s3.client-factory-impl=com.privacera.iceberg.aws.s3.PrivaceraAwsClientFactory"
    

  • If you are using FGAC or OLAC_FGAC, update the spark.sql.extensions property in above command as follows:

    Bash
    --conf "spark.sql.extensions=com.privacera.spark.agent.SparkSQLExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    

  • Run spark sql query

    Bash
    1
    2
    3
    4
    5
    6
    7
    8
    9
    CREATE TABLE IF NOT EXISTS <my_catalog>.<db_name>.<table_name> (
        id INT,
        name STRING,
        age INT,
        city STRING)
        USING iceberg
        LOCATION 's3://${S3_BUCKET}/${TABLE_PATH}';
    
    SELECT * FROM <my_catalog>.<db_name>.<table_name>;
    

Comments