Phiên bản 3.9 của RabbitMQ giới thiệu một cấu trúc dữ liệu mới gọi là luồng . Luồng và hàng đợi khác nhau về cách chúng hoạt động với dữ liệu. Luồng tin nhắn dựa trên đăng ký, dựa vào tệp nhật ký được sắp xếp theo chủ đề để phân phối. Trong khi đó, hàng đợi tin nhắn là điểm-đến-điểm. Những khác biệt khác giữa luồng và hàng đợi được mô tả sau, nhưng sự khác biệt chính là cách chúng quản lý phân phối dữ liệu. Sự khác biệt này xác định các trường hợp sử dụng mà mỗi cấu trúc dữ liệu có hiệu quả nhất.

Điều quan trọng nữa là không nên nhầm lẫn RabbitMQ với các sản phẩm như Apache Kafka , vì các trường hợp sử dụng khác nhau. RabbitMQ cung cấp một trình môi giới tin nhắn mục đích chung, trong khi Kafka cung cấp một nền tảng phát trực tuyến sự kiện.

RabbitMQ là gì?

RabbitMQ là một hệ thống môi giới tin nhắn phân tán dựa trên Giao thức xếp hàng tin nhắn nâng cao (AMPQ) để truyền tin nhắn an toàn. Nhà sản xuất (hoặc nhà xuất bản) tạo tin nhắn, gửi tin nhắn đến nhà môi giới và người tiêu dùng (hoặc người đăng ký) sẽ nhận tin nhắn. Điều này cho phép giao tiếp không đồng bộ, trong đó nhà sản xuất và người tiêu dùng không cần phải có mặt cùng lúc. Do bản chất không đồng bộ của nó, thiết lập kết quả nhanh, có thể mở rộng và bền vững. Hướng dẫn “Hello world!” của RabbitMQ đi sâu hơn vào cách thức hoạt động của điều này.

Môi giới tin nhắn

Phần trung tâm của RabbitMQ là message broker, đóng vai trò là trung gian cho việc nhắn tin. Hãy nghĩ về message broker như một loại bảng tin. Một producer đặt một tin nhắn trên một bảng tin cụ thể và một consumer quan tâm sẽ gỡ tin nhắn đó xuống. Message broker hỗ trợ nhiều loại hàng đợi giữ tin nhắn cho đến khi consumer nhận được chúng. Để tạo tin nhắn, producer mở kết nối đến một kênh cụ thể, tạo một hàng đợi tại đó, rồi tải tin nhắn lên. Tương tự như vậy, consumer mở kết nối đến cùng một kênh, định vị hàng đợi và tải xuống bất kỳ tin nhắn nào mà nó tìm thấy.

Các loại hàng đợi

Hàng đợi là một cấu trúc dữ liệu chứa các thông điệp. Khi làm việc với RabbitMQ, tất cả các hàng đợi đều theo nguyên tắc vào trước, ra trước (FIFO). Giống như việc xếp hàng, một thông điệp bắt đầu ở cuối hàng đợi và tiến dần lên đầu hàng đợi. Có ba loại hàng đợi phổ biến:

  • Classic : Kiểu hàng đợi ban đầu, được thiết lập để loại bỏ trong phiên bản tương lai do những hạn chế về mặt kỹ thuật. Bài viết Classic Queue Mirroring có nhiều thông tin hơn về chúng và cách di chuyển đến hàng đợi quorum.
  • Quorum : Đây là một dạng hàng đợi mới hơn cung cấp lợi thế về cả độ tin cậy và tốc độ so với hàng đợi cổ điển. Nó dựa vào sự đồng thuận giữa phần lớn các bản sao khi có sự khác biệt giữa các bản sao riêng lẻ, đảm bảo tính an toàn và tính nhất quán của dữ liệu.
  • Luồng : Luồng là một loại hàng đợi tăng cường giải quyết các yêu cầu lập trình cụ thể như được mô tả trong phần còn lại của hướng dẫn này. Về cơ bản, đây là một hàng đợi không phá hủy, trong đó các thông báo không bị xóa ngay sau khi đọc.

