Setup - Shared Cluster Encryption
This guide provides instructions for setting up Privacera Encryption on Databricks Unity Catalog shared clusters using Python UDFs.
Step 1: Create Privacera Portal User
Create an integration user in Privacera Portal to generate the JWT token for authentication.
- Navigate to Settings > User Management > Add User
- Select Role as ROLE_ENCRYPTION_READ
- Save the user
Step 2: Generate JWT Token
Generate a JWT token for the impersonation user created in Step 1:
- Navigate to Settings > Token Management
- Select the impersonation user created in Step 1
- Click Generate Token
- Save the token securely
Note
The JWT token generated here belongs to the impersonation user and is used to authenticate encryption requests to the Privacera Encryption Gateway (PEG).
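Optionally, you can sanity-check the token against PEG from any machine that can reach the gateway. The sketch below is hypothetical: it reuses the /api/peg/v2/multi/protect endpoint and payload shape from the UDFs in Step 8, and assumes a reachable PEG host, an existing scheme named SYSTEM_EMAIL, and that the impersonation and protection policies (Step 11) are in place, so it may only succeed after the later steps are complete.

```python
import requests

# Hypothetical values - replace with your PEG URL, token, and a real scheme name
PEG_HOST = "https://peg-host.example.com"
PEG_JWT = "<jwt-token-from-step-2>"

# Same endpoint and payload shape as the UDFs in Step 8
payload = {
    "request": [
        {"data": ["test@example.com"], "scheme": "SYSTEM_EMAIL", "action": "PROTECT"}
    ],
    "doAs": "<databricks-user>",  # user to impersonate (see the policies in Step 11)
    "action": "PROTECT",
}
resp = requests.post(
    f"{PEG_HOST}/api/peg/v2/multi/protect",
    json=payload,
    headers={"Authorization": f"Bearer {PEG_JWT}", "Content-Type": "application/json"},
    timeout=30,
)
resp.raise_for_status()
print(resp.json()["data"][0])  # encrypted values on success
```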
Step 3: Create IAM Role
Create an IAM role that Unity Catalog will use to access AWS Secrets Manager.
- Open AWS Console and navigate to IAM > Roles > Create role
- Select Another AWS account
- Enable Require External ID (the External ID will be generated in Step 5)
- Provide a role name (e.g., privacera-dbx-uc-peg-role)
Add Trust Policy
Add the following trust relationship to the role:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": [
          "arn:aws:iam::414351767826:role/unity-catalog-prod-UCMasterRole-14S5ZJVKOTYTL",
          "arn:aws:iam::<AWS_ACCOUNT_ID>:role/<IAM_ROLE_NAME>"
        ]
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "<EXTERNAL_ID>"
        }
      }
    }
  ]
}
```
Note
Replace <AWS_ACCOUNT_ID>, <IAM_ROLE_NAME>, and <EXTERNAL_ID> with your values. The External ID will be generated in Step 5.
Add Permissions Policy
Attach the following inline policy to the IAM role. It grants the Secrets Manager access and IAM role-management permissions this setup requires:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "iam:GetRole",
        "iam:UpdateAssumeRolePolicy",
        "iam:PutRolePolicy"
      ],
      "Resource": "arn:aws:iam::<AWS_ACCOUNT_ID>:role/<IAM_ROLE_NAME>"
    },
    {
      "Effect": "Allow",
      "Action": [
        "secretsmanager:GetSecretValue",
        "secretsmanager:DescribeSecret"
      ],
      "Resource": "arn:aws:secretsmanager:<REGION>:<AWS_ACCOUNT_ID>:secret:<SECRET_NAME>-*"
    }
  ]
}
```
Required Permissions
The IAM role must have the following permissions:
- secretsmanager:GetSecretValue - to retrieve secret values from AWS Secrets Manager
- secretsmanager:DescribeSecret - to describe secret metadata
- iam:GetRole, iam:UpdateAssumeRolePolicy, iam:PutRolePolicy - for role management
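If you prefer to attach the inline policy from code rather than the console, a minimal boto3 sketch follows. The policy name is arbitrary, and the role name and ARN pattern are the example values from this guide; replace the placeholders before running.

```python
import json
import boto3

iam = boto3.client("iam")  # assumes AWS credentials are already configured

ROLE_NAME = "privacera-dbx-uc-peg-role"  # example role name from this guide
SECRET_ARN_PATTERN = "arn:aws:secretsmanager:<REGION>:<AWS_ACCOUNT_ID>:secret:<SECRET_NAME>-*"

permissions_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret"],
            "Resource": SECRET_ARN_PATTERN,
        }
    ],
}

# Attach the policy inline to the role (the policy name is arbitrary)
iam.put_role_policy(
    RoleName=ROLE_NAME,
    PolicyName="privacera-peg-secret-access",
    PolicyDocument=json.dumps(permissions_policy),
)
```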
Step 4: Create AWS Secret
- Open AWS Console and navigate to Secrets Manager
- Click Store a new secret
- Select Other type of secret
- Add the following key-value pairs:

| Key | Value |
|---|---|
| peg_host | URL of your Privacera Encryption Gateway (e.g., https://peg-host.example.com) |
| peg_jwt | JWT token generated in Step 2 |

- Provide a secret name (e.g., db/dbx-peg-secret)

Getting SECRET_NAME
The SECRET_NAME is the name you provide when creating the secret in AWS Secrets Manager. It appears in the format db/<your-secret-name> and is used in the service credential setup script.

- Select the appropriate region
- Save and note the Secret ARN
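The secret can also be created with boto3. A minimal sketch, assuming the example names used in this guide (region, secret name, and PEG values are placeholders to replace):

```python
import json
import boto3

sm = boto3.client("secretsmanager", region_name="us-east-1")  # your region

# Store the PEG connection details as a JSON key-value secret
resp = sm.create_secret(
    Name="db/dbx-peg-secret",  # example secret name from this guide
    SecretString=json.dumps(
        {
            "peg_host": "https://peg-host.example.com",  # your PEG URL
            "peg_jwt": "<jwt-token-from-step-2>",        # token from Step 2
        }
    ),
)
print(resp["ARN"])  # note this Secret ARN for the setup script
```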
Placeholders for the service credential setup script
The following placeholders are used in the service credential setup script (Step 5). Set them in Cell 1 of the notebook:

| Placeholder | Description | Example |
|---|---|---|
| SECRET_NAME | Name of the secret in AWS Secrets Manager | db/dbx-peg-secret |
| SECRET_ARN | Full ARN of the secret (from secret details after saving) | arn:aws:secretsmanager:<region>:<account-id>:secret:<secret-name>-<suffix> |
| ROLE_NAME | IAM role name for Unity Catalog access | privacera-dbx-uc-peg-role |
| AWS_REGION | AWS region where the secret is stored | us-east-1 |
| SERVICE_CREDENTIAL_NAME | Name for the Databricks service credential | Your chosen name |
| DB_SECRET_SCOPE | (Optional) Databricks secret scope name for AWS credentials | None or scope name |
| DB_SECRET_AWS_ACCESS_KEY | (Optional) Key in secret scope for AWS access key | None or key name |
| DB_SECRET_AWS_SECRET_ACCESS_KEY | (Optional) Key in secret scope for AWS secret key | None or key name |
| DB_SECRET_AWS_SESSION_TOKEN | (Optional) Key in secret scope for AWS session token | None or key name |
Add Secret Resource Policy
Attach a resource policy to restrict access to the IAM role:
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<AWS_ACCOUNT_ID>:role/<IAM_ROLE_NAME>"
      },
      "Action": [
        "secretsmanager:GetSecretValue",
        "secretsmanager:DescribeSecret"
      ],
      "Resource": "*"
    }
  ]
}
```
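The resource policy can likewise be attached from code. This sketch mirrors the policy above; the secret name and account/role placeholders are the example values from this guide.

```python
import json
import boto3

sm = boto3.client("secretsmanager", region_name="us-east-1")  # your region

resource_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"AWS": "arn:aws:iam::<AWS_ACCOUNT_ID>:role/<IAM_ROLE_NAME>"},
            "Action": ["secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret"],
            "Resource": "*",
        }
    ],
}

# Attach the resource policy to the secret created above
sm.put_resource_policy(
    SecretId="db/dbx-peg-secret",  # example secret name
    ResourcePolicy=json.dumps(resource_policy),
)
```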
Step 5: Create Databricks Service Credential
Configure a service credential in Databricks to establish a secure connection with AWS.
- In Databricks, create a service credential with the IAM role ARN
Service Credential Setup Script
Execute the following Python script in a Databricks notebook. Each section can be run in a separate cell. In each cell, add or replace the placeholders with your values as indicated in the comments below.
Cell 1: Imports and Configuration
In this cell, add: SECRET_NAME, SECRET_ARN, ROLE_NAME, AWS_REGION, SERVICE_CREDENTIAL_NAME, and optionally DB_SECRET_SCOPE, DB_SECRET_AWS_ACCESS_KEY, DB_SECRET_AWS_SECRET_ACCESS_KEY, DB_SECRET_AWS_SESSION_TOKEN.
```python
import json
import time

import boto3
import requests
from botocore.exceptions import ClientError

# ============================================================================
# Configuration Variables
# ============================================================================

# AWS Secrets Manager configuration
SECRET_NAME = "<db/dbx-peg-secret>"  # Secret name from AWS Secrets Manager
SECRET_ARN = "arn:aws:secretsmanager:<region>:<account-id>:secret:<secret-name>-<suffix>"
ROLE_NAME = "<example-role-name>"  # IAM role name for Unity Catalog access
AWS_REGION = "<aws-region>"  # AWS region where the secret is stored (e.g., us-east-1)

# Databricks service credential configuration
SERVICE_CREDENTIAL_NAME = "<service-credential-name>"  # Your preferred name for the Databricks service credential

# Unity Catalog master role (fixed value for Databricks)
UC_MASTER_ROLE_ARN = "arn:aws:iam::414351767826:role/unity-catalog-prod-UCMasterRole-14S5ZJVKOTYTL"

# AWS credentials - leave as None to be prompted, or provide Databricks secret scope details
DB_SECRET_SCOPE = None  # Databricks secret scope name (if using a secret scope)
DB_SECRET_AWS_ACCESS_KEY = None  # Key name in secret scope for AWS access key
DB_SECRET_AWS_SECRET_ACCESS_KEY = None  # Key name in secret scope for AWS secret key
DB_SECRET_AWS_SESSION_TOKEN = None  # Key name in secret scope for AWS session token
```
Cell 2: AWS Credentials Setup
In this cell, provide your AWS credentials (loaded from a Databricks secret scope, or entered at the input prompt).
```python
# ============================================================================
# Get AWS Credentials and Create Session
# ============================================================================

# Load credentials from a Databricks secret scope, or prompt for input
if (
    DB_SECRET_SCOPE
    and DB_SECRET_AWS_ACCESS_KEY
    and DB_SECRET_AWS_SECRET_ACCESS_KEY
    and DB_SECRET_AWS_SESSION_TOKEN
):
    # Load from Databricks secret scope
    aws_access_key = dbutils.secrets.get(scope=DB_SECRET_SCOPE, key=DB_SECRET_AWS_ACCESS_KEY)
    aws_secret_key = dbutils.secrets.get(scope=DB_SECRET_SCOPE, key=DB_SECRET_AWS_SECRET_ACCESS_KEY)
    aws_session_token = dbutils.secrets.get(scope=DB_SECRET_SCOPE, key=DB_SECRET_AWS_SESSION_TOKEN)
else:
    # Prompt for credentials
    aws_access_key = input("AWS Access Key ID: ")
    aws_secret_key = input("AWS Secret Access Key: ")
    aws_session_token = input("AWS Session Token (optional): ") or None

# Create a boto3 session with the AWS credentials
session = boto3.Session(
    aws_access_key_id=aws_access_key or None,
    aws_secret_access_key=aws_secret_key or None,
    aws_session_token=aws_session_token,
    region_name=AWS_REGION,
)

# Get the AWS account ID and construct the IAM role ARN
AWS_ACCOUNT_ID = session.client("sts").get_caller_identity()["Account"]
ROLE_ARN = f"arn:aws:iam::{AWS_ACCOUNT_ID}:role/{ROLE_NAME}"
```
Cell 3: Verify Secret Exists
```python
# ============================================================================
# Verify AWS Secret Exists
# ============================================================================
sm = session.client("secretsmanager")
try:
    # Use SECRET_ARN if provided, otherwise fall back to SECRET_NAME
    if SECRET_ARN:
        response = sm.describe_secret(SecretId=SECRET_ARN)
    else:
        response = sm.describe_secret(SecretId=SECRET_NAME)
    SECRET_ARN = response["ARN"]  # Update SECRET_ARN with the actual ARN
except ClientError:
    raise Exception(f"Secret not found: {SECRET_ARN or SECRET_NAME}")
```
Cell 4: IAM Role Setup
```python
# ============================================================================
# IAM Role Setup and Helper Functions
# ============================================================================
iam = session.client("iam")
PLACEHOLDER_EXTERNAL_ID = "0000"  # Placeholder until the External ID is generated


def build_trust_policy(external_id, include_self_assume=True):
    """
    Build an IAM trust policy document.

    Args:
        external_id: External ID for the trust policy condition
        include_self_assume: Whether to include self-assume capability

    Returns:
        Trust policy document as a dict
    """
    principals = [UC_MASTER_ROLE_ARN]  # Unity Catalog master role
    if include_self_assume:
        principals.append(ROLE_ARN)  # Add self-assume capability
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": principals},
                "Action": "sts:AssumeRole",
                "Condition": {"StringEquals": {"sts:ExternalId": external_id}},
            }
        ],
    }


def role_has_self_assume():
    """
    Check whether the IAM role has self-assume capability.

    Returns:
        True if the role exists and has self-assume, False if it exists
        without self-assume, None if the role doesn't exist
    """
    try:
        policy = iam.get_role(RoleName=ROLE_NAME)["Role"]["AssumeRolePolicyDocument"]
        for stmt in policy.get("Statement", []):
            principals = stmt.get("Principal", {}).get("AWS", [])
            if ROLE_ARN in (principals if isinstance(principals, list) else [principals]):
                return True
        return False
    except ClientError as e:
        if e.response["Error"]["Code"] == "NoSuchEntity":
            return None  # Role doesn't exist
        raise


# Check role status and create/update as needed
has_self_assume = role_has_self_assume()
if has_self_assume is None:
    # Create a new IAM role
    iam.create_role(
        RoleName=ROLE_NAME,
        AssumeRolePolicyDocument=json.dumps(
            build_trust_policy(PLACEHOLDER_EXTERNAL_ID, False)
        ),
        Description="IAM role for Databricks UC service credential",
    )
    time.sleep(10)  # Wait for AWS IAM consistency
    # Add self-assume capability with retry logic
    for attempt in range(1, 6):
        try:
            iam.update_assume_role_policy(
                RoleName=ROLE_NAME,
                PolicyDocument=json.dumps(
                    build_trust_policy(PLACEHOLDER_EXTERNAL_ID, True)
                ),
            )
            break
        except ClientError as e:
            if (
                attempt < 5
                and e.response.get("Error", {}).get("Code") == "MalformedPolicyDocument"
            ):
                time.sleep(5 * attempt)  # Linear backoff: 5s, 10s, 15s, ...
            else:
                raise
elif not has_self_assume:
    # Update the existing role to add self-assume
    iam.update_assume_role_policy(
        RoleName=ROLE_NAME,
        PolicyDocument=json.dumps(build_trust_policy(PLACEHOLDER_EXTERNAL_ID, True)),
    )

# Display the required IAM permissions policy (must be attached separately by an IAM admin)
SECRET_ARN_PATTERN = f"arn:aws:secretsmanager:{AWS_REGION}:{AWS_ACCOUNT_ID}:secret:{SECRET_NAME}*"
print(f"Required IAM Permissions Policy for role '{ROLE_NAME}':")
print(
    json.dumps(
        {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Action": [
                        "secretsmanager:GetSecretValue",
                        "secretsmanager:DescribeSecret",
                    ],
                    "Resource": SECRET_ARN_PATTERN,
                }
            ],
        },
        indent=2,
    )
)
```
Cell 5: Create Databricks Service Credential
```python
# ============================================================================
# Create or Get Databricks Service Credential
# ============================================================================

# Get the Databricks workspace URL and API token
workspace_url = spark.conf.get("spark.databricks.workspaceUrl")
api_token = (
    dbutils.notebook.entry_point.getDbutils()
    .notebook()
    .getContext()
    .apiToken()
    .get()
)
headers = {"Authorization": f"Bearer {api_token}", "Content-Type": "application/json"}

# Check whether the service credential already exists
resp = requests.get(
    f"https://{workspace_url}/api/2.1/unity-catalog/credentials/{SERVICE_CREDENTIAL_NAME}",
    headers=headers,
)
if resp.status_code != 200:
    # Create a new service credential
    create_resp = requests.post(
        f"https://{workspace_url}/api/2.1/unity-catalog/credentials",
        headers=headers,
        json={
            "name": SERVICE_CREDENTIAL_NAME,
            "purpose": "SERVICE",
            "aws_iam_role": {"role_arn": ROLE_ARN},
        },
    )
    if create_resp.status_code not in [200, 201]:
        raise Exception(f"Failed to create credential: {create_resp.text}")
    # Retrieve the created credential to get its External ID
    resp = requests.get(
        f"https://{workspace_url}/api/2.1/unity-catalog/credentials/{SERVICE_CREDENTIAL_NAME}",
        headers=headers,
    )

# Extract the External ID from the service credential
cred_data = resp.json()
EXTERNAL_ID = cred_data.get("aws_iam_role", {}).get("external_id")
```
Cell 6: Update IAM Trust Policy with External ID
```python
# ============================================================================
# Update IAM Trust Policy with Generated External ID
# ============================================================================
if not EXTERNAL_ID:
    raise Exception("No External ID found in service credential")

# Check whether the trust policy already has this External ID
current_policy = iam.get_role(RoleName=ROLE_NAME)["Role"]["AssumeRolePolicyDocument"]
current_ext_ids = [
    stmt.get("Condition", {}).get("StringEquals", {}).get("sts:ExternalId")
    for stmt in current_policy.get("Statement", [])
]
if EXTERNAL_ID not in current_ext_ids:
    # Update the trust policy with the External ID
    iam.update_assume_role_policy(
        RoleName=ROLE_NAME,
        PolicyDocument=json.dumps(build_trust_policy(EXTERNAL_ID)),
    )
    time.sleep(10)  # Wait for IAM propagation
    # Recreate the service credential to force revalidation with the updated trust policy
    requests.delete(
        f"https://{workspace_url}/api/2.1/unity-catalog/credentials/{SERVICE_CREDENTIAL_NAME}",
        headers=headers,
    )
    create_resp = requests.post(
        f"https://{workspace_url}/api/2.1/unity-catalog/credentials",
        headers=headers,
        json={
            "name": SERVICE_CREDENTIAL_NAME,
            "purpose": "SERVICE",
            "aws_iam_role": {"role_arn": ROLE_ARN},
        },
    )
    if create_resp.status_code not in [200, 201]:
        raise Exception(f"Failed to recreate credential: {create_resp.text}")
```
Cell 7: Final Validation
```python
# ============================================================================
# Final Validation and Testing
# ============================================================================
print("=" * 60)
print("VALIDATION RESULTS")
print("=" * 60)

# Test 1: Verify the service credential and External ID
resp = requests.get(
    f"https://{workspace_url}/api/2.1/unity-catalog/credentials/{SERVICE_CREDENTIAL_NAME}",
    headers=headers,
)
if resp.status_code == 200:
    cred_data = resp.json()
    expected_external_id = cred_data.get("aws_iam_role", {}).get("external_id")
    print(f"✅ Service Credential: {SERVICE_CREDENTIAL_NAME}")
    print(f"✅ External ID: {expected_external_id}")

# Test 2: Verify the IAM trust policy
role_info = iam.get_role(RoleName=ROLE_NAME)
trust_policy = role_info["Role"]["AssumeRolePolicyDocument"]
print(f"\n✅ IAM Role: {ROLE_NAME}")
print("✅ Trust Policy configured with External ID")

# Test 3: Test STS AssumeRole
try:
    sts = session.client("sts")
    assume_response = sts.assume_role(
        RoleArn=ROLE_ARN,
        RoleSessionName="test-assume-role",
        ExternalId=EXTERNAL_ID,
    )
    print("✅ STS AssumeRole: Success")

    # Test 4: Test secret access with the assumed role
    assumed_creds = assume_response["Credentials"]
    test_session = boto3.Session(
        aws_access_key_id=assumed_creds["AccessKeyId"],
        aws_secret_access_key=assumed_creds["SecretAccessKey"],
        aws_session_token=assumed_creds["SessionToken"],
        region_name=AWS_REGION,
    )
    sm_test = test_session.client("secretsmanager")
    secret_test = sm_test.get_secret_value(SecretId=SECRET_NAME)
    print(f"✅ Secret Access: Success (length: {len(secret_test.get('SecretString', ''))} chars)")
except Exception as e:
    print(f"❌ STS/Secret Access Test Failed: {type(e).__name__}: {e}")

# Test 5: Test service credential access to Secrets Manager
print("\nTesting service credential access...")
for attempt in range(1, 4):
    try:
        boto3_session = boto3.Session(
            botocore_session=dbutils.credentials.getServiceCredentialsProvider(
                SERVICE_CREDENTIAL_NAME
            ),
            region_name=AWS_REGION,
        )
        sm_test = boto3_session.client("secretsmanager")
        test_response = sm_test.get_secret_value(SecretId=SECRET_NAME)
        print(f"✅ Service Credential Access: Success (length: {len(test_response.get('SecretString', ''))} chars)")
        break
    except Exception as e:
        if attempt < 3:
            print(f"⚠️ Attempt {attempt}/3 failed. Waiting 30s for IAM propagation...")
            time.sleep(30)
        else:
            print(f"❌ Service Credential Access Failed: {type(e).__name__}: {e}")

# Final summary
print("\n" + "=" * 60)
print("SETUP COMPLETE")
print("=" * 60)
print(f"AWS Secret ARN: {SECRET_ARN}")
print(f"IAM Role ARN: {ROLE_ARN}")
print(f"Service Credential: {SERVICE_CREDENTIAL_NAME}")
print(f"External ID: {EXTERNAL_ID}")
print("=" * 60)
```
Note
Replace all placeholder values (e.g., <db/dbx-peg-secret>, <region>, <account-id>) with your actual AWS and Databricks configuration values.
- Note the generated External ID
- Update the IAM role trust policy (from Step 3) with the generated External ID
Step 6: Update IAM Trust Policy
Update the IAM role trust policy with the External ID generated in Step 5. Cell 6 of the setup script applies this update automatically; follow this step only if you are configuring the role manually:

```json
"Condition": {
  "StringEquals": {
    "sts:ExternalId": "<GENERATED_EXTERNAL_ID>"
  }
}
```
Step 7: Enable PEG URL Access in Databricks
Before creating the UDFs, you must allow Databricks to access the Privacera Encryption Gateway (PEG) URL.
- In Databricks, click your profile icon in the top right corner
- Navigate to Settings > Security > External Access
- Under Embed dashboards, click Manage
- Add your PEG URL (e.g., https://peg-host.privacera.us)
- Click Allow approved domain to add the URL
- Save the changes
Note
This step is required to allow Databricks to make API calls to the Privacera Encryption Gateway. Without this configuration, UDF calls to PEG will fail.
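To confirm the allow-listing took effect, you can run a quick connectivity check from a Databricks notebook. This is a minimal sketch assuming your PEG host answers HTTPS requests at its base URL; any HTTP response (even 401 or 404) proves the network path works.

```python
import requests

PEG_HOST = "https://peg-host.privacera.us"  # your PEG URL

try:
    # Any HTTP response means the URL is reachable from Databricks
    resp = requests.get(PEG_HOST, timeout=10)
    print(f"PEG reachable, HTTP {resp.status_code}")
except requests.exceptions.RequestException as e:
    print(f"PEG not reachable: {e}")  # likely an allow-listing or network egress issue
```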
Step 8: Create Python UDFs
Create Python UDFs in the target catalog for data protection operations.
Execute the following SQL commands in a Databricks notebook to create the protect, unprotect, and mask UDFs:
Python UDF Creation Scripts
Replace the following placeholders with your actual values:
- <catalog>.<schema>: Target catalog and schema where the UDFs will be created
- <service-credential-name>: Service credential name created in Step 5
- <secret-arn>: AWS Secrets Manager ARN from Step 4
- <aws-region>: AWS region where the secret is stored
Create Protect UDF:
```sql
CREATE OR REPLACE FUNCTION <catalog>.<schema>.protect(
  s STRING,
  scheme STRING
)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (`<service-credential-name>` DEFAULT)
AS $$
import boto3
import json
import requests
import pandas as pd
from typing import Iterator, Tuple
from databricks.service_credentials import getServiceCredentialsProvider
from pyspark.taskcontext import TaskContext

SECRET_ARN = "<secret-arn>"

def get_credentials():
    # Resolve the PEG JWT and host from AWS Secrets Manager via the service credential
    session = boto3.Session(
        botocore_session=getServiceCredentialsProvider("<service-credential-name>")
    )
    client = session.client("secretsmanager", region_name="<aws-region>")
    resp = client.get_secret_value(SecretId=SECRET_ARN)
    secret_data = json.loads(resp["SecretString"])
    return secret_data["peg_jwt"], secret_data["peg_host"]

def handler_function(
    batch_iter: Iterator[Tuple[pd.Series, pd.Series]]
) -> Iterator[pd.Series]:
    jwt, base_url = get_credentials()
    do_as_user = TaskContext.get().getLocalProperty("user")
    headers = {
        "Authorization": f"Bearer {jwt}",
        "Content-Type": "application/json"
    }
    for value_series, scheme_series in batch_iter:
        values = value_series.tolist()
        if all(pd.isna(v) for v in values):
            yield value_series
            continue
        payload = {
            "request": [
                {
                    "data": values,
                    "scheme": scheme_series.iloc[0],
                    "action": "PROTECT"
                }
            ],
            "doAs": do_as_user,
            "action": "PROTECT"
        }
        resp = requests.post(
            f"{base_url}/api/peg/v2/multi/protect",
            json=payload,
            headers=headers,
            timeout=30
        )
        resp.raise_for_status()
        encrypted_values = resp.json()["data"][0]
        yield pd.Series(encrypted_values)
$$;
```
Create Unprotect UDF:
```sql
CREATE OR REPLACE FUNCTION <catalog>.<schema>.unprotect(
  s STRING,
  scheme STRING,
  presentation_scheme STRING
)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (`<service-credential-name>` DEFAULT)
AS $$
import boto3
import json
import requests
import pandas as pd
from typing import Iterator, Tuple
from databricks.service_credentials import getServiceCredentialsProvider
from pyspark.taskcontext import TaskContext

SECRET_ARN = "<secret-arn>"

def get_credentials():
    # Resolve the PEG JWT and host from AWS Secrets Manager via the service credential
    session = boto3.Session(
        botocore_session=getServiceCredentialsProvider("<service-credential-name>")
    )
    client = session.client("secretsmanager", region_name="<aws-region>")
    resp = client.get_secret_value(SecretId=SECRET_ARN)
    secret_data = json.loads(resp["SecretString"])
    return secret_data["peg_jwt"], secret_data["peg_host"]

def handler_function(
    batch_iter: Iterator[Tuple[pd.Series, pd.Series, pd.Series]]
) -> Iterator[pd.Series]:
    jwt, base_url = get_credentials()
    do_as_user = TaskContext.get().getLocalProperty("user")
    headers = {
        "Authorization": f"Bearer {jwt}",
        "Content-Type": "application/json"
    }
    for value_series, scheme_series, presentation_scheme_series in batch_iter:
        values = value_series.tolist()
        if all(pd.isna(v) for v in values):
            yield value_series
            continue
        payload = {
            "request": [
                {
                    "data": values,
                    "scheme": scheme_series.iloc[0],
                    "action": "UNPROTECT"
                }
            ],
            "doAs": do_as_user,
            "action": "UNPROTECT"
        }
        # Optional presentation scheme
        pres = presentation_scheme_series.iloc[0]
        if pd.notna(pres):
            payload["request"][0]["presentationScheme"] = pres
        resp = requests.post(
            f"{base_url}/api/peg/v2/multi/unprotect",
            json=payload,
            headers=headers,
            timeout=30
        )
        resp.raise_for_status()
        decrypted_values = resp.json()["data"][0]
        yield pd.Series(decrypted_values)
$$;
```
Create Mask UDF:
```sql
CREATE OR REPLACE FUNCTION <catalog>.<schema>.mask(
  s STRING,
  scheme STRING
)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (`<service-credential-name>` DEFAULT)
AS $$
import boto3
import json
import requests
import pandas as pd
from typing import Iterator, Tuple
from databricks.service_credentials import getServiceCredentialsProvider
from pyspark.taskcontext import TaskContext

SECRET_ARN = "<secret-arn>"

def get_credentials():
    # Resolve the PEG JWT and host from AWS Secrets Manager via the service credential
    session = boto3.Session(
        botocore_session=getServiceCredentialsProvider("<service-credential-name>")
    )
    client = session.client("secretsmanager", region_name="<aws-region>")
    resp = client.get_secret_value(SecretId=SECRET_ARN)
    secret_data = json.loads(resp["SecretString"])
    return secret_data["peg_jwt"], secret_data["peg_host"]

def handler_function(
    batch_iter: Iterator[Tuple[pd.Series, pd.Series]]
) -> Iterator[pd.Series]:
    jwt, base_url = get_credentials()
    do_as_user = TaskContext.get().getLocalProperty("user")
    headers = {
        "Authorization": f"Bearer {jwt}",
        "Content-Type": "application/json"
    }
    for value_series, scheme_series in batch_iter:
        values = value_series.tolist()
        if all(pd.isna(v) for v in values):
            yield value_series
            continue
        payload = {
            "request": [
                {
                    "data": values,
                    "scheme": scheme_series.iloc[0],
                    "action": "MASK"
                }
            ],
            "doAs": do_as_user,
            "action": "MASK"
        }
        resp = requests.post(
            f"{base_url}/api/peg/v2/multi/mask",
            json=payload,
            headers=headers,
            timeout=30
        )
        resp.raise_for_status()
        masked_values = resp.json()["data"][0]
        yield pd.Series(masked_values)
$$;
```
Note
Replace all placeholder values with your actual configuration:
- <catalog>.<schema>: Your target catalog and schema
- <service-credential-name>: Service credential name from Step 5
- <secret-arn>: AWS Secrets Manager ARN from Step 4
- <aws-region>: AWS region (e.g., us-east-1)
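Other principals can call the UDFs only after being granted EXECUTE on them (plus USE on the catalog and schema). A minimal sketch, assuming an illustrative catalog/schema of main.privacera and a group named data_consumers:

```python
# Grant EXECUTE on each UDF to the consuming group (names are illustrative)
for func in ("protect", "unprotect", "mask"):
    spark.sql(f"GRANT EXECUTE ON FUNCTION main.privacera.{func} TO `data_consumers`")
```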
Step 9: Grant Privacera Resource Policy Access
Grant the Databricks Unity Catalog user access to the required resources.
- Navigate to Access Management > Resource Policy in Privacera Portal
- Select resource group: privacera_databricks_unity_catalog
- Grant the required permissions to the target user
- Save the policy
Step 10: Create Encryption Scheme
- Navigate to Encryption and Masking > Schemes in Privacera Portal
- Click Create Scheme
- Configure the scheme name, algorithm, format type, and key configuration
- Save the scheme
Step 11: Create PEG Scheme Policies
Impersonation Policy
- Navigate to Access Management > Scheme Policies > Privacera PEG
- Click Add New Policy
- Configure:
- Policy Type: Impersonate
- Subject: Privacera portal user (created in Step 1)
- Resource: PEG service
- Save the policy
Data Protection Policy
- Click Add New Policy
- Configure:
- Subject: Databricks Unity Catalog user
- Actions: Protect, Unprotect, Mask
- Scheme: Select the scheme created in Step 10
- Save the policy
Step 12: Using Encryption UDFs
Create a shared cluster and test the encryption UDFs.
Encrypt Data
```sql
SELECT
  first_name,
  last_name,
  <catalog>.<schema>.protect(email, 'SYSTEM_EMAIL') AS encrypted_email
FROM <catalog>.<schema>.<table>
LIMIT 10;
```
Saving Encrypted Data
It is recommended to save the encrypted data to a separate table before decrypting. You can use a CTAS (Create Table As Select) query to create a table with encrypted data:
```sql
CREATE TABLE <catalog>.<schema>.<encrypted_table> AS
SELECT
  first_name,
  last_name,
  <catalog>.<schema>.protect(email, 'SYSTEM_EMAIL') AS encrypted_email
FROM <catalog>.<schema>.<table>;
```
Decrypt Data
The unprotect function supports an optional presentation scheme parameter. Pass NULL if you don't want to use a presentation scheme, or provide the presentation scheme name if you want formatted decryption.
Decrypt without presentation scheme:
```sql
SELECT
  first_name,
  last_name,
  email,
  <catalog>.<schema>.protect(email, 'SYSTEM_EMAIL') AS encrypted_email,
  <catalog>.<schema>.unprotect(encrypted_email, 'SYSTEM_EMAIL', NULL) AS decrypted_email
FROM <catalog>.<schema>.<table>
LIMIT 10;
```
Decrypt with presentation scheme:
```sql
SELECT
  first_name,
  last_name,
  email,
  <catalog>.<schema>.protect(email, 'SYSTEM_EMAIL') AS encrypted_email,
  <catalog>.<schema>.unprotect(encrypted_email, 'SYSTEM_EMAIL', 'SYSTEM_PRESENTATION_EMAIL') AS decrypted_email
FROM <catalog>.<schema>.<table>
LIMIT 10;
```
Mask Data
```sql
SELECT
  first_name,
  last_name,
  <catalog>.<schema>.mask(email, 'MASKING_SCHEME_NAME') AS masked_email
FROM <catalog>.<schema>.<table>
LIMIT 10;
```
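The same UDFs can also be exercised from Python in a notebook via spark.sql. A short sketch, where the catalog, schema, and table names are illustrative placeholders for your own:

```python
# Call the protect UDF from Python (catalog/schema/table names are illustrative)
df = spark.sql("""
    SELECT
      first_name,
      main.privacera.protect(email, 'SYSTEM_EMAIL') AS encrypted_email
    FROM main.demo.customers
    LIMIT 10
""")
df.show(truncate=False)
```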