
Setup for Access Management for Apache Flink on Kubernetes

This section outlines the steps to set up Apache Flink on a Kubernetes cluster with the Privacera plugin. Please ensure that all prerequisites are completed before beginning the setup process.

Generate the configuration files

Perform the following steps to configure the Apache Flink connector:

  1. SSH to the instance where Privacera Manager is installed.
  2. Run the following command to navigate to the config/ directory.
    Bash
    cd ~/privacera/privacera-manager/config
    
  3. Run the following command to copy the Apache Flink OLAC YAML file from sample-vars, if it is not already present in custom-vars.
    Bash
    cp -n sample-vars/vars.flink.yml custom-vars/vars.flink.yml
    
  4. Update the following properties in the vars.flink.yml file; refer to the table below for the values to be provided.

    Property               | Description                                                            | Sample Value
    -----------------------|------------------------------------------------------------------------|--------------------------------
    FLINK_HOME_DIR         | Path where Apache Flink is installed in the Apache Flink Docker image  | /opt/flink
    FLINK_S3_FS_PLUGIN_DIR | Path to the folder containing the Apache Flink plugin for S3           | /opt/flink/plugins/s3-fs-hadoop
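
    For reference, the corresponding entries in custom-vars/vars.flink.yml might look like the following sketch; the paths shown are the sample values above and should be adjusted to match your Flink image layout.

    YAML
    FLINK_HOME_DIR: "/opt/flink"
    FLINK_S3_FS_PLUGIN_DIR: "/opt/flink/plugins/s3-fs-hadoop"
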
  5. Once the configuration is done, execute the commands below to complete the deployment.

    Bash
    cd ~/privacera/privacera-manager
    ./privacera-manager.sh post-install
    

    • Once the deployment succeeds, the setup script and configuration archive, privacera_flink_setup.sh and flink_custom_conf.zip, are available in ~/privacera/privacera-manager/output/flink on the Privacera Manager host. These need to be copied to the server where the Docker image of Apache Flink is built.
  1. Create the Privacera setup folder on the machine where your Dockerfile for the Apache Flink Docker image exists.

    Bash
    # Update the DOCKERFILE_FOLDER with the path where you have your Dockerfile for Apache Flink
    DOCKERFILE_FOLDER=<PLEASE CHANGE>
    mkdir -p ${DOCKERFILE_FOLDER}/privacera-flink-plugin
    cd ${DOCKERFILE_FOLDER}/privacera-flink-plugin
    

  2. The installation files are available on the Privacera Manager host machine. Copy them from the Privacera Manager output folder to the machine where you will build the Docker image for Apache Flink.

    Bash
    cd ~/privacera/privacera-manager/output/flink
    # Copy the files from this folder to the Dockerfile folder on the machine
    # where you will be building the Docker image for Apache Flink.
    # That would be ${DOCKERFILE_FOLDER}/privacera-flink-plugin
    
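    For example, if the Docker build machine is reachable over SSH, the copy might look like the following sketch; the user and host are placeholders for your environment:

    Bash
    # Hypothetical example: copy the setup files to the Docker build machine
    scp ~/privacera/privacera-manager/output/flink/privacera_flink_setup.sh \
        ~/privacera/privacera-manager/output/flink/flink_custom_conf.zip \
        <user>@<docker-build-host>:${DOCKERFILE_FOLDER}/privacera-flink-plugin/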

  3. Update your Dockerfile to add Privacera's plugin for Apache Flink.

    Note

    Please replace APACHE_FLINK_VERSION with the version of Apache Flink you are using. Refer to Supported Apache Flink Versions for the list of supported versions.

    Dockerfile
    FROM flink:<APACHE_FLINK_VERSION>
    
    # Switch to root user to install dependencies and copy files
    USER root
    
    # Privacera's install script requires unzip.
    RUN apt-get update && apt-get -y install unzip
    
    # Create necessary directories
    RUN mkdir -p /opt/privacera
    
    # Copy the local files into the image
    COPY privacera-flink-plugin/privacera_flink_setup.sh /opt/privacera/
    COPY privacera-flink-plugin/flink_custom_conf.zip /opt/privacera/
    
    # Run privacera flink setup
    RUN cd /opt/privacera \
      && chmod +x privacera_flink_setup.sh \
      && ./privacera_flink_setup.sh
    
    # Set file permissions for flink user
    RUN chown flink:flink /opt/flink/conf/global-truststore.p12
    RUN chown flink:flink /opt/flink/conf/privacera_flink.properties
    RUN chown -R flink:flink /opt/flink/plugins/s3-fs-hadoop
    
    # Switch back to flink user
    USER flink
    
  4. Build the Docker image. You can update the Docker image name and tag as per your naming convention.

    Bash
    docker build -t privacera-flink:latest .
    

  5. Push the Docker image to your Docker registry. On AWS, this could be your Amazon ECR registry.
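
    For example, pushing to Amazon ECR might look like the following sketch; the account ID, region, and repository name are placeholders, and the ECR repository is assumed to already exist:

    Bash
    # Hypothetical example: authenticate to ECR, then tag and push the image
    aws ecr get-login-password --region <region> | \
      docker login --username AWS --password-stdin <account-id>.dkr.ecr.<region>.amazonaws.com
    docker tag privacera-flink:latest <account-id>.dkr.ecr.<region>.amazonaws.com/privacera-flink:latest
    docker push <account-id>.dkr.ecr.<region>.amazonaws.com/privacera-flink:latest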

  6. You will need to create a Kubernetes ConfigMap with a JWT token that will be used by the Privacera plugin to authenticate the service and human users. You can also update your existing ConfigMap to add Privacera's plugin configuration. Below is the content that needs to be embedded in your ConfigMap; please make sure to update the placeholders.

    Tip

    You need to generate a JWT that remains valid for as long as the container is running. Since JWTs are short-lived, you will have to generate a new one every time before you submit a job.

    Here is the template for config map which needs to be created in the Kubernetes cluster.

    Note

    Please replace the value for JWT_TOKEN_HERE with the actual JWT token.

    flink-configuration-configmap.yaml
    apiVersion: v1
    kind: ConfigMap
    metadata:
        name: flink-config
        labels:
            app: flink
    data:
        flink-conf.yaml: |+
            fs.s3a.custom.signers: FlinkSigner:com.privacera.flink.signer.aws.sdkv1.FlinkSigner
            fs.s3a.s3.signing-algorithm: FlinkSigner
            fs.s3a.access.key: P_ACCESS_KEY
            fs.s3a.secret.key: P_SECRET_KEY
            jobmanager.rpc.address: flink-jobmanager
            blob.server.port: 6124
            jobmanager.rpc.port: 6123
            jobmanager.memory.process.size: 2048m
            parallelism.default: 2
            taskmanager.rpc.port: 6122
            taskmanager.memory.process.size: 2048m
            taskmanager.numberOfTaskSlots: 2
        ptoken.dat: |+
            <JWT_TOKEN_HERE>
    
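    Since the JWT must be regenerated before each job submission (see the Tip above), one way to roll a fresh token into the ConfigMap is the following sketch. It assumes you keep flink-conf.yaml and the newly generated ptoken.dat as local files, and that FLINK_NAMESPACE is set as shown in a later step.

    Bash
    # Hypothetical sketch: rebuild the ConfigMap with a fresh JWT and apply it in place
    kubectl create configmap flink-config \
      --from-file=flink-conf.yaml=flink-conf.yaml \
      --from-file=ptoken.dat=ptoken.dat \
      -n ${FLINK_NAMESPACE} \
      --dry-run=client -o yaml | kubectl apply -f -
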
  7. Here is a sample jobmanager-service.yaml. There is no Privacera-specific configuration needed in this file.

    Note

    The file below is for reference only; nothing Privacera-specific needs to be added here.

    YAML
    apiVersion: v1
    kind: Service
    metadata:
        name: flink-jobmanager
    spec:
        type: ClusterIP
        ports:
            - name: rpc
              port: 6123
            - name: blob-server
              port: 6124
            - name: webui
              port: 8081
        selector:
            app: flink
            component: jobmanager
    
  8. You will need to update your jobmanager-session-deployment-non-ha.yaml file.

    Note

    In the file below, please make sure to replace the placeholder DOCKER_IMAGE_URL with the Docker URL of the custom Apache Flink image with the plugin installed.

    jobmanager-session-deployment-non-ha.yaml
    apiVersion: apps/v1
    kind: Deployment
    metadata:
        name: flink-jobmanager
    spec:
        replicas: 1
        selector:
            matchLabels:
                app: flink
                component: jobmanager
        template:
            metadata:
                labels:
                    app: flink
                    component: jobmanager
            spec:
                containers:
                    - name: jobmanager
                      image: <DOCKER_IMAGE_URL>
                      args: ["jobmanager"]
                      ports:
                          - containerPort: 6123
                            name: rpc
                          - containerPort: 6124
                            name: blob-server
                          - containerPort: 8081
                            name: webui
                      livenessProbe:
                          tcpSocket:
                              port: 6123
                          initialDelaySeconds: 30
                          periodSeconds: 60
                      volumeMounts:
                          - name: flink-config-volume
                            mountPath: /opt/flink/conf/flink-conf.yaml
                            subPath: flink-conf.yaml
                          - name: ptoken-volume
                            mountPath: /tmp/ptoken.dat
                            subPath: ptoken.dat
                      securityContext:
                          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
                volumes:
                    - name: flink-config-volume
                      configMap:
                          name: flink-config
                          items:
                              - key: flink-conf.yaml
                                path: flink-conf.yaml
                    - name: ptoken-volume
                      configMap:
                          name: flink-config
                          items:
                              - key: ptoken.dat
                                path: ptoken.dat
    
  9. You will need to update your taskmanager-session-deployment.yaml file with the Privacera-specific configuration.

    Note

    In the file below, please make sure to replace the placeholder DOCKER_IMAGE_URL with the Docker URL of the custom Apache Flink image with the plugin installed.

    taskmanager-session-deployment.yaml
    apiVersion: apps/v1
    kind: Deployment
    metadata:
        name: flink-taskmanager
    spec:
        replicas: 2
        selector:
            matchLabels:
                app: flink
                component: taskmanager
        template:
            metadata:
                labels:
                    app: flink
                    component: taskmanager
            spec:
                containers:
                    - name: taskmanager
                      image: <DOCKER_IMAGE_URL>
                      args: ["taskmanager"]
                      ports:
                          - containerPort: 6122
                            name: rpc
                      livenessProbe:
                          tcpSocket:
                              port: 6122
                          initialDelaySeconds: 30
                          periodSeconds: 60
                      volumeMounts:
                          - name: flink-config-volume
                            mountPath: /opt/flink/conf/flink-conf.yaml
                            subPath: flink-conf.yaml
                          - name: ptoken-volume
                            mountPath: /tmp/ptoken.dat
                            subPath: ptoken.dat
                      securityContext:
                          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
                volumes:
                    - name: flink-config-volume
                      configMap:
                          name: flink-config
                          items:
                              - key: flink-conf.yaml
                                path: flink-conf.yaml
                    - name: ptoken-volume
                      configMap:
                          name: flink-config
                          items:
                              - key: ptoken.dat
                                path: ptoken.dat
    
  10. Run the commands below, replacing the PLEASE_CHANGE placeholder with the Kubernetes namespace where you want to deploy Apache Flink.

    Bash
    export FLINK_NAMESPACE=<PLEASE_CHANGE>
    kubectl create namespace ${FLINK_NAMESPACE}
    kubectl create -f flink-configuration-configmap.yaml -n ${FLINK_NAMESPACE}
    kubectl create -f jobmanager-service.yaml -n ${FLINK_NAMESPACE}
    kubectl create -f jobmanager-session-deployment-non-ha.yaml -n ${FLINK_NAMESPACE}
    kubectl create -f taskmanager-session-deployment.yaml -n ${FLINK_NAMESPACE}
    
  11. Verify the deployment by executing the command below to list all the resources and confirm that all the pods are up and running.

    Bash
    kubectl get all -n ${FLINK_NAMESPACE}
    
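    To wait until the pods report Ready rather than polling manually, a check like the following sketch can be used; the label selector matches the deployments above:

    Bash
    # Wait for the Flink pods to become Ready (times out after 3 minutes)
    kubectl wait --for=condition=Ready pod -l app=flink -n ${FLINK_NAMESPACE} --timeout=180s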

Validation

To validate the integration of Privacera Access Management with Apache Flink on Kubernetes, follow these steps:

Prerequisites

Ensure the following prerequisites are met before proceeding:

  • Apache Flink is deployed on a Kubernetes cluster and secured using the steps mentioned in the previous sections.
  • Create two resource-based policies in the privacera_s3 repository in Privacera, one for the source S3 bucket and another for the destination bucket. The user should have read access to the source S3 object and write access to the folder where the output S3 object will be written. Ensure these policies align with the paths and operations used in your validation jobs. For example, you might use:
    • Source S3 Path: s3://my-source-bucket/data/input/test_object.txt – This path is used to read data for processing. Note: This file should be present in the specified location.
    • Destination S3 Path: s3://my-destination-bucket/data/output/ – This path is used to write the processed output data.

Steps to Validate

  1. Connect to the Job Manager Pod

    Use kubectl exec to access the job-manager pod in your Apache Flink Kubernetes cluster. Once connected, navigate to the Flink installation directory and prepare to download the test jar for validation.

    Bash
    # Connect to the job-manager pod
    kubectl exec -it <job-manager-pod-name> -- /bin/bash
    

    Make sure to replace <job-manager-pod-name> with the actual name of your job-manager pod.
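
    If you do not know the pod name, one way to look it up is the sketch below, using the labels from the jobmanager deployment above:

    Bash
    # List the jobmanager pod(s) by label to find the pod name
    kubectl get pods -n ${FLINK_NAMESPACE} -l app=flink,component=jobmanager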

  2. Download the Test Jar

    Replace <PRIVACERA_BASE_DOWNLOAD_URL> with your Privacera Manager base download URL, then run the following commands to download and extract the test jar. The test jar contains the Flink job that reads data from an S3 source.

    Bash
    # Set the Privacera base download URL
    export PRIVACERA_BASE_DOWNLOAD_URL=<PRIVACERA_BASE_DOWNLOAD_URL>
    
    # Navigate to the Flink installation directory
    cd /opt/flink
    
    # Download the test jar and extract it
    wget ${PRIVACERA_BASE_DOWNLOAD_URL}/privacera-flink-test.tar.gz -O privacera-flink-test.tar.gz
    tar -xvf privacera-flink-test.tar.gz
    
  3. Verify the Downloaded Files

    Check that the jar file privacera-flink-test-1.0.0-SNAPSHOT-S3ReadWriteJob.jar and other related files are correctly extracted in the privacera-flink-test directory.
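
    For example:

    Bash
    # List the extracted files; the S3ReadWriteJob jar should be present
    ls privacera-flink-test/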

In this use case, we will submit a Flink job that reads data from an S3 source and writes the processed data to an S3 destination.

  1. Submit a Flink Job to Read Data from S3 and Write to S3

    While still in the job-manager pod, submit the following job to read data from an S3 source and write to an S3 destination. Update the placeholders with the correct values:

    • SOURCE_PATH_WITH_PROTOCOL: The full S3 path (including the protocol) of the source data to read from. For example, s3://my-source-bucket/data/input/test_object.txt – This path is used to read data for processing.
    • DESTINATION_PATH_WITH_PROTOCOL: The full S3 path (including the protocol) where the data will be written. For example, s3://my-destination-bucket/data/output/ – This path is used to write the processed output data.
    Bash
    ./bin/flink run \
        -d \
        -c com.privacera.flink.S3ReadWriteJob \
        privacera-flink-test/privacera-flink-test-1.0.0-SNAPSHOT-S3ReadWriteJob.jar \
        --sourcePath "<SOURCE_PATH_WITH_PROTOCOL>" \
        --destinationPath "<DESTINATION_PATH_WITH_PROTOCOL>"
    
  2. Verify the Job Execution

To ensure that the job worked correctly, verify the following:

  • Check that the expected output objects are present in the destination S3 path (s3://my-destination-bucket/data/output/). The output should reflect the processed data.
  • Review the audit logs in Privacera to confirm that the access events for both reading from the source S3 and writing to the destination S3 were recorded. This will validate that the data access was correctly managed and audited by Privacera.
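
One way to check the destination output is with the AWS CLI, using the example path above; this assumes your own credentials grant access to the bucket:

    Bash
    # List the output objects written by the job
    aws s3 ls s3://my-destination-bucket/data/output/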

In this use case, we will submit a Flink job that reads data from a Kafka source and writes the processed data to an S3 destination.

  1. Submit a Flink Job to Read Data from Kafka and Write to S3

    Use the following command to submit a job that reads data from a Kafka source and writes to an S3 destination. Replace the placeholders as needed:

    • KAFKA_IP: The IP address of the Kafka host.
    • KAFKA_PORT: The port number on which Kafka is running.
    • DESTINATION_PATH_WITH_PROTOCOL: The full S3 path (including the protocol) where the data will be written.

    Bash
    ./bin/flink run \
        -d \
        -c com.privacera.flink.KafkaToS3Job \
        privacera-flink-test/privacera-flink-test-1.0.0-SNAPSHOT-KafkaToS3Job.jar \
        --bootstrapServers "<KAFKA_IP>:<KAFKA_PORT>" \
        --destinationPath "<DESTINATION_PATH_WITH_PROTOCOL>"
    
  2. Verify the Job Execution

To ensure that the job worked correctly, verify the following:

  • Check that the expected output objects are present in the destination S3 path (s3://my-destination-bucket/data/output/). The output should reflect the processed data.
  • Review the audit logs in Privacera to confirm that the access events for writing to the destination S3 were recorded. This will validate that the data access was correctly managed and audited by Privacera.
