Kafka Message Check
Table of Contents:
- [b] REF
kafka official docs getting start 시작하기: [[Quick Start]]
- airflow url
{MASTER_IP}:8080
- Topic 만들기
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
- producer
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 This is my first event This is my second event
- consumer
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092 This is my first event This is my second event
Connection
## Airflow Kafka 통합
- [b] REF
Kafka 연산자
- Consume: 메세지를 사용하다
- tasks
consume_task = ConsumeFromTopicOperator( task_id='consume_from_topic', topic='your_topic', apply_function=process_messages_function )
- Produce: 메세지를 생성
- tasks
produce_task = ProduceToTopicOperator( task_id='produce_to_topic', topic='your_topic', producer_function=create_messages_function )
Airflow Dag와 연결
- [?] 연결 확인 용도로 쓰이는건가 ?
- tasks
# Example of using KafkaConsumerOperator consume_task = KafkaConsumerOperator( task_id='consume_kafka_topic', topic='your_topic', consumer_config={'bootstrap.servers': 'localhost:9092'} )
Airflow의 Kafka producer 사용해 메세지 생성
- tasks ```python from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
def producer_function(**context): return 'key', 'message to send'
produce_task = ProduceToTopicOperator( task_id='produce_to_kafka_topic', producer_function=producer_function, kafka_conn_id='kafka_default', topic='your_topic' )
### Airflow의 Kafka *consumer* 사용해 메세지 사용
```bash
pip install apache-airflow-providers-apache-kafka
- tasks ```python from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
consume_task = ConsumeFromTopicOperator( task_id='consume_from_topic', topic='your_topic', apply_function=your_processing_function, max_messages=10, kafka_conn_id='kafka_default' )
- Dags
```python
# Example DAG to consume messages from a Kafka topic
from airflow import DAG
from airflow.utils.dates import days_ago
with DAG('kafka_consumer_example',
start_date=days_ago(1),
schedule_interval=None) as dag:
consume_task
Airflow 연동에 필요한 패키지
![[Pasted image 20240201153446.png]]
- [b] REF
https://airflow.apache.org/docs/apache-airflow/stable/installation/installing-from-pypi.html
pip install apache-airflow==2.8.1
pip install confluent-kafka
pip install asgiref
버전 업그레이드에 따른 Parameter key값 변경
- [b] REF
https://airflow.apache.org/docs/apache-airflow-providers-apache-kafka/stable/_api/airflow/providers/apache/kafka/operators/consume/index.html
pip install apache-airflow-providers-google
- [b] REF
- airflow url
{MASTER_IP}:8080
- Topic 만들기
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
- producer
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 This is my first event This is my second event
- consumer
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092 This is my first event This is my second event
Connection
## Airflow Kafka 통합
- [b] REF
Kafka 연산자
- Consume: 메세지를 사용하다
- tasks
consume_task = ConsumeFromTopicOperator( task_id='consume_from_topic', topic='your_topic', apply_function=process_messages_function )
- Produce: 메세지를 생성
- tasks
produce_task = ProduceToTopicOperator( task_id='produce_to_topic', topic='your_topic', producer_function=create_messages_function )
Airflow Dag와 연결
- [?] 연결 확인 용도로 쓰이는건가 ?
- tasks
# Example of using KafkaConsumerOperator consume_task = KafkaConsumerOperator( task_id='consume_kafka_topic', topic='your_topic', consumer_config={'bootstrap.servers': 'localhost:9092'} )
Airflow의 Kafka producer 사용해 메세지 생성
- tasks ```python from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
def producer_function(**context): return 'key', 'message to send'
produce_task = ProduceToTopicOperator( task_id='produce_to_kafka_topic', producer_function=producer_function, kafka_conn_id='kafka_default', topic='your_topic' )
### Airflow의 Kafka *consumer* 사용해 메세지 사용
```bash
pip install apache-airflow-providers-apache-kafka
- tasks ```python from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
consume_task = ConsumeFromTopicOperator( task_id='consume_from_topic', topic='your_topic', apply_function=your_processing_function, max_messages=10, kafka_conn_id='kafka_default' )
- Dags
```python
# Example DAG to consume messages from a Kafka topic
from airflow import DAG
from airflow.utils.dates import days_ago
with DAG('kafka_consumer_example',
start_date=days_ago(1),
schedule_interval=None) as dag:
consume_task
Airflow 연동에 필요한 패키지
!Pasted image 20240201153446.png
- [b] REF
https://airflow.apache.org/docs/apache-airflow/stable/installation/installing-from-pypi.html
pip install apache-airflow==2.8.1
pip install confluent-kafka
pip install asgiref
버전 업그레이드에 따른 Parameter key값 변경
- [b] REF
https://airflow.apache.org/docs/apache-airflow-providers-apache-kafka/stable/_api/airflow/providers/apache/kafka/operators/consume/index.html
pip install apache-airflow-providers-google
About Hallo. 안녕하세요! 정승혜 입니다. 개발 일지 뿐만 아니라 나의 관심 있는 모든 것을 담을거예요.