Apache Kafka , thường được gọi đơn giản là Kafka, là một nền tảng mã nguồn mở phổ biến để quản lý và xử lý luồng. Kafka được xây dựng xung quanh khái niệm về một sự kiện. Các tác nhân bên ngoài, độc lập và không đồng bộ, gửi và nhận thông báo sự kiện đến và đi từ Kafka. Kafka chấp nhận một luồng sự kiện liên tục từ nhiều máy khách, lưu trữ chúng và có khả năng chuyển tiếp chúng đến một nhóm máy khách thứ hai để xử lý thêm. Nó linh hoạt, mạnh mẽ, đáng tin cậy, độc lập và cung cấp độ trễ thấp cùng với thông lượng cao. LinkedIn ban đầu đã phát triển Kafka, nhưng Apache Software Foundation cung cấp phiên bản mã nguồn mở hiện tại.

Trước khi bạn bắt đầu

  1. Nếu bạn chưa thực hiện, hãy tạo một tài khoản Linode và Compute Instance. Xem hướng dẫn Bắt đầu với Linode và Tạo Compute Instance của chúng tôi .
  2. Làm theo hướng dẫn Thiết lập và Bảo mật Phiên bản Compute của chúng tôi để cập nhật hệ thống của bạn. Bạn cũng có thể muốn đặt múi giờ, cấu hình tên máy chủ, tạo tài khoản người dùng giới hạn và tăng cường quyền truy cập SSH.

Ghi chú: Hướng dẫn này được viết cho người dùng không phải root. Các lệnh yêu cầu quyền nâng cao được thêm tiền tố sudo. Nếu bạn không quen với sudo lệnh này, hãy xem hướng dẫn Người dùng và Nhóm Linux .

Tóm tắt về quy trình cài đặt Apache Kafka

Cài đặt Kafka hoàn chỉnh bao gồm các bước cấp cao được liệt kê bên dưới. Mỗi bước được mô tả trong một phần riêng. Các hướng dẫn này được thiết kế cho Ubuntu 24.04 nhưng nhìn chung có giá trị đối với bất kỳ bản phân phối Linux nào dựa trên Debian.

  1. Cài đặt Java
  2. Tải xuống và cài đặt Apache Kafka
  3. Chạy Kafka
  4. Tạo chủ đề Kafka
  5. Viết và đọc sự kiện Kafka
  6. Xử lý dữ liệu với Kafka Streams
  7. Tạo tập tin hệ thống cho Zookeeper và Kafka

Cài đặt Java

Bạn phải cài đặt Java trước khi có thể sử dụng Apache Kafka. Hướng dẫn này giải thích cách cài đặt OpenJDK, phiên bản mã nguồn mở của Java.

1.Cập nhật các gói Ubuntu của bạn.

sudo apt update

2.Cài đặt OpenJDK bằng apt.

sudo apt install openjdk-21-jdk

3.Xác nhận bạn đã cài đặt phiên bản Java mong muốn.

java -version

Java trả về một số thông tin cơ bản về cài đặt. Thông tin có thể thay đổi tùy theo phiên bản bạn đã cài đặt.

openjdk 21.0.3 2024-04-16
OpenJDK Runtime Environment (build 21.0.3+9-Ubuntu-1ubuntu1)
OpenJDK 64-Bit Server VM (build 21.0.3+9-Ubuntu-1ubuntu1, mixed mode, sharing)

Tải xuống và cài đặt Apache Kafka

Có thể tải xuống kho lưu trữ Tar cho Apache Kafka trực tiếp từ Trang web Apache và cài đặt theo quy trình được nêu trong phần này. Tên tải xuống Kafka thay đổi tùy theo phiên bản phát hành. Thay thế tên tệp của riêng bạn bất cứ nơi nào bạn thấy kafka_2.13-3.7.0.tgz.

1.Điều hướng đến trang Tải xuống Apache Kafka và chọn bản phát hành Kafka bạn muốn. Chúng tôi khuyên bạn nên chọn phiên bản mới nhất, hiện tại là Apache Kafka 2.7. Liên kết này sẽ đưa bạn đến trang đích nơi bạn có thể sử dụng HTTP hoặc FTP để tải xuống tệp tar.

2.Nếu bạn đã tải phần mềm xuống một máy tính khác với máy chủ, hãy chuyển các tệp Apache Kafka sang máy chủ thông qua scpftp, hoặc một phương pháp chuyển tệp khác. Thay thế các giá trị uservà yourhostbằng tên người dùng và địa chỉ IP máy chủ của bạn:

