imok
article thumbnail
728x90

❄️ Snowflake summit 2023에서 들은 HOL 세션 중, 
Getting Started with Snowpipe Streaming and Amazon MSK을 진행하면서 작업한 내용을 정리하고자 합니다.

Agenda

  • Hands-On Lab: Getting Started with Snowpipe Streaming and Amazon MSK
  • Tue, 6/27, 11:30 AM - 1:30 PM PT
  • James Sun, Partner Sales Engineer, Snowflake
  • Bosco Albuquerque, Partner Solutions Architect, AWS

Getting Started with Snowpipe Streaming and Amazon MSK

HOL 과정
1. Deploy a managed Kafka Cluster in VPC
2. Use Kafka producer and consumer to build a streaming pipeline
3. Integrate Snowpipe Streaming SDK with Kafka consumer apps
4. Populate Snowflake table with time-series data

Snowpipe Streaming and Amazon MSK

스트리밍 데이터가 스마트폰, IoT, Web log와 같이 다양한 소스에서 들어오면, 기업은 이런 데이터를 신속하게 수집하고 다른 관련 비즈니스 데이터와 결합해 인사이트를 얻을 수 있어야 합니다.

이번 글에서는 Amazon MSKSnowpipe Streaming을 사용하여 스트리밍 데이터를 수집해, Snowflake에 저장하고 기업이 최종 사용자의 경험과 피드백에 대해 거의 실시간으로 인사이트를 얻을 수 있는 방법을 살펴보려고 합니다.

 

아래는 워크로드 별 AWS Service와 Snowflake 서비스 사용 예시입니다.

  • Data Engineering : Amazon MSK, Kinesis, Amazon EMR, AWS Glue,...
  • Data Lake : Amazon S3,...
  • Data Science : Amazon Forcast, Amazon Personalize, Amazon Sagemaker,... 

 

Amazon MSK

Amazon MSKApache Kafka를 사용하여 스트리밍 데이터를 처리하는 애플리케이션을 쉽게 구축하고 실행할 수 있게 해주는 완전관리형 서비스입니다. Apache Kafka는 실시간 스트리밍 데이터 파이프라인 및 애플리케이션을 구축하기 위한 오픈 소스 플랫폼입니다.

Amazon MSK 장점
1. 무제한으로 확장되는 Amazon MSK의 저비용 스토리지 계층
2. Kafka topic에서 데이터를 더 오래 유지할 수 있음
3. 기존 스트림 처리 코드를 사용해 이전 데이터를 정확한 프로덕션 순서로 재처리
4. broker과 storage 간 데이터가 인터넷을 통하지 않고, VPC 내에서 이동함
5. secondary storage의 데이터를 broker 간에 복제할 필요가 없어서 더 빠른 partition rebalancing을 함
6. 하루 이상이나, 1-2TB 이상의 데이터를 보유할 수 있는 최고의 애플리케이션

Snowpipe Streaming

Snowpipe Streaming을 사용하면 낮은 한 자릿수 초의 latency를 갖는 데이터 파이프라인을 구축할 수 있고, first-party tool을 사용하여 모든 소스의 데이터를 통합할 수 있습니다.

  • 빠른 데이터 수집 > 빠른 파이프라인 > 빠른 인사이트 확보
  • 낮은 비용, 낮은 대기 시간으로 데이터 흐름 처리

 

Snowpipe : File & Streaming

  • Snowpipe : 스테이지에서 파일을 사용할 수 있는 즉시 파일에서 데이터를 로드할 수 있습니다.
  • Snowpipe Streaming : Snowpipe Streaming API를 호출하면 Snowflake Ingest SDK와 애플리케이션 코드를 사용하여 짧은 지연 시간으로 스트리밍 데이터 행을 로딩할 수 있습니다. 스테이징 된 파일에서 데이터를 쓰는 Snowpipe 또는 대량 데이터 로드와는 달리, 스트리밍 수집 API는 Snowflake 테이블에 데이터 행을 씁니다.

Snowpipe와 Snowpipe Streaming의 비교

 

 