Hàng đợi có thể bền vững hoặc tạm thời. Dữ liệu cho hàng đợi bền vững được lưu trữ trên đĩa để nó có thể tồn tại sau khi khởi động lại. Ngoài ra, dữ liệu cho hàng đợi tạm thời được lưu trữ trong bộ nhớ bất cứ khi nào có thể. Hàng đợi bền vững là lựa chọn tối ưu cho bất kỳ nhu cầu nào đòi hỏi luồng thông điệp có độ tin cậy cao. Trong khi đó, hàng đợi tạm thời hoạt động tốt hơn đối với dữ liệu nhạy cảm với thời gian, trong đó tốc độ được coi trọng hơn tính bền bỉ.

Đặc điểm hàng đợi có thể được sửa đổi tùy thuộc vào loại hàng đợi. Ví dụ, bạn có thể đặt giá trị thời gian tồn tại (TTL) cho các tin nhắn trong hàng đợi để dữ liệu không được xử lý khi không còn liên quan nữa. Các đặc điểm này cũng kiểm soát các vấn đề như độ dài tin nhắn tối đa và mức độ ưu tiên tối đa mà một tin nhắn có thể có.

Ưu điểm và nhược điểm của luồng tin nhắn so với các loại hàng đợi khác

Luồng tin nhắn có ưu điểm và nhược điểm khi so sánh với hàng đợi. Hiểu loại ứng dụng đang được sử dụng là chìa khóa. Một số ứng dụng được hưởng lợi rất nhiều từ luồng tin nhắn, trong khi những ứng dụng khác thì không.

Thuận lợi

Ưu điểm của việc sử dụng luồng vượt xa nhược điểm đối với các trường hợp sử dụng luồng thường được sử dụng. Ví dụ, trong môi trường nhà xuất bản/người đăng ký, việc cố gắng viết mã bằng hàng đợi cổ điển hoặc hàng đợi số đông có thể khó khăn. Tuy nhiên, tất cả các hỗ trợ cần thiết đều được tích hợp vào luồng.

Các luồng không phá hủy

Sự khác biệt quan trọng nhất giữa luồng và hàng đợi là luồng không phá hủy. Nhiều người dùng có thể đọc cùng một thông điệp, khiến luồng giống một tờ báo hơn là một bảng thông báo.

Các luồng không cần nhiều ràng buộc để phục vụ nhiều máy khách

Hàng đợi yêu cầu nhiều ràng buộc nếu bạn muốn gửi cùng một thông điệp đến nhiều người dùng. Lý do là khi người dùng đọc một thông điệp, thông điệp đó sẽ biến mất khỏi hàng đợi. Do đó, mỗi người dùng yêu cầu một ràng buộc riêng biệt để đảm bảo nhận được bản sao thông điệp của mình. Ngược lại, các thông điệp trong một luồng không bị xóa, cho phép nhiều người dùng truy cập thông điệp thông qua một ràng buộc duy nhất. Cách tiếp cận này tiết kiệm bộ nhớ và các tài nguyên khác, đồng thời làm cho luồng nhanh hơn vì mã tốn ít thời gian hơn để sao chép thông điệp.

Có thể truy cập các luồng ở bất kỳ vị trí nào trong dòng thời gian của tin nhắn

Để làm cho luồng hữu ích nhất có thể, người dùng có thể chỉ định tin nhắn nào cần đọc bằng cách sử dụng độ lệch tuyệt đối hoặc dấu thời gian. Ngoài ra, người dùng có thể đọc cùng một tin nhắn nhiều lần tùy theo nhu cầu bằng cách sử dụng cùng một phương pháp. Khả năng truy cập các tin nhắn cụ thể ở bất kỳ đâu trong luồng này giúp giảm nhu cầu lưu trữ của người dùng vì người dùng không cần lưu trữ bản sao của mọi tin nhắn.