scp /localpath/kafka_2.13-3.7.0.tgz user@192.0.2.0:~/

Ghi chú: Nếu việc truyền dữ liệu bị chặn, hãy xác minh tường lửa của bạn không chặn kết nối. Thực hiện lệnh sudo ufw allow 22/tcpallow ufw để cho phép scp truyền dữ liệu.

3.Tùy chọn : Bạn có thể xác nhận bạn đã tải xuống tệp đúng cách bằng tổng kiểm tra SHA512. Bạn có thể tìm tệp tổng kiểm tra trên trang Tải xuống Apache Kafka . Mỗi bản phát hành đều có liên kết đến sha512tệp tương ứng. Tải xuống tệp này và chuyển tệp này đến máy chủ Kafka của bạn bằng cách sử dụng scp. Đặt tệp tổng kiểm tra vào cùng thư mục với tệp tar của bạn. Thực hiện lệnh sau để tạo tổng kiểm tra cho tệp tar:

gpg --print-md SHA512 kafka_2.13-3.7.0.tgz

So sánh đầu ra từ lệnh này với nội dung của tệp SHA512. Hai tổng kiểm tra phải khớp nhau. Bước này không xác nhận tính xác thực của tệp, chỉ xác nhận tính hợp lệ của tệp. Đầu ra tổng kiểm tra có định dạng sau:

kafka_2.13-3.7.0.tgz: F3DD1FD8 8766D915 0D3D395B 285BFA75 F5B89A83 58223814
                      90C8428E 6E568889 054DDB5F ADA1EB63 613A6441 989151BC
                      7C7D6CDE 16A871C6 674B909C 4EDD4E28

4.Để bảo mật hơn, hãy xác nhận tệp đã được ký. Tải xuống tệp .ascvà khóa ký liên quan đến bản phát hành. Bạn có thể tìm thấy các tệp này trên trang Tải xuống Apache Kafka . Liên kết đến KEYStệp nằm ở đầu trang. Mỗi bản phát hành bao gồm một liên kết đến asctệp của nó. Tải xuống các tệp này và chuyển chúng đến máy chủ Kafka của bạn bằng cách sử dụng scp. Đặt các tệp này vào cùng thư mục với tệp tar của bạn.

Nhập khóa từ KEYStệp. Thao tác này sẽ cài đặt toàn bộ bộ khóa.

gpg --import KEYS

Sử dụng gpg để xác minh chữ ký.

gpg --verify kafka_2.13-3.7.0.tgz.asc  kafka_2.13-3.7.0.tgz

Đầu ra sẽ liệt kê khóa RSA thực tế và người đã ký khóa đó.

gpg: Signature made Wed Dec 16 14:03:36 2020 UTC
gpg:                using RSA key DFB5ABA9CD50A02B5C2A511662A9813636302260
gpg:                issuer "bbejeck@apache.org"
gpg: Good signature from "Bill Bejeck (CODE SIGNING KEY) <bbejeck@apache.org>" [unknown]

Ghi chú: Gpgcó thể cảnh báo bạn rằng “khóa không được chứng nhận bằng chữ ký đáng tin cậy”. Thật không may, không có cách dễ dàng nào để xác nhận tính xác thực của người ký và đối với hầu hết các lần triển khai, điều này là không cần thiết. Đối với xác thực không đủ điều kiện cho các lần triển khai bảo mật cao, hãy làm theo các bước để Xác thực tính xác thực của Khóa trên trang Xác thực Apache Kafka .

5.Giải nén các tập tin bằng tartiện ích. Sau khi quá trình giải nén hoàn tất, hãy xóa tệp lưu trữ hoặc lưu trữ ở nơi an toàn khác trên hệ thống của bạn.

tar -zxvf kafka_2.13-3.7.0.tgz

6.Tùy chọn : Tạo một thư mục tập trung mới cho Kafka và di chuyển các tệp đã giải nén vào thư mục gốc Kafka mới này.

sudo mkdir /home/kafka
sudo mv kafka_2.13-3.7.0 /home/kafka

Run Kafka

Kafka có thể được khởi chạy trực tiếp từ dòng lệnh. Bạn phải khởi chạy mô-đun Zookeeper trước khi chạy Kafka.