Amazon MSK를 사용한 Snowpipe  Streaming 아키텍쳐

 

Snowpipe Streaming의 Best Practices

 


Overview

Opensky Network의 샌프란시스코 지역에 대한 실시간 상용 비행 데이터를 사용하여 Snowflake의 Snowpipe Streaming과 Amazon MSK를 사용하는 솔루션을 구축해보고자 합니다.

1.  MSK Cluster와 EC2 인스턴스는 AWS VPC의 Privata Subnet에서 프로비저닝 됩니다.

2. EC2(점프 호스트)는 Kafka Connector를 통해 Kafka Producer와 Snowpipe Streaming을 호스팅 합니다.

3. Kafka Producer는 Rest API를 호출하고 시계열 데이터를 json 형태로 수신합니다.

4. 데이터는 Kafka Connector에서 선택되어 Snowflake 테이블로 전달되기 전에 Kafka Cluster로 수집됩니다.

5. Snowflake 테이블의 데이터는 Amazon Managed GrafanaStreamlit을 사용해 실시간으로 시각화할 수 있고, 과거 데이터는 Amazon Quicksight와 같은 BI 도구로 분석할 수 있습니다.


Create a provisioned Kafka cluster and a Linux jumphost in AWS

MSK Cluster 및 EC2 인스턴스 생성

AWS Cloudformation에서 아래 S3 URL 주소로 Stack을 생성합니다. (이 링크를 클릭해도 됩니다.)

기본 환경은 us-west-2 (Oregon)에서 실행됩니다.

https://snowflake-corp-se-workshop.s3.us-west-1.amazonaws.com/VHOL_Snowflake_Snowpipe_Streaming_MSK/msk-CFT-for-SE-Sandbox.json

Subnet1, Subnet2를 각각 선택하고, MSK SG는 기본으로 선택합니다.

Cloudformation Stack의 배포가 완료되면, 2개의 broker가 있는 MSK Cluster와 선택한 Subnet에서 EC2 인스턴스를 프로비저닝 합니다. (약 20~30분 소요)

 

Session Manager Linux 세션 구성

MSK Cluster와 상호 작용하기 위해 EC2 인스턴스에 연결합니다.

AWS System Manager 서비스에 접속해 Session Manager > Configure Preferences 클릭

  • Idle session timeout : 60

  • Linux Shell profile : /bin/bash

Linux EC2 인스턴스 콘솔에 연결

Start session 클릭

jumphost라고 되어있는 instance를 선택하고 Start session 클릭

 

Snowflake 인증에 사용할 키 쌍 생성

아래 명령어로 AWS Session Manager 콘솔에서 키 페어를 생성합니다. 

cd $HOME
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8

아래 명령어로 공개 키를 생성합니다.

openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

Snowflake에서 사용할 수 있는 형식으로 공개 키 문자열을 인쇄합니다.

grep -v KEY rsa_key.pub | tr -d '\n' | awk '{print $1}' > pub.Key
cat pub.Key

 

Snowpipe 스트리밍용 Kafka 커넥터 설치

아래 명령어로 Kafka Connector 및 Snowflake Streaming SDK를 설치합니다.

passwd=changeit  # Use the default password for the Java keystore, you should chang it after finishing the lab
directory=/home/ssm-user/snowpipe-streaming # Installation directory

cd $HOME
mkdir -p $directory
cd $directory
pwd=`pwd`
sudo yum clean all
sudo yum -y install openssl vim-common java-1.8.0-openjdk-devel.x86_64 gzip tar jq python3-pip
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
tar xvfz kafka_2.12-2.8.1.tgz -C $pwd
wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar -O $pwd/kafka_2.12-2.8.1/libs/aws-msk-iam-auth-1.1.1-all.jar
rm -rf $pwd/kafka_2.12-2.8.1.tgz
cd /tmp && cp /usr/lib/jvm/java-openjdk/jre/lib/security/cacerts kafka.client.truststore.jks
cd /tmp && keytool -genkey -keystore kafka.client.keystore.jks -validity 300 -storepass $passwd -keypass $passwd -dname "CN=snowflake.com" -alias snowflake -storetype pkcs12

