Skip to content

MSK Ops Client

MSK Ops Client is a utility that manages the transformation of events/messages from Amazon Managed Streaming for Apache Kafka (MSK) and sends them to the Ops Server.

Event driven flow

sequenceDiagram
    participant AWS Managed Kafka 
    participant AWS Lambda
    participant PrivaceraOpsServer
    participant PrivaceraConnector


    AWS Managed Kafka ->> AWS Lambda: Filter event by as per rules
    AWS Lambda ->> PrivaceraOpsServer: Forward event after pre-processed
    PrivaceraConnector ->> PrivaceraOpsServer: Pull event/task details 
    PrivaceraConnector ->> As per the event/task details: Sync resources/ Audit/ Permissions etc

Overview

This document provides a comprehensive guide on configuring the MSK Ops Client with the Privacera Platform using Privacera Manager. It includes detailed steps for setting up the MSK Ops Client, creating and deploying a Lambda function, configuring a Kafka client.

Prerequisites

Before configuring the MSK Ops Client, ensure the following prerequisites are met:

Steps to Enable MSK Ops Client

  1. SSH to the machine where Privacera Manager is installed.
  2. Edit the vars.ops-bridge.yaml file:
    Bash
    cd ~/privacera/privacera-manager
    vi config/custom-vars/vars.ops-bridge.yaml
    
  3. Update the following properties in the vars.ops-bridge.yaml file:
    YAML
    OPS_CONNECTOR_MSK_ENABLED: "true"
    OPS_CONNECTOR_MSK_AWS_ACCOUNT_ID: "<PLEASE_CHANGE>"
    OPS_CONNECTOR_MSK_AWS_REGION: "<PLEASE_CHANGE>"
    OPS_CONNECTOR_MSK_VPC_ID: "<PLEASE_CHANGE>"
    OPS_CONNECTOR_MSK_SUBNET_IDS: "<PLEASE_CHANGE>"
    OPS_CONNECTOR_MSK_SECURITY_GROUP_ID: "<PLEASE_CHANGE>"
    OPS_CONNECTOR_MSK_CLUSTER_NAME: "<PLEASE_CHANGE>"
    OPS_CONNECTOR_MSK_CLUSTER_UUID: "<PLEASE_CHANGE>"
    OPS_CONNECTOR_MSK_TOPIC_NAME: "<PLEASE_CHANGE>"
    OPS_CONNECTOR_MSK_CLOUDFORMATION_RESOURCE_PREFIX: "<PLEASE_CHANGE>"
    OPS_CONNECTOR_MSK_USE_MTLS_AUTH: "<PLEASE_CHANGE>"
    OPS_CONNECTOR_MSK_MTLS_SECRET_ARN: "<PLEASE_CHANGE>"
    

Property Descriptions

Property Description Default Value
OPS_CONNECTOR_MSK_ENABLED Enable MSK false
OPS_CONNECTOR_MSK_AWS_ACCOUNT_ID AWS account ID where the resources will be deployed N/A
OPS_CONNECTOR_MSK_AWS_REGION AWS region where the resources will be deployed N/A
OPS_CONNECTOR_MSK_VPC_ID ID of the VPC in which to create the resources N/A
OPS_CONNECTOR_MSK_SUBNET_IDS Comma-separated list of subnet IDs within the specified VPC N/A
OPS_CONNECTOR_MSK_SECURITY_GROUP_ID Security group ID to associate with the resources N/A
OPS_CONNECTOR_MSK_CLUSTER_NAME Name of the MSK cluster N/A
OPS_CONNECTOR_MSK_CLUSTER_UUID UUID of the MSK cluster N/A
OPS_CONNECTOR_MSK_TOPIC_NAME Name of the MSK topic N/A
OPS_CONNECTOR_MSK_CLOUDFORMATION_RESOURCE_PREFIX Prefix for naming CloudFormation resources privacera
OPS_CONNECTOR_MSK_LOG_LEVEL Log level for MSK Connector INFO
OPS_CONNECTOR_MSK_USE_MTLS_AUTH Is mTLS authentication for MSK cluster false
OPS_CONNECTOR_MSK_MTLS_SECRET_ARN ARN of the mTLS AWS secret N/A
  1. Execute the following commands to update your Privacera Manager platform instance:

    a. Generate the helm charts.

    Bash
    cd ~/privacera/privacera-manager
    ./privacera-manager.sh setup
    
    b. Apply the helm charts.
    Bash
    ./pm_with_helm.sh upgrade
    
    c. Generate Plugin tar ball, update Route 53 DNS, etc.
    Bash
    ./privacera-manager post-install
    
    5. After the update, the CloudFormation template files will be available at:
    Bash
    ~/privacera/privacera-manager/output/ops-bridge/
    

