Install Kafka & Airflow with Docker
Airflow: Docker Install
When you need an extra Python library, attach to the running container and install it there.
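For example, once the stack below is running, a minimal sketch (the `airflow-worker` service name comes from the official compose file; `kafka-python` is just an illustrative package):
```bash
# Install a library inside the already-running worker container.
# Anything installed this way is lost when the container is recreated;
# build a custom image for a persistent setup.
sudo docker compose exec airflow-worker pip install kafka-python
```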
Install Airflow
```bash
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.8.1/docker-compose.yaml'
```
- Environment setup
```bash
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env
```
On other operating systems you can instead put `AIRFLOW_UID=50000` in `.env`.
- Initialize the database
```bash
sudo docker compose up airflow-init
```
- Output
```bash
airflow-init-1 | [2024-02-05T00:52:45.573+0000] {override.py:1820} INFO - Added Permission menu access on Permission Pairs to role Admin
airflow-init-1 | [2024-02-05T00:52:46.675+0000] {override.py:1458} INFO - Added user airflow
airflow-init-1 | User "airflow" created with role "Admin"
airflow-init-1 | 2.8.1
airflow-init-1 exited with code 0
```
- Run Airflow
```bash
sudo docker compose up
```
Web UI at http://localhost:8080 (ID: airflow / PW: airflow)
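To check that the webserver is healthy without opening a browser (assuming the default 8080 port mapping):
```bash
# Returns JSON with "metadatabase" and "scheduler" status fields
curl http://localhost:8080/health
```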
CLI Setup
- Run a CLI command through a service container
```bash
sudo docker compose run airflow-worker airflow info
```
- Optional wrapper script
```bash
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.8.1/airflow.sh'
chmod +x airflow.sh
```
- Usage
```bash
sudo ./airflow.sh info
sudo ./airflow.sh bash
sudo ./airflow.sh python
```
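Any other Airflow CLI subcommand works the same way, e.g. listing DAGs through the worker service:
```bash
sudo docker compose run airflow-worker airflow dags list
```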
Teardown
- In the directory where you downloaded docker-compose.yaml, run:
```bash
docker compose down --volumes --remove-orphans
```
- Remove the entire directory containing docker-compose.yaml: `rm -rf '<DIRECTORY>'`
- Re-download docker-compose.yaml and run through this guide again from the beginning.
Kafka: Docker Install
- docker-compose.yaml
```yaml
version: '3'
services:
zoo1:
image: confluentinc/cp-zookeeper:7.3.2
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_SERVERS: zoo1:2888:3888
networks:
- kafka-network
- airflow-kafka
kafka1:
image: confluentinc/cp-kafka:7.3.2
ports:
- "9092:9092"
- "29092:29092"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
networks:
- kafka-network
- airflow-kafka
kafka2:
image: confluentinc/cp-kafka:7.3.2
ports:
- "9093:9093"
- "29093:29093"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 2
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
networks:
- kafka-network
- airflow-kafka
kafka3:
image: confluentinc/cp-kafka:7.3.2
ports:
- "9094:9094"
- "29094:29094"
environment:
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 3
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
networks:
- kafka-network
- airflow-kafka
kafka-connect:
image: confluentinc/cp-kafka-connect:7.3.2
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka1:19092,kafka2:19093,kafka3:19094
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: compose-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: compose-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: compose-connect-status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect'
CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components'
networks:
- kafka-network
- airflow-kafka
schema-registry:
image: confluentinc/cp-schema-registry:7.3.2
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka1:19092,kafka2:19093,kafka3:19094
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
networks:
- kafka-network
- airflow-kafka
kafka-ui:
container_name: kafka-ui-1
image: provectuslabs/kafka-ui:latest
ports:
- 8888:8080 # Changed to avoid port clash with akhq
depends_on:
- kafka1
- kafka2
- kafka3
- schema-registry
- kafka-connect
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: PLAINTEXT://kafka1:19092,PLAINTEXT_HOST://kafka1:19092
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect:8083
DYNAMIC_CONFIG_ENABLED: 'true'
    networks:
      - kafka-network
      - airflow-kafka

networks:
  kafka-network:
    driver: bridge
  airflow-kafka:
    external: true
```
Note: every service above references `kafka-network` and `airflow-kafka`, but the original notes never declared them; the top-level `networks:` block is a minimal assumed completion, with `airflow-kafka` created externally so the Airflow stack can share it.
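Because `airflow-kafka` is marked external in the completion above, it must exist before `docker compose up`; create it once on the host (and attach the Airflow services to it as well):
```bash
# Shared bridge network joined by both the Kafka and Airflow stacks
docker network create airflow-kafka
```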
❌: Ended up not using the confluentinc images.
Kafka install with Docker (wurstmeister/kafka-docker)
```bash
git clone https://github.com/wurstmeister/kafka-docker.git
vim docker-compose-single-broker.yml
```
- docker-compose-single-broker.yml
```yaml
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    build: .
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      #KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://133.186.217.113:9092  # for external communication
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
```
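Bring the stack up with the non-default compose file:
```bash
docker compose -f docker-compose-single-broker.yml up -d
```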
- Attach to the container
```bash
docker exec -ti {CONTAINER_NUM} /bin/bash
```
- Check the advertised listeners
```bash
cd /opt/kafka/config
vi server.properties
```
For the local console commands below, the advertised address must be 127.0.0.1 (it has to match the address clients connect with).
- List topics
```bash
kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092
```
- Create a topic
```bash
./kafka-topics.sh --create --topic msg_test --bootstrap-server 127.0.0.1:9092
```
- Produce messages
```bash
./kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic msg_test
```
- Consume messages
```bash
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic msg_test --from-beginning
```
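To confirm the topic's partition and replication settings:
```bash
./kafka-topics.sh --describe --topic msg_test --bootstrap-server 127.0.0.1:9092
```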
## Python Examples
- producer.py
```python
from kafka import KafkaProducer
from json import dumps
import time

producer = KafkaProducer(
    acks=0,                          # fire-and-forget: do not wait for broker acks
    compression_type="gzip",         # compress batches with gzip
    bootstrap_servers=['133.186.217.113:9092'],
    api_version=(2, 8, 1),           # set this to match your Kafka broker version
    value_serializer=lambda x: dumps(x).encode("utf-8")  # dict -> JSON bytes
)

start = time.time()
for i in range(10):
    data = {'str': 'result' + str(i)}
    producer.send('msg_test', value=data)
    producer.flush()                 # force each message out immediately
print('[Done]:', time.time() - start)
```
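To run it (assuming the broker address above is reachable and kafka-python is installed; the timing line is illustrative):
```bash
pip install kafka-python
python producer.py
# [Done]: 0.05   <- illustrative output
```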
- consumer.py
```python
from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    'msg_test',                                  # topic name
    bootstrap_servers=['133.186.217.113:9092'],  # list of broker addresses
    auto_offset_reset='earliest',                # earliest: from the start, latest: most recent
    enable_auto_commit=True,                     # commit offsets automatically
    group_id='test-consumer-group',              # consumer group id
    value_deserializer=lambda x: loads(x.decode('utf-8')),  # JSON bytes -> dict
    consumer_timeout_ms=1000                     # max time to wait for new data
)

print('[Start] get consumer')
for message in consumer:
    print(f'Topic : {message.topic}, Partition : {message.partition}, Offset : {message.offset}, Key : {message.key}, value : {message.value}')
print('[End] get consumer')
```
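Running the consumer after the producer replays the ten messages from the beginning (illustrative output, assuming a single partition):
```bash
python consumer.py
# [Start] get consumer
# Topic : msg_test, Partition : 0, Offset : 0, Key : None, value : {'str': 'result0'}
# ...
# [End] get consumer
```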
2023-02-06 TODO
- REF: https://github.com/wurstmeister/kafka-docker/blob/master/README.md
Listener Configuration
It may be useful to have the Kafka documentation open to understand the various broker listener configuration options.
Since 0.9.0, Kafka has supported multiple listener configurations for brokers to help support different protocols and discriminate between internal and external traffic. Later versions of Kafka have deprecated advertised.host.name and advertised.port.
NOTE: advertised.host.name and advertised.port still work as expected, but should not be used if configuring the listeners.
```yaml
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
build: .
ports:
- "9092:9092"
- "19092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_MESSAGE_MAX_BYTES: 2000000
KAFKA_CREATE_TOPICS: "kafkatopic:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:19092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://133.186.217.113:19092
volumes:
      - /var/run/docker.sock:/var/run/docker.sock
```
The README also shows deriving the advertised hostname from EC2 instance metadata:
```yaml
HOSTNAME_COMMAND: curl http://169.254.169.254/latest/meta-data/public-hostname
KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://_{HOSTNAME_COMMAND}:9094
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
```
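One way to verify which listeners a broker actually advertises is to grep its startup log, since Kafka prints its effective broker configuration at boot (assuming the `kafka` service name from the compose file above):
```bash
docker compose logs kafka | grep -i advertised
```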
One broker (daa-kafka1) from a multi-broker cluster example; the daa-zoo* ZooKeeper services and the remaining brokers are omitted here:
```yaml
daa-kafka1:
image: wurstmeister/kafka:2.13-2.8.1
hostname: daa-kafka1
container_name: daa-kafka1
ports:
- "9092:9092"
- "19092"
volumes:
- ../kafka/logs/1:/kafka/logs
networks:
- daa-kafka-cluster-network
environment:
KAFKA_BROKER_ID: 1
KAFKA_PRODUCER_MAX_REQUEST_SIZE: 536870912
CONNECT_PRODUCER_MAX_REQUEST_SIZE: 536870912
KAFKA_LISTENERS: INSIDE://daa-kafka1:19092,OUTSIDE://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: INSIDE://daa-kafka1:19092,OUTSIDE://127.0.0.1:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: "daa-zoo1:2181,daa-zoo2:2182,daa-zoo3:2183"
KAFKA_LOG_DIRS: "/kafka/logs"
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=daa-kafka1 -Dcom.sun.management.jmxremote.rmi.port=9992"
JMX_PORT: 9992
depends_on:
- daa-zoo1
- daa-zoo2
      - daa-zoo3
```