1.Xem lại các thiết lập có trong kafka_2.13-3.7.0/config/server.propertiestệp trong thư mục Kafka của bạn. Hiện tại, các thiết lập mặc định là tốt. Nhưng chúng tôi khuyên bạn nên đặt delete.topic.enablethuộc tính trueở cuối tệp. Điều này cho phép bạn xóa bất kỳ chủ đề nào bạn có thể tạo trong quá trình thử nghiệm.

delete.topic.enable = true

2.Chuyển đến thư mục gốc của Kafka và khởi động Zookeeper.

cd /home/kafka/kafka_2.13-3.7.0/
bin/zookeeper-server-start.sh config/zookeeper.properties

Ghi chú: Giữ nguyên mọi cài đặt ở Zookeeper.propertieschế độ mặc định cho hầu hết các lần triển khai.

3.Mở phiên điều khiển mới và khởi chạy Kafka.

cd /home/kafka/kafka_2.13-3.7.0/
bin/kafka-server-start.sh config/server.properties

Tạo chủ đề Kafka

Trước khi bạn có thể gửi bất kỳ sự kiện nào đến Kafka, bạn phải tạo một chủ đề để chứa các sự kiện. Bạn có thể tìm thấy lời giải thích về các chủ đề trong Giới thiệu về Kafka của Linode .

1.Mở một phiên điều khiển mới.

2.Thay đổi thư mục thành thư mục Kafka của bạn và tạo một chủ đề mới có tên test-events:

cd /home/kafka/kafka_2.13-3.7.0/
bin/kafka-topics.sh --create --topic test-events --bootstrap-server localhost:9092

Kafka xác nhận chủ đề đã được tạo:

Created topic test-events.

3.Tạo danh sách tất cả các chủ đề trong cụm với tùy --listchọn:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Bạn sẽ thấy được test-eventsliệt kê trong kết quả đầu ra:

test-events

4.Sử dụng describecờ để hiển thị tất cả thông tin về chủ đề mới:

bin/kafka-topics.sh --describe --topic test-events --bootstrap-server localhost:9092

Kafka trả về bản tóm tắt về chủ đề, bao gồm số lượng phân vùng và hệ số sao chép:

Topic: test-events  TopicId: URC3EPiqTUW2fBkJuW5AYQ PartitionCount: 1   ReplicationFactor: 1    Configs:
    Topic: test-events  Partition: 0    Leader: 0   Replicas: 0 Isr: 0

Viết và đọc sự kiện Kafka

Giao diện dòng lệnh của Kafka cho phép bạn nhanh chóng kiểm tra chủ đề mới. Sử dụng API để tạo Producer và viết một số sự kiện vào chủ đề. Sau đó, tạo consumer và đọc các sự kiện bạn đã viết.

1.Mở phiên điều khiển mới cho nhà sản xuất và thay đổi thư mục thành thư mục gốc Kafka.

cd /home/kafka/kafka_2.13-3.7.0/

2.Cấu hình một nhà sản xuất và chỉ định chủ đề cho các sự kiện của nó. Bạn chưa tạo bất kỳ sự kiện nào, chỉ có một máy khách có khả năng gửi sự kiện. Kafka trả về lời nhắc >cho biết nhà sản xuất đã sẵn sàng.

bin/kafka-console-producer.sh --topic test-events --bootstrap-server localhost:9092

3.Gửi một vài cặp khóa-giá trị đến Kafka. Tách các khóa và giá trị bằng dấu :. Bạn có thể chọn viết tin nhắn bằng các khóa khác nhau hoặc bằng cùng một khóa. Nếu bạn không chỉ định khóa và chỉ định giá trị, sự kiện sẽ được gán khóa NULL.

key1: This is event 1
key2: This is event 2
key1: This is event 3

4.Mở phiên điều khiển mới để chạy trình tiêu dùng và thay đổi thư mục thành thư mục gốc Kafka.

cd /home/kafka/kafka_2.13-3.7.0/

5.Tạo người dùng, chỉ định chủ test-eventsđề mà nó sẽ đọc. --from-beginningCờ cho biết nó sẽ đọc tất cả các sự kiện bắt đầu từ đầu chủ đề.

bin/kafka-console-consumer.sh --topic test-events --from-beginning --bootstrap-server localhost:9092

Ghi chú: API Consumer của Kafka cung cấp các tùy chọn để định dạng các sự kiện đến. Chạy lệnh sau để xem danh sách đầy đủ.bin/kafka-console-consumer.sh