Luồng nhanh hơn hàng đợi

Stream sử dụng tài nguyên hiệu quả hơn hàng đợi vì bạn không cần phải sao chép tin nhắn. Ngoài ra, stream không yêu cầu nhiều ràng buộc để hoàn thành công việc. Sau khi tin nhắn được ghi vào đĩa, nó không tiêu tốn bất kỳ bộ nhớ máy chủ nào, giải phóng nhiều bộ nhớ tốc độ cao hơn cho các tác vụ khác. Nhờ những tối ưu hóa này và các tối ưu hóa khác, stream cực kỳ nhanh khi so sánh với hàng đợi. Thêm vào khả năng thu gọn mã ứng dụng, hiệu quả của stream trở nên rõ ràng hơn nữa.

Nhược điểm

Không có giải pháp lập trình hoàn hảo nào, chỉ có giải pháp hoạt động tốt nhất trong các trường hợp sử dụng cụ thể. Mặc dù các luồng RabbitMQ cung cấp nhiều chức năng, nhưng chúng cũng có những nhược điểm khiến chúng kém hiệu quả hơn đối với một số trường hợp sử dụng.

Có thể khá lớn

Các luồng có thể tiếp tục phát triển vô thời hạn, điều đó có nghĩa là việc cấu hình đúng một luồng là điều cần thiết. RabbitMQ không bao giờ xóa bất kỳ thông báo nào khỏi luồng trừ khi bạn cung cấp các ràng buộc luồng. Bạn có thể kiểm soát kích thước của luồng theo hai cách. Cách đầu tiên là sử dụng các thiết lập x-max-length-bytesvà/hoặc x-stream-max-segment-size-bytes. Ở đây, các thông báo cũ hơn sẽ bị xóa bất cứ khi nào việc thêm một thông báo mới vượt quá các tham số thiết lập. Phương pháp thứ hai là sử dụng thiết x-max-agelập để kiểm soát thời gian luồng lưu giữ các thông báo của nó. Ở đây, khi thông báo hết thời gian, nó phải được tạo lại.

Không đảm bảo khả năng phân phối

Điều cần thiết là phải biết “bảo đảm phân phối ít nhất một lần” có nghĩa là gì. Các luồng đảm bảo rằng một số người dùng nhìn thấy tin nhắn ít nhất một lần bằng cách sử dụng sự kết hợp giữa xác nhận của nhà xuất bản và loại bỏ trùng lặp tin nhắn ở phía nhà xuất bản. Tuy nhiên, nó không đảm bảo rằng một người dùng cụ thể nhìn thấy tin nhắn. Điều này có nghĩa là mã người dùng phải được viết sao cho người dùng kiểm tra mọi tin nhắn. Một tin nhắn cũng có thể bị mất giữa chừng đối với người dùng trừ khi ứng dụng dựa vào cơ chế xác nhận của nhà xuất bản.

Các luồng đạt được đặc điểm nhanh chóng của chúng bằng cách sử dụng một số phím tắt. Ví dụ, các luồng không xóa bộ nhớ đệm của hệ điều hành một cách rõ ràng, điều đó có nghĩa là việc tắt máy chủ không được kiểm soát có thể dẫn đến mất dữ liệu.

Trước khi bạn bắt đầu