Creating Lambda Function using CloudFormation Template

  1. Copy CloudFormation templates to ~/privacera/msk-connector:

    Bash
    mkdir -p ~/privacera/msk-connector
    cp -n ~/privacera/privacera-manager/output/ops-bridge/{ops-connector-msk.yml,ops-msk-cloudformation-env.json} ~/privacera/msk-connector
    

    1. Deploy the Lambda function using the CloudFormation CLI:

      Bash
      1
      2
      3
      4
      5
      aws cloudformation create-stack \
      --stack-name privacera-msk-lambda-creation \
      --template-body file://~/privacera/msk-connector/ops-connector-msk.yml \
      --parameters file://~/privacera/msk-connector/ops-msk-cloudformation-env.json \
      --capabilities CAPABILITY_NAMED_IAM
      

      If you want to update te stack, use:

      Bash
      1
      2
      3
      4
      5
      aws cloudformation update-stack \
      --stack-name privacera-msk-lambda-creation \
      --template-body file://~/privacera/msk-connector/ops-connector-msk.yml \
      --parameters file://~/privacera/msk-connector/ops-msk-cloudformation-env.json \
      --capabilities CAPABILITY_NAMED_IAM
      

MSK Ops Connector Validation

Field Explanations

This section describes the fields used in the request payload.

General Fields

  • appType: Specifies the application type. Here, it is set to "PS_CONNECTOR".
  • type: Defines the type of operation. In this case, it is "RESOURCE_SYNC".
  • source: Indicates the source of the event, which is "MSK".

Request Information (requestInfo)

Contains detailed information about the request.

  • requestor_id: The unique ID of the requestor initiating the sync request.
  • requestor_create_time: The epoch timestamp (in milliseconds) when the request was created.

Resources (resources)

A list of resources to be synced.

  • type: The type of the resource. In this example, it is "table".
  • values: Contains specific attributes of the resource.

    • database: The name of the database.
      • Example: "privacera"
    • table: The name of the table.
      • Example: "customer_data"
Example JSON Representation
JSON
{
  "appType": "PS_CONNECTOR",
  "type": "RESOURCE_SYNC",
  "source": "MSK",
  "requestInfo": {
    "requestor_id": "user-123",
    "requestor_create_time": 1711439300000,
    "resources": [
      {
        "type": "table",
        "values": {
          "database": "privacera",
          "table": "customer_data"
        }
      }
    ]
  }
}

Sending a Test Event to a Kafka Topic

This guide explains how to send a test event to a Kafka topic using the Kafka console producer.

Prerequisites

Before proceeding, ensure you have the following:

  • Kafka installed and configured.
  • Access to the Kafka cluster.
  • Necessary permissions to publish messages.

Step 1: Configure the Kafka Client

Create a producer configuration file (client.properties):

Properties
security.protocol=PLAINTEXT

Step 2: Create a Console Producer

  1. Export the Kafka bootstrap server:

    Bash
    export BOOTSTRAP_SERVER="<PLEASE_CHANGE>"
    

  2. Run the Kafka console producer command:

    Bash
    1
    2
    3
    4
    <path-to-your-kafka-installation>/bin/kafka-console-producer.sh \
      --broker-list ${BOOTSTRAP_SERVER} \
      --producer.config bin/client.properties \
      --topic <PLEASE_CHANGE_TOPIC_NAME>
    

Step 3: Send a Test Event

Enter the following JSON message into the console and press Enter to send it to the Kafka topic:

JSON
{"appType":"PS_CONNECTOR","type":"RESOURCE_SYNC","source":"MSK","requestInfo":{"requestor_id":"586968457","requestor_create_time":1718186681002,"resources":[{"type":"table","values":{"database":"privacera","table":"customer_data"}}]}}

Step 4 : Verification

a. Confirm the message is published to the Kafka topic.
b. The message is consumed by the Lambda function based on the configured batching behaviour.
c. The Lambda function processes the message and sends it to the Ops Server to initiate the sync in connector.
Example: If you have configured databricks_sql_analytics connector for on-demand sync, then you will see the icon on Privacera portal -> Resource Policies -> databricks_sql_analytics repository.

Notes

  1. Replace <PLEASE_CHANGE> placeholders with actual values.
  2. Ensure the Kafka broker address is correct.
  3. If using a secured Kafka cluster, update client.properties accordingly (e.g., SASL_SSL, authentication settings).

Comments