#Snowflake kafka connector
wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/1.9.1/snowflake-kafka-connector-1.9.1.jar -O $pwd/kafka_2.12-2.8.1/libs/snowflake-kafka-connector-1.9.1.jar

#Snowpipe streaming SDK
wget https://repo1.maven.org/maven2/net/snowflake/snowflake-ingest-sdk/1.1.0/snowflake-ingest-sdk-1.1.0.jar -O $pwd/kafka_2.12-2.8.1/libs/snowflake-ingest-sdk-1.1.0.jar
wget https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.13.15/snowflake-jdbc-3.13.15.jar -O $pwd/kafka_2.12-2.8.1/libs/snowflake-jdbc-3.13.15.jar
wget https://repo1.maven.org/maven2/org/bouncycastle/bc-fips/1.0.1/bc-fips-1.0.1.jar -O $pwd/kafka_2.12-2.8.1/libs/bc-fips-1.0.1.jar
wget https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-fips/1.0.3/bcpkix-fips-1.0.3.jar -O $pwd/kafka_2.12-2.8.1/libs/bcpkix-fips-1.0.3.jar

 

MSK Cluster에서 broker 문자열을 검색

MSKCluster 클릭 > View client information 클릭 > Private endpoint 복사

session 창에서 복사한 값으로 아래 명령어 실행 > BS 환경 변수 추가

export BS=<broker string>
echo "export BS=$BS" >> ~/.bashrc

 

Kafka 커넥터용 properties 파일 생성

클라이언트가 Kafka Cluster로 인증할 수 있도록 connect-standalone.properties 디렉터리에 구성 파일을 생성합니다.

dir=/home/ssm-user/snowpipe-streaming/scripts
mkdir -p $dir && cd $dir
cat << EOF > $dir/connect-standalone.properties
#************CREATING SNOWFLAKE Connector****************
bootstrap.servers=$BS

#************SNOWFLAKE VALUE CONVERSION****************
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
#************SNOWFLAKE ****************

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

#*********** FOR SSL  ****************
security.protocol=SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks
ssl.truststore.password=changeit
ssl.enabled.protocols=TLSv1.1,TLSv1.2

consumer.security.protocol=SSL
consumer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
consumer.ssl.truststore.password=changeit
consumer.ssl.enabled.protocols=TLSv1.1,TLSv1.2
EOF

 

producer를 위해 client.properties 보안 구성 파일 생성

 

아래 명령어를 실행해 MSK Cluster에 대한 client.properties라는 이름의 보안 구성 파일을 생성합니다.

dir=/home/ssm-user/snowpipe-streaming/scripts
cat << EOF > $dir/client.properties
security.protocol=SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks
ssl.truststore.password=changeit
ssl.enabled.protocols=TLSv1.1,TLSv1.2
EOF

 

MSK Cluster에서 "streaming"이라는 topic 생성

 

아래 명령어로 Kafka topic 생성

$HOME/snowpipe-streaming/kafka_2.12-2.8.1/bin/kafka-topics.sh --bootstrap-server $BS --command-config $HOME/snowpipe-streaming/scripts/client.properties --create --topic streaming --partitions 2 --replication-factor 2

 

아래 명령어로 topic의 describe를 확인합니다.

$HOME/snowpipe-streaming/kafka_2.12-2.8.1/bin/kafka-topics.sh --bootstrap-server $BS --command-config $HOME/snowpipe-streaming/scripts/client.properties --describe --topic streaming

replicationFactor가 2개인 두 개의 partition을 확인할 수 있습니다.

 


 Prepare the Snowflake cluster for streaming

User, Role, DB 생성

 

ACCOUNTADMIN 역할을 가진 User로 Snowflake 계정에 로그인합니다.

아래 SQL 명령어를 worksheet에 입력합니다.

-- Set default value for multiple variables
-- For purpose of this workshop, it is recommended to use these defaults during the exercise to avoid errors
-- You should change them after the workshop
SET PWD = 'Test1234567';
SET USER = 'STREAMING_USER';
SET DB = 'MSK_STREAMING_DB';
SET WH = 'MSK_STREAMING_WH';
SET ROLE = 'MSK_STREAMING_RL';

