Setup - Shared Cluster Encryption

This guide provides instructions for setting up Privacera Encryption on Databricks Unity Catalog shared clusters using Python UDFs.

Step 1: Create Privacera Portal User

Create an integration user in Privacera Portal to generate the JWT token for authentication.

  1. Navigate to Settings > User Management > Add User
  2. Select Role as ROLE_ENCRYPTION_READ
  3. Save the user

Step 2: Generate JWT Token

Generate the JWT token for the integration user created in Step 1:

  1. Navigate to Settings > Token Management
  2. Select the integration user created in Step 1
  3. Click Generate Token
  4. Save the token securely

Note

The JWT token generated here belongs to the integration user and is used to authenticate encryption requests to the Privacera Encryption Gateway (PEG).
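You can smoke-test the token directly against PEG before wiring it into AWS. The sketch below is illustrative only: it reuses the request shape and the /api/peg/v2/multi/protect endpoint that the Step 8 UDFs use, and PEG_HOST, JWT_TOKEN, the scheme name, and the helper names are placeholders, not part of the required setup.

```python
PEG_HOST = "https://peg-host.example.com"  # placeholder: your PEG URL
JWT_TOKEN = "<jwt-token>"                  # placeholder: token from Step 2

def build_protect_payload(values, scheme, do_as_user):
    # Same request body shape the Step 8 UDFs send to PEG
    return {
        "request": [{"data": values, "scheme": scheme, "action": "PROTECT"}],
        "doAs": do_as_user,
        "action": "PROTECT",
    }

def smoke_test():
    import requests  # imported here so the payload helper stays dependency-free
    headers = {
        "Authorization": f"Bearer {JWT_TOKEN}",
        "Content-Type": "application/json",
    }
    payload = build_protect_payload(["test@example.com"], "SYSTEM_EMAIL", "test-user")
    resp = requests.post(
        f"{PEG_HOST}/api/peg/v2/multi/protect",
        json=payload, headers=headers, timeout=30,
    )
    resp.raise_for_status()
    print(resp.json()["data"][0])  # encrypted values on success
```

A 401 response here indicates a token or impersonation-policy problem, which is cheaper to debug now than after the UDFs are deployed.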

Step 3: Create IAM Role

Create an IAM role that Unity Catalog will use to access AWS Secrets Manager.

  1. Open AWS Console and navigate to IAM > Roles > Create role
  2. Select Another AWS account
  3. Enable Require External ID (the External ID will be generated in Step 5)
  4. Provide a role name (e.g., privacera-dbx-uc-peg-role)

Add Trust Policy

Add the following trust relationship to the role:

JSON
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": [
          "arn:aws:iam::414351767826:role/unity-catalog-prod-UCMasterRole-14S5ZJVKOTYTL",
          "arn:aws:iam::<AWS_ACCOUNT_ID>:role/<IAM_ROLE_NAME>"
        ]
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "<EXTERNAL_ID>"
        }
      }
    }
  ]
}

Note

Replace <AWS_ACCOUNT_ID>, <IAM_ROLE_NAME>, and <EXTERNAL_ID> with your values. The External ID will be generated in Step 5.
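A stray unreplaced placeholder such as <AWS_ACCOUNT_ID> makes the trust policy invalid, so it is worth verifying substitution before applying it. The helper below is a hypothetical convenience (not part of the official setup) that fills the template above and fails fast on leftovers:

```python
import json
import re

# Trust policy template from this step; <...> tokens must be replaced
TRUST_POLICY_TEMPLATE = """{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": [
          "arn:aws:iam::414351767826:role/unity-catalog-prod-UCMasterRole-14S5ZJVKOTYTL",
          "arn:aws:iam::<AWS_ACCOUNT_ID>:role/<IAM_ROLE_NAME>"
        ]
      },
      "Action": "sts:AssumeRole",
      "Condition": {"StringEquals": {"sts:ExternalId": "<EXTERNAL_ID>"}}
    }
  ]
}"""

def render_trust_policy(template, **values):
    # Substitute <PLACEHOLDER> tokens and fail fast if any remain
    rendered = template
    for key, val in values.items():
        rendered = rendered.replace(f"<{key}>", val)
    leftover = re.findall(r"<[A-Z_]+>", rendered)
    if leftover:
        raise ValueError(f"Unreplaced placeholders: {leftover}")
    return json.loads(rendered)  # parsing also validates the JSON
```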

Add Permissions Policy

Attach the following inline policy to the IAM role. This policy grants the required permissions for Secrets Manager access:

JSON
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "iam:GetRole",
        "iam:UpdateAssumeRolePolicy",
        "iam:PutRolePolicy"
      ],
      "Resource": "arn:aws:iam::<AWS_ACCOUNT_ID>:role/<IAM_ROLE_NAME>"
    },
    {
      "Effect": "Allow",
      "Action": [
        "secretsmanager:GetSecretValue",
        "secretsmanager:DescribeSecret"
      ],
      "Resource": "arn:aws:secretsmanager:<REGION>:<AWS_ACCOUNT_ID>:secret:<SECRET_NAME>-*"
    }
  ]
}

Required Permissions

The IAM role must have the following permissions:

  • secretsmanager:GetSecretValue - to retrieve secret values from AWS Secrets Manager
  • secretsmanager:DescribeSecret - to describe secret metadata
  • iam:GetRole, iam:UpdateAssumeRolePolicy, iam:PutRolePolicy - for role management

Step 4: Create AWS Secret

  1. Open AWS Console and navigate to Secrets Manager
  2. Click Store a new secret
  3. Select Other type of secret
  4. Add the following key-value pairs:

    Key       Value
    peg_host  URL of your Privacera Encryption Gateway (e.g., https://peg-host.example.com)
    peg_jwt   JWT token generated in Step 2
  5. Provide a secret name (e.g., db/dbx-peg-secret)

    Getting SECRET_NAME

    The SECRET_NAME is the name you provide when creating the secret in AWS Secrets Manager (for example, db/dbx-peg-secret). It is used in the service credential setup script.

  6. Select the appropriate region

  7. Save and note the Secret ARN

    Placeholders for the service credential setup script

    The following placeholders are used in the env setup Python script (Step 5). Set these in Cell 1 of the notebook:

    • SECRET_NAME - Name of the secret in AWS Secrets Manager (e.g., db/dbx-peg-secret)
    • SECRET_ARN - Full ARN of the secret, shown in the secret details after saving (e.g., arn:aws:secretsmanager:<region>:<account-id>:secret:<secret-name>-<suffix>)
    • ROLE_NAME - IAM role name for Unity Catalog access (e.g., privacera-dbx-uc-peg-role)
    • AWS_REGION - AWS region where the secret is stored (e.g., us-east-1)
    • SERVICE_CREDENTIAL_NAME - Name for the Databricks service credential (your chosen name)
    • DB_SECRET_SCOPE - (Optional) Databricks secret scope name for AWS credentials; None or scope name
    • DB_SECRET_AWS_ACCESS_KEY - (Optional) Key in the secret scope for the AWS access key; None or key name
    • DB_SECRET_AWS_SECRET_ACCESS_KEY - (Optional) Key in the secret scope for the AWS secret key; None or key name
    • DB_SECRET_AWS_SESSION_TOKEN - (Optional) Key in the secret scope for the AWS session token; None or key name
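If you prefer to script this step rather than click through the console, the sketch below creates the secret with boto3. It assumes AWS credentials are already configured in your environment; the function and variable names are illustrative, not part of the official setup.

```python
import json

def build_peg_secret_string(peg_host, peg_jwt):
    # The same two key-value pairs listed in the table above
    return json.dumps({"peg_host": peg_host, "peg_jwt": peg_jwt})

def create_peg_secret(secret_name, peg_host, peg_jwt, region):
    import boto3  # imported here so the JSON helper stays dependency-free
    sm = boto3.client("secretsmanager", region_name=region)
    resp = sm.create_secret(
        Name=secret_name,
        SecretString=build_peg_secret_string(peg_host, peg_jwt),
    )
    return resp["ARN"]  # note this value for SECRET_ARN in Step 5
```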

Add Secret Resource Policy

Attach a resource policy to restrict access to the IAM role:

JSON
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<AWS_ACCOUNT_ID>:role/<IAM_ROLE_NAME>"
      },
      "Action": [
        "secretsmanager:GetSecretValue",
        "secretsmanager:DescribeSecret"
      ],
      "Resource": "*"
    }
  ]
}
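The resource policy above can also be attached programmatically with boto3's put_resource_policy call. A sketch, assuming AWS credentials are configured in the environment (helper names are illustrative):

```python
import json

def build_secret_resource_policy(account_id, role_name):
    # Mirrors the resource policy shown in this step
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": f"arn:aws:iam::{account_id}:role/{role_name}"},
                "Action": [
                    "secretsmanager:GetSecretValue",
                    "secretsmanager:DescribeSecret",
                ],
                "Resource": "*",
            }
        ],
    }

def attach_resource_policy(secret_arn, account_id, role_name, region):
    import boto3  # imported here so the policy builder stays dependency-free
    sm = boto3.client("secretsmanager", region_name=region)
    sm.put_resource_policy(
        SecretId=secret_arn,
        ResourcePolicy=json.dumps(build_secret_resource_policy(account_id, role_name)),
    )
```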

Step 5: Configure Databricks Service Credential

Configure a service credential in Databricks to establish a secure connection with AWS.

  1. In Databricks, create a service credential with the IAM role ARN

    Service Credential Setup Script

    Execute the following Python script in a Databricks notebook. Each section can be run in a separate cell. In each cell, add or replace the placeholders with your values as indicated in the comments below.

    Cell 1: Imports and Configuration

    In this cell, add: SECRET_NAME, SECRET_ARN, ROLE_NAME, AWS_REGION, SERVICE_CREDENTIAL_NAME, and optionally DB_SECRET_SCOPE, DB_SECRET_AWS_ACCESS_KEY, DB_SECRET_AWS_SECRET_ACCESS_KEY, DB_SECRET_AWS_SESSION_TOKEN.

    Python
    import json
    import time
    import boto3
    import requests
    from botocore.exceptions import ClientError
    
    # ============================================================================
    # Configuration Variables
    # ============================================================================
    # AWS Secrets Manager configuration
    SECRET_NAME = "<secret-name>"  # Secret name from AWS Secrets Manager (e.g., db/dbx-peg-secret)
    SECRET_ARN = "arn:aws:secretsmanager:<region>:<account-id>:secret:<secret-name>-<suffix>"
    ROLE_NAME = "<example-role-name>"  # IAM role name for Unity Catalog access
    AWS_REGION = "<aws-region>"  # AWS region where secret is stored (e.g., us-east-1)
    
    # Databricks service credential configuration
    # Note: SERVICE_CREDENTIAL_NAME should be set by the user to their preferred credential name
    SERVICE_CREDENTIAL_NAME = "<service-credential-name>"  # Name for the Databricks service credential
    
    # Unity Catalog master role (fixed value for Databricks)
    UC_MASTER_ROLE_ARN = "arn:aws:iam::414351767826:role/unity-catalog-prod-UCMasterRole-14S5ZJVKOTYTL"
    
    # AWS Credentials - Set to None to be prompted, or provide Databricks secret scope details
    DB_SECRET_SCOPE = None  # Databricks secret scope name (if using secret scope)
    DB_SECRET_AWS_ACCESS_KEY = None  # Key name in secret scope for AWS access key
    DB_SECRET_AWS_SECRET_ACCESS_KEY = None  # Key name in secret scope for AWS secret key
    DB_SECRET_AWS_SESSION_TOKEN = None  # Key name in secret scope for AWS session token
    

    Cell 2: AWS Credentials Setup

    This cell loads AWS credentials, either from a Databricks secret scope or from an interactive prompt.

    Python
    # ============================================================================
    # Get AWS Credentials and Create Session
    # ============================================================================
    # Load credentials from Databricks secret scope or prompt for input
    if DB_SECRET_SCOPE and DB_SECRET_AWS_ACCESS_KEY and DB_SECRET_AWS_SECRET_ACCESS_KEY:
        # Load from Databricks secret scope (the session token is optional)
        aws_access_key = dbutils.secrets.get(scope=DB_SECRET_SCOPE, key=DB_SECRET_AWS_ACCESS_KEY)
        aws_secret_key = dbutils.secrets.get(scope=DB_SECRET_SCOPE, key=DB_SECRET_AWS_SECRET_ACCESS_KEY)
        aws_session_token = (
            dbutils.secrets.get(scope=DB_SECRET_SCOPE, key=DB_SECRET_AWS_SESSION_TOKEN)
            if DB_SECRET_AWS_SESSION_TOKEN
            else None
        )
    else:
        # Prompt for credentials
        aws_access_key = input("AWS Access Key ID: ")
        aws_secret_key = input("AWS Secret Access Key: ")
        aws_session_token = input("AWS Session Token (optional): ") or None
    
    # Create boto3 session with AWS credentials
    session = boto3.Session(
        aws_access_key_id=aws_access_key if aws_access_key else None,
        aws_secret_access_key=aws_secret_key if aws_secret_key else None,
        aws_session_token=aws_session_token,
        region_name=AWS_REGION,
    )
    
    # Get AWS Account ID and construct IAM Role ARN
    AWS_ACCOUNT_ID = session.client("sts").get_caller_identity()["Account"]
    ROLE_ARN = f"arn:aws:iam::{AWS_ACCOUNT_ID}:role/{ROLE_NAME}"
    

    Cell 3: Verify Secret Exists

    Python
    # ============================================================================
    # Verify AWS Secret Exists
    # ============================================================================
    sm = session.client("secretsmanager")
    
    try:
        # Use SECRET_ARN if provided, otherwise use SECRET_NAME
        if SECRET_ARN:
            response = sm.describe_secret(SecretId=SECRET_ARN)
        else:
            response = sm.describe_secret(SecretId=SECRET_NAME)
        SECRET_ARN = response["ARN"]  # Update SECRET_ARN with actual ARN
    except ClientError:
        raise Exception(f"Secret not found: {SECRET_ARN or SECRET_NAME}")
    

    Cell 4: IAM Role Setup

    Python
    # ============================================================================
    # IAM Role Setup and Helper Functions
    # ============================================================================
    iam = session.client("iam")
    PLACEHOLDER_EXTERNAL_ID = "0000"  # Placeholder until External ID is generated
    
    def build_trust_policy(external_id, include_self_assume=True):
        """
        Build IAM trust policy document.
    
        Args:
            external_id: External ID for the trust policy condition
            include_self_assume: Whether to include self-assume capability
    
        Returns:
            Trust policy document as dict
        """
        principals = [UC_MASTER_ROLE_ARN]  # Unity Catalog master role
        if include_self_assume:
            principals.append(ROLE_ARN)  # Add self-assume capability
        return {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"AWS": principals},
                    "Action": "sts:AssumeRole",
                    "Condition": {"StringEquals": {"sts:ExternalId": external_id}},
                }
            ],
        }
    
    def role_has_self_assume():
        """
        Check if IAM role has self-assume capability.
    
        Returns:
            True if role exists and has self-assume, False if exists without self-assume, None if doesn't exist
        """
        try:
            policy = iam.get_role(RoleName=ROLE_NAME)["Role"]["AssumeRolePolicyDocument"]
            for stmt in policy.get("Statement", []):
                principals = stmt.get("Principal", {}).get("AWS", [])
                if ROLE_ARN in (principals if isinstance(principals, list) else [principals]):
                    return True
            return False
        except ClientError as e:
            if e.response["Error"]["Code"] == "NoSuchEntity":
                return None  # Role doesn't exist
            raise
    
    # Check role status and create/update as needed
    has_self_assume = role_has_self_assume()
    
    if has_self_assume is None:
        # Create new IAM role
        iam.create_role(
            RoleName=ROLE_NAME,
            AssumeRolePolicyDocument=json.dumps(
                build_trust_policy(PLACEHOLDER_EXTERNAL_ID, False)
            ),
            Description="IAM role for Databricks UC service credential",
        )
        time.sleep(10)  # Wait for AWS IAM consistency
    
        # Add self-assume capability with retry logic
        for attempt in range(1, 6):
            try:
                iam.update_assume_role_policy(
                    RoleName=ROLE_NAME,
                    PolicyDocument=json.dumps(
                        build_trust_policy(PLACEHOLDER_EXTERNAL_ID, True)
                    ),
                )
                break
            except ClientError as e:
                if (
                    attempt < 5
                    and e.response.get("Error", {}).get("Code") == "MalformedPolicyDocument"
                ):
                    time.sleep(5 * attempt)  # Linear backoff between retries
                else:
                    raise
    elif not has_self_assume:
        # Update existing role to add self-assume
        iam.update_assume_role_policy(
            RoleName=ROLE_NAME,
            PolicyDocument=json.dumps(build_trust_policy(PLACEHOLDER_EXTERNAL_ID, True)),
        )
    
    # Display required IAM permissions policy (must be attached separately by IAM admin)
    SECRET_ARN_PATTERN = f"arn:aws:secretsmanager:{AWS_REGION}:{AWS_ACCOUNT_ID}:secret:{SECRET_NAME}*"
    print(f"Required IAM Permissions Policy for role '{ROLE_NAME}':")
    print(json.dumps(
        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": [
                        "secretsmanager:GetSecretValue",
                        "secretsmanager:DescribeSecret",
                    ],
                    "Resource": SECRET_ARN_PATTERN,
                }
            ],
        },
        indent=2,
    ))
    

    Cell 5: Create Databricks Service Credential

    Python
    # ============================================================================
    # Create or Get Databricks Service Credential
    # ============================================================================
    # Get Databricks workspace URL and API token
    workspace_url = spark.conf.get("spark.databricks.workspaceUrl")
    api_token = (
        dbutils.notebook.entry_point.getDbutils()
        .notebook()
        .getContext()
        .apiToken()
        .get()
    )
    headers = {"Authorization": f"Bearer {api_token}", "Content-Type": "application/json"}
    
    # Check if service credential already exists
    resp = requests.get(
        f"https://{workspace_url}/api/2.1/unity-catalog/credentials/{SERVICE_CREDENTIAL_NAME}",
        headers=headers,
    )
    
    if resp.status_code != 200:
        # Create new service credential
        create_resp = requests.post(
            f"https://{workspace_url}/api/2.1/unity-catalog/credentials",
            headers=headers,
            json={
                "name": SERVICE_CREDENTIAL_NAME,
                "purpose": "SERVICE",
                "aws_iam_role": {"role_arn": ROLE_ARN},
            },
        )
        if create_resp.status_code not in [200, 201]:
            raise Exception(f"Failed to create credential: {create_resp.text}")
        # Retrieve the created credential to get External ID
        resp = requests.get(
            f"https://{workspace_url}/api/2.1/unity-catalog/credentials/{SERVICE_CREDENTIAL_NAME}",
            headers=headers,
        )
    
    # Extract External ID from service credential
    cred_data = resp.json()
    EXTERNAL_ID = cred_data.get("aws_iam_role", {}).get("external_id")
    

    Cell 6: Update IAM Trust Policy with External ID

    Python
    # ============================================================================
    # Update IAM Trust Policy with Generated External ID
    # ============================================================================
    if not EXTERNAL_ID:
        raise Exception("No External ID found in service credential")
    
    # Check if trust policy already has this External ID
    current_policy = iam.get_role(RoleName=ROLE_NAME)["Role"]["AssumeRolePolicyDocument"]
    current_ext_ids = [
        stmt.get("Condition", {}).get("StringEquals", {}).get("sts:ExternalId")
        for stmt in current_policy.get("Statement", [])
    ]
    
    if EXTERNAL_ID not in current_ext_ids:
        # Update trust policy with External ID
        iam.update_assume_role_policy(
            RoleName=ROLE_NAME,
            PolicyDocument=json.dumps(build_trust_policy(EXTERNAL_ID)),
        )
        time.sleep(10)  # Wait for IAM propagation
    
        # Recreate service credential to force revalidation with updated trust policy
        requests.delete(
            f"https://{workspace_url}/api/2.1/unity-catalog/credentials/{SERVICE_CREDENTIAL_NAME}",
            headers=headers,
        )
        create_resp = requests.post(
            f"https://{workspace_url}/api/2.1/unity-catalog/credentials",
            headers=headers,
            json={
                "name": SERVICE_CREDENTIAL_NAME,
                "purpose": "SERVICE",
                "aws_iam_role": {"role_arn": ROLE_ARN},
            },
        )
        if create_resp.status_code not in [200, 201]:
            raise Exception(f"Failed to recreate credential: {create_resp.text}")
    

    Cell 7: Final Validation

    Python
    # ============================================================================
    # Final Validation and Testing
    # ============================================================================
    print("=" * 60)
    print("VALIDATION RESULTS")
    print("=" * 60)
    
    # Test 1: Verify service credential and External ID
    resp = requests.get(
        f"https://{workspace_url}/api/2.1/unity-catalog/credentials/{SERVICE_CREDENTIAL_NAME}",
        headers=headers,
    )
    if resp.status_code == 200:
        cred_data = resp.json()
        expected_external_id = cred_data.get("aws_iam_role", {}).get("external_id")
        print(f"✅ Service Credential: {SERVICE_CREDENTIAL_NAME}")
        print(f"✅ External ID: {expected_external_id}")
    
    # Test 2: Verify IAM trust policy
    role_info = iam.get_role(RoleName=ROLE_NAME)
    trust_policy = role_info["Role"]["AssumeRolePolicyDocument"]
    print(f"\n✅ IAM Role: {ROLE_NAME}")
    print(f"✅ Trust Policy configured with External ID")
    
    # Test 3: Test STS AssumeRole
    try:
        sts = session.client("sts")
        assume_response = sts.assume_role(
            RoleArn=ROLE_ARN,
            RoleSessionName="test-assume-role",
            ExternalId=EXTERNAL_ID,
        )
        print(f"✅ STS AssumeRole: Success")
    
        # Test 4: Test secret access with assumed role
        assumed_creds = assume_response["Credentials"]
        test_session = boto3.Session(
            aws_access_key_id=assumed_creds["AccessKeyId"],
            aws_secret_access_key=assumed_creds["SecretAccessKey"],
            aws_session_token=assumed_creds["SessionToken"],
            region_name=AWS_REGION,
        )
        sm_test = test_session.client("secretsmanager")
        secret_test = sm_test.get_secret_value(SecretId=SECRET_NAME)
        print(f"✅ Secret Access: Success (length: {len(secret_test.get('SecretString', ''))} chars)")
    except Exception as e:
        print(f"❌ STS/Secret Access Test Failed: {type(e).__name__}: {e}")
    
    # Test 5: Test service credential access to Secrets Manager
    print("\nTesting service credential access...")
    for attempt in range(1, 4):
        try:
            boto3_session = boto3.Session(
                botocore_session=dbutils.credentials.getServiceCredentialsProvider(
                    SERVICE_CREDENTIAL_NAME
                ),
                region_name=AWS_REGION,
            )
            sm_test = boto3_session.client("secretsmanager")
            test_response = sm_test.get_secret_value(SecretId=SECRET_NAME)
            print(f"✅ Service Credential Access: Success (length: {len(test_response.get('SecretString', ''))} chars)")
            break
        except Exception as e:
            if attempt < 3:
                print(f"⚠️  Attempt {attempt}/3 failed. Waiting 30s for IAM propagation...")
                time.sleep(30)
            else:
                print(f"❌ Service Credential Access Failed: {type(e).__name__}: {e}")
    
    # Final summary
    print("\n" + "=" * 60)
    print("SETUP COMPLETE")
    print("=" * 60)
    print(f"AWS Secret ARN: {SECRET_ARN}")
    print(f"IAM Role ARN: {ROLE_ARN}")
    print(f"Service Credential: {SERVICE_CREDENTIAL_NAME}")
    print(f"External ID: {EXTERNAL_ID}")
    print("=" * 60)
    

    Note

    Replace all placeholder values (e.g., <secret-name>, <region>, <account-id>) with your actual AWS and Databricks configuration values.

  2. Note the generated External ID

  3. Update the IAM role trust policy (from Step 3) with the generated External ID

Step 6: Update IAM Trust Policy

Update the IAM role trust policy with the External ID generated in Step 5:

JSON
"Condition": {
  "StringEquals": {
    "sts:ExternalId": "<GENERATED_EXTERNAL_ID>"
  }
}

Step 7: Enable PEG URL Access in Databricks

Before creating the UDFs, you must allow Databricks to access the Privacera Encryption Gateway (PEG) URL.

  1. In Databricks, click on your profile icon in the top right corner
  2. Navigate to Settings > Security > External Access
  3. Under Embed dashboards, click Manage
  4. Add your PEG URL (e.g., https://peg-host.privacera.us)
  5. Click Allow approved domain to add the URL
  6. Save the changes

Note

This step is required to allow Databricks to make API calls to the Privacera Encryption Gateway. Without this configuration, UDF calls to PEG will fail.

Step 8: Create Python UDFs

Create Python UDFs in the target catalog for data protection operations.

Execute the following SQL commands in a Databricks notebook to create the protect, unprotect, and mask UDFs:

Python UDF Creation Scripts

Replace the following placeholders with your actual values:

  • <catalog>.<schema> - Target catalog and schema where the UDFs will be created
  • <service-credential-name> - Service credential name created in Step 5
  • <secret-arn> - AWS Secrets Manager ARN from Step 4
  • <aws-region> - AWS region where the secret is stored

Create Protect UDF:

SQL
CREATE OR REPLACE FUNCTION <catalog>.<schema>.protect(
  s STRING,
  scheme STRING
)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (`<service-credential-name>` DEFAULT)
AS $$
import boto3
import json
import requests
import pandas as pd
from typing import Iterator, Tuple
from databricks.service_credentials import getServiceCredentialsProvider
from pyspark.taskcontext import TaskContext

SECRET_ARN = "<secret-arn>"

def get_credentials():
    session = boto3.Session(
        botocore_session=getServiceCredentialsProvider("<service-credential-name>")
    )
    client = session.client("secretsmanager", region_name="<aws-region>")
    resp = client.get_secret_value(SecretId=SECRET_ARN)
    secret_data = json.loads(resp["SecretString"])
    return secret_data["peg_jwt"], secret_data["peg_host"]

def handler_function(
    batch_iter: Iterator[Tuple[pd.Series, pd.Series]]
) -> Iterator[pd.Series]:

    jwt, base_url = get_credentials()
    do_as_user = TaskContext.get().getLocalProperty("user")

    headers = {
        "Authorization": f"Bearer {jwt}",
        "Content-Type": "application/json"
    }

    for value_series, scheme_series in batch_iter:
        values = value_series.tolist()

        if all(pd.isna(v) for v in values):
            yield value_series
            continue

        payload = {
            "request": [
                {
                    "data": values,
                    "scheme": scheme_series.iloc[0],
                    "action": "PROTECT"
                }
            ],
            "doAs": do_as_user,
            "action": "PROTECT"
        }

        resp = requests.post(
            f"{base_url}/api/peg/v2/multi/protect",
            json=payload,
            headers=headers,
            timeout=30
        )
        resp.raise_for_status()

        encrypted_values = resp.json()["data"][0]
        yield pd.Series(encrypted_values)
$$;

Create Unprotect UDF:

SQL
CREATE OR REPLACE FUNCTION <catalog>.<schema>.unprotect(
  s STRING,
  scheme STRING,
  presentation_scheme STRING
)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (`<service-credential-name>` DEFAULT)
AS $$
import boto3
import json
import requests
import pandas as pd
from typing import Iterator, Tuple
from databricks.service_credentials import getServiceCredentialsProvider
from pyspark.taskcontext import TaskContext

SECRET_ARN = "<secret-arn>"

def get_credentials():
    session = boto3.Session(
        botocore_session=getServiceCredentialsProvider("<service-credential-name>")
    )
    client = session.client("secretsmanager", region_name="<aws-region>")
    resp = client.get_secret_value(SecretId=SECRET_ARN)
    secret_data = json.loads(resp["SecretString"])
    return secret_data["peg_jwt"], secret_data["peg_host"]

def handler_function(
    batch_iter: Iterator[Tuple[pd.Series, pd.Series, pd.Series]]
) -> Iterator[pd.Series]:

    jwt, base_url = get_credentials()
    do_as_user = TaskContext.get().getLocalProperty("user")

    headers = {
        "Authorization": f"Bearer {jwt}",
        "Content-Type": "application/json"
    }

    for value_series, scheme_series, presentation_scheme_series in batch_iter:
        values = value_series.tolist()

        if all(pd.isna(v) for v in values):
            yield value_series
            continue

        payload = {
            "request": [
                {
                    "data": values,
                    "scheme": scheme_series.iloc[0],
                    "action": "UNPROTECT"
                }
            ],
            "doAs": do_as_user,
            "action": "UNPROTECT"
        }

        # Optional presentation scheme
        pres = presentation_scheme_series.iloc[0]
        if pd.notna(pres):
            payload["request"][0]["presentationScheme"] = pres

        resp = requests.post(
            f"{base_url}/api/peg/v2/multi/unprotect",
            json=payload,
            headers=headers,
            timeout=30
        )
        resp.raise_for_status()

        decrypted_values = resp.json()["data"][0]
        yield pd.Series(decrypted_values)
$$;

Create Mask UDF:

SQL
CREATE OR REPLACE FUNCTION <catalog>.<schema>.mask(
  s STRING,
  scheme STRING
)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (`<service-credential-name>` DEFAULT)
AS $$
import boto3
import json
import requests
import pandas as pd
from typing import Iterator, Tuple
from databricks.service_credentials import getServiceCredentialsProvider
from pyspark.taskcontext import TaskContext

SECRET_ARN = "<secret-arn>"

def get_credentials():
    session = boto3.Session(
        botocore_session=getServiceCredentialsProvider("<service-credential-name>")
    )
    client = session.client("secretsmanager", region_name="<aws-region>")
    resp = client.get_secret_value(SecretId=SECRET_ARN)
    secret_data = json.loads(resp["SecretString"])
    return secret_data["peg_jwt"], secret_data["peg_host"]

def handler_function(
    batch_iter: Iterator[Tuple[pd.Series, pd.Series]]
) -> Iterator[pd.Series]:

    jwt, base_url = get_credentials()
    do_as_user = TaskContext.get().getLocalProperty("user")

    headers = {
        "Authorization": f"Bearer {jwt}",
        "Content-Type": "application/json"
    }

    for value_series, scheme_series in batch_iter:
        values = value_series.tolist()

        if all(pd.isna(v) for v in values):
            yield value_series
            continue

        payload = {
            "request": [
                {
                    "data": values,
                    "scheme": scheme_series.iloc[0],
                    "action": "MASK"
                }
            ],
            "doAs": do_as_user,
            "action": "MASK"
        }

        resp = requests.post(
            f"{base_url}/api/peg/v2/multi/mask",
            json=payload,
            headers=headers,
            timeout=30
        )
        resp.raise_for_status()

        masked_values = resp.json()["data"][0]
        yield pd.Series(masked_values)
$$;

Note

Replace all placeholder values with your actual configuration:

  • <catalog>.<schema> - Your target catalog and schema
  • <service-credential-name> - Service credential name from Step 5
  • <secret-arn> - AWS Secrets Manager ARN from Step 4
  • <aws-region> - AWS region (e.g., us-east-1)

Step 9: Configure Privacera Resource Policy

Grant the Databricks Unity Catalog user access to the required resources.

  1. Navigate to Access Management > Resource Policy in Privacera Portal
  2. Select resource group: privacera_databricks_unity_catalog
  3. Grant permissions to the target user for:
    • Catalog
    • Schema
    • Table
  4. Save the policy

Step 10: Create Encryption Scheme

  1. Navigate to Encryption and Masking > Schemes in Privacera Portal
  2. Click Create Scheme
  3. Configure the scheme name, algorithm, format type, and key configuration
  4. Save the scheme

Step 11: Create PEG Scheme Policies

Impersonation Policy

  1. Navigate to Access Management > Scheme Policies > Privacera PEG
  2. Click Add New Policy
  3. Configure:
    • Policy Type: Impersonate
    • Subject: Privacera portal user (created in Step 1)
    • Resource: PEG service
  4. Save the policy

Data Protection Policy

  1. Click Add New Policy
  2. Configure:
    • Subject: Databricks Unity Catalog user
    • Actions: Protect, Unprotect, Mask
    • Scheme: Select the scheme created in Step 10
  3. Save the policy

Step 12: Using Encryption UDFs

Create a shared cluster and test the encryption UDFs.

Encrypt Data

SQL
SELECT
  first_name,
  last_name,
  <catalog>.<schema>.protect(email, 'SYSTEM_EMAIL') AS encrypted_email
FROM <catalog>.<schema>.<table>
LIMIT 10;

Saving Encrypted Data

It is recommended to save the encrypted data to a separate table before decrypting. You can use a CTAS (Create Table As Select) query to create a table with encrypted data:

SQL
CREATE TABLE <catalog>.<schema>.<encrypted_table> AS
SELECT
  first_name,
  last_name,
  <catalog>.<schema>.protect(email, 'SYSTEM_EMAIL') AS encrypted_email
FROM <catalog>.<schema>.<table>;

Decrypt Data

The unprotect function supports an optional presentation scheme parameter. Pass NULL if you don't want to use a presentation scheme, or provide the presentation scheme name if you want formatted decryption.

Decrypt without presentation scheme:

SQL
SELECT
  first_name,
  last_name,
  email,
  <catalog>.<schema>.protect(email, 'SYSTEM_EMAIL') AS encrypted_email,
  <catalog>.<schema>.unprotect(encrypted_email, 'SYSTEM_EMAIL', NULL) AS decrypted_email
FROM <catalog>.<schema>.<table>
LIMIT 10;

Decrypt with presentation scheme:

SQL
SELECT
  first_name,
  last_name,
  email,
  <catalog>.<schema>.protect(email, 'SYSTEM_EMAIL') AS encrypted_email,
  <catalog>.<schema>.unprotect(encrypted_email, 'SYSTEM_EMAIL', 'SYSTEM_PRESENTATION_EMAIL') AS decrypted_email
FROM <catalog>.<schema>.<table>
LIMIT 10;

Mask Data

SQL
SELECT
  first_name,
  last_name,
  <catalog>.<schema>.mask(email, 'MASKING_SCHEME_NAME') AS masked_email
FROM <catalog>.<schema>.<table>
LIMIT 10;
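The same queries can be run from PySpark. The snippet below is a small convenience sketch (the helper name and example table are illustrative) that renders the encrypt query from this step for a given table; pass the resulting string to spark.sql in a notebook attached to the shared cluster.

```python
def build_protect_query(catalog, schema, table, column, scheme, limit=10):
    # Renders the encrypt SELECT from this step for the given table and column
    fq = f"{catalog}.{schema}"
    return (
        f"SELECT first_name, last_name, "
        f"{fq}.protect({column}, '{scheme}') AS encrypted_{column} "
        f"FROM {fq}.{table} LIMIT {limit}"
    )

# In a Databricks notebook (where `spark` is defined):
# display(spark.sql(build_protect_query("main", "default", "customers", "email", "SYSTEM_EMAIL")))
```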