This is a full, step-by-step log of rebuilding the data processing code.

Table of Contents:

  • [b] REF

    https://taaewoo.tistory.com/32?category=887744
    https://github.com/subhamkharwal/ease-with-apache-spark/blob/master/33_spark_streaming_read_from_kafka.ipynb

  • [i] version

    Spark 3.4.2, Java 17, Scala 2.13

  • [i] version (jars)

    kafka-clients-3.4.1, spark-token-provider-kafka-0-10_2.13-3.4.2, spark-sql-kafka-0-10_2.13-3.4.2

Spark Dockerfile / docker-compose.yaml Source

Custom image:
FROM bitnami/spark:3.4.2

USER root

# Install necessary packages
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
        vim \
        curl && \
    rm -rf /var/lib/apt/lists/*


USER 1001

# Download Kafka client JAR
RUN curl -o /opt/bitnami/spark/jars/kafka-clients-3.4.1.jar https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.4.1/kafka-clients-3.4.1.jar

# Download Spark Kafka connector JAR
RUN curl -o /opt/bitnami/spark/jars/spark-token-provider-kafka-0-10_2.13-3.4.2.jar https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.13/3.4.2/spark-token-provider-kafka-0-10_2.13-3.4.2.jar

RUN curl -o /opt/bitnami/spark/jars/spark-sql-kafka-0-10_2.13-3.4.2.jar https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.13/3.4.2/spark-sql-kafka-0-10_2.13-3.4.2.jar

docker-compose.yaml

Uses the image built above.

# Copyright VMware, Inc.
# SPDX-License-Identifier: APACHE-2.0

version: '2'

services:
  spark:
    image: seunghyejeong/spark:1.0
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
      - SPARK_USER=spark
    ports:
      - '8080:8080'
      - '7077:7077'
  spark-worker:
    image: seunghyejeong/spark:1.0
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
      - SPARK_USER=spark
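
To bring the pair up and check that the master is alive (standard compose commands, not from the original post; this assumes the file above is saved as docker-compose.yaml and uses the 8080 UI mapping defined there):

```bash
# Start the master and worker in the background
docker compose up -d

# The master web UI is mapped to 8080 above
curl -s http://localhost:8080 | head -n 5
```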

Spark Streaming Source

  1. Create the topic
    docker exec  -ti broker kafka-topics --create --topic devices --bootstrap-server broker:9092 --replication-factor 1 --partitions 1

  • Create a topic
    docker compose exec broker kafka-topics --create --topic kafka-topic --bootstrap-server broker:9092 --replication-factor 1 --partitions 1

  • List the topics
    [appuser@broker ~]$ docker compose exec broker kafka-topics --list --bootstrap-server  {EXTERNAL_IP}:19092
    __consumer_offsets
    my-topic

KAFKA Container

  1. Sample JSON file
    {"eventId": "e3cb26d3-41b2-49a2-84f3-0156ed8d7502", "eventOffset": 10001, "eventPublisher": "device", "customerId": "CI00103", "data": {"devices": [{"deviceId": "D001", "temperature": 15, "measure": "C", "status": "ERROR"}, {"deviceId": "D002", "temperature": 16, "measure": "C", "status": "SUCCESS"}]}, "eventTime": "2023-01-05 11:13:53.643364"}
    
  2. Sample JSON 'device' data
    {"eventId": "ba2ea9f4-a5d9-434e-8e4d-1c80c2d4b456", "eventOffset": 10000, "eventPublisher": "device", "customerId": "CI00119", "data": {"devices": []}, "eventTime": "2023-01-05 11:13:53.643364"}
    {"eventId": "e3cb26d3-41b2-49a2-84f3-0156ed8d7502", "eventOffset": 10001, "eventPublisher": "device", "customerId": "CI00103", "data": {"devices": [{"deviceId": "D001", "temperature": 15, "measure": "C", "status": "ERROR"}, {"deviceId": "D002", "temperature": 16, "measure": "C", "status": "SUCCESS"}]}, "eventTime": "2023-01-05 11:13:53.643364"}
    {"eventId": "8c202190-bc24-4485-89ec-de78e602dd68", "eventOffset": 10002, "eventPublisher": "device", "customerId": "CI00104", "data": {"devices": []}, "eventTime": "2023-01-05 11:13:53.643364"}
    {"eventId": "aa90011f-3967-496c-b94b-a0c8de19a3d3", "eventOffset": 10003, "eventPublisher": "device", "customerId": "CI00108", "data": {"devices": [{"deviceId": "D004", "temperature": 16, "measure": "C", "status": "SUCCESS"}]}, "eventTime": "2023-01-05 11:13:53.643364"}
    {"eventId": "e8859641-e9ad-44f8-94ce-353b840cff73", "eventOffset": 10004, "eventPublisher": "device", "customerId": "CI00116", "data": {"devices": []}, "eventTime": "2023-01-05 11:13:53.643364"}
    {"eventId": "804e8fa3-307b-482e-b629-af880c52e884", "eventOffset": 10005, "eventPublisher": "device", "customerId": "CI00106", "data": {"devices": [{"deviceId": "D002", "temperature": 30, "measure": "C", "status": "ERROR"}, {"deviceId": "D001", "temperature": 10, "measure": "C", "status": "STANDBY"}, {"deviceId": "D001", "temperature": 6, "measure": "C", "status": "SUCCESS"}]}, "eventTime": "2023-01-05 11:13:53.643364"}
    {"eventId": "b8675032-3fdf-4e1e-8816-3d4c1cd852cf", "eventOffset": 10006, "eventPublisher": "device", "customerId": "CI00120", "data": {"devices": []}, "eventTime": "2023-01-05 11:13:53.643364"}
    {"eventId": "1c8d9682-56f0-4c3d-95c8-fce1bac45a74", "eventOffset": 10007, "eventPublisher": "device", "customerId": "CI00119", "data": {"devices": [{"deviceId": "D002", "temperature": 15, "measure": "C", "status": "ERROR"}, {"deviceId": "D002", "temperature": 12, "measure": "C", "status": "SUCCESS"}]}, "eventTime": "2023-01-05 11:13:53.643364"}
    {"eventId": "fc625d0e-06c2-46b1-b9b5-b4a067e0a212", "eventOffset": 10008, "eventPublisher": "device", "customerId": "CI00117", "data": {"devices": [{"deviceId": "D003", "temperature": 6, "measure": "C", "status": "ERROR"}, {"deviceId": "D001", "temperature": 19, "measure": "C", "status": "ERROR"}, {"deviceId": "D005", "temperature": 0, "measure": "C", "status": "ERROR"}]}, "eventTime": "2023-01-05 11:13:53.643364"}
    {"eventId": "7dba5625-33e9-4d9f-b767-b44bd03e098d", "eventOffset": 10009, "eventPublisher": "device", "customerId": "CI00100", "data": {"devices": [{"deviceId": "D003", "temperature": 27, "measure": "C", "status": "STANDBY"}, {"deviceId": "D001", "temperature": 24, "measure": "C", "status": "SUCCESS"}]}, "eventTime": "2023-01-05 11:13:53.643364"}
    {"eventId": "209cab2d-7934-4ad2-ac36-dcae0b42d96b", "eventOffset": 10010, "eventPublisher": "device", "customerId": "CI00118", "data": {"devices": [{"deviceId": "D002", "temperature": 27, "measure": "C", "status": "SUCCESS"}, {"deviceId": "D005", "temperature": 23, "measure": "C", "status": "STANDBY"}]}, "eventTime": "2023-01-05 11:13:53.643364"}
    {"eventId": "158c282f-3bbc-447a-9303-1e978a23274a", "eventOffset": 10011, "eventPublisher": "device", "customerId": "CI00119", "data": {"devices": []}, "eventTime": "2023-01-05 11:13:53.643364"}
    {"eventId": "7146c4a8-54ed-4075-b013-c2d99e65d295", "eventOffset": 10012, "eventPublisher": "device", "customerId": "CI00117", "data": {"devices": [{"deviceId": "D002", "temperature": 5, "measure": "C", "status": "SUCCESS"}]}, "eventTime": "2023-01-05 11:13:53.643895"}
    {"eventId": "1ff547fd-e335-457e-9a1f-686cfbe903e3", "eventOffset": 10013, "eventPublisher": "device", "customerId": "CI00103", "data": {"devices": [{"deviceId": "D004", "temperature": 23, "measure": "C", "status": "SUCCESS"}]}, "eventTime": "2023-01-05 11:13:53.643895"}
    {"eventId": "692e9999-1110-4441-a20e-fd76692e2c17", "eventOffset": 10014, "eventPublisher": "device", "customerId": "CI00109", "data": {"devices": [{"deviceId": "D003", "temperature": 18, "measure": "C", "status": "ERROR"}]}, "eventTime": "2023-01-05 11:13:53.643895"}
    {"eventId": "80101e8c-af6a-4ff5-81ae-3bf5db432811", "eventOffset": 10015, "eventPublisher": "device", "customerId": "CI00101", "data": {"devices": []}, "eventTime": "2023-01-05 11:13:53.649684"}
    {"eventId": "7f0b1fba-3cd1-440d-9203-5dea57057ca8", "eventOffset": 10016, "eventPublisher": "device", "customerId": "CI00102", "data": {"devices": []}, "eventTime": "2023-01-05 11:13:53.649684"}
    {"eventId": "cb8a6a8f-89c9-498a-9106-7d148ba998b7", "eventOffset": 10017, "eventPublisher": "device", "customerId": "CI00104", "data": {"devices": [{"deviceId": "D004", "temperature": 5, "measure": "C", "status": "STANDBY"}, {"deviceId": "D004", "temperature": 22, "measure": "C", "status": "SUCCESS"}, {"deviceId": "D004", "temperature": 9, "measure": "C", "status": "ERROR"}]}, "eventTime": "2023-01-05 11:13:53.649684"}
    {"eventId": "a920562e-e8c0-4884-ad28-b74d82fc9ad8", "eventOffset": 10018, "eventPublisher": "device", "customerId": "CI00118", "data": {"devices": []}, "eventTime": "2023-01-05 11:13:53.649684"}
    {"eventId": "87941320-3424-42dc-b853-371698b9e7dd", "eventOffset": 10019, "eventPublisher": "device", "customerId": "CI00104", "data": {"devices": [{"deviceId": "D005", "temperature": 20, "measure": "C", "status": "ERROR"}, {"deviceId": "D005", "temperature": 4, "measure": "C", "status": "STANDBY"}]}, "eventTime": "2023-01-05 11:13:53.649684"}
    {"eventId": "37b1b74d-1538-4dd2-b689-cb8f2b214a0a", "eventOffset": 10020, "eventPublisher": "device", "customerId": "CI00101", "data": {"devices": [{"deviceId": "D003", "temperature": 22, "measure": "C", "status": "SUCCESS"}, {"deviceId": "D004", "temperature": 15, "measure": "C", "status": "STANDBY"}]}, "eventTime": "2023-01-05 11:13:53.649684"}
    {"eventId": "db78b51f-2569-49de-8931-26f5c0bd424f", "eventOffset": 10021, "eventPublisher": "device", "customerId": "CI00102", "data": {"devices": [{"deviceId": "D002", "temperature": 22, "measure": "C", "status": "ERROR"}]}, "eventTime": "2023-01-05 11:13:53.649684"}
    {"eventId": "1a8377a8-79e2-4138-8870-ce63b1bda703", "eventOffset": 10022, "eventPublisher": "device", "customerId": "CI00116", "data": {"devices": [{"deviceId": "D003", "temperature": 20, "measure": "C", "status": "STANDBY"}]}, "eventTime": "2023-01-05 11:13:53.649684"}
    {"eventId": "e12d25f8-acc5-4331-92fb-ec25e998f243", "eventOffset": 10023, "eventPublisher": "device", "customerId": "CI00120", "data": {"devices": [{"deviceId": "D003", "temperature": 20, "measure": "C", "status": "ERROR"}, {"deviceId": "D004", "temperature": 24, "measure": "C", "status": "STANDBY"}]}, "eventTime": "2023-01-05 11:13:53.649684"}
    {"eventId": "46a14162-f772-4acf-8a12-bb90790effaa", "eventOffset": 10024, "eventPublisher": "device", "customerId": "CI00109", "data": {"devices": []}, "eventTime": "2023-01-05 11:13:53.649684"}
    {"eventId": "95ad03f2-46c9-4d9d-9c02-ea2d0fcf5578", "eventOffset": 10025, "eventPublisher": "device", "customerId": "CI00104", "data": {"devices": [{"deviceId": "D001", "temperature": 10, "measure": "C", "status": "ERROR"}, {"deviceId": "D001", "temperature": 4, "measure": "C", "status": "ERROR"}]}, "eventTime": "2023-01-05 11:13:53.649684"}
    {"eventId": "0d7af0e0-9606-4130-adf2-c22c973ebf2c", "eventOffset": 10026, "eventPublisher": "device", "customerId": "CI00113", "data": {"devices": [{"deviceId": "D002", "temperature": 15, "measure": "C", "status": "SUCCESS"}]}, "eventTime": "2023-01-05 11:13:53.649684"}
    {"eventId": "bee50d5e-3280-496e-a381-1eae3576d698", "eventOffset": 10027, "eventPublisher": "device", "customerId": "CI00118", "data": {"devices": [{"deviceId": "D003", "temperature": 17, "measure": "C", "status": "SUCCESS"}, {"deviceId": "D004", "temperature": 16, "measure": "C", "status": "STANDBY"}]}, "eventTime": "2023-01-05 11:13:53.649684"}
    {"eventId": "78aad1aa-a427-4b5b-a33a-07f2effe9bab", "eventOffset": 10028, "eventPublisher": "device", "customerId": "CI00107", "data": {"devices": [{"deviceId": "D002", "temperature": 28, "measure": "C", "status": "ERROR"}, {"deviceId": "D003", "temperature": 12, "measure": "C", "status": "STANDBY"}]}, "eventTime": "2023-01-05 11:13:53.649684"}
    {"eventId": "88bb528f-d8b8-4c6f-af79-937091390440", "eventOffset": 10029, "eventPublisher": "device", "customerId": "CI00114", "data": {"devices": [{"deviceId": "D003", "temperature": 22, "measure": "C", "status": "SUCCESS"}, {"deviceId": "D001", "temperature": 29, "measure": "C", "status": "ERROR"}, {"deviceId": "D003", "temperature": 16, "measure": "C", "status": "SUCCESS"}]}, "eventTime": "2023-01-05 11:13:53.649684"}
    {"eventId": "655ea4c8-58ed-48e1-bcea-911c4b79f1bc", "eventOffset": 10030, "eventPublisher": "device", "customerId": "CI00120", "data": {"devices": [{"deviceId": "D005", "temperature": 14, "measure": "C", "status": "STANDBY"}]}, "eventTime": "2023-01-05 11:13:53.649684"}
    

./post_to_kafka.py

# This script posts events to the Kafka server.
# Run this on the Kafka server first to create the topic:
# /usr/bin/kafka-topics --create --topic device_data --bootstrap-server kafka:9092
from kafka import KafkaProducer
import time
import random
from device_events import generate_events

__bootstrap_server = "133.186.217.113:19092"


def post_to_kafka(data):
    print('data: '+ str(data))
    producer = KafkaProducer(bootstrap_servers=__bootstrap_server)
    producer.send('devices', key=b'device', value=data)
    #producer.flush()
    producer.close()
    print("Posted to topic")


if __name__ == "__main__":
    _offset = 10000
    while True:
        post_to_kafka(bytes(str(generate_events(offset=_offset)), 'utf-8'))
        time.sleep(random.randint(0, 5))
        _offset += 1
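
device_events.generate_events isn't shown in this post; a minimal stand-in that emits events shaped like the samples above could look like this (every name and range here is guessed for illustration):

```python
# Hypothetical stand-in for device_events.generate_events
import uuid
import random
from datetime import datetime

def generate_events(offset):
    # 0-3 device readings per event; some events carry none, as in the samples
    devices = [
        {"deviceId": f"D00{random.randint(1, 5)}",
         "temperature": random.randint(0, 30),
         "measure": "C",
         "status": random.choice(["SUCCESS", "ERROR", "STANDBY"])}
        for _ in range(random.randint(0, 3))
    ]
    return {
        "eventId": str(uuid.uuid4()),
        "eventOffset": offset,
        "eventPublisher": "device",
        "customerId": f"CI001{random.randint(0, 20):02d}",
        "data": {"devices": devices},
        "eventTime": str(datetime.now()),
    }
```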
Alternatively, messages can be typed in by hand with the console producer:

kafka-console-producer --topic devices --bootstrap-server broker:9092

SPARK Container

1. Create a Spark session

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType

spark = SparkSession \
    .builder \
    .appName("Streaming from Kafka") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.13:3.4.0') \
    .config("spark.sql.shuffle.partitions", 4) \
    .master("local[*]") \
    .getOrCreate()
```


2. Create the streaming DataFrame that reads from Kafka

```python
# Create the streaming_df to read from kafka
streaming_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "133.186.217.113:19092") \
    .option("subscribe", "devices") \
    .option("startingOffsets", "earliest") \
    .load()

# To inspect the Kafka messages and the data schema, change readStream to read
# and uncomment:
# streaming_df.printSchema()
# streaming_df.show(truncate=False)
```
3. Define the JSON schema

```python
# JSON Schema
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType

json_schema = StructType([
    StructField('customerId', StringType(), True),
    StructField('data', StructType([
        StructField('devices', ArrayType(StructType([
            StructField('deviceId', StringType(), True),
            StructField('measure', StringType(), True),
            StructField('status', StringType(), True),
            StructField('temperature', LongType(), True)
        ]), True), True)
    ]), True),
    StructField('eventId', StringType(), True),
    StructField('eventOffset', LongType(), True),
    StructField('eventPublisher', StringType(), True),
    StructField('eventTime', StringType(), True)
])
```


Jobs are submitted with /opt/bitnami/spark/bin/spark-submit.

4. Parse the value column and expand the JSON

```python
# Parse value from binary to string
json_df = streaming_df.selectExpr("cast(value as string) as value")

# Apply the schema to the JSON value column and expand the value
from pyspark.sql.functions import from_json

json_expanded_df = json_df.withColumn("value", from_json(json_df["value"], json_schema)).select("value.*")
```
5. Explode the devices array

```python
# Explode the data, since devices holds a list/array of device readings
from pyspark.sql.functions import explode, col

exploded_df = json_expanded_df \
    .select("customerId", "eventId", "eventOffset", "eventPublisher", "eventTime", "data") \
    .withColumn("devices", explode("data.devices")) \
    .drop("data")
```


6. Flatten the array

```python
flattened_df = exploded_df \
    .selectExpr("customerId", "eventId", "eventOffset", "eventPublisher",
                "cast(eventTime as timestamp) as eventTime",
                "devices.deviceId as deviceId", "devices.measure as measure",
                "devices.status as status", "devices.temperature as temperature")
```
7. Average the temperature for SUCCESS events

```python
# Aggregate the dataframe to find the average temperature
# per customer per device throughout the day for SUCCESS events
from pyspark.sql.functions import to_date, avg

agg_df = flattened_df.where("STATUS = 'SUCCESS'") \
    .withColumn("eventDate", to_date("eventTime", "yyyy-MM-dd")) \
    .groupBy("customerId", "deviceId", "eventDate") \
    .agg(avg("temperature").alias("avg_temp"))
```


8. Write the output to the console

```python
# Write the output to the console sink to check the output
writing_df = agg_df.writeStream \
    .format("console") \
    .option("checkpointLocation", "checkpoint_dir") \
    .outputMode("complete") \
    .start()

# Keep the streaming application running until one of the following happens:
# 1. An exception in the running program
# 2. Manual interruption
writing_df.awaitTermination()
```
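
Before involving Kafka at all, the same transforms can be sanity-checked as a plain batch job. A sketch, assuming the sample events above were saved one per line to /tmp/device_events.json (a made-up path):

```python
# Batch sanity check: read the sample events from a file instead of Kafka
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, to_date, avg

spark = SparkSession.builder.appName("batch-check").master("local[*]").getOrCreate()

# Plain JSON lines, so schema inference is good enough here
df = spark.read.json("/tmp/device_events.json")

exploded_df = df.withColumn("devices", explode("data.devices")).drop("data")
flattened_df = exploded_df.selectExpr(
    "customerId",
    "cast(eventTime as timestamp) as eventTime",
    "devices.deviceId as deviceId",
    "devices.status as status",
    "devices.temperature as temperature",
)

agg_df = flattened_df.where("status = 'SUCCESS'") \
    .withColumn("eventDate", to_date("eventTime")) \
    .groupBy("customerId", "deviceId", "eventDate") \
    .agg(avg("temperature").alias("avg_temp"))

agg_df.show()
```

From the samples, CI00119 / D002 has exactly one SUCCESS reading (12°C), so its avg_temp should come out as 12.0.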

Downgrade to 3.3.0

ERROR: java.lang.NoClassDefFoundError: scala/$less$colon$less

1. When running the Python script via the spark-submit CLI, it kept failing at the .load() call:

24/02/22 05:29:03 INFO SharedState: Warehouse path is 'file:/spark-warehouse'.
Traceback (most recent call last):
  File "/spark_streaming.py", line 19, in <module>
    .load()
     ^^^^^^
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/sql/streaming/readwriter.py", line 277, in load
  File "/opt/bitnami/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/opt/bitnami/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 169, in deco
  File "/opt/bitnami/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o35.load.
: java.lang.NoClassDefFoundError: scala/$less$colon$less

: At this point I was on Spark 3.4.0 with Scala 2.13, while the example used Spark 3.3.0 with Scala 2.12. The newer versions looked like the culprit, so I downgraded. (scala/$less$colon$less is the JVM-encoded name of Scala's <:< type class, so this error is a classic symptom of mixing _2.12 and _2.13 artifacts.)
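
A quick way to confirm which Scala line a Spark build actually ships with (and therefore whether to grab _2.12 or _2.13 artifacts):

```bash
# Prints the Spark version plus a "Using Scala version ..." banner
/opt/bitnami/spark/bin/spark-submit --version
```

The downgraded image: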

FROM bitnami/spark:3.3.0

USER root

# Install necessary packages
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
        vim \
        curl \
        netcat-openbsd && \
    rm -rf /var/lib/apt/lists/*

# Download Kafka client JAR
RUN curl -o /opt/bitnami/spark/jars/kafka-clients-3.3.0.jar https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.3.0/kafka-clients-3.3.0.jar

# Download Spark Kafka connector JAR
RUN curl -o /opt/bitnami/spark/jars/spark-token-provider-kafka-0-10_2.12-3.3.0.jar https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.3.0/spark-token-provider-kafka-0-10_2.12-3.3.0.jar

# Download Spark SQL Kafka connector JAR
RUN curl -o /opt/bitnami/spark/jars/spark-sql-kafka-0-10_2.12-3.3.0.jar https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.3.0/spark-sql-kafka-0-10_2.12-3.3.0.jar

USER 1001


Using the Python CLI

The code I ran:

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType

spark = SparkSession \
    .builder \
    .appName("Streaming from Kafka") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.13:3.4.0') \
    .config("spark.sql.shuffle.partitions", 4) \
    .master("local[*]") \
    .getOrCreate()

streaming_df = spark.readStream\
    .format("kafka") \
    .option("kafka.bootstrap.servers", "133.186.217.113:19092") \
    .option("subscribe", "devices") \
    .option("startingOffsets", "earliest") \
    .load()

# JSON Schema
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType
json_schema = StructType([StructField('customerId', StringType(), True), \
StructField('data', StructType([StructField('devices', ArrayType(StructType([ \
StructField('deviceId', StringType(), True), \
StructField('measure', StringType(), True), \
StructField('status', StringType(), True), \
StructField('temperature', LongType(), True)]), True), True)]), True), \
StructField('eventId', StringType(), True), \
StructField('eventOffset', LongType(), True), \
StructField('eventPublisher', StringType(), True), \
StructField('eventTime', StringType(), True)])

# Parse value from binary to string
json_df = streaming_df.selectExpr("cast(value as string) as value")

# Apply Schema to JSON value column and expand the value
from pyspark.sql.functions import from_json

json_expanded_df = json_df.withColumn("value", from_json(json_df["value"], json_schema)).select("value.*")

ERROR: ModuleNotFoundError: No module named 'py4j'

Looking at the process in the REF, it used the python command, so I gave that a try.

CHAT GPT said 💁‍♀️ — I asked what CLI to use to run that code, and it said to launch the Python interpreter:

python

> python spark_streaming.py

![[Pasted image 20240222153354.png]]

: It then complained about a missing module, so I installed it:

```bash
pip install py4j==0.10.9.5
```

Then run it with python:

$ python spark_streaming.py

And my output:

:: loading settings :: url = jar:file:/opt/bitnami/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /opt/bitnami/spark/.ivy2/cache
The jars for the packages stored in: /opt/bitnami/spark/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-49284453-8db1-4df1-8cc3-25cb272d9304;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;3.4.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;3.4.0 in central
	found org.apache.kafka#kafka-clients;3.3.2 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.9.1 in central
	found org.slf4j#slf4j-api;2.0.6 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.scala-lang.modules#scala-parallel-collections_2.13;1.0.4 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.13/3.4.0/spark-sql-kafka-0-10_2.13-3.4.0.jar ...
	[SUCCESSFUL ] org.apache.spark#spark-sql-kafka-0-10_2.13;3.4.0!spark-sql-kafka-0-10_2.13.jar (508ms)
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.13/3.4.0/spark-token-provider-kafka-0-10_2.13-3.4.0.jar ...
	[SUCCESSFUL ] org.apache.spark#spark-token-provider-kafka-0-10_2.13;3.4.0!spark-token-provider-kafka-0-10_2.13.jar (266ms)
downloading https://repo1.maven.org/maven2/org/scala-lang/modules/scala-parallel-collections_2.13/1.0.4/scala-parallel-collections_2.13-1.0.4.jar ...
	[SUCCESSFUL ] org.scala-lang.modules#scala-parallel-collections_2.13;1.0.4!scala-parallel-collections_2.13.jar (420ms)
downloading https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.3.2/kafka-clients-3.3.2.jar ...
	[SUCCESSFUL ] org.apache.kafka#kafka-clients;3.3.2!kafka-clients.jar (513ms)
downloading https://repo1.maven.org/maven2/com/google/code/findbugs/jsr305/3.0.0/jsr305-3.0.0.jar ...
	[SUCCESSFUL ] com.google.code.findbugs#jsr305;3.0.0!jsr305.jar (256ms)
downloading https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar ...
	[SUCCESSFUL ] org.apache.commons#commons-pool2;2.11.1!commons-pool2.jar (259ms)
downloading https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/3.3.4/hadoop-client-runtime-3.3.4.jar ...
	[SUCCESSFUL ] org.apache.hadoop#hadoop-client-runtime;3.3.4!hadoop-client-runtime.jar (1537ms)
downloading https://repo1.maven.org/maven2/org/lz4/lz4-java/1.8.0/lz4-java-1.8.0.jar ...
	[SUCCESSFUL ] org.lz4#lz4-java;1.8.0!lz4-java.jar (283ms)
downloading https://repo1.maven.org/maven2/org/xerial/snappy/snappy-java/1.1.9.1/snappy-java-1.1.9.1.jar ...
	[SUCCESSFUL ] org.xerial.snappy#snappy-java;1.1.9.1!snappy-java.jar(bundle) (274ms)
downloading https://repo1.maven.org/maven2/org/slf4j/slf4j-api/2.0.6/slf4j-api-2.0.6.jar ...
	[SUCCESSFUL ] org.slf4j#slf4j-api;2.0.6!slf4j-api.jar (276ms)
downloading https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-api/3.3.4/hadoop-client-api-3.3.4.jar ...
	[SUCCESSFUL ] org.apache.hadoop#hadoop-client-api;3.3.4!hadoop-client-api.jar (1030ms)
downloading https://repo1.maven.org/maven2/commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar ...
	[SUCCESSFUL ] commons-logging#commons-logging;1.1.3!commons-logging.jar (278ms)
:: resolution report :: resolve 16510ms :: artifacts dl 5920ms
	:: modules in use:
	com.google.code.findbugs#jsr305;3.0.0 from central in [default]
	commons-logging#commons-logging;1.1.3 from central in [default]
	org.apache.commons#commons-pool2;2.11.1 from central in [default]
	org.apache.hadoop#hadoop-client-api;3.3.4 from central in [default]
	org.apache.hadoop#hadoop-client-runtime;3.3.4 from central in [default]
	org.apache.kafka#kafka-clients;3.3.2 from central in [default]
	org.apache.spark#spark-sql-kafka-0-10_2.13;3.4.0 from central in [default]
	org.apache.spark#spark-token-provider-kafka-0-10_2.13;3.4.0 from central in [default]
	org.lz4#lz4-java;1.8.0 from central in [default]
	org.scala-lang.modules#scala-parallel-collections_2.13;1.0.4 from central in [default]
	org.slf4j#slf4j-api;2.0.6 from central in [default]
	org.xerial.snappy#snappy-java;1.1.9.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   12  |   12  |   12  |   0   ||   12  |   12  |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-49284453-8db1-4df1-8cc3-25cb272d9304
	confs: [default]
	12 artifacts copied, 0 already retrieved (57458kB/125ms)
24/02/22 06:24:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

The tutorial author's output: ![[Pasted image 20240222153512.png]]

: 🧐 This is exactly the output the tutorial showed ㅠ… does that mean it worked?

So I update the Dockerfile:

FROM bitnami/spark:3.3.0

USER root

# Install necessary packages
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
        vim \
        curl \
        netcat-openbsd && \
    rm -rf /var/lib/apt/lists/*

# Download Kafka client JAR
RUN curl -o /opt/bitnami/spark/jars/kafka-clients-3.3.0.jar https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.3.0/kafka-clients-3.3.0.jar

# Download Spark Kafka connector JAR
RUN curl -o /opt/bitnami/spark/jars/spark-token-provider-kafka-0-10_2.12-3.3.0.jar https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.3.0/spark-token-provider-kafka-0-10_2.12-3.3.0.jar

# Download Spark SQL Kafka connector JAR
RUN curl -o /opt/bitnami/spark/jars/spark-sql-kafka-0-10_2.12-3.3.0.jar https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.3.0/spark-sql-kafka-0-10_2.12-3.3.0.jar

RUN pip install py4j==0.10.9.5

USER 1001

Build and push the image:

docker build --no-cache -t seunghyejeong/spark:1.0 .
docker push seunghyejeong/spark:1.0
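
A quick check that py4j actually made it into the new image (spark is the compose service name from above):

```bash
# Should print the py4j version instead of ModuleNotFoundError
docker compose exec spark python -c "import py4j; print(py4j.__version__)"
```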

Kept adding to the streaming code (on the worker)

  • [!] The tutorial said that to actually pull the messages from Kafka for inspection, change readStream to read. ![[Pasted image 20240222154250.png]]
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType
from pyspark.sql.functions import from_json

spark = SparkSession \
    .builder \
    .appName("Streaming from Kafka") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0') \
    .config("spark.sql.shuffle.partitions", 4) \
    .master("local[*]") `\`
    .getOrCreate()

streaming_df = spark.read\
    .format("kafka") \
    .option("kafka.bootstrap.servers", "133.186.217.113:19092") \
    .option("subscribe", "devices") \
    .option("startingOffsets", "earliest") \
    .load()
streaming_df.printSchema()
streaming_df.show(truncate=False)

# JSON Schema
json_schema = StructType([StructField('customerId', StringType(), True), \
StructField('data', StructType([StructField('devices', ArrayType(StructType([ \
StructField('deviceId', StringType(), True), \
StructField('measure', StringType(), True), \
StructField('status', StringType(), True), \
StructField('temperature', LongType(), True)]), True), True)]), True), \
StructField('eventId', StringType(), True), \
StructField('eventOffset', LongType(), True), \
StructField('eventPublisher', StringType(), True), \
StructField('eventTime', StringType(), True)])

# Parse value from binary to string
json_df = streaming_df.selectExpr("cast(value as string) as value")

# Apply Schema to JSON value column and expand the value
json_expanded_df = json_df.withColumn("value", from_json(json_df["value"], json_schema)).select("value.*")

# Validate Schema
json_expanded_df.show(10, False)
json_expanded_df.printSchema()

ERROR : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (83449ffc642f executor driver): java.lang.NoClassDefFoundError: org/apache/commons/pool2/impl/GenericKeyedObjectPoolConfig

: Ugh, the missing-class error keeps coming back… then I noticed the spark.master setting was still local. Let me change it…

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType
from pyspark.sql.functions import from_json

spark = SparkSession \
    .builder \
    .appName("Streaming from Kafka") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0') \
    .config("spark.sql.shuffle.partitions", 4) \
    .master("spark://spark:7077") `\`
    .getOrCreate()

And when I did~

ERROR: 24/02/22 07:46:26 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (172.18.0.3 executor 0): java.lang.NoClassDefFoundError: org/apache/commons/pool2/impl/GenericKeyedObjectPoolConfig

: A different error this time, and it feels like progress… now that an IP shows up . . . .
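
(In hindsight, a likely cause: GenericKeyedObjectPoolConfig lives in Apache commons-pool2, which spark-sql-kafka needs at runtime but the hand-picked jar downloads above never included — the ivy log earlier pulled it automatically. A plausible fix, not tried at this point in the log, is one more download in the Dockerfile:)

```dockerfile
# commons-pool2 is a runtime dependency of spark-sql-kafka that the jar list omits
RUN curl -o /opt/bitnami/spark/jars/commons-pool2-2.11.1.jar https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar
```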

So I put in the master's IP directly:

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType
from pyspark.sql.functions import from_json

spark = SparkSession \
    .builder \
    .appName("Streaming from Kafka") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0') \
    .config("spark.sql.shuffle.partitions", 4) \
    .master("spark://133.186.217.113:7077") \
    .getOrCreate()

ERROR AsyncEventQueue: Listener AppStatusListener threw an exception java.lang.NullPointerException

I pointed the master URL in the docker compose file at the public IP and recreated the containers.

 spark-worker:
    image: seunghyejeong/spark:1.0
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://133.186.217.113:7077
      - SPARK_WORKER_MEMORY=1G

: No idea… maybe that's not the right master IP after all… it just won't talk over 7077…
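
Since the image installs netcat-openbsd, the 7077 question can at least be probed from inside the worker container (these commands are an assumption, not from the original log):

```bash
# From the worker: can we reach the master's RPC port at all?
nc -vz spark 7077                # via the compose service name
nc -vz 133.186.217.113 7077     # via the public IP
```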
