Kafka
Port's Kafka integration allows you to model Kafka resources in your software catalog and ingest data into them.
Overviewโ
- Map and organize your desired Kafka resources and their metadata in Port (see supported resources below).
- Watch for Kafka object changes (create/update/delete) in real-time, and automatically apply the changes to your entities in Port.
Setupโ
Choose one of the following installation methods:
- Real-time (self-hosted)
- Scheduled (CI)
Using this installation option means that the integration will be able to update Port in real time using webhooks.
Prerequisites
To install the integration, you need a Kubernetes cluster that the integration's container chart will be deployed to.
Please make sure that you have kubectl
and helm
installed on your machine, and that your kubectl
CLI is connected to the Kubernetes cluster where you plan to install the integration.
If you are having trouble installing this integration, please refer to these troubleshooting steps.
For details about the available parameters for the installation, see the table below.
- Helm
- ArgoCD
To install the integration using Helm:
-
Go to the Kafka data source page in your portal.
-
Select the
Real-time and always on
method: -
A
helm
command will be displayed, with default values already filled out (e.g. your Port cliend ID, client secret, etc).
Copy the command, replace the placeholders with your values, then run it in your terminal to install the integration.
The baseUrl
, port_region
, port.baseUrl
, portBaseUrl
, port_base_url
and OCEAN__PORT__BASE_URL
parameters are used to select which instance or Port API will be used.
Port exposes two API instances, one for the EU region of Port, and one for the US region of Port.
- If you use the EU region of Port (https://app.getport.io), your API URL is
https://api.getport.io
. - If you use the US region of Port (https://app.us.getport.io), your API URL is
https://api.us.getport.io
.
To install the integration using ArgoCD:
- Create a
values.yaml
file inargocd/my-ocean-kafka-integration
in your git repository with the content:
Remember to replace the placeholders for KAFKA_CLUSTER_CONFIG_MAPPING
.
initializePortResources: true
scheduledResyncInterval: 120
integration:
identifier: my-ocean-kafka-integration
type: kafka
eventListener:
type: POLLING
secrets:
clusterConfMapping: KAFKA_CLUSTER_CONFIG_MAPPING
- Install the
my-ocean-kafka-integration
ArgoCD Application by creating the followingmy-ocean-kafka-integration.yaml
manifest:
Remember to replace the placeholders for YOUR_PORT_CLIENT_ID
YOUR_PORT_CLIENT_SECRET
and YOUR_GIT_REPO_URL
.
Multiple sources ArgoCD documentation can be found here.
ArgoCD Application
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
name: my-ocean-kafka-integration
namespace: argocd
spec:
destination:
namespace: my-ocean-kafka-integration
server: https://kubernetes.default.svc
project: default
sources:
- repoURL: 'https://port-labs.github.io/helm-charts/'
chart: port-ocean
targetRevision: 0.1.14
helm:
valueFiles:
- $values/argocd/my-ocean-kafka-integration/values.yaml
parameters:
- name: port.clientId
value: YOUR_PORT_CLIENT_ID
- name: port.clientSecret
value: YOUR_PORT_CLIENT_SECRET
- name: port.baseUrl
value: https://api.getport.io
- repoURL: YOUR_GIT_REPO_URL
targetRevision: main
ref: values
syncPolicy:
automated:
prune: true
selfHeal: true
syncOptions:
- CreateNamespace=true
The baseUrl
, port_region
, port.baseUrl
, portBaseUrl
, port_base_url
and OCEAN__PORT__BASE_URL
parameters are used to select which instance or Port API will be used.
Port exposes two API instances, one for the EU region of Port, and one for the US region of Port.
- If you use the EU region of Port (https://app.getport.io), your API URL is
https://api.getport.io
. - If you use the US region of Port (https://app.us.getport.io), your API URL is
https://api.us.getport.io
.
- Apply your application manifest with
kubectl
:
kubectl apply -f my-ocean-kafka-integration.yaml
This table summarizes the available parameters for the installation.
Parameter | Description | Example | Required |
---|---|---|---|
port.clientId | Your port client id | โ | |
port.clientSecret | Your port client secret | โ | |
port.baseUrl | Your Port API URL - https://api.getport.io for EU, https://api.us.getport.io for US | โ | |
integration.secrets.clusterConfMapping | The Mapping of Kafka cluster names to Kafka client config | โ | |
integration.eventListener.type | The event listener type. Read more about event listeners | โ | |
integration.type | The integration to be installed | โ | |
scheduledResyncInterval | The number of minutes between each resync. When not set the integration will resync for each event listener resync event. Read more about scheduledResyncInterval | โ | |
initializePortResources | Default true, When set to true the integration will create default blueprints and the port App config Mapping. Read more about initializePortResources | โ | |
sendRawDataExamples | Enable sending raw data examples from the third party API to port for testing and managing the integration mapping. Default is true | โ |
This workflow/pipeline will run the Kafka integration once and then exit, this is useful for scheduled ingestion of data.
If you want the integration to update Port in real time using webhooks you should use the Real-time (self-hosted) installation option.
- GitHub
- Jenkins
- Azure Devops
- GitLab
Make sure to configure the following Github Secrets:
Parameter | Description | Example | Required |
---|---|---|---|
OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING | Mapping of Kafka cluster names to Kafka client config | โ | |
OCEAN__PORT__CLIENT_ID | Your Port client (How to get the credentials) id | โ | |
OCEAN__PORT__CLIENT_SECRET | Your Port client (How to get the credentials) secret | โ | |
OCEAN__PORT__BASE_URL | Your Port API URL - https://api.getport.io for EU, https://api.us.getport.io for US | โ | |
OCEAN__INITIALIZE_PORT_RESOURCES | Default true, When set to true the integration will create default blueprints and the port App config Mapping. Read more about initializePortResources | โ | |
OCEAN__SEND_RAW_DATA_EXAMPLES | Enable sending raw data examples from the third party API to port for testing and managing the integration mapping. Default is true | โ | |
OCEAN__INTEGRATION__IDENTIFIER | The identifier of the integration that will be installed | โ |
Here is an example for kafka-integration.yml
workflow file:
name: Kafka Exporter Workflow
on:
workflow_dispatch:
schedule:
- cron: '0 */1 * * *' # Determines the scheduled interval for this workflow. This example runs every hour.
jobs:
run-integration:
runs-on: ubuntu-latest
timeout-minutes: 30 # Set a time limit for the job
steps:
- uses: port-labs/ocean-sail@v1
with:
type: 'kafka'
port_client_id: ${{ secrets.OCEAN__PORT__CLIENT_ID }}
port_client_secret: ${{ secrets.OCEAN__PORT__CLIENT_SECRET }}
port_base_url: https://api.getport.io
config: |
cluster_conf_mapping: ${{ secrets.OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING }}
Your Jenkins agent should be able to run docker commands.
Make sure to configure the following Jenkins Credentials of Secret Text
type:
Parameter | Description | Example | Required |
---|---|---|---|
OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING | Mapping of Kafka cluster names to Kafka client config | โ | |
OCEAN__PORT__CLIENT_ID | Your Port client (How to get the credentials) id | โ | |
OCEAN__PORT__CLIENT_SECRET | Your Port client (How to get the credentials) secret | โ | |
OCEAN__PORT__BASE_URL | Your Port API URL - https://api.getport.io for EU, https://api.us.getport.io for US | โ | |
OCEAN__INITIALIZE_PORT_RESOURCES | Default true, When set to true the integration will create default blueprints and the port App config Mapping. Read more about initializePortResources | โ | |
OCEAN__SEND_RAW_DATA_EXAMPLES | Enable sending raw data examples from the third party API to port for testing and managing the integration mapping. Default is true | โ | |
OCEAN__INTEGRATION__IDENTIFIER | The identifier of the integration that will be installed | โ |
Here is an example for Jenkinsfile
groovy pipeline file:
pipeline {
agent any
stages {
stage('Run Kafka Integration') {
steps {
script {
withCredentials([
string(credentialsId: 'OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING', variable: 'OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING'),
string(credentialsId: 'OCEAN__PORT__CLIENT_ID', variable: 'OCEAN__PORT__CLIENT_ID'),
string(credentialsId: 'OCEAN__PORT__CLIENT_SECRET', variable: 'OCEAN__PORT__CLIENT_SECRET'),
]) {
sh('''
#Set Docker image and run the container
integration_type="kafka"
version="latest"
image_name="ghcr.io/port-labs/port-ocean-${integration_type}:${version}"
docker run -i --rm --platform=linux/amd64 \
-e OCEAN__EVENT_LISTENER='{"type":"ONCE"}' \
-e OCEAN__INITIALIZE_PORT_RESOURCES=true \
-e OCEAN__SEND_RAW_DATA_EXAMPLES=true \
-e OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING=$OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING \
-e OCEAN__PORT__CLIENT_ID=$OCEAN__PORT__CLIENT_ID \
-e OCEAN__PORT__CLIENT_SECRET=$OCEAN__PORT__CLIENT_SECRET \
-e OCEAN__PORT__BASE_URL='https://api.getport.io' \
$image_name
exit $?
''')
}
}
}
}
}
}
Your Azure Devops agent should be able to run docker commands. Learn more about agents here.
Variable groups store values and secrets you'll use in your pipelines across your project. Learn more
Setting Up Your Credentials
- Create a Variable Group: Name it port-ocean-credentials.
- Store the required variables (see the table below).
- Authorize Your Pipeline:
- Go to "Library" -> "Variable groups."
- Find port-ocean-credentials and click on it.
- Select "Pipeline Permissions" and add your pipeline to the authorized list.
Parameter | Description | Example | Required |
---|---|---|---|
OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING | Mapping of Kafka cluster names to Kafka client config | โ | |
OCEAN__PORT__CLIENT_ID | Your Port client (How to get the credentials) id | โ | |
OCEAN__PORT__CLIENT_SECRET | Your Port client (How to get the credentials) secret | โ | |
OCEAN__PORT__BASE_URL | Your Port API URL - https://api.getport.io for EU, https://api.us.getport.io for US | โ | |
OCEAN__INITIALIZE_PORT_RESOURCES | Default true, When set to true the integration will create default blueprints and the port App config Mapping. Read more about initializePortResources | โ | |
OCEAN__SEND_RAW_DATA_EXAMPLES | Enable sending raw data examples from the third party API to port for testing and managing the integration mapping. Default is true | โ | |
OCEAN__INTEGRATION__IDENTIFIER | The identifier of the integration that will be installed | โ |
Here is an example for kafka-integration.yml
pipeline file:
trigger:
- main
pool:
vmImage: "ubuntu-latest"
variables:
- group: port-ocean-credentials
steps:
- script: |
# Set Docker image and run the container
integration_type="kafka"
version="latest"
image_name="ghcr.io/port-labs/port-ocean-$integration_type:$version"
docker run -i --rm \
-e OCEAN__EVENT_LISTENER='{"type":"ONCE"}' \
-e OCEAN__INITIALIZE_PORT_RESOURCES=true \
-e OCEAN__SEND_RAW_DATA_EXAMPLES=true \
-e OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING=$(OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING) \
-e OCEAN__PORT__CLIENT_ID=$(OCEAN__PORT__CLIENT_ID) \
-e OCEAN__PORT__CLIENT_SECRET=$(OCEAN__PORT__CLIENT_SECRET) \
-e OCEAN__PORT__BASE_URL='https://api.getport.io' \
$image_name
exit $?
displayName: 'Ingest Data into Port'
Make sure to configure the following GitLab variables:
Parameter | Description | Example | Required |
---|---|---|---|
OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING | Mapping of Kafka cluster names to Kafka client config | โ | |
OCEAN__PORT__CLIENT_ID | Your Port client (How to get the credentials) id | โ | |
OCEAN__PORT__CLIENT_SECRET | Your Port client (How to get the credentials) secret | โ | |
OCEAN__PORT__BASE_URL | Your Port API URL - https://api.getport.io for EU, https://api.us.getport.io for US | โ | |
OCEAN__INITIALIZE_PORT_RESOURCES | Default true, When set to true the integration will create default blueprints and the port App config Mapping. Read more about initializePortResources | โ | |
OCEAN__SEND_RAW_DATA_EXAMPLES | Enable sending raw data examples from the third party API to port for testing and managing the integration mapping. Default is true | โ | |
OCEAN__INTEGRATION__IDENTIFIER | The identifier of the integration that will be installed | โ |
Here is an example for .gitlab-ci.yml
pipeline file:
default:
image: docker:24.0.5
services:
- docker:24.0.5-dind
before_script:
- docker info
variables:
INTEGRATION_TYPE: kafka
VERSION: latest
stages:
- ingest
ingest_data:
stage: ingest
variables:
IMAGE_NAME: ghcr.io/port-labs/port-ocean-$INTEGRATION_TYPE:$VERSION
script:
- |
docker run -i --rm --platform=linux/amd64 \
-e OCEAN__EVENT_LISTENER='{"type":"ONCE"}' \
-e OCEAN__INITIALIZE_PORT_RESOURCES=true \
-e OCEAN__SEND_RAW_DATA_EXAMPLES=true \
-e OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING=$OCEAN__INTEGRATION__CONFIG__CLUSTER_CONF_MAPPING \
-e OCEAN__PORT__CLIENT_ID=$OCEAN__PORT__CLIENT_ID \
-e OCEAN__PORT__CLIENT_SECRET=$OCEAN__PORT__CLIENT_SECRET \
-e OCEAN__PORT__BASE_URL='https://api.getport.io' \
$IMAGE_NAME
rules: # Run only when changes are made to the main branch
- if: '$CI_COMMIT_BRANCH == "main"'
The baseUrl
, port_region
, port.baseUrl
, portBaseUrl
, port_base_url
and OCEAN__PORT__BASE_URL
parameters are used to select which instance or Port API will be used.
Port exposes two API instances, one for the EU region of Port, and one for the US region of Port.
- If you use the EU region of Port (https://app.getport.io), your API URL is
https://api.getport.io
. - If you use the US region of Port (https://app.us.getport.io), your API URL is
https://api.us.getport.io
.
For advanced configuration such as proxies or self-signed certificates, click here.
Configurationโ
Port integrations use a YAML mapping block to ingest data from the third-party api into Port.
The mapping makes use of the JQ JSON processor to select, modify, concatenate, transform and perform other operations on existing fields and values from the integration API.
Examplesโ
Examples of blueprints and the relevant integration configurations:
Clusterโ
Cluster blueprint
{
"identifier": "kafkaCluster",
"title": "Cluster",
"icon": "Kafka",
"schema": {
"properties": {
"controllerId": {
"title": "Controller ID",
"type": "string"
}
}
}
}
Integration configuration
createMissingRelatedEntities: false
deleteDependentEntities: true
resources:
- kind: cluster
selector:
query: "true"
port:
entity:
mappings:
identifier: .name
title: .name
blueprint: '"kafkaCluster"'
properties:
controllerId: .controller_id
Brokerโ
Broker blueprint
{
"identifier": "kafkaBroker",
"title": "Broker",
"icon": "Kafka",
"schema": {
"properties": {
"address": {
"title": "Address",
"type": "string"
},
"region": {
"title": "Region",
"type": "string"
},
"version": {
"title": "Version",
"type": "string"
},
"config": {
"title": "Config",
"type": "object"
}
}
},
"relations": {
"cluster": {
"target": "kafkaCluster",
"required": true,
"many": false
}
}
}
Integration configuration
createMissingRelatedEntities: false
deleteDependentEntities: true
resources:
- kind: broker
selector:
query: "true"
port:
entity:
mappings:
identifier: .cluster_name + "_" + (.id | tostring)
title: .cluster_name + " " + (.id | tostring)
blueprint: '"kafkaBroker"'
properties:
address: .address
region: .config."broker.rack"
version: .config."inter.broker.protocol.version"
config: .config
relations:
cluster: .cluster_name
Topicโ
Topic blueprint
{
"identifier": "kafkaTopic",
"title": "Topic",
"icon": "Kafka",
"schema": {
"properties": {
"replicas": {
"title": "Replicas",
"type": "number"
},
"partitions": {
"title": "Partitions",
"type": "number"
},
"compaction": {
"title": "Compaction",
"type": "boolean"
},
"retention": {
"title": "Retention",
"type": "boolean"
},
"deleteRetentionTime": {
"title": "Delete Retention Time",
"type": "number"
},
"partitionsMetadata": {
"title": "Partitions Metadata",
"type": "array"
},
"config": {
"title": "Config",
"type": "object"
}
}
},
"relations": {
"cluster": {
"target": "kafkaCluster",
"required": true,
"many": false
},
"brokers": {
"target": "kafkaBroker",
"required": false,
"many": true
}
}
}
Integration configuration
createMissingRelatedEntities: false
deleteDependentEntities: true
resources:
- kind: topic
selector:
query: "true"
port:
entity:
mappings:
identifier: .cluster_name + "_" + .name
title: .cluster_name + " " + .name
blueprint: '"kafkaTopic"'
properties:
replicas: .partitions[0].replicas | length
partitions: .partitions | length
compaction: .config."cleanup.policy" | contains("compact")
retention: .config."cleanup.policy" | contains("delete")
deleteRetentionTime: .config."delete.retention.ms"
partitionsMetadata: .partitions
config: .config
relations:
cluster: .cluster_name
brokers: '[.cluster_name + "_" + (.partitions[].replicas[] | tostring)] | unique'
Consumer Groupโ
Consumer Group blueprint
{
"identifier": "kafkaConsumerGroup",
"title": "Consumer Group",
"icon": "Kafka",
"schema": {
"properties": {
"state": {
"title": "State",
"type": "string",
"description": "The current state of the consumer group."
},
"members": {
"title": "Members",
"type": "array",
"description": "List of members in the consumer group.",
"items": {
"type": "object",
"properties": {
"id": {
"title": "Member ID",
"type": "string",
"description": "Unique identifier for the consumer member."
},
"client_id": {
"title": "Client ID",
"type": "string",
"description": "Client ID of the consumer member."
},
"host": {
"title": "Host",
"type": "string",
"description": "Host address of the consumer member."
},
"assignment": {
"title": "Assignment",
"type": "object",
"description": "Details of the topic partitions assigned to the member.",
"properties": {
"topic_partitions": {
"title": "Topic Partitions",
"type": "array",
"description": "List of topic-partition pairs assigned to the member.",
"items": {
"type": "object",
"properties": {
"topic": {
"title": "Topic",
"type": "string",
"description": "Name of the topic."
},
"partition": {
"title": "Partition",
"type": "number",
"description": "Partition number within the topic."
}
}
}
}
}
}
}
}
},
"coordinator": {
"title": "Coordinator",
"type": "number",
"description": "Broker ID of the coordinator for the consumer group."
},
"partition_assignor": {
"title": "Partition Assignor",
"type": "string",
"description": "Strategy used to assign partitions to consumers."
},
"is_simple_consumer_group": {
"title": "Is Simple Consumer Group",
"type": "boolean",
"description": "Indicates if the group is a simple consumer group."
},
"authorized_operations": {
"title": "Authorized Operations",
"type": "array",
"description": "List of operations authorized for the consumer group.",
"items": {
"type": "string"
}
}
}
},
"calculationProperties": {},
"relations": {
"cluster": {
"target": "kafkaCluster",
"required": true,
"many": false
}
}
}
Integration configuration
createMissingRelatedEntities: false
deleteDependentEntities: true
resources:
- kind: consumer_group
selector:
query: 'true'
port:
entity:
mappings:
identifier: .cluster_name + "_" + .group_id
title: .group_id
blueprint: '"kafkaConsumerGroup"'
properties:
state: .state
members: .members
coordinator: .coordinator.id
partition_assignor: .partition_assignor
is_simple_consumer_group: .is_simple_consumer_group
authorized_operations: .authorized_operations
relations:
cluster: .cluster_name