Các ví dụ trong hướng dẫn này giả định rằng bạn có cài đặt RabbitMQ. Làm theo hướng dẫn và liên kết bên dưới để thiết lập phiên bản RabbitMQ:

  1. Nếu bạn chưa thực hiện, hãy tạo một tài khoản Linode và Compute Instance. Xem hướng dẫn Bắt đầu trên Nền tảng Linode và Tạo Compute Instance của chúng tôi. Ubuntu 22.04 LTS, Nanode 1 GB, phiên bản CPU dùng chung là đủ cho các ví dụ trong hướng dẫn này.
  2. Làm theo hướng dẫn Thiết lập và Bảo mật Phiên bản Compute của chúng tôi để cập nhật hệ thống của bạn. Bạn cũng có thể muốn đặt múi giờ, cấu hình tên máy chủ, tạo tài khoản người dùng giới hạn và tăng cường quyền truy cập SSH.
  3. Thực hiện theo hướng dẫn có trong phần CloudSmith của hướng dẫn chính thức của RabbitMQ về Cài đặt trên Debian và Ubuntu . Nó cung cấp mọi thứ cần thiết cho một cài đặt cơ bản và là thiết lập được sử dụng cho hướng dẫn này. Để sử dụng tính năng luồng được đề cập trong hướng dẫn này, hãy đảm bảo rằng bạn đã cài đặt RabbitMQ 3.9 trở lên trên máy chủ của mình:sudo rabbitmqctl version
  4. Ngoài ra, hãy đảm bảo bật các plugin rabbitmq_managementrabbitmq_management_agent, và rabbitmq_web_dispatch:sudo rabbitmq-plugins enable rabbitmq_management
  5. Sử dụng lệnh sau để cài đặt pipnếu nó chưa được cài đặt trên hệ thống của bạn:sudo apt install python3-pip
  6. Sử dụng pipđể cài đặt Pika hỗ trợ Python:python3 -m pip install pika --upgrade

Ghi chúCác bước trong hướng dẫn này yêu cầu quyền root. Hãy đảm bảo chạy các bước bên dưới dưới dạng 

roothoặc với 

sudotiền tố. Để biết thêm thông tin về quyền, hãy xem hướng dẫn 

Người dùng và Nhóm Linux của chúng tôi .

Cách tạo luồng tin nhắn với RabbitMQ

Có một số tùy chọn để tạo luồng trong RabbitMQ, trong đó phương pháp lập trình và plugin là phổ biến nhất.

Luồng nên được quản lý thông qua RabbitMQ Core hay Plugin?

Hai phương pháp cơ bản để quản lý luồng RabbitMQ là sử dụng lõi luồng hoặc plug-in luồng . Bạn sử dụng phương pháp nào tùy thuộc vào cách bạn định làm việc với luồng. Nếu bạn định làm việc chủ yếu tại máy chủ bằng mã ứng dụng, thì lõi luồng có thể hoạt động tốt nhất. Mặt khác, nếu bạn định làm việc nhiều với máy khách, thì sử dụng plug-in luồng có lẽ là lựa chọn tốt nhất. Ngoài ra, plug-in luồng cung cấp quyền truy cập GUI thông qua bảng điều khiển quản lý. Xem phần so sánh Stream Core với Stream Plugin để biết thêm chi tiết.

Cần dành bao nhiêu dung lượng lưu trữ cho luồng RabbitMQ?

Phần Có thể khá lớn thảo luận về các tham số cấu hình để kiểm soát không gian lưu trữ và tính liên tục của tin nhắn trong luồng. Bạn phải phân bổ đủ không gian lưu trữ cho tin nhắn mà bạn định lưu trữ. Hãy nhớ rằng tin nhắn cũ vẫn còn cho đến khi hết hạn hoặc được thay thế bằng tin nhắn mới hơn trong hàng đợi. Đảm bảo bạn thiết lập cảnh báo không gian trống khi cấu hình RabbitMQ. Điều này cho phép bạn giải quyết các vấn đề về tin nhắn một cách chủ động, thay vì chờ ứng dụng bị sập.

Ví dụ

Một cách dễ dàng để hiểu cách hoạt động của luồng là tạo một ứng dụng đơn giản bao gồm một nhà sản xuất và một người tiêu dùng.

1.Đầu tiên, tạo một tệp python để chứa mã nhà sản xuất:

nano producer.py

Cung cấp cho tập tin những nội dung sau:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='test_stream',
                      durable=True,
                      arguments={"x-queue-type": "stream"})

channel.basic_publish(exchange='',
                      routing_key='test_stream',
                      body='This is a test message.')

print("Sent message to stream.")

connection.close()

