Kafka

Port's Kafka integration allows you to model Kafka resources in your software catalog and ingest data into them.

Overview

  • Map and organize your desired Kafka resources and their metadata in Port (see supported resources below).
  • Watch for Kafka object changes (create/update/delete) in real-time, and automatically apply the changes to your entities in Port.

Setup

Choose one of the following installation methods:

Using this installation option means that the integration will be able to update Port in real time using webhooks.

Prerequisites

To install the integration, you need a Kubernetes cluster that the integration's container chart will be deployed to.

Please make sure that you have kubectl and helm installed on your machine, and that your kubectl CLI is connected to the Kubernetes cluster where you plan to install the integration.

Troubleshooting

If you are having trouble installing this integration, please refer to these troubleshooting steps.

For details about the available parameters for the installation, see the table below.

To install the integration using Helm:

  1. Go to the Kafka data source page in your portal.

  2. Select the Real-time and always on method:

  3. A helm command will be displayed, with default values already filled out (e.g. your Port client ID, client secret, etc.).
    Copy the command, replace the placeholders with your values, then run it in your terminal to install the integration.

Selecting a Port API URL by account region

The baseUrl, port_region, port.baseUrl, portBaseUrl, port_base_url and OCEAN__PORT__BASE_URL parameters are used to select which instance of the Port API will be used.

Port exposes two API instances, one for the EU region of Port, and one for the US region of Port.
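
As a minimal sketch of the region choice described above, the lookup below returns the base URL for a region. The two URLs are the ones this page lists for port.baseUrl; the function name `port_base_url` and the region keys "eu" and "us" are hypothetical and not part of the integration.

```python
# Hypothetical helper: pick the Port API base URL for an account region.
# The URLs are the ones listed for port.baseUrl on this page.
PORT_BASE_URLS = {
    "eu": "https://api.getport.io",
    "us": "https://api.us.getport.io",
}


def port_base_url(region: str) -> str:
    """Return the Port API base URL for 'eu' or 'us' (case-insensitive)."""
    key = region.strip().lower()
    if key not in PORT_BASE_URLS:
        raise ValueError(f"unknown Port region: {region!r}")
    return PORT_BASE_URLS[key]
```

The returned value is what you would pass as port.baseUrl (or its equivalent parameter) during installation.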

This table summarizes the available parameters for the installation.

Parameter | Description | Example | Required
port.clientId | Your Port client ID | | ✅
port.clientSecret | Your Port client secret | | ✅
port.baseUrl | Your Port API URL: https://api.getport.io for EU, https://api.us.getport.io for US | | ✅
integration.secrets.clusterConfMapping | The mapping of Kafka cluster names to Kafka client config | | ✅
integration.eventListener.type | The event listener type. Read more about event listeners | | ✅
integration.type | The integration to be installed | | ✅
scheduledResyncInterval | The number of minutes between each resync. When not set, the integration will resync for each event listener resync event. Read more about scheduledResyncInterval | | ❌
initializePortResources | When set to true, the integration will create the default blueprints and the Port app config mapping. Default is true. Read more about initializePortResources | | ❌
sendRawDataExamples | Enable sending raw data examples from the third-party API to Port for testing and managing the integration mapping. Default is true | | ❌
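
To illustrate the shape of integration.secrets.clusterConfMapping (cluster names mapped to Kafka client config), here is a small sketch that builds such a mapping and serializes it to JSON. The cluster name "local" and the broker address are hypothetical. `bootstrap.servers` is a standard Kafka client setting. The exact value format the chart expects is an assumption here, so check the command shown in your portal.

```python
import json

# Hypothetical cluster name and broker address; replace with your own.
# Each key is a Kafka cluster name, each value is that cluster's client config.
cluster_conf_mapping = {
    "local": {
        "bootstrap.servers": "localhost:9092",
    },
}

# Serialize to a JSON string so it can be passed as a single configuration value.
cluster_conf_json = json.dumps(cluster_conf_mapping)
print(cluster_conf_json)
```
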
Advanced integration configuration

For advanced configuration such as proxies or self-signed certificates, click here.

Configuration

Port integrations use a YAML mapping block to ingest data from the third-party API into Port.

The mapping makes use of the JQ JSON processor to select, modify, concatenate, transform and perform other operations on existing fields and values from the integration API.
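
To make the idea concrete, the sketch below mimics in plain Python what a mapping does: each target field is computed by an expression applied to one raw record from the integration API. The lambdas stand in for JQ expressions such as `.name` and `.controller_id` (taken from the Cluster example on this page). The names `apply_mapping` and `raw_cluster`, and the sample values, are hypothetical. The integration itself evaluates real JQ, not Python.

```python
from typing import Any, Callable, Dict

Record = Dict[str, Any]

# Python stand-ins for the JQ expressions `.name` and `.controller_id`.
mapping: Dict[str, Callable[[Record], Any]] = {
    "identifier": lambda r: r["name"],
    "title": lambda r: r["name"],
    "controllerId": lambda r: r["controller_id"],
}


def apply_mapping(record: Record) -> Record:
    """Evaluate every mapping expression against one raw record."""
    return {field: expr(record) for field, expr in mapping.items()}


# Hypothetical raw record, shaped after the fields the Cluster mapping reads.
raw_cluster = {"name": "local", "controller_id": "1"}
entity = apply_mapping(raw_cluster)
print(entity)
```
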

Examples

Examples of blueprints and the relevant integration configurations:

Cluster

Cluster blueprint
{
  "identifier": "kafkaCluster",
  "title": "Cluster",
  "icon": "Kafka",
  "schema": {
    "properties": {
      "controllerId": {
        "title": "Controller ID",
        "type": "string"
      }
    }
  }
}
Integration configuration
createMissingRelatedEntities: false
deleteDependentEntities: true
resources:
  - kind: cluster
    selector:
      query: "true"
    port:
      entity:
        mappings:
          identifier: .name
          title: .name
          blueprint: '"kafkaCluster"'
          properties:
            controllerId: .controller_id

Broker

Broker blueprint
{
  "identifier": "kafkaBroker",
  "title": "Broker",
  "icon": "Kafka",
  "schema": {
    "properties": {
      "address": {
        "title": "Address",
        "type": "string"
      },
      "region": {
        "title": "Region",
        "type": "string"
      },
      "version": {
        "title": "Version",
        "type": "string"
      },
      "config": {
        "title": "Config",
        "type": "object"
      }
    }
  },
  "relations": {
    "cluster": {
      "target": "kafkaCluster",
      "required": true,
      "many": false
    }
  }
}
Integration configuration
createMissingRelatedEntities: false
deleteDependentEntities: true
resources:
  - kind: broker
    selector:
      query: "true"
    port:
      entity:
        mappings:
          identifier: .cluster_name + "_" + (.id | tostring)
          title: .cluster_name + " " + (.id | tostring)
          blueprint: '"kafkaBroker"'
          properties:
            address: .address
            region: .config."broker.rack"
            version: .config."inter.broker.protocol.version"
            config: .config
          relations:
            cluster: .cluster_name
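
The broker mapping above can be read as the following Python sketch, which may help when checking identifiers by hand. The input shape (`cluster_name`, `id`, `address`, `config`) is inferred from the JQ expressions, and the sample values are hypothetical. Keys missing from `config` yield `None`, much like JQ yields `null`.

```python
from typing import Any, Dict


def broker_entity(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Python rendering of the broker mapping; not the integration's own code."""
    config = raw["config"]
    cluster = raw["cluster_name"]
    return {
        # `.cluster_name + "_" + (.id | tostring)`
        "identifier": f"{cluster}_{raw['id']}",
        "title": f"{cluster} {raw['id']}",
        "blueprint": "kafkaBroker",
        "properties": {
            "address": raw["address"],
            "region": config.get("broker.rack"),
            "version": config.get("inter.broker.protocol.version"),
            "config": config,
        },
        "relations": {"cluster": cluster},
    }


# Hypothetical raw broker record.
sample_broker = {
    "cluster_name": "local",
    "id": 1,
    "address": "localhost:9092",
    "config": {"broker.rack": "rack-a"},
}
broker = broker_entity(sample_broker)
```

Because the identifier combines the cluster name and the broker ID, brokers with the same ID in different clusters do not collide.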

Topic

Topic blueprint
{
  "identifier": "kafkaTopic",
  "title": "Topic",
  "icon": "Kafka",
  "schema": {
    "properties": {
      "replicas": {
        "title": "Replicas",
        "type": "number"
      },
      "partitions": {
        "title": "Partitions",
        "type": "number"
      },
      "compaction": {
        "title": "Compaction",
        "type": "boolean"
      },
      "retention": {
        "title": "Retention",
        "type": "boolean"
      },
      "deleteRetentionTime": {
        "title": "Delete Retention Time",
        "type": "number"
      },
      "partitionsMetadata": {
        "title": "Partitions Metadata",
        "type": "array"
      },
      "config": {
        "title": "Config",
        "type": "object"
      }
    }
  },
  "relations": {
    "cluster": {
      "target": "kafkaCluster",
      "required": true,
      "many": false
    },
    "brokers": {
      "target": "kafkaBroker",
      "required": false,
      "many": true
    }
  }
}
Integration configuration
createMissingRelatedEntities: false
deleteDependentEntities: true
resources:
  - kind: topic
    selector:
      query: "true"
    port:
      entity:
        mappings:
          identifier: .cluster_name + "_" + .name
          title: .cluster_name + " " + .name
          blueprint: '"kafkaTopic"'
          properties:
            replicas: .partitions[0].replicas | length
            partitions: .partitions | length
            compaction: .config."cleanup.policy" | contains("compact")
            retention: .config."cleanup.policy" | contains("delete")
            deleteRetentionTime: .config."delete.retention.ms"
            partitionsMetadata: .partitions
            config: .config
          relations:
            cluster: .cluster_name
            brokers: '[.cluster_name + "_" + (.partitions[].replicas[] | tostring)] | unique'
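
The derived topic properties are the least obvious part of this mapping, so here is a Python sketch of what the JQ expressions compute. The input shape is inferred from the expressions, and the sample record is hypothetical. Note that `replicas` is read from the first partition only, and that `brokers` is a sorted, de-duplicated list of broker identifiers in the same `cluster_id` format that the broker mapping produces.

```python
from typing import Any, Dict


def topic_derived_fields(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Python rendering of the computed topic fields; not the integration's own code."""
    partitions = raw["partitions"]
    policy = raw["config"]["cleanup.policy"]
    cluster = raw["cluster_name"]
    return {
        # `.partitions[0].replicas | length`
        "replicas": len(partitions[0]["replicas"]),
        # `.partitions | length`
        "partitions": len(partitions),
        # `contains("compact")` / `contains("delete")` are substring checks
        "compaction": "compact" in policy,
        "retention": "delete" in policy,
        # `[... (.partitions[].replicas[] | tostring)] | unique` sorts and de-duplicates
        "brokers": sorted(
            {f"{cluster}_{replica}" for p in partitions for replica in p["replicas"]}
        ),
    }


# Hypothetical raw topic record.
sample_topic = {
    "cluster_name": "local",
    "name": "orders",
    "partitions": [
        {"id": 0, "replicas": [1, 2]},
        {"id": 1, "replicas": [2, 3]},
    ],
    "config": {"cleanup.policy": "compact,delete"},
}
topic_fields = topic_derived_fields(sample_topic)
```
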

Consumer Group

Consumer Group blueprint
{
  "identifier": "kafkaConsumerGroup",
  "title": "Consumer Group",
  "icon": "Kafka",
  "schema": {
    "properties": {
      "state": {
        "title": "State",
        "type": "string",
        "description": "The current state of the consumer group."
      },
      "members": {
        "title": "Members",
        "type": "array",
        "description": "List of members in the consumer group.",
        "items": {
          "type": "object",
          "properties": {
            "id": {
              "title": "Member ID",
              "type": "string",
              "description": "Unique identifier for the consumer member."
            },
            "client_id": {
              "title": "Client ID",
              "type": "string",
              "description": "Client ID of the consumer member."
            },
            "host": {
              "title": "Host",
              "type": "string",
              "description": "Host address of the consumer member."
            },
            "assignment": {
              "title": "Assignment",
              "type": "object",
              "description": "Details of the topic partitions assigned to the member.",
              "properties": {
                "topic_partitions": {
                  "title": "Topic Partitions",
                  "type": "array",
                  "description": "List of topic-partition pairs assigned to the member.",
                  "items": {
                    "type": "object",
                    "properties": {
                      "topic": {
                        "title": "Topic",
                        "type": "string",
                        "description": "Name of the topic."
                      },
                      "partition": {
                        "title": "Partition",
                        "type": "number",
                        "description": "Partition number within the topic."
                      }
                    }
                  }
                }
              }
            }
          }
        }
      },
      "coordinator": {
        "title": "Coordinator",
        "type": "number",
        "description": "Broker ID of the coordinator for the consumer group."
      },
      "partition_assignor": {
        "title": "Partition Assignor",
        "type": "string",
        "description": "Strategy used to assign partitions to consumers."
      },
      "is_simple_consumer_group": {
        "title": "Is Simple Consumer Group",
        "type": "boolean",
        "description": "Indicates if the group is a simple consumer group."
      },
      "authorized_operations": {
        "title": "Authorized Operations",
        "type": "array",
        "description": "List of operations authorized for the consumer group.",
        "items": {
          "type": "string"
        }
      }
    }
  },
  "calculationProperties": {},
  "relations": {
    "cluster": {
      "target": "kafkaCluster",
      "required": true,
      "many": false
    }
  }
}
Integration configuration
createMissingRelatedEntities: false
deleteDependentEntities: true
resources:
  - kind: consumer_group
    selector:
      query: 'true'
    port:
      entity:
        mappings:
          identifier: .cluster_name + "_" + .group_id
          title: .group_id
          blueprint: '"kafkaConsumerGroup"'
          properties:
            state: .state
            members: .members
            coordinator: .coordinator.id
            partition_assignor: .partition_assignor
            is_simple_consumer_group: .is_simple_consumer_group
            authorized_operations: .authorized_operations
          relations:
            cluster: .cluster_name
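
The `members` property is deeply nested, so a short sketch of walking it may be useful. The helper below groups assigned partitions by topic, following the member structure described in the blueprint (`assignment.topic_partitions[]` with `topic` and `partition`). The function name `partitions_by_topic` and the sample data are hypothetical. This is an illustration of the data shape, not something the integration provides.

```python
from collections import defaultdict
from typing import Any, Dict, List


def partitions_by_topic(members: List[Dict[str, Any]]) -> Dict[str, List[int]]:
    """Collect the partitions assigned across all members, keyed by topic name."""
    assigned: Dict[str, List[int]] = defaultdict(list)
    for member in members:
        # A member may have no assignment yet, so fall back to an empty one.
        assignment = member.get("assignment") or {}
        for tp in assignment.get("topic_partitions", []):
            assigned[tp["topic"]].append(tp["partition"])
    return {topic: sorted(parts) for topic, parts in assigned.items()}


# Hypothetical members list, shaped after the blueprint's `members` schema.
sample_members = [
    {
        "id": "consumer-1",
        "client_id": "app",
        "host": "10.0.0.1",
        "assignment": {
            "topic_partitions": [
                {"topic": "orders", "partition": 1},
                {"topic": "orders", "partition": 0},
            ]
        },
    },
    {
        "id": "consumer-2",
        "client_id": "app",
        "host": "10.0.0.2",
        "assignment": {
            "topic_partitions": [
                {"topic": "orders", "partition": 2},
                {"topic": "payments", "partition": 0},
            ]
        },
    },
]
group_assignment = partitions_by_topic(sample_members)
```
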