The Long Adventure to a Successful Pipeline (3): Do I Need 'hostAliases' to Connect to the Public Network?
> [!tip]
> The Answer is "NONONONONONO"
Project Name: Setting up hostAliases and writing the Spark code
#KAFKA_SPARK_DOCKER_COMPOSE
```yaml
version: '2'

networks:
  common-network:
    external: true

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.15
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    networks:
      - common-network

  broker:
    image: confluentinc/cp-kafka:6.1.15
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "19092:19092"
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'INTERNAL://broker:9092,EXTERNAL://133.186.244.202:19092'
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    networks:
      - common-network

  spark:
    image: seunghyejeong/spark:1.0
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
      - SPARK_USER=spark
      - SPARK_MASTER_OPTS="-Dspark.rpc.message.maxSize=512"
    ports:
      - '8080:8080'
      - '7077:7077'
    networks:
      - common-network

  spark-worker:
    image: seunghyejeong/spark:1.0
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://bami-cluster2:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
      - SPARK_USER=spark
      - SPARK_PUBLIC_DNS=bami-cluster2
      - SPARK_WORKER_OPTS="-Dspark.rpc.message.maxSize=512"
    networks:
      - common-network
```
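With this listener split, containers on common-network reach the broker as broker:9092 (INTERNAL), while anything outside Docker, Airflow pods included, gets back the advertised public address 133.186.244.202:19092 (EXTERNAL). A minimal sketch of that external check, assuming kafka-python is installed on the client and the public IP is reachable from it:

```python
from kafka import KafkaConsumer

# EXTERNAL advertised listener from the compose file above
bootstrap_servers = "133.186.244.202:19092"

# Fetching topic metadata forces a real round trip to the broker; it works
# only if the advertised address is reachable as-is, i.e. without any
# hostAliases / /etc/hosts entry on the client side.
consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
print("Topics visible through the EXTERNAL listener:", consumer.topics())
consumer.close()
```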
Redeploy Airflow with the updated values file:

```bash
helm upgrade airflow apache-airflow/airflow --namespace airflow -f custom_values.yaml
```
Create the topic:

```bash
sudo docker compose exec broker kafka-topics --create --topic devices --bootstrap-server broker:9092 --replication-factor 1 --partitions 1
```

- Verify, then open a console producer:

```bash
sudo docker compose exec broker kafka-topics --describe --topic devices --bootstrap-server broker:9092
kafka-console-producer --topic devices --bootstrap-server broker:9092
```

- Example record (paste into the console producer):

```json
{"eventId": "e3cb26d3-41b2-49a2-84f3-0156ed8d7502", "eventOffset": 10001, "eventPublisher": "device", "customerId": "CI00103", "data": {"devices": [{"deviceId": "D001", "temperature": 15, "measure": "C", "status": "ERROR"}, {"deviceId": "D002", "temperature": 16, "measure": "C", "status": "SUCCESS"}]}, "eventTime": "2023-01-05 11:13:53.643364"}
```
General form of spark-submit:

```bash
./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode cluster \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]
```
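Inside Airflow, the same submission is usually expressed as a task rather than a shell call. A hedged sketch, assuming the apache-airflow-providers-apache-spark package is installed and a spark_default connection points at the master; the DAG id and application path are placeholders:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG("spark_submit_example", start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:
    # Placeholder application path; spark_default must point at the master URL.
    submit_job = SparkSubmitOperator(
        task_id="submit_spark_job",
        application="/opt/airflow/dags/my_streaming_job.py",
        conn_id="spark_default",
        conf={"spark.rpc.message.maxSize": "512"},
        verbose=True,
    )
```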
BUG:

```text
FileNotFoundError: [Errno 2] No such file or directory: '/home/airflow/.local/python/pyspark/shell.py'
/home/airflow/.local/bin/load-spark-env.sh: line 68: ps: command not found
```

- Fix: point SPARK_HOME at a different path.

```dockerfile
ENV SPARK_HOME=/home/airflow/.local/assembly/target/scala-2.12
```
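A quick way to sanity-check the new path from inside the pod is to see where SPARK_HOME and the pyspark package actually resolve; purely an illustrative check:

```python
import os

import pyspark

# The FileNotFoundError above came from SPARK_HOME pointing at a directory
# with no python/pyspark/shell.py underneath it; after changing the ENV,
# these two paths show what the launcher scripts will actually pick up.
print("SPARK_HOME     =", os.environ.get("SPARK_HOME"))
print("pyspark module =", os.path.dirname(pyspark.__file__))
```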
On the Airflow webserver (UI) pod:
```python
from datetime import datetime

from airflow import DAG
from airflow.hooks.base_hook import BaseHook
from airflow.operators.python_operator import PythonOperator
from kafka import KafkaProducer
from pyspark.sql import SparkSession

# EXTERNAL listener of the Kafka broker
bootstrap_servers = '125.6.40.186:19092'


def check_connected_kafka():
    """Try to create a producer against the external listener."""
    try:
        producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
        print("####SUCCESS CONNECTING KAFKA#######")
        return True
    except Exception as e:
        print(f"Error connecting to Kafka: {str(e)}")
        return False


def check_spark_connection():
    """Build a SparkSession against the master stored in the Airflow connection."""
    try:
        spark_conn = BaseHook.get_connection("spark_sql_default")
        # host is expected to carry the spark:// scheme; port is an int,
        # so build the URL explicitly instead of concatenating raw fields
        spark_master_url = f"{spark_conn.host}:{spark_conn.port}"
        spark = SparkSession.builder \
            .appName("Spark Connection Test") \
            .master(spark_master_url) \
            .getOrCreate()
        if spark:
            print("Spark connection successful!")
        else:
            print("Failed to establish Spark connection!")
    except Exception as e:
        print(f"Error occurred while checking Spark connection: {str(e)}")


default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
}

with DAG('spark_connection_check', default_args=default_args, schedule_interval=None) as dag:
    check_connected_kafka_task = PythonOperator(
        task_id='check_connected_kafka',
        python_callable=check_connected_kafka,
    )

    # Define the task to check Spark connection
    check_spark_conn_task = PythonOperator(
        task_id='check_spark_connection',
        python_callable=check_spark_connection,
    )

    # Set the task dependencies
    check_connected_kafka_task >> check_spark_conn_task
```
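Because schedule_interval is None, this DAG never runs on its own: trigger it manually from the webserver UI (or with `airflow dags trigger spark_connection_check`) and read the task logs to see whether each endpoint was reachable.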
Conclusion

I had assumed that hostAliases was absolutely required to talk to anything outside the cluster.. but the answer is no: as long as the external service presents a publicly reachable address, like the broker's EXTERNAL listener on 133.186.244.202:19092, the Airflow pods can connect to it directly without any hostAliases entry.