
Installing the Kafka Schema Registry typically involves using the Confluent Platform distribution, as the registry is a Confluent-led project. You can install it using Docker (best for development), package managers like apt or yum (best for Linux servers), or manual ZIP/TAR archives.

 

Prerequisites

Before you begin, ensure you have a running Kafka cluster (with either ZooKeeper or KRaft).

  • Java: JDK 11, 17, or 21 (required for non-Docker installs).

  • Network: The Registry usually runs on port 8081.
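
A quick way to sanity-check both on a Linux or macOS host (a sketch; assumes netcat is installed for the port check):

Bash
java -version            # confirm a supported JDK is on the PATH
nc -vz localhost 9092    # confirm the Kafka broker is reachable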


Method 1: Using Docker (Recommended for Dev)

This is the fastest way to get started. You can use a docker-compose.yml file to spin up Kafka and the Registry together.

  1. Create a docker-compose.yml file:

YAML
 
services:
  broker:
    image: confluentinc/cp-kafka:7.7.1
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
    environment:
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      # Internal listener (broker:29092) for other containers, host listener (localhost:9092) for your machine
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      # Single-node KRaft settings
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.7.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
  2. Run the command:

    docker-compose up -d
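
Once the containers are up, you can confirm that both services started (container names as defined in the compose file above):

Bash
docker-compose ps
docker logs schema-registry | tail -n 20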


Method 2: Ubuntu/Debian (apt)

If you are running on a Linux server, you can install it as a system service.

  1. Install the Confluent Public Key:

    wget -qO - https://packages.confluent.io/deb/7.7/archive.key | sudo gpg --dearmor -o /usr/share/keyrings/confluent.gpg

  2. Add the Repository:

    echo "deb [arch=amd64 signed-by=/usr/share/keyrings/confluent.gpg] https://packages.confluent.io/deb/7.7 stable main" | sudo tee /etc/apt/sources.list.d/confluent.list

  3. Install the Registry:

    sudo apt-get update && sudo apt-get install confluent-schema-registry

  4. Configure: Edit /etc/schema-registry/schema-registry.properties and point it to your Kafka broker:

    kafkastore.bootstrap.servers=PLAINTEXT://your-kafka-broker:9092

  5. Start the service:

    sudo systemctl start confluent-schema-registry
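
To confirm the service is healthy and have it start automatically on boot, the standard systemd commands work with the unit installed by the package:

Bash
sudo systemctl status confluent-schema-registry
sudo systemctl enable confluent-schema-registry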


Method 3: Manual Install (ZIP/TAR)

Use this if you don't have root access or want a portable installation.

  1. Download the Confluent Platform ZIP/TAR from the Confluent website.

  2. Extract the archive: tar -xvf confluent-7.7.1.tar.gz

  3. Navigate to the folder: cd confluent-7.7.1

  4. Start it:

    ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
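
If you need it to keep running after you close the terminal, one simple option is to background the process and capture its output in a log file (a sketch; adjust paths to your extracted folder):

Bash
nohup ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties > schema-registry.log 2>&1 &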


Verifying the Installation

Once started, you can verify the service is healthy by hitting its REST API:

Bash
curl -X GET http://localhost:8081/subjects

If it returns [] (an empty list), the service is running correctly but has no schemas yet.

Would you like me to help you write a Python or Java producer that uses this newly installed registry to send Avro messages?

 

Yes, Python please.

 

To send messages to a Kafka topic using the Schema Registry in Python, you need the confluent-kafka library. This library handles the "heavy lifting" of serializing your data into Avro format and communicating with the Registry.

1. Install Dependencies

You will need the Kafka client with Avro support:

Bash
pip install "confluent-kafka[avro]"   # the avro extra pulls in fastavro and the Schema Registry client dependencies

2. The Python Producer Script

This script defines a schema, registers it automatically with the Registry, and sends a message to Kafka.

Python
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

# 1. Configure the Schema Registry client
sr_config = {'url': 'http://localhost:8081'}
schema_registry_client = SchemaRegistryClient(sr_config)

# 2. Define the Avro schema
schema_str = """
{
  "namespace": "io.confluent.examples",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number", "type": "int"},
    {"name": "favorite_color", "type": "string"}
  ]
}
"""

# 3. Create the Avro serializer
avro_serializer = AvroSerializer(schema_registry_client, schema_str)

# 4. Configure the producer
producer_config = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(producer_config)


def delivery_report(err, msg):
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")


# 5. Produce data
topic = "user_updates"
user_data = {
    "name": "Alice",
    "favorite_number": 42,
    "favorite_color": "Blue"
}

# The serializer handles the interaction with the Registry for you
producer.produce(
    topic=topic,
    key=str(user_data["name"]),
    value=avro_serializer(user_data, SerializationContext(topic, MessageField.VALUE)),
    on_delivery=delivery_report
)
producer.flush()

How the Magic Happens

The flow behind this code ensures that your data is always valid before it ever reaches the Kafka broker.

  1. Schema Check: When you call avro_serializer(...), the library checks if this schema is already in the Registry.

  2. Registration: If it's a new schema, the producer sends it to the Registry (port 8081). The Registry assigns it a unique Schema ID.

  3. Serialization: The producer converts your Python dictionary into binary Avro bytes and prepends a 5-byte header (a magic byte plus the 4-byte Schema ID) to the message (see the sketch after this list).

  4. Send: The tiny binary payload is sent to Kafka (port 9092).
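
If you are curious, you can inspect that header yourself. Below is a minimal sketch that reuses the registry URL and the schema_str from the producer script above and unpacks the magic byte and Schema ID from the serialized bytes:

Python
import struct

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Reuse the registry URL and schema_str defined in the producer script above
client = SchemaRegistryClient({'url': 'http://localhost:8081'})
serializer = AvroSerializer(client, schema_str)

payload = serializer(
    {"name": "Alice", "favorite_number": 42, "favorite_color": "Blue"},
    SerializationContext("user_updates", MessageField.VALUE),
)

magic_byte = payload[0]                           # always 0 in the Confluent wire format
schema_id = struct.unpack(">I", payload[1:5])[0]  # big-endian 4-byte schema ID
print(f"Magic byte: {magic_byte}, Schema ID: {schema_id}")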

Common Pitfalls to Watch For:

  • Field Types: If you try to send a string in the favorite_number field, the avro_serializer will raise an error locally before the message is even sent.

  • Registry URL: Ensure your Python script can reach http://localhost:8081. If you are running Python inside a Docker container, you might need to use the container name (e.g., http://schema-registry:8081).

  • Schema Evolution: If you later change the schema (e.g., adding a field), the Registry will check your Compatibility Level before allowing the producer to send data.

Would you like me to show you the Consumer code so you can verify the data is being read and decoded correctly?

 

Yes, thanks.

 

To complete the circuit, you need an Avro Consumer. The consumer's job is to read the 5-byte header (magic byte plus Schema ID) at the start of each message, look up the matching schema from the Registry, and convert the binary data back into a readable Python dictionary.

The Python Consumer Script

Python
 
from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

# 1. Configure the Schema Registry client
sr_config = {'url': 'http://localhost:8081'}
schema_registry_client = SchemaRegistryClient(sr_config)

# 2. Create the deserializer
# (You don't need to provide the schema string here; it fetches it via the ID)
avro_deserializer = AvroDeserializer(schema_registry_client)

# 3. Configure the consumer
consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'python-consumer-group',
    'auto.offset.reset': 'earliest'
}
consumer = Consumer(consumer_config)

topic = "user_updates"
consumer.subscribe([topic])

print(f"Listening for messages on {topic}...")

try:
    while True:
        msg = consumer.poll(1.0)  # Wait for a message
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue

        # 4. Deserialize the binary value into a Python dict
        user = avro_deserializer(msg.value(), SerializationContext(topic, MessageField.VALUE))
        if user is not None:
            print(f"Successfully decoded user: {user['name']} (Color: {user['favorite_color']})")
except KeyboardInterrupt:
    pass
finally:
    consumer.close()

How the Interaction Works

The Consumer doesn't need to know the schema beforehand. It learns it "on the fly" by talking to the Registry.

  1. Fetch Message: The consumer pulls raw bytes from the Kafka Broker.

  2. Extract ID: It looks at the first few bytes to find the Schema ID.

  3. Registry Lookup: If it hasn't seen this ID before, it asks the Schema Registry: "What is the schema for ID #1?" (you can replicate this lookup yourself; see the sketch after this list).

  4. Cache: The consumer caches that schema locally so it doesn't have to ask the Registry for every single message.

  5. Decode: It uses the fetched schema to turn the binary bytes back into a Python object.
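
If you ever need to debug that lookup, the Python client exposes it directly. A small sketch, assuming the subject name created by the producer above:

Python
from confluent_kafka.schema_registry import SchemaRegistryClient

client = SchemaRegistryClient({'url': 'http://localhost:8081'})

# What the producer registered for this topic's values
latest = client.get_latest_version('user_updates-value')
print(latest.schema_id)          # the ID embedded in every message
print(latest.schema.schema_str)  # the Avro definition the deserializer will fetch

# What the consumer does internally: fetch a schema by its ID
schema = client.get_schema(latest.schema_id)
print(schema.schema_str)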

Why this is powerful:

If your Producer updates the schema (e.g., adds a middle_name field), the Consumer will automatically detect the new Schema ID, fetch the new schema, and start processing the updated records without you needing to restart or update the Consumer code.
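
For example, under the default BACKWARD compatibility level, adding an optional field with a default is an allowed change. A sketch of what the evolved schema string might look like (illustrative only, not part of the original script):

Python
evolved_schema_str = """
{
  "namespace": "io.confluent.examples",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number", "type": "int"},
    {"name": "favorite_color", "type": "string"},
    {"name": "middle_name", "type": ["null", "string"], "default": null}
  ]
}
"""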

Quick Troubleshooting Tips:

  • Topic Names: Ensure the topic variable in both scripts matches exactly.

  • Port 8081: If the Consumer hangs, verify it can reach the Registry.

  • Data Types: Remember that Avro is strict about structure. The deserializer guarantees that the decoded record matches the registered schema, so favorite_number always comes back as an int; what your Python logic does with that value afterwards is up to you.

Would you like me to show you how to check the Registry's "Subject" list using a curl command to see your saved schemas?

 

Yes, I would.

 

To see what is happening inside your Schema Registry, you can use the REST API. This is helpful for verifying that your Python producer actually registered the schema and for seeing the raw schema structure.

1. List all "Subjects"

In the Schema Registry, a Subject is the name under which the schemas for a topic are registered and versioned. With the default naming strategy, it follows the format topic-name-value.

Bash
 
curl http://localhost:8081/subjects

Expected Output: ["user_updates-value"]
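
The same list is available programmatically from the Python client used earlier:

Python
from confluent_kafka.schema_registry import SchemaRegistryClient

client = SchemaRegistryClient({'url': 'http://localhost:8081'})
print(client.get_subjects())  # e.g. ['user_updates-value']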


2. View Versions for a Subject

A single subject can have multiple versions if you have updated the schema.

Bash
 
curl http://localhost:8081/subjects/user_updates-value/versions

Expected Output: [1] (or more if you've made changes).


3. View the Full Schema Definition

To see the actual Avro JSON for the latest version, use this command:

Bash
 
curl http://localhost:8081/subjects/user_updates-value/versions/latest

Expected Output:

JSON
 
{"subject": "user_updates-value","version": 1,"id": 1,"schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"io.confluent.examples\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":\"int\"},{\"name\":\"favorite_color\",\"type\":\"string\"}]}"
}

4. Check Compatibility Settings

This determines if you are allowed to make "breaking" changes to your schema.

Bash
 
curl http://localhost:8081/config

Default Output: {"compatibilityLevel":"BACKWARD"}
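
You can also override the level for a single subject; for example, requiring FULL compatibility for the user_updates-value subject (a sketch using the Registry's standard config endpoint):

Bash
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "FULL"}' \
  http://localhost:8081/config/user_updates-value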


Visualizing the Data Flow

When you run these commands, you are looking at the central "brain" that coordinates the communication between your producer and consumer.

Summary of the Workflow

  1. Producer: Sends the schema to /subjects/user_updates-value/versions.

  2. Registry: Validates compatibility and returns a unique Schema ID.

  3. Consumer: Sees the ID in a message and calls /schemas/ids/1 to get the definition (example below).

  4. You: Use the curl commands above to inspect that metadata.
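
Step 3 is easy to reproduce by hand; the number in the path is whatever ID the Registry assigned (1 in the example output above):

Bash
curl http://localhost:8081/schemas/ids/1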

 