USE ROLE ACCOUNTADMIN;

-- CREATE USERS
CREATE USER IF NOT EXISTS IDENTIFIER($USER) PASSWORD=$PWD  COMMENT='STREAMING USER';

-- CREATE ROLES
CREATE OR REPLACE ROLE IDENTIFIER($ROLE);

-- CREATE DATABASE AND WAREHOUSE
CREATE DATABASE IF NOT EXISTS IDENTIFIER($DB);
USE IDENTIFIER($DB);
CREATE OR REPLACE WAREHOUSE IDENTIFIER($WH) WITH WAREHOUSE_SIZE = 'SMALL';

-- GRANTS
GRANT CREATE WAREHOUSE ON ACCOUNT TO ROLE IDENTIFIER($ROLE);
GRANT ROLE IDENTIFIER($ROLE) TO USER IDENTIFIER($USER);
GRANT OWNERSHIP ON DATABASE IDENTIFIER($DB) TO ROLE IDENTIFIER($ROLE);
GRANT USAGE ON WAREHOUSE IDENTIFIER($WH) TO ROLE IDENTIFIER($ROLE);

-- SET DEFAULTS
ALTER USER IDENTIFIER($USER) SET DEFAULT_ROLE=$ROLE;
ALTER USER IDENTIFIER($USER) SET DEFAULT_WAREHOUSE=$WH;


-- RUN FOLLOWING COMMANDS TO FIND YOUR ACCOUNT IDENTIFIER, COPY IT DOWN FOR USE LATER
-- IT WILL BE SOMETHING LIKE <organization_name>-<account_name>
-- e.g. ykmxgak-wyb52636

WITH HOSTLIST AS 
(SELECT * FROM TABLE(FLATTEN(INPUT => PARSE_JSON(SYSTEM$allowlist()))))
SELECT REPLACE(VALUE:host,'.snowflakecomputing.com','') AS ACCOUNT_IDENTIFIER
FROM HOSTLIST
WHERE VALUE:type = 'SNOWFLAKE_DEPLOYMENT_REGIONLESS';

 

SQL문을 모두 실행하고, 마지막 SQL 결과는 따로 저장해 놓습니다.

다음은 Snowflake에 액세스 할 수 있도록 공개 키를 구성합니다.

위 단계에서 pub.key에 대한 내용을 복사해 아래 명령어를 worksheet에서 실행합니다.

use role accountadmin;
alter user streaming_user set rsa_public_key='<pubKey>';

 

Snowflake에서 로그아웃하고, 위의 SQL에서 세팅한 비밀번호를 사용해 streaming_user로 다시 로그인합니다.

worksheet에서 아래 SQL 명령어를 실행해 DB와 Schema를 생성합니다.

 

SnowSQL 설치

SnowSQL은 Snowflake에 연결하여 SQL 쿼리 실행, 테이블에 데이터 로드 및 언로드를 포함한 모든 DDL 및 DML 작업을 수행하기 위한 command line client입니다.

 

Session Manager 콘솔에서 아래 명령어를 실행합니다.

curl https://sfc-repo.snowflakecomputing.com/snowsql/bootstrap/1.2/linux_x86_64/snowsql-1.2.24-linux_x86_64.bash -o /tmp/snowsql-1.2.24-linux_x86_64.bash
echo -e "~/bin \n y" > /tmp/ans
bash /tmp/snowsql-1.2.24-linux_x86_64.bash < /tmp/ans

아래 명령어로 Snowflake Private Key Phrase에 대한 환경 변수를 설정합니다.

  • 여기서 SNOWSQL_PRIVATE_KEY_PASSPHRASE는 전 단계에서 openssl 명령어로 생성한 rsa_key.p8에 대한 password입니다. 
export SNOWSQL_PRIVATE_KEY_PASSPHRASE= <key phrase you set up when running openssl previously>

