Iceberg

  1. SSH to the EMR master node
    Bash
    ssh hadoop@<emr-master-node>
    
  2. Run the following commands to switch to the application user and obtain a Kerberos ticket
    Bash
    sudo su - <user>
    kinit
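
    If your setup requires an explicit principal or keytab, the ticket can be obtained non-interactively instead; a minimal sketch (the keytab path, user, and realm below are placeholders for your deployment):

    Bash
    kinit -kt /etc/security/keytabs/<user>.keytab <user>@<REALM>
    klist  # verify that a ticket was granted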
    
  3. Connect to a Spark tool. The subsections below cover pyspark, spark-shell, and spark-sql in turn.

  pyspark

  • For the Hadoop catalog:

    Bash
    pyspark \
      --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
      --conf "spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog" \
      --conf "spark.sql.catalog.hadoop_catalog.type=hadoop" \
      --conf "spark.sql.catalog.hadoop_catalog.warehouse=s3://${S3_BUCKET}/${S3_OBJECT_PATH}"
    

  • For the Glue catalog:

    Bash
    pyspark \
      --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
      --conf "spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog" \
      --conf "spark.sql.catalog.glue_catalog.warehouse=s3://${S3_BUCKET}/${S3_OBJECT_PATH}" \
      --conf "spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog" \
      --conf "spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO" \
      --conf "spark.sql.catalog.glue_catalog.s3.client-factory-impl=com.privacera.iceberg.aws.s3.PrivaceraAwsClientFactory"
    

  • If you are using FGAC or OLAC_FGAC, update the spark.sql.extensions property in the above command as follows:

    Bash
    --conf "spark.sql.extensions=com.privacera.spark.agent.SparkSQLExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    

  • If JWT authorization is enabled on the cluster, include the following additional configuration:

    Bash
    --conf "spark.hadoop.privacera.jwt.token.str=<your-jwt-token>"
    

  • Create a table, write data to it, and read it back:

    Python
    # Create a DataFrame.
    data = spark.createDataFrame([
        ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
        ],["id", "creation_date", "last_update_time"])
    
    # Write the DataFrame as an Iceberg table to the Amazon S3 location.
    spark.sql("""CREATE TABLE IF NOT EXISTS <my_catalog>.<db_name>.<table_name> (id string,
        creation_date string,
        last_update_time string)
        USING iceberg
        LOCATION 's3://${S3_BUCKET}/${TABLE_PATH}'""")
    
    data.writeTo("<my_catalog>.<db_name>.<table_name>").append()
    
    df = spark.read.format("iceberg").load("<my_catalog>.<db_name>.<table_name>")
    df.show()
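
    The same snippet can also run non-interactively; a minimal sketch, assuming it is saved as iceberg_demo.py (a hypothetical filename) and reuses the Hadoop-catalog configuration from above:

    Bash
    spark-submit \
      --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
      --conf "spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog" \
      --conf "spark.sql.catalog.hadoop_catalog.type=hadoop" \
      --conf "spark.sql.catalog.hadoop_catalog.warehouse=s3://${S3_BUCKET}/${S3_OBJECT_PATH}" \
      iceberg_demo.py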
    

  spark-shell

  • For the Hadoop catalog:

    Bash
    spark-shell \
      --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
      --conf "spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog" \
      --conf "spark.sql.catalog.hadoop_catalog.type=hadoop" \
      --conf "spark.sql.catalog.hadoop_catalog.warehouse=s3://${S3_BUCKET}/${S3_OBJECT_PATH}"
    

  • For the Glue catalog:

    Bash
    spark-shell \
      --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
      --conf "spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog" \
      --conf "spark.sql.catalog.glue_catalog.warehouse=s3://${S3_BUCKET}/${S3_OBJECT_PATH}" \
      --conf "spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog" \
      --conf "spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO" \
      --conf "spark.sql.catalog.glue_catalog.s3.client-factory-impl=com.privacera.iceberg.aws.s3.PrivaceraAwsClientFactory"
    

  • If you are using FGAC or OLAC_FGAC, update the spark.sql.extensions property in the above command as follows:

    Bash
    --conf "spark.sql.extensions=com.privacera.spark.agent.SparkSQLExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    

  • If JWT authorization is enabled on the cluster, include the following additional configuration:

    Bash
    --conf "spark.hadoop.privacera.jwt.token.str=<your-jwt-token>"
    

  • Create a table, write data to it, and read it back:

    Scala
    // Create a DataFrame.
    val data = Seq(
        ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
        ).toDF("id", "creation_date", "last_update_time")
    
    // Write the DataFrame as an Iceberg table to the Amazon S3 location.
    spark.sql("""CREATE TABLE IF NOT EXISTS <my_catalog>.<db_name>.<table_name> (id string,
        creation_date string,
        last_update_time string)
        USING iceberg
        LOCATION 's3://${S3_BUCKET}/${TABLE_PATH}'""")
    
    data.writeTo("<my_catalog>.<db_name>.<table_name>").append()
    
    val df = spark.read.format("iceberg").load("<my_catalog>.<db_name>.<table_name>")
    df.show()
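
    To run the Scala snippet without an interactive session, spark-shell can load it from a file with -i; a sketch, assuming the code above is saved as iceberg_demo.scala (a hypothetical filename):

    Bash
    spark-shell \
      --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
      --conf "spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog" \
      --conf "spark.sql.catalog.hadoop_catalog.type=hadoop" \
      --conf "spark.sql.catalog.hadoop_catalog.warehouse=s3://${S3_BUCKET}/${S3_OBJECT_PATH}" \
      -i iceberg_demo.scala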
    

  spark-sql

  • For the Hadoop catalog:

    Bash
    spark-sql \
      --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
      --conf "spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog" \
      --conf "spark.sql.catalog.hadoop_catalog.type=hadoop" \
      --conf "spark.sql.catalog.hadoop_catalog.warehouse=s3://${S3_BUCKET}/${S3_OBJECT_PATH}"
    

  • For the Glue catalog:

    Bash
    spark-sql \
      --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
      --conf "spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog" \
      --conf "spark.sql.catalog.glue_catalog.warehouse=s3://${S3_BUCKET}/${S3_OBJECT_PATH}" \
      --conf "spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog" \
      --conf "spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO" \
      --conf "spark.sql.catalog.glue_catalog.s3.client-factory-impl=com.privacera.iceberg.aws.s3.PrivaceraAwsClientFactory"
    

  • If you are using FGAC or OLAC_FGAC, update the spark.sql.extensions property in the above command as follows:

    Bash
    --conf "spark.sql.extensions=com.privacera.spark.agent.SparkSQLExtension,org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    

  • If JWT authorization is enabled on the cluster, include the following additional configuration:

    Bash
    --conf "spark.hadoop.privacera.jwt.token.str=<your-jwt-token>"
    

  • Run Spark SQL queries:

    SQL
    CREATE TABLE IF NOT EXISTS <my_catalog>.<db_name>.<table_name> (
        id INT,
        name STRING,
        age INT,
        city STRING)
        USING iceberg
        LOCATION 's3://${S3_BUCKET}/${TABLE_PATH}';
    
    SELECT * FROM <my_catalog>.<db_name>.<table_name>;
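
    A freshly created table returns no rows, so seed it before querying. Statements can also be passed non-interactively with -e; a sketch that inserts one illustrative row (the values are made up), reusing the Hadoop-catalog configuration above:

    Bash
    spark-sql \
      --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
      --conf "spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog" \
      --conf "spark.sql.catalog.hadoop_catalog.type=hadoop" \
      --conf "spark.sql.catalog.hadoop_catalog.warehouse=s3://${S3_BUCKET}/${S3_OBJECT_PATH}" \
      -e "INSERT INTO <my_catalog>.<db_name>.<table_name> VALUES (1, 'alice', 30, 'Seattle')"

    Rerun the SELECT above to confirm the row is visible.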
    