Mã sản xuất bắt đầu bằng cách tạo kết nối đến RabbitMQ. Nó sử dụng kết nối này để tạo hàng test_streamđợi. Bạn phải định nghĩa hàng đợi là bền vững, nếu không bạn sẽ nhận được thông báo lỗi. Đặt đối x-queue-typesố thành streamchuyển đổi hàng đợi thành luồng, bạn có thể xác minh thông qua bảng điều khiển RabbitMQ.

Khi hoàn tất, nhấn CTRLX, tiếp theo là Ythen Enterđể lưu tệp và thoát nano.

2.Bây giờ hãy tạo một tệp python thứ hai để chứa mã người dùng:

nano consumer.py

Cung cấp cho tập tin những nội dung sau:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='test_stream',
                      durable=True,
                      arguments={"x-queue-type": "stream"})

def print_msg(msg):
    print(f"Received {msg}")

def callback(ch, method, properties, body):
    print_msg(body)

channel.basic_qos(prefetch_count=100)

channel.basic_consume('test_stream', callback)

channel.start_consuming()

connection.close()

Mã người dùng nhận bất kỳ tin nhắn mới nào được gửi đến luồng. Cần có mã bổ sung để truy cập các tin nhắn đã có trong luồng, nhưng đây là điểm khởi đầu tốt. Nó bắt đầu bằng cách thiết lập kết nối và sau đó truy cập luồng. Cũng như khi gửi tin nhắn, hãy đảm bảo rằng luồng được định nghĩa là bền vững và bao gồm các đối số chính xác. Ứng dụng đặt số lượng truy xuất trước bằng cách sử dụng prefetch_count=100. Thiết lập này chỉ định số lượng tin nhắn mà người dùng truy xuất từ ​​luồng cùng một lúc. Người dùng có thể xử lý một loạt tin nhắn trong bộ nhớ cục bộ trước khi phải thực hiện một chuyến khứ hồi khác đến máy chủ. Lệnh gọi đến channel.basic_consume()xác định những việc cần làm với tin nhắn khi đến nơi. Sau khi ứng dụng được thiết lập, nó sẽ gọi channel.start_consuming()để liên tục xử lý tin nhắn cho đến khi nhận được lệnh ngắt bàn phím ( CTRLC).

Khi hoàn tất, nhấn CTRLX, tiếp theo là Ythen Enterđể lưu tệp và thoát nano.

3.Để kiểm tra ứng dụng, trước tiên hãy chạy mã người dùng:

python3 consumer.py

4.Bây giờ hãy mở cửa sổ terminal thứ hai và chạy mã nhà sản xuất:

python3 producer.py

Thiết bị đầu cuối của nhà sản xuất sẽ hiển thị thông báo thành công:

Sent message to stream.

Trong khi đó, thiết bị đầu cuối của người dùng sẽ hiển thị thông báo thử nghiệm của nhà sản xuất:

Received b'This is a test message.'

5.Khi hoàn tất, nhấn CTRLCđể dừng mã người tiêu dùng.

Phần kết luận

Các luồng RabbitMQ mở ra vô số khả năng để làm việc với RabbitMQ. Quan trọng nhất là chúng làm giảm lượng công việc cần thiết để triển khai một số loại tình huống lập trình nhất định. Sử dụng các luồng an toàn, đơn giản và nhất quán, loại bỏ nhu cầu các nhà phát triển phải phát minh lại bánh xe bất cứ khi nào cần một hàng đợi không phá hủy.

Thông tin thêm

Bạn có thể muốn tham khảo các nguồn sau để biết thêm thông tin về chủ đề này. Mặc dù chúng tôi cung cấp với hy vọng rằng chúng sẽ hữu ích, nhưng xin lưu ý rằng chúng tôi không thể đảm bảo tính chính xác hoặc tính kịp thời của các tài liệu được lưu trữ bên ngoài.

Nguồn : https://www.linode.com/docs/guides/how-to-create-message-stream-rabbitmq/