The long adventure to a successful Pipeline... 4
Connection type: spark_conn
from __future__ import annotations

from typing import TYPE_CHECKING

import pendulum

from airflow.decorators import dag, task

if TYPE_CHECKING:
    import pandas as pd

    from pyspark import SparkContext
    from pyspark.sql import SparkSession


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def example_pyspark():
    # The decorator injects a SparkSession/SparkContext built from the Airflow connection.
    @task.pyspark(conn_id="spark-default")
    def spark_task(spark: SparkSession, sc: SparkContext) -> pd.DataFrame:
        df = spark.createDataFrame(
            [
                (1, "John Doe", 21),
                (2, "Jane Doe", 22),
                (3, "Joe Bloggs", 23),
            ],
            ["id", "name", "age"],
        )
        df.show()
        return df.toPandas()

    @task
    def print_df(df: pd.DataFrame):
        print(df)

    df = spark_task()
    print_df(df)


# work around pre-commit
dag = example_pyspark()  # type: ignore

# get_test_run comes from the test helper module shown below
test_run = get_test_run(dag)
from __future__ import annotations

import logging
import os
from datetime import timedelta
from typing import TYPE_CHECKING, Callable, List

from tabulate import tabulate

from airflow.utils.state import State

if TYPE_CHECKING:
    from airflow.utils.context import Context


def get_test_run(dag):
    def callback(context: Context):
        try:
            ti = context["dag_run"].get_task_instances()
            if not ti:
                logging.warning("Could not retrieve tasks that ran in the DAG, cannot display a summary")
                return
            ti.sort(key=lambda x: x.end_date)
            headers = ["Task ID", "Status", "Duration (s)"]
            results = []
            prev_time = ti[0].end_date - timedelta(seconds=ti[0].duration)
            for t in ti:
                results.append([t.task_id, t.state, f"{(t.end_date - prev_time).total_seconds():.1f}"])
                prev_time = t.end_date
            logging.info("EXECUTION SUMMARY:\n" + tabulate(results, headers=headers, tablefmt="fancy_grid"))
        except Exception as e:
            logging.error(f"Error occurred during callback execution: {e}")

    def add_callback(current: List[Callable], new: Callable) -> List[Callable]:
        return current + [new]

    def test_run():
        dag.on_failure_callback = add_callback(dag.on_failure_callback or [], callback)
        dag.on_success_callback = add_callback(dag.on_success_callback or [], callback)
        dag.clear(dag_run_state=State.QUEUED)
        dag.run()

    return test_run


def get_test_env_id(env_var_name: str = "SYSTEM_TESTS_ENV_ID"):
    return os.environ.get(env_var_name)
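In the DAG file above, test_run = get_test_run(dag) wires this helper up; calling the returned closure clears any previous run state, attaches the summary callback for both success and failure, and runs the DAG once. A minimal usage sketch (not the project's actual test harness):

run = get_test_run(dag)  # build the runner for the DAG defined above
run()                    # clear old state, attach the summary callbacks, run the DAG once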
pandas~=2.0.3
requests~=2.31.0
selenium~=4.17.2
beautifulsoup4~=4.12.3
lxml~=5.1.0
virtualenv
kafka-python~=2.0.2
apache-airflow-providers-apache-kafka~=1.3.1
confluent-kafka~=2.3.0
apache-airflow-providers-apache-spark[cncf.kubernetes]
py4j==0.10.9.5
pyspark
grpcio-status>=1.59.0
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
export PATH=$PATH:$JAVA_HOME/bin
export PYSPARK_SUBMIT_ARGS="--master local[3] pyspark-shell"
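The same environment can be set from Python before the SparkSession is created, which matters when the code runs inside the Airflow worker rather than an interactive shell. A minimal sketch; the JAVA_HOME path is the one from the export above and may differ on other machines:

import os

os.environ.setdefault("JAVA_HOME", "/usr/lib/jvm/java-17-openjdk-amd64")  # path taken from the export above
os.environ["PATH"] += os.pathsep + os.path.join(os.environ["JAVA_HOME"], "bin")
os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local[3] pyspark-shell"  # same args as the shell export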
Notes on Spark: jobs can run in client or cluster deploy mode, and the Airflow Spark provider offers several connection/operator flavors (Spark Submit against the registered connection, Spark SQL, JDBC, ...).
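To check what the registered connection actually carries (host, port, extras such as deploy-mode), it can be read from inside a task. A minimal sketch, assuming the connection ID is spark_conn as noted above:

from airflow.hooks.base import BaseHook

# Look up the Spark connection stored in the Airflow metadata DB.
conn = BaseHook.get_connection("spark_conn")
master_url = f"{conn.host}:{conn.port}" if conn.port else conn.host  # e.g. spark://node2:7077
print("master:", master_url)
print("extras:", conn.extra_dejson)  # deploy-mode, queue, etc., if set on the connection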
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType
def initialize_spark_session():
    spark = SparkSession \
        .builder \
        .appName("pipeline") \
        .config("spark.streaming.stopGracefullyOnShutdown", True) \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \
        .config("spark.sql.shuffle.partitions", 4) \
        .master("local[*]") \
        .getOrCreate()
    return spark
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 3, 5),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Instantiate the DAG
dag = DAG(
    'spark_data_pipeline',
    default_args=default_args,
    description='A DAG to process data from Kafka using Spark',
    schedule_interval=None,
)

# Define tasks
initialize_spark_task = PythonOperator(
    task_id='initialize_spark_session',
    python_callable=initialize_spark_session,
    dag=dag,
)
# Define Kafka connection properties
kafka_params = {
    "kafka.bootstrap.servers": "125.6.40.186:19092",
    "subscribe": "devices",
    "startingOffsets": "earliest"
}
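Before pointing Structured Streaming at the broker, it can help to confirm the topic is reachable with a plain consumer. A sketch using kafka-python from the requirements list above; the broker address and topic are the ones in kafka_params:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "devices",
    bootstrap_servers="125.6.40.186:19092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,  # give up after 5 s if nothing arrives
)
for message in consumer:
    print(message.value)  # raw bytes of one event, just to prove connectivity
    break
consumer.close()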
# Define JSON Schema
json_schema = StructType([
    StructField('customerId', StringType(), True),
    StructField('data', StructType([
        StructField('devices', ArrayType(StructType([
            StructField('deviceId', StringType(), True),
            StructField('measure', StringType(), True),
            StructField('status', StringType(), True),
            StructField('temperature', LongType(), True)
        ]), True), True)
    ]), True),
    StructField('eventId', StringType(), True),
    StructField('eventOffset', LongType(), True),
    StructField('eventPublisher', StringType(), True),
    StructField('eventTime', StringType(), True)
])
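For reference, this is the shape of event the schema expects; a hypothetical example, all values made up:

sample_event = {
    "customerId": "CI00103",
    "data": {
        "devices": [
            {"deviceId": "D001", "measure": "C", "status": "SUCCESS", "temperature": 15},
            {"deviceId": "D002", "measure": "C", "status": "ERROR", "temperature": 16},
        ]
    },
    "eventId": "e3cb26d3-41b2-49a2-84f3-0156ed8d7502",
    "eventOffset": 10001,
    "eventPublisher": "device",
    "eventTime": "2024-03-05 11:13:53.643364",
}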
# Read Kafka messages
streaming_df = spark \
.readStream \
.format("kafka") \
.options(**kafka_params) \
.load()
# Parse JSON messages
json_df = streaming_df.selectExpr("CAST(value AS STRING) AS value")
json_expanded_df = json_df.withColumn("value", from_json(json_df["value"], json_schema)).select("value.*")
#print(json_schema)
#json_expanded_df.printSchema()
from pyspark.sql.functions import explode, col
exploded_df = json_expanded_df \
.select("customerId", "eventId", "eventOffset", "eventPublisher", "eventTime", "data") \
.withColumn("devices", explode("data.devices")) \
.drop("data")
#exploded_df.printSchema()
#exploded_df.show()
flattened_df = exploded_df \
.selectExpr("customerId", "eventId", "eventOffset", "eventPublisher", "cast(eventTime as timestamp) as eventTime",
"devices.deviceId as deviceId", "devices.measure as measure",
"devices.status as status", "devices.temperature as temperature")
#flattened_df.printSchema()
# Aggregate the dataframes to find the average temperature
# per customer per device throughout the day for SUCCESS events
from pyspark.sql.functions import to_date, avg
agg_df = flattened_df.where("STATUS = 'SUCCESS'") \
.withColumn("eventDate", to_date("eventTime", "yyyy-MM-dd")) \
.groupBy("customerId","deviceId","eventDate") \
.agg(avg("temperature").alias("avg_temp"))
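For example, if a micro-batch holds a single SUCCESS reading of 15 for device D001 of customer CI00103 on 2024-03-05 (hypothetical values), the aggregation emits one row (CI00103, D001, 2024-03-05, avg_temp=15.0); later SUCCESS readings for the same key keep updating that average, since the query below runs in complete output mode.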
# Write the output to console sink to check the output
writing_df = agg_df.writeStream \
.format("console") \
.option("checkpointLocation","checkpoint_dir") \
.outputMode("complete") \
.start()
writing_df.awaitTermination()
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
Please write Python code that connects to Spark from Airflow. The Spark details are already registered as an Airflow connection, so use that connection.
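A minimal sketch of what that could look like with SparkSubmitOperator, assuming the connection is registered as spark_conn and the PySpark script lives at /opt/airflow/dags/spark_job.py (both names are placeholders):

from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id="spark_submit_via_connection",
    start_date=datetime(2024, 3, 5),
    schedule_interval=None,
    catchup=False,
) as dag:
    submit_job = SparkSubmitOperator(
        task_id="submit_spark_job",
        conn_id="spark_conn",  # master URL and options come from the Airflow connection
        application="/opt/airflow/dags/spark_job.py",  # placeholder path to the PySpark script
        packages="org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",  # Kafka source, as in the session config above
        verbose=True,
    )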
Umm..
17:00 I think my mental model has been wrong all along... Since Spark was already baked into the Airflow Docker image when it was built, the Airflow web-server container already has Spark installed..? So when I try to connect to node2, where Kafka is installed, I get port-related errors, plus Java complaints ($JAVA_HOME was also set incorrectly). In effect, I already had Spark running on node2 and was trying to spin up another one. I think I got confused because Kafka lives on node2 and the whole setup was built around pulling messages from there. Even now the DAG pipeline still throws errors when I try to run it, though...
17:20 Pointing the master at node2's port 7077 gives "Failed to connect to master". So the Spark master and Spark worker would actually have to be up and running there,
Caused by: java.lang.RuntimeException: java.io.InvalidClassException: org.apache.spark.rpc.netty.RpcEndpointVerifier$CheckExistence; local class incompatible: stream classdesc serialVersionUID = 5378738997755484868, local class serialVersionUID = 7789290765573734431
When two Spark installations clash:
https://repo1.maven.org/maven2/org/apache/spark/spark-core_2.12/3.3.0/spark-core_2.12-3.3.0.jar
Ah, anyway, it looks like a version problem. The Java version.
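One quick way to narrow a mismatch like that down is to compare versions on both sides. A sketch of the client-side check (the same would be checked on node2's standalone master):

import subprocess

from pyspark.sql import SparkSession

# Versions used by the Airflow container's PySpark install;
# compare with what the standalone master on node2 reports.
spark = SparkSession.builder.master("local[*]").getOrCreate()
print("PySpark/Spark version:", spark.version)
spark.stop()

subprocess.run(["java", "-version"], check=False)  # Java used by the driver process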