~/.bashrc 파일에 위의 명령을 추가합니다.

  • 여기서 Snowflake Account Identifier는 Snowflake worksheet에서 실행한 SQL의 결과물 중 Account Identifier를 복사해서 입력합니다.

$HOME/bin/snowsql -a <Snowflake Account Identifier> -u streaming_user --private-key-path $HOME/rsa_key.p8 -d msk_streaming_db -s msk_streaming_schema

이 명령어로 Snowflake와 상호작용 할 수 있습니다.(빠져나오는 명령어 : ctrl + D)


Configure Kafka connector for Snowpipe Streaming

Kafka Connector에 대한 다양한 매개변수 수집

cd $HOME
outf=/tmp/params
cat << EOF > /tmp/get_params
a=''
until [ ! -z \$a ]
do
 read -p "Input Snowflake account identifier: e.g. ylmxgak-wyb53646 ==> " a
done

echo export clstr_url=\$a.snowflakecomputing.com > $outf
export clstr_url=\$a.snowflakecomputing.com

read -p "Snowflake cluster user name: default: streaming_user ==> " user
if [[ \$user == "" ]]
then
   user="streaming_user"
fi

echo export user=\$user >> $outf
export user=\$user

pass=''
until [ ! -z \$pass ]
do
  read -p "Private key passphrase ==> " pass
done

echo export key_pass=\$pass >> $outf
export key_pass=\$pass

read -p "Full path to your Snowflake private key file, default: /home/ssm-user/rsa_key.p8 ==> " p8
if [[ \$p8 == "" ]]
then
   p8="/home/ssm-user/rsa_key.p8"
fi

