Workflow

Date: 2024-04-08 09:12

Contents:

| Node       | OS                      | IP (private, public)           |
| ---------- | ----------------------- | ------------------------------ |
| bami-node1 | Ubuntu Server 22.04 LTS | 192.168.0.70, 180.210.80.246   |
| bami-node2 | Ubuntu Server 22.04 LTS | 192.168.0.107, 133.186.215.112 |
| bami-node3 | Ubuntu Server 22.04 LTS | 192.168.0.24, 180.210.80.215   |

SSH public key (ubuntu@bami-node1):

```
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDQNJMvwKEfN/5Z8JzkcXDSnTuu6tzsIB2V/IkJfeVcQ3wyi4e/O6zDQ3s8uD53l7TirWCOzYIk9g2RxY8iu91VlFWbY1mk284/dSAugktRcIe+T/vlRBT8yUJ9Jm3rMjpSEYRcKN/+U3d5W9H2DXNBW7bHwYB7mxLDIebexyH+d9Ln9rPXQXEI++GGx7f4vcPW2Nqzg4NWWOHnMUWSkmer6XOB0O5+9U9wuZJCj+4vhdmJJ3OsbjUDFX6ShouY306pNbMBJlNYVnVYwVyeZiG8YwOqDX3PjFlX10HTfofX8i1fYaWB/sOsFLd1+vBAH2BiSexSOvzu5g724NC4bzBCR2sQTqkEZQQKE2/KFzNs6VebrL2y5rPyjeEAah0n0Ne2QNb5NS/G47J3rvkQtjPuWE3rHUeu5COAGs1JN5CLOdCPs4wqWFFaDX3DfFN+3zVTxveNXPT4gct7RIOjBt8msLmru0MWgeGCraYOYutMcc0NJXMowix9430MHNDldM0= ubuntu@bami-node1
```

NFS exports (/etc/exports), one entry per node:

```
/home/share/nfs 192.168.0.70(rw,no_root_squash,async)
/home/share/nfs 192.168.0.107(rw,no_root_squash,async)
/home/share/nfs 192.168.0.24(rw,no_root_squash,async)
```

Base64-encoded RSA private key (truncated), and bami-node1's public IP:

    YXBSVWxaWkNu
    SkJXR1JhVVV0Q1owaFdhR0ZzT1VoR2RsWnNZakZRUVRGU1JUQkdlRnB5V2pKcGJtcEdNRGxhVW05
    UFF6WmtVbWd4WjNFelRuUndXWFpCTkdKT2JsQUthSFpGWldGc1ExWkNaVzloVVUweE1IbFFiSFZ0
    VWtjMFptUllSV05ZYVhOUWFWRmxlVXhJTlZwWlJWWlZWRGhuVWxoSlpsWTVTMFZzUkUweU0zcFpN
    UXBhZEUxUE1sZHBTR3RLWkRSc1VtOWtPRlUxVFdSVFNWTkhRVXQyVjNacFZubFJTM1JJVVUxbGNX
    bGtTRzlCY2s5dlIzcDRDaTB0TFMwdFJVNUVJRkpUUVNCUVVrbFdRVlJGSUV0RldTMHRMUzB0Q2c9
    PQo=

180.210.80.246

Once the volume is created, attach it to the pods as a PVC (StatefulSet); a sketch follows below.
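A minimal sketch of that wiring, assuming the NFS export above backs the volume; the resource names, the 10Gi size, and the busybox pod are all placeholders:

```yaml
# Hypothetical PV backed by the NFS export above.
apiVersion: v1
kind: PersistentVolume
metadata:
  name: nfs-pv
spec:
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteMany
  nfs:
    server: 192.168.0.70   # assumed: bami-node1 exports /home/share/nfs
    path: /home/share/nfs
---
# A StatefulSet claims storage per replica via volumeClaimTemplates;
# each generated PVC binds to a matching PV like the one above.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: example
spec:
  serviceName: example
  replicas: 1
  selector:
    matchLabels:
      app: example
  template:
    metadata:
      labels:
        app: example
    spec:
      containers:
        - name: app
          image: busybox
          command: ["sleep", "infinity"]
          volumeMounts:
            - name: data
              mountPath: /data
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes: ["ReadWriteMany"]
        resources:
          requests:
            storage: 10Gi
```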


Add the pip package:

```
pip install kafka-python
```
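Once installed, a quick smoke test can confirm the package works; the broker address and topic name below are placeholders:

```py
# Minimal kafka-python smoke test; bootstrap address and topic are assumptions.
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="kafka:9092")
producer.send("example-topic", b"hello from airflow")
producer.flush()   # block until the broker has acknowledged the message
producer.close()
```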

Create a Dockerfile that installs the Kafka-related pip packages (plus a JDK and the Spark-Kafka connector JARs):

```Dockerfile
FROM apache/airflow:2.8.1

ENV AIRFLOW_HOME=/home/airflow/.local
ENV JAVA_HOME=/usr/lib/jvm/jdk1.8.0_401
ENV SPARK_HOME=/home/airflow/.local/assembly/target/scala-2.12
ENV PATH="$PATH:${AIRFLOW_HOME}/bin:${JAVA_HOME}/bin:${SPARK_HOME}/bin"

USER root

# Install the JDK from a local tarball
RUN mkdir -p /usr/lib/jvm

COPY jdk-1.8.tar.gz /opt/airflow

RUN tar xvf /opt/airflow/jdk-1.8.tar.gz -C /usr/lib/jvm \
    && rm /opt/airflow/jdk-1.8.tar.gz

# Basic networking/debugging tools
RUN apt-get update \
  && apt-get install -y --no-install-recommends \
        vim \
        wget \
        net-tools \
        dnsutils \
        iputils-ping \
        netcat-openbsd \
  && apt-get clean \
  && rm -rf /var/lib/apt/lists/*

# Kafka client JAR (create the target directory first)
RUN mkdir -p /opt/airflow/jars

RUN curl -o /opt/airflow/jars/kafka-clients-3.5.1.jar https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.5.1/kafka-clients-3.5.1.jar

# Download Spark Kafka connector JAR
RUN curl -o /opt/airflow/jars/spark-token-provider-kafka-0-10_2.12-3.5.1.jar https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.5.1/spark-token-provider-kafka-0-10_2.12-3.5.1.jar

# Download Spark SQL Kafka connector JAR
RUN curl -o /opt/airflow/jars/spark-sql-kafka-0-10_2.12-3.5.1.jar https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.5.1/spark-sql-kafka-0-10_2.12-3.5.1.jar

# commons-pool2 is a runtime dependency of the Spark-Kafka connector
RUN curl -o /opt/airflow/jars/commons-pool2-2.11.0.jar https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.0/commons-pool2-2.11.0.jar

USER airflow

COPY requirements.txt /
RUN pip install --no-cache-dir "apache-airflow==2.8.1" -r /requirements.txt
```
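For reference, the matching requirements.txt could be as small as the following; kafka-python comes from the step above, while the cncf.kubernetes provider line is an assumption (it backs the KubernetesPodOperator used later):

```
kafka-python
apache-airflow-providers-cncf-kubernetes
```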

Client deployment

When the Kafka client task runs, reading its task log fails with a 403:

```
{file_task_handler.py:560} ERROR - Could not read served logs
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py", line 549, in _read_from_logs_server
    response.raise_for_status()
  File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_models.py", line 749, in raise_for_status
    raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Client error '403 FORBIDDEN' for url 'http://airflow-scheduler-0.airflow-scheduler.airflow.svc.cluster.local:8793/log/dag_id=pipeline/run_id=manual__2024-04-08T06:01:38.569688+00:00/task_id=deploy_kafka_client/attempt=1.log'
```
  1. Deploy a secret? (A 403 on served logs usually means the webserver's secret_key does not match the one the scheduler/workers use to sign log-serving requests.)
    • REF

      • https://serverfault.com/questions/947745/in-airflows-configuration-file-airflow-cfg-what-is-the-secret-key-and-do-i
      • https://weejw.tistory.com/553
      • https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#secret-key

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: kubeconfig-secret
  namespace: airflow
type: Opaque
data:
  airflow-webserver-secret-key: Sy1QYWFT
```

  • Deploy it: kubectl apply -f ~
  • Then add the env var to the webserver:

    ```yaml
    webserver:
      env:
        - name: AIRFLOW__WEBSERVER__SECRET_KEY
          value: airflow-webserver-secret-key
    ```

> Not this; the value actually belongs in values.yaml (see the sketch below).
> https://github.com/airflow-helm/charts/blob/main/charts/airflow/docs/faq/security/set-webserver-secret-key.md
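A sketch of the values.yaml route, assuming the community chart's airflow.config map for arbitrary AIRFLOW__* settings; "change-me" is a placeholder, and the linked FAQ shows the recommended secret-backed variants:

```yaml
# Hypothetical values.yaml fragment; the key value is a placeholder.
airflow:
  config:
    AIRFLOW__WEBSERVER__SECRET_KEY: "change-me"
```

Because the chart applies airflow.config to every component, the webserver, scheduler, and workers all end up signing log requests with the same key, which is what clears the 403.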
# worker node selection

> You need to know how to use kubectl.

Pin the task pod so it is deployed on master1:

```py
# Assumed import path for the cncf.kubernetes provider:
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

kubectl_task = KubernetesPodOperator(
    namespace='your_namespace',
    # An image that ships kubectl is required here (placeholder):
    image='bitnami/kubectl:latest',
    # The command must be split into list elements, not one shell string:
    cmds=["kubectl"],
    arguments=["apply", "-f", "/home/ubuntu/kafka/kafka-topic.yaml"],
    # Pin the pod to master1 (assumed hostname label):
    node_selector={"kubernetes.io/hostname": "master1"},
    name="kubectl-task",
    task_id="kubectl-task",
    get_logs=True,
    dag=dag,
)
```


# Get at least one task to succeed
...


```py
import subprocess  # needed by apply_kubectl below

from datetime import datetime as dt
from datetime import timedelta

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

local_tz = pendulum.timezone("Asia/Seoul")

def apply_kubectl():
    # Run `kubectl apply` against the manifest directory
    kubectl_apply_command = "kubectl apply -f /path/to/your/kubernetes/resources"
    subprocess.run(kubectl_apply_command, shell=True, check=True)

default_args = {
    'owner': 'admin',
    'depends_on_past': False,
    'start_date': dt(2024, 1, 1, tzinfo=local_tz),
    'retries': 1,
    # 'provide_context' removed: it was dropped in Airflow 2
    'retry_delay': timedelta(minutes=1),
}

with DAG(
    dag_id='pipeline',
    default_args=default_args,
    schedule_interval=None,
) as dag:

    deploy_kafka_topic = PythonOperator(
        task_id='deploy_kafka_topic',
        python_callable=apply_kubectl,
    )
```

