[ad_1]
Tôi bắt tay vào sứ mệnh tích hợp Apache Flink với Kafka và PostgreSQL bằng Docker. Điều khiến nỗ lực này trở nên đặc biệt thú vị là việc sử dụng pyFlink – phiên bản Python của Flink – vừa mạnh mẽ vừa tương đối hiếm. Thiết lập này nhằm mục đích xử lý việc xử lý và lưu trữ dữ liệu theo thời gian thực một cách hiệu quả. Trong các phần tiếp theo, tôi sẽ trình bày cách tôi đạt được điều này, thảo luận về những thách thức gặp phải và cách tôi vượt qua chúng. Tôi sẽ kết thúc bằng hướng dẫn từng bước để bạn có thể tự mình xây dựng và thử nghiệm quy trình phát trực tuyến này.
Cơ sở hạ tầng chúng tôi sẽ xây dựng được minh họa dưới đây. Bên ngoài, có một mô-đun dành cho nhà xuất bản mô phỏng các thông báo cảm biến IoT, tương tự như những gì đã được thảo luận trong phần bài trước. Bên trong vùng chứa Docker, chúng tôi sẽ tạo hai chủ đề Kafka. Chủ đề đầu tiên, cảm biến, sẽ lưu trữ tin nhắn đến từ các thiết bị IoT trong thời gian thực. Sau đó, ứng dụng Flink sẽ sử dụng các tin nhắn từ chủ đề này, lọc những tin nhắn có nhiệt độ trên 30°C và xuất bản chúng sang chủ đề thứ hai, cảnh báo. Ngoài ra, ứng dụng Flink sẽ chèn các thông báo đã sử dụng vào bảng PostgreSQL được tạo riêng cho mục đích này. Thiết lập này cho phép chúng tôi duy trì dữ liệu cảm biến ở định dạng có cấu trúc, dạng bảng, tạo cơ hội cho việc chuyển đổi và phân tích thêm. Các công cụ trực quan hóa như Tableau hoặc Energy BI có thể được kết nối với dữ liệu này để lập biểu đồ và bảng thông tin theo thời gian thực.
Hơn nữa, chủ đề cảnh báo có thể được các khách hàng khác sử dụng để bắt đầu các hành động dựa trên thông báo mà nó lưu giữ, chẳng hạn như kích hoạt hệ thống điều hòa không khí hoặc kích hoạt các giao thức an toàn hỏa hoạn.
Để theo dõi hướng dẫn, bạn có thể sao chép phần sau kho lưu trữ. Docker-compose.yml được đặt trong thư mục gốc của dự án để bạn có thể khởi tạo ứng dụng nhiều vùng chứa. Hơn nữa, bạn có thể tìm thấy hướng dẫn chi tiết trong tệp README.
Sự cố với cổng Kafka trong docker-compose.yml
Ban đầu, tôi gặp phải vấn đề với cấu hình cổng của Kafka khi sử dụng hình ảnh Kafka Docker hợp lưu, một lựa chọn phổ biến cho các thiết lập như vậy. Vấn đề này trở nên rõ ràng thông qua nhật ký, nhấn mạnh tầm quan trọng của việc không chạy docker-compose up ở chế độ tách rời (-d) trong giai đoạn thiết lập và khắc phục sự cố ban đầu.
Nguyên nhân thất bại là do các máy chủ bên trong và bên ngoài sử dụng cùng một cổng, dẫn đến sự cố kết nối. Tôi đã sửa lỗi này bằng cách thay đổi cổng nội bộ thành 19092. Tôi đã tìm thấy cái này bài đăng trên weblog khá rõ ràng.
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:19092,PLAINTEXT_HOST://localhost:9092
Định cấu hình Flink ở Chế độ phiên
Để chạy Flink trong chế độ phiên (cho phép nhiều công việc trong một cụm duy nhất), tôi đang sử dụng các lệnh sau trong docker-compose.yml.
Hình ảnh Docker tùy chỉnh cho PyFlink
Do những hạn chế của hình ảnh Apache Flink Docker mặc định, không bao gồm hỗ trợ Python, tôi đã tạo một hình ảnh Docker tùy chỉnh cho pyFlink. Hình ảnh tùy chỉnh này đảm bảo rằng Flink có thể chạy các công việc Python và bao gồm các phần phụ thuộc cần thiết để tích hợp với Kafka và PostgreSQL. Dockerfile được sử dụng cho việc này nằm trong thư mục con pyflink.
- Hình ảnh cơ sở: Chúng ta bắt đầu với hình ảnh Flink chính thức.
- Cài đặt Python: Python và pip đã được cài đặt, nâng cấp pip lên phiên bản mới nhất.
- Quản lý phụ thuộc: Các phần phụ thuộc được cài đặt thông qua require.txt. Ngoài ra, các dòng được nhận xét để minh họa cách cài đặt thủ công các phần phụ thuộc từ tệp cục bộ, hữu ích cho việc triển khai trong môi trường không có truy cập web.
- Thư viện kết nối: Trình kết nối cho Kafka và PostgreSQL được tải trực tiếp vào thư mục lib của Flink. Điều này cho phép Flink tương tác với Kafka và PostgreSQL trong quá trình thực hiện công việc.
- Sao chép tập lệnh: Các tập lệnh từ kho lưu trữ được sao chép vào thư mục /decide/flink để trình quản lý tác vụ Flink thực thi.
Với hình ảnh Docker tùy chỉnh này, chúng tôi đảm bảo pyFlink có thể chạy đúng cách trong bộ chứa Docker, được trang bị các thư viện cần thiết để tương tác liền mạch với Kafka và PostgreSQL. Cách tiếp cận này mang lại sự linh hoạt và phù hợp cho cả môi trường phát triển và sản xuất.
Ghi chú: Đảm bảo rằng mọi cân nhắc về mạng hoặc bảo mật khi tải xuống trình kết nối và các phần phụ thuộc khác đều được giải quyết theo chính sách của môi trường triển khai của bạn.
Tích hợp PostgreSQL
Để kết nối Apache Flink với cơ sở dữ liệu PostgreSQL, cần có trình kết nối JDBC thích hợp. Hình ảnh Docker tùy chỉnh cho pyFlink tải xuống trình kết nối JDBC cho PostgreSQL, tương thích với PostgreSQL 16.
Để đơn giản hóa quy trình này, tập lệnh download_libs.sh được bao gồm trong kho lưu trữ, phản ánh các hành động được thực hiện trong vùng chứa Flink Docker. Tập lệnh này tự động tải xuống các thư viện cần thiết, đảm bảo tính nhất quán giữa Docker và môi trường cục bộ.
Ghi chú: Trình kết nối thường có hai phiên bản. Trong trường hợp cụ thể này, vì tôi đang sử dụng Flink 1.18, phiên bản ổn định mới nhất hiện có nên tôi đã tải xuống 3.1.2–1.18. Tôi đoán là phiên bản đầu tiên theo dõi việc triển khai JDBC cho một số cơ sở dữ liệu. Chúng có sẵn trong thư mục maven.
env.add_jars(
f"file://{current_dir}/flink-connector-jdbc-3.1.2–1.18.jar",
f"file://{current_dir}/postgresql-42.7.3.jar"
)
Xác định phần chìm JDBC
Trong tác vụ Flink của chúng tôi, có một hàm quan trọng có tên configure_postgre_sink nằm trong tệp usr_jobs/postgres_sink.py. Hàm này chịu trách nhiệm định cấu hình phần chìm PostgreSQL chung. Để sử dụng nó một cách hiệu quả, bạn cần cung cấp câu lệnh Ngôn ngữ thao tác dữ liệu SQL (DML) và các loại giá trị tương ứng. Các loại được sử dụng trong dữ liệu phát trực tuyến được định nghĩa là TYPE_INFO… tôi phải mất một thời gian mới đưa ra khai báo chính xác 😅.
Cũng lưu ý rằng JdbcSink có một tham số tùy chọn để xác định ExecutionOptions. Đối với trường hợp cụ thể này, tôi sẽ sử dụng khoảng thời gian cập nhật là 1 giây và giới hạn số lượng hàng ở mức 200. Bạn có thể tìm thêm thông tin trong phần tài liệu chính thức. Vâng, bạn đã đoán được rồi, vì tôi đang xác định một khoảng thời gian, nên đây có thể được coi là một ETL vi lô. Tuy nhiên, do tính tune tune của Flink, bạn có thể xử lý nhiều luồng cùng một lúc bằng một tập lệnh đơn giản, đồng thời, dễ theo dõi.
Ghi chú: Đừng quên tạo bảng raw_sensors_data trong Postgres, nơi sẽ nhận dữ liệu thô từ cảm biến IoT. Điều này được đề cập trong hướng dẫn từng bước trong các phần bên dưới.
Chìm dữ liệu vào Kafka
Tôi đã trình bày cách sử dụng dữ liệu từ một chủ đề Kafka trong cuộc thảo luận trước đó. Tuy nhiên, tôi vẫn chưa định cấu hình bồn rửa và đó là những gì chúng ta sẽ làm. Cấu hình này có một số điều phức tạp và được xác định trong một hàm, tương tự như phần chìm của Postgres. Ngoài ra, bạn phải xác định loại cho luồng dữ liệu trước khi đưa nó vào Kafka. Lưu ý rằng luồng Alarms_data được truyền chính xác dưới dạng một chuỗi có đầu ra_type=Sorts.STRING() trước khi chuyển nó vào Kafka, vì tôi đã khai báo bộ tuần tự hóa là SimpleStringSchema().
Tôi sẽ chỉ cho bạn cách tìm nạp dữ liệu từ chủ đề cảnh báo trong các bước sau.
Cấu hình cục bộ hoặc được chứa trong vùng chứa
Một trong những điều tuyệt vời nhất về cấu hình docker này là bạn có thể chạy Flink từ cục bộ hoặc bên trong vùng chứa dưới dạng tác vụ được quản lý. Thiết lập Flink cục bộ được mô tả trong hình sau, trong đó bạn có thể thấy ứng dụng Flink của chúng tôi được tách ra khỏi vùng chứa docker. Điều này có thể giúp khắc phục sự cố Flink không có bộ công cụ quan sát gốc tốt. Trên thực tế, chúng tôi muốn thử datorios công cụ cho Flink, chúng rất hứa hẹn cho mục đích giám sát.
Nếu bạn muốn dùng thử ứng dụng Flink cục bộ, bạn phải xác định chính xác các máy chủ và cổng được tập lệnh sử dụng. Thực tế đây là hai hằng số trong tệp usr_jobs/postgres_sink.py:
Để chạy container, hãy sử dụng:
KAFKA_HOST = "kafka:19092"
POSTGRES_HOST = "postgres:5432"
Để chạy cục bộ, hãy sử dụng:
KAFKA_HOST = "localhost:9092"
POSTGRES_HOST = "localhost:5432"
Theo mặc định, repo thiết lập ứng dụng Flink để chạy bên trong vùng chứa. Bạn có thể giám sát các công việc đang chạy bằng giao diện người dùng net, truy cập từ http://localhost:8081. Bạn sẽ không thể nhìn thấy nó nếu bạn chọn chạy công việc cục bộ.
Ghi chú: Nếu bạn chạy công việc cục bộ, bạn cần cài đặt các phần phụ thuộc Flink có trong tệp require.txt. Ngoài ra, tệp pyproject.toml cũng được cung cấp nếu bạn muốn thiết lập môi trường bằng thơ.
[ad_2]
Source link