6.Người dùng ngay lập tức thăm dò Kafka về bất kỳ sự kiện nổi bật nào trong chủ đề và hiển thị chúng trên màn hình. Bạn sẽ có thể thấy tất cả các sự kiện bạn đã gửi trước đó.

key1: This is event 1
key2: This is event 2
key1: This is event 3

7.Quay lại bảng điều khiển của nhà sản xuất (nhà sản xuất vẫn đang chạy) và tạo một sự kiện mới khác.

key2: This is event 4

8.Sự kiện sẽ xuất hiện ngay lập tức trên bảng điều khiển của người dùng.

key2: This is event 4

9.Dừng nhà sản xuất hoặc người tiêu dùng bất cứ lúc nào bạn muốn bằng một Ctrl-Clệnh.

Ghi chú: Sự kiện có độ bền và có thể được đọc nhiều lần tùy ý. Bạn có thể tạo người dùng thứ hai cho cùng một chủ đề và yêu cầu người dùng đó đọc tất cả các sự kiện giống nhau.

Xử lý dữ liệu với Kafka Streams

Kafka Streams là một thư viện để thực hiện các chuyển đổi và phân tích theo thời gian thực trên một luồng. Ứng dụng Kafka Streams thường hoạt động như một người tiêu dùng và một nhà sản xuất. Nó thăm dò một chủ đề để tìm sự kiện mới, xử lý dữ liệu và truyền đầu ra của nó dưới dạng sự kiện đến một chủ đề thứ hai. Các ứng dụng khác là người tiêu dùng của chủ đề thứ hai này. Kafka Streams được giải thích trong Giới thiệu về Apache Kafka của Linode .

Bạn có thể sử dụng WordCountDemoứng dụng Java đi kèm với Kafka Streams để chạy bản demo nhanh. WordCountDemosử dụng streams-plaintext-inputcác sự kiện. Nó phân tích và xử lý các dòng, lưu trữ các từ và số lượng trong một bảng. Số lượng từ được cập nhật được chuyển đổi thành một luồng sự kiện và được gửi đến chủ streams-plaintext-inputđề. Toàn bộ tệp được bao gồm bên dưới.

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream(
      "streams-plaintext-input",
      Consumed.with(stringSerde, stringSerde)
    );

KTable<String, Long> wordCounts = textLines
    // Split each text line, by whitespace, into words.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

    // Group the text words as message keys
    .groupBy((key, value) -> value)

    // Count the occurrences of each word (message key).
    .count();

// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

1.Tạo một chủ đề trên cụm Kafka để lưu trữ dữ liệu số lượng từ mẫu.

cd /home/kafka/kafka_2.13-3.7.0/
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic streams-plaintext-input

Kafka xác nhận đã tạo ra chủ đề này:

Created topic streams-plaintext-input.

2.Tạo chủ đề thứ hai để lưu trữ đầu ra của ứng dụng Kafka Streams. Đặt chính sách dọn dẹp thành mục nhập nhỏ gọn để chỉ lưu trữ số lượng từ đã cập nhật.

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic streams-wordcount-output --config cleanup.policy=compact

Kafka một lần nữa khẳng định chính nó đã tạo ra chủ đề:

Created topic streams-wordcount-output.

3.Chạy WordCountDemoứng dụng.

bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

4.Khởi chạy trình sản xuất để gửi dữ liệu thử nghiệm tới luồng WordCountDemo dưới dạng streams-plaintext-inputsự kiện.

cd /home/kafka/kafka_2.13-3.7.0/
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input

5.Tạo một consumer để lắng nghe streams-wordcount-outputluồng. Luồng này chứa các kết quả được cập nhật của ứng WordCountDemodụng. Đặt các thuộc tính định dạng như sau để tạo ra đầu ra dễ đọc hơn.

cd /home/kafka/kafka_2.13-3.7.0/
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

Nhập một số dữ liệu thử nghiệm tại dấu nhắc của nhà sản xuất.

This is not the end

Xác minh số lượng từ được hiển thị trong cửa sổ người dùng.

this    1
is      1
not     1
the     1
end     1

8.Sử dụng trình sản xuất để viết thêm dữ liệu đầu vào thử nghiệm.

The end of the line

Xem lại kết quả mới từ người tiêu dùng. Lưu ý cách số lượng từ đã được cập nhật.

the     2
end     2
of      1
the     3
line    1

