Using Iceberg in Apache Spark OLAC

If you have enabled Iceberg support (see Enable Iceberg), the required Iceberg configurations are set up automatically in spark-defaults.conf, and you can start Spark normally without any additional configuration.

  1. Navigate to the ${SPARK_HOME}/bin folder and export the JWT token:

    Bash
    cd ${SPARK_HOME}/bin
    export JWT_TOKEN="<JWT_TOKEN>"
    
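    As an optional sanity check (illustrative only, not part of the product setup), a JWT is three base64url-encoded segments separated by dots, so decoding the first segment confirms the exported value is at least well formed. The `jwt_header` helper and the example token below are hypothetical:

    ```python
    # Sketch: decode a JWT header to sanity-check the exported token.
    # The helper and example token are illustrative, not part of the docs.
    import base64
    import json

    def jwt_header(token):
        """Decode and return the JWT header (first dot-separated segment)."""
        header_b64 = token.split(".")[0]
        # base64url decoding requires padding to a multiple of 4
        header_b64 += "=" * (-len(header_b64) % 4)
        return json.loads(base64.urlsafe_b64decode(header_b64))

    # Hypothetical example token, built here purely for illustration.
    example_header = base64.urlsafe_b64encode(
        json.dumps({"alg": "HS256", "typ": "JWT"}).encode()
    ).decode().rstrip("=")
    example_token = example_header + ".e30.signature"

    print(jwt_header(example_token))
    ```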

  2. Start a Spark session using one of spark-shell, pyspark, or spark-sql:

    • To pass the JWT token directly as a command-line argument, use the following configuration when connecting to the cluster:

      Bash
      ./<spark-shell | pyspark | spark-sql> \
      --conf "spark.hadoop.privacera.jwt.token.str=${JWT_TOKEN}"
      

    • To use the file path containing the JWT token, use the following configuration:

      Bash
      ./<spark-shell | pyspark | spark-sql> \
      --conf "spark.hadoop.privacera.jwt.token=<path-to-jwt-token-file>" 
      

    • If you want to override the warehouse path, add the following configuration:

      Bash
      ./<spark-shell | pyspark | spark-sql> \
      --conf "spark.hadoop.privacera.jwt.token.str=${JWT_TOKEN}" \
      --conf "spark.sql.catalog.iceberg_catalog.warehouse=s3a://${S3_BUCKET}/${ICEBERG_WAREHOUSE}"
      

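    The three launch variants above differ only in which `--conf` flags are appended. As a plain-Python sketch (the `build_spark_args` helper and the sample bucket path are hypothetical, introduced here for illustration), the argument list for any of the binaries could be assembled like this:

    ```python
    # Sketch: assemble the command-line arguments for a Spark session with
    # the Privacera JWT configuration. Helper name and sample values are
    # hypothetical; the conf keys come from the steps above.
    import os

    def build_spark_args(binary, jwt_token=None, jwt_token_file=None, warehouse=None):
        """Return the argument list for spark-shell, pyspark, or spark-sql."""
        args = ["./" + binary]
        if jwt_token:        # pass the token string directly
            args += ["--conf", f"spark.hadoop.privacera.jwt.token.str={jwt_token}"]
        elif jwt_token_file: # or point at a file containing the token
            args += ["--conf", f"spark.hadoop.privacera.jwt.token={jwt_token_file}"]
        if warehouse:        # optionally override the Iceberg warehouse path
            args += ["--conf", f"spark.sql.catalog.iceberg_catalog.warehouse={warehouse}"]
        return args

    cmd = build_spark_args(
        "spark-sql",
        jwt_token=os.environ.get("JWT_TOKEN", "<JWT_TOKEN>"),
        warehouse="s3a://my-bucket/iceberg/warehouse",  # hypothetical path
    )
    print(" ".join(cmd))
    ```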
  3. Use the reference commands below to access Iceberg tables:

    Python
    # Create a DataFrame.
    data = spark.createDataFrame([
        ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
        ], ["id", "creation_date", "last_update_time"])
    
    # Write the DataFrame as an Iceberg dataset.
    spark.sql("""CREATE TABLE IF NOT EXISTS iceberg_catalog.db.table_name (
        id string,
        creation_date string,
        last_update_time string)
        USING iceberg
        LOCATION 's3a://${S3_BUCKET}/${TABLE_PATH}'""")
    
    data.writeTo("iceberg_catalog.db.table_name").append()
    
    # Read from the Iceberg table.
    df = spark.read.format("iceberg").load("iceberg_catalog.db.table_name")
    df.show()
    
    Scala
    // Create a DataFrame.
    import spark.implicits._  // needed for toDF (already imported in spark-shell)
    
    val data = Seq(
        ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
        ).toDF("id", "creation_date", "last_update_time")
    
    // Write the DataFrame as an Iceberg dataset.
    spark.sql("""CREATE TABLE IF NOT EXISTS iceberg_catalog.db.table_name (
        id string,
        creation_date string,
        last_update_time string)
        USING iceberg
        LOCATION 's3a://${S3_BUCKET}/${TABLE_PATH}'""")
    
    data.writeTo("iceberg_catalog.db.table_name").append()
    
    // Read from the Iceberg table.
    val df = spark.read.format("iceberg").load("iceberg_catalog.db.table_name")
    df.show()
    
    SQL
    CREATE TABLE IF NOT EXISTS iceberg_catalog.db.table_name (
        id INT,
        name STRING,
        age INT,
        city STRING)
        USING ICEBERG
        LOCATION 's3a://${S3_BUCKET}/${TABLE_PATH}';
    
    INSERT INTO iceberg_catalog.db.table_name
        VALUES (1, 'John', 25, 'London');
    
    SELECT * FROM iceberg_catalog.db.table_name;