priv_key=\`cat \$p8 | grep -v PRIVATE | tr -d '\n'\`
echo export priv_key=\$priv_key  >> $outf
export priv_key=\$priv_key
cat $outf >> $HOME/.bashrc
EOF
. /tmp/get_params

Snowflake Kafka 연결 속성 구성 파일 생성

dir=/home/ssm-user/snowpipe-streaming/scripts
cat << EOF > $dir/snowflakeconnectorMSK.properties
name=snowpipeStreaming
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=4
topics=streaming
snowflake.private.key.passphrase=$key_pass
snowflake.database.name=MSK_STREAMING_DB
snowflake.schema.name=MSK_STREAMING_SCHEMA
snowflake.topic2table.map=streaming:MSK_STREAMING_TBL
buffer.count.records=10000
buffer.flush.time=5
buffer.size.bytes=20000000
snowflake.url.name=$clstr_url
snowflake.user.name=$user
snowflake.private.key=$priv_key
snowflake.role.name=MSK_STREAMING_RL
snowflake.ingestion.method=snowpipe_streaming
value.converter.schemas.enable=false
jmx=true
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
errors.tolerance=all
EOF

Putting it all together

데이터를 Snowflake 테이블로 수집할 준비가 끝났습니다.

 

Snowpipe Streaming 용 Kafka Connector 시작

아래 명령을 실행하여 Kafka Connector를 시작합니다.

$HOME/snowpipe-streaming/kafka_2.12-2.8.1/bin/connect-standalone.sh $HOME/snowpipe-streaming/scripts/connect-standalone.properties $HOME/snowpipe-streaming/scripts/snowflakeconnectorMSK.properties

아래 화면과 같은 내용이 표시되면 성공입니다.

 

 

실시간 데이터를 MSK Cluster에 수집할 producer 시작

새 system manager 콘솔창을 켜고, 아래 명령어를 입력합니다.

curl --connect-timeout 5 http://ecs-alb-1504531980.us-west-2.elb.amazonaws.com:8502/opensky | $HOME/snowpipe-streaming/kafka_2.12-2.8.1/bin/kafka-console-producer.sh --broker-list $BS --producer.config $HOME/snowpipe-streaming/scripts/client.properties --topic streaming

위 스크립트에서는 샌프란시스코 베이 지역에 대한 실시간 비행 데이터를 JSON 형식으로 제공하는 Rest API를 쿼리 합니다.

데이터에는 위도, 경도, 항공기 고도, 목적지 공항, 항공편 ID, timestamp, icao 번호와 같은 정보가 포함됩니다.

데이터는 Streaming MSK Cluster의 topic으로 수집되고, Snowpipe Streaming Kafka Connector에서 선택되어 Snowflake 테이블로 전달됩니다.


Query ingested data in Snowflake

 

Raw data 쿼리

데이터가 Snowflake로 스트리밍 되었는지 아래 SQL 명령어로 확인합니다.

use msk_streaming_db;
use schema msk_streaming_schema;
show channels in table msk_streaming_tbl;

두 파티션에 해당하는 두 개의 채널이 있음을 확인합니다.

테이블에서 아래 쿼리를 실행합니다.

select * from msk_streaming_tbl;

아래 캡처처럼 RECORD_METADATA, RECORD_CONTENT 두 개의 열이 있어야 합니다.

 

Raw JSON 데이터 Flatten

아래 SQL 명령을 실행해 Raw JSON 데이터를 평면화하고, key를 기반으로 여러 열이 있는 view를 만듭니다.

create or replace view flights_vw
  as select
    f.value:utc::timestamp_ntz ts_utc,
    CONVERT_TIMEZONE('UTC','America/Los_Angeles',ts_utc::timestamp_ntz) as ts_pt,
    f.value:alt::integer alt,
    f.value:dest::string dest,
    f.value:orig::string orig,
    f.value:id::string id,
    f.value:icao::string icao,
    f.value:lat::float lat,
    f.value:lon::float lon,
    st_geohash(to_geography(ST_MAKEPOINT(lon, lat)),12) geohash,
    year(ts_pt) yr,
    month(ts_pt) mo,
    day(ts_pt) dd,
    hour(ts_pt) hr
FROM   msk_streaming_tbl,
       Table(Flatten(msk_streaming_tbl.record_content)) f;
  • view 생성
  • timestamp 다른 시간대로 변경
  • Snowflake의 ST_GEOHASH 함수를 사용해서 Grafana와 같은 시계열 시각화 도구에서 사용할 수 있는 geohash 생성

 

실시간 데이터를 Snowflake에 지속적으로 Streaming

 

아래 스크립트를 실행해서 데이터 원본의 빈도를 자유롭게 조정해 봅니다.

while true
do
  curl --connect-timeout 5 http://ecs-alb-1504531980.us-west-2.elb.amazonaws.com:8502/opensky | $HOME/snowpipe-streaming/kafka_2.12-2.8.1/bin/kafka-console-producer.sh --broker-list $BS --producer.config $HOME/snowpipe-streaming/scripts/client.properties --topic streaming
  sleep 10
done

Snowflake worksheet에서 아래 쿼리를 10초마다 실행하면 실제로 행수가 증가하는 것을 볼 수 있습니다.

 


Cleanup

Cloudformation 콘솔에서 Stack 삭제.

Snowflake worksheet에서 아래 명령어 실행.

USE ROLE ACCOUNTADMIN;

DROP DATABASE MSK_STREAMING_DB;
DROP WAREHOUSE MSK_STREAMING_WH;
DROP ROLE MSK_STREAMING_RL;

-- Drop the streaming user
DROP USER IF EXISTS STREAMING_USER;

마치며

Snowflake Summit 2023에 여러 가지 유익한 Snowflake 기능을 알아볼 수 있는 HOL 세션이 있었지만, 저는 AWS Service와 Snowflake를 함께 활용할 수 있는 세션이 제일 흥미가 생겼습니다.

 

Snowflake는 Kinesis, Amazon MSK, AWS Glue 같은 Data Engineering Service 나 Amazon Forcast, Amazon Personalize, Amazon Sagemaker와 같은 ML Service와도 주로 사용됩니다.

 

Snowflake에 쌓인 데이터는 Amazon Managed Grafana 나 Amazon Quicksight와도 연동하여 실시간 시각화와 과거 데이터 시각화도 가능합니다. 아래와 같은 AWS Blog 내용을 따라 하면 쉽게 연동해서 쓸 수 있습니다.

 

 

 

728x90
profile

imok

@imok2

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!