10.Khi bạn hoàn tất bản demo, hãy sử dụng Ctrl-Cđể dừng nhà sản xuất, người tiêu dùng và ứng dụng WordCountDemo.

Tạo tập tin hệ thống cho Zookeeper và Kafka

Cho đến bây giờ, bạn đã khởi động Zookeeper và Kafka từ dòng lệnh bên trong thư mục Kafka. Điều này hoàn toàn chấp nhận được, nhưng sẽ dễ dàng hơn nhiều nếu tạo các mục nhập cho chúng bên trong /etc/systemd/system/và khởi động chúng bằng systemctl enable.

Ghi chú: Nếu Kafka và Zookeeper vẫn đang chạy, hãy tắt chúng bằng tổ hợp phím CTRLC trước khi thực hiện theo các bước dưới đây.

1.Tạo một tệp hệ thống cho Zookeeper có tên là /etc/systemd/system/zookeeper.service.

sudo nano /etc/systemd/system/zookeeper.service

2.Chỉnh sửa tệp và thêm thông tin sau. Sử dụng vị trí thư mục Kafka của bạn trong tên đường dẫn.

[Unit]
Description=Apache Zookeeper Server
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
ExecStart=/home/kafka/kafka_2.13-3.7.0/bin/zookeeper-server-start.sh /home/kafka/kafka_2.13-3.7.0/config/zookeeper.properties
ExecStop=/home/kafka/kafka_2.13-3.7.0/bin/zookeeper-server-stop.sh

Restart=on-abnormal

[Install]
WantedBy=multi-user.target

3.Tạo tệp thứ hai cho máy chủ Kafka có tên là /etc/systemd/system/kafka.service.

sudo nano /etc/systemd/system/kafka.service

4.Chỉnh sửa tệp và thêm thông tin sau. Xác minh đường dẫn đầy đủ đến ứng dụng Java của bạn và nhập nó làm đường JAVA_HOMEdẫn.

[Unit]
Description=Apache Kafka Server
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
Environment="JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64"
ExecStart=/home/kafka/kafka_2.13-3.7.0/bin/kafka-server-start.sh /home/kafka/kafka_2.13-3.7.0/config/server.properties
ExecStop=/home/kafka/kafka_2.13-3.7.0/bin/kafka-server-stop.sh

Restart=on-abnormal

[Install]
WantedBy=multi-user.target

5.Tải lại systemddaemon và khởi động cả hai ứng dụng.

sudo systemctl daemon-reload
sudo systemctl enable --now zookeeper
sudo systemctl enable --now kafka

6.Xác nhận cả Kafka và Zookeeper đều đang chạy như mong đợi. Xác minh trạng thái của cả hai quy trình bằng systemctl status.

sudo systemctl status kafka zookeeper

Cả hai mục nhập đều phải hiển thị là đang hoạt động.

kafka.service - Apache Kafka Server
    Loaded: loaded (/etc/systemd/system/kafka.service; enabled; vendor preset: enabled)
    Active: active (running) since Thu 2021-01-21 15:13:45 UTC; 4s ago
...

Tắt môi trường Kafka

Khi bạn hoàn tất việc sử dụng Kafka, chúng tôi khuyên bạn nên tắt tất cả các thành phần và xóa mọi nhật ký không cần thiết.

1.Tắt mọi trình tiêu thụ và trình sản xuất Kafka cũng như mọi ứng dụng Kafka Streams bằng ctrl-Clệnh.

2.Tắt Kafka và sau đó là Zookeeper bằng systemctl stoplệnh. Nếu bạn không đăng ký ứng dụng Kafka của mình với systemddaemon, hãy tắt chúng bằng Ctrl-Clệnh.

sudo systemctl stop kafka
sudo systemctl stop zookeeper

Dọn sạch mọi dữ liệu thử nghiệm bằng lệnh sau:

sudo rm -rf /tmp/kafka-logs /tmp/zookeeper

Thông tin thêm

Bạn có thể muốn tham khảo các nguồn sau để biết thêm thông tin về chủ đề này. Mặc dù chúng tôi cung cấp với hy vọng rằng chúng sẽ hữu ích, nhưng xin lưu ý rằng chúng tôi không thể đảm bảo tính chính xác hoặc tính kịp thời của các tài liệu được lưu trữ bên ngoài.

Nguồn: https://www.linode.com/docs/guides/how-to-install-apache-kafka-on-ubuntu/