Workflow
Table of Contents:
Project Name:
Date: 2024-04-08 09:12
Tag:
Contents:
| bami-node1 | Ubuntu Server 22.04 LTS | 192.168.0.70, 180.210.80.246 | | ———- | ———————– | —————————— | | bami-node2 | Ubuntu Server 22.04 LTS | 192.168.0.107, 133.186.215.112 | | bami-node3 | Ubuntu Server 22.04 LTS | 192.168.0.24, 180.210.80.215 |
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDQNJMvwKEfN/5Z8JzkcXDSnTuu6tzsIB2V/IkJfeVcQ3wyi4e/O6zDQ3s8uD53l7TirWCOzYIk9g2RxY8iu91VlFWbY1mk284/dSAugktRcIe+T/vlRBT8yUJ9Jm3rMjpSEYRcKN/+U3d5W9H2DXNBW7bHwYB7mxLDIebexyH+d9Ln9rPXQXEI++GGx7f4vcPW2Nqzg4NWWOHnMUWSkmer6XOB0O5+9U9wuZJCj+4vhdmJJ3OsbjUDFX6ShouY306pNbMBJlNYVnVYwVyeZiG8YwOqDX3PjFlX10HTfofX8i1fYaWB/sOsFLd1+vBAH2BiSexSOvzu5g724NC4bzBCR2sQTqkEZQQKE2/KFzNs6VebrL2y5rPyjeEAah0n0Ne2QNb5NS/G47J3rvkQtjPuWE3rHUeu5COAGs1JN5CLOdCPs4wqWFFaDX3DfFN+3zVTxveNXPT4gct7RIOjBt8msLmru0MWgeGCraYOYutMcc0NJXMowix9430MHNDldM0= ubuntu@bami-node1
/home/share/nfs 192.168.0.70(rw,no_root_squash,async) /home/share/nfs 192.168.0.170(rw,no_root_squash,async) /home/share/nfs 192.168.0.24(rw,no_root_squash,async)
YXBSVWxaWkNu
SkJXR1JhVVV0Q1owaFdhR0ZzT1VoR2RsWnNZakZRUVRGU1JUQkdlRnB5V2pKcGJtcEdNRGxhVW05
UFF6WmtVbWd4WjNFelRuUndXWFpCTkdKT2JsQUthSFpGWldGc1ExWkNaVzloVVUweE1IbFFiSFZ0
VWtjMFptUllSV05ZYVhOUWFWRmxlVXhJTlZwWlJWWlZWRGhuVWxoSlpsWTVTMFZzUkUweU0zcFpN
UXBhZEUxUE1sZHBTR3RLWkRSc1VtOWtPRlUxVFdSVFNWTkhRVXQyVjNacFZubFJTM1JJVVUxbGNX
bGtTRzlCY2s5dlIzcDRDaTB0TFMwdFJVNUVJRkpUUVNCUVVrbFdRVlJGSUV0RldTMHRMUzB0Q2c9
PQo=
180.210.80.246
volume만들면 pvc로 붙여넣기 . (statefulset)
pip 추가
pip install kafka-python
kafka 관련 pip 패키지 추가 도커파일 생성
FROM apache/airflow:2.8.1
ENV AIRFLOW_HOME=/home/airflow/.local
ENV JAVA_HOME=/usr/lib/jvm/jdk1.8.0_401
ENV SPARK_HOME=/home/airflow/.local/assembly/target/scala-2.12
ENV PATH="$PATH:${AIRFLOW_HOME}/bin:${JAVA_HOME}/bin:${SPARK_HOME}/bin"
USER root
RUN mkdir /usr/lib/jvm
COPY jdk-1.8.tar.gz /opt/airflow
RUN tar xvf jdk-1.8.tar.gz -C /usr/lib/jvm \
&& rm jdk-1.8.tar.gz
COPY requirements.txt /
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
vim \
wget \
net-tools \
dnsutils \
iputils-ping \
netcat-openbsd \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
RUN curl -o /opt/airflow/jars/kafka-clients-3.5.1.jar https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.5.1/kafka-clients-3.5.1.jar
# Download Spark Kafka connector JAR
RUN curl -o /opt/airflow/jars/spark-token-provider-kafka-0-10_2.12-3.5.1.jar https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.5.1/spark-token-provider-kafka-0-10_2.12-3.5.1.jar
# Download Spark SQL Kafka connector JAR
RUN curl -o /opt/airflow/jars/spark-sql-kafka-0-10_2.12-3.5.1.jar https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.5.1/spark-sql-kafka-0-10_2.12-3.5.1.jar
RUN curl -o /opt/airflow/jars/commons-pool2-2.11.0.jar https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.0/commons-pool2-2.11.0.jar
USER airflow
COPY requirements.txt /
RUN pip install --no-cache-dir "apache-airflow==2.8.1" -r /requirements.txt
client 배포
Client error '403 FORBIDDEN' for url 'http://airflow-scheduler-0.airflow-scheduler.airflow.svc.cluster.local:8793/log/dag_id=pipeline/run_id=manual__2024-04-08T06:01:38.569688+00:00/task_id=deploy_kafka_client/attempt=1.log'
{file_task_handler.py:560} ERROR - Could not read served logs
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py", line 549, in _read_from_logs_server
response.raise_for_status()
File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_models.py", line 749, in raise_for_status
raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Client error '403 FORBIDDEN' for url 'http://airflow-scheduler-0.airflow-scheduler.airflow.svc.cluster.local:8793/log/dag_id=pipeline/run_id=manual__2024-04-08T06:01:38.569688+00:00/task_id=deploy_kafka_client/attempt=1.log'
- secreat 배포 ?
- [b] REF
https://serverfault.com/questions/947745/in-airflows-configuration-file-airflow-cfg-what-is-the-secret-key-and-do-i https://weejw.tistory.com/553 https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#secret-key
- [b] REF
apiVersion: v1
kind: Secret
metadata:
name: kubeconfig-secret
namespace: airflow
type: Opaque
data:
airflow-webserver-secret-key: Sy1QYWFT
- 배포: kubectl apply -f ~
- 추가
```yaml
webserver:
env:
- name: AIRFLOW__WEBSERVER__SECRET_KEY value: airflow-webserver-secret-key
> 이거아니고 value.yaml에 값이 있음
> https://github.com/airflow-helm/charts/blob/main/charts/airflow/docs/faq/security/set-webserver-secret-key.md
# workers node select
> kubectl 쓸 줄 알아야함
master1에 배포되게 지정함
kubectl_task = KubernetesPodOperator( namespace='your_namespace', cmds=["kubectl", "kubectl apply -f /home/ubuntu/kafka/kafka-topic.yaml"], name="kubectl-task", task_id="kubectl-task", get_logs=True, dag=dag )
# 하나ㅏㄹ도 성공시켜
...
```py
from datetime import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from kubernetes.client import models as k8s
import pendulum
local_tz = pendulum.timezone("Asia/Seoul")
def apply_kubectl():
# kubectl apply 명령어 실행
kubectl_apply_command = "kubectl apply -f /path/to/your/kubernetes/resources"
subprocess.run(kubectl_apply_command, shell=True)
default_args = {
'owner': 'admin',
'depends_on_past': False,
'start_date': dt(2024, 1, 1, tzinfo=local_tz),
'retries': 1,
'provide_context': True,
'retry_delay': timedelta(minutes=1)
}
with DAG(
dag_id='pipeline',
default_args=default_args,
schedule_interval=None
)as dag:
deploy_kafka_topic = PythonOperator(
task_id='deploy_kafka_topic',
python_callable=apply_kubectl,
dag=dag,
)
Project Name:
Date: 2024-04-08 09:12
Tag:
Contents:
| bami-node1 | Ubuntu Server 22.04 LTS | 192.168.0.70, 180.210.80.246 | | ———- | ———————– | —————————— | | bami-node2 | Ubuntu Server 22.04 LTS | 192.168.0.107, 133.186.215.112 | | bami-node3 | Ubuntu Server 22.04 LTS | 192.168.0.24, 180.210.80.215 |
ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDQNJMvwKEfN/5Z8JzkcXDSnTuu6tzsIB2V/IkJfeVcQ3wyi4e/O6zDQ3s8uD53l7TirWCOzYIk9g2RxY8iu91VlFWbY1mk284/dSAugktRcIe+T/vlRBT8yUJ9Jm3rMjpSEYRcKN/+U3d5W9H2DXNBW7bHwYB7mxLDIebexyH+d9Ln9rPXQXEI++GGx7f4vcPW2Nqzg4NWWOHnMUWSkmer6XOB0O5+9U9wuZJCj+4vhdmJJ3OsbjUDFX6ShouY306pNbMBJlNYVnVYwVyeZiG8YwOqDX3PjFlX10HTfofX8i1fYaWB/sOsFLd1+vBAH2BiSexSOvzu5g724NC4bzBCR2sQTqkEZQQKE2/KFzNs6VebrL2y5rPyjeEAah0n0Ne2QNb5NS/G47J3rvkQtjPuWE3rHUeu5COAGs1JN5CLOdCPs4wqWFFaDX3DfFN+3zVTxveNXPT4gct7RIOjBt8msLmru0MWgeGCraYOYutMcc0NJXMowix9430MHNDldM0= ubuntu@bami-node1
/home/share/nfs 192.168.0.70(rw,no_root_squash,async) /home/share/nfs 192.168.0.170(rw,no_root_squash,async) /home/share/nfs 192.168.0.24(rw,no_root_squash,async)
YXBSVWxaWkNu
SkJXR1JhVVV0Q1owaFdhR0ZzT1VoR2RsWnNZakZRUVRGU1JUQkdlRnB5V2pKcGJtcEdNRGxhVW05
UFF6WmtVbWd4WjNFelRuUndXWFpCTkdKT2JsQUthSFpGWldGc1ExWkNaVzloVVUweE1IbFFiSFZ0
VWtjMFptUllSV05ZYVhOUWFWRmxlVXhJTlZwWlJWWlZWRGhuVWxoSlpsWTVTMFZzUkUweU0zcFpN
UXBhZEUxUE1sZHBTR3RLWkRSc1VtOWtPRlUxVFdSVFNWTkhRVXQyVjNacFZubFJTM1JJVVUxbGNX
bGtTRzlCY2s5dlIzcDRDaTB0TFMwdFJVNUVJRkpUUVNCUVVrbFdRVlJGSUV0RldTMHRMUzB0Q2c9
PQo=
180.210.80.246
volume만들면 pvc로 붙여넣기 . (statefulset)
pip 추가
pip install kafka-python
kafka 관련 pip 패키지 추가 도커파일 생성
FROM apache/airflow:2.8.1
ENV AIRFLOW_HOME=/home/airflow/.local
ENV JAVA_HOME=/usr/lib/jvm/jdk1.8.0_401
ENV SPARK_HOME=/home/airflow/.local/assembly/target/scala-2.12
ENV PATH="$PATH:${AIRFLOW_HOME}/bin:${JAVA_HOME}/bin:${SPARK_HOME}/bin"
USER root
RUN mkdir /usr/lib/jvm
COPY jdk-1.8.tar.gz /opt/airflow
RUN tar xvf jdk-1.8.tar.gz -C /usr/lib/jvm \
&& rm jdk-1.8.tar.gz
COPY requirements.txt /
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
vim \
wget \
net-tools \
dnsutils \
iputils-ping \
netcat-openbsd \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
RUN curl -o /opt/airflow/jars/kafka-clients-3.5.1.jar https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.5.1/kafka-clients-3.5.1.jar
# Download Spark Kafka connector JAR
RUN curl -o /opt/airflow/jars/spark-token-provider-kafka-0-10_2.12-3.5.1.jar https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.5.1/spark-token-provider-kafka-0-10_2.12-3.5.1.jar
# Download Spark SQL Kafka connector JAR
RUN curl -o /opt/airflow/jars/spark-sql-kafka-0-10_2.12-3.5.1.jar https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.5.1/spark-sql-kafka-0-10_2.12-3.5.1.jar
RUN curl -o /opt/airflow/jars/commons-pool2-2.11.0.jar https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.0/commons-pool2-2.11.0.jar
USER airflow
COPY requirements.txt /
RUN pip install --no-cache-dir "apache-airflow==2.8.1" -r /requirements.txt
client 배포
Client error '403 FORBIDDEN' for url 'http://airflow-scheduler-0.airflow-scheduler.airflow.svc.cluster.local:8793/log/dag_id=pipeline/run_id=manual__2024-04-08T06:01:38.569688+00:00/task_id=deploy_kafka_client/attempt=1.log'
{file_task_handler.py:560} ERROR - Could not read served logs
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py", line 549, in _read_from_logs_server
response.raise_for_status()
File "/home/airflow/.local/lib/python3.10/site-packages/httpx/_models.py", line 749, in raise_for_status
raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Client error '403 FORBIDDEN' for url 'http://airflow-scheduler-0.airflow-scheduler.airflow.svc.cluster.local:8793/log/dag_id=pipeline/run_id=manual__2024-04-08T06:01:38.569688+00:00/task_id=deploy_kafka_client/attempt=1.log'
- secreat 배포 ?
- [b] REF
https://serverfault.com/questions/947745/in-airflows-configuration-file-airflow-cfg-what-is-the-secret-key-and-do-i https://weejw.tistory.com/553 https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#secret-key
- [b] REF
apiVersion: v1
kind: Secret
metadata:
name: kubeconfig-secret
namespace: airflow
type: Opaque
data:
airflow-webserver-secret-key: Sy1QYWFT
- 배포: kubectl apply -f ~
- 추가
```yaml
webserver:
env:
- name: AIRFLOW__WEBSERVER__SECRET_KEY value: airflow-webserver-secret-key
> 이거아니고 value.yaml에 값이 있음
> https://github.com/airflow-helm/charts/blob/main/charts/airflow/docs/faq/security/set-webserver-secret-key.md
# workers node select
> kubectl 쓸 줄 알아야함
master1에 배포되게 지정함
kubectl_task = KubernetesPodOperator( namespace='your_namespace', cmds=["kubectl", "kubectl apply -f /home/ubuntu/kafka/kafka-topic.yaml"], name="kubectl-task", task_id="kubectl-task", get_logs=True, dag=dag )
# 하나ㅏㄹ도 성공시켜
...
```py
from datetime import datetime as dt
from datetime import timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from kubernetes.client import models as k8s
import pendulum
local_tz = pendulum.timezone("Asia/Seoul")
def apply_kubectl():
# kubectl apply 명령어 실행
kubectl_apply_command = "kubectl apply -f /path/to/your/kubernetes/resources"
subprocess.run(kubectl_apply_command, shell=True)
default_args = {
'owner': 'admin',
'depends_on_past': False,
'start_date': dt(2024, 1, 1, tzinfo=local_tz),
'retries': 1,
'provide_context': True,
'retry_delay': timedelta(minutes=1)
}
with DAG(
dag_id='pipeline',
default_args=default_args,
schedule_interval=None
)as dag:
deploy_kafka_topic = PythonOperator(
task_id='deploy_kafka_topic',
python_callable=apply_kubectl,
dag=dag,
)
About Hallo. 안녕하세요! 정승혜 입니다. 개발 일지 뿐만 아니라 나의 관심 있는 모든 것을 담을거예요.