Test networking between each Container

python library for airflow

#airflowpip #airflowlibrary

Problem

There is no Apache Kafka entry in the conn type list under airflow ui > connections.

  • Install the libraries found at the link above:
pip install airflow-provider-kafka
pip install apache-airflow-providers-apache-kafka==1.3.1
pip install confluent-kafka
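
If the Apache Kafka conn type still doesn't appear, it helps to confirm the packages import cleanly in the same environment that runs the webserver. A minimal sketch (the provider module path is the documented one for apache-airflow-providers-apache-kafka; adjust if your version differs):

```python
# Run inside the virtualenv that launches the Airflow webserver.
import confluent_kafka
print(confluent_kafka.version())  # confluent-kafka client version

# If this import fails, the provider is not installed in this environment
# and the Apache Kafka conn type will not show up in the UI.
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
print(ProduceToTopicOperator.__name__)
```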

Solved!

  • And there it is:

![[Pasted image 20240207105155.png]]

error

bash:

airflow webserver -p 8080

gives the error:

TypeError: SqlAlchemySessionInterface.__init__() missing 4 required positional arguments: 'sequence', 'schema', 'bind_key', and 'sid_length'

![[Pasted image 20240207095631.png]]

solve:

pip install Flask-Session==0.5.0

Flask-Session 0.6.0 added new required arguments to SqlAlchemySessionInterface.__init__(), which reportedly breaks the Airflow webserver on versions current at the time of writing; pinning to 0.5.0 restores the signature Airflow expects.

network check

  • REF from GPT #networkcheck #nc_vz #telnet

    Test Communication: Once you have the IP addresses, you can test communication between the containers. You can use tools like curl or telnet to test connectivity over specific ports.

    For example, on vm1, you can run: curl <vm2_container_ip>:<port>

    And on vm2, you can run: curl <vm1_container_ip>:<port>

    Replace <vm2_container_ip> and <port> with the IP address and port of the container in vm2, and <vm1_container_ip> and <port> with the IP address and port of the container in vm1.

    Netcat Test:

      • Install netcat (nc) on both vm1 and vm2 if not already installed.
      • From vm2, run nc -zv <vm1_public_ip> <container1_port>.
      • If you see a successful connection message, it confirms that communication is established.

    (See the Python sketch right after this note.)
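
A Python version of the same port check, for hosts without nc or telnet (a minimal sketch using only the standard library; the host/port values are the ones used in the checks below):

```python
import socket

def port_open(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

print(port_open("133.186.240.216", 19092))  # kafka broker's advertised port
print(port_open("133.186.155.37", 8080))    # airflow webserver port
```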

1. vm1 🔗 vm2

$ telnet {VM1orVM2_MASTER_IP} 22
  • Check connectivity from the airflow VM to the kafka VM:
$ telnet 133.186.240.216 22
Trying 133.186.240.216...
Connected to 133.186.240.216.
Escape character is '^]'.
SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4
  • Check connectivity from the kafka VM to the airflow VM:
$ telnet 133.186.155.37 22
Trying 133.186.155.37...
Connected to 133.186.155.37.
Escape character is '^]'.
SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.4

2. vm1 - vm2's docker container

$ nc -vz {vm2_MASTER_IP} {OPENPORT} # the port you want to check
  • Check the connection from the airflow VM to kafka's container port:
nc -vz 133.186.240.216 19092
Connection to 133.186.240.216 19092 port [tcp/*] succeeded!
  • From inside the kafka container to the airflow VM:
nc -vz 133.186.155.37 8080
Ncat: Version 7.92 ( https://nmap.org/ncat )
Ncat: Connected to 133.186.155.37:8080.
Ncat: 0 bytes sent, 0 bytes received in 0.01 seconds.

⭐ Connecting to the bootstrap server from inside the Kafka container

[appuser@broker ~]$ kafka-topics --list --bootstrap-server 133.186.240.216:19092
__consumer_offsets
my-topic

[appuser@broker ~]$ kafka-topics --list --bootstrap-server broker:9092
__consumer_offsets
my-topic
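
The same listing works from Python with kafka-python (a minimal sketch, reusing the externally advertised broker address from above):

```python
from kafka import KafkaConsumer

# Connect to the broker and list the topic names it reports.
consumer = KafkaConsumer(bootstrap_servers="133.186.240.216:19092")
print(consumer.topics())  # e.g. {'__consumer_offsets', 'my-topic'}
consumer.close()
```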

python example (final)

Operators worth knowing (the final example below sticks to PythonOperator; a sketch with the provider's own operators follows this list):

  • AwaitKafkaMessageOperator
  • ConsumeFromTopicOperator (pip install apache-airflow-providers-apache-kafka==1.3.1)
  • ProduceToTopicOperator (pip install apache-airflow-providers-apache-kafka==1.3.1)
  • PythonOperator
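
For comparison, a hedged sketch of wiring up the provider's own operators instead of plain PythonOperator. Parameter names (kafka_config_id, producer_function, apply_function, max_messages) follow the apache-airflow-providers-apache-kafka docs and may differ across versions; the kafka_default connection and the DAG id here are assumptions:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator

def producer_function():
    # The provider expects the producer callable to yield (key, value) pairs.
    yield (None, "Hello from the Kafka provider")

def print_message(message):
    # Some provider versions also accept a dotted-string path for this hook.
    print(message.value())

with DAG(
    "kafka_provider_sketch",  # hypothetical DAG id
    start_date=datetime(2024, 2, 7),
    schedule_interval=None,
) as dag:
    produce = ProduceToTopicOperator(
        task_id="produce_with_provider",
        kafka_config_id="kafka_default",  # Kafka connection configured in the UI
        topic="my-topic",
        producer_function=producer_function,
    )

    consume = ConsumeFromTopicOperator(
        task_id="consume_with_provider",
        kafka_config_id="kafka_default",
        topics=["my-topic"],
        apply_function=print_message,
        max_messages=1,
    )

    produce >> consume
```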

kafka-console-producer --topic my-topic --bootstrap-server 133.186.240.216:19092

kafka-console-consumer --topic my-topic --bootstrap-server 133.186.240.216:19092

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.operators.python import PythonOperator  # python_operator module is deprecated
from kafka import KafkaProducer, KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError
from datetime import datetime

# Define Kafka broker
bootstrap_servers = '133.186.240.216:19092'
topic_name = 'airflowtopic6'

# Function to create the Kafka topic (tolerates reruns: a retry should not
# fail just because the topic already exists)
def create_topic():
    admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
    try:
        admin_client.create_topics([topic])
    except TopicAlreadyExistsError:
        print(f"Topic {topic_name} already exists")
    finally:
        admin_client.close()

# Function to check that Kafka is reachable; raising an exception is what
# fails the task, since a plain return value would still count as success
def check_connected():
    try:
        producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
        consumer = KafkaConsumer(topic_name, bootstrap_servers=bootstrap_servers)
        producer.close()
        consumer.close()
    except Exception as e:
        raise AirflowException(f"Error connecting to Kafka: {e}")

# Function to produce a message to Kafka
def produce_message():
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
    message = f"Hello from Airflow. Message produced at {datetime.now()}"
    producer.send(topic_name, message.encode())
    producer.flush()

# Function to consume a message from Kafka. consumer_timeout_ms makes the
# iterator raise StopIteration instead of blocking forever on an empty topic,
# and raising AirflowException marks the task as failed.
def consume_message():
    consumer = KafkaConsumer(
        topic_name,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset='earliest',
        consumer_timeout_ms=10000,
    )
    try:
        message = next(consumer)
        print(f"Message consumed: {message.value.decode()}")
    except StopIteration:
        raise AirflowException("No message available to consume")
    finally:
        # Close the consumer to release resources
        consumer.close()



# Function to finish
def finish():
    print("All tasks completed.")

# Define the DAG
default_args = {
    'owner': 'admin',
    'depends_on_past': False,
    'start_date': datetime(2024, 2, 7),
    'retries': 1,
}

dag = DAG(
    'airflow_kafka_interworking',
    default_args=default_args,
    description='DAG for testing interworking of Airflow and Kafka',
    schedule_interval=None,
)

# Define tasks
create_topic_task = PythonOperator(
    task_id='create_topic',
    python_callable=create_topic,
    dag=dag,
)

check_connected_task = PythonOperator(
    task_id='check_connected',
    python_callable=check_connected,
    dag=dag,
)

produce_message_task = PythonOperator(
    task_id='produce_message',
    python_callable=produce_message,
    dag=dag,
)

consume_message_task = PythonOperator(
    task_id='consume_message',
    python_callable=consume_message,
    dag=dag,
)

finish_task = PythonOperator(
    task_id='finish',
    python_callable=finish,
    dag=dag,
)

# Define task dependencies
create_topic_task >> check_connected_task >> produce_message_task >> consume_message_task >> finish_task
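
Before deploying, a quick DagBag check catches import errors and verifies the task graph (a minimal sketch; the dags/ folder path is an assumption about your deployment):

```python
from airflow.models import DagBag

# Parse the DAG folder and confirm this DAG loads without import errors.
dagbag = DagBag(dag_folder="dags/", include_examples=False)  # hypothetical path
assert dagbag.import_errors == {}, dagbag.import_errors

dag = dagbag.get_dag("airflow_kafka_interworking")
assert len(dag.tasks) == 5  # create, check, produce, consume, finish
```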


The end~

A good article to check out, ref: how Kafka stores its data

Closing thoughts..

Try to picture how the network actually flows and sketch out the route.
Looking back at this communication-test workflow, at one point I was trying to reach an internal IP from outside, and I didn't really understand where the external IP gets used or what it stands for.
The public IP reachable from outside stands in for that VM's internal IP.
Setting up Airflow and Kafka themselves wasn't that hard; the network was the real problem.. lol
And I'd like to learn enough Python to write simple examples like this myself, without GPT ㅠㅠ
